diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index cb68a9d3..fc143db6 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -31,6 +31,28 @@ except ImportError: currentleader = None +def connect_to_leader(): + remote = socket.create_connection((currentleader, 13001)) + # TLS cert validation is custom and will not pass normal CA vetting + # to override completely in the right place requires enormous effort, so just defer until after connect + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + collent = cfm.get_collective_member_by_address(currentleader) + if not util.cert_matches(remote.getpeercert(binary_form=True), collent['fingerprint']): + raise Exception("Certificate mismatch in the collective") # probably Janeway up to something + tlvdata.send(remote, {'collective': {'operation': 'connect'}}) + keydata = tlvdata.recv(remote) + colldata = tlvdata.recv(remote) + globaldata = tlvdata.recv(remote) + dbsize = tlvdata.recv(remote)['dbsize'] + dbjson = '' + while (len(dbjson) < dbsize): + ndata = remote.recv(dbsize - len(dbjson)) + if not ndata: + raise Exception("Error doing initial DB transfer") + dbjson += ndata + + def handle_connection(connection, cert, request, local=False): global currentleader operation = request['operation'] @@ -74,6 +96,7 @@ def handle_connection(connection, cert, request, local=False): return tlvdata.send(connection, {'collective': {'status': 'Success'}}) currentleader = rsp['collective']['leader'] + eventlet.spawn_n(connect_to_leader()) if 'enroll' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) @@ -105,6 +128,14 @@ def handle_connection(connection, cert, request, local=False): {'error': 'Invalid certificate,' 'redo invitation process'}) return + tlvdata.send(connection, cfm._dump_keys(None, False)) + tlvdata.send(connection, cfm._cfgstore['collective]']) + tlvdata.send(connection, cfm.get_globals()) + cfgdata = cfm.ConfigManager(None)._dump_to_json() + tlvdata.send(connection, {'dbsize': len(cfgdata)}) + connection.write(cfgdata) + tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway + cfm.register_cfg_listener(drone, connection) # ok, we have a connecting member whose certificate checks out # He needs to bootstrap his configuration and subscribe it to updates