From 9617d1f4a49b6e4e3d6cb816f84f46f577a5a616 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 3 Mar 2014 20:01:13 -0500 Subject: [PATCH] Implement dbm style backing for the configuration persistence The pickling would get horrendously slow as total node count increased. This meant very long time to sync to disk for just one change out of 65,000. This strategy changes things to more selective and only do things for the dirty keys rather than everything. Large changes to small amounts of nodes will take more time (because more calls to dump pickle), but small changes to a small subset of nodes will take much less time. --- TODO | 8 ++ confluent/config/configmanager.py | 148 +++++++++++++++++++++++------- 2 files changed, 123 insertions(+), 33 deletions(-) diff --git a/TODO b/TODO index 0604ffb7..44c98095 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,11 @@ +-user can bog down all requests by hammering it with bad auth requests causing + the auth facility to get bogged down in PBKDF + Option: + -a multiprocessing pool to handle new authentications. The auth action + itself would be stalled out whilst an attack was underway, but once + in, the respective session layers will provide a caching session that + should accelerate things after the client gets in once + -penalizing a client clearly trying to break in -expressionkeys never gets smaller - perf impact -need event notification for config change- e.g. set attribute triggers consol session object check to see if credentials changed diff --git a/confluent/config/configmanager.py b/confluent/config/configmanager.py index 9aef0c28..baaa9e91 100644 --- a/confluent/config/configmanager.py +++ b/confluent/config/configmanager.py @@ -52,12 +52,14 @@ from Crypto.Cipher import AES from Crypto.Hash import HMAC from Crypto.Hash import SHA256 import array +import anydbm as dbm import ast import collections import confluent.config.attributes as allattributes import confluent.util import copy import cPickle +import errno import eventlet import fcntl import math @@ -70,8 +72,17 @@ import threading _masterkey = None _masterintegritykey = None +_dirtylock = threading.RLock() +def _mkpath(pathname): + try: + os.makedirs(pathname) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(pathname): + pass + else: + raise def _derive_keys(passphrase, salt): #implement our specific combination of pbkdf2 transforms to get at @@ -117,7 +128,7 @@ def init_masterkey(passphrase=None): _masterkey = _get_protected_key(cfgn, passphrase=passphrase) else: _masterkey = os.urandom(32) - configmanager.set_global('master_privacy_key', _format_key( + set_global('master_privacy_key', _format_key( _masterkey, passphrase=passphrase)) cfgn = get_global('master_integrity_key') @@ -125,7 +136,7 @@ def init_masterkey(passphrase=None): _masterintegritykey = _get_protected_key(cfgn, passphrase=passphrase) else: _masterintegritykey = os.urandom(64) - configmanager.set_global('master_integrity_key', _format_key( + set_global('master_integrity_key', _format_key( _masterintegritykey, passphrase=passphrase)) @@ -172,6 +183,18 @@ def crypt_value(value, return (iv, cryptval, hmac) +def _load_dict_from_dbm(dpath, tdb): + try: + dbe = dbm.open(tdb, 'r') + currdict = _cfgstore + for elem in dpath: + if elem not in currdict: + currdict[elem] = {} + currdict = currdict[elem] + for tk in dbe.iterkeys(): + currdict[tk] = cPickle.loads(dbe[tk]) + except dbm.error: + return def is_tenant(tenant): try: @@ -200,6 +223,10 @@ def set_global(globalname, value): :param globalname: The global parameter name to store :param value: The value to set the global parameter to. """ + with _dirtylock: + if 'dirtyglobals' not in _cfgstore: + _cfgstore['dirtyglobals'] = set() + _cfgstore['dirtyglobals'].add(globalname) if 'globals' not in _cfgstore: _cfgstore['globals'] = { globalname: value } else: @@ -207,6 +234,17 @@ def set_global(globalname, value): ConfigManager._bg_sync_to_file() +def _mark_dirtykey(category, key, tenant=None): + with _dirtylock: + if 'dirtykeys' not in _cfgstore: + _cfgstore['dirtykeys'] = {} + if tenant not in _cfgstore['dirtykeys']: + _cfgstore['dirtykeys'][tenant] = {} + if category not in _cfgstore['dirtykeys'][tenant]: + _cfgstore['dirtykeys'][tenant][category] = set() + _cfgstore['dirtykeys'][tenant][category].add(key) + + def _generate_new_id(): # generate a random id outside the usual ranges used for norml users in # /etc/passwd. Leave an equivalent amount of space near the end disused, @@ -322,7 +360,7 @@ def _decode_attribute(attribute, nodeobj, formatter=None, decrypt=False): # most of the time as things are automatic class ConfigManager(object): - _cfgfilename = "/etc/confluent/cfgdb" + _cfgdir = "/etc/confluent/cfg/" _cfgwriter = None _writepending = False @@ -332,14 +370,14 @@ class ConfigManager(object): if tenant is None: self.tenant = None if 'main' not in _cfgstore: - _cfgstore['main'] = {'id': None} + _cfgstore['main'] = {} self._cfgstore = _cfgstore['main'] return elif 'tenant' not in _cfgstore: - _cfgstore['tenant'] = {tenant: {'id': tenant}} + _cfgstore['tenant'] = {tenant: {}} self._bg_sync_to_file() elif tenant not in _cfgstore['tenant']: - _cfgstore['tenant'][tenant] = {'id': tenant} + _cfgstore['tenant'][tenant] = {} self._bg_sync_to_file() self.tenant = tenant self._cfgstore = _cfgstore['tenant'][tenant] @@ -369,6 +407,7 @@ class ConfigManager(object): :param attributemap: A dict of key values to set """ user = self._cfgstore['users'][name] + _mark_dirtykey('users', name, self.tenant) for attribute in attributemap: user[attribute] = attributemap[attribute] self._bg_sync_to_file() @@ -393,11 +432,13 @@ class ConfigManager(object): self._cfgstore['users'] = { } if name in self._cfgstore['users']: raise Exception("Duplicate username requested") + _mark_dirtykey('users', name, self.tenant) self._cfgstore['users'][name] = {'id': id} if displayname is not None: self._cfgstore['users'][name]['displayname'] = displayname if 'idmap' not in _cfgstore: _cfgstore['idmap'] = {} + _mark_dirtykey('idmap', id) _cfgstore['idmap'][id] = { 'tenant': self.tenant, 'username': name @@ -490,6 +531,7 @@ class ConfigManager(object): continue try: if nodecfg[attrib]['inheritedfrom'] == group: + _mark_dirtykey('node', node, self.tenant) del nodecfg[attrib] # remove invalid inherited data self._do_inheritance(nodecfg, attrib, node) except KeyError: # inheritedfrom not set, move on @@ -513,6 +555,7 @@ class ConfigManager(object): if srcgroup is not None and group != srcgroup: # skip needless deepcopy return + _mark_dirtykey('nodes', nodename, self.tenant) nodecfg[attrib] = \ copy.deepcopy(self._cfgstore['groups'][group][attrib]) nodecfg[attrib]['inheritedfrom'] = group @@ -531,15 +574,14 @@ class ConfigManager(object): self._cfgstore['groups'][group]['nodes'].discard(node) self._node_removed_from_group(node, group) for group in groups: - if 'grouplist' not in self._cfgstore: - self._cfgstore['grouplist'] = [group] - elif group not in self._cfgstore['grouplist']: - self._cfgstore['grouplist'].append(group) if group not in self._cfgstore['groups']: + _mark_dirtykey('groups', group, self.tenant) self._cfgstore['groups'][group] = {'nodes': set([node])} elif 'nodes' not in self._cfgstore['groups'][group]: + _mark_dirtykey('groups', group, self.tenant) self._cfgstore['groups'][group]['nodes'] = set([node]) elif node not in self._cfgstore['groups'][group]['nodes']: + _mark_dirtykey('groups', group, self.tenant) self._cfgstore['groups'][group]['nodes'].add(node) # node was not already in given group, perform inheritence fixup self._node_added_to_group(node, group) @@ -554,10 +596,13 @@ class ConfigManager(object): self._node_removed_from_group(node, group) for node in nodes: if node not in self._cfgstore['nodes']: + _mark_dirtykey('nodes', node, self.tenant) self._cfgstore['nodes'][node] = {'groups': [group]} elif 'groups' not in self._cfgstore['nodes'][node]: + _mark_dirtykey('nodes', node, self.tenant) self._cfgstore['nodes'][node]['groups'] = [group] elif group not in self._cfgstore['nodes'][node]['groups']: + _mark_dirtykey('nodes', node, self.tenant) self._cfgstore['nodes'][node]['groups'].insert(0, group) else: continue # next node, this node already in @@ -567,6 +612,7 @@ class ConfigManager(object): if 'groups' not in self._cfgstore: self._cfgstore['groups'] = {} for group in attribmap.iterkeys(): + _mark_dirtykey('groups', group, self.tenant) if group not in self._cfgstore['groups']: self._cfgstore['groups'][group] = {'nodes': set([])} cfgobj = self._cfgstore['groups'][group] @@ -617,6 +663,7 @@ class ConfigManager(object): for node in nodes: if node in self._cfgstore['nodes']: self._sync_groups_to_node(node=node, groups=[]) + _mark_dirtykey('nodes', node, self.tenant) del self._cfgstore['nodes'][node] self._bg_sync_to_file() @@ -626,6 +673,7 @@ class ConfigManager(object): for group in groups: if group in self._cfgstore['groups']: self._sync_nodes_to_group(group=group, nodes=[]) + _mark_dirtykey('groups', group, self.tenant) del self._cfgstore['groups'][group] self._bg_sync_to_file() @@ -640,6 +688,7 @@ class ConfigManager(object): if attrib in nodek and 'inheritedfrom' not in nodek[attrib]: # if the attribute is set and not inherited, # delete it and check for inheritence to backfil data + _mark_dirtykey('nodes', node, self.tenant) del nodek[attrib] self._do_inheritance(nodek, attrib, node) if ('_expressionkeys' in nodek and @@ -657,7 +706,9 @@ class ConfigManager(object): # pickle the arguments and fire them off in eventlet # flows to peers, all should have the same result for node in attribmap.iterkeys(): + _mark_dirtykey('nodes', node, self.tenant) exprmgr = None + _mark_dirtykey('nodes', node, self.tenant) if node not in self._cfgstore['nodes']: self._cfgstore['nodes'][node] = {} cfgobj = self._cfgstore['nodes'][node] @@ -696,12 +747,24 @@ class ConfigManager(object): #TODO: wait for synchronization to suceed/fail??) @classmethod - def _read_from_file(cls): + def _read_from_path(cls): global _cfgstore - nhandle = open(cls._cfgfilename, 'rb') - fcntl.lockf(nhandle, fcntl.LOCK_SH) - _cfgstore = cPickle.load(nhandle) - fcntl.lockf(nhandle, fcntl.LOCK_UN) + _cfgstore = {} + rootpath = cls._cfgdir + _load_dict_from_dbm(['globals'], rootpath + "/globals") + _load_dict_from_dbm(['main', 'nodes'], rootpath + "/nodes") + _load_dict_from_dbm(['main', 'users'], rootpath + "/users") + _load_dict_from_dbm(['main', 'groups'], rootpath + "/groups") + try: + for tenant in os.listdir(rootpath + '/tenants/'): + _load_dict_from_dbm( + ['main', tenant, 'nodes'], "%s/%s/nodes" % (rootpath,tenant)) + _load_dict_from_dbm( + ['main', tenant, 'groups'], "%s/%s/groups" % (rootpath,tenant)) + _load_dict_from_dbm( + ['main', tenant, 'users'], "%s/%s/users" % (rootpath,tenant)) + except OSError: + pass @classmethod def _bg_sync_to_file(cls): @@ -717,23 +780,42 @@ class ConfigManager(object): @classmethod def _sync_to_file(cls): - # TODO: this is a pretty nasty performance penalty to pay - # as much as it is mitigated and deferred, still need to do better - # possibilities include: - # doing dbm for the major trees, marking the objects that need update - # and only doing changes for those - # the in memory facet seems serviceable though - nfn = cls._cfgfilename + '.new' - nhandle = open(nfn, 'wb') - fcntl.lockf(nhandle, fcntl.LOCK_EX) - cPickle.dump(_cfgstore, nhandle, protocol=2) - fcntl.lockf(nhandle, fcntl.LOCK_UN) - nhandle.close() - try: - os.rename(cls._cfgfilename, cls._cfgfilename + '.old') - except OSError: - pass - os.rename(nfn, cls._cfgfilename) + if 'dirtyglobals' in _cfgstore: + with _dirtylock: + dirtyglobals = copy.deepcopy(_cfgstore['dirtyglobals']); + del _cfgstore['dirtyglobals'] + _mkpath(cls._cfgdir) + globalf = dbm.open(cls._cfgdir + "/globals", 'c', 384) # 0600 + for globalkey in dirtyglobals: + if globalkey in _cfgstore['globals']: + globalf[globalkey] = \ + cPickle.dumps(_cfgstore['globals'][globalkey]) + else: + if globalkey in globalf: + del globalf[globalkey] + globalf.close() + if 'dirtykeys' in _cfgstore: + #with lock: + with _dirtylock: + currdirt = copy.deepcopy(_cfgstore['dirtykeys']) + del _cfgstore['dirtykeys'] + for tenant in currdirt.iterkeys(): + dkdict = currdirt[tenant] + if tenant is None: + pathname = cls._cfgdir + currdict = _cfgstore['main'] + else: + pathname = cls._cfgdir + '/tenants/' + tenant + '/' + currdict = _cfgstore['tenant'][tenant] + for category in dkdict.iterkeys(): + _mkpath(pathname) + dbf = dbm.open(pathname + category, 'c', 384) # 0600 mode + for ck in dkdict[category]: + if ck not in currdict[category]: + if ck in dbf: + del dbf[ck] + else: + dbf[ck] = cPickle.dumps(currdict[category][ck]) if cls._writepending: cls._writepending = False return cls._sync_to_file() @@ -753,7 +835,7 @@ class ConfigManager(object): try: - ConfigManager._read_from_file() + ConfigManager._read_from_path() except IOError: _cfgstore = {}