mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-16 04:39:16 +00:00
Apply changes from leader subscription
This commit is contained in:
parent
035f10e7d7
commit
81bb16476c
@ -69,7 +69,7 @@ def connect_to_leader(cert=None, name=None):
|
||||
cfm.set_global(globvar, globaldata[globvar])
|
||||
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
|
||||
cfm.ConfigManager._bg_sync_to_file()
|
||||
cfm.set_leader_channel(remote)
|
||||
cfm.follow_channel(remote)
|
||||
|
||||
|
||||
def handle_connection(connection, cert, request, local=False):
|
||||
@ -156,7 +156,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
cfgdata = cfm.ConfigManager(None)._dump_to_json()
|
||||
tlvdata.send(connection, {'dbsize': len(cfgdata)})
|
||||
connection.sendall(cfgdata)
|
||||
tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway
|
||||
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway
|
||||
cfm.register_config_listener(drone, connection)
|
||||
# ok, we have a connecting member whose certificate checks out
|
||||
# He needs to bootstrap his configuration and subscribe it to updates
|
||||
|
@ -389,9 +389,26 @@ def clear_configuration():
|
||||
pass
|
||||
|
||||
cfgleader = None
|
||||
def set_leader_channel(channel):
|
||||
def follow_channel(channel):
|
||||
global cfgleader
|
||||
cfgleader = channel
|
||||
msg = channel.recv(8)
|
||||
while msg:
|
||||
sz = struct.unpack('!Q', msg)
|
||||
if sz != 0:
|
||||
rpc = ''
|
||||
while len(rpc) < sz:
|
||||
nrpc = channel.recv(sz - len(rpc))
|
||||
if not nrpc:
|
||||
raise Exception('Truncated message error')
|
||||
rpc += nrpc
|
||||
rpc = cPickle.loads(rpc)
|
||||
locals()[rpc['function']](*rpc['args'])
|
||||
_txcount = rpc['txcount']
|
||||
if 'xid' in rpc:
|
||||
_pendingchangesets[rpc['xid']].send()
|
||||
|
||||
|
||||
|
||||
def add_collective_member(name, address, fingerprint):
|
||||
try:
|
||||
@ -1375,7 +1392,7 @@ class ConfigManager(object):
|
||||
'args': (attribmap, autocreate),
|
||||
'txcount': _txcount})
|
||||
for res in pushes.imap(_push_rpc,
|
||||
[(payload, s) for s in cfgstreams]):
|
||||
[(s, payload) for s in cfgstreams]):
|
||||
print(repr(res))
|
||||
|
||||
def set_node_attributes(self, attribmap, autocreate=False):
|
||||
@ -1385,7 +1402,7 @@ class ConfigManager(object):
|
||||
self.set_follower_node_attributes(attribmap, autocreate)
|
||||
self._set_node_attributes(attribmap, autocreate)
|
||||
|
||||
def _set_node_attributes(self, attribmap, autocreate, xid=None):
|
||||
def _set_node_attributes(self, attribmap, autocreate):
|
||||
# TODO(jbjohnso): multi mgr support, here if we have peers,
|
||||
# pickle the arguments and fire them off in eventlet
|
||||
# flows to peers, all should have the same result
|
||||
|
Loading…
x
Reference in New Issue
Block a user