diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index ccb5489d..c8cf99ed 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -174,6 +174,10 @@ def _do_notifier(cfg, watcher, callback): logException() +def _remote_set_node_attributes(tenant, attribmap, autocreate): + ConfigManager(tenant)._set_node_attributes(attribmap, autocreate) + + def logException(): global tracelog if tracelog is None: @@ -213,6 +217,11 @@ def init_masterkey(password=None): password=password)) +def _push_rpc(stream, payload): + stream.sendall(struct.pack('!Q', len(payload))) + stream.sendall(payload) + + def decrypt_value(cryptvalue, key=None, integritykey=None): @@ -394,7 +403,7 @@ def follow_channel(channel): cfgleader = channel msg = channel.recv(8) while msg: - sz = struct.unpack('!Q', msg) + sz = struct.unpack('!Q', msg)[0] if sz != 0: rpc = '' while len(rpc) < sz: @@ -403,7 +412,7 @@ def follow_channel(channel): raise Exception('Truncated message error') rpc += nrpc rpc = cPickle.loads(rpc) - locals()[rpc['function']](*rpc['args']) + globals()[rpc['function']](*rpc['args']) _txcount = rpc['txcount'] if 'xid' in rpc: _pendingchangesets[rpc['xid']].send() @@ -1377,10 +1386,6 @@ class ConfigManager(object): _pendingchangesets[xid].wait(0) return - def _push_rpc(self, stream, payload): - stream.sendall(struct.pack('!Q', len(payload))) - stream.sendall(payload) - def set_follower_node_attributes(self, attribmap, autocreate): global _txcount _txcount += 1 @@ -1388,11 +1393,11 @@ class ConfigManager(object): # the leader counts in addition to registered streams raise Exception("collective does not have quorum") pushes = eventlet.GreenPool() - payload = cPickle.dumps({'function': '_set_node_attributes', - 'args': (attribmap, autocreate), + payload = cPickle.dumps({'function': '_remote_set_node_attributes', + 'args': (self.tenant, attribmap, autocreate), 'txcount': _txcount}) - for res in pushes.imap(_push_rpc, - [(s, payload) for s in cfgstreams]): + for res in pushes.starmap( + _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): print(repr(res)) def set_node_attributes(self, attribmap, autocreate=False):