2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-16 20:57:53 +00:00

Have attrib set wait on all collective members

This will mean that it is reliable that a nodeattrib ; <command>
in delegation scenarios is guaranteed to execute in order.
This commit is contained in:
Jarrod Johnson 2018-05-09 17:01:23 -04:00
parent c962d10222
commit 267d83e6e4

View File

@ -174,9 +174,9 @@ def _do_notifier(cfg, watcher, callback):
logException()
def _rpc_master_set_node_attributes(tenant, attribmap, autocreate, xid):
def _rpc_master_set_node_attributes(tenant, attribmap, autocreate):
c = ConfigManager(tenant)
c.send_to_followers(xid, '_rpc_set_node_attributes', tenant,
c.send_to_followers('_rpc_set_node_attributes', tenant,
attribmap, autocreate)
c._true_set_node_attributes(attribmap, autocreate)
@ -405,6 +405,8 @@ def relay_slaved_requests(name, listener):
rpc += nrpc
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
if 'xid' in rpc:
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
msg = listener.recv(8)
@ -433,8 +435,10 @@ def follow_channel(channel):
raise Exception('Truncated message error')
rpc += nrpc
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
_txcount = rpc['txcount']
if 'function' in rpc:
globals()[rpc['function']](*rpc['args'])
if 'txcount' in rpc:
_txcount = rpc['txcount']
if 'xid' in rpc and rpc['xid']:
_pendingchangesets[rpc['xid']].send()
msg = channel.recv(8)
@ -1399,15 +1403,15 @@ class ConfigManager(object):
_pendingchangesets[xid] = event.Event()
rpcpayload = cPickle.dumps({'function': '_rpc_master_set_node_attributes',
'args': (self.tenant, attribmap,
autocreate, xid)})
autocreate), 'xid': xid})
rpclen = len(rpcpayload)
cfgleader.sendall(struct.pack('!Q', rpclen))
cfgleader.sendall(rpcpayload)
_pendingchangesets[xid].wait(0)
_pendingchangesets[xid].wait()
del _pendingchangesets[xid]
return
def send_to_followers(self, xid, fnname, *args):
def send_to_followers(self, fnname, *args):
global _txcount
_txcount += 1
if len(cfgstreams) < (len(_cfgstore['collective']) // 2) :
@ -1415,7 +1419,7 @@ class ConfigManager(object):
raise Exception("collective does not have quorum")
pushes = eventlet.GreenPool()
payload = cPickle.dumps({'function': fnname, 'args': args,
'txcount': _txcount, 'xid': xid})
'txcount': _txcount})
for res in pushes.starmap(
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
pass
@ -1424,7 +1428,7 @@ class ConfigManager(object):
if cfgleader: # currently config slave to another
return self.set_leader_node_attributes(attribmap, autocreate)
if cfgstreams:
self.send_to_followers(None, '_rpc_set_node_attributes',
self.send_to_followers('_rpc_set_node_attributes',
self.tenant, attribmap, autocreate)
self._true_set_node_attributes(attribmap, autocreate)