diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 5f92c591..cf95d21a 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -174,9 +174,9 @@ def _do_notifier(cfg, watcher, callback): logException() -def _rpc_master_set_node_attributes(tenant, attribmap, autocreate, xid): +def _rpc_master_set_node_attributes(tenant, attribmap, autocreate): c = ConfigManager(tenant) - c.send_to_followers(xid, '_rpc_set_node_attributes', tenant, + c.send_to_followers('_rpc_set_node_attributes', tenant, attribmap, autocreate) c._true_set_node_attributes(attribmap, autocreate) @@ -405,6 +405,8 @@ def relay_slaved_requests(name, listener): rpc += nrpc rpc = cPickle.loads(rpc) globals()[rpc['function']](*rpc['args']) + if 'xid' in rpc: + _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) msg = listener.recv(8) @@ -433,8 +435,10 @@ def follow_channel(channel): raise Exception('Truncated message error') rpc += nrpc rpc = cPickle.loads(rpc) - globals()[rpc['function']](*rpc['args']) - _txcount = rpc['txcount'] + if 'function' in rpc: + globals()[rpc['function']](*rpc['args']) + if 'txcount' in rpc: + _txcount = rpc['txcount'] if 'xid' in rpc and rpc['xid']: _pendingchangesets[rpc['xid']].send() msg = channel.recv(8) @@ -1399,15 +1403,15 @@ class ConfigManager(object): _pendingchangesets[xid] = event.Event() rpcpayload = cPickle.dumps({'function': '_rpc_master_set_node_attributes', 'args': (self.tenant, attribmap, - autocreate, xid)}) + autocreate), 'xid': xid}) rpclen = len(rpcpayload) cfgleader.sendall(struct.pack('!Q', rpclen)) cfgleader.sendall(rpcpayload) - _pendingchangesets[xid].wait(0) + _pendingchangesets[xid].wait() del _pendingchangesets[xid] return - def send_to_followers(self, xid, fnname, *args): + def send_to_followers(self, fnname, *args): global _txcount _txcount += 1 if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : @@ -1415,7 +1419,7 @@ class ConfigManager(object): raise Exception("collective does not have quorum") pushes = eventlet.GreenPool() payload = cPickle.dumps({'function': fnname, 'args': args, - 'txcount': _txcount, 'xid': xid}) + 'txcount': _txcount}) for res in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): pass @@ -1424,7 +1428,7 @@ class ConfigManager(object): if cfgleader: # currently config slave to another return self.set_leader_node_attributes(attribmap, autocreate) if cfgstreams: - self.send_to_followers(None, '_rpc_set_node_attributes', + self.send_to_followers('_rpc_set_node_attributes', self.tenant, attribmap, autocreate) self._true_set_node_attributes(attribmap, autocreate)