2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-17 02:58:51 +00:00

Add hooks for collective mode and refactor

In support of config replication, need configmanager to do a few things
This commit is contained in:
Jarrod Johnson 2018-04-30 16:09:28 -04:00
parent af940c972f
commit 641bc7344a

View File

@ -69,6 +69,7 @@ import confluent.config.conf as conf
import confluent.log
import confluent.noderange as noderange
import confluent.util
import confluent.netutil as netutil
import confluent.exceptions as exc
import copy
import cPickle
@ -362,6 +363,9 @@ def set_global(globalname, value):
_cfgstore['globals'][globalname] = value
ConfigManager._bg_sync_to_file()
cfgstreams = {}
def register_config_listener(name, listener):
cfgstreams[listener] = name
def add_collective_member(name, address, fingerprint):
try:
@ -372,7 +376,7 @@ def add_collective_member(name, address, fingerprint):
init()
if 'collective' not in _cfgstore:
_cfgstore['collective'] = {}
_cfgstore['collective'][name] = {'address': address,
_cfgstore['collective'][name] = {'name': name, 'address': address,
'fingerprint': fingerprint}
with _dirtylock:
if 'collectivedirty' not in _cfgstore:
@ -383,6 +387,14 @@ def add_collective_member(name, address, fingerprint):
def get_collective_member(name):
return _cfgstore['collective'][name]
def get_collective_member_by_address(address):
for name in _cfgstore.get('collective', {}):
currdrone = _cfgstore['collective'][name]
if netutil.addresses_match(address, currdrone['address']):
return currdrone
def _mark_dirtykey(category, key, tenant=None):
if type(key) in (str, unicode):
key = key.encode('utf-8')
@ -1691,7 +1703,7 @@ def _restore_keys(jsond, password, newpassword=None):
# At this point, we should have the key situation all sorted
def _dump_keys(password):
def _dump_keys(password, dojson=True):
if _masterkey is None or _masterintegritykey is None:
init_masterkey()
cryptkey = _format_key(_masterkey, password=password)
@ -1708,8 +1720,10 @@ def _dump_keys(password):
else:
integritykey = '*unencrypted:{0}'.format(base64.b64encode(
integritykey['unencryptedvalue']))
return json.dumps({'cryptkey': cryptkey, 'integritykey': integritykey},
sort_keys=True, indent=4, separators=(',', ': '))
keydata = {'cryptkey': cryptkey, 'integritykey': integritykey}
if dojson:
return json.dumps(keydata, sort_keys=True, indent=4, separators=(',', ': '))
return keydata
def restore_db_from_directory(location, password):
@ -1754,11 +1768,7 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False):
with open(os.path.join(location, 'main.json'), 'w') as cfgfile:
cfgfile.write(ConfigManager(tenant=None)._dump_to_json(redact=redact))
cfgfile.write('\n')
bkupglobals = {}
for globvar in _cfgstore['globals']:
if globvar.endswith('_key'):
continue
bkupglobals[globvar] = _cfgstore['globals'][globvar]
bkupglobals = get_globals()
if bkupglobals:
json.dump(bkupglobals, open(os.path.join(location, 'globals.json')))
try:
@ -1772,6 +1782,16 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False):
except OSError:
pass
def get_globals():
bkupglobals = {}
for globvar in _cfgstore['globals']:
if globvar.endswith('_key'):
continue
bkupglobals[globvar] = _cfgstore['globals'][globvar]
return bkupglobals
def init(stateless=False):
global _cfgstore
if stateless: