diff --git a/confluent/config/configmanager.py b/confluent/config/configmanager.py index 88fc40dd..8a6d1e9c 100644 --- a/confluent/config/configmanager.py +++ b/confluent/config/configmanager.py @@ -65,8 +65,10 @@ import fcntl import math import operator import os +import random import re import string +import sys import threading @@ -363,6 +365,9 @@ class ConfigManager(object): _cfgdir = "/etc/confluent/cfg/" _cfgwriter = None _writepending = False + _attribwatchers = {} + _nodecollwatchers = {} + _notifierids = {} def __init__(self, tenant, decrypt=False): global _cfgstore @@ -382,6 +387,81 @@ class ConfigManager(object): self.tenant = tenant self._cfgstore = _cfgstore['tenant'][tenant] + def watch_attributes(self, nodes, attributes, callback): + """ + Watch a list of attributes for changes on a list of nodes + + :param nodes: An iterable of node names to be watching + :param attributes: An iterable of attribute names to be notified about + :param callback: A callback to process a notification + + Returns an identifier that can be used to unsubscribe from these + notifications using remove_watcher + """ + notifierid = random.randint(0, sys.maxint) + while notifierid in self._notifierids: + notifierid = random.randint(0, sys.maxint) + self._notifierids[notifierid] = { 'attriblist': [] } + if self.tenant not in self._attribwatchers: + self._attribwatchers[self.tenant] = {} + attribwatchers = self._attribwatchers[self.tenant] + for node in nodes: + if node not in attribwatchers: + attribwatchers[node] = {} + for attribute in attributes: + self._notifierids[notifierid]['attriblist'].append( + (node,attribute)) + if attribute not in attribwatchers[node]: + attribwatchers[node][attribute] = { + notifierid: callback + } + else: + attribwatchers[node][attribute][notifierid] = callback + return notifierid + + def watch_nodecollection(self, callback): + """ + Watch the nodecollection for addition or removal of nodes. + + A watcher is notified prior after node has been added and before node + is actually removed. + + :param callback: Function to call when a node is added or removed + + Returns an identifier that can be used to unsubscribe from these + notifications using remove_watcher + """ + # first provide an identifier for the calling code to + # use in case of cancellation. + # I anticipate no more than a handful of watchers of this sort, so + # this loop should not have to iterate too many times + notifierid = random.randint(0, sys.maxint) + while notifierid in self._notifierids: + notifierid = random.randint(0, sys.maxint) + # going to track that this is a nodecollection type watcher, + # but there is no additional data associated. + self.notifierids[notifierid] = set(['nodecollection']) + if self.tenant not in self._nodecollwatchers: + self._nodecollwatchers[self.tenant] = {} + self._nodecollwatchers[self.tenant][notifierid] = callback + return notifierid + + def remove_watcher(self, watcher): + # identifier of int would be a collection watcher + if watcher not in self._notifierids: + raise Exception("Invalid") + # return + if 'attriblist' in self._notifierids[watcher]: + attribwatchers = self._attribwatchers[self.tenant] + for nodeattrib in self._notifierids[watcher]['attriblist']: + node, attrib = nodeattrib + del attribwatchers[node][attrib][watcher] + elif 'nodecollection' in self.notifierids[watcher]: + del self._nodecollwatchers[self.tenant][watcher] + else: + raise Exception("Completely not a valid place to be") + del self._notifierids[watcher] + def get_user(self, name): """Get user information from DB @@ -430,6 +510,7 @@ class ConfigManager(object): raise Exception("Duplicate id requested") if 'users' not in self._cfgstore: self._cfgstore['users'] = { } + name = name.encode('utf-8') if name in self._cfgstore['users']: raise Exception("Duplicate username requested") _mark_dirtykey('users', name, self.tenant) @@ -518,6 +599,7 @@ class ConfigManager(object): return for attrib in groupcfg.iterkeys(): self._do_inheritance(nodecfg, attrib, node) + self._notif_attribwatchers([attrib], [node]) def _node_removed_from_group(self, node, group): try: @@ -534,6 +616,7 @@ class ConfigManager(object): _mark_dirtykey('nodes', node, self.tenant) del nodecfg[attrib] # remove invalid inherited data self._do_inheritance(nodecfg, attrib, node) + self._notif_attribwatchers([attrib], [node]) except KeyError: # inheritedfrom not set, move on pass @@ -560,6 +643,7 @@ class ConfigManager(object): copy.deepcopy(self._cfgstore['groups'][group][attrib]) nodecfg[attrib]['inheritedfrom'] = group self._refresh_nodecfg(nodecfg, attrib, nodename) + self._notif_attribwatchers([attrib], [nodename]) return if srcgroup is not None and group == srcgroup: # break out @@ -613,6 +697,7 @@ class ConfigManager(object): if 'groups' not in self._cfgstore: self._cfgstore['groups'] = {} for group in attribmap.iterkeys(): + group = group.encode('utf-8') _mark_dirtykey('groups', group, self.tenant) if group not in self._cfgstore['groups']: self._cfgstore['groups'][group] = {'nodes': set([])} @@ -627,7 +712,7 @@ class ConfigManager(object): if not isinstance(attribmap[group][attr], list): raise ValueError newdict = set(attribmap[group][attr]) - elif (isinstance(attribmap[group][attr], str) or + elif (isinstance(attribmap[group][attr], str) or isinstance(attribmap[group][attr], unicode)): newdict = { 'value': attribmap[group][attr] } else: @@ -643,6 +728,7 @@ class ConfigManager(object): for node in cfgobj['nodes']: nodecfg = self._cfgstore['nodes'][node] self._do_inheritance(nodecfg, attr, node, srcgroup=group) + self._notif_attribwatchers([attr], [node]) self._bg_sync_to_file() def _refresh_nodecfg(self, cfgobj, attrname, node): @@ -656,9 +742,37 @@ class ConfigManager(object): attrname in cfgobj['_expressionkeys']): if exprmgr is None: exprmgr = _ExpressionFormat(cfgobj, node) - self._recalculate_expressions(cfgobj, formatter=exprmgr) + self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node) + + def _notif_attribwatchers(self, attrnames, nodes): + if self.tenant not in self._attribwatchers: + return + notifdata = {} + attribwatchers = self._attribwatchers[self.tenant] + for node in nodes: + if node not in attribwatchers: + continue + attribwatchers = attribwatchers[node] + for attrname in attrnames: + if attrname not in attribwatchers: + continue + for notifierid in attribwatchers[attrname].iterkeys(): + if notifierid not in notifdata: + notifdata[notifierid] = { + 'nodes': [ node ], + 'attributes': [ attrname ], + 'callback': attribwatchers[attrname][notifierid] + } + for watcher in notifdata.itervalues(): + nodes = watcher['nodes'] + attributes = watcher['attributes'] + callback = watcher['callback'] + callback(nodes=nodes, attributes=attributes, configmanager=self) def del_nodes(self, nodes): + if self.tenant in self._nodecollwatchers: + for watcher in self._nodecollwatchers[self.tenant].itervalues(): + watcher(added=[], deleting=nodes, configmanager=self) if 'nodes' not in self._cfgstore: return for node in nodes: @@ -680,6 +794,7 @@ class ConfigManager(object): def clear_node_attributes(self, nodes, attributes): for node in nodes: + node = node.encode('utf-8') try: nodek = self._cfgstore['nodes'][node] except KeyError: @@ -692,12 +807,13 @@ class ConfigManager(object): _mark_dirtykey('nodes', node, self.tenant) del nodek[attrib] self._do_inheritance(nodek, attrib, node) + self._notif_attribwatchers([attrib], [node]) if ('_expressionkeys' in nodek and attrib in nodek['_expressionkeys']): recalcexpressions = True if recalcexpressions: exprmgr = _ExpressionFormat(nodek, node) - self._recalculate_expressions(nodek, formatter=exprmgr) + self._recalculate_expressions(nodek, formatter=exprmgr, node=node) self._bg_sync_to_file() def set_node_attributes(self, attribmap): @@ -706,11 +822,14 @@ class ConfigManager(object): # TODO(jbjohnso): multi mgr support, here if we have peers, # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result + newnodes = [] for node in attribmap.iterkeys(): + node = node.encode('utf-8') _mark_dirtykey('nodes', node, self.tenant) exprmgr = None _mark_dirtykey('nodes', node, self.tenant) if node not in self._cfgstore['nodes']: + newnodes.append(node) self._cfgstore['nodes'][node] = {} cfgobj = self._cfgstore['nodes'][node] recalcexpressions = False @@ -740,10 +859,18 @@ class ConfigManager(object): exprmgr = _ExpressionFormat(cfgobj, node) cfgobj[attrname] = _decode_attribute(attrname, cfgobj, formatter=exprmgr) + # if any code is watching these attributes, notify + # them of the change + self._notif_attribwatchers([attrname], [node]) if recalcexpressions: if exprmgr is None: exprmgr = _ExpressionFormat(cfgobj, node) - self._recalculate_expressions(cfgobj, formatter=exprmgr) + self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node) + if newnodes: + if self.tenant in self._nodecollwatchers: + nodecollwatchers = self._nodecollwatchers[self.tenant] + for watcher in nodecollwatchers[self.tenant].itervalues(): + watcher(added=newnodes, deleting=[], configmanager=self) self._bg_sync_to_file() #TODO: wait for synchronization to suceed/fail??) @@ -821,18 +948,19 @@ class ConfigManager(object): cls._writepending = False return cls._sync_to_file() - def _recalculate_expressions(self, cfgobj, formatter): + def _recalculate_expressions(self, cfgobj, formatter, node): for key in cfgobj.iterkeys(): if not isinstance(cfgobj[key],dict): continue if 'expression' in cfgobj[key]: cfgobj[key] = _decode_attribute(key, cfgobj, formatter=formatter) + self._notif_attribwatchers([key], [node]) elif ('cryptvalue' not in cfgobj[key] and 'value' not in cfgobj[key]): # recurse for nested structures, with some hint tha # it might indeed be a nested structure - _recalculate_expressions(cfgobj[key], formatter) + _recalculate_expressions(cfgobj[key], formatter, node) try: