From 8d16b412aea1f0f4628167c44ea6ca00075f8629 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 8 Apr 2021 13:44:20 -0400 Subject: [PATCH] Further refine collective start process Serialize assimilation, do not induce activity that may have been aborted by an earlier chain. Further, accelerate initial startup by making potential timeouts occur concurrently, rather than sequentially. --- .../confluent/collective/manager.py | 65 ++++++++++++++----- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 9979107b..1f863e96 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -23,6 +23,7 @@ import confluent.noderange as noderange import confluent.tlvdata as tlvdata import confluent.util as util import eventlet +import eventlet.greenpool as greenpool import eventlet.green.socket as socket import eventlet.green.ssl as ssl import eventlet.green.threading as threading @@ -59,7 +60,7 @@ class ContextBool(object): connecting = ContextBool() leader_init = ContextBool() -def connect_to_leader(cert=None, name=None, leader=None): +def connect_to_leader(cert=None, name=None, leader=None, remote=None): global currentleader global follower if leader is None: @@ -67,7 +68,7 @@ def connect_to_leader(cert=None, name=None, leader=None): log.log({'info': 'Attempting connection to leader {0}'.format(leader), 'subsystem': 'collective'}) try: - remote = connect_to_collective(cert, leader) + remote = connect_to_collective(cert, leader, remote) except socket.error as e: log.log({'error': 'Collective connection attempt to {0} failed: {1}' ''.format(leader, str(e)), @@ -187,13 +188,24 @@ def follow_leader(remote, leader): currentleader = None eventlet.spawn_n(start_collective) +def create_connection(member): + remote = None + try: + remote = socket.create_connection((member, 13001), 2) + remote.settimeout(15) + # TLS cert validation is custom and will not pass normal CA vetting + # to override completely in the right place requires enormous effort, so just defer until after connect + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + except Exception as e: + return member, e + return member, remote -def connect_to_collective(cert, member): - remote = socket.create_connection((member, 13001), 15) - # TLS cert validation is custom and will not pass normal CA vetting - # to override completely in the right place requires enormous effort, so just defer until after connect - remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', - certfile='/etc/confluent/srvcert.pem') +def connect_to_collective(cert, member, remote=None): + if remote is None: + _, remote = create_connection(member) + if isinstance(remote, Exception): + raise remote if cert: fprint = cert else: @@ -541,9 +553,9 @@ def populate_collinfo(collinfo): collinfo['offline'].append(member) -def try_assimilate(drone, followcount): +def try_assimilate(drone, followcount, remote): try: - remote = connect_to_collective(None, drone) + remote = connect_to_collective(None, drone, remote) except socket.error: # Oh well, unable to connect, hopefully the rest will be # in order @@ -561,23 +573,25 @@ def try_assimilate(drone, followcount): 'No answer from {0} while trying to assimilate'.format( drone), 'subsystem': 'collective'}) - return + return True if 'txcount' in answer: log.log({'info': 'Deferring to {0} due to target being a better leader'.format( drone), 'subsystem': 'collective'}) + retire_as_leader(drone) connect_to_leader(None, None, leader=remote.getpeername()[0]) - return + return False if 'leader' in answer: # Will wait for leader to see about assimilation - return + return True if 'error' in answer: log.log({ 'error': 'Error encountered while attempting to ' 'assimilate {0}: {1}'.format(drone, answer['error']), 'subsystem': 'collective'}) - return + return True log.log({'info': 'Assimilated {0} into collective'.format(drone), 'subsystem': 'collective'}) + return True def get_leader(connection): @@ -616,11 +630,20 @@ def become_leader(connection): numfollowers = len(list(skipem)) skipem.add(currentleader) skipem.add(skipaddr) + connecto = [] for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] if dronecandidate in skipem or member == myname: continue - eventlet.spawn_n(try_assimilate, dronecandidate, numfollowers) + connecto.append(dronecandidate) + conpool = greenpool.GreenPool(64) + connections = conpool.imap(create_connection, connecto) + for ent in connections: + member, remote = ent + if isinstance(remote, Exception): + continue + if not try_assimilate(member, numfollowers, remote): + return schedule_rebalance() @@ -697,15 +720,23 @@ def start_collective(): # xmitting data to a follower return myname = get_myname() + connecto = [] for member in sorted(list(cfm.list_collective())): if member == myname: continue if cfm.cfgleader is None: cfm.stop_following(True) ldrcandidate = cfm.get_collective_member(member)['address'] + connecto.append(ldrcandidate) + conpool = greenpool.GreenPool(64) + connections = conpool.imap(create_connection, connecto) + for ent in connections: + member, remote = ent + if isinstance(remote, Exception): + continue log.log({'info': 'Performing startup attempt to {0}'.format( - ldrcandidate), 'subsystem': 'collective'}) - if connect_to_leader(name=myname, leader=ldrcandidate): + member), 'subsystem': 'collective'}) + if connect_to_leader(name=myname, leader=member, remote=remote): break else: retrythread = eventlet.spawn_after(5 + random.random(),