From 6a4086679cde219c5c47360a166558fd6cee1ecb Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 7 Apr 2021 13:05:02 -0400 Subject: [PATCH] Rework collective assimilation logic Followers will only depart if their current leader is assimilated. Leaders with quorum will refuse assimilation and instruct the member attempting the assimilation to join them instead. Leaders without quorum will either follow the assimilating leader or refuse, depending on which has the higher transaction count and, in the event of a tie, which has the larger set of followers. --- .../confluent/collective/manager.py | 46 ++++++++++++++++--- .../confluent/config/configmanager.py | 10 +++- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 3a6d7bb1..b41a1093 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -163,8 +163,10 @@ def connect_to_leader(cert=None, name=None, leader=None): def follow_leader(remote, leader): global currentleader cleanexit = False + newleader = None try: - cfm.follow_channel(remote) + exitcause = cfm.follow_channel(remote) + newleader = exitcause.get('newleader', None) except greenlet.GreenletExit: cleanexit = True finally: @@ -172,6 +174,11 @@ def follow_leader(remote, leader): log.log({'info': 'Previous following cleanly closed', 'subsystem': 'collective'}) return + if newleader: + log.log( + {'info': 'Previous leader directed us to join new leader {}'.format(newleader)}) + eventlet.spawn_n(connect_to_leader, None, get_myname(), newleader) + return log.log({'info': 'Current leader ({0}) has disappeared, restarting ' 'collective membership'.format(leader), 'subsystem': 'collective'}) # The leader has folded, time to startup again... 
@@ -399,6 +406,28 @@ def handle_connection(connection, cert, request, local=False): 'transaction count', 'txcount': cfm._txcount,}) return + if cfm.cfgstreams and request['txcount'] == cfm._txcount: + try: + cfm.check_quorum() + tlvdata.send(connection, + {'error': 'Refusing to be assimilated as I am a leader with quorum', + 'txcount': cfm._txcount,}) + return + except exc.DegradedCollective: + followcount = request.get('followcount', None) + myfollowcount = len(list(cfm.cfgstreams)) + if followcount is not None and followcount < myfollowcount: + tlvdata.send(connection, + {'error': 'Refusing to be assimilated by leader with fewer followers', + 'txcount': cfm._txcount,}) + return + if follower: + tlvdata.send( + connection, + {'error': 'Already following, assimilate leader first', + 'leader': currentleader}) + connection.close() + return if connecting.active: # don't try to connect while actively already trying to connect tlvdata.send(connection, {'status': 0}) @@ -501,7 +530,7 @@ def populate_collinfo(collinfo): collinfo['offline'].append(member) -def try_assimilate(drone): +def try_assimilate(drone, followcount): try: remote = connect_to_collective(None, drone) except socket.error: @@ -510,6 +539,7 @@ def try_assimilate(drone): return tlvdata.send(remote, {'collective': {'operation': 'assimilate', 'name': get_myname(), + 'followcount': followcount, 'txcount': cfm._txcount}}) tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. 
@@ -522,10 +552,13 @@ def try_assimilate(drone): 'subsystem': 'collective'}) return if 'txcount' in answer: - log.log({'info': 'Deferring to {0} due to transaction count'.format( + log.log({'info': 'Deferring to {0} due to target being a better leader'.format( drone), 'subsystem': 'collective'}) connect_to_leader(None, None, leader=remote.getpeername()[0]) return + if 'leader' in answer: + # Will wait for leader to see about assimilation + return if 'error' in answer: log.log({ 'error': 'Error encountered while attempting to ' @@ -547,9 +580,9 @@ def get_leader(connection): become_leader(connection) return currentleader -def retire_as_leader(): +def retire_as_leader(newleader=None): global currentleader - cfm.stop_leading() + cfm.stop_leading(newleader) currentleader = None def become_leader(connection): @@ -569,13 +602,14 @@ def become_leader(connection): skipaddr = connection.getpeername()[0] myname = get_myname() skipem = set(cfm.cfgstreams) + numfollowers = len(list(skipem)) skipem.add(currentleader) skipem.add(skipaddr) for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] if dronecandidate in skipem or member == myname: continue - eventlet.spawn_n(try_assimilate, dronecandidate) + eventlet.spawn_n(try_assimilate, dronecandidate, numfollowers) schedule_rebalance() diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index c8056402..6c78f53b 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -739,9 +739,14 @@ def stop_following(replacement=None): pass cfgleader = replacement -def stop_leading(): +def stop_leading(newleader=None): + rpcpayload = None + if newleader is not None: + rpcpayload = msgpack.packb({'newleader': newleader}, use_bin_type=False) for stream in list(cfgstreams): try: + if rpcpayload is not None: + _push_rpc(cfgstreams[stream], rpcpayload) 
cfgstreams[stream].close() except Exception: pass @@ -827,6 +832,8 @@ def follow_channel(channel): rpc = msgpack.unpackb(rpc, raw=False) if 'txcount' in rpc: _txcount = rpc['txcount'] + if 'newleader' in rpc: + return rpc if 'function' in rpc: if not (rpc['function'].startswith('_true') or rpc['function'].startswith('_rpc')): raise Exception("Received unsupported function call: {0}".format(rpc['function'])) @@ -856,6 +863,7 @@ def follow_channel(channel): stop_following(None) else: stop_following(True) + return {} def add_collective_member(name, address, fingerprint):