2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-15 04:07:51 +00:00

Significantly rework the collective startup behavior

One, make the tracking bools enforce a lock to reduce confusion

Treat an initializing peer as failed, to avoid getting too fixated
on an uncertain target.

Make sure that no more than one follower is tried at a time by
killing before starting a new one, and syncing up the configmanager
state

Decline to act on an assimilation request if we are trying to connect
and also if the current leader asks us to connect and we already are.

Avoid calling get_leader while connecting, as that can cause a member
to decide to become a leader while trying to connect, by swapping
the reactions to the connect request.

Avoid trying to assimilate existing followers.

Fix some logging.
This commit is contained in:
Jarrod Johnson 2018-10-12 11:45:23 -04:00
parent f525c25ba6
commit 3105b9b1f9

View File

@ -41,12 +41,15 @@ retrythread = None
class ContextBool(object):
def __init__(self):
self.active = False
self.mylock = threading.RLock()
def __enter__(self):
self.active = True
self.mylock.__enter__()
def __exit__(self, exc_type, exc_val, exc_tb):
self.active = False
self.mylock.__exit__(exc_type, exc_val, exc_tb)
connecting = ContextBool()
leader_init = ContextBool()
@ -81,11 +84,9 @@ def connect_to_leader(cert=None, name=None, leader=None):
if 'backoff' in keydata:
log.log({
'info': 'Collective initialization in progress on '
'{0}, will retry connection'.format(leader),
'{0}'.format(leader),
'subsystem': 'collective'})
eventlet.spawn_after(random.random(), connect_to_leader,
cert, name, leader)
return True
return False
if 'leader' in keydata:
log.log(
{'info': 'Prospective leader {0} has redirected this '
@ -101,13 +102,17 @@ def connect_to_leader(cert=None, name=None, leader=None):
log.log({'info':
'Prospective leader {0} has inferior '
'transaction count, becoming leader'
''.format(leader), 'subsystem': 'collective'})
''.format(leader), 'subsystem': 'collective',
'subsystem': 'collective'})
return become_leader(remote)
print(keydata['error'])
return False
follower.kill()
cfm.stop_following()
follower = None
if follower:
follower.kill()
cfm.stop_following()
follower = None
log.log({'info': 'Following leader {0}'.format(leader),
'subsystem': 'collective'})
colldata = tlvdata.recv(remote)
@ -364,6 +369,18 @@ def handle_connection(connection, cert, request, local=False):
'transaction count',
'txcount': cfm._txcount,})
return
if connecting.active:
# don't try to connect while actively already trying to connect
tlvdata.send(connection, {'status': 0})
connection.close()
return
if (currentleader == connection.getpeername()[0] and
follower and follower.isAlive()):
# if we are happily following this leader already, don't stir
# the pot
tlvdata.send(connection, {'status': 0})
connection.close()
return
log.log({'info': 'Connecting in response to assimilation',
'subsystem': 'collective'})
eventlet.spawn_n(connect_to_leader, None, None,
@ -394,6 +411,11 @@ def handle_connection(connection, cert, request, local=False):
connection.close()
return
myself = connection.getsockname()[0]
if connecting.active:
tlvdata.send(connection, {'error': 'Connecting right now',
'backoff': True})
connection.close()
return
if myself != get_leader(connection):
tlvdata.send(
connection,
@ -401,11 +423,6 @@ def handle_connection(connection, cert, request, local=False):
'in another castle', 'leader': currentleader})
connection.close()
return
if connecting.active:
tlvdata.send(connection, {'error': 'Connecting right now',
'backoff': True})
connection.close()
return
if request['txcount'] > cfm._txcount:
retire_as_leader()
tlvdata.send(connection,
@ -491,6 +508,12 @@ def try_assimilate(drone):
def get_leader(connection):
if currentleader is None or connection.getpeername()[0] == currentleader:
if currentleader is None:
msg = 'Becoming leader as no leader known'
else:
msg = 'Becoming leader because {0} attempted to connect and it ' \
'is current leader'.format(currentleader)
log.log({'info': msg, 'subsystem': 'collective'})
become_leader(connection)
return currentleader
@ -507,6 +530,7 @@ def become_leader(connection):
'subsystem': 'collective'})
if follower:
follower.kill()
cfm.stop_following()
follower = None
if retrythread:
retrythread.cancel()
@ -514,9 +538,12 @@ def become_leader(connection):
currentleader = connection.getsockname()[0]
skipaddr = connection.getpeername()[0]
myname = get_myname()
skipem = set(cfm.cfgstreams)
skipem.add(currentleader)
skipem.add(skipaddr)
for member in cfm.list_collective():
dronecandidate = cfm.get_collective_member(member)['address']
if dronecandidate in (currentleader, skipaddr) or member == myname:
if dronecandidate in skipem or member == myname:
continue
eventlet.spawn_n(try_assimilate, dronecandidate)
@ -533,6 +560,7 @@ def start_collective():
global retrythread
if follower:
follower.kill()
cfm.stop_following()
follower = None
try:
if cfm.cfgstreams: