2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-28 11:57:37 +00:00

Periodic reassimilation

Have leader periodically try to
assimilate offline members.

This will recover from some scenarios
where a rogue non-quorum collective
or a stray non-collective state
persists.
This commit is contained in:
Jarrod Johnson 2021-04-21 10:30:54 -04:00
parent 9009f63b2d
commit 34cfd99fd8

View File

@ -44,6 +44,7 @@ follower = None
retrythread = None
failovercheck = None
initting = True
reassimilate = None
class ContextBool(object):
def __init__(self):
@ -450,7 +451,7 @@ def handle_connection(connection, cert, request, local=False):
{'error': 'Refusing, my name is better',
'txcount': cfm._txcount,})
return
if follower is not None:
if follower is not None and not follower.dead:
tlvdata.send(
connection,
{'error': 'Already following, assimilate leader first',
@ -636,6 +637,7 @@ def become_leader(connection):
global currentleader
global follower
global retrythread
global reassimilate
log.log({'info': 'Becoming leader of collective',
'subsystem': 'collective'})
if follower is not None:
@ -647,17 +649,31 @@ def become_leader(connection):
retrythread = None
currentleader = connection.getsockname()[0]
skipaddr = connection.getpeername()[0]
if _assimilate_missing(skipaddr):
schedule_rebalance()
if reassimilate is not None:
reassimilate.kill()
reassimilate = eventlet.spawn(reassimilate_missing)
def reassimilate_missing():
while cfm.cfgstreams and _assimilate_missing():
eventlet.sleep(30)
def _assimilate_missing(skipaddr=None):
connecto = []
myname = get_myname()
skipem = set(cfm.cfgstreams)
numfollowers = len(list(skipem))
numfollowers = len(skipem)
skipem.add(currentleader)
skipem.add(skipaddr)
connecto = []
if skipaddr is not None:
skipem.add(skipaddr)
for member in cfm.list_collective():
dronecandidate = cfm.get_collective_member(member)['address']
if dronecandidate in skipem or member == myname:
if dronecandidate in skipem or member == myname or member in skipem:
continue
connecto.append(dronecandidate)
if not connecto:
return True
conpool = greenpool.GreenPool(64)
connections = conpool.imap(create_connection, connecto)
for ent in connections:
@ -665,8 +681,8 @@ def become_leader(connection):
if isinstance(remote, Exception):
continue
if not try_assimilate(member, numfollowers, remote):
return
schedule_rebalance()
return False
return True
def startup():
@ -679,6 +695,11 @@ def startup():
def check_managers():
global failovercheck
if not follower:
try:
cfm.check_quorum()
except exc.DegradedCollective:
failovercheck = None
return
c = cfm.ConfigManager(None)
collinfo = {}
populate_collinfo(collinfo)
@ -714,6 +735,7 @@ def check_managers():
continue
c.set_node_attributes({node: {'collective.manager': {'value': targets[0]}}})
availmanagers[targets[0]] += 1
_assimilate_missing()
failovercheck = None
def schedule_rebalance():