diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 669b4ed8..6cea50ea 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -16,6 +16,7 @@ import base64 import confluent.collective.invites as invites +import confluent.config.configmanager as cfm import confluent.tlvdata as tlvdata import confluent.util as util import eventlet.green.socket as socket @@ -26,8 +27,7 @@ except ImportError: # while not always required, we use pyopenssl required for at least collective crypto = None -collcerts = {} - +currentleader = None def handle_connection(connection, cert, request, local=False): operation = request['operation'] @@ -67,13 +67,8 @@ def handle_connection(connection, cert, request, local=False): proof = base64.b64decode(proof) j = invites.check_server_proof(invitation, mycert, cert, proof) if not j: - tlvdata.send(connection, - {'errorcode': 500, - 'error': 'Response failed validation'}) return - tlvdata.send(remote, {'collective': {'success': True}}) tlvdata.send(connection, {'collective': {'status': 'Success'}}) - #Ok, here start getting assimilated, connect to get the database and register for changes... if 'joinchallenge' == operation: mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) @@ -84,8 +79,16 @@ def handle_connection(connection, cert, request, local=False): connection.close() return myrsp = base64.b64encode(myrsp) - tlvdata.send(connection, {'collective': {'approval': myrsp}}) - clientready = tlvdata.recv(connection) - if clientready.get('collective', {}).get('success', False): - collcerts[request['name']] = cert - # store certificate signature for the collective trust + fprint = util.get_fingerprint(cert) + cfm.add_collective_member(request['name'], + connection.getpeername()[0], fprint) + tlvdata.send(connection, + {'collective': {'approval': myrsp, + 'leader': get_leader(connection)}}) + + +def get_leader(connection): + global currentleader + if currentleader is None: + currentleader = connection.getsockname()[0] + return currentleader diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 9d2087af..61bfbea8 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -363,6 +363,24 @@ def set_global(globalname, value): ConfigManager._bg_sync_to_file() +def add_collective_member(name, address, fingerprint): + try: + name = name.encode('utf-8') + except AttributeError: + pass + if _cfgstore is None: + init() + if 'collective' not in _cfgstore: + _cfgstore['collective'] = {} + _cfgstore['collective'][name] = {'address': address, + 'fingerprint': fingerprint} + with _dirtylock: + if 'collectivedirty' not in _cfgstore: + _cfgstore['collectivedirty'] = set([]) + _cfgstore['collectivedirty'].add(name) + ConfigManager._bg_sync_to_file() + + def _mark_dirtykey(category, key, tenant=None): if type(key) in (str, unicode): key = key.encode('utf-8') @@ -1521,6 +1539,8 @@ class ConfigManager(object): global _cfgstore _cfgstore = {} rootpath = cls._cfgdir + _load_dict_from_dbm(['collective'], os.path.join(rootpath, + "collective")) _load_dict_from_dbm(['globals'], os.path.join(rootpath, "globals")) for confarea in _config_areas: _load_dict_from_dbm(['main', confarea], os.path.join(rootpath, confarea)) @@ -1579,6 +1599,22 @@ class ConfigManager(object): del globalf[globalkey] finally: globalf.close() + if 'collectivedirty' in _cfgstore: + collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"), + 'c', 384) + try: + with _dirtylock: + colls = copy.deepcopy(_cfgstore['collectivedirty']) + del _cfgstore['collectivedirty'] + for coll in colls: + if coll in _cfgstore['collective']: + collectivef[coll] = cPickle.dumps( + _cfgstore['collective'][coll]) + else: + if coll in collectivef: + del globalf[coll] + finally: + collectivef.close() if 'dirtykeys' in _cfgstore: with _dirtylock: currdirt = copy.deepcopy(_cfgstore['dirtykeys'])