diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 7e1584f8..07217049 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -61,10 +61,15 @@ class ContextBool(object): connecting = ContextBool() leader_init = ContextBool() +enrolling = ContextBool() def connect_to_leader(cert=None, name=None, leader=None, remote=None): global currentleader global follower + ocert = cert + oname = name + oleader = leader + oremote = remote if leader is None: leader = currentleader log.log({'info': 'Attempting connection to leader {0}'.format(leader), @@ -138,7 +143,9 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None): remote.close() except Exception: pass - raise Exception("Error doing initial DB transfer") + log.log({'error': 'Retrying connection, error during initial sync', 'subsystem': 'collective'}) + return connect_to_leader(ocert, oname, oleader, oremote) + raise Exception("Error doing initial DB transfer") # bad ssl write retry dbjson += ndata cfm.clear_configuration() try: @@ -390,34 +397,42 @@ def handle_connection(connection, cert, request, local=False): eventlet.spawn_n(connect_to_leader, rsp['collective'][ 'fingerprint'], name) if 'enroll' == operation: - #TODO(jjohnson2): error appropriately when asked to enroll, but the master is elsewhere - mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') - proof = base64.b64decode(request['hmac']) - myrsp = invites.check_client_proof(request['name'], mycert, - cert, proof) - if not myrsp: - tlvdata.send(connection, {'error': 'Invalid token'}) - connection.close() - return - if not list(cfm.list_collective()): - # First enrollment of a collective, since the collective doesn't - # quite exist, then set initting false to let the enrollment action - # drive this particular initialization - initting = False - myrsp = base64.b64encode(myrsp) - fprint = util.get_fingerprint(cert) - myfprint = util.get_fingerprint(mycert) - cfm.add_collective_member(get_myname(), - connection.getsockname()[0], myfprint) - cfm.add_collective_member(request['name'], - connection.getpeername()[0], fprint) - myleader = get_leader(connection) - ldrfprint = cfm.get_collective_member_by_address( - myleader)['fingerprint'] - tlvdata.send(connection, - {'collective': {'approval': myrsp, - 'fingerprint': ldrfprint, - 'leader': get_leader(connection)}}) + with enrolling: + cfm.check_quorum() + mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') + proof = base64.b64decode(request['hmac']) + myrsp = invites.check_client_proof(request['name'], mycert, + cert, proof) + if not myrsp: + tlvdata.send(connection, {'error': 'Invalid token'}) + connection.close() + return + if not list(cfm.list_collective()): + # First enrollment of a collective, since the collective doesn't + # quite exist, then set initting false to let the enrollment action + # drive this particular initialization + initting = False + myrsp = base64.b64encode(myrsp) + fprint = util.get_fingerprint(cert) + myfprint = util.get_fingerprint(mycert) + cfm.add_collective_member(get_myname(), + connection.getsockname()[0], myfprint) + cfm.add_collective_member(request['name'], + connection.getpeername()[0], fprint) + myleader = get_leader(connection) + ldrfprint = cfm.get_collective_member_by_address( + myleader)['fingerprint'] + tlvdata.send(connection, + {'collective': {'approval': myrsp, + 'fingerprint': ldrfprint, + 'leader': get_leader(connection)}}) + havequorum = False + while not havequorum: + try: + cfm.check_quorum() + havequorum = True + except exc.DegradedCollective: + eventlet.sleep(0.1) if 'assimilate' == operation: drone = request['name'] droneinfo = cfm.get_collective_member(drone)