2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-28 08:11:45 +00:00

Phase 1 of node rename support

Provide foundation for node renaming, including
updating groups and inheritance and notifying collection
watchers of the change, and updating the existing watchers
with the new notification fingerprint.
This commit is contained in:
Jarrod Johnson 2019-03-01 13:21:57 -05:00
parent 23c9e6315a
commit 8ce5a7dccf
4 changed files with 61 additions and 5 deletions

View File

@ -180,6 +180,8 @@ def _rpc_set_user(tenant, name, attributemap):
def _rpc_master_set_node_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant).set_node_attributes(attribmap, autocreate)
def _rpc_master_rename_nodes(tenant, renamemap):
ConfigManager(tenant).rename_nodes(renamemap)
def _rpc_master_clear_node_attributes(tenant, nodes, attributes):
ConfigManager(tenant).clear_node_attributes(nodes, attributes)
@ -234,6 +236,10 @@ def _rpc_set_node_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant)._true_set_node_attributes(attribmap, autocreate)
def _rpc_rename_nodes(tenant, renamemap):
ConfigManager(tenant)._true_rename_nodes(renamemap)
def _rpc_set_group_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant)._true_set_group_attributes(attribmap, autocreate)
@ -288,9 +294,9 @@ def logException():
event=confluent.log.Events.stacktrace)
def _do_add_watcher(watcher, added, configmanager):
def _do_add_watcher(watcher, added, configmanager, renamed=()):
try:
watcher(added=added, deleting=(), renamed=(), configmanager=configmanager)
watcher(added=added, deleting=(), renamed=renamed, configmanager=configmanager)
except Exception:
logException()
@ -1789,6 +1795,51 @@ class ConfigManager(object):
attribmap[node]['groups'] = []
self.set_node_attributes(attribmap, autocreate=True)
def rename_nodes(self, renamemap):
if cfgleader:
return exec_on_leader('_rpc_master_rename_nodes', self.tenant,
renamemap)
if cfgstreams:
exec_on_followers('_rpc_rename_nodes', self.tenant, renamemap)
self._true_rename_nodes(renamemap)
def _true_rename_nodes(self, renamemap):
oldnames = set(renamemap)
exprmgr = None
currnodes = set(self._cfgstore['nodes'])
missingnodes = oldnames - currnodes
if missingnodes:
raise ValueError(
'The following nodes to rename do not exist: {0}'.format(
','.join(missingnodes)))
newnames = set([])
for name in renamemap:
newnames.add(renamemap[name])
if newnames & currnodes:
raise ValueError(
'The following requested new names conflict with existing nodes: {0}'.format(
','.join(newnames & currnodes)))
for name in renamemap:
self._cfgstore['nodes'][renamemap[name]] = self._cfgstore['nodes'][name]
del self._cfgstore['nodes'][name]
_mark_dirtykey('nodes', name, self.tenant)
_mark_dirtykey('nodes', renamemap[name], self.tenant)
for group in self._cfgstore['nodes'][renamemap[name]].get('groups', []):
self._cfgstore['nodegroups'][group]['nodes'].discard(name)
self._cfgstore['nodegroups'][group]['nodes'].add(renamemap[name])
_mark_dirtykey('nodegroups', group, self.tenant)
cfgobj = self._cfgstore['nodes'][renamemap[name]]
node = renamemap[name]
changeset = {}
if exprmgr is None:
exprmgr = _ExpressionFormat(cfgobj, node)
self._recalculate_expressions(cfgobj, formatter=exprmgr, node=renamemap[name], changeset=changeset)
if self.tenant in self._nodecollwatchers:
nodecollwatchers = self._nodecollwatchers[self.tenant]
for watcher in nodecollwatchers.itervalues():
eventlet.spawn_n(_do_add_watcher, watcher, None, self, renamemap)
self._bg_sync_to_file()
def set_node_attributes(self, attribmap, autocreate=False):
if cfgleader: # currently config slave to another
return exec_on_leader('_rpc_master_set_node_attributes',

View File

@ -622,10 +622,13 @@ def disconnect_node(node, configmanager):
def _nodechange(added, deleting, renamed, configmanager, renamed=()):
for node in added:
connect_node(node, configmanager)
for node in deleting:
disconnect_node(node, configmanager)
for node in renamed:
disconnect_node(node, configmanager)
connect_node(renamed[node], configmanager)
for node in added:
connect_node(node, configmanager)
def _start_tenant_sessions(cfm):

View File

@ -144,6 +144,7 @@ def _init_core():
# be enumerated in any collection
noderesources = {
'attributes': {
'rename': PluginRoute({'handler': 'attributes'}),
'all': PluginRoute({'handler': 'attributes'}),
'current': PluginRoute({'handler': 'attributes'}),
'expression': PluginRoute({'handler': 'attributes'}),

View File

@ -1133,7 +1133,8 @@ def newnodes(added, deleting, renamed, configmanager):
global attribwatcher
global needaddhandled
global nodeaddhandler
for node in deleting:
alldeleting = set(deleting) | set(renamed)
for node in alldeleting:
if node not in known_nodes:
continue
for mac in known_nodes[node]: