2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-14 19:57:50 +00:00

More collective startup/recovery cleanup

If this node has already been assimilated (it is now following a leader) while startup is in progress, abandon the rest of the startup sequence.
This commit is contained in:
Jarrod Johnson 2021-04-09 13:32:32 -04:00
parent 1d902c4d90
commit ac4d1fa8e1

View File

@@ -119,7 +119,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
follower.kill()
cfm.stop_following()
follower = None
if follower:
if follower is not None:
follower.kill()
cfm.stop_following()
follower = None
@@ -166,6 +166,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
def follow_leader(remote, leader):
global currentleader
global retrythread
global follower
cleanexit = False
newleader = None
try:
@@ -186,6 +187,9 @@ def follow_leader(remote, leader):
log.log({'info': 'Current leader ({0}) has disappeared, restarting '
'collective membership'.format(leader), 'subsystem': 'collective'})
# The leader has folded, time to startup again...
if follower is not None:
follower.kill()
follower = None
cfm.stop_following()
currentleader = None
if retrythread is None: # start a recovery
@@ -254,7 +258,7 @@ def handle_connection(connection, cert, request, local=False):
'enabled on this '
'system'}})
return
if follower:
if follower is not None:
linfo = cfm.get_collective_member_by_address(currentleader)
remote = socket.create_connection((currentleader, 13001), 15)
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
@@ -447,7 +451,7 @@ def handle_connection(connection, cert, request, local=False):
{'error': 'Refusing, my name is better',
'txcount': cfm._txcount,})
return
if follower:
if follower is not None:
tlvdata.send(
connection,
{'error': 'Already following, assimilate leader first',
@@ -635,7 +639,7 @@ def become_leader(connection):
global retrythread
log.log({'info': 'Becoming leader of collective',
'subsystem': 'collective'})
if follower:
if follower is not None:
follower.kill()
cfm.stop_following()
follower = None
@@ -727,10 +731,9 @@ def start_collective():
retrythread = None
try:
cfm.membership_callback = schedule_rebalance
if follower:
follower.kill()
cfm.stop_following()
follower = None
if follower is not None:
initting = False
return
try:
if cfm.cfgstreams:
cfm.check_quorum()
@@ -756,17 +759,17 @@ def start_collective():
member, remote = ent
if isinstance(remote, Exception):
continue
log.log({'info': 'Performing startup attempt to {0}'.format(
member), 'subsystem': 'collective'})
if connect_to_leader(name=myname, leader=member, remote=remote):
break
else:
if retrythread is None:
retrythread = eventlet.spawn_after(5 + random.random(),
start_collective)
if follower is None:
log.log({'info': 'Performing startup attempt to {0}'.format(
member), 'subsystem': 'collective'})
if not connect_to_leader(name=myname, leader=member, remote=remote):
remote.close()
else:
remote.close()
except Exception as e:
if retrythread is None:
retrythread = eventlet.spawn_after(random.random(),
start_collective)
pass
finally:
if retrythread is None and follower is None:
retrythread = eventlet.spawn_after(5 + random.random(),
start_collective)
initting = False