From 81bb16476ce57e691d8c0c5d25f01566b3d1a819 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 4 May 2018 16:12:53 -0400 Subject: [PATCH] Apply changes from leader subscription --- .../confluent/collective/manager.py | 4 ++-- .../confluent/config/configmanager.py | 23 ++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 00a17d2c..807259d0 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -69,7 +69,7 @@ def connect_to_leader(cert=None, name=None): cfm.set_global(globvar, globaldata[globvar]) cfm.ConfigManager(tenant=None)._load_from_json(dbjson) cfm.ConfigManager._bg_sync_to_file() - cfm.set_leader_channel(remote) + cfm.follow_channel(remote) def handle_connection(connection, cert, request, local=False): @@ -156,7 +156,7 @@ def handle_connection(connection, cert, request, local=False): cfgdata = cfm.ConfigManager(None)._dump_to_json() tlvdata.send(connection, {'dbsize': len(cfgdata)}) connection.sendall(cfgdata) - tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway + #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway cfm.register_config_listener(drone, connection) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index e2e24a9f..ccb5489d 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -389,9 +389,26 @@ def clear_configuration(): pass cfgleader = None -def set_leader_channel(channel): +def follow_channel(channel): global cfgleader cfgleader = channel + msg = channel.recv(8) + while msg: + sz = struct.unpack('!Q', msg) + if sz != 0: + rpc = '' + while len(rpc) < sz: + nrpc = channel.recv(sz - len(rpc)) + if not nrpc: + raise Exception('Truncated message error') + rpc += nrpc + rpc = cPickle.loads(rpc) + locals()[rpc['function']](*rpc['args']) + _txcount = rpc['txcount'] + if 'xid' in rpc: + _pendingchangesets[rpc['xid']].send() + + def add_collective_member(name, address, fingerprint): try: @@ -1375,7 +1392,7 @@ class ConfigManager(object): 'args': (attribmap, autocreate), 'txcount': _txcount}) for res in pushes.imap(_push_rpc, - [(payload, s) for s in cfgstreams]): + [(s, payload) for s in cfgstreams]): print(repr(res)) def set_node_attributes(self, attribmap, autocreate=False): @@ -1385,7 +1402,7 @@ class ConfigManager(object): self.set_follower_node_attributes(attribmap, autocreate) self._set_node_attributes(attribmap, autocreate) - def _set_node_attributes(self, attribmap, autocreate, xid=None): + def _set_node_attributes(self, attribmap, autocreate): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result