mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-22 01:22:00 +00:00
Amend the notification structure to better bind attributes to nodes
This commit is contained in:
parent
fda2dd08d1
commit
43c6ed5c93
@ -599,7 +599,7 @@ class ConfigManager(object):
|
||||
return
|
||||
for attrib in groupcfg.iterkeys():
|
||||
self._do_inheritance(nodecfg, attrib, node)
|
||||
self._notif_attribwatchers([attrib], [node])
|
||||
self._notif_attribwatchers({node:{attrib:1}})
|
||||
|
||||
def _node_removed_from_group(self, node, group):
|
||||
try:
|
||||
@ -616,7 +616,7 @@ class ConfigManager(object):
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
del nodecfg[attrib] # remove invalid inherited data
|
||||
self._do_inheritance(nodecfg, attrib, node)
|
||||
self._notif_attribwatchers([attrib], [node])
|
||||
self._notif_attribwatchers({node:{attrib:1}})
|
||||
except KeyError: # inheritedfrom not set, move on
|
||||
pass
|
||||
|
||||
@ -643,7 +643,6 @@ class ConfigManager(object):
|
||||
copy.deepcopy(self._cfgstore['groups'][group][attrib])
|
||||
nodecfg[attrib]['inheritedfrom'] = group
|
||||
self._refresh_nodecfg(nodecfg, attrib, nodename)
|
||||
self._notif_attribwatchers([attrib], [nodename])
|
||||
return
|
||||
if srcgroup is not None and group == srcgroup:
|
||||
# break out
|
||||
@ -728,7 +727,7 @@ class ConfigManager(object):
|
||||
for node in cfgobj['nodes']:
|
||||
nodecfg = self._cfgstore['nodes'][node]
|
||||
self._do_inheritance(nodecfg, attr, node, srcgroup=group)
|
||||
self._notif_attribwatchers([attr], [node])
|
||||
self._notif_attribwatchers({node:{attr:1}})
|
||||
self._bg_sync_to_file()
|
||||
|
||||
def _refresh_nodecfg(self, cfgobj, attrname, node):
|
||||
@ -744,30 +743,32 @@ class ConfigManager(object):
|
||||
exprmgr = _ExpressionFormat(cfgobj, node)
|
||||
self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node)
|
||||
|
||||
def _notif_attribwatchers(self, attrnames, nodes):
|
||||
def _notif_attribwatchers(self, nodeattrs):
|
||||
if self.tenant not in self._attribwatchers:
|
||||
return
|
||||
notifdata = {}
|
||||
attribwatchers = self._attribwatchers[self.tenant]
|
||||
for node in nodes:
|
||||
for node in nodeattrs.iterkeys():
|
||||
if node not in attribwatchers:
|
||||
continue
|
||||
attribwatchers = attribwatchers[node]
|
||||
for attrname in attrnames:
|
||||
for attrname in nodeattrs[node].iterkeys():
|
||||
if attrname not in attribwatchers:
|
||||
continue
|
||||
for notifierid in attribwatchers[attrname].iterkeys():
|
||||
if notifierid not in notifdata:
|
||||
if notifierid in notifdata:
|
||||
if node in notifdata[notifierid]:
|
||||
notifdata[notifierid]['nodeattrs'][node].append(attrname)
|
||||
else:
|
||||
notifdata[notifierid]['nodeattrs'][node] = [attrname]
|
||||
else:
|
||||
notifdata[notifierid] = {
|
||||
'nodes': [ node ],
|
||||
'attributes': [ attrname ],
|
||||
'nodeattrs': { node: [attrname] },
|
||||
'callback': attribwatchers[attrname][notifierid]
|
||||
}
|
||||
for watcher in notifdata.itervalues():
|
||||
nodes = watcher['nodes']
|
||||
attributes = watcher['attributes']
|
||||
callback = watcher['callback']
|
||||
callback(nodes=nodes, attributes=attributes, configmanager=self)
|
||||
callback(nodeattribs=watcher['nodeattrs'], configmanager=self)
|
||||
|
||||
def del_nodes(self, nodes):
|
||||
if self.tenant in self._nodecollwatchers:
|
||||
@ -807,7 +808,7 @@ class ConfigManager(object):
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
del nodek[attrib]
|
||||
self._do_inheritance(nodek, attrib, node)
|
||||
self._notif_attribwatchers([attrib], [node])
|
||||
self._notif_attribwatchers({node:{attrib:1}})
|
||||
if ('_expressionkeys' in nodek and
|
||||
attrib in nodek['_expressionkeys']):
|
||||
recalcexpressions = True
|
||||
@ -823,6 +824,7 @@ class ConfigManager(object):
|
||||
# pickle the arguments and fire them off in eventlet
|
||||
# flows to peers, all should have the same result
|
||||
newnodes = []
|
||||
changeset = {}
|
||||
for node in attribmap.iterkeys():
|
||||
node = node.encode('utf-8')
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
@ -861,11 +863,13 @@ class ConfigManager(object):
|
||||
formatter=exprmgr)
|
||||
# if any code is watching these attributes, notify
|
||||
# them of the change
|
||||
self._notif_attribwatchers([attrname], [node])
|
||||
self._addchange(changeset, node, attrname)
|
||||
if recalcexpressions:
|
||||
if exprmgr is None:
|
||||
exprmgr = _ExpressionFormat(cfgobj, node)
|
||||
self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node)
|
||||
if changeset:
|
||||
self._notif_attribwatchers(changeset)
|
||||
if newnodes:
|
||||
if self.tenant in self._nodecollwatchers:
|
||||
nodecollwatchers = self._nodecollwatchers[self.tenant]
|
||||
@ -874,6 +878,12 @@ class ConfigManager(object):
|
||||
self._bg_sync_to_file()
|
||||
#TODO: wait for synchronization to suceed/fail??)
|
||||
|
||||
def _addchange(self, changeset, node, attrname):
|
||||
if node not in changeset:
|
||||
changeset[node] = { attrname: 1 }
|
||||
else:
|
||||
changeset[node][attrname] = 1
|
||||
|
||||
@classmethod
|
||||
def _read_from_path(cls):
|
||||
global _cfgstore
|
||||
@ -955,7 +965,7 @@ class ConfigManager(object):
|
||||
if 'expression' in cfgobj[key]:
|
||||
cfgobj[key] = _decode_attribute(key, cfgobj,
|
||||
formatter=formatter)
|
||||
self._notif_attribwatchers([key], [node])
|
||||
self._notif_attribwatchers({node:{key:1}})
|
||||
elif ('cryptvalue' not in cfgobj[key] and
|
||||
'value' not in cfgobj[key]):
|
||||
# recurse for nested structures, with some hint tha
|
||||
|
Loading…
Reference in New Issue
Block a user