diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index e85d39e8..56b18ac9 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -48,6 +48,7 @@ class ContextBool(object): self.active = False connecting = ContextBool() +leader_init = ContextBool() def connect_to_leader(cert=None, name=None, leader=None): @@ -385,18 +386,23 @@ def handle_connection(connection, cert, request, local=False): if retrythread: retrythread.cancel() retrythread = None - cfm.update_collective_address(request['name'], - connection.getpeername()[0]) - tlvdata.send(connection, cfm._dump_keys(None, False)) - tlvdata.send(connection, cfm._cfgstore['collective']) - tlvdata.send(connection, cfm.get_globals()) - cfgdata = cfm.ConfigManager(None)._dump_to_json() - tlvdata.send(connection, {'txcount': cfm._txcount, - 'dbsize': len(cfgdata)}) - connection.sendall(cfgdata) + with leader_init: + cfm.update_collective_address(request['name'], + connection.getpeername()[0]) + tlvdata.send(connection, cfm._dump_keys(None, False)) + tlvdata.send(connection, cfm._cfgstore['collective']) + tlvdata.send(connection, cfm.get_globals()) + cfgdata = cfm.ConfigManager(None)._dump_to_json() + tlvdata.send(connection, {'txcount': cfm._txcount, + 'dbsize': len(cfgdata)}) + connection.sendall(cfgdata) #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, # so far unused anyway - cfm.relay_slaved_requests(drone, connection) + if not cfm.relay_slaved_requests(drone, connection): + if not retrythread: # start a recovery if everyone else seems + # to have disappeared + retrythread = eventlet.spawn_after(30 + random.random(), + start_collective) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates @@ -471,8 +477,11 @@ def start_collective(): if follower: follower.kill() follower = None + if leader_init.active: # do not start trying to connect if we are + # xmitting data to a follower + return myname = get_myname() - for member in cfm.list_collective(): + for member in sorted(list(cfm.list_collective())): if member == myname: continue if cfm.cfgleader is None: diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 5193d775..6544aef4 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -550,6 +550,8 @@ def relay_slaved_requests(name, listener): pass if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective stop_following(True) + return False + return True class StreamHandler(object):