From 4f73ddc41e722a0b9b3562deab5cf0688e44dca2 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 14 May 2018 16:22:27 -0400 Subject: [PATCH] Start setting the stage for leader change on restart Have connect() have a way to recover if leader is dead. Also these will be involved in configmanager detected loss of leader --- .../confluent/collective/manager.py | 57 +++++++++++++++++-- .../confluent/config/configmanager.py | 6 ++ 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 002eb94b..eebfc332 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -51,8 +51,11 @@ def connect_to_leader(cert=None, name=None, leader=None): raise Exception("Certificate mismatch in the collective") tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. + if name is None: + name = get_myname() tlvdata.send(remote, {'collective': {'operation': 'connect', - 'name': name}}) + 'name': name, + 'txcount': cfm._txcount}}) keydata = tlvdata.recv(remote) if 'error' in keydata: if 'leader' in keydata: @@ -159,6 +162,23 @@ def handle_connection(connection, cert, request, local=False): tlvdata.send(connection, {'collective': {'approval': myrsp, 'leader': get_leader(connection)}}) + if 'assimilate' == operation: + drone = request['name'] + droneinfo = cfm.get_collective_member(drone) + if not util.cert_matches(droneinfo['fingerprint'], cert): + tlvdata.send(connection, + {'error': 'Invalid certificate, ' + 'redo invitation process'}) + return + if request['txcount'] < cfm._txcount: + tlvdata.send(connection, + {'error': 'Refusing to be assimilated by inferior' + 'transaction count', + 'txcount': cfm._txcount}) + return + eventlet.spawn_n(connect_to_leader, None, None, + leader=connection.getpeername()[0]) + tlvdata.send(connection, {'status': 0}) if 'connect' == operation: myself = connection.getsockname()[0] if myself != get_leader(connection): @@ -171,9 +191,18 @@ def handle_connection(connection, cert, request, local=False): droneinfo = cfm.get_collective_member(drone) if not util.cert_matches(droneinfo['fingerprint'], cert): tlvdata.send(connection, - {'error': 'Invalid certificate,' + {'error': 'Invalid certificate, ' 'redo invitation process'}) return + if request['txcount'] > cfm._txcount: + retire_as_leader() + tlvdata.send(connection, + {'error': 'Client has higher tranasaction count, ' + 'should assimilate me, connecting..', + 'txcount': cfm._txcount}) + eventlet.spawn_n(connect_to_leader, None, None, + connection.getpeername()[0]) + return tlvdata.send(connection, cfm._dump_keys(None, False)) tlvdata.send(connection, cfm._cfgstore['collective']) tlvdata.send(connection, cfm.get_globals()) @@ -186,12 +215,30 @@ def handle_connection(connection, cert, request, local=False): # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates +def try_assimilate(drone): + pass + def get_leader(connection): - global currentleader - if currentleader is None: - currentleader = connection.getsockname()[0] + if currentleader is None or connection.getpeername()[0] == currentleader: + become_leader(connection) return currentleader +def retire_as_leader(): + global currentleader + cfm.stop_leading() + currentleader = None + +def become_leader(connection): + global currentleader + currentleader = connection.getsockname()[0] + skipaddr = connection.getpeername()[0] + for member in cfm.list_collective(): + dronecandidate = cfm.get_collective_member(member)['address'] + if dronecandidate in (currentleader, skipaddr): + continue + eventlet.spawn_n(try_assimilate, dronecandidate) + + def startup(): members = list(cfm.list_collective()) if len(members) < 2: diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 46e95100..90a8d566 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -395,6 +395,12 @@ def relay_slaved_requests(name, listener): msg = listener.recv(8) +def stop_leading(): + for stream in list(cfgstreams): + cfgstreams[stream].close() + del cfgstreams[stream] + + def clear_configuration(): global _cfgstore _cfgstore = {}