From c7b01e00b616672a7a1d55434942ce480c136191 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 26 Apr 2018 16:34:54 -0400 Subject: [PATCH] Begin the 'connect' collective operation First check if we are current leader, reject if not, then if cert is invalid, reject, then comes the TODO. --- .../confluent/collective/manager.py | 29 ++++++++++++++++--- .../confluent/config/configmanager.py | 2 ++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 6cea50ea..7b8f1a9b 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -24,12 +24,15 @@ import eventlet.green.ssl as ssl try: import OpenSSL.crypto as crypto except ImportError: - # while not always required, we use pyopenssl required for at least collective + # while not always required, we use pyopenssl required for at least + # collective crypto = None currentleader = None + def handle_connection(connection, cert, request, local=False): + global currentleader operation = request['operation'] if cert: cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) @@ -39,7 +42,8 @@ def handle_connection(connection, cert, request, local=False): if 'invite' == operation: name = request['name'] invitation = invites.create_server_invitation(name) - tlvdata.send(connection, {'collective': {'invitation': invitation}}) + tlvdata.send(connection, + {'collective': {'invitation': invitation}}) if 'join' == operation: invitation = request['invitation'] invitation = base64.b64decode(invitation) @@ -61,7 +65,7 @@ def handle_connection(connection, cert, request, local=False): tlvdata.recv(remote) # ignore banner tlvdata.recv(remote) # ignore authpassed: 0 tlvdata.send(remote, {'collective': {'operation': 'joinchallenge', - 'name': name, 'hmac': proof}}) + 'name': name, 'hmac': proof}}) rsp = tlvdata.recv(remote) proof = rsp['collective']['approval'] proof = base64.b64decode(proof) @@ -69,6 +73,7 @@ def handle_connection(connection, cert, request, local=False): if not j: return tlvdata.send(connection, {'collective': {'status': 'Success'}}) + currentleader = rsp['collective']['leader'] if 'joinchallenge' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) @@ -85,7 +90,23 @@ def handle_connection(connection, cert, request, local=False): tlvdata.send(connection, {'collective': {'approval': myrsp, 'leader': get_leader(connection)}}) - + if 'connect' == operation: + myself = connection.getsockname()[0] + if myself != get_leader(connection): + tlvdata.send( + connection, + {'error': 'Cannot assimilate, our leader is ' + 'in another castle', 'leader': currentleader}) + return + drone = request['name'] + droneinfo = cfm.get_collective_member(drone) + if not util.cert_matches(droneinfo['fingerprint'], cert): + tlvdata.send(connection, + {'error': 'Invalid certificate,' + 'redo invitation process'}) + return + # ok, we have a connecting member whose certificate checks out + # He needs to bootstrap his configuration and subscribe it to updates def get_leader(connection): global currentleader diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 05c2d520..2bb83740 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -380,6 +380,8 @@ def add_collective_member(name, address, fingerprint): _cfgstore['collectivedirty'].add(name) ConfigManager._bg_sync_to_file() +def get_collective_member(name): + return _cfgstore['collective'][name] def _mark_dirtykey(category, key, tenant=None): if type(key) in (str, unicode):