diff --git a/confluent/config/configmanager.py b/confluent/config/configmanager.py index 8a6d1e9c..a8d06831 100644 --- a/confluent/config/configmanager.py +++ b/confluent/config/configmanager.py @@ -599,7 +599,7 @@ class ConfigManager(object): return for attrib in groupcfg.iterkeys(): self._do_inheritance(nodecfg, attrib, node) - self._notif_attribwatchers([attrib], [node]) + self._notif_attribwatchers({node:{attrib:1}}) def _node_removed_from_group(self, node, group): try: @@ -616,7 +616,7 @@ class ConfigManager(object): _mark_dirtykey('nodes', node, self.tenant) del nodecfg[attrib] # remove invalid inherited data self._do_inheritance(nodecfg, attrib, node) - self._notif_attribwatchers([attrib], [node]) + self._notif_attribwatchers({node:{attrib:1}}) except KeyError: # inheritedfrom not set, move on pass @@ -643,7 +643,6 @@ class ConfigManager(object): copy.deepcopy(self._cfgstore['groups'][group][attrib]) nodecfg[attrib]['inheritedfrom'] = group self._refresh_nodecfg(nodecfg, attrib, nodename) - self._notif_attribwatchers([attrib], [nodename]) return if srcgroup is not None and group == srcgroup: # break out @@ -728,7 +727,7 @@ class ConfigManager(object): for node in cfgobj['nodes']: nodecfg = self._cfgstore['nodes'][node] self._do_inheritance(nodecfg, attr, node, srcgroup=group) - self._notif_attribwatchers([attr], [node]) + self._notif_attribwatchers({node:{attr:1}}) self._bg_sync_to_file() def _refresh_nodecfg(self, cfgobj, attrname, node): @@ -744,30 +743,32 @@ class ConfigManager(object): exprmgr = _ExpressionFormat(cfgobj, node) self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node) - def _notif_attribwatchers(self, attrnames, nodes): + def _notif_attribwatchers(self, nodeattrs): if self.tenant not in self._attribwatchers: return notifdata = {} attribwatchers = self._attribwatchers[self.tenant] - for node in nodes: + for node in nodeattrs.iterkeys(): if node not in attribwatchers: continue attribwatchers = attribwatchers[node] - for attrname in attrnames: + for attrname in nodeattrs[node].iterkeys(): if attrname not in attribwatchers: continue for notifierid in attribwatchers[attrname].iterkeys(): - if notifierid not in notifdata: + if notifierid in notifdata: + if node in notifdata[notifierid]: + notifdata[notifierid]['nodeattrs'][node].append(attrname) + else: + notifdata[notifierid]['nodeattrs'][node] = [attrname] + else: notifdata[notifierid] = { - 'nodes': [ node ], - 'attributes': [ attrname ], + 'nodeattrs': { node: [attrname] }, 'callback': attribwatchers[attrname][notifierid] } for watcher in notifdata.itervalues(): - nodes = watcher['nodes'] - attributes = watcher['attributes'] callback = watcher['callback'] - callback(nodes=nodes, attributes=attributes, configmanager=self) + callback(nodeattribs=watcher['nodeattrs'], configmanager=self) def del_nodes(self, nodes): if self.tenant in self._nodecollwatchers: @@ -807,7 +808,7 @@ class ConfigManager(object): _mark_dirtykey('nodes', node, self.tenant) del nodek[attrib] self._do_inheritance(nodek, attrib, node) - self._notif_attribwatchers([attrib], [node]) + self._notif_attribwatchers({node:{attrib:1}}) if ('_expressionkeys' in nodek and attrib in nodek['_expressionkeys']): recalcexpressions = True @@ -823,6 +824,7 @@ class ConfigManager(object): # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result newnodes = [] + changeset = {} for node in attribmap.iterkeys(): node = node.encode('utf-8') _mark_dirtykey('nodes', node, self.tenant) @@ -861,11 +863,13 @@ class ConfigManager(object): formatter=exprmgr) # if any code is watching these attributes, notify # them of the change - self._notif_attribwatchers([attrname], [node]) + self._addchange(changeset, node, attrname) if recalcexpressions: if exprmgr is None: exprmgr = _ExpressionFormat(cfgobj, node) self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node) + if changeset: + self._notif_attribwatchers(changeset) if newnodes: if self.tenant in self._nodecollwatchers: nodecollwatchers = self._nodecollwatchers[self.tenant] @@ -874,6 +878,12 @@ class ConfigManager(object): self._bg_sync_to_file() #TODO: wait for synchronization to suceed/fail??) + def _addchange(self, changeset, node, attrname): + if node not in changeset: + changeset[node] = { attrname: 1 } + else: + changeset[node][attrname] = 1 + @classmethod def _read_from_path(cls): global _cfgstore @@ -955,7 +965,7 @@ class ConfigManager(object): if 'expression' in cfgobj[key]: cfgobj[key] = _decode_attribute(key, cfgobj, formatter=formatter) - self._notif_attribwatchers([key], [node]) + self._notif_attribwatchers({node:{key:1}}) elif ('cryptvalue' not in cfgobj[key] and 'value' not in cfgobj[key]): # recurse for nested structures, with some hint tha