2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-26 19:40:12 +00:00

Succeed in pushing config to followers from leader

Still more work to be done for multiple transactions.
This commit is contained in:
Jarrod Johnson 2018-05-08 11:33:42 -04:00
parent 81bb16476c
commit f1e29393df

View File

@ -174,6 +174,10 @@ def _do_notifier(cfg, watcher, callback):
logException()
def _remote_set_node_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant)._set_node_attributes(attribmap, autocreate)
def logException():
global tracelog
if tracelog is None:
@ -213,6 +217,11 @@ def init_masterkey(password=None):
password=password))
def _push_rpc(stream, payload):
stream.sendall(struct.pack('!Q', len(payload)))
stream.sendall(payload)
def decrypt_value(cryptvalue,
key=None,
integritykey=None):
@ -394,7 +403,7 @@ def follow_channel(channel):
cfgleader = channel
msg = channel.recv(8)
while msg:
sz = struct.unpack('!Q', msg)
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
rpc = ''
while len(rpc) < sz:
@ -403,7 +412,7 @@ def follow_channel(channel):
raise Exception('Truncated message error')
rpc += nrpc
rpc = cPickle.loads(rpc)
locals()[rpc['function']](*rpc['args'])
globals()[rpc['function']](*rpc['args'])
_txcount = rpc['txcount']
if 'xid' in rpc:
_pendingchangesets[rpc['xid']].send()
@ -1377,10 +1386,6 @@ class ConfigManager(object):
_pendingchangesets[xid].wait(0)
return
def _push_rpc(self, stream, payload):
stream.sendall(struct.pack('!Q', len(payload)))
stream.sendall(payload)
def set_follower_node_attributes(self, attribmap, autocreate):
global _txcount
_txcount += 1
@ -1388,11 +1393,11 @@ class ConfigManager(object):
# the leader counts in addition to registered streams
raise Exception("collective does not have quorum")
pushes = eventlet.GreenPool()
payload = cPickle.dumps({'function': '_set_node_attributes',
'args': (attribmap, autocreate),
payload = cPickle.dumps({'function': '_remote_set_node_attributes',
'args': (self.tenant, attribmap, autocreate),
'txcount': _txcount})
for res in pushes.imap(_push_rpc,
[(s, payload) for s in cfgstreams]):
for res in pushes.starmap(
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
print(repr(res))
def set_node_attributes(self, attribmap, autocreate=False):