diff --git a/confluent/config.py b/confluent/config.py index 1f496ba4..41fcdc2e 100644 --- a/confluent/config.py +++ b/confluent/config.py @@ -45,9 +45,12 @@ import operator import os import re import string +import threading + _cfgstore = None + def get_global(varname): if (_cfgstore is None or 'globals' not in _cfgstore or varname not in _cfgstore['globals']): @@ -61,7 +64,7 @@ def set_global(varname, value): _cfgstore['globals'] = { varname: value } else: _cfgstore['globals'][varname] = value - ConfigData._bg_sync_to_file() + ConfigManager._bg_sync_to_file() class _ExpressionFormat(string.Formatter): @@ -152,20 +155,46 @@ def _decode_attribute(attribute, nodeobj, formatter=None, decrypt=False): return retdict return nodeobj[attribute] + +def get_tenant_id(tenantname): + for tenant in _cfgstore['tenant'].iterkeys(): + if ('name' in _cfgstore['tenant'][tenant] and + _cfgstore['tenant'][tenant]['name'] == tenantname): + return tenant + + +def get_tenant_names(): + global _cfgstore + if _cfgstore is None or 'tenant' not in _cfgstore: + return + for tenant in _cfgstore['tenant'].iterkeys(): + if 'name' in _cfgstore['tenant'][tenant]: + yield _cfgstore['tenant'][tenant]['name'] + +def get_user(name, tenant): + global _cfgstore + for user in _cfgstore['tenant'][tenant]['users'].iterkeys(): + if _cfgstore['tenant'][tenant]['users'][user]['name'] == user: + return _cfgstore['tenant'][tenant]['users'][user] + # my thinking at this point is that noderange and configdata objects # will be constructed and passed as part of a context object to plugins # reasoning being that the main program will handle establishing the # tenant context and then modules need not consider the current tenant # most of the time as things are automatic -class ConfigData(object): +class ConfigManager(object): _cfgfilename = "/etc/confluent/cfgdb" _cfgwriter = None _writepending = False - def __init__(self, tenant=0, decrypt=False): + + def __init__(self, tenant, decrypt=False): global _cfgstore if _cfgstore is None: - self._read_from_file() + try: + self._read_from_file() + except IOError: + _cfgstore = {} self.decrypt = decrypt if 'tenant' not in _cfgstore: _cfgstore['tenant'] = {} @@ -280,30 +309,18 @@ class ConfigData(object): if cls._writepending: # already have a write scheduled return - elif cls._cfgwriter is not None: + elif cls._cfgwriter is not None and cls._cfgwriter.isAlive(): #write in progress, request write when done cls._writepending = True else: - cls._cfgwriter = eventlet.spawn(cls._sync_to_file) - cls._cfgwriter.link(cls._write_complete) - - @classmethod - def _write_complete(cls, gt): - if cls._writepending: - cls._writepending = False - cls._cfgwriter = eventlet.spawn(cls._sync_to_file) - cls._cfgwriter.link(cls._write_complete) - else: - cls._cfgwriter = None + cls._cfgwriter = threading.Thread(target=cls._sync_to_file) + cls._cfgwriter.start() @classmethod def _sync_to_file(cls): # TODO: this is a pretty nasty performance penalty to pay # as much as it is mitigated and deferred, still need to do better # possibilities include: - # threading this out (take it out of greenthread), GIL in theory should - # not penalize this sort of threading, able to hide a great deal of - # sins there... # doing dbm for the major trees, marking the objects that need update # and only doing changes for those # the in memory facet seems serviceable though @@ -313,8 +330,14 @@ class ConfigData(object): cPickle.dump(_cfgstore, nhandle, protocol=2) fcntl.lockf(nhandle, fcntl.LOCK_UN) nhandle.close() - os.rename(cls._cfgfilename, cls._cfgfilename + '.old') + try: + os.rename(cls._cfgfilename, cls._cfgfilename + '.old') + except OSError: + pass os.rename(nfn, cls._cfgfilename) + if cls._writepending: + cls._writepending = False + return cls._sync_to_file() def _recalculate_expressions(self, cfgobj, formatter): for key in cfgobj.keys(): diff --git a/confluent/httpapi.py b/confluent/httpapi.py index 7bc2dfd6..f4b738f1 100644 --- a/confluent/httpapi.py +++ b/confluent/httpapi.py @@ -5,6 +5,7 @@ # shillinabox javascript import base64 import confluent.console as console +import confluent.auth as auth import confluent.util as util import eventlet import os @@ -25,8 +26,7 @@ def _get_query_dict(qstring, reqbody, reqtype): qdict[qkey] = qvalue if reqbody is not None: if reqtype == "application/x-www-form-urlencoded": - - + print reqbody return qdict @@ -34,6 +34,17 @@ def _authorize_request(env): """Grant/Deny access based on data from wsgi env """ + if 'REMOTE_USER' in env: # HTTP Basic auth passed + user = env['REMOTE_USER'] + #TODO: actually pass in the element + authdata = auth.authorize(user, element=None) + if authdata is None: + return {'code': 401} + else: + return {'code': 200, + 'tenant': authdata[0], + 'user': authdata[1]} + # TODO(jbjohnso): actually evaluate the request for authorization # In theory, the x509 or http auth stuff will get translated and then # passed on to the core authorization function in an appropriate form @@ -41,7 +52,6 @@ def _authorize_request(env): # 401 if there is no known identity # 403 if valid identity, but no access # going to run 200 just to get going for now - return 200 def _pick_mimetype(env): @@ -77,13 +87,23 @@ def resourcehandler(env, start_response): mimetype = _pick_mimetype(env) reqbody = None reqtype = None - if 'CONTENT_LENGTH' in env and env['CONTENT_LENGTH']: + if 'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0: reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH'])) reqtype = env['CONTENT_TYPE'] print env - if authorized in (401, 403): - start_response(authorized, []) - return + if authorized['code'] == 401: + start_response('401 Authentication Required', + [('Content-type', 'text/plain'), + ('WWW-Authenticate', 'Basic realm="confluent"')]) + return 'authentication required' + if authorized['code'] == 403: + start_response('403 Forbidden', + [('Content-type', 'text/plain'), + ('WWW-Authenticate', 'Basic realm="confluent"')]) + return 'authorization failed' + if authorized['code'] != 200: + raise Exception("Unrecognized code from auth engine") + cfgmgr = config.ConfigManager(authorized['tenant']) querydict = _get_query_dict(env['QUERY_STRING'], reqbody, reqtype) if '/console/session' in env['PATH_INFO']: #hard bake JSON into this path, do not support other incarnations @@ -91,12 +111,14 @@ def resourcehandler(env, start_response): _, _, nodename = prefix.rpartition('/') if 'session' not in querydict.keys() or not querydict['session']: # Request for new session - consession = console.ConsoleSession(node=nodename) + consession = console.ConsoleSession(node=nodename, + configmanager=cfgmgr) if not consession: start_response("500 Internal Server Error", []) return sessid = _assign_consessionid(consession) - start_response('200 OK', [('Content-Type', 'application/json; charset=utf-8')]) + start_response('200 OK', [('Content-Type', + 'application/json; charset=utf-8')]) return ['{"session":"%s","data":""}' % sessid] start_response('404 Not Found', []) return ["Unrecognized directive (404)"] diff --git a/confluent/main.py b/confluent/main.py index 2e699368..b62a7978 100644 --- a/confluent/main.py +++ b/confluent/main.py @@ -11,6 +11,7 @@ # Things like heartbeating and discovery # It also will optionally snoop SLP DA requests +import confluent.pluginapi as pluginapi import confluent.httpapi as httpapi import eventlet from eventlet.green import socket @@ -19,26 +20,8 @@ import multiprocessing import sys import os -pluginmap = {} -def _load_plugins(): - # To know our plugins directory, we get the parent path of 'bin' - path=os.path.dirname(os.path.realpath(__file__)) - plugindir = os.path.realpath(os.path.join(path,'..','plugins')) - sys.path.append(plugindir) - plugins = set() - #two passes, to avoid adding both py and pyc files - for plugin in os.listdir(plugindir): - plugin = os.path.splitext(plugin)[0] - plugins.add(plugin) - for plugin in plugins: - tmpmod = __import__(plugin) - if 'plugin_names' in tmpmod.__dict__: - for name in tmpmod.plugin_names: - pluginmap[name] = tmpmod - - def run(): - _load_plugins() + pluginapi.load_plugins() webservice = httpapi.HttpApi() webservice.start() while (1):