From 153956b2cd82d627d7a3a925e234fce387ec2789 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 8 Apr 2021 18:26:59 -0400 Subject: [PATCH] Further refine collective startup behavior --- .../confluent/collective/manager.py | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 0b82b755..489062b0 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -164,6 +164,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None): def follow_leader(remote, leader): global currentleader + global retrythread cleanexit = False newleader = None try: @@ -179,8 +180,8 @@ def follow_leader(remote, leader): if newleader: log.log( {'info': 'Previous leader directed us to join new leader {}'.format(newleader)}) - eventlet.spawn_n(connect_to_leader, None, get_myname(), newleader) - return + if connect_to_leader(None, get_myname(), newleader): + return log.log({'info': 'Current leader ({0}) has disappeared, restarting ' 'collective membership'.format(leader), 'subsystem': 'collective'}) # The leader has folded, time to startup again... @@ -238,6 +239,7 @@ def get_myname(): def handle_connection(connection, cert, request, local=False): global currentleader global retrythread + connection.settimeout(5) operation = request['operation'] if cert: cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) @@ -467,10 +469,13 @@ def handle_connection(connection, cert, request, local=False): 'subsystem': 'collective'}) if cfm.cfgstreams: retire_as_leader(connection.getpeername()) - eventlet.spawn_n(connect_to_leader, None, None, - leader=connection.getpeername()[0]) tlvdata.send(connection, {'status': 0}) + newleader = connection.getpeername()[0] connection.close() + if not connect_to_leader(None, None, leader=newleader): + if retrythread is None: + retrythread = eventlet.spawn_after(random.random(), + start_collective) if 'getinfo' == operation: drone = request['name'] droneinfo = cfm.get_collective_member(drone) @@ -514,10 +519,13 @@ def handle_connection(connection, cert, request, local=False): 'should assimilate me, connecting..', 'txcount': cfm._txcount}) log.log({'info': 'Connecting to leader due to superior ' - 'transaction count', 'subsystem': collective}) - eventlet.spawn_n(connect_to_leader, None, None, - connection.getpeername()[0]) + 'transaction count', 'subsystem': 'collective'}) connection.close() + if not connect_to_leader( + None, None, connection.getpeername()[0]): + if retrythread is None: + retrythread = eventlet.spawn_after(5 + random.random(), + start_collective) return if retrythread is not None: retrythread.cancel() @@ -535,6 +543,8 @@ def handle_connection(connection, cert, request, local=False): #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, # so far unused anyway if not cfm.relay_slaved_requests(drone, connection): + log.log({'info': 'All clients have disconnected, starting recovery process', + 'subsystem': 'collective'}) if retrythread is None: # start a recovery if everyone else seems # to have disappeared retrythread = eventlet.spawn_after(5 + random.random(), @@ -556,6 +566,7 @@ def populate_collinfo(collinfo): def try_assimilate(drone, followcount, remote): + global retrythread try: remote = connect_to_collective(None, drone, remote) except socket.error: @@ -580,7 +591,10 @@ def try_assimilate(drone, followcount, remote): log.log({'info': 'Deferring to {0} due to target being a better leader'.format( drone), 'subsystem': 'collective'}) retire_as_leader(drone) - connect_to_leader(None, None, leader=remote.getpeername()[0]) + if not connect_to_leader(None, None, leader=remote.getpeername()[0]): + if retrythread is None: + retrythread = eventlet.spawn_after(random.random(), + start_collective) return False if 'leader' in answer: # Will wait for leader to see about assimilation @@ -706,6 +720,7 @@ def schedule_rebalance(): def start_collective(): global follower global retrythread + retrythread = None try: cfm.membership_callback = schedule_rebalance if follower: