mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-16 20:57:53 +00:00
d5be1ccf8c
If a session is closed, also kill off any associated relays in progress. One exception, video port relay in ESTABLISHED is left alone due to limitation, but at least no new open.
803 lines
32 KiB
Python
803 lines
32 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2014 IBM Corporation
|
|
# Copyright 2015-2016 Lenovo
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# This SCGI server provides a http wrap to confluent api
|
|
# It additionally manages httprequest console sessions
|
|
import base64
|
|
import Cookie
|
|
import confluent.auth as auth
|
|
import confluent.config.attributes as attribs
|
|
import confluent.consoleserver as consoleserver
|
|
import confluent.forwarder as forwarder
|
|
import confluent.exceptions as exc
|
|
import confluent.log as log
|
|
import confluent.messages
|
|
import confluent.core as pluginapi
|
|
import confluent.asynchttp
|
|
import confluent.shellserver as shellserver
|
|
import confluent.tlvdata
|
|
import confluent.util as util
|
|
import copy
|
|
import eventlet
|
|
import eventlet.greenthread
|
|
import greenlet
|
|
import json
|
|
import socket
|
|
import sys
|
|
import traceback
|
|
import time
|
|
import urlparse
|
|
import eventlet.wsgi
|
|
#scgi = eventlet.import_patched('flup.server.scgi')
|
|
tlvdata = confluent.tlvdata
|
|
|
|
|
|
auditlog = None
|
|
tracelog = None
|
|
consolesessions = {}
|
|
confluent.asynchttp.set_console_sessions(consolesessions)
|
|
httpsessions = {}
|
|
opmap = {
|
|
'POST': 'create',
|
|
'GET': 'retrieve',
|
|
'PUT': 'update',
|
|
'DELETE': 'delete',
|
|
}
|
|
|
|
|
|
class RobustCookie(Cookie.SimpleCookie):
|
|
# this is very bad form, but BaseCookie has a terrible flaw
|
|
def _BaseCookie__set(self, K, rval, cval):
|
|
try:
|
|
super(RobustCookie, self)._BaseCookie__set(K, rval, cval)
|
|
except Cookie.CookieError:
|
|
# empty value if SimpleCookie rejects
|
|
dict.__setitem__(self, K, Cookie.Morsel())
|
|
|
|
|
|
def group_creation_resources():
|
|
yield confluent.messages.Attributes(
|
|
kv={'name': None}, desc="Name of the group").html() + '<br>'
|
|
yield confluent.messages.ListAttributes(kv={'nodes': []},
|
|
desc='Nodes to add to the group'
|
|
).html() + '<br>\n'
|
|
for attr in sorted(attribs.node.iterkeys()):
|
|
if attr == 'groups':
|
|
continue
|
|
if attr.startswith("secret."):
|
|
yield confluent.messages.CryptedAttributes(
|
|
kv={attr: None},
|
|
desc=attribs.node[attr]['description']).html() + '<br>\n'
|
|
elif ('type' in attribs.node[attr] and
|
|
list == attribs.node[attr]['type']):
|
|
yield confluent.messages.ListAttributes(
|
|
kv={attr: []},
|
|
desc=attribs.node[attr]['description']).html() + '<br>\n'
|
|
else:
|
|
yield confluent.messages.Attributes(
|
|
kv={attr: None},
|
|
desc=attribs.node[attr]['description']).html() + '<br>\n'
|
|
|
|
|
|
def node_creation_resources():
|
|
yield confluent.messages.Attributes(
|
|
kv={'name': None}, desc="Name of the node").html() + '<br>'
|
|
for attr in sorted(attribs.node.iterkeys()):
|
|
if attr.startswith("secret."):
|
|
yield confluent.messages.CryptedAttributes(
|
|
kv={attr: None},
|
|
desc=attribs.node[attr]['description']).html() + '<br>\n'
|
|
elif ('type' in attribs.node[attr] and
|
|
list == attribs.node[attr]['type']):
|
|
yield confluent.messages.ListAttributes(
|
|
kv={attr: []},
|
|
desc=attribs.node[attr]['description']).html() + '<br>\n'
|
|
else:
|
|
yield confluent.messages.Attributes(
|
|
kv={attr: None},
|
|
desc=attribs.node[attr]['description']).html() + '<br>\n'
|
|
|
|
|
|
def user_creation_resources():
|
|
credential = {
|
|
'uid': {
|
|
'description': (''),
|
|
},
|
|
'username': {
|
|
'description': (''),
|
|
},
|
|
'password': {
|
|
'description': (''),
|
|
},
|
|
'privilege_level': {
|
|
'description': (''),
|
|
},
|
|
}
|
|
for attr in sorted(credential.iterkeys()):
|
|
if attr == "password":
|
|
yield confluent.messages.CryptedAttributes(
|
|
kv={attr: None},
|
|
desc=credential[attr]['description']).html() + '<br>\n'
|
|
else:
|
|
yield confluent.messages.Attributes(
|
|
kv={attr: None},
|
|
desc=credential[attr]['description']).html() + '<br>\n'
|
|
|
|
|
|
create_resource_functions = {
|
|
'nodes': node_creation_resources,
|
|
'groups': group_creation_resources,
|
|
'users': user_creation_resources,
|
|
}
|
|
|
|
|
|
def _sessioncleaner():
|
|
while True:
|
|
currtime = time.time()
|
|
targsessions = []
|
|
for session in httpsessions:
|
|
if httpsessions[session]['expiry'] < currtime:
|
|
targsessions.append(session)
|
|
for session in targsessions:
|
|
forwarder.close_session(session)
|
|
del httpsessions[session]
|
|
targsessions = []
|
|
for session in consolesessions:
|
|
if consolesessions[session]['expiry'] < currtime:
|
|
targsessions.append(session)
|
|
for session in targsessions:
|
|
del consolesessions[session]
|
|
eventlet.sleep(10)
|
|
|
|
|
|
def _get_query_dict(env, reqbody, reqtype):
|
|
qdict = {}
|
|
try:
|
|
qstring = env['QUERY_STRING']
|
|
except KeyError:
|
|
qstring = None
|
|
if qstring:
|
|
for qpair in qstring.split('&'):
|
|
qkey, qvalue = qpair.split('=')
|
|
qdict[qkey] = qvalue
|
|
if reqbody is not None:
|
|
if "application/x-www-form-urlencoded" in reqtype:
|
|
pbody = urlparse.parse_qs(reqbody, True)
|
|
for ky in pbody.iterkeys():
|
|
if len(pbody[ky]) > 1: # e.g. REST explorer
|
|
na = [i for i in pbody[ky] if i != '']
|
|
qdict[ky] = na
|
|
else:
|
|
qdict[ky] = pbody[ky][0]
|
|
elif 'application/json' in reqtype:
|
|
pbody = json.loads(reqbody)
|
|
for key in pbody.iterkeys():
|
|
qdict[key] = pbody[key]
|
|
if 'restexplorerhonorkey' in qdict:
|
|
nqdict = {}
|
|
for key in qdict:
|
|
if key == 'restexplorerop':
|
|
nqdict[key] = qdict['restexplorerop']
|
|
continue
|
|
if key in qdict['restexplorerhonorkey']:
|
|
nqdict[key] = qdict[key]
|
|
qdict = nqdict
|
|
return qdict
|
|
|
|
def _should_skip_authlog(env):
|
|
if ('/console/session' in env['PATH_INFO'] or
|
|
'/shell/sessions/' in env['PATH_INFO']):
|
|
# we should only log starting of a console
|
|
return True
|
|
if '/sessions/current/async' in env['PATH_INFO']:
|
|
# this is effectively invisible
|
|
return True
|
|
if (env['REQUEST_METHOD'] == 'GET' and
|
|
('/sensors/' in env['PATH_INFO'] or
|
|
'/health/' in env['PATH_INFO'] or
|
|
'/power/state' in env['PATH_INFO'] or
|
|
'/nodes/' == env['PATH_INFO'] or
|
|
'/sessions/current/info' == env['PATH_INFO'] or
|
|
(env['PATH_INFO'].startswith('/noderange/') and
|
|
env['PATH_INFO'].endswith('/nodes/')))):
|
|
# these are pretty innocuous, and noisy to log.
|
|
return True
|
|
return False
|
|
|
|
|
|
def _csrf_exempt(path):
|
|
# first a get of info to get CSRF key, also '/forward/web' to enable
|
|
# the popup ability to just forward
|
|
return path == '/sessions/current/info' or path.endswith('/forward/web')
|
|
|
|
|
|
def _csrf_valid(env, session):
|
|
# This could be simplified into a statement, but this is more readable
|
|
# to have it broken out
|
|
if env['REQUEST_METHOD'] == 'GET' and _csrf_exempt(env['PATH_INFO']):
|
|
# Provide a web client a safe hook to request the CSRF token
|
|
# This means that we consider GET of /sessions/current/info to be
|
|
# a safe thing to inflict via CSRF, since CORS should prevent
|
|
# hypothetical attacker from reading the data and it has no
|
|
# side effects to speak of
|
|
return True
|
|
if 'csrftoken' not in session:
|
|
# The client has not (yet) requested CSRF protection
|
|
# so we return true
|
|
if 'HTTP_CONFLUENTAUTHTOKEN' in env:
|
|
# The client has requested CSRF countermeasures,
|
|
# oblige the request and apply a new token to the
|
|
# session
|
|
session['csrftoken'] = util.randomstring(32)
|
|
elif 'HTTP_REFERER' in env:
|
|
# If there is a referrer, make sure it stays consistent
|
|
# across the session. A change in referer is a bad thing
|
|
try:
|
|
referer = env['HTTP_REFERER'].split('/')[2]
|
|
except IndexError:
|
|
return False
|
|
if 'validreferer' not in session:
|
|
session['validreferer'] = referer
|
|
elif session['validreferer'] != referer:
|
|
return False
|
|
return True
|
|
# The session has CSRF protection enabled, only mark valid if
|
|
# the client has provided an auth token and that token matches the
|
|
# value protecting the session
|
|
return ('HTTP_CONFLUENTAUTHTOKEN' in env and
|
|
env['HTTP_CONFLUENTAUTHTOKEN'] == session['csrftoken'])
|
|
|
|
|
|
def _authorize_request(env, operation):
|
|
"""Grant/Deny access based on data from wsgi env
|
|
|
|
"""
|
|
authdata = None
|
|
name = ''
|
|
sessionid = None
|
|
cookie = Cookie.SimpleCookie()
|
|
if 'HTTP_COOKIE' in env:
|
|
#attempt to use the cookie. If it matches
|
|
cc = RobustCookie()
|
|
cc.load(env['HTTP_COOKIE'])
|
|
if 'confluentsessionid' in cc:
|
|
sessionid = cc['confluentsessionid'].value
|
|
sessid = sessionid
|
|
if sessionid in httpsessions:
|
|
if _csrf_valid(env, httpsessions[sessionid]):
|
|
if env['PATH_INFO'] == '/sessions/current/logout':
|
|
targets = []
|
|
for mythread in httpsessions[sessionid]['inflight']:
|
|
targets.append(mythread)
|
|
for mythread in targets:
|
|
eventlet.greenthread.kill(mythread)
|
|
forwarder.close_session(sessionid)
|
|
del httpsessions[sessionid]
|
|
return ('logout',)
|
|
httpsessions[sessionid]['expiry'] = time.time() + 90
|
|
name = httpsessions[sessionid]['name']
|
|
authdata = auth.authorize(
|
|
name, element=None,
|
|
skipuserobj=httpsessions[sessionid]['skipuserobject'])
|
|
if (not authdata) and 'HTTP_AUTHORIZATION' in env:
|
|
if env['PATH_INFO'] == '/sessions/current/logout':
|
|
if 'HTTP_REFERER' in env:
|
|
# note that this doesn't actually do harm
|
|
# otherwise, but this way do not give appearance
|
|
# of something having a side effect if it has the smell
|
|
# of a CSRF
|
|
return {'code': 401}
|
|
return ('logout',)
|
|
name, passphrase = base64.b64decode(
|
|
env['HTTP_AUTHORIZATION'].replace('Basic ', '')).split(':', 1)
|
|
authdata = auth.check_user_passphrase(name, passphrase, element=None)
|
|
if not authdata:
|
|
return {'code': 401}
|
|
sessid = util.randomstring(32)
|
|
while sessid in httpsessions:
|
|
sessid = util.randomstring(32)
|
|
httpsessions[sessid] = {'name': name, 'expiry': time.time() + 90,
|
|
'skipuserobject': authdata[4],
|
|
'inflight': set([])}
|
|
if 'HTTP_CONFLUENTAUTHTOKEN' in env:
|
|
httpsessions[sessid]['csrftoken'] = util.randomstring(32)
|
|
cookie['confluentsessionid'] = sessid
|
|
cookie['confluentsessionid']['secure'] = 1
|
|
cookie['confluentsessionid']['httponly'] = 1
|
|
cookie['confluentsessionid']['path'] = '/'
|
|
skiplog = _should_skip_authlog(env)
|
|
if authdata:
|
|
auditmsg = {
|
|
'user': name,
|
|
'operation': operation,
|
|
'target': env['PATH_INFO'],
|
|
}
|
|
authinfo = {'code': 200,
|
|
'cookie': cookie,
|
|
'cfgmgr': authdata[1],
|
|
'username': authdata[2],
|
|
'userdata': authdata[0]}
|
|
if authdata[3] is not None:
|
|
auditmsg['tenant'] = authdata[3]
|
|
authinfo['tenant'] = authdata[3]
|
|
auditmsg['user'] = authdata[2]
|
|
if sessid is not None:
|
|
authinfo['sessionid'] = sessid
|
|
if not skiplog:
|
|
auditlog.log(auditmsg)
|
|
if 'csrftoken' in httpsessions[sessid]:
|
|
authinfo['authtoken'] = httpsessions[sessid]['csrftoken']
|
|
return authinfo
|
|
else:
|
|
return {'code': 401}
|
|
# TODO(jbjohnso): actually evaluate the request for authorization
|
|
# In theory, the x509 or http auth stuff will get translated and then
|
|
# passed on to the core authorization function in an appropriate form
|
|
# expresses return in the form of http code
|
|
# 401 if there is no known identity
|
|
# 403 if valid identity, but no access
|
|
# going to run 200 just to get going for now
|
|
|
|
|
|
def _pick_mimetype(env):
|
|
"""Detect the http indicated mime to send back.
|
|
|
|
Note that as it gets into the ACCEPT header honoring, it only looks for
|
|
application/json and else gives up and assumes html. This is because
|
|
browsers are very chaotic about ACCEPT HEADER. It is assumed that
|
|
XMLHttpRequest.setRequestHeader will be used by clever javascript
|
|
if the '.json' scheme doesn't cut it.
|
|
"""
|
|
if env['PATH_INFO'].endswith('.json'):
|
|
return 'application/json; charset=utf-8', '.json'
|
|
elif env['PATH_INFO'].endswith('.html'):
|
|
return 'text/html', '.html'
|
|
elif 'HTTP_ACCEPT' in env and 'application/json' in env['HTTP_ACCEPT']:
|
|
return 'application/json; charset=utf-8', ''
|
|
else:
|
|
return 'text/html', ''
|
|
|
|
|
|
def _assign_consessionid(consolesession):
|
|
sessid = util.randomstring(32)
|
|
while sessid in consolesessions:
|
|
sessid = util.randomstring(32)
|
|
consolesessions[sessid] = {'session': consolesession,
|
|
'expiry': time.time() + 60}
|
|
return sessid
|
|
|
|
|
|
def resourcehandler(env, start_response):
|
|
try:
|
|
for rsp in resourcehandler_backend(env, start_response):
|
|
yield rsp
|
|
except:
|
|
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
|
|
event=log.Events.stacktrace)
|
|
start_response('500 - Internal Server Error', [])
|
|
yield '500 - Internal Server Error'
|
|
return
|
|
|
|
|
|
def resourcehandler_backend(env, start_response):
|
|
"""Function to handle new wsgi requests
|
|
"""
|
|
mimetype, extension = _pick_mimetype(env)
|
|
headers = [('Content-Type', mimetype), ('Cache-Control', 'no-store'),
|
|
('Pragma', 'no-cache'),
|
|
('X-Content-Type-Options', 'nosniff'),
|
|
('Content-Security-Policy', "default-src 'self'"),
|
|
('X-XSS-Protection', '1'), ('X-Frame-Options', 'deny'),
|
|
('Strict-Transport-Security', 'max-age=86400'),
|
|
('X-Permitted-Cross-Domain-Policies', 'none')]
|
|
reqbody = None
|
|
reqtype = None
|
|
if 'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0:
|
|
reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
|
|
reqtype = env['CONTENT_TYPE']
|
|
operation = opmap[env['REQUEST_METHOD']]
|
|
querydict = _get_query_dict(env, reqbody, reqtype)
|
|
if 'restexplorerop' in querydict:
|
|
operation = querydict['restexplorerop']
|
|
del querydict['restexplorerop']
|
|
authorized = _authorize_request(env, operation)
|
|
if 'logout' in authorized:
|
|
start_response('200 Successful logout', headers)
|
|
yield('{"result": "200 - Successful logout"}')
|
|
return
|
|
if 'HTTP_SUPPRESSAUTHHEADER' in env or 'HTTP_CONFLUENTAUTHTOKEN' in env:
|
|
badauth = [('Content-type', 'text/plain')]
|
|
else:
|
|
badauth = [('Content-type', 'text/plain'),
|
|
('WWW-Authenticate', 'Basic realm="confluent"')]
|
|
if authorized['code'] == 401:
|
|
start_response('401 Authentication Required', badauth)
|
|
yield 'authentication required'
|
|
return
|
|
if authorized['code'] == 403:
|
|
start_response('403 Forbidden', badauth)
|
|
yield 'authorization failed'
|
|
return
|
|
if authorized['code'] != 200:
|
|
raise Exception("Unrecognized code from auth engine")
|
|
headers.extend(
|
|
("Set-Cookie", m.OutputString())
|
|
for m in authorized['cookie'].values())
|
|
cfgmgr = authorized['cfgmgr']
|
|
if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async':
|
|
pagecontent = ""
|
|
try:
|
|
for rsp in _assemble_json(
|
|
confluent.asynchttp.handle_async(
|
|
env, querydict,
|
|
httpsessions[authorized['sessionid']]['inflight'])):
|
|
pagecontent += rsp
|
|
start_response("200 OK", headers)
|
|
yield pagecontent
|
|
return
|
|
except exc.ConfluentException as e:
|
|
if e.apierrorcode == 500:
|
|
# raise generics to trigger the tracelog
|
|
raise
|
|
start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr),
|
|
headers)
|
|
yield e.get_error_body()
|
|
elif (env['PATH_INFO'].endswith('/forward/web') and
|
|
env['PATH_INFO'].startswith('/nodes/')):
|
|
prefix, _, _ = env['PATH_INFO'].partition('/forward/web')
|
|
_, _, nodename = prefix.rpartition('/')
|
|
hm = cfgmgr.get_node_attributes(nodename, 'hardwaremanagement.manager')
|
|
targip = hm.get(nodename, {}).get(
|
|
'hardwaremanagement.manager', {}).get('value', None)
|
|
if not targip:
|
|
start_response('404 Not Found', headers)
|
|
yield 'No hardwaremanagemnet.manager defined for node'
|
|
return
|
|
funport = forwarder.get_port(targip, env['HTTP_X_FORWARDED_FOR'],
|
|
authorized['sessionid'])
|
|
host = env['HTTP_X_FORWARDED_HOST']
|
|
url = 'https://{0}:{1}/'.format(host, funport)
|
|
start_response('302', [('Location', url)])
|
|
yield 'Our princess is in another castle!'
|
|
return
|
|
elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
|
|
'/shell/sessions/' in env['PATH_INFO'])):
|
|
#hard bake JSON into this path, do not support other incarnations
|
|
if '/console/session' in env['PATH_INFO']:
|
|
prefix, _, _ = env['PATH_INFO'].partition('/console/session')
|
|
shellsession = False
|
|
elif '/shell/sessions/' in env['PATH_INFO']:
|
|
prefix, _, _ = env['PATH_INFO'].partition('/shell/sessions')
|
|
shellsession = True
|
|
_, _, nodename = prefix.rpartition('/')
|
|
if 'session' not in querydict.keys() or not querydict['session']:
|
|
auditmsg = {
|
|
'operation': 'start',
|
|
'target': env['PATH_INFO'],
|
|
'user': authorized['username'],
|
|
}
|
|
if 'tenant' in authorized:
|
|
auditmsg['tenant'] = authorized['tenant']
|
|
auditlog.log(auditmsg)
|
|
# Request for new session
|
|
skipreplay = False
|
|
if 'skipreplay' in querydict and querydict['skipreplay']:
|
|
skipreplay = True
|
|
datacallback = None
|
|
async = None
|
|
if 'HTTP_CONFLUENTASYNCID' in env:
|
|
async = confluent.asynchttp.get_async(env, querydict)
|
|
termrel = async.set_term_relation(env)
|
|
datacallback = termrel.got_data
|
|
try:
|
|
if shellsession:
|
|
consession = shellserver.ShellSession(
|
|
node=nodename, configmanager=cfgmgr,
|
|
username=authorized['username'], skipreplay=skipreplay,
|
|
datacallback=datacallback
|
|
)
|
|
else:
|
|
consession = consoleserver.ConsoleSession(
|
|
node=nodename, configmanager=cfgmgr,
|
|
username=authorized['username'], skipreplay=skipreplay,
|
|
datacallback=datacallback
|
|
)
|
|
except exc.NotFoundException:
|
|
start_response("404 Not found", headers)
|
|
yield "404 - Request Path not recognized"
|
|
return
|
|
if not consession:
|
|
start_response("500 Internal Server Error", headers)
|
|
return
|
|
sessid = _assign_consessionid(consession)
|
|
if async:
|
|
async.add_console_session(sessid)
|
|
start_response('200 OK', headers)
|
|
yield '{"session":"%s","data":""}' % sessid
|
|
return
|
|
elif 'bytes' in querydict.keys(): # not keycodes...
|
|
myinput = querydict['bytes']
|
|
sessid = querydict['session']
|
|
if sessid not in consolesessions:
|
|
start_response('400 Expired Session', headers)
|
|
return
|
|
consolesessions[sessid]['expiry'] = time.time() + 90
|
|
consolesessions[sessid]['session'].write(myinput)
|
|
start_response('200 OK', headers)
|
|
yield json.dumps({'session': querydict['session']})
|
|
return # client has requests to send or receive, not both...
|
|
elif 'closesession' in querydict:
|
|
consolesessions[querydict['session']]['session'].destroy()
|
|
del consolesessions[querydict['session']]
|
|
start_response('200 OK', headers)
|
|
yield '{"sessionclosed": true}'
|
|
return
|
|
elif 'action' in querydict:
|
|
if querydict['action'] == 'break':
|
|
consolesessions[querydict['session']]['session'].send_break()
|
|
elif querydict['action'] == 'reopen':
|
|
consolesessions[querydict['session']]['session'].reopen()
|
|
else:
|
|
start_response('400 Bad Request')
|
|
yield 'Unrecognized action ' + querydict['action']
|
|
return
|
|
start_response('200 OK', headers)
|
|
yield json.dumps({'session': querydict['session']})
|
|
else: # no keys, but a session, means it's hooking to receive data
|
|
sessid = querydict['session']
|
|
if sessid not in consolesessions:
|
|
start_response('400 Expired Session', headers)
|
|
yield ''
|
|
return
|
|
consolesessions[sessid]['expiry'] = time.time() + 90
|
|
# add our thread to the 'inflight' to have a hook to terminate
|
|
# a long polling request
|
|
loggedout = None
|
|
mythreadid = greenlet.getcurrent()
|
|
httpsessions[authorized['sessionid']]['inflight'].add(mythreadid)
|
|
try:
|
|
outdata = consolesessions[sessid]['session'].get_next_output(
|
|
timeout=25)
|
|
except greenlet.GreenletExit as ge:
|
|
loggedout = ge
|
|
httpsessions[authorized['sessionid']]['inflight'].discard(
|
|
mythreadid)
|
|
if sessid not in consolesessions:
|
|
start_response('400 Expired Session', headers)
|
|
yield ''
|
|
return
|
|
if loggedout is not None:
|
|
consolesessions[sessid]['session'].destroy()
|
|
start_response('401 Logged out', headers)
|
|
yield '{"loggedout": 1}'
|
|
return
|
|
bufferage = False
|
|
if 'stampsent' not in consolesessions[sessid]:
|
|
consolesessions[sessid]['stampsent'] = True
|
|
bufferage = consolesessions[sessid]['session'].get_buffer_age()
|
|
if isinstance(outdata, dict):
|
|
rspdata = outdata
|
|
rspdata['session'] = querydict['session']
|
|
else:
|
|
rspdata = {'session': querydict['session'],
|
|
'data': outdata}
|
|
if bufferage is not False:
|
|
rspdata['bufferage'] = bufferage
|
|
try:
|
|
rsp = json.dumps(rspdata)
|
|
except UnicodeDecodeError:
|
|
try:
|
|
rsp = json.dumps(rspdata, encoding='cp437')
|
|
except UnicodeDecodeError:
|
|
rsp = json.dumps({'session': querydict['session'],
|
|
'data': 'DECODEERROR'})
|
|
start_response('200 OK', headers)
|
|
yield rsp
|
|
return
|
|
else:
|
|
# normal request
|
|
url = env['PATH_INFO']
|
|
url = url.replace('.json', '')
|
|
url = url.replace('.html', '')
|
|
if url == '/sessions/current/info':
|
|
start_response('200 OK', headers)
|
|
sessinfo = {'username': authorized['username']}
|
|
if 'authtoken' in authorized:
|
|
sessinfo['authtoken'] = authorized['authtoken']
|
|
yield json.dumps(sessinfo)
|
|
return
|
|
resource = '.' + url[url.rindex('/'):]
|
|
lquerydict = copy.deepcopy(querydict)
|
|
try:
|
|
hdlr = pluginapi.handle_path(url, operation,
|
|
cfgmgr, querydict)
|
|
if 'HTTP_CONFLUENTASYNCID' in env:
|
|
confluent.asynchttp.run_handler(hdlr, env)
|
|
start_response('202 Accepted', headers)
|
|
yield 'Request queued'
|
|
return
|
|
pagecontent = ""
|
|
if mimetype == 'text/html':
|
|
for datum in _assemble_html(hdlr, resource, lquerydict, url,
|
|
extension):
|
|
pagecontent += datum
|
|
else:
|
|
for datum in _assemble_json(hdlr, resource, url, extension):
|
|
pagecontent += datum
|
|
start_response('200 OK', headers)
|
|
yield pagecontent
|
|
except exc.ConfluentException as e:
|
|
if ((not isinstance(e, exc.LockedCredentials)) and
|
|
e.apierrorcode == 500):
|
|
# raise generics to trigger the tracelog
|
|
raise
|
|
start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr),
|
|
headers)
|
|
yield e.get_error_body()
|
|
|
|
def _assemble_html(responses, resource, querydict, url, extension):
|
|
yield '<html><head><meta charset="UTF-8"><title>' \
|
|
'Confluent REST Explorer: ' + url + '</title></head>' \
|
|
'<body><form action="' + \
|
|
resource + '" method="post">'
|
|
if querydict:
|
|
yield 'Response to input data:<br>' + \
|
|
json.dumps(querydict, separators=(',', ': '),
|
|
indent=4, sort_keys=True) + '<hr>'
|
|
yield 'Only fields that have their boxes checked will have their ' \
|
|
'respective values honored by the confluent server.<hr>' \
|
|
'<input type="hidden" name="restexplorerhonorkey" value="">' + \
|
|
'<a rel="self" href="{0}{1}">{0}{1}</a><br>'.format(
|
|
resource, extension)
|
|
if url == '/':
|
|
iscollection = True
|
|
elif resource[-1] == '/':
|
|
iscollection = True
|
|
yield '<a rel="collection" href="../{0}">../{0}</a><br>'.format(
|
|
extension)
|
|
else:
|
|
iscollection = False
|
|
yield '<a rel="collection" href="./{0}">./{0}</a><br>'.format(
|
|
extension)
|
|
pendingrsp = []
|
|
for rsp in responses:
|
|
if isinstance(rsp, confluent.messages.LinkRelation):
|
|
yield rsp.html(extension) + "<br>"
|
|
else:
|
|
pendingrsp.append(rsp)
|
|
for rsp in pendingrsp:
|
|
yield rsp.html() + "<br>"
|
|
if iscollection:
|
|
# localpath = url[:-2] (why was this here??)
|
|
try:
|
|
if url == '/users/':
|
|
return
|
|
firstpass = True
|
|
module = url.split('/')
|
|
if not module:
|
|
return
|
|
for y in create_resource_functions[module[-2]]():
|
|
if firstpass:
|
|
yield "<hr>Define new resource in %s:<BR>" % module[-2]
|
|
firstpass = False
|
|
yield y
|
|
yield ('<input value="create" name="restexplorerop" type="submit">'
|
|
'</form></body></html>')
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
yield ('<input value="update" name="restexplorerop" type="submit">'
|
|
'</form></body></html>')
|
|
|
|
|
|
def _assemble_json(responses, resource=None, url=None, extension=None):
|
|
#NOTE(jbjohnso) I'm considering giving up on yielding bit by bit
|
|
#in json case over http. Notably, duplicate key values from plugin
|
|
#overwrite, but we'd want to preserve them into an array instead.
|
|
#the downside is that http would just always blurt it ll out at
|
|
#once and hold on to all the data in memory
|
|
links = {}
|
|
if resource is not None:
|
|
links['self'] = {"href": resource + extension}
|
|
if url == '/':
|
|
pass
|
|
elif resource[-1] == '/':
|
|
links['collection'] = {"href": "../" + extension}
|
|
else:
|
|
links['collection'] = {"href": "./" + extension}
|
|
rspdata = {}
|
|
for rsp in responses:
|
|
if isinstance(rsp, confluent.messages.LinkRelation):
|
|
haldata = rsp.raw()
|
|
for hk in haldata.iterkeys():
|
|
if 'href' in haldata[hk]:
|
|
if isinstance(haldata[hk]['href'], int):
|
|
haldata[hk]['href'] = str(haldata[hk]['href'])
|
|
haldata[hk]['href'] += extension
|
|
if hk in links:
|
|
if isinstance(links[hk], list):
|
|
links[hk].append(haldata[hk])
|
|
else:
|
|
links[hk] = [links[hk], haldata[hk]]
|
|
elif hk == 'item':
|
|
links[hk] = [haldata[hk],]
|
|
else:
|
|
links[hk] = haldata[hk]
|
|
else:
|
|
rsp = rsp.raw()
|
|
for dk in rsp.iterkeys():
|
|
if dk in rspdata:
|
|
if isinstance(rspdata[dk], list):
|
|
rspdata[dk].append(rsp[dk])
|
|
else:
|
|
rspdata[dk] = [rspdata[dk], rsp[dk]]
|
|
else:
|
|
if dk == 'databynode' or dk == 'asyncresponse':
|
|
# a quirk, databynode suggests noderange
|
|
# multi response. This should *always* be a list,
|
|
# even if it will be length 1
|
|
rspdata[dk] = [rsp[dk]]
|
|
else:
|
|
rspdata[dk] = rsp[dk]
|
|
rspdata["_links"] = links
|
|
tlvdata.unicode_dictvalues(rspdata)
|
|
yield json.dumps(
|
|
rspdata, sort_keys=True, indent=4, ensure_ascii=False).encode('utf-8')
|
|
|
|
|
|
def serve(bind_host, bind_port):
|
|
# TODO(jbjohnso): move to unix socket and explore
|
|
# either making apache deal with it
|
|
# or just supporting nginx or lighthttpd
|
|
# for now, http port access
|
|
#scgi.WSGIServer(resourcehandler, bindAddress=("localhost",4004)).run()
|
|
#based on a bakeoff perf wise, eventlet http support proxied actually did
|
|
#edge out patched flup. unpatched flup was about the same as eventlet http
|
|
#but deps are simpler without flup
|
|
#also, the potential for direct http can be handy
|
|
#todo remains unix domain socket for even http
|
|
sock = None
|
|
while not sock:
|
|
try:
|
|
sock = eventlet.listen(
|
|
(bind_host, bind_port, 0, 0), family=socket.AF_INET6)
|
|
except socket.error as e:
|
|
if e.errno != 98:
|
|
raise
|
|
sys.stderr.write(
|
|
'Failed to open HTTP due to busy port, trying again in'
|
|
' a second\n')
|
|
eventlet.sleep(1)
|
|
eventlet.wsgi.server(sock, resourcehandler, log=False, log_output=False,
|
|
debug=False)
|
|
|
|
|
|
class HttpApi(object):
|
|
def __init__(self, bind_host=None, bind_port=None):
|
|
self.server = None
|
|
self.bind_host = bind_host or '::'
|
|
self.bind_port = bind_port or 4005
|
|
|
|
def start(self):
|
|
global auditlog
|
|
global tracelog
|
|
tracelog = log.Logger('trace')
|
|
auditlog = log.Logger('audit')
|
|
self.server = eventlet.spawn(serve, self.bind_host, self.bind_port)
|
|
|
|
|
|
_cleaner = eventlet.spawn(_sessioncleaner)
|