2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-28 03:48:35 +00:00

Rework collective assimilation logic

Followers will only depart if their current leader
is assimilated.

Leaders with quorum will refuse assimilation and instruct
member trying to assimilate to join it.

Leaders without quorum will either follow the assimilation leader
or refuse, depending on who has highest transaction count, and if
a tie, which has the larger set of followers
This commit is contained in:
Jarrod Johnson 2021-04-07 13:05:02 -04:00
parent 3d2422ded3
commit 6a4086679c
2 changed files with 49 additions and 7 deletions

View File

@ -163,8 +163,10 @@ def connect_to_leader(cert=None, name=None, leader=None):
def follow_leader(remote, leader):
global currentleader
cleanexit = False
newleader = None
try:
cfm.follow_channel(remote)
exitcause = cfm.follow_channel(remote)
newleader = exitcause.get('newleader', None)
except greenlet.GreenletExit:
cleanexit = True
finally:
@ -172,6 +174,11 @@ def follow_leader(remote, leader):
log.log({'info': 'Previous following cleanly closed',
'subsystem': 'collective'})
return
if newleader:
log.log(
{'info': 'Previous leader directed us to join new leader {}'.format(newleader)})
eventlet.spawn_n(connect_to_leader, None, get_myname(), newleader)
return
log.log({'info': 'Current leader ({0}) has disappeared, restarting '
'collective membership'.format(leader), 'subsystem': 'collective'})
# The leader has folded, time to startup again...
@ -399,6 +406,28 @@ def handle_connection(connection, cert, request, local=False):
'transaction count',
'txcount': cfm._txcount,})
return
if cfm.cfgstreams and request['txcount'] == cfm._txcount:
try:
cfm.check_quorum()
tlvdata.send(connection,
{'error': 'Refusing to be assimilated as I am a leader with quorum',
'txcount': cfm._txcount,})
return
except exc.DegradedCollective:
followcount = request.get('followcount', None)
myfollowcount = len(list(cfm.cfgstreams))
if followcount is not None and followcount < myfollowcount:
tlvdata.send(connection,
{'error': 'Refusing to be assimilated by leader with fewer followers',
'txcount': cfm._txcount,})
return
if follower:
tlvdata.send(
connection,
{'error': 'Already following, assimilate leader first',
'leader': currentleader})
connection.close()
return
if connecting.active:
# don't try to connect while actively already trying to connect
tlvdata.send(connection, {'status': 0})
@ -501,7 +530,7 @@ def populate_collinfo(collinfo):
collinfo['offline'].append(member)
def try_assimilate(drone):
def try_assimilate(drone, followcount):
try:
remote = connect_to_collective(None, drone)
except socket.error:
@ -510,6 +539,7 @@ def try_assimilate(drone):
return
tlvdata.send(remote, {'collective': {'operation': 'assimilate',
'name': get_myname(),
'followcount': followcount,
'txcount': cfm._txcount}})
tlvdata.recv(remote) # the banner
tlvdata.recv(remote) # authpassed... 0..
@ -522,10 +552,13 @@ def try_assimilate(drone):
'subsystem': 'collective'})
return
if 'txcount' in answer:
log.log({'info': 'Deferring to {0} due to transaction count'.format(
log.log({'info': 'Deferring to {0} due to target being a better leader'.format(
drone), 'subsystem': 'collective'})
connect_to_leader(None, None, leader=remote.getpeername()[0])
return
if 'leader' in answer:
# Will wait for leader to see about assimilation
return
if 'error' in answer:
log.log({
'error': 'Error encountered while attempting to '
@ -547,9 +580,9 @@ def get_leader(connection):
become_leader(connection)
return currentleader
def retire_as_leader():
def retire_as_leader(newleader=None):
global currentleader
cfm.stop_leading()
cfm.stop_leading(newleader)
currentleader = None
def become_leader(connection):
@ -569,13 +602,14 @@ def become_leader(connection):
skipaddr = connection.getpeername()[0]
myname = get_myname()
skipem = set(cfm.cfgstreams)
numfollowers = len(list(skipem))
skipem.add(currentleader)
skipem.add(skipaddr)
for member in cfm.list_collective():
dronecandidate = cfm.get_collective_member(member)['address']
if dronecandidate in skipem or member == myname:
continue
eventlet.spawn_n(try_assimilate, dronecandidate)
eventlet.spawn_n(try_assimilate, dronecandidate, numfollowers)
schedule_rebalance()

View File

@ -739,9 +739,14 @@ def stop_following(replacement=None):
pass
cfgleader = replacement
def stop_leading():
def stop_leading(newleader=None):
rpcpayload = None
if newleader is not None:
rpcpayload = msgpack.packb({'newleader': newleader}, use_bin_type=False)
for stream in list(cfgstreams):
try:
if rpcpayload is not None:
_push_rpc(cfgstreams[stream], rpcpayload)
cfgstreams[stream].close()
except Exception:
pass
@ -827,6 +832,8 @@ def follow_channel(channel):
rpc = msgpack.unpackb(rpc, raw=False)
if 'txcount' in rpc:
_txcount = rpc['txcount']
if 'newleader' in rpc:
return rpc
if 'function' in rpc:
if not (rpc['function'].startswith('_true') or rpc['function'].startswith('_rpc')):
raise Exception("Received unsupported function call: {0}".format(rpc['function']))
@ -856,6 +863,7 @@ def follow_channel(channel):
stop_following(None)
else:
stop_following(True)
return {}
def add_collective_member(name, address, fingerprint):