2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-04-18 02:59:34 +00:00

More work toward asyncio

aiohttp now covers a lot of httpapi GET, and some of websocket.
This commit is contained in:
Jarrod Johnson 2024-04-03 16:58:40 -04:00
parent 198ffb8be6
commit 587ccd13cc
4 changed files with 276 additions and 255 deletions

View File

@ -84,9 +84,9 @@ class AsyncSession(object):
if not wshandler:
self.reaper = eventlet.spawn_after(15, self.destroy)
def add(self, requestid, rsp):
async def add(self, requestid, rsp):
if self.wshandler:
self.wshandler(messages.AsyncMessage((requestid, rsp)))
await self.wshandler(messages.AsyncMessage((requestid, rsp)))
if self.responses is None:
return
self.responses.append((requestid, rsp))
@ -116,10 +116,10 @@ class AsyncSession(object):
self.responses = None
del _asyncsessions[self.asyncid]
def run_handler(self, handler, requestid):
async def run_handler(self, handler, requestid):
try:
for rsp in handler:
self.add(requestid, rsp)
await self.add(requestid, rsp)
self.add(requestid, messages.AsyncCompletion())
except Exception as e:
self.add(requestid, e)
@ -146,7 +146,7 @@ class AsyncSession(object):
yield self.responses.popleft()
def run_handler(hdlr, env):
async def run_handler(hdlr, env):
asyncsessid = env['HTTP_CONFLUENTASYNCID']
try:
asyncsession = _asyncsessions[asyncsessid]['asyncsession']

View File

@ -1,3 +1,4 @@
import asyncio
import code
import sys
@ -6,26 +7,29 @@ import sys
# since we have to asyncio up the input and output, we use InteractiveInterpreter and handle the
# input ourselves, since code is not asyncio friendly in and of itself
#code.InteractiveConsole().interact()
prompt = '>>> '
itr = code.InteractiveInterpreter()
while True:
sys.stdout.write(prompt)
prompt = '... '
sys.stdout.flush()
newinput = input()
somecode += newinput + '\n'
if newinput.startswith(' '):
async def interact(sock):
cloop = asyncio.get_event_loop()
prompt = '>>> '
itr = code.InteractiveInterpreter()
while True:
await cloop.sock_sendall(prompt)
prompt = '... '
continue
try:
compcode = code.compile_command(somecode)
except SyntaxError as e:
print(repr(e))
compcode = None
somecode = ''
prompt = '>>> '
if compcode:
itr.runcode(compcode)
somecode = ''
prompt = '>>> '
newinput = b''
while b'\n' not in newinput:
newinput += await cloop.sock_recv()
somecode += newinput
if newinput.startswith(' '):
prompt = '... '
continue
try:
compcode = code.compile_command(somecode)
except SyntaxError as e:
await cloop.sock_sendall(repr(e).encode('utf8'))
compcode = None
somecode = ''
prompt = '>>> '
if compcode:
itr.runcode(compcode)
somecode = ''
prompt = '>>> '

View File

@ -25,6 +25,8 @@ try:
import confluent.webauthn as webauthn
except ImportError:
webauthn = None
import asyncio
from aiohttp import web, web_urldispatcher, connector, ClientSession
import confluent.auth as auth
import confluent.config.attributes as attribs
import confluent.config.configmanager as configmanager
@ -55,7 +57,6 @@ except ModuleNotFoundError:
import urllib.parse as urlparse
import eventlet.websocket
import eventlet.wsgi
#scgi = eventlet.import_patched('flup.server.scgi')
tlvdata = confluent.tlvdata
@ -167,16 +168,10 @@ def _sessioncleaner():
eventlet.sleep(10)
def _get_query_dict(env, reqbody, reqtype):
def _get_query_dict(req, reqbody, reqtype):
qdict = {}
try:
qstring = env['QUERY_STRING']
except KeyError:
qstring = None
if qstring:
for qpair in qstring.split('&'):
qkey, qvalue = qpair.split('=')
qdict[qkey] = qvalue
qstring = req.rel_url.query_string
qdict.update(req.rel_url.query)
if reqbody is not None:
if "application/x-www-form-urlencoded" in reqtype:
if not isinstance(reqbody, str):
@ -205,24 +200,25 @@ def _get_query_dict(env, reqbody, reqtype):
qdict = nqdict
return qdict
def _should_skip_authlog(env):
if ('/console/session' in env['PATH_INFO'] or
'/shell/sessions/' in env['PATH_INFO']):
def _should_skip_authlog(req):
thepath = req.rel_url.path
if ('/console/session' in thepath or
'/shell/sessions/' in thepath):
# we should only log starting of a console
return True
if '/sessions/current/async' in env['PATH_INFO']:
if '/sessions/current/async' in thepath:
# this is effectively invisible
return True
if '/sessions/current/webauthn/registered_credentials' in env['PATH_INFO']:
if '/sessions/current/webauthn/registered_credentials' in thepath:
return True
if (env['REQUEST_METHOD'] == 'GET' and
('/sensors/' in env['PATH_INFO'] or
'/health/' in env['PATH_INFO'] or
'/power/state' in env['PATH_INFO'] or
'/nodes/' == env['PATH_INFO'] or
'/sessions/current/info' == env['PATH_INFO'] or
(env['PATH_INFO'].startswith('/noderange/') and
env['PATH_INFO'].endswith('/nodes/')))):
if (req.method == 'GET' and
('/sensors/' in thepath or
'/health/' in thepath or
'/power/state' in thepath or
'/nodes/' == thepath or
'/sessions/current/info' == thepath or
(thepath.startswith('/noderange/') and
thepath.endswith('/nodes/')))):
# these are pretty innocuous, and noisy to log.
return True
return False
@ -234,10 +230,10 @@ def _csrf_exempt(path):
return path == '/sessions/current/info' or path.endswith('/forward/web')
def _csrf_valid(env, session):
def _csrf_valid(req, session):
# This could be simplified into a statement, but this is more readable
# to have it broken out
if env['REQUEST_METHOD'] == 'GET' and _csrf_exempt(env['PATH_INFO']):
if req.method == 'GET' and _csrf_exempt(req.rel_url.path):
# Provide a web client a safe hook to request the CSRF token
# This means that we consider GET of /sessions/current/info to be
# a safe thing to inflict via CSRF, since CORS should prevent
@ -247,16 +243,17 @@ def _csrf_valid(env, session):
if 'csrftoken' not in session:
# The client has not (yet) requested CSRF protection
# so we return true
if 'HTTP_CONFLUENTAUTHTOKEN' in env:
if 'ConfluentAuthToken' in req.headers:
# The client has requested CSRF countermeasures,
# oblige the request and apply a new token to the
# session
session['csrftoken'] = util.randomstring(32)
elif 'HTTP_REFERER' in env:
elif 'Referer' in req.headers:
print('refererrrrrrrr')
# If there is a referrer, make sure it stays consistent
# across the session. A change in referer is a bad thing
try:
referer = env['HTTP_REFERER'].split('/')[2]
referer = req.headers['Referer'].split('/')[2]
except IndexError:
return False
if 'validreferer' not in session:
@ -267,12 +264,12 @@ def _csrf_valid(env, session):
# The session has CSRF protection enabled, only mark valid if
# the client has provided an auth token and that token matches the
# value protecting the session
return ('HTTP_CONFLUENTAUTHTOKEN' in env and
env['HTTP_CONFLUENTAUTHTOKEN'] == session['csrftoken'])
return ('ConfluentAuthToken' in req.headers and
req.headers['ConfluentAuthToken'] == session['csrftoken'])
def _authorize_request(env, operation, reqbody):
"""Grant/Deny access based on data from wsgi env
async def _authorize_request(req, operation, reqbody):
"""Grant/Deny access based on data from request
"""
authdata = None
@ -280,7 +277,7 @@ def _authorize_request(env, operation, reqbody):
sessionid = None
sessid = None
cookie = Cookie.SimpleCookie()
element = env['PATH_INFO']
element = req.rel_url.path
if element.startswith('/sessions/current/'):
if (element.startswith('/sessions/current/webauthn/registered_credentials/')
or element.startswith('/sessions/current/webauthn/validate/')):
@ -289,18 +286,16 @@ def _authorize_request(env, operation, reqbody):
else:
element = None
if not authdata:
if 'HTTP_CONFLUENTSESSION' in env:
sessionid = env['HTTP_CONFLUENTSESSION']
if 'ConfluentSession' in req.headers:
sessionid = req.headers['ConfluentSession']
sessid = sessionid
elif 'confluentsessionid' in req.cookies:
sessionid = req.cookies['confluentsessionid']
sessid = sessionid
elif 'HTTP_COOKIE' in env:
cidx = (env['HTTP_COOKIE']).find('confluentsessionid=')
if cidx >= 0:
sessionid = env['HTTP_COOKIE'][cidx+19:cidx+51]
sessid = sessionid
if sessionid:
if sessionid in httpsessions:
if _csrf_valid(env, httpsessions[sessionid]):
if env['PATH_INFO'] == '/sessions/current/logout':
if _csrf_valid(req, httpsessions[sessionid]):
if element == '/sessions/current/logout':
targets = []
for mythread in httpsessions[sessionid]['inflight']:
targets.append(mythread)
@ -314,24 +309,24 @@ def _authorize_request(env, operation, reqbody):
authdata = auth.authorize(
name, element=element, operation=operation,
skipuserobj=httpsessions[sessionid]['skipuserobject'])
if (not authdata) and 'HTTP_AUTHORIZATION' in env:
if env['PATH_INFO'] == '/sessions/current/logout':
if 'HTTP_REFERER' in env:
if (not authdata) and 'Authorization' in req.headers:
if element == '/sessions/current/logout':
if 'Referer' in req.headers:
# note that this doesn't actually do harm
# otherwise, but this way do not give appearance
# of something having a side effect if it has the smell
# of a CSRF
return {'code': 401}
return ('logout',)
if env['HTTP_AUTHORIZATION'].startswith('MultiBasic '):
if req.headers['Authorization'].startswith('MultiBasic '):
name, passphrase = base64.b64decode(
env['HTTP_AUTHORIZATION'].replace('MultiBasic ', '')).split(b':', 1)
req.headers['Authorization'].replace('MultiBasic ', '')).split(b':', 1)
passphrase = json.loads(passphrase)
else:
name, passphrase = base64.b64decode(
env['HTTP_AUTHORIZATION'].replace('Basic ', '')).split(b':', 1)
req.headers['Authorization'].replace('Basic ', '')).split(b':', 1)
try:
authdata = auth.check_user_passphrase(name, passphrase, operation=operation, element=element)
authdata = await auth.check_user_passphrase(name, passphrase, operation=operation, element=element)
except Exception as e:
if hasattr(e, 'prompts'):
return {'code': 403, 'prompts': e.prompts}
@ -340,19 +335,20 @@ def _authorize_request(env, operation, reqbody):
return {'code': 403}
elif not authdata:
return {'code': 401}
sessid = _establish_http_session(env, authdata, name, cookie)
print(repr(authdata))
sessid = _establish_http_session(req, authdata, name, cookie)
if authdata and element and element.startswith('/sessions/current/webauthn/validate/'):
if webauthn:
for rsp in webauthn.handle_api_request(element, env, None, authdata[2], authdata[1], None, reqbody, None):
if rsp['verified']:
sessid = _establish_http_session(env, authdata, name, cookie)
break
skiplog = _should_skip_authlog(env)
skiplog = _should_skip_authlog(req)
if authdata:
auditmsg = {
'user': util.stringify(name),
'operation': operation,
'target': env['PATH_INFO'],
'target': element,
}
authinfo = {'code': 200,
'cookie': cookie,
@ -376,14 +372,14 @@ def _authorize_request(env, operation, reqbody):
else:
return {'code': 403}
def _establish_http_session(env, authdata, name, cookie):
def _establish_http_session(req, authdata, name, cookie):
sessid = util.randomstring(32)
while sessid in httpsessions:
sessid = util.randomstring(32)
httpsessions[sessid] = {'name': name, 'expiry': time.time() + 90,
'skipuserobject': authdata[4],
'inflight': set([])}
if 'HTTP_CONFLUENTAUTHTOKEN' in env:
if 'ConfluentAuthToken' in req.headers:
httpsessions[sessid]['csrftoken'] = util.randomstring(32)
cookie['confluentsessionid'] = util.stringify(sessid)
cookie['confluentsessionid']['secure'] = 1
@ -392,7 +388,7 @@ def _establish_http_session(env, authdata, name, cookie):
return sessid
def _pick_mimetype(env):
def _pick_mimetype(req):
"""Detect the http indicated mime to send back.
Note that as it gets into the ACCEPT header honoring, it only looks for
@ -401,11 +397,11 @@ def _pick_mimetype(env):
XMLHttpRequest.setRequestHeader will be used by clever javascript
if the '.json' scheme doesn't cut it.
"""
if env['PATH_INFO'].endswith('.json'):
if req.rel_url.path.endswith('.json'):
return 'application/json; charset=utf-8', '.json'
elif env['PATH_INFO'].endswith('.html'):
elif req.rel_url.path.endswith('.html'):
return 'text/html', '.html'
elif 'HTTP_ACCEPT' in env and 'application/json' in env['HTTP_ACCEPT']:
elif 'Accept' in req.headers and 'application/json' in req.headers['Accept']:
return 'application/json; charset=utf-8', ''
else:
return 'text/html', ''
@ -438,45 +434,52 @@ def datacallback_bound(clientsessid, ws):
ws.send('${0}$'.format(clientsessid) + data)
return datacallback
@eventlet.websocket.WebSocketWSGI.configured(
supported_protocols=['confluent.console', 'confluent.asyncweb'])
def wsock_handler(ws):
sessid = ws.wait()
async def wsock_handler(req):
rsp = web.WebSocketResponse(
heartbeat=25.0,
protocols=('confluent.console', 'confluent.asyncweb'))
await rsp.prepare(req)
sessid = await rsp.receive()
if not sessid:
return
sessid = sessid.data
sessid = sessid.replace('ConfluentSessionId:', '')
sessid = sessid[:-1]
currsess = httpsessions.get(sessid, None)
if not currsess:
return
authtoken = ws.wait()
authtoken = await rsp.receive()
authtoken = authtoken.data
authtoken = authtoken.replace('ConfluentAuthToken:', '')
authtoken = authtoken[:-1]
if currsess['csrftoken'] != authtoken:
return
mythreadid = greenlet.getcurrent()
httpsessions[sessid]['inflight'].add(mythreadid)
httpsessions[sessid]['inflight'].add(rsp)
name = httpsessions[sessid]['name']
authdata = auth.authorize(name, ws.path, operation='start')
print(req.rel_url.path)
authdata = auth.authorize(name, req.rel_url.path, operation='start')
if not authdata:
return
cfgmgr = httpsessions[sessid]['cfgmgr']
username = httpsessions[sessid]['name']
if ws.path == '/sessions/current/async':
if req.rel_url.path == '/sessions/current/async':
myconsoles = {}
def asyncwscallback(rsp):
async def asyncwscallback(rsp):
rsp = json.dumps(rsp.raw())
ws.send(u'!' + rsp)
mythreadid = greenlet.getcurrent()
currsess['inflight'].add(mythreadid)
await rsp.send_str(u'!' + rsp)
currsess['inflight'].add(rsp)
asess = None
try:
for asess in confluent.asynchttp.handle_async(
{}, {}, currsess['inflight'], asyncwscallback):
ws.send(u' ASYNCID: {0}'.format(asess.asyncid))
await rsp.send_str(u' ASYNCID: {0}'.format(asess.asyncid))
clientmsg = True
while clientmsg:
clientmsg = ws.wait()
clientmsg = await rsp.receive()
clientmsg = clientmsg.data
if clientmsg:
if clientmsg[0] == '?':
ws.send('?')
@ -580,7 +583,6 @@ def wsock_handler(ws):
)
except exc.NotFoundException:
return
mythreadid = greenlet.getcurrent()
currsess['inflight'].add(mythreadid)
clientmsg = ws.wait()
try:
@ -603,47 +605,62 @@ def wsock_handler(ws):
consession.destroy()
def resourcehandler(env, start_response):
async def resourcehandler(request):
# start_response is akin to doing headers
# and calling 'prepare() on a 'StreamResponse'
# any 'yield' needs to become a write to the streamresponse
async def make_response(mimetype, status=200, reason=None, headers=None, cookies=None):
rspheaders = {
'Cache-Control': 'no-store',
'Pragma': 'no-cache',
'X-Content-Type-Options': 'nosniff',
'Content-Security-Policy': "default-src 'self'",
'X-XSS-Protection': '1; mode=block',
'X-Frame-Options': 'deny',
'Strict-Transport-Security': 'max-age=86400',
'X-Permitted-Cross-Domain-Policies': 'none',
}
if headers:
rspheaders.update(headers)
rsp = web.StreamResponse(status=status, reason=reason, headers=rspheaders)
if cookies:
rsp.cookies.update(cookies)
rsp.content_type = mimetype
await rsp.prepare(request)
return rsp
try:
if 'HTTP_SEC_WEBSOCKET_VERSION' in env:
for rsp in wsock_handler(env, start_response):
yield rsp
if 'Sec-WebSocket-Version' in request.headers:
print('WebSocket....')
return await wsock_handler(request)
else:
for rsp in resourcehandler_backend(env, start_response):
yield rsp
return await resourcehandler_backend(request, make_response)
except Exception as e:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
start_response('500 - ' + str(e), [])
yield '500 - ' + str(e)
return
#start_response('500 - ' + str(e), [])
rsp = web.StreamResponse(status=500, reason=str(e))
await rsp.prepare(request)
await rsp.write(str(e).encode('utf8'))
return rsp
def resourcehandler_backend(env, start_response):
async def resourcehandler_backend(req, make_response):
"""Function to handle new wsgi requests
"""
mimetype, extension = _pick_mimetype(env)
headers = [('Content-Type', mimetype), ('Cache-Control', 'no-store'),
('Pragma', 'no-cache'),
('X-Content-Type-Options', 'nosniff'),
('Content-Security-Policy', "default-src 'self'"),
('X-XSS-Protection', '1; mode=block'), ('X-Frame-Options', 'deny'),
('Strict-Transport-Security', 'max-age=86400'),
('X-Permitted-Cross-Domain-Policies', 'none')]
mimetype, extension = _pick_mimetype(req)
reqbody = None
reqtype = None
if env.get('PATH_INFO', '').startswith('/self/'):
for res in selfservice.handle_request(env, start_response):
yield res
return
reqpath = env.get('PATH_INFO', '')
reqpath = req.rel_url.path
if reqpath.startswith('/self/'):
return await selfservice.handle_request(req, make_response)
if reqpath.startswith('/boot/'):
request = env['PATH_INFO'].split('/')
request = reqpath.split('/')
if not request[0]:
request = request[1:]
if len(request) != 4:
start_response('400 Bad Request', headers)
yield ''
return
return await make_response(mimetype, 400, 'Bad Request')
if request[1] == 'by-mac':
mac = request[2].replace('-', ':')
nodename = disco.get_node_by_uuid_or_mac(mac)
@ -657,38 +674,34 @@ def resourcehandler_backend(env, start_response):
nodec = cfg.get_node_attributes(nodename, 'deployment.pendingprofile')
pprofile = nodec.get(nodename, {}).get('deployment.pendingprofile', {}).get('value', None)
if not pprofile:
start_response('404 Not Found', headers)
yield ''
return
return await make_response(mimetype, 404, 'Not Found')
redir = '/confluent-public/os/{0}/boot.{1}'.format(pprofile, bootfile)
headers.append(('Location', redir))
start_response('302 Found', headers)
yield ''
rsp = make_response(mimetype, 302, 'Found', {'Location': redir})
return
if 'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0:
if req.content_length:
reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
reqtype = env['CONTENT_TYPE']
operation = opmap[env['REQUEST_METHOD']]
querydict = _get_query_dict(env, reqbody, reqtype)
operation = opmap[req.method]
querydict = _get_query_dict(req, reqbody, reqtype)
if operation != 'retrieve' and 'restexplorerop' in querydict:
operation = querydict['restexplorerop']
del querydict['restexplorerop']
authorized = _authorize_request(env, operation, reqbody)
authorized = await _authorize_request(req, operation, reqbody)
if 'logout' in authorized:
start_response('200 Successful logout', headers)
yield('{"result": "200 - Successful logout"}')
rsp = await make_response("application/json", 200, 'Successful logout')
await rsp.write(b'{"result": "200 - Successful logout"}')
return
if 'HTTP_SUPPRESSAUTHHEADER' in env or 'HTTP_CONFLUENTAUTHTOKEN' in env:
badauth = [('Content-type', 'text/plain')]
if 'SuppressAuthHeader' in req.headers or 'ConfluentAuthToken' in req.headers:
badauth = {'Content-type': 'text/plain'}
else:
badauth = [('Content-type', 'text/plain'),
('WWW-Authenticate', 'Basic realm="confluent"')]
badauth = {'Content-type': 'text/plain',
'WWW-Authenticate': 'Basic realm="confluent"'}
if authorized['code'] == 401:
start_response('401 Authentication Required', badauth)
yield 'authentication required'
return
rsp = await make_response('text/plain', 401, 'Authentication Required', badauth)
await rsp.write(b'authentication required')
return rsp
if authorized['code'] == 403:
start_response('403 Forbidden', badauth)
rsp = await make_response('application/json', 403, 'Forbidden', badauth)
response = {'result': 'Forbidden'}
if 'prompts' in authorized:
response['prompts'] = []
@ -696,40 +709,38 @@ def resourcehandler_backend(env, start_response):
if not isinstance(prompt, str):
prompt = prompt.decode('utf8')
response['prompts'].append(prompt)
yield json.dumps(response)
return
await rsp.write(json.dumps(response))
return rsp
if authorized['code'] != 200:
raise Exception("Unrecognized code from auth engine")
headers.extend(
("Set-Cookie", m.OutputString())
for m in authorized.get('cookie', {}).values())
cookies = authorized.get('cookie', None)
cfgmgr = authorized['cfgmgr']
if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async':
if (operation == 'create') and reqpath == '/sessions/current/async':
pagecontent = ""
try:
for rsp in _assemble_json(
async for rsp in _assemble_json(
confluent.asynchttp.handle_async(
env, querydict,
httpsessions[authorized['sessionid']]['inflight'])):
pagecontent += rsp
start_response("200 OK", headers)
rsp = await make_response(mimetype, 200, cookies=cookies)
if not isinstance(pagecontent, bytes):
pagecontent = pagecontent.encode('utf-8')
yield pagecontent
return
await rsp.write(pagecontent)
return rsp
except exc.ConfluentException as e:
if e.apierrorcode == 500:
# raise generics to trigger the tracelog
raise
start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr),
headers)
yield e.get_error_body()
elif (env['PATH_INFO'].endswith('/forward/web') and
env['PATH_INFO'].startswith('/nodes/')):
prefix, _, _ = env['PATH_INFO'].partition('/forward/web')
rsp = await make_response(mimetype, e.apierrorcode, e.apierrorstr)
await rsp.write(e.get_error_body())
return rsp
elif (reqpath.endswith('/forward/web') and
reqpath.startswith('/nodes/')):
prefix, _, _ = reqpath.partition('/forward/web')
#_, _, nodename = prefix.rpartition('/')
default = False
if 'default' in env['PATH_INFO']:
if 'default' in reqpath:
default = True
_,_,nodename,_ = prefix.split('/')
else:
@ -738,33 +749,33 @@ def resourcehandler_backend(env, start_response):
targip = hm.get(nodename, {}).get(
'hardwaremanagement.manager', {}).get('value', None)
if not targip:
start_response('404 Not Found', headers)
yield 'No hardwaremanagement.manager defined for node'
return
rsp = await make_response('text/plain', 404)
await rsp.write(b'No hardwaremanagement.manager defined for node')
return rsp
targip = targip.split('/', 1)[0]
if default:
try:
ip_info = socket.getaddrinfo(targip, 0, 0, socket.SOCK_STREAM)
except socket.gaierror:
start_response('404 Not Found', headers)
yield 'hardwaremanagement.manager definition could not be resolved'
return
rsp = await make_response('text/plain', 404)
await rsp.write(b'hardwaremanagement.manager definition could not be resolved')
return rsp
# this is just to future proof just in case the indexes of the address family change in future
for i in range(len(ip_info)):
if ip_info[i][0] == socket.AF_INET:
url = 'https://{0}/'.format(ip_info[i][-1][0])
start_response('302', [('Location', url)])
yield 'Our princess is in another castle!'
return
rsp = await make_response('text/plain', 302, headers={'Location': url})
await rsp.write(b'Our princess is in another castle!')
return rsp
elif ip_info[i][0] == socket.AF_INET6:
url = 'https://[{0}]/'.format(ip_info[i][-1][0])
if url.startswith('https://[fe80'):
start_response('405 Method Not Allowed', headers)
yield 'link local ipv6 address cannot be used in browser'
return
start_response('302', [('Location', url)])
yield 'Our princess is in another castle!'
return
rsp = await make_response('text/plain', 405)
await rsp.write(b'link local ipv6 address cannot be used in browser')
return rsp
rsp = await make_response('text/plain', 302, {'Location': url})
await rsp.write(b'Our princess is in another castle!')
return rsp
funport = forwarder.get_port(targip, env['HTTP_X_FORWARDED_FOR'],
authorized['sessionid'])
host = env['HTTP_X_FORWARDED_HOST']
@ -774,22 +785,22 @@ def resourcehandler_backend(env, start_response):
host = host.rsplit(':', 1)[0]
url = 'https://{0}:{1}/'.format(host, funport)
start_response('302', [('Location', url)])
yield 'Our princess is in another castle!'
return
elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
'/shell/sessions/' in env['PATH_INFO'])):
rsp.write(b'Our princess is in another castle!')
return rsp
elif (operation == 'create' and ('/console/session' in reqpath or
'/shell/sessions/' in reqpath)):
#hard bake JSON into this path, do not support other incarnations
if '/console/session' in env['PATH_INFO']:
prefix, _, _ = env['PATH_INFO'].partition('/console/session')
if '/console/session' in reqpath:
prefix, _, _ = reqpath.partition('/console/session')
shellsession = False
elif '/shell/sessions/' in env['PATH_INFO']:
prefix, _, _ = env['PATH_INFO'].partition('/shell/sessions')
elif '/shell/sessions/' in reqpath:
prefix, _, _ = reqpath.partition('/shell/sessions')
shellsession = True
_, _, nodename = prefix.rpartition('/')
if 'session' not in querydict.keys() or not querydict['session']:
auditmsg = {
'operation': 'start',
'target': env['PATH_INFO'],
'target': reqpath,
'user': util.stringify(authorized['username']),
}
if 'tenant' in authorized:
@ -803,7 +814,7 @@ def resourcehandler_backend(env, start_response):
height = querydict.get('height', 24)
datacallback = None
asynchdl = None
if 'HTTP_CONFLUENTASYNCID' in env:
if 'ConfluentAsyncId' in req.headers:
asynchdl = confluent.asynchttp.get_async(env, querydict)
termrel = asynchdl.set_term_relation(env)
datacallback = termrel.got_data
@ -821,35 +832,35 @@ def resourcehandler_backend(env, start_response):
datacallback=datacallback, width=width, height=height
)
except exc.NotFoundException:
start_response("404 Not found", headers)
yield "404 - Request Path not recognized"
return
rsp = await make_response('text/plain', 404)
await rsp.write(b"404 - Request Path not recognized")
return rsp
if not consession:
start_response("500 Internal Server Error", headers)
return
rsp = await make_response('', 500)
return rsp
sessid = _assign_consessionid(consession)
if asynchdl:
asynchdl.add_console_session(sessid)
start_response('200 OK', headers)
yield '{"session":"%s","data":""}' % sessid
return
rsp = await make_response('application/json', 200, cookies=cookies)
rsp.write(b'{"session":"%s","data":""}' % sessid)
return rsp
elif 'bytes' in querydict.keys(): # not keycodes...
myinput = querydict['bytes']
sessid = querydict['session']
if sessid not in consolesessions:
start_response('400 Expired Session', headers)
return
return rsp
consolesessions[sessid]['expiry'] = time.time() + 90
consolesessions[sessid]['session'].write(myinput)
start_response('200 OK', headers)
yield json.dumps({'session': querydict['session']})
return # client has requests to send or receive, not both...
rsp.write(json.dumps({'session': querydict['session']}))
return rsp # client has requests to send or receive, not both...
elif 'closesession' in querydict:
consolesessions[querydict['session']]['session'].destroy()
del consolesessions[querydict['session']]
start_response('200 OK', headers)
yield '{"sessionclosed": true}'
return
rsp.write(b'{"sessionclosed": true}')
return rsp
elif 'action' in querydict:
if querydict['action'] == 'break':
consolesessions[querydict['session']]['session'].send_break()
@ -860,16 +871,16 @@ def resourcehandler_backend(env, start_response):
consolesessions[querydict['session']]['session'].reopen()
else:
start_response('400 Bad Request')
yield 'Unrecognized action ' + querydict['action']
return
rsp.write(b'Unrecognized action ' + querydict['action'])
return rsp
start_response('200 OK', headers)
yield json.dumps({'session': querydict['session']})
rsp.write(json.dumps({'session': querydict['session']}))
return rsp
else: # no keys, but a session, means it's hooking to receive data
sessid = querydict['session']
if sessid not in consolesessions:
start_response('400 Expired Session', headers)
yield ''
return
return rsp
consolesessions[sessid]['expiry'] = time.time() + 90
# add our thread to the 'inflight' to have a hook to terminate
# a long polling request
@ -885,13 +896,12 @@ def resourcehandler_backend(env, start_response):
mythreadid)
if sessid not in consolesessions:
start_response('400 Expired Session', headers)
yield ''
return
return rsp
if loggedout is not None:
consolesessions[sessid]['session'].destroy()
start_response('401 Logged out', headers)
yield '{"loggedout": 1}'
return
rsp.write(b'{"loggedout": 1}')
return rsp
bufferage = False
if 'stampsent' not in consolesessions[sessid]:
consolesessions[sessid]['stampsent'] = True
@ -905,69 +915,70 @@ def resourcehandler_backend(env, start_response):
if bufferage is not False:
rspdata['bufferage'] = bufferage
try:
rsp = json.dumps(rspdata)
rspj = json.dumps(rspdata)
except UnicodeDecodeError:
try:
rsp = json.dumps(rspdata, encoding='cp437')
rspj = json.dumps(rspdata, encoding='cp437')
except UnicodeDecodeError:
rsp = json.dumps({'session': querydict['session'],
rspj = json.dumps({'session': querydict['session'],
'data': 'DECODEERROR'})
start_response('200 OK', headers)
yield rsp
return
rsp.write(rspj)
return rsp
else:
# normal request
url = env['PATH_INFO']
url = reqpath
url = url.replace('.json', '')
url = url.replace('.html', '')
if url == '/sessions/current/info':
start_response('200 OK', headers)
rsp = await make_response('application/json', 200, cookies=cookies)
sessinfo = {'username': authorized['username']}
if 'authtoken' in authorized:
sessinfo['authtoken'] = authorized['authtoken']
if 'sessionid' in authorized:
sessinfo['sessionid'] = authorized['sessionid']
tlvdata.unicode_dictvalues(sessinfo)
yield json.dumps(sessinfo)
return
await rsp.write(json.dumps(sessinfo).encode('utf8'))
return rsp
elif url.startswith('/sessions/current/webauthn/'):
if not webauthn:
start_response('501 Not Implemented', headers)
yield ''
return
for rsp in webauthn.handle_api_request(url, env, start_response, authorized['username'], cfgmgr, headers, reqbody, authorized):
yield rsp
return
return rsp
for rspd in webauthn.handle_api_request(url, env, start_response, authorized['username'], cfgmgr, headers, reqbody, authorized):
rsp.write(rspd)
return rsp
resource = '.' + url[url.rindex('/'):]
lquerydict = copy.deepcopy(querydict)
try:
hdlr = pluginapi.handle_path(url, operation,
cfgmgr, querydict)
if 'HTTP_CONFLUENTASYNCID' in env:
if 'ConfluentAsyncId' in req.headers:
confluent.asynchttp.run_handler(hdlr, env)
start_response('202 Accepted', headers)
yield 'Request queued'
return
await make_response('text/plain', 202, cookies=cookies)
rsp.write(b'Request queued')
return rsp
pagecontent = ""
if mimetype == 'text/html':
for datum in _assemble_html(hdlr, resource, lquerydict, url,
extension):
pagecontent += datum
else:
for datum in _assemble_json(hdlr, resource, url, extension):
async for datum in _assemble_json(hdlr, resource, url, extension):
pagecontent += datum
start_response('200 OK', headers)
rsp = await make_response(mimetype, 200, cookies=cookies)
if not isinstance(pagecontent, bytes):
pagecontent = pagecontent.encode('utf-8')
yield pagecontent
await rsp.write(pagecontent)
return rsp
except exc.ConfluentException as e:
if ((not isinstance(e, exc.LockedCredentials)) and
e.apierrorcode == 500):
# raise generics to trigger the tracelog
raise
start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr),
headers)
yield e.get_error_body()
rsp = await make_response(mimetype, e.apierrorcode, e.apierrorstr,
cookies=cookies)
await rsp.write(e.get_error_body().encode('utf8'))
return rsp
def _assemble_html(responses, resource, querydict, url, extension):
yield '<html><head><meta charset="UTF-8"><title>' \
@ -1024,7 +1035,7 @@ def _assemble_html(responses, resource, querydict, url, extension):
'</form></body></html>')
def _assemble_json(responses, resource=None, url=None, extension=None):
async def _assemble_json(responses, resource=None, url=None, extension=None):
#NOTE(jbjohnso) I'm considering giving up on yielding bit by bit
#in json case over http. Notably, duplicate key values from plugin
#overwrite, but we'd want to preserve them into an array instead.
@ -1040,7 +1051,7 @@ def _assemble_json(responses, resource=None, url=None, extension=None):
else:
links['collection'] = {"href": "./" + extension}
rspdata = {}
for rsp in responses:
for rsp in await responses:
if isinstance(rsp, confluent.messages.LinkRelation):
haldata = rsp.raw()
for hk in haldata:
@ -1082,40 +1093,44 @@ def _assemble_json(responses, resource=None, url=None, extension=None):
rspdata, sort_keys=True, indent=4, ensure_ascii=False).encode('utf-8'))
def serve(bind_host, bind_port):
async def serve(bind_host, bind_port):
# TODO(jbjohnso): move to unix socket and explore
# either making apache deal with it
# or just supporting nginx or lighthttpd
# for now, http port access
#scgi.WSGIServer(resourcehandler, bindAddress=("localhost",4004)).run()
#based on a bakeoff perf wise, eventlet http support proxied actually did
#edge out patched flup. unpatched flup was about the same as eventlet http
#but deps are simpler without flup
#also, the potential for direct http can be handy
#todo remains unix domain socket for even http
# todo remains unix domain socket for even http
sock = None
while not sock:
try:
sock = eventlet.listen(
(bind_host, bind_port, 0, 0), family=socket.AF_INET6)
sock = socket.socket(socket.AF_INET6)
sock.settimeout(0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.bind((bind_host, bind_port, 0, 0))
sock.listen(128)
except socket.error as e:
if e.errno != 98:
raise
sys.stderr.write(
'Failed to open HTTP due to busy port, trying again in'
' a second\n')
eventlet.sleep(1)
await asyncio.sleep(1)
# TCP_FASTOPEN
try:
sock.setsockopt(socket.SOL_TCP, 23, 5)
except Exception:
pass # we gave it our best shot there
try:
eventlet.wsgi.server(sock, resourcehandler, log=False, log_output=False,
debug=False, socket_timeout=60, keepalive=False)
except TypeError:
# Older eventlet in place, skip arguments it does not understand
eventlet.wsgi.server(sock, resourcehandler, log=False, debug=False)
app = web.Application()
app.router.add_route("*", "/{path_info:.*}", resourcehandler)
runner = web.AppRunner(app)
await runner.setup()
site = web.SockSite(runner, sock)
await site.start()
# eventlet.wsgi.server(sock, resourcehandler, log=False, log_output=False,
# debug=False, socket_timeout=60, keepalive=False)
class HttpApi(object):
@ -1129,7 +1144,9 @@ class HttpApi(object):
global tracelog
tracelog = log.Logger('trace')
auditlog = log.Logger('audit')
self.server = eventlet.spawn(serve, self.bind_host, self.bind_port)
self.server = asyncio.get_event_loop().create_task(
serve(self.bind_host, self.bind_port))
# self.server = eventlet.spawn(serve, self.bind_host, self.bind_port)
_cleaner = eventlet.spawn(_sessioncleaner)

View File

@ -81,7 +81,7 @@ def get_extra_names(nodename, cfg, myip=None):
names.add(nip)
return names
def handle_request(env, start_response):
def handle_request(req, make_response):
global currtz
global keymap
global currlocale