mirror of
https://github.com/xcat2/confluent.git
synced 2025-04-13 16:57:59 +00:00
Improve reliability of collective join
While servicing an enrollment, there's a window for a collective member to be 'defined' but not yet active, meaning quorum may transiently be lost as multiple enrollments progress. Serialize enrollments by holding the enrollment process open. Also, there is a chance that a transient transfer error may occur during loading of the DB. In such a case, restart the connection rather than aborting.
This commit is contained in:
parent
6df2e822a5
commit
b99034f539
@ -61,10 +61,15 @@ class ContextBool(object):
|
||||
|
||||
connecting = ContextBool()
|
||||
leader_init = ContextBool()
|
||||
enrolling = ContextBool()
|
||||
|
||||
def connect_to_leader(cert=None, name=None, leader=None, remote=None):
|
||||
global currentleader
|
||||
global follower
|
||||
ocert = cert
|
||||
oname = name
|
||||
oleader = leader
|
||||
oremote = remote
|
||||
if leader is None:
|
||||
leader = currentleader
|
||||
log.log({'info': 'Attempting connection to leader {0}'.format(leader),
|
||||
@ -138,7 +143,9 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
|
||||
remote.close()
|
||||
except Exception:
|
||||
pass
|
||||
raise Exception("Error doing initial DB transfer")
|
||||
log.log({'error': 'Retrying connection, error during initial sync', 'subsystem': 'collective'})
|
||||
return connect_to_leader(ocert, oname, oleader, oremote)
|
||||
raise Exception("Error doing initial DB transfer") # bad ssl write retry
|
||||
dbjson += ndata
|
||||
cfm.clear_configuration()
|
||||
try:
|
||||
@ -390,34 +397,42 @@ def handle_connection(connection, cert, request, local=False):
|
||||
eventlet.spawn_n(connect_to_leader, rsp['collective'][
|
||||
'fingerprint'], name)
|
||||
if 'enroll' == operation:
|
||||
#TODO(jjohnson2): error appropriately when asked to enroll, but the master is elsewhere
|
||||
mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem')
|
||||
proof = base64.b64decode(request['hmac'])
|
||||
myrsp = invites.check_client_proof(request['name'], mycert,
|
||||
cert, proof)
|
||||
if not myrsp:
|
||||
tlvdata.send(connection, {'error': 'Invalid token'})
|
||||
connection.close()
|
||||
return
|
||||
if not list(cfm.list_collective()):
|
||||
# First enrollment of a collective, since the collective doesn't
|
||||
# quite exist, then set initting false to let the enrollment action
|
||||
# drive this particular initialization
|
||||
initting = False
|
||||
myrsp = base64.b64encode(myrsp)
|
||||
fprint = util.get_fingerprint(cert)
|
||||
myfprint = util.get_fingerprint(mycert)
|
||||
cfm.add_collective_member(get_myname(),
|
||||
connection.getsockname()[0], myfprint)
|
||||
cfm.add_collective_member(request['name'],
|
||||
connection.getpeername()[0], fprint)
|
||||
myleader = get_leader(connection)
|
||||
ldrfprint = cfm.get_collective_member_by_address(
|
||||
myleader)['fingerprint']
|
||||
tlvdata.send(connection,
|
||||
{'collective': {'approval': myrsp,
|
||||
'fingerprint': ldrfprint,
|
||||
'leader': get_leader(connection)}})
|
||||
with enrolling:
|
||||
cfm.check_quorum()
|
||||
mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem')
|
||||
proof = base64.b64decode(request['hmac'])
|
||||
myrsp = invites.check_client_proof(request['name'], mycert,
|
||||
cert, proof)
|
||||
if not myrsp:
|
||||
tlvdata.send(connection, {'error': 'Invalid token'})
|
||||
connection.close()
|
||||
return
|
||||
if not list(cfm.list_collective()):
|
||||
# First enrollment of a collective, since the collective doesn't
|
||||
# quite exist, then set initting false to let the enrollment action
|
||||
# drive this particular initialization
|
||||
initting = False
|
||||
myrsp = base64.b64encode(myrsp)
|
||||
fprint = util.get_fingerprint(cert)
|
||||
myfprint = util.get_fingerprint(mycert)
|
||||
cfm.add_collective_member(get_myname(),
|
||||
connection.getsockname()[0], myfprint)
|
||||
cfm.add_collective_member(request['name'],
|
||||
connection.getpeername()[0], fprint)
|
||||
myleader = get_leader(connection)
|
||||
ldrfprint = cfm.get_collective_member_by_address(
|
||||
myleader)['fingerprint']
|
||||
tlvdata.send(connection,
|
||||
{'collective': {'approval': myrsp,
|
||||
'fingerprint': ldrfprint,
|
||||
'leader': get_leader(connection)}})
|
||||
havequorum = False
|
||||
while not havequorum:
|
||||
try:
|
||||
cfm.check_quorum()
|
||||
havequorum = True
|
||||
except exc.DegradedCollective:
|
||||
eventlet.sleep(0.1)
|
||||
if 'assimilate' == operation:
|
||||
drone = request['name']
|
||||
droneinfo = cfm.get_collective_member(drone)
|
||||
|
Loading…
x
Reference in New Issue
Block a user