mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-13 03:08:14 +00:00
Make the takeover process more deterministic
Try to avoid submitting to be a follower while we are currently becoming a leader
This commit is contained in:
parent
e5c4219ee9
commit
5cf1671350
@ -48,6 +48,7 @@ class ContextBool(object):
|
||||
self.active = False
|
||||
|
||||
connecting = ContextBool()
|
||||
leader_init = ContextBool()
|
||||
|
||||
|
||||
def connect_to_leader(cert=None, name=None, leader=None):
|
||||
@ -385,18 +386,23 @@ def handle_connection(connection, cert, request, local=False):
|
||||
if retrythread:
|
||||
retrythread.cancel()
|
||||
retrythread = None
|
||||
cfm.update_collective_address(request['name'],
|
||||
connection.getpeername()[0])
|
||||
tlvdata.send(connection, cfm._dump_keys(None, False))
|
||||
tlvdata.send(connection, cfm._cfgstore['collective'])
|
||||
tlvdata.send(connection, cfm.get_globals())
|
||||
cfgdata = cfm.ConfigManager(None)._dump_to_json()
|
||||
tlvdata.send(connection, {'txcount': cfm._txcount,
|
||||
'dbsize': len(cfgdata)})
|
||||
connection.sendall(cfgdata)
|
||||
with leader_init:
|
||||
cfm.update_collective_address(request['name'],
|
||||
connection.getpeername()[0])
|
||||
tlvdata.send(connection, cfm._dump_keys(None, False))
|
||||
tlvdata.send(connection, cfm._cfgstore['collective'])
|
||||
tlvdata.send(connection, cfm.get_globals())
|
||||
cfgdata = cfm.ConfigManager(None)._dump_to_json()
|
||||
tlvdata.send(connection, {'txcount': cfm._txcount,
|
||||
'dbsize': len(cfgdata)})
|
||||
connection.sendall(cfgdata)
|
||||
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
|
||||
# so far unused anyway
|
||||
cfm.relay_slaved_requests(drone, connection)
|
||||
if not cfm.relay_slaved_requests(drone, connection):
|
||||
if not retrythread: # start a recovery if everyone else seems
|
||||
# to have disappeared
|
||||
retrythread = eventlet.spawn_after(30 + random.random(),
|
||||
start_collective)
|
||||
# ok, we have a connecting member whose certificate checks out
|
||||
# He needs to bootstrap his configuration and subscribe it to updates
|
||||
|
||||
@ -471,8 +477,11 @@ def start_collective():
|
||||
if follower:
|
||||
follower.kill()
|
||||
follower = None
|
||||
if leader_init.active: # do not start trying to connect if we are
|
||||
# xmitting data to a follower
|
||||
return
|
||||
myname = get_myname()
|
||||
for member in cfm.list_collective():
|
||||
for member in sorted(list(cfm.list_collective())):
|
||||
if member == myname:
|
||||
continue
|
||||
if cfm.cfgleader is None:
|
||||
|
@ -550,6 +550,8 @@ def relay_slaved_requests(name, listener):
|
||||
pass
|
||||
if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective
|
||||
stop_following(True)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class StreamHandler(object):
|
||||
|
Loading…
x
Reference in New Issue
Block a user