diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index dfe87baa..06109be7 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -119,7 +119,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None): follower.kill() cfm.stop_following() follower = None - if follower: + if follower is not None: follower.kill() cfm.stop_following() follower = None @@ -166,6 +166,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None): def follow_leader(remote, leader): global currentleader global retrythread + global follower cleanexit = False newleader = None try: @@ -186,6 +187,9 @@ def follow_leader(remote, leader): log.log({'info': 'Current leader ({0}) has disappeared, restarting ' 'collective membership'.format(leader), 'subsystem': 'collective'}) # The leader has folded, time to startup again... + if follower is not None: + follower.kill() + follower = None cfm.stop_following() currentleader = None if retrythread is None: # start a recovery @@ -254,7 +258,7 @@ def handle_connection(connection, cert, request, local=False): 'enabled on this ' 'system'}}) return - if follower: + if follower is not None: linfo = cfm.get_collective_member_by_address(currentleader) remote = socket.create_connection((currentleader, 13001), 15) remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, @@ -447,7 +451,7 @@ def handle_connection(connection, cert, request, local=False): {'error': 'Refusing, my name is better', 'txcount': cfm._txcount,}) return - if follower: + if follower is not None: tlvdata.send( connection, {'error': 'Already following, assimilate leader first', @@ -635,7 +639,7 @@ def become_leader(connection): global retrythread log.log({'info': 'Becoming leader of collective', 'subsystem': 'collective'}) - if follower: + if follower is not None: follower.kill() cfm.stop_following() follower = None @@ -727,10 +731,9 @@ def start_collective(): retrythread = None try: cfm.membership_callback = schedule_rebalance - if follower: - follower.kill() - cfm.stop_following() - follower = None + if follower is not None: + initting = False + return try: if cfm.cfgstreams: cfm.check_quorum() @@ -756,17 +759,17 @@ def start_collective(): member, remote = ent if isinstance(remote, Exception): continue - log.log({'info': 'Performing startup attempt to {0}'.format( - member), 'subsystem': 'collective'}) - if connect_to_leader(name=myname, leader=member, remote=remote): - break - else: - if retrythread is None: - retrythread = eventlet.spawn_after(5 + random.random(), - start_collective) + if follower is None: + log.log({'info': 'Performing startup attempt to {0}'.format( + member), 'subsystem': 'collective'}) + if not connect_to_leader(name=myname, leader=member, remote=remote): + remote.close() + else: + remote.close() except Exception as e: - if retrythread is None: - retrythread = eventlet.spawn_after(random.random(), - start_collective) + pass finally: + if retrythread is None and follower is None: + retrythread = eventlet.spawn_after(5 + random.random(), + start_collective) initting = False