2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-09-27 20:48:12 +00:00

Implement confluentdbutil 'merge'

For now, implement 'skip', where conflicting nodes/groups are
ignored in new input.
This commit is contained in:
Jarrod Johnson
2024-08-14 11:26:51 -04:00
parent 8fd39c36bb
commit 29d0e90487
2 changed files with 79 additions and 42 deletions

View File

@@ -30,7 +30,7 @@ import confluent.config.conf as conf
import confluent.main as main
argparser = optparse.OptionParser(
usage="Usage: %prog [options] [dump|restore] [path]")
usage="Usage: %prog [options] [dump|restore|merge] [path]")
argparser.add_option('-p', '--password',
help='Password to use to protect/unlock a protected dump')
argparser.add_option('-i', '--interactivepassword', help='Prompt for password',
@@ -51,13 +51,13 @@ argparser.add_option('-s', '--skipkeys', action='store_true',
'data is needed. keys do not change and as such '
'they do not require incremental backup')
(options, args) = argparser.parse_args()
if len(args) != 2 or args[0] not in ('dump', 'restore'):
if len(args) != 2 or args[0] not in ('dump', 'restore', 'merge'):
argparser.print_help()
sys.exit(1)
dumpdir = args[1]
if args[0] == 'restore':
if args[0] in ('restore', 'merge'):
pid = main.is_running()
if pid is not None:
print("Confluent is running, must shut down to restore db")
@@ -69,9 +69,12 @@ if args[0] == 'restore':
if options.interactivepassword:
password = getpass.getpass('Enter password to restore backup: ')
try:
cfm.init(True)
cfm.statelessmode = True
cfm.restore_db_from_directory(dumpdir, password)
stateless = args[0] == 'restore'
cfm.init(stateless)
cfm.statelessmode = stateless
cfm.restore_db_from_directory(
dumpdir, password,
merge="skip" if args[0] == 'merge' else False)
cfm.statelessmode = False
cfm.ConfigManager.wait_for_sync(True)
if owner != 0:

View File

@@ -1903,7 +1903,7 @@ class ConfigManager(object):
def add_group_attributes(self, attribmap):
self.set_group_attributes(attribmap, autocreate=True)
def set_group_attributes(self, attribmap, autocreate=False):
def set_group_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None):
for group in attribmap:
curr = attribmap[group]
for attrib in curr:
@@ -1924,11 +1924,11 @@ class ConfigManager(object):
if cfgstreams:
exec_on_followers('_rpc_set_group_attributes', self.tenant,
attribmap, autocreate)
self._true_set_group_attributes(attribmap, autocreate)
self._true_set_group_attributes(attribmap, autocreate, merge=merge, keydata=keydata, skipped=skipped)
def _true_set_group_attributes(self, attribmap, autocreate=False):
def _true_set_group_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None):
changeset = {}
for group in attribmap:
for group in list(attribmap):
if group == '':
raise ValueError('"{0}" is not a valid group name'.format(
group))
@@ -1941,6 +1941,11 @@ class ConfigManager(object):
group))
if not autocreate and group not in self._cfgstore['nodegroups']:
raise ValueError("{0} group does not exist".format(group))
if merge == 'skip' and group in self._cfgstore['nodegroups']:
if skipped is not None:
skipped.append(group)
del attribmap[group]
continue
for attr in list(attribmap[group]):
# first do a pass to normalize out any aliased attribute names
if attr in _attraliases:
@@ -2015,6 +2020,9 @@ class ConfigManager(object):
newdict = {'value': attribmap[group][attr]}
else:
newdict = attribmap[group][attr]
if keydata and attr.startswith('secret.') and 'cryptvalue' in newdict:
newdict['value'] = decrypt_value(newdict['cryptvalue'], keydata['cryptkey'], keydata['integritykey'])
del newdict['cryptvalue']
if 'value' in newdict and attr.startswith("secret."):
newdict['cryptvalue'] = crypt_value(newdict['value'])
del newdict['value']
@@ -2349,7 +2357,7 @@ class ConfigManager(object):
def set_node_attributes(self, attribmap, autocreate=False):
def set_node_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None):
for node in attribmap:
curr = attribmap[node]
for attrib in curr:
@@ -2370,9 +2378,9 @@ class ConfigManager(object):
if cfgstreams:
exec_on_followers('_rpc_set_node_attributes',
self.tenant, attribmap, autocreate)
self._true_set_node_attributes(attribmap, autocreate)
self._true_set_node_attributes(attribmap, autocreate, merge, keydata, skipped)
def _true_set_node_attributes(self, attribmap, autocreate):
def _true_set_node_attributes(self, attribmap, autocreate, merge="replace", keydata=None, skipped=None):
# TODO(jbjohnso): multi mgr support, here if we have peers,
# pickle the arguments and fire them off in eventlet
# flows to peers, all should have the same result
@@ -2380,7 +2388,7 @@ class ConfigManager(object):
changeset = {}
# first do a sanity check of the input upfront
# this mitigates risk of arguments being partially applied
for node in attribmap:
for node in list(attribmap):
node = confluent.util.stringify(node)
if node == '':
raise ValueError('"{0}" is not a valid node name'.format(node))
@@ -2393,6 +2401,11 @@ class ConfigManager(object):
'"{0}" is not a valid node name'.format(node))
if autocreate is False and node not in self._cfgstore['nodes']:
raise ValueError("node {0} does not exist".format(node))
if merge == "skip" and node in self._cfgstore['nodes']:
del attribmap[node]
if skipped is not None:
skipped.append(node)
continue
if 'groups' not in attribmap[node] and node not in self._cfgstore['nodes']:
attribmap[node]['groups'] = []
for attrname in list(attribmap[node]):
@@ -2463,6 +2476,9 @@ class ConfigManager(object):
# add check here, skip None attributes
if newdict is None:
continue
if keydata and attrname.startswith('secret.') and 'cryptvalue' in newdict:
newdict['value'] = decrypt_value(newdict['cryptvalue'], keydata['cryptkey'], keydata['integritykey'])
del newdict['cryptvalue']
if 'value' in newdict and attrname.startswith("secret."):
newdict['cryptvalue'] = crypt_value(newdict['value'])
del newdict['value']
@@ -2503,14 +2519,14 @@ class ConfigManager(object):
self._bg_sync_to_file()
#TODO: wait for synchronization to suceed/fail??)
def _load_from_json(self, jsondata, sync=True):
def _load_from_json(self, jsondata, sync=True, merge=False, keydata=None):
self.inrestore = True
try:
self._load_from_json_backend(jsondata, sync=True)
self._load_from_json_backend(jsondata, sync=True, merge=merge, keydata=keydata)
finally:
self.inrestore = False
def _load_from_json_backend(self, jsondata, sync=True):
def _load_from_json_backend(self, jsondata, sync=True, merge=False, keydata=None):
"""Load fresh configuration data from jsondata
:param jsondata: String of jsondata
@@ -2563,20 +2579,27 @@ class ConfigManager(object):
pass
# Now we have to iterate through each fixed up element, using the
# set attribute to flesh out inheritence and expressions
_cfgstore['main']['idmap'] = {}
if (not merge) or _cfgstore.get('main', {}).get('idmap', None) is None:
_cfgstore['main']['idmap'] = {}
attribmerge = merge if merge else "replace"
for confarea in _config_areas:
self._cfgstore[confarea] = {}
if not merge or confarea not in self._cfgstore:
self._cfgstore[confarea] = {}
if confarea not in tmpconfig:
continue
if confarea == 'nodes':
self.set_node_attributes(tmpconfig[confarea], True)
self.set_node_attributes(tmpconfig[confarea], True, merge=attribmerge, keydata=keydata)
elif confarea == 'nodegroups':
self.set_group_attributes(tmpconfig[confarea], True)
self.set_group_attributes(tmpconfig[confarea], True, merge=attribmerge, keydata=keydata)
elif confarea == 'usergroups':
if merge:
continue
for usergroup in tmpconfig[confarea]:
role = tmpconfig[confarea][usergroup].get('role', 'Administrator')
self.create_usergroup(usergroup, role=role)
elif confarea == 'users':
if merge:
continue
for user in tmpconfig[confarea]:
ucfg = tmpconfig[confarea][user]
uid = ucfg.get('id', None)
@@ -2876,7 +2899,7 @@ def _restore_keys(jsond, password, newpassword=None, sync=True):
newpassword = keyfile.read()
set_global('master_privacy_key', _format_key(cryptkey,
password=newpassword), sync)
if integritykey:
if integritykey:
set_global('master_integrity_key', _format_key(integritykey,
password=newpassword), sync)
_masterkey = cryptkey
@@ -2911,35 +2934,46 @@ def _dump_keys(password, dojson=True):
return keydata
def restore_db_from_directory(location, password):
def restore_db_from_directory(location, password, merge=False):
kdd = None
try:
with open(os.path.join(location, 'keys.json'), 'r') as cfgfile:
keydata = cfgfile.read()
json.loads(keydata)
_restore_keys(keydata, password)
kdd = json.loads(keydata)
if merge:
if 'cryptkey' in kdd:
kdd['cryptkey'] = _parse_key(kdd['cryptkey'], password)
if 'integritykey' in kdd:
kdd['integritykey'] = _parse_key(kdd['integritykey'], password)
else:
kdd['integritykey'] = None # GCM
else:
kdd = None
_restore_keys(keydata, password)
except IOError as e:
if e.errno == 2:
raise Exception("Cannot restore without keys, this may be a "
"redacted dump")
try:
moreglobals = json.load(open(os.path.join(location, 'globals.json')))
for globvar in moreglobals:
set_global(globvar, moreglobals[globvar])
except IOError as e:
if e.errno != 2:
raise
try:
collective = json.load(open(os.path.join(location, 'collective.json')))
_cfgstore['collective'] = {}
for coll in collective:
add_collective_member(coll, collective[coll]['address'],
collective[coll]['fingerprint'])
except IOError as e:
if e.errno != 2:
raise
if not merge:
try:
moreglobals = json.load(open(os.path.join(location, 'globals.json')))
for globvar in moreglobals:
set_global(globvar, moreglobals[globvar])
except IOError as e:
if e.errno != 2:
raise
try:
collective = json.load(open(os.path.join(location, 'collective.json')))
_cfgstore['collective'] = {}
for coll in collective:
add_collective_member(coll, collective[coll]['address'],
collective[coll]['fingerprint'])
except IOError as e:
if e.errno != 2:
raise
with open(os.path.join(location, 'main.json'), 'r') as cfgfile:
cfgdata = cfgfile.read()
ConfigManager(tenant=None)._load_from_json(cfgdata)
ConfigManager(tenant=None)._load_from_json(cfgdata, merge=merge, keydata=kdd)
ConfigManager.wait_for_sync(True)