From 17bf92ce16f44fafdb81a85c7849602fdbfc3f16 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 1 Apr 2014 14:48:31 -0400 Subject: [PATCH] Aggregate changes into sets and notify only after changeset full This change causes cfg change notifications to more accurately reflect atomic expectactions. If multiple fields are changed on multiple nodes that a watcher may have registered, they will now get that data in one chunk instead of many. --- confluent/config/configmanager.py | 78 +++++++++++++++++++------------ 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/confluent/config/configmanager.py b/confluent/config/configmanager.py index a8d06831..06897f31 100644 --- a/confluent/config/configmanager.py +++ b/confluent/config/configmanager.py @@ -591,17 +591,17 @@ class ConfigManager(object): retdict[node] = nodeobj return retdict - def _node_added_to_group(self, node, group): + def _node_added_to_group(self, node, group, changeset): try: nodecfg = self._cfgstore['nodes'][node] groupcfg = self._cfgstore['groups'][group] except KeyError: # something did not exist, nothing to do return for attrib in groupcfg.iterkeys(): - self._do_inheritance(nodecfg, attrib, node) - self._notif_attribwatchers({node:{attrib:1}}) + self._do_inheritance(nodecfg, attrib, node, changeset) + self._addchange(changeset, node, attrib) - def _node_removed_from_group(self, node, group): + def _node_removed_from_group(self, node, group, changeset): try: nodecfg = self._cfgstore['nodes'][node] except KeyError: # node did not exist, nothing to do @@ -615,12 +615,12 @@ class ConfigManager(object): if nodecfg[attrib]['inheritedfrom'] == group: _mark_dirtykey('nodes', node, self.tenant) del nodecfg[attrib] # remove invalid inherited data - self._do_inheritance(nodecfg, attrib, node) - self._notif_attribwatchers({node:{attrib:1}}) + self._do_inheritance(nodecfg, attrib, node, changeset) + self._addchange(changeset, node, attrib) except KeyError: # inheritedfrom not set, move on pass - def _do_inheritance(self, nodecfg, attrib, nodename, srcgroup=None): + def _do_inheritance(self, nodecfg, attrib, nodename, changeset, srcgroup=None): # for now, just do single inheritance # TODO: concatenating inheritance if requested if attrib in ('nodes', 'groups'): @@ -642,13 +642,14 @@ class ConfigManager(object): nodecfg[attrib] = \ copy.deepcopy(self._cfgstore['groups'][group][attrib]) nodecfg[attrib]['inheritedfrom'] = group - self._refresh_nodecfg(nodecfg, attrib, nodename) + self._refresh_nodecfg(nodecfg, attrib, nodename, + changeset=changeset) return if srcgroup is not None and group == srcgroup: # break out return - def _sync_groups_to_node(self, groups, node): + def _sync_groups_to_node(self, groups, node, changeset): if 'groups' not in self._cfgstore: self._cfgstore['groups'] = {} for group in self._cfgstore['groups'].iterkeys(): @@ -656,7 +657,7 @@ class ConfigManager(object): if node in self._cfgstore['groups'][group]['nodes']: self._cfgstore['groups'][group]['nodes'].discard(node) _mark_dirtykey('groups', group, self.tenant) - self._node_removed_from_group(node, group) + self._node_removed_from_group(node, group, changeset) for group in groups: if group not in self._cfgstore['groups']: _mark_dirtykey('groups', group, self.tenant) @@ -668,16 +669,16 @@ class ConfigManager(object): _mark_dirtykey('groups', group, self.tenant) self._cfgstore['groups'][group]['nodes'].add(node) # node was not already in given group, perform inheritence fixup - self._node_added_to_group(node, group) + self._node_added_to_group(node, group, changeset) - def _sync_nodes_to_group(self, nodes, group): + def _sync_nodes_to_group(self, nodes, group, changeset): if 'nodes' not in self._cfgstore: self._cfgstore['nodes'] = {} for node in self._cfgstore['nodes'].iterkeys(): if node not in nodes and 'groups' in self._cfgstore['nodes'][node]: if group in self._cfgstore['nodes'][node]['groups']: self._cfgstore['nodes'][node]['groups'].remove(group) - self._node_removed_from_group(node, group) + self._node_removed_from_group(node, group, changeset) for node in nodes: if node not in self._cfgstore['nodes']: _mark_dirtykey('nodes', node, self.tenant) @@ -690,11 +691,12 @@ class ConfigManager(object): self._cfgstore['nodes'][node]['groups'].insert(0, group) else: continue # next node, this node already in - self._node_added_to_group(node, group) + self._node_added_to_group(node, group, changeset) def set_group_attributes(self, attribmap): if 'groups' not in self._cfgstore: self._cfgstore['groups'] = {} + changeset = {} for group in attribmap.iterkeys(): group = group.encode('utf-8') _mark_dirtykey('groups', group, self.tenant) @@ -722,15 +724,17 @@ class ConfigManager(object): cfgobj[attr] = newdict if attr == 'nodes': self._sync_nodes_to_group(group=group, - nodes=attribmap[group]['nodes']) + nodes=attribmap[group]['nodes'], + changeset=changeset) else: # update inheritence for node in cfgobj['nodes']: nodecfg = self._cfgstore['nodes'][node] - self._do_inheritance(nodecfg, attr, node, srcgroup=group) - self._notif_attribwatchers({node:{attr:1}}) + self._do_inheritance(nodecfg, attr, node, changeset, srcgroup=group) + self._addchange(changeset, node, attr) + self._notif_attribwatchers(changeset) self._bg_sync_to_file() - def _refresh_nodecfg(self, cfgobj, attrname, node): + def _refresh_nodecfg(self, cfgobj, attrname, node, changeset): exprmgr = None if 'expression' in cfgobj[attrname]: # evaluate now if exprmgr is None: @@ -741,7 +745,8 @@ class ConfigManager(object): attrname in cfgobj['_expressionkeys']): if exprmgr is None: exprmgr = _ExpressionFormat(cfgobj, node) - self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node) + self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node, + changeset=changeset) def _notif_attribwatchers(self, nodeattrs): if self.tenant not in self._attribwatchers: @@ -757,7 +762,7 @@ class ConfigManager(object): continue for notifierid in attribwatchers[attrname].iterkeys(): if notifierid in notifdata: - if node in notifdata[notifierid]: + if node in notifdata[notifierid]['nodeattrs']: notifdata[notifierid]['nodeattrs'][node].append(attrname) else: notifdata[notifierid]['nodeattrs'][node] = [attrname] @@ -774,26 +779,35 @@ class ConfigManager(object): if self.tenant in self._nodecollwatchers: for watcher in self._nodecollwatchers[self.tenant].itervalues(): watcher(added=[], deleting=nodes, configmanager=self) + changeset = {} if 'nodes' not in self._cfgstore: return for node in nodes: if node in self._cfgstore['nodes']: - self._sync_groups_to_node(node=node, groups=[]) + self._sync_groups_to_node(node=node, groups=[], + changeset=changeset) _mark_dirtykey('nodes', node, self.tenant) del self._cfgstore['nodes'][node] + self._notif_attribwatchers(changeset) self._bg_sync_to_file() def del_groups(self, groups): if 'groups' not in self._cfgstore: return + changeset = {} for group in groups: if group in self._cfgstore['groups']: - self._sync_nodes_to_group(group=group, nodes=[]) - _mark_dirtykey('groups', group, self.tenant) + self._sync_nodes_to_group(group=group, nodes=[], + changeset=changeset) + _mark_dirtykey('groups', group, self.tenant, + changeset=changeset) del self._cfgstore['groups'][group] + self._notif_attribwatchers(changeset) self._bg_sync_to_file() def clear_node_attributes(self, nodes, attributes): + # accumulate all changes into a changeset and push in one go + changeset = {} for node in nodes: node = node.encode('utf-8') try: @@ -807,14 +821,16 @@ class ConfigManager(object): # delete it and check for inheritence to backfil data _mark_dirtykey('nodes', node, self.tenant) del nodek[attrib] - self._do_inheritance(nodek, attrib, node) - self._notif_attribwatchers({node:{attrib:1}}) + self._do_inheritance(nodek, attrib, node, changeset) + self._addchange(changeset, node, attrib) if ('_expressionkeys' in nodek and attrib in nodek['_expressionkeys']): recalcexpressions = True if recalcexpressions: exprmgr = _ExpressionFormat(nodek, node) - self._recalculate_expressions(nodek, formatter=exprmgr, node=node) + self._recalculate_expressions(nodek, formatter=exprmgr, + node=node, changeset=changeset) + self._notif_attribwatchers(changeset) self._bg_sync_to_file() def set_node_attributes(self, attribmap): @@ -852,7 +868,7 @@ class ConfigManager(object): cfgobj[attrname] = newdict if attrname == 'groups': self._sync_groups_to_node(node=node, - groups=attribmap[node]['groups']) + groups=attribmap[node]['groups'], changeset=changeset) if ('_expressionkeys' in cfgobj and attrname in cfgobj['_expressionkeys']): recalcexpressions = True @@ -867,9 +883,9 @@ class ConfigManager(object): if recalcexpressions: if exprmgr is None: exprmgr = _ExpressionFormat(cfgobj, node) - self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node) - if changeset: - self._notif_attribwatchers(changeset) + self._recalculate_expressions(cfgobj, formatter=exprmgr, + node=node, changeset=changeset) + self._notif_attribwatchers(changeset) if newnodes: if self.tenant in self._nodecollwatchers: nodecollwatchers = self._nodecollwatchers[self.tenant] @@ -965,7 +981,7 @@ class ConfigManager(object): if 'expression' in cfgobj[key]: cfgobj[key] = _decode_attribute(key, cfgobj, formatter=formatter) - self._notif_attribwatchers({node:{key:1}}) + self._addchange(changeset, node, key) elif ('cryptvalue' not in cfgobj[key] and 'value' not in cfgobj[key]): # recurse for nested structures, with some hint tha