2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 09:32:21 +00:00

Isolate following to dedicated greenthread and put fallback to assert leadership

This commit is contained in:
Jarrod Johnson 2018-06-26 14:06:01 -04:00
parent 11e6145a46
commit 0843b991ea

View File

@ -32,11 +32,13 @@ except ImportError:
currentleader = None
cfginitlock = None
follower = None
def connect_to_leader(cert=None, name=None, leader=None):
global currentleader
global cfginitlock
global follower
if cfginitlock is None:
cfginitlock = threading.RLock()
if leader is None:
@ -60,6 +62,8 @@ def connect_to_leader(cert=None, name=None, leader=None):
raise Exception("Redirected to self")
return connect_to_leader(name=name, leader=keydata['leader'])
raise Exception(keydata['error'])
if follower is not None:
follower.kill()
colldata = tlvdata.recv(remote)
globaldata = tlvdata.recv(remote)
dbsize = tlvdata.recv(remote)['dbsize']
@ -82,6 +86,12 @@ def connect_to_leader(cert=None, name=None, leader=None):
cfm.ConfigManager(tenant=None)._load_from_json(dbjson)
cfm.ConfigManager._bg_sync_to_file()
currentleader = leader
#spawn this as a thread...
follower = eventlet.spawn(follow_leader, remote)
return True
def follow_leader(remote):
cfm.follow_channel(remote)
# The leader has folded, time to startup again...
eventlet.spawn_n(start_collective)
@ -258,9 +268,10 @@ def become_leader(connection):
global currentleader
currentleader = connection.getsockname()[0]
skipaddr = connection.getpeername()[0]
myname = get_myname()
for member in cfm.list_collective():
dronecandidate = cfm.get_collective_member(member)['address']
if dronecandidate in (currentleader, skipaddr):
if dronecandidate in (currentleader, skipaddr) or member == myname:
continue
eventlet.spawn_n(try_assimilate, dronecandidate)
@ -283,5 +294,13 @@ def start_collective():
if cfm.cfgleader is None:
cfm.cfgleader = True
ldrcandidate = cfm.get_collective_member(member)['address']
connect_to_leader(name=myname, leader=ldrcandidate)
if connect_to_leader(name=myname, leader=ldrcandidate):
break
else:
for member in cfm.list_collective():
if member == myname:
continue
eventlet.spawn_n(try_assimilate,
cfm.get_collective_member(member)['address'])