From 64d5081be33339e9fe67d98f19395c1a6562f914 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 8 Apr 2021 15:56:33 -0400 Subject: [PATCH] Fix collective retry logic It erroneously treated a thread as a bool, need to check if None to know if it is scheduled. --- .../confluent/collective/manager.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 1f863e96..0b82b755 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -186,7 +186,9 @@ def follow_leader(remote, leader): # The leader has folded, time to startup again... cfm.stop_following() currentleader = None - eventlet.spawn_n(start_collective) + if retrythread is None: # start a recovery + retrythread = eventlet.spawn_after( + random.random(), start_collective) def create_connection(member): remote = None @@ -517,7 +519,7 @@ def handle_connection(connection, cert, request, local=False): connection.getpeername()[0]) connection.close() return - if retrythread: + if retrythread is not None: retrythread.cancel() retrythread = None with leader_init: @@ -533,7 +535,7 @@ def handle_connection(connection, cert, request, local=False): #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, # so far unused anyway if not cfm.relay_slaved_requests(drone, connection): - if not retrythread: # start a recovery if everyone else seems + if retrythread is None: # start a recovery if everyone else seems # to have disappeared retrythread = eventlet.spawn_after(5 + random.random(), start_collective) @@ -596,6 +598,7 @@ def try_assimilate(drone, followcount, remote): def get_leader(connection): if currentleader is None or connection.getpeername()[0] == currentleader: + # cancel retry if a retry is pending if currentleader is None: msg = 'Becoming leader as no leader known' else: @@ -620,7 +623,7 @@ def become_leader(connection): follower.kill() cfm.stop_following() follower = None - if retrythread: + if retrythread is not None: retrythread.cancel() retrythread = None currentleader = connection.getsockname()[0] @@ -739,7 +742,10 @@ def start_collective(): if connect_to_leader(name=myname, leader=member, remote=remote): break else: - retrythread = eventlet.spawn_after(5 + random.random(), - start_collective) + if retrythread is None: + retrythread = eventlet.spawn_after(5 + random.random(), + start_collective) except Exception as e: - retrythread = eventlet.spawn_after(random.random(), start_collective) + if retrythread is None: + retrythread = eventlet.spawn_after(random.random(), + start_collective)