2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 09:32:21 +00:00

Aggregate changes into sets and notify only after changeset full

This change causes cfg change notifications to more accurately reflect atomic
expectactions.  If multiple fields are changed on multiple nodes that a watcher may
have registered, they will now get that data in one chunk instead of many.
This commit is contained in:
Jarrod Johnson 2014-04-01 14:48:31 -04:00
parent 43c6ed5c93
commit 17bf92ce16

View File

@ -591,17 +591,17 @@ class ConfigManager(object):
retdict[node] = nodeobj
return retdict
def _node_added_to_group(self, node, group):
def _node_added_to_group(self, node, group, changeset):
try:
nodecfg = self._cfgstore['nodes'][node]
groupcfg = self._cfgstore['groups'][group]
except KeyError: # something did not exist, nothing to do
return
for attrib in groupcfg.iterkeys():
self._do_inheritance(nodecfg, attrib, node)
self._notif_attribwatchers({node:{attrib:1}})
self._do_inheritance(nodecfg, attrib, node, changeset)
self._addchange(changeset, node, attrib)
def _node_removed_from_group(self, node, group):
def _node_removed_from_group(self, node, group, changeset):
try:
nodecfg = self._cfgstore['nodes'][node]
except KeyError: # node did not exist, nothing to do
@ -615,12 +615,12 @@ class ConfigManager(object):
if nodecfg[attrib]['inheritedfrom'] == group:
_mark_dirtykey('nodes', node, self.tenant)
del nodecfg[attrib] # remove invalid inherited data
self._do_inheritance(nodecfg, attrib, node)
self._notif_attribwatchers({node:{attrib:1}})
self._do_inheritance(nodecfg, attrib, node, changeset)
self._addchange(changeset, node, attrib)
except KeyError: # inheritedfrom not set, move on
pass
def _do_inheritance(self, nodecfg, attrib, nodename, srcgroup=None):
def _do_inheritance(self, nodecfg, attrib, nodename, changeset, srcgroup=None):
# for now, just do single inheritance
# TODO: concatenating inheritance if requested
if attrib in ('nodes', 'groups'):
@ -642,13 +642,14 @@ class ConfigManager(object):
nodecfg[attrib] = \
copy.deepcopy(self._cfgstore['groups'][group][attrib])
nodecfg[attrib]['inheritedfrom'] = group
self._refresh_nodecfg(nodecfg, attrib, nodename)
self._refresh_nodecfg(nodecfg, attrib, nodename,
changeset=changeset)
return
if srcgroup is not None and group == srcgroup:
# break out
return
def _sync_groups_to_node(self, groups, node):
def _sync_groups_to_node(self, groups, node, changeset):
if 'groups' not in self._cfgstore:
self._cfgstore['groups'] = {}
for group in self._cfgstore['groups'].iterkeys():
@ -656,7 +657,7 @@ class ConfigManager(object):
if node in self._cfgstore['groups'][group]['nodes']:
self._cfgstore['groups'][group]['nodes'].discard(node)
_mark_dirtykey('groups', group, self.tenant)
self._node_removed_from_group(node, group)
self._node_removed_from_group(node, group, changeset)
for group in groups:
if group not in self._cfgstore['groups']:
_mark_dirtykey('groups', group, self.tenant)
@ -668,16 +669,16 @@ class ConfigManager(object):
_mark_dirtykey('groups', group, self.tenant)
self._cfgstore['groups'][group]['nodes'].add(node)
# node was not already in given group, perform inheritence fixup
self._node_added_to_group(node, group)
self._node_added_to_group(node, group, changeset)
def _sync_nodes_to_group(self, nodes, group):
def _sync_nodes_to_group(self, nodes, group, changeset):
if 'nodes' not in self._cfgstore:
self._cfgstore['nodes'] = {}
for node in self._cfgstore['nodes'].iterkeys():
if node not in nodes and 'groups' in self._cfgstore['nodes'][node]:
if group in self._cfgstore['nodes'][node]['groups']:
self._cfgstore['nodes'][node]['groups'].remove(group)
self._node_removed_from_group(node, group)
self._node_removed_from_group(node, group, changeset)
for node in nodes:
if node not in self._cfgstore['nodes']:
_mark_dirtykey('nodes', node, self.tenant)
@ -690,11 +691,12 @@ class ConfigManager(object):
self._cfgstore['nodes'][node]['groups'].insert(0, group)
else:
continue # next node, this node already in
self._node_added_to_group(node, group)
self._node_added_to_group(node, group, changeset)
def set_group_attributes(self, attribmap):
if 'groups' not in self._cfgstore:
self._cfgstore['groups'] = {}
changeset = {}
for group in attribmap.iterkeys():
group = group.encode('utf-8')
_mark_dirtykey('groups', group, self.tenant)
@ -722,15 +724,17 @@ class ConfigManager(object):
cfgobj[attr] = newdict
if attr == 'nodes':
self._sync_nodes_to_group(group=group,
nodes=attribmap[group]['nodes'])
nodes=attribmap[group]['nodes'],
changeset=changeset)
else: # update inheritence
for node in cfgobj['nodes']:
nodecfg = self._cfgstore['nodes'][node]
self._do_inheritance(nodecfg, attr, node, srcgroup=group)
self._notif_attribwatchers({node:{attr:1}})
self._do_inheritance(nodecfg, attr, node, changeset, srcgroup=group)
self._addchange(changeset, node, attr)
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def _refresh_nodecfg(self, cfgobj, attrname, node):
def _refresh_nodecfg(self, cfgobj, attrname, node, changeset):
exprmgr = None
if 'expression' in cfgobj[attrname]: # evaluate now
if exprmgr is None:
@ -741,7 +745,8 @@ class ConfigManager(object):
attrname in cfgobj['_expressionkeys']):
if exprmgr is None:
exprmgr = _ExpressionFormat(cfgobj, node)
self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node)
self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node,
changeset=changeset)
def _notif_attribwatchers(self, nodeattrs):
if self.tenant not in self._attribwatchers:
@ -757,7 +762,7 @@ class ConfigManager(object):
continue
for notifierid in attribwatchers[attrname].iterkeys():
if notifierid in notifdata:
if node in notifdata[notifierid]:
if node in notifdata[notifierid]['nodeattrs']:
notifdata[notifierid]['nodeattrs'][node].append(attrname)
else:
notifdata[notifierid]['nodeattrs'][node] = [attrname]
@ -774,26 +779,35 @@ class ConfigManager(object):
if self.tenant in self._nodecollwatchers:
for watcher in self._nodecollwatchers[self.tenant].itervalues():
watcher(added=[], deleting=nodes, configmanager=self)
changeset = {}
if 'nodes' not in self._cfgstore:
return
for node in nodes:
if node in self._cfgstore['nodes']:
self._sync_groups_to_node(node=node, groups=[])
self._sync_groups_to_node(node=node, groups=[],
changeset=changeset)
_mark_dirtykey('nodes', node, self.tenant)
del self._cfgstore['nodes'][node]
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def del_groups(self, groups):
if 'groups' not in self._cfgstore:
return
changeset = {}
for group in groups:
if group in self._cfgstore['groups']:
self._sync_nodes_to_group(group=group, nodes=[])
_mark_dirtykey('groups', group, self.tenant)
self._sync_nodes_to_group(group=group, nodes=[],
changeset=changeset)
_mark_dirtykey('groups', group, self.tenant,
changeset=changeset)
del self._cfgstore['groups'][group]
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def clear_node_attributes(self, nodes, attributes):
# accumulate all changes into a changeset and push in one go
changeset = {}
for node in nodes:
node = node.encode('utf-8')
try:
@ -807,14 +821,16 @@ class ConfigManager(object):
# delete it and check for inheritence to backfil data
_mark_dirtykey('nodes', node, self.tenant)
del nodek[attrib]
self._do_inheritance(nodek, attrib, node)
self._notif_attribwatchers({node:{attrib:1}})
self._do_inheritance(nodek, attrib, node, changeset)
self._addchange(changeset, node, attrib)
if ('_expressionkeys' in nodek and
attrib in nodek['_expressionkeys']):
recalcexpressions = True
if recalcexpressions:
exprmgr = _ExpressionFormat(nodek, node)
self._recalculate_expressions(nodek, formatter=exprmgr, node=node)
self._recalculate_expressions(nodek, formatter=exprmgr,
node=node, changeset=changeset)
self._notif_attribwatchers(changeset)
self._bg_sync_to_file()
def set_node_attributes(self, attribmap):
@ -852,7 +868,7 @@ class ConfigManager(object):
cfgobj[attrname] = newdict
if attrname == 'groups':
self._sync_groups_to_node(node=node,
groups=attribmap[node]['groups'])
groups=attribmap[node]['groups'], changeset=changeset)
if ('_expressionkeys' in cfgobj and
attrname in cfgobj['_expressionkeys']):
recalcexpressions = True
@ -867,9 +883,9 @@ class ConfigManager(object):
if recalcexpressions:
if exprmgr is None:
exprmgr = _ExpressionFormat(cfgobj, node)
self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node)
if changeset:
self._notif_attribwatchers(changeset)
self._recalculate_expressions(cfgobj, formatter=exprmgr,
node=node, changeset=changeset)
self._notif_attribwatchers(changeset)
if newnodes:
if self.tenant in self._nodecollwatchers:
nodecollwatchers = self._nodecollwatchers[self.tenant]
@ -965,7 +981,7 @@ class ConfigManager(object):
if 'expression' in cfgobj[key]:
cfgobj[key] = _decode_attribute(key, cfgobj,
formatter=formatter)
self._notif_attribwatchers({node:{key:1}})
self._addchange(changeset, node, key)
elif ('cryptvalue' not in cfgobj[key] and
'value' not in cfgobj[key]):
# recurse for nested structures, with some hint tha