mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-15 04:07:51 +00:00
Fix collective retry logic
It erroneously treated a thread as a bool, need to check if None to know if it is scheduled.
This commit is contained in:
parent
8d16b412ae
commit
64d5081be3
@ -186,7 +186,9 @@ def follow_leader(remote, leader):
|
||||
# The leader has folded, time to startup again...
|
||||
cfm.stop_following()
|
||||
currentleader = None
|
||||
eventlet.spawn_n(start_collective)
|
||||
if retrythread is None: # start a recovery
|
||||
retrythread = eventlet.spawn_after(
|
||||
random.random(), start_collective)
|
||||
|
||||
def create_connection(member):
|
||||
remote = None
|
||||
@ -517,7 +519,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
connection.getpeername()[0])
|
||||
connection.close()
|
||||
return
|
||||
if retrythread:
|
||||
if retrythread is not None:
|
||||
retrythread.cancel()
|
||||
retrythread = None
|
||||
with leader_init:
|
||||
@ -533,7 +535,7 @@ def handle_connection(connection, cert, request, local=False):
|
||||
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
|
||||
# so far unused anyway
|
||||
if not cfm.relay_slaved_requests(drone, connection):
|
||||
if not retrythread: # start a recovery if everyone else seems
|
||||
if retrythread is None: # start a recovery if everyone else seems
|
||||
# to have disappeared
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
@ -596,6 +598,7 @@ def try_assimilate(drone, followcount, remote):
|
||||
|
||||
def get_leader(connection):
|
||||
if currentleader is None or connection.getpeername()[0] == currentleader:
|
||||
# cancel retry if a retry is pending
|
||||
if currentleader is None:
|
||||
msg = 'Becoming leader as no leader known'
|
||||
else:
|
||||
@ -620,7 +623,7 @@ def become_leader(connection):
|
||||
follower.kill()
|
||||
cfm.stop_following()
|
||||
follower = None
|
||||
if retrythread:
|
||||
if retrythread is not None:
|
||||
retrythread.cancel()
|
||||
retrythread = None
|
||||
currentleader = connection.getsockname()[0]
|
||||
@ -739,7 +742,10 @@ def start_collective():
|
||||
if connect_to_leader(name=myname, leader=member, remote=remote):
|
||||
break
|
||||
else:
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
if retrythread is None:
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
except Exception as e:
|
||||
retrythread = eventlet.spawn_after(random.random(), start_collective)
|
||||
if retrythread is None:
|
||||
retrythread = eventlet.spawn_after(random.random(),
|
||||
start_collective)
|
||||
|
Loading…
x
Reference in New Issue
Block a user