diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 99a2dfa5..7f95549b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -37,21 +37,9 @@ def connect_to_leader(cert=None, name=None, leader=None): if leader is None: leader = currentleader try: - remote = socket.create_connection((leader, 13001)) + remote = connect_to_collective(cert, leader) except socket.error: return - # TLS cert validation is custom and will not pass normal CA vetting - # to override completely in the right place requires enormous effort, so just defer until after connect - remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', - certfile='/etc/confluent/srvcert.pem') - if cert: - fprint = util.get_fingerprint(cert) - else: - collent = cfm.get_collective_member_by_address(leader) - fprint = collent['fingerprint'] - if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)): - # probably Janeway up to something - raise Exception("Certificate mismatch in the collective") tlvdata.recv(remote) # the banner tlvdata.recv(remote) # authpassed... 0.. if name is None: @@ -92,6 +80,24 @@ def connect_to_leader(cert=None, name=None, leader=None): # The leader has folded, time to startup again... eventlet.spawn_n(start_collective) + +def connect_to_collective(cert, member): + remote = socket.create_connection((member, 13001)) + # TLS cert validation is custom and will not pass normal CA vetting + # to override completely in the right place requires enormous effort, so just defer until after connect + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + if cert: + fprint = util.get_fingerprint(cert) + else: + collent = cfm.get_collective_member_by_address(member) + fprint = collent['fingerprint'] + if not util.cert_matches(fprint, remote.getpeercert(binary_form=True)): + # probably Janeway up to something + raise Exception("Certificate mismatch in the collective") + return remote + + def get_myname(): try: with open('/etc/confluent/cfg/myname', 'r') as f: @@ -226,7 +232,11 @@ def handle_connection(connection, cert, request, local=False): # He needs to bootstrap his configuration and subscribe it to updates def try_assimilate(drone): - pass + remote = connect_to_collective(None, drone) + tlvdata.send(remote, {'operation': 'assimilate', 'name': get_myname(), 'txcount': cfm._txcount}) + answer = tlvdata.recv(remote) + if 'error' in answer: + connect_to_leader(None, None, leader=remote.getpeername()[0]) def get_leader(connection): if currentleader is None or connection.getpeername()[0] == currentleader: