Mirror of https://github.com/xcat2/confluent.git (synced 2025-02-16 10:39:23 +00:00)

Change config backend writeback to threading from eventlet. Threading should be used sparingly, but in this case eventlet is pretty terrible at doing what

needs to be done... this may bode ill for the rest of using eventlet...

Rename ConfigData to ConfigManager

Add some hooks for tenant and user management in config

Begin actually consulting user/tenant info in http api
This commit is contained in:
Jarrod Johnson 2013-09-04 15:40:35 -04:00
parent 42b23fb8f8
commit db0596d17b
3 changed files with 76 additions and 48 deletions

View File

@ -45,9 +45,12 @@ import operator
import os
import re
import string
import threading
_cfgstore = None
def get_global(varname):
if (_cfgstore is None or 'globals' not in _cfgstore or
varname not in _cfgstore['globals']):
@ -61,7 +64,7 @@ def set_global(varname, value):
_cfgstore['globals'] = { varname: value }
else:
_cfgstore['globals'][varname] = value
ConfigData._bg_sync_to_file()
ConfigManager._bg_sync_to_file()
class _ExpressionFormat(string.Formatter):
@ -152,20 +155,46 @@ def _decode_attribute(attribute, nodeobj, formatter=None, decrypt=False):
return retdict
return nodeobj[attribute]
def get_tenant_id(tenantname):
    """Return the id (store key) of the tenant named *tenantname*.

    Returns None when no tenant has that name, and — mirroring the guard
    in get_tenant_names() — when the config store is uninitialized, rather
    than raising.  Plain dict iteration replaces the Python-2-only
    iterkeys() call; the two are equivalent.
    """
    global _cfgstore
    if _cfgstore is None or 'tenant' not in _cfgstore:
        return None
    for tenant in _cfgstore['tenant']:
        # Tenants are keyed by id; the human-readable name is an attribute.
        if _cfgstore['tenant'][tenant].get('name') == tenantname:
            return tenant
    return None
def get_tenant_names():
    """Yield the human-readable name of every tenant that has one.

    Yields nothing when the config store has not been initialized yet.
    Plain dict iteration replaces the Python-2-only iterkeys() call;
    the two are equivalent.
    """
    global _cfgstore
    if _cfgstore is None or 'tenant' not in _cfgstore:
        return
    for tenant in _cfgstore['tenant']:
        tenantinfo = _cfgstore['tenant'][tenant]
        # Tenants without a 'name' attribute are silently skipped.
        if 'name' in tenantinfo:
            yield tenantinfo['name']
def get_user(name, tenant):
    """Return the user record named *name* under tenant id *tenant*.

    Returns None when no user in the tenant has that name.  A missing
    tenant or 'users' subtree still raises KeyError, as before.

    BUG FIX: the original compared each stored user's 'name' against the
    dict key being iterated (``== user``) instead of the *name* argument,
    so a lookup only matched when a user's key happened to equal its name.
    Also replaces the Python-2-only iterkeys() with plain dict iteration.
    """
    global _cfgstore
    users = _cfgstore['tenant'][tenant]['users']
    for userid in users:
        if users[userid]['name'] == name:
            return users[userid]
    return None
# my thinking at this point is that noderange and configdata objects
# will be constructed and passed as part of a context object to plugins
# reasoning being that the main program will handle establishing the
# tenant context and then modules need not consider the current tenant
# most of the time as things are automatic
class ConfigData(object):
class ConfigManager(object):
_cfgfilename = "/etc/confluent/cfgdb"
_cfgwriter = None
_writepending = False
def __init__(self, tenant=0, decrypt=False):
def __init__(self, tenant, decrypt=False):
global _cfgstore
if _cfgstore is None:
self._read_from_file()
try:
self._read_from_file()
except IOError:
_cfgstore = {}
self.decrypt = decrypt
if 'tenant' not in _cfgstore:
_cfgstore['tenant'] = {}
@ -280,30 +309,18 @@ class ConfigData(object):
if cls._writepending:
# already have a write scheduled
return
elif cls._cfgwriter is not None:
elif cls._cfgwriter is not None and cls._cfgwriter.isAlive():
#write in progress, request write when done
cls._writepending = True
else:
cls._cfgwriter = eventlet.spawn(cls._sync_to_file)
cls._cfgwriter.link(cls._write_complete)
@classmethod
def _write_complete(cls, gt):
if cls._writepending:
cls._writepending = False
cls._cfgwriter = eventlet.spawn(cls._sync_to_file)
cls._cfgwriter.link(cls._write_complete)
else:
cls._cfgwriter = None
cls._cfgwriter = threading.Thread(target=cls._sync_to_file)
cls._cfgwriter.start()
@classmethod
def _sync_to_file(cls):
# TODO: this is a pretty nasty performance penalty to pay
# as much as it is mitigated and deferred, still need to do better
# possibilities include:
# threading this out (take it out of greenthread), GIL in theory should
# not penalize this sort of threading, able to hide a great deal of
# sins there...
# doing dbm for the major trees, marking the objects that need update
# and only doing changes for those
# the in memory facet seems serviceable though
@ -313,8 +330,14 @@ class ConfigData(object):
cPickle.dump(_cfgstore, nhandle, protocol=2)
fcntl.lockf(nhandle, fcntl.LOCK_UN)
nhandle.close()
os.rename(cls._cfgfilename, cls._cfgfilename + '.old')
try:
os.rename(cls._cfgfilename, cls._cfgfilename + '.old')
except OSError:
pass
os.rename(nfn, cls._cfgfilename)
if cls._writepending:
cls._writepending = False
return cls._sync_to_file()
def _recalculate_expressions(self, cfgobj, formatter):
for key in cfgobj.keys():

View File

@ -5,6 +5,7 @@
# shillinabox javascript
import base64
import confluent.console as console
import confluent.auth as auth
import confluent.util as util
import eventlet
import os
@ -25,8 +26,7 @@ def _get_query_dict(qstring, reqbody, reqtype):
qdict[qkey] = qvalue
if reqbody is not None:
if reqtype == "application/x-www-form-urlencoded":
print reqbody
return qdict
@ -34,6 +34,17 @@ def _authorize_request(env):
"""Grant/Deny access based on data from wsgi env
"""
if 'REMOTE_USER' in env: # HTTP Basic auth passed
user = env['REMOTE_USER']
#TODO: actually pass in the element
authdata = auth.authorize(user, element=None)
if authdata is None:
return {'code': 401}
else:
return {'code': 200,
'tenant': authdata[0],
'user': authdata[1]}
# TODO(jbjohnso): actually evaluate the request for authorization
# In theory, the x509 or http auth stuff will get translated and then
# passed on to the core authorization function in an appropriate form
@ -41,7 +52,6 @@ def _authorize_request(env):
# 401 if there is no known identity
# 403 if valid identity, but no access
# going to run 200 just to get going for now
return 200
def _pick_mimetype(env):
@ -77,13 +87,23 @@ def resourcehandler(env, start_response):
mimetype = _pick_mimetype(env)
reqbody = None
reqtype = None
if 'CONTENT_LENGTH' in env and env['CONTENT_LENGTH']:
if 'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0:
reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
reqtype = env['CONTENT_TYPE']
print env
if authorized in (401, 403):
start_response(authorized, [])
return
if authorized['code'] == 401:
start_response('401 Authentication Required',
[('Content-type', 'text/plain'),
('WWW-Authenticate', 'Basic realm="confluent"')])
return 'authentication required'
if authorized['code'] == 403:
start_response('403 Forbidden',
[('Content-type', 'text/plain'),
('WWW-Authenticate', 'Basic realm="confluent"')])
return 'authorization failed'
if authorized['code'] != 200:
raise Exception("Unrecognized code from auth engine")
cfgmgr = config.ConfigManager(authorized['tenant'])
querydict = _get_query_dict(env['QUERY_STRING'], reqbody, reqtype)
if '/console/session' in env['PATH_INFO']:
#hard bake JSON into this path, do not support other incarnations
@ -91,12 +111,14 @@ def resourcehandler(env, start_response):
_, _, nodename = prefix.rpartition('/')
if 'session' not in querydict.keys() or not querydict['session']:
# Request for new session
consession = console.ConsoleSession(node=nodename)
consession = console.ConsoleSession(node=nodename,
configmanager=cfgmgr)
if not consession:
start_response("500 Internal Server Error", [])
return
sessid = _assign_consessionid(consession)
start_response('200 OK', [('Content-Type', 'application/json; charset=utf-8')])
start_response('200 OK', [('Content-Type',
'application/json; charset=utf-8')])
return ['{"session":"%s","data":""}' % sessid]
start_response('404 Not Found', [])
return ["Unrecognized directive (404)"]

View File

@ -11,6 +11,7 @@
# Things like heartbeating and discovery
# It also will optionally snoop SLP DA requests
import confluent.pluginapi as pluginapi
import confluent.httpapi as httpapi
import eventlet
from eventlet.green import socket
@ -19,26 +20,8 @@ import multiprocessing
import sys
import os
pluginmap = {}
def _load_plugins():
    """Discover plugin modules and register their advertised names.

    Plugins live in '../plugins' relative to the directory holding this
    file; that directory is appended to sys.path so each plugin can be
    imported by bare module name.  Any imported module that declares a
    ``plugin_names`` attribute is registered in the module-level
    ``pluginmap`` under each of those names.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    plugindir = os.path.realpath(os.path.join(here, '..', 'plugins'))
    sys.path.append(plugindir)
    # Collapse foo.py / foo.pyc to a single entry so nothing imports twice.
    modnames = set(
        os.path.splitext(entry)[0] for entry in os.listdir(plugindir))
    for modname in modnames:
        mod = __import__(modname)
        # Only modules that advertise plugin_names participate in dispatch.
        if 'plugin_names' in mod.__dict__:
            for name in mod.plugin_names:
                pluginmap[name] = mod
def run():
_load_plugins()
pluginapi.load_plugins()
webservice = httpapi.HttpApi()
webservice.start()
while (1):