diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 9979107b..1f863e96 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -23,6 +23,7 @@ import confluent.noderange as noderange import confluent.tlvdata as tlvdata import confluent.util as util import eventlet +import eventlet.greenpool as greenpool import eventlet.green.socket as socket import eventlet.green.ssl as ssl import eventlet.green.threading as threading @@ -59,7 +60,7 @@ class ContextBool(object): connecting = ContextBool() leader_init = ContextBool() -def connect_to_leader(cert=None, name=None, leader=None): +def connect_to_leader(cert=None, name=None, leader=None, remote=None): global currentleader global follower if leader is None: @@ -67,7 +68,7 @@ def connect_to_leader(cert=None, name=None, leader=None): log.log({'info': 'Attempting connection to leader {0}'.format(leader), 'subsystem': 'collective'}) try: - remote = connect_to_collective(cert, leader) + remote = connect_to_collective(cert, leader, remote) except socket.error as e: log.log({'error': 'Collective connection attempt to {0} failed: {1}' ''.format(leader, str(e)), @@ -187,13 +188,24 @@ def follow_leader(remote, leader): currentleader = None eventlet.spawn_n(start_collective) +def create_connection(member): + remote = None + try: + remote = socket.create_connection((member, 13001), 2) + remote.settimeout(15) + # TLS cert validation is custom and will not pass normal CA vetting + # to override completely in the right place requires enormous effort, so just defer until after connect + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + except Exception as e: + return member, e + return member, remote -def connect_to_collective(cert, member): - remote = socket.create_connection((member, 13001), 15) - # TLS cert validation is custom and will not pass normal CA vetting - # to override completely in the right place requires enormous effort, so just defer until after connect - remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', - certfile='/etc/confluent/srvcert.pem') +def connect_to_collective(cert, member, remote=None): + if remote is None: + _, remote = create_connection(member) + if isinstance(remote, Exception): + raise remote if cert: fprint = cert else: @@ -541,9 +553,9 @@ def populate_collinfo(collinfo): collinfo['offline'].append(member) -def try_assimilate(drone, followcount): +def try_assimilate(drone, followcount, remote): try: - remote = connect_to_collective(None, drone) + remote = connect_to_collective(None, drone, remote) except socket.error: # Oh well, unable to connect, hopefully the rest will be # in order @@ -561,23 +573,25 @@ def try_assimilate(drone, followcount): 'No answer from {0} while trying to assimilate'.format( drone), 'subsystem': 'collective'}) - return + return True if 'txcount' in answer: log.log({'info': 'Deferring to {0} due to target being a better leader'.format( drone), 'subsystem': 'collective'}) + retire_as_leader(drone) connect_to_leader(None, None, leader=remote.getpeername()[0]) - return + return False if 'leader' in answer: # Will wait for leader to see about assimilation - return + return True if 'error' in answer: log.log({ 'error': 'Error encountered while attempting to ' 'assimilate {0}: {1}'.format(drone, answer['error']), 'subsystem': 'collective'}) - return + return True log.log({'info': 'Assimilated {0} into collective'.format(drone), 'subsystem': 'collective'}) + return True def get_leader(connection): @@ -616,11 +630,20 @@ def become_leader(connection): numfollowers = len(list(skipem)) skipem.add(currentleader) skipem.add(skipaddr) + connecto = [] for member in cfm.list_collective(): dronecandidate = cfm.get_collective_member(member)['address'] if dronecandidate in skipem or member == myname: continue - eventlet.spawn_n(try_assimilate, dronecandidate, numfollowers) + connecto.append(dronecandidate) + conpool = greenpool.GreenPool(64) + connections = conpool.imap(create_connection, connecto) + for ent in connections: + member, remote = ent + if isinstance(remote, Exception): + continue + if not try_assimilate(member, numfollowers, remote): + return schedule_rebalance() @@ -697,15 +720,23 @@ def start_collective(): # xmitting data to a follower return myname = get_myname() + connecto = [] for member in sorted(list(cfm.list_collective())): if member == myname: continue if cfm.cfgleader is None: cfm.stop_following(True) ldrcandidate = cfm.get_collective_member(member)['address'] + connecto.append(ldrcandidate) + conpool = greenpool.GreenPool(64) + connections = conpool.imap(create_connection, connecto) + for ent in connections: + member, remote = ent + if isinstance(remote, Exception): + continue log.log({'info': 'Performing startup attempt to {0}'.format( - ldrcandidate), 'subsystem': 'collective'}) - if connect_to_leader(name=myname, leader=ldrcandidate): + member), 'subsystem': 'collective'}) + if connect_to_leader(name=myname, leader=member, remote=remote): break else: retrythread = eventlet.spawn_after(5 + random.random(),