2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-16 20:57:53 +00:00

Allow slave collective drones to set

It works (once), but needs xid fix.
This commit is contained in:
Jarrod Johnson 2018-05-08 16:53:59 -04:00
parent e5f553801b
commit c962d10222
2 changed files with 41 additions and 18 deletions

View File

@ -157,8 +157,9 @@ def handle_connection(connection, cert, request, local=False):
cfgdata = cfm.ConfigManager(None)._dump_to_json()
tlvdata.send(connection, {'dbsize': len(cfgdata)})
connection.sendall(cfgdata)
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway
cfm.register_config_listener(drone, connection)
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
# so far unused anyway
cfm.relay_slaved_requests(drone, connection)
# ok, we have a connecting member whose certificate checks out
# He needs to bootstrap his configuration and subscribe it to updates

View File

@ -1,4 +1,4 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
7# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
# Copyright 2015-2018 Lenovo
@ -174,8 +174,15 @@ def _do_notifier(cfg, watcher, callback):
logException()
def _remote_set_node_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant)._set_node_attributes(attribmap, autocreate)
def _rpc_master_set_node_attributes(tenant, attribmap, autocreate, xid):
c = ConfigManager(tenant)
c.send_to_followers(xid, '_rpc_set_node_attributes', tenant,
attribmap, autocreate)
c._true_set_node_attributes(attribmap, autocreate)
def _rpc_set_node_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant)._true_set_node_attributes(attribmap, autocreate)
def logException():
@ -384,8 +391,22 @@ def set_global(globalname, value):
ConfigManager._bg_sync_to_file()
cfgstreams = {}
def register_config_listener(name, listener):
def relay_slaved_requests(name, listener):
cfgstreams[name] = listener
msg = listener.recv(8)
while msg:
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
rpc = ''
while len(rpc) < sz:
nrpc = listener.recv(sz - len(rpc))
if not nrpc:
raise Exception('Truncated client error')
rpc += nrpc
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
msg = listener.recv(8)
def clear_configuration():
global _cfgstore
@ -414,7 +435,7 @@ def follow_channel(channel):
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
_txcount = rpc['txcount']
if 'xid' in rpc:
if 'xid' in rpc and rpc['xid']:
_pendingchangesets[rpc['xid']].send()
msg = channel.recv(8)
@ -1376,37 +1397,38 @@ class ConfigManager(object):
while xid in _pendingchangesets:
xid = os.urandom(8)
_pendingchangesets[xid] = event.Event()
rpcpayload = cPickle.dumps({'function': 'set_node_attributes',
'args': (attribmap, autocreate),
'xid': xid})
rpcpayload = cPickle.dumps({'function': '_rpc_master_set_node_attributes',
'args': (self.tenant, attribmap,
autocreate, xid)})
rpclen = len(rpcpayload)
cfgleader.sendall(struct.pack('!Q', rpclen))
cfgleader.sendall(rpcpayload)
_pendingchangesets[xid].wait(0)
del _pendingchangesets[xid]
return
def set_follower_node_attributes(self, attribmap, autocreate):
def send_to_followers(self, xid, fnname, *args):
global _txcount
_txcount += 1
if len(cfgstreams) < (len(_cfgstore['collective']) // 2) :
# the leader counts in addition to registered streams
raise Exception("collective does not have quorum")
pushes = eventlet.GreenPool()
payload = cPickle.dumps({'function': '_remote_set_node_attributes',
'args': (self.tenant, attribmap, autocreate),
'txcount': _txcount})
payload = cPickle.dumps({'function': fnname, 'args': args,
'txcount': _txcount, 'xid': xid})
for res in pushes.starmap(
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
print(repr(res))
pass
def set_node_attributes(self, attribmap, autocreate=False):
if cfgleader: # currently config slave to another
return self.set_leader_node_attributes(attribmap, autocreate)
if cfgstreams:
self.set_follower_node_attributes(attribmap, autocreate)
self._set_node_attributes(attribmap, autocreate)
self.send_to_followers(None, '_rpc_set_node_attributes',
self.tenant, attribmap, autocreate)
self._true_set_node_attributes(attribmap, autocreate)
def _set_node_attributes(self, attribmap, autocreate):
def _true_set_node_attributes(self, attribmap, autocreate):
# TODO(jbjohnso): multi mgr support, here if we have peers,
# pickle the arguments and fire them off in eventlet
# flows to peers, all should have the same result