From 34cfd99fd8e7a2ccd1f011a310b33e658e96d569 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 21 Apr 2021 10:30:54 -0400 Subject: [PATCH] Periodic reassimilation Have leader periodically try to assimilate offline members. This will recover from some scenarios where a rogue non-quorum collective or a stray non-collective state persists. --- .../confluent/collective/manager.py | 36 +++++++++++++++---- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 4580bd39..c4e9d0ac 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -44,6 +44,7 @@ follower = None retrythread = None failovercheck = None initting = True +reassimilate = None class ContextBool(object): def __init__(self): @@ -450,7 +451,7 @@ def handle_connection(connection, cert, request, local=False): {'error': 'Refusing, my name is better', 'txcount': cfm._txcount,}) return - if follower is not None: + if follower is not None and not follower.dead: tlvdata.send( connection, {'error': 'Already following, assimilate leader first', @@ -636,6 +637,7 @@ def become_leader(connection): global currentleader global follower global retrythread + global reassimilate log.log({'info': 'Becoming leader of collective', 'subsystem': 'collective'}) if follower is not None: @@ -647,17 +649,31 @@ def become_leader(connection): retrythread = None currentleader = connection.getsockname()[0] skipaddr = connection.getpeername()[0] + if _assimilate_missing(skipaddr): + schedule_rebalance() + if reassimilate is not None: + reassimilate.kill() + reassimilate = eventlet.spawn(reassimilate_missing) + +def reassimilate_missing(): + while cfm.cfgstreams and _assimilate_missing(): + eventlet.sleep(30) + +def _assimilate_missing(skipaddr=None): + connecto = [] myname = get_myname() skipem = set(cfm.cfgstreams) - numfollowers = len(list(skipem)) + numfollowers = len(skipem) skipem.add(currentleader) - skipem.add(skipaddr) - connecto = [] + if skipaddr is not None: + skipem.add(skipaddr) for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] - if dronecandidate in skipem or member == myname: + if dronecandidate in skipem or member == myname or member in skipem: continue connecto.append(dronecandidate) + if not connecto: + return True conpool = greenpool.GreenPool(64) connections = conpool.imap(create_connection, connecto) for ent in connections: @@ -665,8 +681,8 @@ def become_leader(connection): if isinstance(remote, Exception): continue if not try_assimilate(member, numfollowers, remote): - return - schedule_rebalance() + return False + return True def startup(): @@ -679,6 +695,11 @@ def startup(): def check_managers(): global failovercheck if not follower: + try: + cfm.check_quorum() + except exc.DegradedCollective: + failovercheck = None + return c = cfm.ConfigManager(None) collinfo = {} populate_collinfo(collinfo) @@ -714,6 +735,7 @@ def check_managers(): continue c.set_node_attributes({node: {'collective.manager': {'value': targets[0]}}}) availmanagers[targets[0]] += 1 + _assimilate_missing() failovercheck = None def schedule_rebalance():