2013-08-09 16:59:08 -04:00
|
|
|
# Copyright (C) IBM 2013
|
|
|
|
# All rights reserved
|
|
|
|
# This SCGI server provides a http wrap to confluent api
|
|
|
|
# It additionally manages httprequest console sessions as supported by
|
|
|
|
# shillinabox javascript
|
|
|
|
import base64
|
2013-10-09 16:17:37 -04:00
|
|
|
import Cookie
|
2013-09-04 15:40:35 -04:00
|
|
|
import confluent.auth as auth
|
2013-11-02 13:25:56 -04:00
|
|
|
import confluent.consoleserver as consoleserver
|
2013-11-02 17:32:48 -04:00
|
|
|
import confluent.exceptions as exc
|
2013-11-03 17:07:17 -05:00
|
|
|
import confluent.messages
|
2013-10-12 18:04:27 -04:00
|
|
|
import confluent.pluginapi as pluginapi
|
2013-08-09 16:59:08 -04:00
|
|
|
import confluent.util as util
|
|
|
|
import eventlet
|
2013-09-13 16:07:39 -04:00
|
|
|
import json
|
2013-08-09 16:59:08 -04:00
|
|
|
import os
|
|
|
|
import string
|
2013-11-03 10:12:50 -05:00
|
|
|
import traceback
|
2013-10-09 16:17:37 -04:00
|
|
|
import time
|
2013-08-16 16:37:19 -04:00
|
|
|
import urlparse
|
2013-10-03 17:05:40 -04:00
|
|
|
import eventlet.wsgi
|
|
|
|
#scgi = eventlet.import_patched('flup.server.scgi')
|
2013-08-09 16:59:08 -04:00
|
|
|
|
|
|
|
|
|
|
|
# Live console sessions keyed by the random session id handed to the client;
# each value holds the ConsoleSession object and its 'expiry' timestamp.
consolesessions = {}
# Authenticated browser sessions keyed by the confluentsessionid cookie value.
httpsessions = {}
# Map of HTTP verb -> confluent API operation name.
opmap = {
    'POST': 'create',
    'GET': 'retrieve',
    'PUT': 'update',
    'DELETE': 'delete',
}
|
2013-08-09 16:59:08 -04:00
|
|
|
|
|
|
|
|
2013-10-09 16:17:37 -04:00
|
|
|
def _sessioncleaner():
    """Background reaper for idle http and console sessions.

    Runs forever (spawned as a greenthread); every 10 seconds it drops
    any session whose 'expiry' timestamp has passed.
    """
    while True:
        currtime = time.time()
        # Iterate over snapshots of the key sets so deleting entries while
        # looping is safe (direct iteration over a mutating dict is only
        # accidentally safe on py2, where keys() returns a list).
        for session in list(httpsessions):
            if httpsessions[session]['expiry'] < currtime:
                del httpsessions[session]
        for session in list(consolesessions):
            if consolesessions[session]['expiry'] < currtime:
                del consolesessions[session]
        eventlet.sleep(10)
|
|
|
|
|
|
|
|
|
|
|
|
def _get_query_dict(env, reqbody, reqtype):
|
2013-08-09 16:59:08 -04:00
|
|
|
qdict = {}
|
2013-10-09 16:17:37 -04:00
|
|
|
try:
|
|
|
|
qstring = env['QUERY_STRING']
|
|
|
|
except KeyError:
|
|
|
|
qstring = None
|
2013-09-12 16:54:39 -04:00
|
|
|
if qstring:
|
|
|
|
for qpair in qstring.split('&'):
|
|
|
|
qkey, qvalue = qpair.split('=')
|
|
|
|
qdict[qkey] = qvalue
|
2013-08-16 16:37:19 -04:00
|
|
|
if reqbody is not None:
|
2013-09-12 16:54:39 -04:00
|
|
|
if "application/x-www-form-urlencoded" in reqtype:
|
2013-11-03 08:44:28 -05:00
|
|
|
pbody = urlparse.parse_qs(reqbody, True)
|
2013-09-13 11:45:17 -04:00
|
|
|
for ky in pbody.iterkeys():
|
2013-11-03 01:11:19 -04:00
|
|
|
if len(pbody[ky]) > 1: # e.g. REST explorer
|
2013-11-13 14:52:32 -05:00
|
|
|
na = [i for i in pbody[ky] if i != '']
|
|
|
|
qdict[ky] = na
|
2013-11-03 01:11:19 -04:00
|
|
|
else:
|
|
|
|
qdict[ky] = pbody[ky][0]
|
2013-11-03 08:44:28 -05:00
|
|
|
if 'restexplorerhonorkey' in qdict:
|
|
|
|
nqdict = {}
|
|
|
|
for key in qdict:
|
|
|
|
if key == 'restexplorerop':
|
|
|
|
nqdict[key] = qdict['restexplorerop']
|
|
|
|
continue
|
|
|
|
if key in qdict['restexplorerhonorkey']:
|
|
|
|
nqdict[key] = qdict[key]
|
|
|
|
qdict = nqdict
|
2013-08-09 16:59:08 -04:00
|
|
|
return qdict
|
|
|
|
|
|
|
|
|
|
|
|
def _authorize_request(env):
    """Grant/Deny access based on data from wsgi env

    Checks a confluentsessionid cookie first, then falls back to HTTP
    basic auth.  Returns {'code': 200, 'cookie': ..., 'cfgmgr': ...,
    'userdata': ...} on success, or {'code': 401} when no valid identity
    was presented.
    """
    authdata = False
    cookie = Cookie.SimpleCookie()
    if 'HTTP_COOKIE' in env:
        #attempt to use the cookie. If it matches a live session, renew it
        cc = Cookie.SimpleCookie()
        cc.load(env['HTTP_COOKIE'])
        if 'confluentsessionid' in cc:
            sessionid = cc['confluentsessionid'].value
            if sessionid in httpsessions:
                # sliding expiration: 90 more seconds per authenticated hit
                httpsessions[sessionid]['expiry'] = time.time() + 90
                name = httpsessions[sessionid]['name']
                authdata = auth.authorize(name, element=None)
    if authdata is False and 'HTTP_AUTHORIZATION' in env:
        # basic auth header is 'Basic ' + base64('user:passphrase');
        # split on the first ':' only so passphrases may contain ':'
        name, passphrase = base64.b64decode(
            env['HTTP_AUTHORIZATION'].replace('Basic ', '')).split(':', 1)
        authdata = auth.check_user_passphrase(name, passphrase, element=None)
        # mint a fresh session id, re-rolling on the (unlikely) collision
        sessid = util.randomstring(32)
        while sessid in httpsessions:
            sessid = util.randomstring(32)
        # NOTE(review): the session entry and cookie are created even when
        # check_user_passphrase returned False; the 401 below is still
        # returned, but the session dict retains the entry — confirm intended
        httpsessions[sessid] = {'name': name, 'expiry': time.time() + 90}
        cookie['confluentsessionid'] = sessid
        cookie['confluentsessionid']['secure'] = 1
        cookie['confluentsessionid']['httponly'] = 1
        cookie['confluentsessionid']['path'] = '/'
    if authdata:
        # authdata is (userdata, configmanager) per the unpack below
        return {'code': 200,
                'cookie': cookie,
                'cfgmgr': authdata[1],
                'userdata': authdata[0]}
    else:
        return {'code': 401}
|
2013-08-09 16:59:08 -04:00
|
|
|
# TODO(jbjohnso): actually evaluate the request for authorization
|
|
|
|
# In theory, the x509 or http auth stuff will get translated and then
|
|
|
|
# passed on to the core authorization function in an appropriate form
|
|
|
|
# expresses return in the form of http code
|
|
|
|
# 401 if there is no known identity
|
|
|
|
# 403 if valid identity, but no access
|
|
|
|
# going to run 200 just to get going for now
|
|
|
|
|
|
|
|
|
|
|
|
def _pick_mimetype(env):
|
|
|
|
"""Detect the http indicated mime to send back.
|
|
|
|
|
|
|
|
Note that as it gets into the ACCEPT header honoring, it only looks for
|
|
|
|
application/json and else gives up and assumes html. This is because
|
2013-08-18 19:11:49 -04:00
|
|
|
browsers are very chaotic about ACCEPT HEADER. It is assumed that
|
2013-08-09 16:59:08 -04:00
|
|
|
XMLHttpRequest.setRequestHeader will be used by clever javascript
|
|
|
|
if the '.json' scheme doesn't cut it.
|
|
|
|
"""
|
|
|
|
if env['PATH_INFO'].endswith('.json'):
|
2013-11-02 18:37:26 -04:00
|
|
|
return 'application/json; charset=utf-8'
|
2013-08-09 16:59:08 -04:00
|
|
|
elif env['PATH_INFO'].endswith('.html'):
|
|
|
|
return 'text/html'
|
|
|
|
elif 'application/json' in env['HTTP_ACCEPT']:
|
2013-11-02 18:37:26 -04:00
|
|
|
return 'application/json; charset=utf-8'
|
2013-08-09 16:59:08 -04:00
|
|
|
else:
|
|
|
|
return 'text/html'
|
|
|
|
|
|
|
|
|
|
|
|
def _assign_consessionid(consolesession):
    """Register *consolesession* under a fresh random id and return the id.

    The entry starts with a 60 second expiry window; clients must keep
    polling to extend it.
    """
    newid = util.randomstring(32)
    # re-roll until the id is unique among live console sessions
    while newid in consolesessions.keys():
        newid = util.randomstring(32)
    consolesessions[newid] = {
        'session': consolesession,
        'expiry': time.time() + 60,
    }
    return newid
|
|
|
|
|
|
|
|
def resourcehandler(env, start_response):
    """Function to handle new wsgi requests

    A generator: status and headers go out through start_response and the
    response body is yielded.  Handles authentication, the special
    /console/session paths, and dispatches everything else to the plugin
    api, rendering the result as html or json per _pick_mimetype.
    """
    authorized = _authorize_request(env)
    mimetype = _pick_mimetype(env)
    reqbody = None
    reqtype = None
    # slurp any request body up front so _get_query_dict can parse it
    if 'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0:
        reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
        reqtype = env['CONTENT_TYPE']
    if authorized['code'] == 401:
        start_response('401 Authentication Required',
                       [('Content-type', 'text/plain'),
                        ('WWW-Authenticate', 'Basic realm="confluent"')])
        yield 'authentication required'
        return
    if authorized['code'] == 403:
        start_response('403 Forbidden',
                       [('Content-type', 'text/plain'),
                        ('WWW-Authenticate', 'Basic realm="confluent"')])
        yield 'authorization failed'
        return
    if authorized['code'] != 200:
        raise Exception("Unrecognized code from auth engine")
    headers = [('Content-Type', mimetype)]
    # pass back any session cookie minted by _authorize_request
    headers.extend(("Set-Cookie", m.OutputString())
                   for m in authorized['cookie'].values())
    cfgmgr = authorized['cfgmgr']
    # the http verb picks the crud operation unless the REST explorer form
    # overrides it below via restexplorerop
    operation = opmap[env['REQUEST_METHOD']]
    querydict = _get_query_dict(env, reqbody, reqtype)
    if 'restexplorerop' in querydict:
        operation = querydict['restexplorerop']
        del querydict['restexplorerop']
    if '/console/session' in env['PATH_INFO']:
        #hard bake JSON into this path, do not support other incarnations
        prefix, _, _ = env['PATH_INFO'].partition('/console/session')
        _, _, nodename = prefix.rpartition('/')
        if 'session' not in querydict.keys() or not querydict['session']:
            # Request for new session
            consession = consoleserver.ConsoleSession(node=nodename,
                                                      configmanager=cfgmgr)
            if not consession:
                start_response("500 Internal Server Error", headers)
                return
            sessid = _assign_consessionid(consession)
            start_response('200 OK', headers)
            yield '{"session":"%s","data":""}' % sessid
            return
        elif 'keys' in querydict.keys():
            # client wishes to push some keys into the remote console
            # keys arrive hex encoded, two characters per byte
            input = ""  # NOTE: shadows the builtin input()
            for idx in xrange(0, len(querydict['keys']), 2):
                input += chr(int(querydict['keys'][idx:idx+2], 16))
            sessid = querydict['session']
            consolesessions[sessid]['expiry'] = time.time() + 90
            consolesessions[sessid]['session'].write(input)
            start_response('200 OK', headers)
            return  # client has requests to send or receive, not both...
        else:  #no keys, but a session, means it's hooking to receive data
            sessid = querydict['session']
            consolesessions[sessid]['expiry'] = time.time() + 90
            outdata = consolesessions[sessid]['session'].get_next_output(timeout=45)
            try:
                rsp = json.dumps({'session': querydict['session'], 'data': outdata})
            except UnicodeDecodeError:
                # non-utf8 console bytes: retry assuming cp437 (PC console)
                rsp = json.dumps({'session': querydict['session'], 'data': outdata}, encoding='cp437')
            except UnicodeDecodeError:
                # NOTE(review): duplicate except clause — unreachable; only
                # one handler per exception type fires, and an error raised
                # inside the cp437 handler would propagate, not land here
                rsp = json.dumps({'session': querydict['session'], 'data': 'DECODEERROR'})
            start_response('200 OK', headers)
            yield rsp
            return
    else:
        # normal request: strip render-hint suffixes and hand the path to
        # the plugin layer
        url = env['PATH_INFO']
        url = url.replace('.json', '')
        url = url.replace('.html', '')
        resource = '.' + url[url.rindex('/'):]
        try:
            hdlr = pluginapi.handle_path(url, operation,
                                         cfgmgr, querydict)
        except exc.NotFoundException:
            start_response('404 Not found', headers)
            yield "404 - Request path not recognized"
            return
        except exc.InvalidArgumentException:
            traceback.print_exc()
            start_response('400 Bad Request', headers)
            yield '400 - Bad Request'
            return
        start_response('200 OK', headers)
        if mimetype == 'text/html':
            for datum in _assemble_html(hdlr, resource, querydict, url):
                yield datum
        else:
            for datum in _assemble_json(hdlr, resource, url):
                yield datum
|
|
|
|
|
|
|
|
|
2013-11-07 14:49:16 -05:00
|
|
|
def _assemble_html(responses, resource, querydict, url):
    """Render the REST explorer page for a plugin response stream.

    Yields html fragments: a form targeting *resource*, an echo of any
    submitted *querydict*, navigation links, then each response rendered
    via its html() method (link relations first, in arrival order).
    """
    yield '<html><head><title>'
    yield 'Confluent REST Explorer: ' + resource + '</title></head>'
    yield '<body><form action="' + resource + '" method="post">'
    if querydict:
        yield 'Response to input data:<br>'
        yield json.dumps(querydict, separators=(',', ': '),
                         indent=4, sort_keys=True)
        yield '<hr>'
    yield 'Only fields that have their boxes checked will have their '
    yield 'respective values honored by the confluent server.<hr>'
    yield '<input type="hidden" name="restexplorerop" value="update">'
    yield '<input type="hidden" name="restexplorerhonorkey" value="">'
    yield '<a rel="self" href="%s">%s</a><br>' % (resource, resource)
    # the root has no parent link; collections point up, leaves point to
    # their containing collection
    if url != '/':
        updest = '../' if resource[-1] == '/' else './'
        yield '<a rel="collection" href="%s">%s</a><br>' % (updest, updest)
    deferred = []
    for reply in responses:
        if isinstance(reply, confluent.messages.LinkRelation):
            yield reply.html()
            yield "<br>"
        else:
            # hold non-link responses so all links render first
            deferred.append(reply)
    for reply in deferred:
        yield reply.html()
        yield "<br>"
    yield '<input value="PUT" type="submit"></form></body></html>'
|
2013-11-03 08:44:28 -05:00
|
|
|
|
|
|
|
|
2013-11-07 14:49:16 -05:00
|
|
|
def _assemble_json(responses, resource, url):
    """Stream a HAL-style json document for a plugin response sequence.

    Link relations are accumulated into a trailing "_links" object; every
    other response is emitted in order as a top level member via its
    json() method.
    """
    #NOTE(jbjohnso) I'm considering giving up on yielding bit by bit
    #in json case over http.  Notably, duplicate key values from plugin
    #overwrite, but we'd want to preserve them into an array instead.
    #the downside is that http would just always blurt it ll out at
    #once and hold on to all the data in memory
    docomma = False
    # every resource links to itself; values are pre-serialized json strings
    links = {
        'self': ['{"href":"%s"}' % resource],
    }
    if url == '/':
        pass  # the root has no containing collection
    elif resource[-1] == '/':
        links['collection'] = ['{"href":"%s"}' % '../']
    else:
        links['collection'] = ['{"href":"%s"}' % './']
    yield '{'
    hadrsp = False
    for rsp in responses:
        if isinstance(rsp, confluent.messages.LinkRelation):
            # collect link relations for the _links section at the end
            haldata = rsp.json_hal()
            for hk in haldata.iterkeys():
                if hk in links:
                    links[hk].append(haldata[hk])
                else:
                    links[hk] = [haldata[hk]]
            continue
        hadrsp = True
        if docomma:
            yield ','
        else:
            docomma = True
        yield rsp.json()
    # reuse docomma for the array elements below
    docomma = False
    if hadrsp:
        yield ','
    yield '"_links": {'
    groupcomma = False
    for link in links.iterkeys():
        if groupcomma:
            yield ','
        else:
            groupcomma = True
        yield json.dumps(link) + ":"
        if len(links[link]) == 1:
            yield links[link][0]
        else:
            # NOTE(review): docomma is not reset between multi-valued link
            # groups, so a second multi-valued relation would start with a
            # stray ',' — confirm whether that case can occur in practice
            yield '['
            for lk in links[link]:
                if docomma:
                    yield ','
                else:
                    docomma = True
                yield lk
            yield ']'
    yield '}'
    yield '}'
|
2013-08-09 16:59:08 -04:00
|
|
|
|
2013-09-14 20:21:58 -04:00
|
|
|
|
|
|
|
def serve():
    """Run the blocking eventlet wsgi listener on port 4005.

    A perf bakeoff showed proxied eventlet http edging out patched flup
    scgi (unpatched flup was about the same), and skipping flup keeps the
    dependency list shorter while leaving direct http access available.

    TODO(jbjohnso): move to a unix domain socket and either have apache
    deal with it or support nginx/lighttpd; for now, plain http port.
    """
    eventlet.wsgi.server(eventlet.listen(("", 4005)), resourcehandler)
|
2013-09-14 20:21:58 -04:00
|
|
|
|
|
|
|
|
2013-08-09 16:59:08 -04:00
|
|
|
class HttpApi(object):
    """Thin lifecycle wrapper running the wsgi service as a greenthread."""

    def start(self):
        """Spawn the http listener; returns immediately."""
        # keep the greenthread handle so the server isn't garbage collected
        self.server = eventlet.spawn(serve)
|
2013-08-09 16:59:08 -04:00
|
|
|
|
2013-10-09 16:17:37 -04:00
|
|
|
# Import-time side effect: start the stale-session reaper greenthread as
# soon as this module is loaded.
_cleaner = eventlet.spawn(_sessioncleaner)
|
|
|
|
|
2013-08-09 16:59:08 -04:00
|
|
|
|
|
|
|
|
|
|
|
|