2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-13 11:17:49 +00:00

First pass at configuration change notification

Add ability for code to add watchers on nodes and their attributes.  This is likely to
be reworked internally to better aggregate requests, but the code interface
is potentially complete.
This commit is contained in:
Jarrod Johnson 2014-04-01 11:01:26 -04:00
parent 9d8ff78716
commit fda2dd08d1

View File

@ -65,8 +65,10 @@ import fcntl
import math
import operator
import os
import random
import re
import string
import sys
import threading
@ -363,6 +365,9 @@ class ConfigManager(object):
_cfgdir = "/etc/confluent/cfg/"
_cfgwriter = None
_writepending = False
_attribwatchers = {}
_nodecollwatchers = {}
_notifierids = {}
def __init__(self, tenant, decrypt=False):
global _cfgstore
@ -382,6 +387,81 @@ class ConfigManager(object):
self.tenant = tenant
self._cfgstore = _cfgstore['tenant'][tenant]
def watch_attributes(self, nodes, attributes, callback):
"""
Watch a list of attributes for changes on a list of nodes
:param nodes: An iterable of node names to be watching
:param attributes: An iterable of attribute names to be notified about
:param callback: A callback to process a notification
Returns an identifier that can be used to unsubscribe from these
notifications using remove_watcher
"""
notifierid = random.randint(0, sys.maxint)
while notifierid in self._notifierids:
notifierid = random.randint(0, sys.maxint)
self._notifierids[notifierid] = { 'attriblist': [] }
if self.tenant not in self._attribwatchers:
self._attribwatchers[self.tenant] = {}
attribwatchers = self._attribwatchers[self.tenant]
for node in nodes:
if node not in attribwatchers:
attribwatchers[node] = {}
for attribute in attributes:
self._notifierids[notifierid]['attriblist'].append(
(node,attribute))
if attribute not in attribwatchers[node]:
attribwatchers[node][attribute] = {
notifierid: callback
}
else:
attribwatchers[node][attribute][notifierid] = callback
return notifierid
def watch_nodecollection(self, callback):
"""
Watch the nodecollection for addition or removal of nodes.
A watcher is notified prior after node has been added and before node
is actually removed.
:param callback: Function to call when a node is added or removed
Returns an identifier that can be used to unsubscribe from these
notifications using remove_watcher
"""
# first provide an identifier for the calling code to
# use in case of cancellation.
# I anticipate no more than a handful of watchers of this sort, so
# this loop should not have to iterate too many times
notifierid = random.randint(0, sys.maxint)
while notifierid in self._notifierids:
notifierid = random.randint(0, sys.maxint)
# going to track that this is a nodecollection type watcher,
# but there is no additional data associated.
self.notifierids[notifierid] = set(['nodecollection'])
if self.tenant not in self._nodecollwatchers:
self._nodecollwatchers[self.tenant] = {}
self._nodecollwatchers[self.tenant][notifierid] = callback
return notifierid
def remove_watcher(self, watcher):
# identifier of int would be a collection watcher
if watcher not in self._notifierids:
raise Exception("Invalid")
# return
if 'attriblist' in self._notifierids[watcher]:
attribwatchers = self._attribwatchers[self.tenant]
for nodeattrib in self._notifierids[watcher]['attriblist']:
node, attrib = nodeattrib
del attribwatchers[node][attrib][watcher]
elif 'nodecollection' in self.notifierids[watcher]:
del self._nodecollwatchers[self.tenant][watcher]
else:
raise Exception("Completely not a valid place to be")
del self._notifierids[watcher]
def get_user(self, name):
"""Get user information from DB
@ -430,6 +510,7 @@ class ConfigManager(object):
raise Exception("Duplicate id requested")
if 'users' not in self._cfgstore:
self._cfgstore['users'] = { }
name = name.encode('utf-8')
if name in self._cfgstore['users']:
raise Exception("Duplicate username requested")
_mark_dirtykey('users', name, self.tenant)
@ -518,6 +599,7 @@ class ConfigManager(object):
return
for attrib in groupcfg.iterkeys():
self._do_inheritance(nodecfg, attrib, node)
self._notif_attribwatchers([attrib], [node])
def _node_removed_from_group(self, node, group):
try:
@ -534,6 +616,7 @@ class ConfigManager(object):
_mark_dirtykey('nodes', node, self.tenant)
del nodecfg[attrib] # remove invalid inherited data
self._do_inheritance(nodecfg, attrib, node)
self._notif_attribwatchers([attrib], [node])
except KeyError: # inheritedfrom not set, move on
pass
@ -560,6 +643,7 @@ class ConfigManager(object):
copy.deepcopy(self._cfgstore['groups'][group][attrib])
nodecfg[attrib]['inheritedfrom'] = group
self._refresh_nodecfg(nodecfg, attrib, nodename)
self._notif_attribwatchers([attrib], [nodename])
return
if srcgroup is not None and group == srcgroup:
# break out
@ -613,6 +697,7 @@ class ConfigManager(object):
if 'groups' not in self._cfgstore:
self._cfgstore['groups'] = {}
for group in attribmap.iterkeys():
group = group.encode('utf-8')
_mark_dirtykey('groups', group, self.tenant)
if group not in self._cfgstore['groups']:
self._cfgstore['groups'][group] = {'nodes': set([])}
@ -627,7 +712,7 @@ class ConfigManager(object):
if not isinstance(attribmap[group][attr], list):
raise ValueError
newdict = set(attribmap[group][attr])
elif (isinstance(attribmap[group][attr], str) or
elif (isinstance(attribmap[group][attr], str) or
isinstance(attribmap[group][attr], unicode)):
newdict = { 'value': attribmap[group][attr] }
else:
@ -643,6 +728,7 @@ class ConfigManager(object):
for node in cfgobj['nodes']:
nodecfg = self._cfgstore['nodes'][node]
self._do_inheritance(nodecfg, attr, node, srcgroup=group)
self._notif_attribwatchers([attr], [node])
self._bg_sync_to_file()
def _refresh_nodecfg(self, cfgobj, attrname, node):
@ -656,9 +742,37 @@ class ConfigManager(object):
attrname in cfgobj['_expressionkeys']):
if exprmgr is None:
exprmgr = _ExpressionFormat(cfgobj, node)
self._recalculate_expressions(cfgobj, formatter=exprmgr)
self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node)
def _notif_attribwatchers(self, attrnames, nodes):
if self.tenant not in self._attribwatchers:
return
notifdata = {}
attribwatchers = self._attribwatchers[self.tenant]
for node in nodes:
if node not in attribwatchers:
continue
attribwatchers = attribwatchers[node]
for attrname in attrnames:
if attrname not in attribwatchers:
continue
for notifierid in attribwatchers[attrname].iterkeys():
if notifierid not in notifdata:
notifdata[notifierid] = {
'nodes': [ node ],
'attributes': [ attrname ],
'callback': attribwatchers[attrname][notifierid]
}
for watcher in notifdata.itervalues():
nodes = watcher['nodes']
attributes = watcher['attributes']
callback = watcher['callback']
callback(nodes=nodes, attributes=attributes, configmanager=self)
def del_nodes(self, nodes):
if self.tenant in self._nodecollwatchers:
for watcher in self._nodecollwatchers[self.tenant].itervalues():
watcher(added=[], deleting=nodes, configmanager=self)
if 'nodes' not in self._cfgstore:
return
for node in nodes:
@ -680,6 +794,7 @@ class ConfigManager(object):
def clear_node_attributes(self, nodes, attributes):
for node in nodes:
node = node.encode('utf-8')
try:
nodek = self._cfgstore['nodes'][node]
except KeyError:
@ -692,12 +807,13 @@ class ConfigManager(object):
_mark_dirtykey('nodes', node, self.tenant)
del nodek[attrib]
self._do_inheritance(nodek, attrib, node)
self._notif_attribwatchers([attrib], [node])
if ('_expressionkeys' in nodek and
attrib in nodek['_expressionkeys']):
recalcexpressions = True
if recalcexpressions:
exprmgr = _ExpressionFormat(nodek, node)
self._recalculate_expressions(nodek, formatter=exprmgr)
self._recalculate_expressions(nodek, formatter=exprmgr, node=node)
self._bg_sync_to_file()
def set_node_attributes(self, attribmap):
@ -706,11 +822,14 @@ class ConfigManager(object):
# TODO(jbjohnso): multi mgr support, here if we have peers,
# pickle the arguments and fire them off in eventlet
# flows to peers, all should have the same result
newnodes = []
for node in attribmap.iterkeys():
node = node.encode('utf-8')
_mark_dirtykey('nodes', node, self.tenant)
exprmgr = None
_mark_dirtykey('nodes', node, self.tenant)
if node not in self._cfgstore['nodes']:
newnodes.append(node)
self._cfgstore['nodes'][node] = {}
cfgobj = self._cfgstore['nodes'][node]
recalcexpressions = False
@ -740,10 +859,18 @@ class ConfigManager(object):
exprmgr = _ExpressionFormat(cfgobj, node)
cfgobj[attrname] = _decode_attribute(attrname, cfgobj,
formatter=exprmgr)
# if any code is watching these attributes, notify
# them of the change
self._notif_attribwatchers([attrname], [node])
if recalcexpressions:
if exprmgr is None:
exprmgr = _ExpressionFormat(cfgobj, node)
self._recalculate_expressions(cfgobj, formatter=exprmgr)
self._recalculate_expressions(cfgobj, formatter=exprmgr, node=node)
if newnodes:
if self.tenant in self._nodecollwatchers:
nodecollwatchers = self._nodecollwatchers[self.tenant]
for watcher in nodecollwatchers[self.tenant].itervalues():
watcher(added=newnodes, deleting=[], configmanager=self)
self._bg_sync_to_file()
#TODO: wait for synchronization to suceed/fail??)
@ -821,18 +948,19 @@ class ConfigManager(object):
cls._writepending = False
return cls._sync_to_file()
def _recalculate_expressions(self, cfgobj, formatter):
def _recalculate_expressions(self, cfgobj, formatter, node):
for key in cfgobj.iterkeys():
if not isinstance(cfgobj[key],dict):
continue
if 'expression' in cfgobj[key]:
cfgobj[key] = _decode_attribute(key, cfgobj,
formatter=formatter)
self._notif_attribwatchers([key], [node])
elif ('cryptvalue' not in cfgobj[key] and
'value' not in cfgobj[key]):
# recurse for nested structures, with some hint tha
# it might indeed be a nested structure
_recalculate_expressions(cfgobj[key], formatter)
_recalculate_expressions(cfgobj[key], formatter, node)
try: