2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-16 04:39:16 +00:00

Persint collective info to disk

Additionally, simplify the concluding steps of the join conversation.
This commit is contained in:
Jarrod Johnson 2018-04-26 15:48:15 -04:00
parent 3a354a6300
commit a41a42ffd0
2 changed files with 51 additions and 12 deletions

View File

@ -16,6 +16,7 @@
import base64
import confluent.collective.invites as invites
import confluent.config.configmanager as cfm
import confluent.tlvdata as tlvdata
import confluent.util as util
import eventlet.green.socket as socket
@ -26,8 +27,7 @@ except ImportError:
# while not always required, we use pyopenssl required for at least collective
crypto = None
collcerts = {}
currentleader = None
def handle_connection(connection, cert, request, local=False):
operation = request['operation']
@ -67,13 +67,8 @@ def handle_connection(connection, cert, request, local=False):
proof = base64.b64decode(proof)
j = invites.check_server_proof(invitation, mycert, cert, proof)
if not j:
tlvdata.send(connection,
{'errorcode': 500,
'error': 'Response failed validation'})
return
tlvdata.send(remote, {'collective': {'success': True}})
tlvdata.send(connection, {'collective': {'status': 'Success'}})
#Ok, here start getting assimilated, connect to get the database and register for changes...
if 'joinchallenge' == operation:
mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem')
proof = base64.b64decode(request['hmac'])
@ -84,8 +79,16 @@ def handle_connection(connection, cert, request, local=False):
connection.close()
return
myrsp = base64.b64encode(myrsp)
tlvdata.send(connection, {'collective': {'approval': myrsp}})
clientready = tlvdata.recv(connection)
if clientready.get('collective', {}).get('success', False):
collcerts[request['name']] = cert
# store certificate signature for the collective trust
fprint = util.get_fingerprint(cert)
cfm.add_collective_member(request['name'],
connection.getpeername()[0], fprint)
tlvdata.send(connection,
{'collective': {'approval': myrsp,
'leader': get_leader(connection)}})
def get_leader(connection):
global currentleader
if currentleader is None:
currentleader = connection.getsockname()[0]
return currentleader

View File

@ -363,6 +363,24 @@ def set_global(globalname, value):
ConfigManager._bg_sync_to_file()
def add_collective_member(name, address, fingerprint):
try:
name = name.encode('utf-8')
except AttributeError:
pass
if _cfgstore is None:
init()
if 'collective' not in _cfgstore:
_cfgstore['collective'] = {}
_cfgstore['collective'][name] = {'address': address,
'fingerprint': fingerprint}
with _dirtylock:
if 'collectivedirty' not in _cfgstore:
_cfgstore['collectivedirty'] = set([])
_cfgstore['collectivedirty'].add(name)
ConfigManager._bg_sync_to_file()
def _mark_dirtykey(category, key, tenant=None):
if type(key) in (str, unicode):
key = key.encode('utf-8')
@ -1521,6 +1539,8 @@ class ConfigManager(object):
global _cfgstore
_cfgstore = {}
rootpath = cls._cfgdir
_load_dict_from_dbm(['collective'], os.path.join(rootpath,
"collective"))
_load_dict_from_dbm(['globals'], os.path.join(rootpath, "globals"))
for confarea in _config_areas:
_load_dict_from_dbm(['main', confarea], os.path.join(rootpath, confarea))
@ -1579,6 +1599,22 @@ class ConfigManager(object):
del globalf[globalkey]
finally:
globalf.close()
if 'collectivedirty' in _cfgstore:
collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"),
'c', 384)
try:
with _dirtylock:
colls = copy.deepcopy(_cfgstore['collectivedirty'])
del _cfgstore['collectivedirty']
for coll in colls:
if coll in _cfgstore['collective']:
collectivef[coll] = cPickle.dumps(
_cfgstore['collective'][coll])
else:
if coll in collectivef:
del globalf[coll]
finally:
collectivef.close()
if 'dirtykeys' in _cfgstore:
with _dirtylock:
currdirt = copy.deepcopy(_cfgstore['dirtykeys'])