2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 17:43:14 +00:00

Further refine collective start process

Serialize assimilation, do not induce activity that may have been
aborted by an earlier chain.

Further, accelerate initial startup by making potential timeouts
occur concurrently, rather than sequentially.
This commit is contained in:
Jarrod Johnson 2021-04-08 13:44:20 -04:00
parent c5ec34d5a5
commit 8d16b412ae

View File

@ -23,6 +23,7 @@ import confluent.noderange as noderange
import confluent.tlvdata as tlvdata
import confluent.util as util
import eventlet
import eventlet.greenpool as greenpool
import eventlet.green.socket as socket
import eventlet.green.ssl as ssl
import eventlet.green.threading as threading
@ -59,7 +60,7 @@ class ContextBool(object):
connecting = ContextBool()
leader_init = ContextBool()
def connect_to_leader(cert=None, name=None, leader=None):
def connect_to_leader(cert=None, name=None, leader=None, remote=None):
global currentleader
global follower
if leader is None:
@ -67,7 +68,7 @@ def connect_to_leader(cert=None, name=None, leader=None):
log.log({'info': 'Attempting connection to leader {0}'.format(leader),
'subsystem': 'collective'})
try:
remote = connect_to_collective(cert, leader)
remote = connect_to_collective(cert, leader, remote)
except socket.error as e:
log.log({'error': 'Collective connection attempt to {0} failed: {1}'
''.format(leader, str(e)),
@ -187,13 +188,24 @@ def follow_leader(remote, leader):
currentleader = None
eventlet.spawn_n(start_collective)
def create_connection(member):
remote = None
try:
remote = socket.create_connection((member, 13001), 2)
remote.settimeout(15)
# TLS cert validation is custom and will not pass normal CA vetting
# to override completely in the right place requires enormous effort, so just defer until after connect
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem',
certfile='/etc/confluent/srvcert.pem')
except Exception as e:
return member, e
return member, remote
def connect_to_collective(cert, member):
remote = socket.create_connection((member, 13001), 15)
# TLS cert validation is custom and will not pass normal CA vetting
# to override completely in the right place requires enormous effort, so just defer until after connect
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem',
certfile='/etc/confluent/srvcert.pem')
def connect_to_collective(cert, member, remote=None):
if remote is None:
_, remote = create_connection(member)
if isinstance(remote, Exception):
raise remote
if cert:
fprint = cert
else:
@ -541,9 +553,9 @@ def populate_collinfo(collinfo):
collinfo['offline'].append(member)
def try_assimilate(drone, followcount):
def try_assimilate(drone, followcount, remote):
try:
remote = connect_to_collective(None, drone)
remote = connect_to_collective(None, drone, remote)
except socket.error:
# Oh well, unable to connect, hopefully the rest will be
# in order
@ -561,23 +573,25 @@ def try_assimilate(drone, followcount):
'No answer from {0} while trying to assimilate'.format(
drone),
'subsystem': 'collective'})
return
return True
if 'txcount' in answer:
log.log({'info': 'Deferring to {0} due to target being a better leader'.format(
drone), 'subsystem': 'collective'})
retire_as_leader(drone)
connect_to_leader(None, None, leader=remote.getpeername()[0])
return
return False
if 'leader' in answer:
# Will wait for leader to see about assimilation
return
return True
if 'error' in answer:
log.log({
'error': 'Error encountered while attempting to '
'assimilate {0}: {1}'.format(drone, answer['error']),
'subsystem': 'collective'})
return
return True
log.log({'info': 'Assimilated {0} into collective'.format(drone),
'subsystem': 'collective'})
return True
def get_leader(connection):
@ -616,11 +630,20 @@ def become_leader(connection):
numfollowers = len(list(skipem))
skipem.add(currentleader)
skipem.add(skipaddr)
connecto = []
for member in cfm.list_collective():
dronecandidate = cfm.get_collective_member(member)['address']
if dronecandidate in skipem or member == myname:
continue
eventlet.spawn_n(try_assimilate, dronecandidate, numfollowers)
connecto.append(dronecandidate)
conpool = greenpool.GreenPool(64)
connections = conpool.imap(create_connection, connecto)
for ent in connections:
member, remote = ent
if isinstance(remote, Exception):
continue
if not try_assimilate(member, numfollowers, remote):
return
schedule_rebalance()
@ -697,15 +720,23 @@ def start_collective():
# xmitting data to a follower
return
myname = get_myname()
connecto = []
for member in sorted(list(cfm.list_collective())):
if member == myname:
continue
if cfm.cfgleader is None:
cfm.stop_following(True)
ldrcandidate = cfm.get_collective_member(member)['address']
connecto.append(ldrcandidate)
conpool = greenpool.GreenPool(64)
connections = conpool.imap(create_connection, connecto)
for ent in connections:
member, remote = ent
if isinstance(remote, Exception):
continue
log.log({'info': 'Performing startup attempt to {0}'.format(
ldrcandidate), 'subsystem': 'collective'})
if connect_to_leader(name=myname, leader=ldrcandidate):
member), 'subsystem': 'collective'})
if connect_to_leader(name=myname, leader=member, remote=remote):
break
else:
retrythread = eventlet.spawn_after(5 + random.random(),