mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-27 19:37:57 +00:00
Schedule periodic attempts to restart collective
If collective is lost due to connectivity, this will cause occasional attempts to bring it back.
This commit is contained in:
parent
7d16c943a8
commit
a09792f969
@ -35,6 +35,7 @@ except ImportError:
|
||||
currentleader = None
|
||||
cfginitlock = None
|
||||
follower = None
|
||||
retrythread = None
|
||||
|
||||
class ContextBool(object):
|
||||
def __init__(self):
|
||||
@ -168,6 +169,7 @@ def get_myname():
|
||||
|
||||
def handle_connection(connection, cert, request, local=False):
|
||||
global currentleader
|
||||
global retrythread
|
||||
operation = request['operation']
|
||||
if cert:
|
||||
cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
|
||||
@ -340,6 +342,9 @@ def handle_connection(connection, cert, request, local=False):
|
||||
connection.getpeername()[0])
|
||||
connection.close()
|
||||
return
|
||||
if retrythread:
|
||||
retrythread.cancel()
|
||||
retrythread = None
|
||||
cfm.update_collective_address(request['name'],
|
||||
connection.getpeername()[0])
|
||||
tlvdata.send(connection, cfm._dump_keys(None, False))
|
||||
@ -409,6 +414,7 @@ def startup():
|
||||
|
||||
def start_collective():
|
||||
global follower
|
||||
global retrythread
|
||||
if follower:
|
||||
follower.kill()
|
||||
follower = None
|
||||
@ -422,10 +428,7 @@ def start_collective():
|
||||
if connect_to_leader(name=myname, leader=ldrcandidate):
|
||||
break
|
||||
else:
|
||||
for member in cfm.list_collective():
|
||||
if member == myname:
|
||||
continue
|
||||
eventlet.spawn_n(try_assimilate,
|
||||
cfm.get_collective_member(member)['address'])
|
||||
retrythread = eventlet.spawn_after(30 + random.random(),
|
||||
start_collective)
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user