diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 15d69d63..6bfb532b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -157,8 +157,9 @@ def handle_connection(connection, cert, request, local=False): cfgdata = cfm.ConfigManager(None)._dump_to_json() tlvdata.send(connection, {'dbsize': len(cfgdata)}) connection.sendall(cfgdata) - #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway - cfm.register_config_listener(drone, connection) + #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, + # so far unused anyway + cfm.relay_slaved_requests(drone, connection) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 8f0db2b9..5f92c591 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1,4 +1,4 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 +7# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2014 IBM Corporation # Copyright 2015-2018 Lenovo @@ -174,8 +174,15 @@ def _do_notifier(cfg, watcher, callback): logException() -def _remote_set_node_attributes(tenant, attribmap, autocreate): - ConfigManager(tenant)._set_node_attributes(attribmap, autocreate) +def _rpc_master_set_node_attributes(tenant, attribmap, autocreate, xid): + c = ConfigManager(tenant) + c.send_to_followers(xid, '_rpc_set_node_attributes', tenant, + attribmap, autocreate) + c._true_set_node_attributes(attribmap, autocreate) + + +def _rpc_set_node_attributes(tenant, attribmap, autocreate): + ConfigManager(tenant)._true_set_node_attributes(attribmap, autocreate) def logException(): @@ -384,8 +391,22 @@ def set_global(globalname, value): ConfigManager._bg_sync_to_file() cfgstreams = {} -def register_config_listener(name, listener): +def relay_slaved_requests(name, listener): cfgstreams[name] = listener + msg = listener.recv(8) + while msg: + sz = struct.unpack('!Q', msg)[0] + if sz != 0: + rpc = '' + while len(rpc) < sz: + nrpc = listener.recv(sz - len(rpc)) + if not nrpc: + raise Exception('Truncated client error') + rpc += nrpc + rpc = cPickle.loads(rpc) + globals()[rpc['function']](*rpc['args']) + msg = listener.recv(8) + def clear_configuration(): global _cfgstore @@ -414,7 +435,7 @@ def follow_channel(channel): rpc = cPickle.loads(rpc) globals()[rpc['function']](*rpc['args']) _txcount = rpc['txcount'] - if 'xid' in rpc: + if 'xid' in rpc and rpc['xid']: _pendingchangesets[rpc['xid']].send() msg = channel.recv(8) @@ -1376,37 +1397,38 @@ class ConfigManager(object): while xid in _pendingchangesets: xid = os.urandom(8) _pendingchangesets[xid] = event.Event() - rpcpayload = cPickle.dumps({'function': 'set_node_attributes', - 'args': (attribmap, autocreate), - 'xid': xid}) + rpcpayload = cPickle.dumps({'function': '_rpc_master_set_node_attributes', + 'args': (self.tenant, attribmap, + autocreate, xid)}) rpclen = len(rpcpayload) cfgleader.sendall(struct.pack('!Q', rpclen)) cfgleader.sendall(rpcpayload) _pendingchangesets[xid].wait(0) + del _pendingchangesets[xid] return - def set_follower_node_attributes(self, attribmap, autocreate): + def send_to_followers(self, xid, fnname, *args): global _txcount _txcount += 1 if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : # the leader counts in addition to registered streams raise Exception("collective does not have quorum") pushes = eventlet.GreenPool() - payload = cPickle.dumps({'function': '_remote_set_node_attributes', - 'args': (self.tenant, attribmap, autocreate), - 'txcount': _txcount}) + payload = cPickle.dumps({'function': fnname, 'args': args, + 'txcount': _txcount, 'xid': xid}) for res in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): - print(repr(res)) + pass def set_node_attributes(self, attribmap, autocreate=False): if cfgleader: # currently config slave to another return self.set_leader_node_attributes(attribmap, autocreate) if cfgstreams: - self.set_follower_node_attributes(attribmap, autocreate) - self._set_node_attributes(attribmap, autocreate) + self.send_to_followers(None, '_rpc_set_node_attributes', + self.tenant, attribmap, autocreate) + self._true_set_node_attributes(attribmap, autocreate) - def _set_node_attributes(self, attribmap, autocreate): + def _true_set_node_attributes(self, attribmap, autocreate): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result