mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-14 19:57:50 +00:00
Further refine collective startup behavior
This commit is contained in:
parent
64d5081be3
commit
153956b2cd
@ -164,6 +164,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
|
||||
|
||||
def follow_leader(remote, leader):
|
||||
global currentleader
|
||||
global retrythread
|
||||
cleanexit = False
|
||||
newleader = None
|
||||
try:
|
||||
@ -179,8 +180,8 @@ def follow_leader(remote, leader):
|
||||
if newleader:
|
||||
log.log(
|
||||
{'info': 'Previous leader directed us to join new leader {}'.format(newleader)})
|
||||
eventlet.spawn_n(connect_to_leader, None, get_myname(), newleader)
|
||||
return
|
||||
if connect_to_leader(None, get_myname(), newleader):
|
||||
return
|
||||
log.log({'info': 'Current leader ({0}) has disappeared, restarting '
|
||||
'collective membership'.format(leader), 'subsystem': 'collective'})
|
||||
# The leader has folded, time to startup again...
|
||||
@ -238,6 +239,7 @@ def get_myname():
|
||||
def handle_connection(connection, cert, request, local=False):
|
||||
global currentleader
|
||||
global retrythread
|
||||
connection.settimeout(5)
|
||||
operation = request['operation']
|
||||
if cert:
|
||||
cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
|
||||
@ -467,10 +469,13 @@ def handle_connection(connection, cert, request, local=False):
|
||||
'subsystem': 'collective'})
|
||||
if cfm.cfgstreams:
|
||||
retire_as_leader(connection.getpeername())
|
||||
eventlet.spawn_n(connect_to_leader, None, None,
|
||||
leader=connection.getpeername()[0])
|
||||
tlvdata.send(connection, {'status': 0})
|
||||
newleader = connection.getpeername()[0]
|
||||
connection.close()
|
||||
if not connect_to_leader(None, None, leader=newleader):
|
||||
if retrythread is None:
|
||||
retrythread = eventlet.spawn_after(random.random(),
|
||||
start_collective)
|
||||
if 'getinfo' == operation:
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
@ -514,10 +519,13 @@ def handle_connection(connection, cert, request, local=False):
|
||||
'should assimilate me, connecting..',
|
||||
'txcount': cfm._txcount})
|
||||
log.log({'info': 'Connecting to leader due to superior '
|
||||
'transaction count', 'subsystem': collective})
|
||||
eventlet.spawn_n(connect_to_leader, None, None,
|
||||
connection.getpeername()[0])
|
||||
'transaction count', 'subsystem': 'collective'})
|
||||
connection.close()
|
||||
if not connect_to_leader(
|
||||
None, None, connection.getpeername()[0]):
|
||||
if retrythread is None:
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
start_collective)
|
||||
return
|
||||
if retrythread is not None:
|
||||
retrythread.cancel()
|
||||
@ -535,6 +543,8 @@ def handle_connection(connection, cert, request, local=False):
|
||||
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
|
||||
# so far unused anyway
|
||||
if not cfm.relay_slaved_requests(drone, connection):
|
||||
log.log({'info': 'All clients have disconnected, starting recovery process',
|
||||
'subsystem': 'collective'})
|
||||
if retrythread is None: # start a recovery if everyone else seems
|
||||
# to have disappeared
|
||||
retrythread = eventlet.spawn_after(5 + random.random(),
|
||||
@ -556,6 +566,7 @@ def populate_collinfo(collinfo):
|
||||
|
||||
|
||||
def try_assimilate(drone, followcount, remote):
|
||||
global retrythread
|
||||
try:
|
||||
remote = connect_to_collective(None, drone, remote)
|
||||
except socket.error:
|
||||
@ -580,7 +591,10 @@ def try_assimilate(drone, followcount, remote):
|
||||
log.log({'info': 'Deferring to {0} due to target being a better leader'.format(
|
||||
drone), 'subsystem': 'collective'})
|
||||
retire_as_leader(drone)
|
||||
connect_to_leader(None, None, leader=remote.getpeername()[0])
|
||||
if not connect_to_leader(None, None, leader=remote.getpeername()[0]):
|
||||
if retrythread is None:
|
||||
retrythread = eventlet.spawn_after(random.random(),
|
||||
start_collective)
|
||||
return False
|
||||
if 'leader' in answer:
|
||||
# Will wait for leader to see about assimilation
|
||||
@ -706,6 +720,7 @@ def schedule_rebalance():
|
||||
def start_collective():
|
||||
global follower
|
||||
global retrythread
|
||||
retrythread = None
|
||||
try:
|
||||
cfm.membership_callback = schedule_rebalance
|
||||
if follower:
|
||||
|
Loading…
x
Reference in New Issue
Block a user