2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-16 20:57:53 +00:00

Apply changes from leader subscription

This commit is contained in:
Jarrod Johnson 2018-05-04 16:12:53 -04:00
parent a78aa6816c
commit aec4e746e9
2 changed files with 22 additions and 5 deletions

View File

@ -69,7 +69,7 @@ def connect_to_leader(cert=None, name=None):
cfm.set_global(globvar, globaldata[globvar])
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
cfm.ConfigManager._bg_sync_to_file()
cfm.set_leader_channel(remote)
cfm.follow_channel(remote)
def handle_connection(connection, cert, request, local=False):
@ -156,7 +156,7 @@ def handle_connection(connection, cert, request, local=False):
cfgdata = cfm.ConfigManager(None)._dump_to_json()
tlvdata.send(connection, {'dbsize': len(cfgdata)})
connection.sendall(cfgdata)
tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway
cfm.register_config_listener(drone, connection)
# ok, we have a connecting member whose certificate checks out
# He needs to bootstrap his configuration and subscribe it to updates

View File

@ -389,9 +389,26 @@ def clear_configuration():
pass
cfgleader = None
def set_leader_channel(channel):
def follow_channel(channel):
global cfgleader
cfgleader = channel
msg = channel.recv(8)
while msg:
sz = struct.unpack('!Q', msg)
if sz != 0:
rpc = ''
while len(rpc) < sz:
nrpc = channel.recv(sz - len(rpc))
if not nrpc:
raise Exception('Truncated message error')
rpc += nrpc
rpc = cPickle.loads(rpc)
locals()[rpc['function']](*rpc['args'])
_txcount = rpc['txcount']
if 'xid' in rpc:
_pendingchangesets[rpc['xid']].send()
def add_collective_member(name, address, fingerprint):
try:
@ -1375,7 +1392,7 @@ class ConfigManager(object):
'args': (attribmap, autocreate),
'txcount': _txcount})
for res in pushes.imap(_push_rpc,
[(payload, s) for s in cfgstreams]):
[(s, payload) for s in cfgstreams]):
print(repr(res))
def set_node_attributes(self, attribmap, autocreate=False):
@ -1385,7 +1402,7 @@ class ConfigManager(object):
self.set_follower_node_attributes(attribmap, autocreate)
self._set_node_attributes(attribmap, autocreate)
def _set_node_attributes(self, attribmap, autocreate, xid=None):
def _set_node_attributes(self, attribmap, autocreate):
# TODO(jbjohnso): multi mgr support, here if we have peers,
# pickle the arguments and fire them off in eventlet
# flows to peers, all should have the same result