mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-16 20:57:53 +00:00
Succeed in pushing config to followers from leader
Still more work to be done for multiple transactions.
This commit is contained in:
parent
aec4e746e9
commit
d11c716b6a
@ -174,6 +174,10 @@ def _do_notifier(cfg, watcher, callback):
|
||||
logException()
|
||||
|
||||
|
||||
def _remote_set_node_attributes(tenant, attribmap, autocreate):
|
||||
ConfigManager(tenant)._set_node_attributes(attribmap, autocreate)
|
||||
|
||||
|
||||
def logException():
|
||||
global tracelog
|
||||
if tracelog is None:
|
||||
@ -213,6 +217,11 @@ def init_masterkey(password=None):
|
||||
password=password))
|
||||
|
||||
|
||||
def _push_rpc(stream, payload):
|
||||
stream.sendall(struct.pack('!Q', len(payload)))
|
||||
stream.sendall(payload)
|
||||
|
||||
|
||||
def decrypt_value(cryptvalue,
|
||||
key=None,
|
||||
integritykey=None):
|
||||
@ -394,7 +403,7 @@ def follow_channel(channel):
|
||||
cfgleader = channel
|
||||
msg = channel.recv(8)
|
||||
while msg:
|
||||
sz = struct.unpack('!Q', msg)
|
||||
sz = struct.unpack('!Q', msg)[0]
|
||||
if sz != 0:
|
||||
rpc = ''
|
||||
while len(rpc) < sz:
|
||||
@ -403,7 +412,7 @@ def follow_channel(channel):
|
||||
raise Exception('Truncated message error')
|
||||
rpc += nrpc
|
||||
rpc = cPickle.loads(rpc)
|
||||
locals()[rpc['function']](*rpc['args'])
|
||||
globals()[rpc['function']](*rpc['args'])
|
||||
_txcount = rpc['txcount']
|
||||
if 'xid' in rpc:
|
||||
_pendingchangesets[rpc['xid']].send()
|
||||
@ -1377,10 +1386,6 @@ class ConfigManager(object):
|
||||
_pendingchangesets[xid].wait(0)
|
||||
return
|
||||
|
||||
def _push_rpc(self, stream, payload):
|
||||
stream.sendall(struct.pack('!Q', len(payload)))
|
||||
stream.sendall(payload)
|
||||
|
||||
def set_follower_node_attributes(self, attribmap, autocreate):
|
||||
global _txcount
|
||||
_txcount += 1
|
||||
@ -1388,11 +1393,11 @@ class ConfigManager(object):
|
||||
# the leader counts in addition to registered streams
|
||||
raise Exception("collective does not have quorum")
|
||||
pushes = eventlet.GreenPool()
|
||||
payload = cPickle.dumps({'function': '_set_node_attributes',
|
||||
'args': (attribmap, autocreate),
|
||||
payload = cPickle.dumps({'function': '_remote_set_node_attributes',
|
||||
'args': (self.tenant, attribmap, autocreate),
|
||||
'txcount': _txcount})
|
||||
for res in pushes.imap(_push_rpc,
|
||||
[(s, payload) for s in cfgstreams]):
|
||||
for res in pushes.starmap(
|
||||
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
|
||||
print(repr(res))
|
||||
|
||||
def set_node_attributes(self, attribmap, autocreate=False):
|
||||
|
Loading…
x
Reference in New Issue
Block a user