From 0843b991ea817abc0c1103d6eb57fd3ab4d8cafe Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 26 Jun 2018 14:06:01 -0400 Subject: [PATCH] Isolate following to dedicated greenthread and put fallback to assert leadership --- .../confluent/collective/manager.py | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index d7f5266d..03cf6574 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -32,11 +32,13 @@ except ImportError: currentleader = None cfginitlock = None +follower = None def connect_to_leader(cert=None, name=None, leader=None): global currentleader global cfginitlock + global follower if cfginitlock is None: cfginitlock = threading.RLock() if leader is None: @@ -60,6 +62,8 @@ def connect_to_leader(cert=None, name=None, leader=None): raise Exception("Redirected to self") return connect_to_leader(name=name, leader=keydata['leader']) raise Exception(keydata['error']) + if follower is not None: + follower.kill() colldata = tlvdata.recv(remote) globaldata = tlvdata.recv(remote) dbsize = tlvdata.recv(remote)['dbsize'] @@ -82,6 +86,12 @@ def connect_to_leader(cert=None, name=None, leader=None): cfm.ConfigManager(tenant=None)._load_from_json(dbjson) cfm.ConfigManager._bg_sync_to_file() currentleader = leader + #spawn this as a thread... + follower = eventlet.spawn(follow_leader, remote) + return True + + +def follow_leader(remote): cfm.follow_channel(remote) # The leader has folded, time to startup again... eventlet.spawn_n(start_collective) @@ -258,9 +268,10 @@ def become_leader(connection): global currentleader currentleader = connection.getsockname()[0] skipaddr = connection.getpeername()[0] + myname = get_myname() for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] - if dronecandidate in (currentleader, skipaddr): + if dronecandidate in (currentleader, skipaddr) or member == myname: continue eventlet.spawn_n(try_assimilate, dronecandidate) @@ -283,5 +294,13 @@ def start_collective(): if cfm.cfgleader is None: cfm.cfgleader = True ldrcandidate = cfm.get_collective_member(member)['address'] - connect_to_leader(name=myname, leader=ldrcandidate) + if connect_to_leader(name=myname, leader=ldrcandidate): + break + else: + for member in cfm.list_collective(): + if member == myname: + continue + eventlet.spawn_n(try_assimilate, + cfm.get_collective_member(member)['address']) +