diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 03cf6574..e457da4d 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -64,6 +64,7 @@ def connect_to_leader(cert=None, name=None, leader=None): raise Exception(keydata['error']) if follower is not None: follower.kill() + follower = None colldata = tlvdata.recv(remote) globaldata = tlvdata.recv(remote) dbsize = tlvdata.recv(remote)['dbsize'] @@ -133,6 +134,12 @@ def handle_connection(connection, cert, request, local=False): if not local: return if 'invite' == operation: + if follower: + tlvdata.send(connection, + {'collective': + {'error': 'Invite can only be run from current ' + 'leader ({0})'.format(currentleader)}}) + return #TODO(jjohnson2): Cannot do the invitation if not the head node, the certificate hand-carrying #can't work in such a case. name = request['name'] @@ -266,6 +273,10 @@ def retire_as_leader(): def become_leader(connection): global currentleader + global follower + if follower: + follower.kill() + follower = None currentleader = connection.getsockname()[0] skipaddr = connection.getpeername()[0] myname = get_myname() @@ -287,6 +298,10 @@ def startup(): eventlet.spawn_n(start_collective) def start_collective(): + global follower + if follower: + follower.kill() + follower = None myname = get_myname() for member in cfm.list_collective(): if member == myname: