From db0596d17b7dec45dbc2a7ba85d9dc19bff04831 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 4 Sep 2013 15:40:35 -0400 Subject: [PATCH] Change config backend writeback to threading from eventlet. Threading should be used sparingly, but in this case eventlet is pretty terrible at doing what needs to be done... this may bode ill for the rest of using eventlet... Rename ConfigData to ConfigManager Add some hooks for tenant and user management in config Begin actually consulting user/tenant info in http api --- confluent/config.py | 63 ++++++++++++++++++++++++++++++-------------- confluent/httpapi.py | 40 +++++++++++++++++++++------- confluent/main.py | 21 ++------------- 3 files changed, 76 insertions(+), 48 deletions(-) diff --git a/confluent/config.py b/confluent/config.py index 1f496ba4..41fcdc2e 100644 --- a/confluent/config.py +++ b/confluent/config.py @@ -45,9 +45,12 @@ import operator import os import re import string +import threading + _cfgstore = None + def get_global(varname): if (_cfgstore is None or 'globals' not in _cfgstore or varname not in _cfgstore['globals']): @@ -61,7 +64,7 @@ def set_global(varname, value): _cfgstore['globals'] = { varname: value } else: _cfgstore['globals'][varname] = value - ConfigData._bg_sync_to_file() + ConfigManager._bg_sync_to_file() class _ExpressionFormat(string.Formatter): @@ -152,20 +155,46 @@ def _decode_attribute(attribute, nodeobj, formatter=None, decrypt=False): return retdict return nodeobj[attribute] + +def get_tenant_id(tenantname): + for tenant in _cfgstore['tenant'].iterkeys(): + if ('name' in _cfgstore['tenant'][tenant] and + _cfgstore['tenant'][tenant]['name'] == tenantname): + return tenant + + +def get_tenant_names(): + global _cfgstore + if _cfgstore is None or 'tenant' not in _cfgstore: + return + for tenant in _cfgstore['tenant'].iterkeys(): + if 'name' in _cfgstore['tenant'][tenant]: + yield _cfgstore['tenant'][tenant]['name'] + +def get_user(name, tenant): + global _cfgstore + for user in _cfgstore['tenant'][tenant]['users'].iterkeys(): + if _cfgstore['tenant'][tenant]['users'][user]['name'] == user: + return _cfgstore['tenant'][tenant]['users'][user] + # my thinking at this point is that noderange and configdata objects # will be constructed and passed as part of a context object to plugins # reasoning being that the main program will handle establishing the # tenant context and then modules need not consider the current tenant # most of the time as things are automatic -class ConfigData(object): +class ConfigManager(object): _cfgfilename = "/etc/confluent/cfgdb" _cfgwriter = None _writepending = False - def __init__(self, tenant=0, decrypt=False): + + def __init__(self, tenant, decrypt=False): global _cfgstore if _cfgstore is None: - self._read_from_file() + try: + self._read_from_file() + except IOError: + _cfgstore = {} self.decrypt = decrypt if 'tenant' not in _cfgstore: _cfgstore['tenant'] = {} @@ -280,30 +309,18 @@ class ConfigData(object): if cls._writepending: # already have a write scheduled return - elif cls._cfgwriter is not None: + elif cls._cfgwriter is not None and cls._cfgwriter.isAlive(): #write in progress, request write when done cls._writepending = True else: - cls._cfgwriter = eventlet.spawn(cls._sync_to_file) - cls._cfgwriter.link(cls._write_complete) - - @classmethod - def _write_complete(cls, gt): - if cls._writepending: - cls._writepending = False - cls._cfgwriter = eventlet.spawn(cls._sync_to_file) - cls._cfgwriter.link(cls._write_complete) - else: - cls._cfgwriter = None + cls._cfgwriter = threading.Thread(target=cls._sync_to_file) + cls._cfgwriter.start() @classmethod def _sync_to_file(cls): # TODO: this is a pretty nasty performance penalty to pay # as much as it is mitigated and deferred, still need to do better # possibilities include: - # threading this out (take it out of greenthread), GIL in theory should - # not penalize this sort of threading, able to hide a great deal of - # sins there... # doing dbm for the major trees, marking the objects that need update # and only doing changes for those # the in memory facet seems serviceable though @@ -313,8 +330,14 @@ class ConfigData(object): cPickle.dump(_cfgstore, nhandle, protocol=2) fcntl.lockf(nhandle, fcntl.LOCK_UN) nhandle.close() - os.rename(cls._cfgfilename, cls._cfgfilename + '.old') + try: + os.rename(cls._cfgfilename, cls._cfgfilename + '.old') + except OSError: + pass os.rename(nfn, cls._cfgfilename) + if cls._writepending: + cls._writepending = False + return cls._sync_to_file() def _recalculate_expressions(self, cfgobj, formatter): for key in cfgobj.keys(): diff --git a/confluent/httpapi.py b/confluent/httpapi.py index 7bc2dfd6..f4b738f1 100644 --- a/confluent/httpapi.py +++ b/confluent/httpapi.py @@ -5,6 +5,7 @@ # shillinabox javascript import base64 import confluent.console as console +import confluent.auth as auth import confluent.util as util import eventlet import os @@ -25,8 +26,7 @@ def _get_query_dict(qstring, reqbody, reqtype): qdict[qkey] = qvalue if reqbody is not None: if reqtype == "application/x-www-form-urlencoded": - - + print reqbody return qdict @@ -34,6 +34,17 @@ def _authorize_request(env): """Grant/Deny access based on data from wsgi env """ + if 'REMOTE_USER' in env: # HTTP Basic auth passed + user = env['REMOTE_USER'] + #TODO: actually pass in the element + authdata = auth.authorize(user, element=None) + if authdata is None: + return {'code': 401} + else: + return {'code': 200, + 'tenant': authdata[0], + 'user': authdata[1]} + # TODO(jbjohnso): actually evaluate the request for authorization # In theory, the x509 or http auth stuff will get translated and then # passed on to the core authorization function in an appropriate form @@ -41,7 +52,6 @@ def _authorize_request(env): # 401 if there is no known identity # 403 if valid identity, but no access # going to run 200 just to get going for now - return 200 def _pick_mimetype(env): @@ -77,13 +87,23 @@ def resourcehandler(env, start_response): mimetype = _pick_mimetype(env) reqbody = None reqtype = None - if 'CONTENT_LENGTH' in env and env['CONTENT_LENGTH']: + if 'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0: reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH'])) reqtype = env['CONTENT_TYPE'] print env - if authorized in (401, 403): - start_response(authorized, []) - return + if authorized['code'] == 401: + start_response('401 Authentication Required', + [('Content-type', 'text/plain'), + ('WWW-Authenticate', 'Basic realm="confluent"')]) + return 'authentication required' + if authorized['code'] == 403: + start_response('403 Forbidden', + [('Content-type', 'text/plain'), + ('WWW-Authenticate', 'Basic realm="confluent"')]) + return 'authorization failed' + if authorized['code'] != 200: + raise Exception("Unrecognized code from auth engine") + cfgmgr = config.ConfigManager(authorized['tenant']) querydict = _get_query_dict(env['QUERY_STRING'], reqbody, reqtype) if '/console/session' in env['PATH_INFO']: #hard bake JSON into this path, do not support other incarnations @@ -91,12 +111,14 @@ def resourcehandler(env, start_response): _, _, nodename = prefix.rpartition('/') if 'session' not in querydict.keys() or not querydict['session']: # Request for new session - consession = console.ConsoleSession(node=nodename) + consession = console.ConsoleSession(node=nodename, + configmanager=cfgmgr) if not consession: start_response("500 Internal Server Error", []) return sessid = _assign_consessionid(consession) - start_response('200 OK', [('Content-Type', 'application/json; charset=utf-8')]) + start_response('200 OK', [('Content-Type', + 'application/json; charset=utf-8')]) return ['{"session":"%s","data":""}' % sessid] start_response('404 Not Found', []) return ["Unrecognized directive (404)"] diff --git a/confluent/main.py b/confluent/main.py index 2e699368..b62a7978 100644 --- a/confluent/main.py +++ b/confluent/main.py @@ -11,6 +11,7 @@ # Things like heartbeating and discovery # It also will optionally snoop SLP DA requests +import confluent.pluginapi as pluginapi import confluent.httpapi as httpapi import eventlet from eventlet.green import socket @@ -19,26 +20,8 @@ import multiprocessing import sys import os -pluginmap = {} -def _load_plugins(): - # To know our plugins directory, we get the parent path of 'bin' - path=os.path.dirname(os.path.realpath(__file__)) - plugindir = os.path.realpath(os.path.join(path,'..','plugins')) - sys.path.append(plugindir) - plugins = set() - #two passes, to avoid adding both py and pyc files - for plugin in os.listdir(plugindir): - plugin = os.path.splitext(plugin)[0] - plugins.add(plugin) - for plugin in plugins: - tmpmod = __import__(plugin) - if 'plugin_names' in tmpmod.__dict__: - for name in tmpmod.plugin_names: - pluginmap[name] = tmpmod - - def run(): - _load_plugins() + pluginapi.load_plugins() webservice = httpapi.HttpApi() webservice.start() while (1):