From 29d0e904876a249a6a3afdddc789b931b589e66e Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 14 Aug 2024 11:26:51 -0400 Subject: [PATCH] Implement confluentdbutil 'merge' For now, implement 'skip', where conflicting nodes/groups are ignored in new input. --- confluent_server/bin/confluentdbutil | 15 ++- .../confluent/config/configmanager.py | 106 ++++++++++++------ 2 files changed, 79 insertions(+), 42 deletions(-) diff --git a/confluent_server/bin/confluentdbutil b/confluent_server/bin/confluentdbutil index 25a5acf8..b7c1e5c7 100755 --- a/confluent_server/bin/confluentdbutil +++ b/confluent_server/bin/confluentdbutil @@ -30,7 +30,7 @@ import confluent.config.conf as conf import confluent.main as main argparser = optparse.OptionParser( - usage="Usage: %prog [options] [dump|restore] [path]") + usage="Usage: %prog [options] [dump|restore|merge] [path]") argparser.add_option('-p', '--password', help='Password to use to protect/unlock a protected dump') argparser.add_option('-i', '--interactivepassword', help='Prompt for password', @@ -51,13 +51,13 @@ argparser.add_option('-s', '--skipkeys', action='store_true', 'data is needed. keys do not change and as such ' 'they do not require incremental backup') (options, args) = argparser.parse_args() -if len(args) != 2 or args[0] not in ('dump', 'restore'): +if len(args) != 2 or args[0] not in ('dump', 'restore', 'merge'): argparser.print_help() sys.exit(1) dumpdir = args[1] -if args[0] == 'restore': +if args[0] in ('restore', 'merge'): pid = main.is_running() if pid is not None: print("Confluent is running, must shut down to restore db") @@ -69,9 +69,12 @@ if args[0] == 'restore': if options.interactivepassword: password = getpass.getpass('Enter password to restore backup: ') try: - cfm.init(True) - cfm.statelessmode = True - cfm.restore_db_from_directory(dumpdir, password) + stateless = args[0] == 'restore' + cfm.init(stateless) + cfm.statelessmode = stateless + cfm.restore_db_from_directory( + dumpdir, password, + merge="skip" if args[0] == 'merge' else False) cfm.statelessmode = False cfm.ConfigManager.wait_for_sync(True) if owner != 0: diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 6cbf4604..788c2d60 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -1903,7 +1903,7 @@ class ConfigManager(object): def add_group_attributes(self, attribmap): self.set_group_attributes(attribmap, autocreate=True) - def set_group_attributes(self, attribmap, autocreate=False): + def set_group_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None): for group in attribmap: curr = attribmap[group] for attrib in curr: @@ -1924,11 +1924,11 @@ class ConfigManager(object): if cfgstreams: exec_on_followers('_rpc_set_group_attributes', self.tenant, attribmap, autocreate) - self._true_set_group_attributes(attribmap, autocreate) + self._true_set_group_attributes(attribmap, autocreate, merge=merge, keydata=keydata, skipped=skipped) - def _true_set_group_attributes(self, attribmap, autocreate=False): + def _true_set_group_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None): changeset = {} - for group in attribmap: + for group in list(attribmap): if group == '': raise ValueError('"{0}" is not a valid group name'.format( group)) @@ -1941,6 +1941,11 @@ class ConfigManager(object): group)) if not autocreate and group not in self._cfgstore['nodegroups']: raise ValueError("{0} group does not exist".format(group)) + if merge == 'skip' and group in self._cfgstore['nodegroups']: + if skipped is not None: + skipped.append(group) + del attribmap[group] + continue for attr in list(attribmap[group]): # first do a pass to normalize out any aliased attribute names if attr in _attraliases: @@ -2015,6 +2020,9 @@ class ConfigManager(object): newdict = {'value': attribmap[group][attr]} else: newdict = attribmap[group][attr] + if keydata and attr.startswith('secret.') and 'cryptvalue' in newdict: + newdict['value'] = decrypt_value(newdict['cryptvalue'], keydata['cryptkey'], keydata['integritykey']) + del newdict['cryptvalue'] if 'value' in newdict and attr.startswith("secret."): newdict['cryptvalue'] = crypt_value(newdict['value']) del newdict['value'] @@ -2349,7 +2357,7 @@ class ConfigManager(object): - def set_node_attributes(self, attribmap, autocreate=False): + def set_node_attributes(self, attribmap, autocreate=False, merge="replace", keydata=None, skipped=None): for node in attribmap: curr = attribmap[node] for attrib in curr: @@ -2370,9 +2378,9 @@ class ConfigManager(object): if cfgstreams: exec_on_followers('_rpc_set_node_attributes', self.tenant, attribmap, autocreate) - self._true_set_node_attributes(attribmap, autocreate) + self._true_set_node_attributes(attribmap, autocreate, merge, keydata, skipped) - def _true_set_node_attributes(self, attribmap, autocreate): + def _true_set_node_attributes(self, attribmap, autocreate, merge="replace", keydata=None, skipped=None): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result @@ -2380,7 +2388,7 @@ class ConfigManager(object): changeset = {} # first do a sanity check of the input upfront # this mitigates risk of arguments being partially applied - for node in attribmap: + for node in list(attribmap): node = confluent.util.stringify(node) if node == '': raise ValueError('"{0}" is not a valid node name'.format(node)) @@ -2393,6 +2401,11 @@ class ConfigManager(object): '"{0}" is not a valid node name'.format(node)) if autocreate is False and node not in self._cfgstore['nodes']: raise ValueError("node {0} does not exist".format(node)) + if merge == "skip" and node in self._cfgstore['nodes']: + del attribmap[node] + if skipped is not None: + skipped.append(node) + continue if 'groups' not in attribmap[node] and node not in self._cfgstore['nodes']: attribmap[node]['groups'] = [] for attrname in list(attribmap[node]): @@ -2463,6 +2476,9 @@ class ConfigManager(object): # add check here, skip None attributes if newdict is None: continue + if keydata and attrname.startswith('secret.') and 'cryptvalue' in newdict: + newdict['value'] = decrypt_value(newdict['cryptvalue'], keydata['cryptkey'], keydata['integritykey']) + del newdict['cryptvalue'] if 'value' in newdict and attrname.startswith("secret."): newdict['cryptvalue'] = crypt_value(newdict['value']) del newdict['value'] @@ -2503,14 +2519,14 @@ class ConfigManager(object): self._bg_sync_to_file() #TODO: wait for synchronization to suceed/fail??) - def _load_from_json(self, jsondata, sync=True): + def _load_from_json(self, jsondata, sync=True, merge=False, keydata=None): self.inrestore = True try: - self._load_from_json_backend(jsondata, sync=True) + self._load_from_json_backend(jsondata, sync=True, merge=merge, keydata=keydata) finally: self.inrestore = False - def _load_from_json_backend(self, jsondata, sync=True): + def _load_from_json_backend(self, jsondata, sync=True, merge=False, keydata=None): """Load fresh configuration data from jsondata :param jsondata: String of jsondata @@ -2563,20 +2579,27 @@ class ConfigManager(object): pass # Now we have to iterate through each fixed up element, using the # set attribute to flesh out inheritence and expressions - _cfgstore['main']['idmap'] = {} + if (not merge) or _cfgstore.get('main', {}).get('idmap', None) is None: + _cfgstore['main']['idmap'] = {} + attribmerge = merge if merge else "replace" for confarea in _config_areas: - self._cfgstore[confarea] = {} + if not merge or confarea not in self._cfgstore: + self._cfgstore[confarea] = {} if confarea not in tmpconfig: continue if confarea == 'nodes': - self.set_node_attributes(tmpconfig[confarea], True) + self.set_node_attributes(tmpconfig[confarea], True, merge=attribmerge, keydata=keydata) elif confarea == 'nodegroups': - self.set_group_attributes(tmpconfig[confarea], True) + self.set_group_attributes(tmpconfig[confarea], True, merge=attribmerge, keydata=keydata) elif confarea == 'usergroups': + if merge: + continue for usergroup in tmpconfig[confarea]: role = tmpconfig[confarea][usergroup].get('role', 'Administrator') self.create_usergroup(usergroup, role=role) elif confarea == 'users': + if merge: + continue for user in tmpconfig[confarea]: ucfg = tmpconfig[confarea][user] uid = ucfg.get('id', None) @@ -2876,7 +2899,7 @@ def _restore_keys(jsond, password, newpassword=None, sync=True): newpassword = keyfile.read() set_global('master_privacy_key', _format_key(cryptkey, password=newpassword), sync) - if integritykey: + if integritykey: set_global('master_integrity_key', _format_key(integritykey, password=newpassword), sync) _masterkey = cryptkey @@ -2911,35 +2934,46 @@ def _dump_keys(password, dojson=True): return keydata -def restore_db_from_directory(location, password): +def restore_db_from_directory(location, password, merge=False): + kdd = None try: with open(os.path.join(location, 'keys.json'), 'r') as cfgfile: keydata = cfgfile.read() - json.loads(keydata) - _restore_keys(keydata, password) + kdd = json.loads(keydata) + if merge: + if 'cryptkey' in kdd: + kdd['cryptkey'] = _parse_key(kdd['cryptkey'], password) + if 'integritykey' in kdd: + kdd['integritykey'] = _parse_key(kdd['integritykey'], password) + else: + kdd['integritykey'] = None # GCM + else: + kdd = None + _restore_keys(keydata, password) except IOError as e: if e.errno == 2: raise Exception("Cannot restore without keys, this may be a " "redacted dump") - try: - moreglobals = json.load(open(os.path.join(location, 'globals.json'))) - for globvar in moreglobals: - set_global(globvar, moreglobals[globvar]) - except IOError as e: - if e.errno != 2: - raise - try: - collective = json.load(open(os.path.join(location, 'collective.json'))) - _cfgstore['collective'] = {} - for coll in collective: - add_collective_member(coll, collective[coll]['address'], - collective[coll]['fingerprint']) - except IOError as e: - if e.errno != 2: - raise + if not merge: + try: + moreglobals = json.load(open(os.path.join(location, 'globals.json'))) + for globvar in moreglobals: + set_global(globvar, moreglobals[globvar]) + except IOError as e: + if e.errno != 2: + raise + try: + collective = json.load(open(os.path.join(location, 'collective.json'))) + _cfgstore['collective'] = {} + for coll in collective: + add_collective_member(coll, collective[coll]['address'], + collective[coll]['fingerprint']) + except IOError as e: + if e.errno != 2: + raise with open(os.path.join(location, 'main.json'), 'r') as cfgfile: cfgdata = cfgfile.read() - ConfigManager(tenant=None)._load_from_json(cfgdata) + ConfigManager(tenant=None)._load_from_json(cfgdata, merge=merge, keydata=kdd) ConfigManager.wait_for_sync(True)