diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index d8b87b0d..dafffe0d 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -132,7 +132,7 @@ def connect_to_collective(cert, member): remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', certfile='/etc/confluent/srvcert.pem') if cert: - fprint = util.get_fingerprint(cert) + fprint = cert else: collent = cfm.get_collective_member_by_address(member) fprint = collent['fingerprint'] @@ -176,12 +176,6 @@ def handle_connection(connection, cert, request, local=False): tlvdata.send(connection, {'collective': {'leader': myleader}}) return if 'invite' == operation: - if follower: - tlvdata.send(connection, - {'collective': - {'error': 'Invite can only be run from current ' - 'leader ({0})'.format(currentleader)}}) - return try: cfm.check_quorum() except exc.DegradedCollective: @@ -231,7 +225,7 @@ def handle_connection(connection, cert, request, local=False): f = open('/etc/confluent/cfg/myname', 'w') f.write(name) f.close() - eventlet.spawn_n(connect_to_leader, cert, name) + eventlet.spawn_n(connect_to_leader, rsp['collective']['fingerprint'], name) if 'enroll' == operation: #TODO(jjohnson2): error appropriately when asked to enroll, but the master is elsewhere mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') @@ -249,8 +243,12 @@ def handle_connection(connection, cert, request, local=False): connection.getsockname()[0], myfprint) cfm.add_collective_member(request['name'], connection.getpeername()[0], fprint) + myleader = get_leader(connection) + ldrfprint = cfm.get_collective_member_by_address( + myleader)['fingerprint'] tlvdata.send(connection, {'collective': {'approval': myrsp, + 'fingerprint': ldrfprint, 'leader': get_leader(connection)}}) if 'assimilate' == operation: drone = request['name'] diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 5658d378..f07a5202 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -525,7 +525,16 @@ def follow_channel(channel): # mark the connection as broken cfgleader = True + def add_collective_member(name, address, fingerprint): + if cfgleader: + return exec_on_leader('add_collective_member', name, address, fingerprint) + if cfgstreams: + exec_on_followers('_true_add_collective_member', name, address, fingerprint) + _true_add_collective_member(name, address, fingerprint) + + +def _true_add_collective_member(name, address, fingerprint): try: name = name.encode('utf-8') except AttributeError: