mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-25 19:10:10 +00:00
Implement dbm style backing for the configuration persistence
The pickling would get horrendously slow as total node count increased. This meant very long time to sync to disk for just one change out of 65,000. This strategy changes things to more selective and only do things for the dirty keys rather than everything. Large changes to small amounts of nodes will take more time (because more calls to dump pickle), but small changes to a small subset of nodes will take much less time.
This commit is contained in:
parent
7595984cb9
commit
9617d1f4a4
8
TODO
8
TODO
@ -1,3 +1,11 @@
|
||||
-user can bog down all requests by hammering it with bad auth requests causing
|
||||
the auth facility to get bogged down in PBKDF
|
||||
Option:
|
||||
-a multiprocessing pool to handle new authentications. The auth action
|
||||
itself would be stalled out whilst an attack was underway, but once
|
||||
in, the respective session layers will provide a caching session that
|
||||
should accelerate things after the client gets in once
|
||||
-penalizing a client clearly trying to break in
|
||||
-expressionkeys never gets smaller - perf impact
|
||||
-need event notification for config change- e.g. set attribute triggers consol
|
||||
session object check to see if credentials changed
|
||||
|
@ -52,12 +52,14 @@ from Crypto.Cipher import AES
|
||||
from Crypto.Hash import HMAC
|
||||
from Crypto.Hash import SHA256
|
||||
import array
|
||||
import anydbm as dbm
|
||||
import ast
|
||||
import collections
|
||||
import confluent.config.attributes as allattributes
|
||||
import confluent.util
|
||||
import copy
|
||||
import cPickle
|
||||
import errno
|
||||
import eventlet
|
||||
import fcntl
|
||||
import math
|
||||
@ -70,8 +72,17 @@ import threading
|
||||
|
||||
_masterkey = None
|
||||
_masterintegritykey = None
|
||||
_dirtylock = threading.RLock()
|
||||
|
||||
|
||||
def _mkpath(pathname):
|
||||
try:
|
||||
os.makedirs(pathname)
|
||||
except OSError as exc:
|
||||
if exc.errno == errno.EEXIST and os.path.isdir(pathname):
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
def _derive_keys(passphrase, salt):
|
||||
#implement our specific combination of pbkdf2 transforms to get at
|
||||
@ -117,7 +128,7 @@ def init_masterkey(passphrase=None):
|
||||
_masterkey = _get_protected_key(cfgn, passphrase=passphrase)
|
||||
else:
|
||||
_masterkey = os.urandom(32)
|
||||
configmanager.set_global('master_privacy_key', _format_key(
|
||||
set_global('master_privacy_key', _format_key(
|
||||
_masterkey,
|
||||
passphrase=passphrase))
|
||||
cfgn = get_global('master_integrity_key')
|
||||
@ -125,7 +136,7 @@ def init_masterkey(passphrase=None):
|
||||
_masterintegritykey = _get_protected_key(cfgn, passphrase=passphrase)
|
||||
else:
|
||||
_masterintegritykey = os.urandom(64)
|
||||
configmanager.set_global('master_integrity_key', _format_key(
|
||||
set_global('master_integrity_key', _format_key(
|
||||
_masterintegritykey,
|
||||
passphrase=passphrase))
|
||||
|
||||
@ -172,6 +183,18 @@ def crypt_value(value,
|
||||
return (iv, cryptval, hmac)
|
||||
|
||||
|
||||
def _load_dict_from_dbm(dpath, tdb):
|
||||
try:
|
||||
dbe = dbm.open(tdb, 'r')
|
||||
currdict = _cfgstore
|
||||
for elem in dpath:
|
||||
if elem not in currdict:
|
||||
currdict[elem] = {}
|
||||
currdict = currdict[elem]
|
||||
for tk in dbe.iterkeys():
|
||||
currdict[tk] = cPickle.loads(dbe[tk])
|
||||
except dbm.error:
|
||||
return
|
||||
|
||||
def is_tenant(tenant):
|
||||
try:
|
||||
@ -200,6 +223,10 @@ def set_global(globalname, value):
|
||||
:param globalname: The global parameter name to store
|
||||
:param value: The value to set the global parameter to.
|
||||
"""
|
||||
with _dirtylock:
|
||||
if 'dirtyglobals' not in _cfgstore:
|
||||
_cfgstore['dirtyglobals'] = set()
|
||||
_cfgstore['dirtyglobals'].add(globalname)
|
||||
if 'globals' not in _cfgstore:
|
||||
_cfgstore['globals'] = { globalname: value }
|
||||
else:
|
||||
@ -207,6 +234,17 @@ def set_global(globalname, value):
|
||||
ConfigManager._bg_sync_to_file()
|
||||
|
||||
|
||||
def _mark_dirtykey(category, key, tenant=None):
|
||||
with _dirtylock:
|
||||
if 'dirtykeys' not in _cfgstore:
|
||||
_cfgstore['dirtykeys'] = {}
|
||||
if tenant not in _cfgstore['dirtykeys']:
|
||||
_cfgstore['dirtykeys'][tenant] = {}
|
||||
if category not in _cfgstore['dirtykeys'][tenant]:
|
||||
_cfgstore['dirtykeys'][tenant][category] = set()
|
||||
_cfgstore['dirtykeys'][tenant][category].add(key)
|
||||
|
||||
|
||||
def _generate_new_id():
|
||||
# generate a random id outside the usual ranges used for norml users in
|
||||
# /etc/passwd. Leave an equivalent amount of space near the end disused,
|
||||
@ -322,7 +360,7 @@ def _decode_attribute(attribute, nodeobj, formatter=None, decrypt=False):
|
||||
# most of the time as things are automatic
|
||||
|
||||
class ConfigManager(object):
|
||||
_cfgfilename = "/etc/confluent/cfgdb"
|
||||
_cfgdir = "/etc/confluent/cfg/"
|
||||
_cfgwriter = None
|
||||
_writepending = False
|
||||
|
||||
@ -332,14 +370,14 @@ class ConfigManager(object):
|
||||
if tenant is None:
|
||||
self.tenant = None
|
||||
if 'main' not in _cfgstore:
|
||||
_cfgstore['main'] = {'id': None}
|
||||
_cfgstore['main'] = {}
|
||||
self._cfgstore = _cfgstore['main']
|
||||
return
|
||||
elif 'tenant' not in _cfgstore:
|
||||
_cfgstore['tenant'] = {tenant: {'id': tenant}}
|
||||
_cfgstore['tenant'] = {tenant: {}}
|
||||
self._bg_sync_to_file()
|
||||
elif tenant not in _cfgstore['tenant']:
|
||||
_cfgstore['tenant'][tenant] = {'id': tenant}
|
||||
_cfgstore['tenant'][tenant] = {}
|
||||
self._bg_sync_to_file()
|
||||
self.tenant = tenant
|
||||
self._cfgstore = _cfgstore['tenant'][tenant]
|
||||
@ -369,6 +407,7 @@ class ConfigManager(object):
|
||||
:param attributemap: A dict of key values to set
|
||||
"""
|
||||
user = self._cfgstore['users'][name]
|
||||
_mark_dirtykey('users', name, self.tenant)
|
||||
for attribute in attributemap:
|
||||
user[attribute] = attributemap[attribute]
|
||||
self._bg_sync_to_file()
|
||||
@ -393,11 +432,13 @@ class ConfigManager(object):
|
||||
self._cfgstore['users'] = { }
|
||||
if name in self._cfgstore['users']:
|
||||
raise Exception("Duplicate username requested")
|
||||
_mark_dirtykey('users', name, self.tenant)
|
||||
self._cfgstore['users'][name] = {'id': id}
|
||||
if displayname is not None:
|
||||
self._cfgstore['users'][name]['displayname'] = displayname
|
||||
if 'idmap' not in _cfgstore:
|
||||
_cfgstore['idmap'] = {}
|
||||
_mark_dirtykey('idmap', id)
|
||||
_cfgstore['idmap'][id] = {
|
||||
'tenant': self.tenant,
|
||||
'username': name
|
||||
@ -490,6 +531,7 @@ class ConfigManager(object):
|
||||
continue
|
||||
try:
|
||||
if nodecfg[attrib]['inheritedfrom'] == group:
|
||||
_mark_dirtykey('node', node, self.tenant)
|
||||
del nodecfg[attrib] # remove invalid inherited data
|
||||
self._do_inheritance(nodecfg, attrib, node)
|
||||
except KeyError: # inheritedfrom not set, move on
|
||||
@ -513,6 +555,7 @@ class ConfigManager(object):
|
||||
if srcgroup is not None and group != srcgroup:
|
||||
# skip needless deepcopy
|
||||
return
|
||||
_mark_dirtykey('nodes', nodename, self.tenant)
|
||||
nodecfg[attrib] = \
|
||||
copy.deepcopy(self._cfgstore['groups'][group][attrib])
|
||||
nodecfg[attrib]['inheritedfrom'] = group
|
||||
@ -531,15 +574,14 @@ class ConfigManager(object):
|
||||
self._cfgstore['groups'][group]['nodes'].discard(node)
|
||||
self._node_removed_from_group(node, group)
|
||||
for group in groups:
|
||||
if 'grouplist' not in self._cfgstore:
|
||||
self._cfgstore['grouplist'] = [group]
|
||||
elif group not in self._cfgstore['grouplist']:
|
||||
self._cfgstore['grouplist'].append(group)
|
||||
if group not in self._cfgstore['groups']:
|
||||
_mark_dirtykey('groups', group, self.tenant)
|
||||
self._cfgstore['groups'][group] = {'nodes': set([node])}
|
||||
elif 'nodes' not in self._cfgstore['groups'][group]:
|
||||
_mark_dirtykey('groups', group, self.tenant)
|
||||
self._cfgstore['groups'][group]['nodes'] = set([node])
|
||||
elif node not in self._cfgstore['groups'][group]['nodes']:
|
||||
_mark_dirtykey('groups', group, self.tenant)
|
||||
self._cfgstore['groups'][group]['nodes'].add(node)
|
||||
# node was not already in given group, perform inheritence fixup
|
||||
self._node_added_to_group(node, group)
|
||||
@ -554,10 +596,13 @@ class ConfigManager(object):
|
||||
self._node_removed_from_group(node, group)
|
||||
for node in nodes:
|
||||
if node not in self._cfgstore['nodes']:
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
self._cfgstore['nodes'][node] = {'groups': [group]}
|
||||
elif 'groups' not in self._cfgstore['nodes'][node]:
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
self._cfgstore['nodes'][node]['groups'] = [group]
|
||||
elif group not in self._cfgstore['nodes'][node]['groups']:
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
self._cfgstore['nodes'][node]['groups'].insert(0, group)
|
||||
else:
|
||||
continue # next node, this node already in
|
||||
@ -567,6 +612,7 @@ class ConfigManager(object):
|
||||
if 'groups' not in self._cfgstore:
|
||||
self._cfgstore['groups'] = {}
|
||||
for group in attribmap.iterkeys():
|
||||
_mark_dirtykey('groups', group, self.tenant)
|
||||
if group not in self._cfgstore['groups']:
|
||||
self._cfgstore['groups'][group] = {'nodes': set([])}
|
||||
cfgobj = self._cfgstore['groups'][group]
|
||||
@ -617,6 +663,7 @@ class ConfigManager(object):
|
||||
for node in nodes:
|
||||
if node in self._cfgstore['nodes']:
|
||||
self._sync_groups_to_node(node=node, groups=[])
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
del self._cfgstore['nodes'][node]
|
||||
self._bg_sync_to_file()
|
||||
|
||||
@ -626,6 +673,7 @@ class ConfigManager(object):
|
||||
for group in groups:
|
||||
if group in self._cfgstore['groups']:
|
||||
self._sync_nodes_to_group(group=group, nodes=[])
|
||||
_mark_dirtykey('groups', group, self.tenant)
|
||||
del self._cfgstore['groups'][group]
|
||||
self._bg_sync_to_file()
|
||||
|
||||
@ -640,6 +688,7 @@ class ConfigManager(object):
|
||||
if attrib in nodek and 'inheritedfrom' not in nodek[attrib]:
|
||||
# if the attribute is set and not inherited,
|
||||
# delete it and check for inheritence to backfil data
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
del nodek[attrib]
|
||||
self._do_inheritance(nodek, attrib, node)
|
||||
if ('_expressionkeys' in nodek and
|
||||
@ -657,7 +706,9 @@ class ConfigManager(object):
|
||||
# pickle the arguments and fire them off in eventlet
|
||||
# flows to peers, all should have the same result
|
||||
for node in attribmap.iterkeys():
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
exprmgr = None
|
||||
_mark_dirtykey('nodes', node, self.tenant)
|
||||
if node not in self._cfgstore['nodes']:
|
||||
self._cfgstore['nodes'][node] = {}
|
||||
cfgobj = self._cfgstore['nodes'][node]
|
||||
@ -696,12 +747,24 @@ class ConfigManager(object):
|
||||
#TODO: wait for synchronization to suceed/fail??)
|
||||
|
||||
@classmethod
|
||||
def _read_from_file(cls):
|
||||
def _read_from_path(cls):
|
||||
global _cfgstore
|
||||
nhandle = open(cls._cfgfilename, 'rb')
|
||||
fcntl.lockf(nhandle, fcntl.LOCK_SH)
|
||||
_cfgstore = cPickle.load(nhandle)
|
||||
fcntl.lockf(nhandle, fcntl.LOCK_UN)
|
||||
_cfgstore = {}
|
||||
rootpath = cls._cfgdir
|
||||
_load_dict_from_dbm(['globals'], rootpath + "/globals")
|
||||
_load_dict_from_dbm(['main', 'nodes'], rootpath + "/nodes")
|
||||
_load_dict_from_dbm(['main', 'users'], rootpath + "/users")
|
||||
_load_dict_from_dbm(['main', 'groups'], rootpath + "/groups")
|
||||
try:
|
||||
for tenant in os.listdir(rootpath + '/tenants/'):
|
||||
_load_dict_from_dbm(
|
||||
['main', tenant, 'nodes'], "%s/%s/nodes" % (rootpath,tenant))
|
||||
_load_dict_from_dbm(
|
||||
['main', tenant, 'groups'], "%s/%s/groups" % (rootpath,tenant))
|
||||
_load_dict_from_dbm(
|
||||
['main', tenant, 'users'], "%s/%s/users" % (rootpath,tenant))
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def _bg_sync_to_file(cls):
|
||||
@ -717,23 +780,42 @@ class ConfigManager(object):
|
||||
|
||||
@classmethod
|
||||
def _sync_to_file(cls):
|
||||
# TODO: this is a pretty nasty performance penalty to pay
|
||||
# as much as it is mitigated and deferred, still need to do better
|
||||
# possibilities include:
|
||||
# doing dbm for the major trees, marking the objects that need update
|
||||
# and only doing changes for those
|
||||
# the in memory facet seems serviceable though
|
||||
nfn = cls._cfgfilename + '.new'
|
||||
nhandle = open(nfn, 'wb')
|
||||
fcntl.lockf(nhandle, fcntl.LOCK_EX)
|
||||
cPickle.dump(_cfgstore, nhandle, protocol=2)
|
||||
fcntl.lockf(nhandle, fcntl.LOCK_UN)
|
||||
nhandle.close()
|
||||
try:
|
||||
os.rename(cls._cfgfilename, cls._cfgfilename + '.old')
|
||||
except OSError:
|
||||
pass
|
||||
os.rename(nfn, cls._cfgfilename)
|
||||
if 'dirtyglobals' in _cfgstore:
|
||||
with _dirtylock:
|
||||
dirtyglobals = copy.deepcopy(_cfgstore['dirtyglobals']);
|
||||
del _cfgstore['dirtyglobals']
|
||||
_mkpath(cls._cfgdir)
|
||||
globalf = dbm.open(cls._cfgdir + "/globals", 'c', 384) # 0600
|
||||
for globalkey in dirtyglobals:
|
||||
if globalkey in _cfgstore['globals']:
|
||||
globalf[globalkey] = \
|
||||
cPickle.dumps(_cfgstore['globals'][globalkey])
|
||||
else:
|
||||
if globalkey in globalf:
|
||||
del globalf[globalkey]
|
||||
globalf.close()
|
||||
if 'dirtykeys' in _cfgstore:
|
||||
#with lock:
|
||||
with _dirtylock:
|
||||
currdirt = copy.deepcopy(_cfgstore['dirtykeys'])
|
||||
del _cfgstore['dirtykeys']
|
||||
for tenant in currdirt.iterkeys():
|
||||
dkdict = currdirt[tenant]
|
||||
if tenant is None:
|
||||
pathname = cls._cfgdir
|
||||
currdict = _cfgstore['main']
|
||||
else:
|
||||
pathname = cls._cfgdir + '/tenants/' + tenant + '/'
|
||||
currdict = _cfgstore['tenant'][tenant]
|
||||
for category in dkdict.iterkeys():
|
||||
_mkpath(pathname)
|
||||
dbf = dbm.open(pathname + category, 'c', 384) # 0600 mode
|
||||
for ck in dkdict[category]:
|
||||
if ck not in currdict[category]:
|
||||
if ck in dbf:
|
||||
del dbf[ck]
|
||||
else:
|
||||
dbf[ck] = cPickle.dumps(currdict[category][ck])
|
||||
if cls._writepending:
|
||||
cls._writepending = False
|
||||
return cls._sync_to_file()
|
||||
@ -753,7 +835,7 @@ class ConfigManager(object):
|
||||
|
||||
|
||||
try:
|
||||
ConfigManager._read_from_file()
|
||||
ConfigManager._read_from_path()
|
||||
except IOError:
|
||||
_cfgstore = {}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user