diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 3a6d7bb1..b41a1093 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -163,8 +163,10 @@ def connect_to_leader(cert=None, name=None, leader=None): def follow_leader(remote, leader): global currentleader cleanexit = False + newleader = None try: - cfm.follow_channel(remote) + exitcause = cfm.follow_channel(remote) + newleader = exitcause.get('newleader', None) except greenlet.GreenletExit: cleanexit = True finally: @@ -172,6 +174,11 @@ def follow_leader(remote, leader): log.log({'info': 'Previous following cleanly closed', 'subsystem': 'collective'}) return + if newleader: + log.log( + {'info': 'Previous leader directed us to join new leader {}'.format(newleader)}) + eventlet.spawn_n(connect_to_leader, None, get_myname(), newleader) + return log.log({'info': 'Current leader ({0}) has disappeared, restarting ' 'collective membership'.format(leader), 'subsystem': 'collective'}) # The leader has folded, time to startup again... @@ -399,6 +406,28 @@ def handle_connection(connection, cert, request, local=False): 'transaction count', 'txcount': cfm._txcount,}) return + if cfm.cfgstreams and request['txcount'] == cfm._txcount: + try: + cfm.check_quorum() + tlvdata.send(connection, + {'error': 'Refusing to be assimilated as I am a leader with quorum', + 'txcount': cfm._txcount,}) + return + except exc.DegradedCollective: + followcount = request.get('followcount', None) + myfollowcount = len(list(cfm.cfgstreams)) + if followcount is not None and followcount < myfollowcount: + tlvdata.send(connection, + {'error': 'Refusing to be assimilated by leader with fewer followers', + 'txcount': cfm._txcount,}) + return + if follower: + tlvdata.send( + connection, + {'error': 'Already following, assimilate leader first', + 'leader': currentleader}) + connection.close() + return if connecting.active: # don't try to connect while actively already trying to connect tlvdata.send(connection, {'status': 0}) @@ -501,7 +530,7 @@ def populate_collinfo(collinfo): collinfo['offline'].append(member) -def try_assimilate(drone): +def try_assimilate(drone, followcount): try: remote = connect_to_collective(None, drone) except socket.error: @@ -510,6 +539,7 @@ def try_assimilate(drone): return tlvdata.send(remote, {'collective': {'operation': 'assimilate', 'name': get_myname(), + 'followcount': followcount, 'txcount': cfm._txcount}}) tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. @@ -522,10 +552,13 @@ def try_assimilate(drone): 'subsystem': 'collective'}) return if 'txcount' in answer: - log.log({'info': 'Deferring to {0} due to transaction count'.format( + log.log({'info': 'Deferring to {0} due to target being a better leader'.format( drone), 'subsystem': 'collective'}) connect_to_leader(None, None, leader=remote.getpeername()[0]) return + if 'leader' in answer: + # Will wait for leader to see about assimilation + return if 'error' in answer: log.log({ 'error': 'Error encountered while attempting to ' @@ -547,9 +580,9 @@ def get_leader(connection): become_leader(connection) return currentleader -def retire_as_leader(): +def retire_as_leader(newleader=None): global currentleader - cfm.stop_leading() + cfm.stop_leading(newleader) currentleader = None def become_leader(connection): @@ -569,13 +602,14 @@ def become_leader(connection): skipaddr = connection.getpeername()[0] myname = get_myname() skipem = set(cfm.cfgstreams) + numfollowers = len(list(skipem)) skipem.add(currentleader) skipem.add(skipaddr) for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] if dronecandidate in skipem or member == myname: continue - eventlet.spawn_n(try_assimilate, dronecandidate) + eventlet.spawn_n(try_assimilate, dronecandidate, numfollowers) schedule_rebalance() diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index c8056402..6c78f53b 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -739,9 +739,14 @@ def stop_following(replacement=None): pass cfgleader = replacement -def stop_leading(): +def stop_leading(newleader=None): + rpcpayload = None + if newleader is not None: + rpcpayload = msgpack.packb({'newleader': newleader}, use_bin_type=False) for stream in list(cfgstreams): try: + if rpcpayload is not None: + _push_rpc(cfgstreams[stream], rpcpayload) cfgstreams[stream].close() except Exception: pass @@ -827,6 +832,8 @@ def follow_channel(channel): rpc = msgpack.unpackb(rpc, raw=False) if 'txcount' in rpc: _txcount = rpc['txcount'] + if 'newleader' in rpc: + return rpc if 'function' in rpc: if not (rpc['function'].startswith('_true') or rpc['function'].startswith('_rpc')): raise Exception("Received unsupported function call: {0}".format(rpc['function'])) @@ -856,6 +863,7 @@ def follow_channel(channel): stop_following(None) else: stop_following(True) + return {} def add_collective_member(name, address, fingerprint):