From 8ce5a7dccfe9aec28076b68555f03888b4486695 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 1 Mar 2019 13:21:57 -0500 Subject: [PATCH] Phase 1 of node rename support Provide foundation for node renaming, including updating groups and inheritance and notifying collection watchers of the change, and updating the existing watchers with the new notification fingerprint. --- .../confluent/config/configmanager.py | 55 ++++++++++++++++++- confluent_server/confluent/consoleserver.py | 7 ++- confluent_server/confluent/core.py | 1 + confluent_server/confluent/discovery/core.py | 3 +- 4 files changed, 61 insertions(+), 5 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 4f6ba68d..ebb1dfd7 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -180,6 +180,8 @@ def _rpc_set_user(tenant, name, attributemap): def _rpc_master_set_node_attributes(tenant, attribmap, autocreate): ConfigManager(tenant).set_node_attributes(attribmap, autocreate) +def _rpc_master_rename_nodes(tenant, renamemap): + ConfigManager(tenant).rename_nodes(renamemap) def _rpc_master_clear_node_attributes(tenant, nodes, attributes): ConfigManager(tenant).clear_node_attributes(nodes, attributes) @@ -234,6 +236,10 @@ def _rpc_set_node_attributes(tenant, attribmap, autocreate): ConfigManager(tenant)._true_set_node_attributes(attribmap, autocreate) +def _rpc_rename_nodes(tenant, renamemap): + ConfigManager(tenant)._true_rename_nodes(renamemap) + + def _rpc_set_group_attributes(tenant, attribmap, autocreate): ConfigManager(tenant)._true_set_group_attributes(attribmap, autocreate) @@ -288,9 +294,9 @@ def logException(): event=confluent.log.Events.stacktrace) -def _do_add_watcher(watcher, added, configmanager): +def _do_add_watcher(watcher, added, configmanager, renamed=()): try: - watcher(added=added, deleting=(), renamed=(), configmanager=configmanager) + watcher(added=added, deleting=(), renamed=renamed, configmanager=configmanager) except Exception: logException() @@ -1789,6 +1795,51 @@ class ConfigManager(object): attribmap[node]['groups'] = [] self.set_node_attributes(attribmap, autocreate=True) + def rename_nodes(self, renamemap): + if cfgleader: + return exec_on_leader('_rpc_master_rename_nodes', self.tenant, + renamemap) + if cfgstreams: + exec_on_followers('_rpc_rename_nodes', self.tenant, renamemap) + self._true_rename_nodes(renamemap) + + def _true_rename_nodes(self, renamemap): + oldnames = set(renamemap) + exprmgr = None + currnodes = set(self._cfgstore['nodes']) + missingnodes = oldnames - currnodes + if missingnodes: + raise ValueError( + 'The following nodes to rename do not exist: {0}'.format( + ','.join(missingnodes))) + newnames = set([]) + for name in renamemap: + newnames.add(renamemap[name]) + if newnames & currnodes: + raise ValueError( + 'The following requested new names conflict with existing nodes: {0}'.format( + ','.join(newnames & currnodes))) + for name in renamemap: + self._cfgstore['nodes'][renamemap[name]] = self._cfgstore['nodes'][name] + del self._cfgstore['nodes'][name] + _mark_dirtykey('nodes', name, self.tenant) + _mark_dirtykey('nodes', renamemap[name], self.tenant) + for group in self._cfgstore['nodes'][renamemap[name]].get('groups', []): + self._cfgstore['nodegroups'][group]['nodes'].discard(name) + self._cfgstore['nodegroups'][group]['nodes'].add(renamemap[name]) + _mark_dirtykey('nodegroups', group, self.tenant) + cfgobj = self._cfgstore['nodes'][renamemap[name]] + node = renamemap[name] + changeset = {} + if exprmgr is None: + exprmgr = _ExpressionFormat(cfgobj, node) + self._recalculate_expressions(cfgobj, formatter=exprmgr, node=renamemap[name], changeset=changeset) + if self.tenant in self._nodecollwatchers: + nodecollwatchers = self._nodecollwatchers[self.tenant] + for watcher in nodecollwatchers.itervalues(): + eventlet.spawn_n(_do_add_watcher, watcher, None, self, renamemap) + self._bg_sync_to_file() + def set_node_attributes(self, attribmap, autocreate=False): if cfgleader: # currently config slave to another return exec_on_leader('_rpc_master_set_node_attributes', diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 6dd65441..c973c039 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -622,10 +622,13 @@ def disconnect_node(node, configmanager): def _nodechange(added, deleting, renamed, configmanager, renamed=()): - for node in added: - connect_node(node, configmanager) for node in deleting: disconnect_node(node, configmanager) + for node in renamed: + disconnect_node(node, configmanager) + connect_node(renamed[node], configmanager) + for node in added: + connect_node(node, configmanager) def _start_tenant_sessions(cfm): diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 47f2cec6..0d71bd9c 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -144,6 +144,7 @@ def _init_core(): # be enumerated in any collection noderesources = { 'attributes': { + 'rename': PluginRoute({'handler': 'attributes'}), 'all': PluginRoute({'handler': 'attributes'}), 'current': PluginRoute({'handler': 'attributes'}), 'expression': PluginRoute({'handler': 'attributes'}), diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index 9d1f3031..0b445d64 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -1133,7 +1133,8 @@ def newnodes(added, deleting, renamed, configmanager): global attribwatcher global needaddhandled global nodeaddhandler - for node in deleting: + alldeleting = set(deleting) | set(renamed) + for node in alldeleting: if node not in known_nodes: continue for mac in known_nodes[node]: