diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index d9353c16..ff14d783 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -84,9 +84,9 @@ class AsyncSession(object): if not wshandler: self.reaper = eventlet.spawn_after(15, self.destroy) - def add(self, requestid, rsp): + async def add(self, requestid, rsp): if self.wshandler: - self.wshandler(messages.AsyncMessage((requestid, rsp))) + await self.wshandler(messages.AsyncMessage((requestid, rsp))) if self.responses is None: return self.responses.append((requestid, rsp)) @@ -116,10 +116,10 @@ class AsyncSession(object): self.responses = None del _asyncsessions[self.asyncid] - def run_handler(self, handler, requestid): + async def run_handler(self, handler, requestid): try: for rsp in handler: - self.add(requestid, rsp) + await self.add(requestid, rsp) self.add(requestid, messages.AsyncCompletion()) except Exception as e: self.add(requestid, e) @@ -146,7 +146,7 @@ class AsyncSession(object): yield self.responses.popleft() -def run_handler(hdlr, env): +async def run_handler(hdlr, env): asyncsessid = env['HTTP_CONFLUENTASYNCID'] try: asyncsession = _asyncsessions[asyncsessid]['asyncsession'] diff --git a/confluent_server/confluent/debugger.py b/confluent_server/confluent/debugger.py index 41fc152d..b6980d26 100644 --- a/confluent_server/confluent/debugger.py +++ b/confluent_server/confluent/debugger.py @@ -1,3 +1,4 @@ +import asyncio import code import sys @@ -6,26 +7,29 @@ import sys # since we have to asyncio up the input and output, we use InteractiveInterpreter and handle the # input ourselves, since code is not asyncio friendly in and of itself #code.InteractiveConsole().interact() -prompt = '>>> ' -itr = code.InteractiveInterpreter() -while True: - sys.stdout.write(prompt) - prompt = '... ' - sys.stdout.flush() - newinput = input() - somecode += newinput + '\n' - if newinput.startswith(' '): +async def interact(sock): + cloop = asyncio.get_event_loop() + prompt = '>>> ' + itr = code.InteractiveInterpreter() + while True: + await cloop.sock_sendall(prompt) prompt = '... ' - continue - try: - compcode = code.compile_command(somecode) - except SyntaxError as e: - print(repr(e)) - compcode = None - somecode = '' - prompt = '>>> ' - if compcode: - itr.runcode(compcode) - somecode = '' - prompt = '>>> ' + newinput = b'' + while b'\n' not in newinput: + newinput += await cloop.sock_recv() + somecode += newinput + if newinput.startswith(' '): + prompt = '... ' + continue + try: + compcode = code.compile_command(somecode) + except SyntaxError as e: + await cloop.sock_sendall(repr(e).encode('utf8')) + compcode = None + somecode = '' + prompt = '>>> ' + if compcode: + itr.runcode(compcode) + somecode = '' + prompt = '>>> ' diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index e30df36d..f10b4207 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -25,6 +25,8 @@ try: import confluent.webauthn as webauthn except ImportError: webauthn = None +import asyncio +from aiohttp import web, web_urldispatcher, connector, ClientSession import confluent.auth as auth import confluent.config.attributes as attribs import confluent.config.configmanager as configmanager @@ -55,7 +57,6 @@ except ModuleNotFoundError: import urllib.parse as urlparse import eventlet.websocket import eventlet.wsgi -#scgi = eventlet.import_patched('flup.server.scgi') tlvdata = confluent.tlvdata @@ -167,16 +168,10 @@ def _sessioncleaner(): eventlet.sleep(10) -def _get_query_dict(env, reqbody, reqtype): +def _get_query_dict(req, reqbody, reqtype): qdict = {} - try: - qstring = env['QUERY_STRING'] - except KeyError: - qstring = None - if qstring: - for qpair in qstring.split('&'): - qkey, qvalue = qpair.split('=') - qdict[qkey] = qvalue + qstring = req.rel_url.query_string + qdict.update(req.rel_url.query) if reqbody is not None: if "application/x-www-form-urlencoded" in reqtype: if not isinstance(reqbody, str): @@ -205,24 +200,25 @@ def _get_query_dict(env, reqbody, reqtype): qdict = nqdict return qdict -def _should_skip_authlog(env): - if ('/console/session' in env['PATH_INFO'] or - '/shell/sessions/' in env['PATH_INFO']): +def _should_skip_authlog(req): + thepath = req.rel_url.path + if ('/console/session' in thepath or + '/shell/sessions/' in thepath): # we should only log starting of a console return True - if '/sessions/current/async' in env['PATH_INFO']: + if '/sessions/current/async' in thepath: # this is effectively invisible return True - if '/sessions/current/webauthn/registered_credentials' in env['PATH_INFO']: + if '/sessions/current/webauthn/registered_credentials' in thepath: return True - if (env['REQUEST_METHOD'] == 'GET' and - ('/sensors/' in env['PATH_INFO'] or - '/health/' in env['PATH_INFO'] or - '/power/state' in env['PATH_INFO'] or - '/nodes/' == env['PATH_INFO'] or - '/sessions/current/info' == env['PATH_INFO'] or - (env['PATH_INFO'].startswith('/noderange/') and - env['PATH_INFO'].endswith('/nodes/')))): + if (req.method == 'GET' and + ('/sensors/' in thepath or + '/health/' in thepath or + '/power/state' in thepath or + '/nodes/' == thepath or + '/sessions/current/info' == thepath or + (thepath.startswith('/noderange/') and + thepath.endswith('/nodes/')))): # these are pretty innocuous, and noisy to log. return True return False @@ -234,10 +230,10 @@ def _csrf_exempt(path): return path == '/sessions/current/info' or path.endswith('/forward/web') -def _csrf_valid(env, session): +def _csrf_valid(req, session): # This could be simplified into a statement, but this is more readable # to have it broken out - if env['REQUEST_METHOD'] == 'GET' and _csrf_exempt(env['PATH_INFO']): + if req.method == 'GET' and _csrf_exempt(req.rel_url.path): # Provide a web client a safe hook to request the CSRF token # This means that we consider GET of /sessions/current/info to be # a safe thing to inflict via CSRF, since CORS should prevent @@ -247,16 +243,17 @@ def _csrf_valid(env, session): if 'csrftoken' not in session: # The client has not (yet) requested CSRF protection # so we return true - if 'HTTP_CONFLUENTAUTHTOKEN' in env: + if 'ConfluentAuthToken' in req.headers: # The client has requested CSRF countermeasures, # oblige the request and apply a new token to the # session session['csrftoken'] = util.randomstring(32) - elif 'HTTP_REFERER' in env: + elif 'Referer' in req.headers: + print('refererrrrrrrr') # If there is a referrer, make sure it stays consistent # across the session. A change in referer is a bad thing try: - referer = env['HTTP_REFERER'].split('/')[2] + referer = req.headers['Referer'].split('/')[2] except IndexError: return False if 'validreferer' not in session: @@ -267,12 +264,12 @@ def _csrf_valid(env, session): # The session has CSRF protection enabled, only mark valid if # the client has provided an auth token and that token matches the # value protecting the session - return ('HTTP_CONFLUENTAUTHTOKEN' in env and - env['HTTP_CONFLUENTAUTHTOKEN'] == session['csrftoken']) + return ('ConfluentAuthToken' in req.headers and + req.headers['ConfluentAuthToken'] == session['csrftoken']) -def _authorize_request(env, operation, reqbody): - """Grant/Deny access based on data from wsgi env +async def _authorize_request(req, operation, reqbody): + """Grant/Deny access based on data from request """ authdata = None @@ -280,7 +277,7 @@ def _authorize_request(env, operation, reqbody): sessionid = None sessid = None cookie = Cookie.SimpleCookie() - element = env['PATH_INFO'] + element = req.rel_url.path if element.startswith('/sessions/current/'): if (element.startswith('/sessions/current/webauthn/registered_credentials/') or element.startswith('/sessions/current/webauthn/validate/')): @@ -289,18 +286,16 @@ def _authorize_request(env, operation, reqbody): else: element = None if not authdata: - if 'HTTP_CONFLUENTSESSION' in env: - sessionid = env['HTTP_CONFLUENTSESSION'] + if 'ConfluentSession' in req.headers: + sessionid = req.headers['ConfluentSession'] + sessid = sessionid + elif 'confluentsessionid' in req.cookies: + sessionid = req.cookies['confluentsessionid'] sessid = sessionid - elif 'HTTP_COOKIE' in env: - cidx = (env['HTTP_COOKIE']).find('confluentsessionid=') - if cidx >= 0: - sessionid = env['HTTP_COOKIE'][cidx+19:cidx+51] - sessid = sessionid if sessionid: if sessionid in httpsessions: - if _csrf_valid(env, httpsessions[sessionid]): - if env['PATH_INFO'] == '/sessions/current/logout': + if _csrf_valid(req, httpsessions[sessionid]): + if element == '/sessions/current/logout': targets = [] for mythread in httpsessions[sessionid]['inflight']: targets.append(mythread) @@ -314,24 +309,24 @@ def _authorize_request(env, operation, reqbody): authdata = auth.authorize( name, element=element, operation=operation, skipuserobj=httpsessions[sessionid]['skipuserobject']) - if (not authdata) and 'HTTP_AUTHORIZATION' in env: - if env['PATH_INFO'] == '/sessions/current/logout': - if 'HTTP_REFERER' in env: + if (not authdata) and 'Authorization' in req.headers: + if element == '/sessions/current/logout': + if 'Referer' in req.headers: # note that this doesn't actually do harm # otherwise, but this way do not give appearance # of something having a side effect if it has the smell # of a CSRF return {'code': 401} return ('logout',) - if env['HTTP_AUTHORIZATION'].startswith('MultiBasic '): + if req.headers['Authorization'].startswith('MultiBasic '): name, passphrase = base64.b64decode( - env['HTTP_AUTHORIZATION'].replace('MultiBasic ', '')).split(b':', 1) + req.headers['Authorization'].replace('MultiBasic ', '')).split(b':', 1) passphrase = json.loads(passphrase) else: name, passphrase = base64.b64decode( - env['HTTP_AUTHORIZATION'].replace('Basic ', '')).split(b':', 1) + req.headers['Authorization'].replace('Basic ', '')).split(b':', 1) try: - authdata = auth.check_user_passphrase(name, passphrase, operation=operation, element=element) + authdata = await auth.check_user_passphrase(name, passphrase, operation=operation, element=element) except Exception as e: if hasattr(e, 'prompts'): return {'code': 403, 'prompts': e.prompts} @@ -340,19 +335,20 @@ def _authorize_request(env, operation, reqbody): return {'code': 403} elif not authdata: return {'code': 401} - sessid = _establish_http_session(env, authdata, name, cookie) + print(repr(authdata)) + sessid = _establish_http_session(req, authdata, name, cookie) if authdata and element and element.startswith('/sessions/current/webauthn/validate/'): if webauthn: for rsp in webauthn.handle_api_request(element, env, None, authdata[2], authdata[1], None, reqbody, None): if rsp['verified']: sessid = _establish_http_session(env, authdata, name, cookie) break - skiplog = _should_skip_authlog(env) + skiplog = _should_skip_authlog(req) if authdata: auditmsg = { 'user': util.stringify(name), 'operation': operation, - 'target': env['PATH_INFO'], + 'target': element, } authinfo = {'code': 200, 'cookie': cookie, @@ -376,14 +372,14 @@ def _authorize_request(env, operation, reqbody): else: return {'code': 403} -def _establish_http_session(env, authdata, name, cookie): +def _establish_http_session(req, authdata, name, cookie): sessid = util.randomstring(32) while sessid in httpsessions: sessid = util.randomstring(32) httpsessions[sessid] = {'name': name, 'expiry': time.time() + 90, 'skipuserobject': authdata[4], 'inflight': set([])} - if 'HTTP_CONFLUENTAUTHTOKEN' in env: + if 'ConfluentAuthToken' in req.headers: httpsessions[sessid]['csrftoken'] = util.randomstring(32) cookie['confluentsessionid'] = util.stringify(sessid) cookie['confluentsessionid']['secure'] = 1 @@ -392,7 +388,7 @@ def _establish_http_session(env, authdata, name, cookie): return sessid -def _pick_mimetype(env): +def _pick_mimetype(req): """Detect the http indicated mime to send back. Note that as it gets into the ACCEPT header honoring, it only looks for @@ -401,11 +397,11 @@ def _pick_mimetype(env): XMLHttpRequest.setRequestHeader will be used by clever javascript if the '.json' scheme doesn't cut it. """ - if env['PATH_INFO'].endswith('.json'): + if req.rel_url.path.endswith('.json'): return 'application/json; charset=utf-8', '.json' - elif env['PATH_INFO'].endswith('.html'): + elif req.rel_url.path.endswith('.html'): return 'text/html', '.html' - elif 'HTTP_ACCEPT' in env and 'application/json' in env['HTTP_ACCEPT']: + elif 'Accept' in req.headers and 'application/json' in req.headers['Accept']: return 'application/json; charset=utf-8', '' else: return 'text/html', '' @@ -438,45 +434,52 @@ def datacallback_bound(clientsessid, ws): ws.send('${0}$'.format(clientsessid) + data) return datacallback -@eventlet.websocket.WebSocketWSGI.configured( - supported_protocols=['confluent.console', 'confluent.asyncweb']) -def wsock_handler(ws): - sessid = ws.wait() +async def wsock_handler(req): + + rsp = web.WebSocketResponse( + heartbeat=25.0, + protocols=('confluent.console', 'confluent.asyncweb')) + await rsp.prepare(req) + + + sessid = await rsp.receive() if not sessid: return + sessid = sessid.data sessid = sessid.replace('ConfluentSessionId:', '') sessid = sessid[:-1] currsess = httpsessions.get(sessid, None) if not currsess: return - authtoken = ws.wait() + authtoken = await rsp.receive() + authtoken = authtoken.data authtoken = authtoken.replace('ConfluentAuthToken:', '') authtoken = authtoken[:-1] if currsess['csrftoken'] != authtoken: return - mythreadid = greenlet.getcurrent() - httpsessions[sessid]['inflight'].add(mythreadid) + httpsessions[sessid]['inflight'].add(rsp) name = httpsessions[sessid]['name'] - authdata = auth.authorize(name, ws.path, operation='start') + print(req.rel_url.path) + authdata = auth.authorize(name, req.rel_url.path, operation='start') if not authdata: return cfgmgr = httpsessions[sessid]['cfgmgr'] username = httpsessions[sessid]['name'] - if ws.path == '/sessions/current/async': + if req.rel_url.path == '/sessions/current/async': myconsoles = {} - def asyncwscallback(rsp): + async def asyncwscallback(rsp): rsp = json.dumps(rsp.raw()) - ws.send(u'!' + rsp) - mythreadid = greenlet.getcurrent() - currsess['inflight'].add(mythreadid) + await rsp.send_str(u'!' + rsp) + currsess['inflight'].add(rsp) asess = None try: for asess in confluent.asynchttp.handle_async( {}, {}, currsess['inflight'], asyncwscallback): - ws.send(u' ASYNCID: {0}'.format(asess.asyncid)) + await rsp.send_str(u' ASYNCID: {0}'.format(asess.asyncid)) clientmsg = True while clientmsg: - clientmsg = ws.wait() + clientmsg = await rsp.receive() + clientmsg = clientmsg.data if clientmsg: if clientmsg[0] == '?': ws.send('?') @@ -580,7 +583,6 @@ def wsock_handler(ws): ) except exc.NotFoundException: return - mythreadid = greenlet.getcurrent() currsess['inflight'].add(mythreadid) clientmsg = ws.wait() try: @@ -603,47 +605,62 @@ def wsock_handler(ws): consession.destroy() -def resourcehandler(env, start_response): +async def resourcehandler(request): + # start_response is akin to doing headers + # and calling 'prepare() on a 'StreamResponse' + # any 'yield' needs to become a write to the streamresponse + async def make_response(mimetype, status=200, reason=None, headers=None, cookies=None): + rspheaders = { + 'Cache-Control': 'no-store', + 'Pragma': 'no-cache', + 'X-Content-Type-Options': 'nosniff', + 'Content-Security-Policy': "default-src 'self'", + 'X-XSS-Protection': '1; mode=block', + 'X-Frame-Options': 'deny', + 'Strict-Transport-Security': 'max-age=86400', + 'X-Permitted-Cross-Domain-Policies': 'none', + } + if headers: + rspheaders.update(headers) + rsp = web.StreamResponse(status=status, reason=reason, headers=rspheaders) + if cookies: + rsp.cookies.update(cookies) + rsp.content_type = mimetype + await rsp.prepare(request) + return rsp + try: - if 'HTTP_SEC_WEBSOCKET_VERSION' in env: - for rsp in wsock_handler(env, start_response): - yield rsp + if 'Sec-WebSocket-Version' in request.headers: + print('WebSocket....') + return await wsock_handler(request) else: - for rsp in resourcehandler_backend(env, start_response): - yield rsp + return await resourcehandler_backend(request, make_response) except Exception as e: tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) - start_response('500 - ' + str(e), []) - yield '500 - ' + str(e) - return + #start_response('500 - ' + str(e), []) + rsp = web.StreamResponse(status=500, reason=str(e)) + await rsp.prepare(request) + await rsp.write(str(e).encode('utf8')) + return rsp -def resourcehandler_backend(env, start_response): + +async def resourcehandler_backend(req, make_response): """Function to handle new wsgi requests """ - mimetype, extension = _pick_mimetype(env) - headers = [('Content-Type', mimetype), ('Cache-Control', 'no-store'), - ('Pragma', 'no-cache'), - ('X-Content-Type-Options', 'nosniff'), - ('Content-Security-Policy', "default-src 'self'"), - ('X-XSS-Protection', '1; mode=block'), ('X-Frame-Options', 'deny'), - ('Strict-Transport-Security', 'max-age=86400'), - ('X-Permitted-Cross-Domain-Policies', 'none')] + mimetype, extension = _pick_mimetype(req) + reqbody = None reqtype = None - if env.get('PATH_INFO', '').startswith('/self/'): - for res in selfservice.handle_request(env, start_response): - yield res - return - reqpath = env.get('PATH_INFO', '') + reqpath = req.rel_url.path + if reqpath.startswith('/self/'): + return await selfservice.handle_request(req, make_response) if reqpath.startswith('/boot/'): - request = env['PATH_INFO'].split('/') + request = reqpath.split('/') if not request[0]: request = request[1:] if len(request) != 4: - start_response('400 Bad Request', headers) - yield '' - return + return await make_response(mimetype, 400, 'Bad Request') if request[1] == 'by-mac': mac = request[2].replace('-', ':') nodename = disco.get_node_by_uuid_or_mac(mac) @@ -657,38 +674,34 @@ def resourcehandler_backend(env, start_response): nodec = cfg.get_node_attributes(nodename, 'deployment.pendingprofile') pprofile = nodec.get(nodename, {}).get('deployment.pendingprofile', {}).get('value', None) if not pprofile: - start_response('404 Not Found', headers) - yield '' - return + return await make_response(mimetype, 404, 'Not Found') redir = '/confluent-public/os/{0}/boot.{1}'.format(pprofile, bootfile) - headers.append(('Location', redir)) - start_response('302 Found', headers) - yield '' + rsp = make_response(mimetype, 302, 'Found', {'Location': redir}) return - if 'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0: + if req.content_length: reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH'])) reqtype = env['CONTENT_TYPE'] - operation = opmap[env['REQUEST_METHOD']] - querydict = _get_query_dict(env, reqbody, reqtype) + operation = opmap[req.method] + querydict = _get_query_dict(req, reqbody, reqtype) if operation != 'retrieve' and 'restexplorerop' in querydict: operation = querydict['restexplorerop'] del querydict['restexplorerop'] - authorized = _authorize_request(env, operation, reqbody) + authorized = await _authorize_request(req, operation, reqbody) if 'logout' in authorized: - start_response('200 Successful logout', headers) - yield('{"result": "200 - Successful logout"}') + rsp = await make_response("application/json", 200, 'Successful logout') + await rsp.write(b'{"result": "200 - Successful logout"}') return - if 'HTTP_SUPPRESSAUTHHEADER' in env or 'HTTP_CONFLUENTAUTHTOKEN' in env: - badauth = [('Content-type', 'text/plain')] + if 'SuppressAuthHeader' in req.headers or 'ConfluentAuthToken' in req.headers: + badauth = {'Content-type': 'text/plain'} else: - badauth = [('Content-type', 'text/plain'), - ('WWW-Authenticate', 'Basic realm="confluent"')] + badauth = {'Content-type': 'text/plain', + 'WWW-Authenticate': 'Basic realm="confluent"'} if authorized['code'] == 401: - start_response('401 Authentication Required', badauth) - yield 'authentication required' - return + rsp = await make_response('text/plain', 401, 'Authentication Required', badauth) + await rsp.write(b'authentication required') + return rsp if authorized['code'] == 403: - start_response('403 Forbidden', badauth) + rsp = await make_response('application/json', 403, 'Forbidden', badauth) response = {'result': 'Forbidden'} if 'prompts' in authorized: response['prompts'] = [] @@ -696,40 +709,38 @@ def resourcehandler_backend(env, start_response): if not isinstance(prompt, str): prompt = prompt.decode('utf8') response['prompts'].append(prompt) - yield json.dumps(response) - return + await rsp.write(json.dumps(response)) + return rsp if authorized['code'] != 200: raise Exception("Unrecognized code from auth engine") - headers.extend( - ("Set-Cookie", m.OutputString()) - for m in authorized.get('cookie', {}).values()) + cookies = authorized.get('cookie', None) cfgmgr = authorized['cfgmgr'] - if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async': + if (operation == 'create') and reqpath == '/sessions/current/async': pagecontent = "" try: - for rsp in _assemble_json( + async for rsp in _assemble_json( confluent.asynchttp.handle_async( env, querydict, httpsessions[authorized['sessionid']]['inflight'])): pagecontent += rsp - start_response("200 OK", headers) + rsp = await make_response(mimetype, 200, cookies=cookies) if not isinstance(pagecontent, bytes): pagecontent = pagecontent.encode('utf-8') - yield pagecontent - return + await rsp.write(pagecontent) + return rsp except exc.ConfluentException as e: if e.apierrorcode == 500: # raise generics to trigger the tracelog raise - start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr), - headers) - yield e.get_error_body() - elif (env['PATH_INFO'].endswith('/forward/web') and - env['PATH_INFO'].startswith('/nodes/')): - prefix, _, _ = env['PATH_INFO'].partition('/forward/web') + rsp = await make_response(mimetype, e.apierrorcode, e.apierrorstr) + await rsp.write(e.get_error_body()) + return rsp + elif (reqpath.endswith('/forward/web') and + reqpath.startswith('/nodes/')): + prefix, _, _ = reqpath.partition('/forward/web') #_, _, nodename = prefix.rpartition('/') default = False - if 'default' in env['PATH_INFO']: + if 'default' in reqpath: default = True _,_,nodename,_ = prefix.split('/') else: @@ -738,33 +749,33 @@ def resourcehandler_backend(env, start_response): targip = hm.get(nodename, {}).get( 'hardwaremanagement.manager', {}).get('value', None) if not targip: - start_response('404 Not Found', headers) - yield 'No hardwaremanagement.manager defined for node' - return + rsp = await make_response('text/plain', 404) + await rsp.write(b'No hardwaremanagement.manager defined for node') + return rsp targip = targip.split('/', 1)[0] if default: try: ip_info = socket.getaddrinfo(targip, 0, 0, socket.SOCK_STREAM) except socket.gaierror: - start_response('404 Not Found', headers) - yield 'hardwaremanagement.manager definition could not be resolved' - return + rsp = await make_response('text/plain', 404) + await rsp.write(b'hardwaremanagement.manager definition could not be resolved') + return rsp # this is just to future proof just in case the indexes of the address family change in future for i in range(len(ip_info)): if ip_info[i][0] == socket.AF_INET: url = 'https://{0}/'.format(ip_info[i][-1][0]) - start_response('302', [('Location', url)]) - yield 'Our princess is in another castle!' - return + rsp = await make_response('text/plain', 302, headers={'Location': url}) + await rsp.write(b'Our princess is in another castle!') + return rsp elif ip_info[i][0] == socket.AF_INET6: url = 'https://[{0}]/'.format(ip_info[i][-1][0]) if url.startswith('https://[fe80'): - start_response('405 Method Not Allowed', headers) - yield 'link local ipv6 address cannot be used in browser' - return - start_response('302', [('Location', url)]) - yield 'Our princess is in another castle!' - return + rsp = await make_response('text/plain', 405) + await rsp.write(b'link local ipv6 address cannot be used in browser') + return rsp + rsp = await make_response('text/plain', 302, {'Location': url}) + await rsp.write(b'Our princess is in another castle!') + return rsp funport = forwarder.get_port(targip, env['HTTP_X_FORWARDED_FOR'], authorized['sessionid']) host = env['HTTP_X_FORWARDED_HOST'] @@ -774,22 +785,22 @@ def resourcehandler_backend(env, start_response): host = host.rsplit(':', 1)[0] url = 'https://{0}:{1}/'.format(host, funport) start_response('302', [('Location', url)]) - yield 'Our princess is in another castle!' - return - elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or - '/shell/sessions/' in env['PATH_INFO'])): + rsp.write(b'Our princess is in another castle!') + return rsp + elif (operation == 'create' and ('/console/session' in reqpath or + '/shell/sessions/' in reqpath)): #hard bake JSON into this path, do not support other incarnations - if '/console/session' in env['PATH_INFO']: - prefix, _, _ = env['PATH_INFO'].partition('/console/session') + if '/console/session' in reqpath: + prefix, _, _ = reqpath.partition('/console/session') shellsession = False - elif '/shell/sessions/' in env['PATH_INFO']: - prefix, _, _ = env['PATH_INFO'].partition('/shell/sessions') + elif '/shell/sessions/' in reqpath: + prefix, _, _ = reqpath.partition('/shell/sessions') shellsession = True _, _, nodename = prefix.rpartition('/') if 'session' not in querydict.keys() or not querydict['session']: auditmsg = { 'operation': 'start', - 'target': env['PATH_INFO'], + 'target': reqpath, 'user': util.stringify(authorized['username']), } if 'tenant' in authorized: @@ -803,7 +814,7 @@ def resourcehandler_backend(env, start_response): height = querydict.get('height', 24) datacallback = None asynchdl = None - if 'HTTP_CONFLUENTASYNCID' in env: + if 'ConfluentAsyncId' in req.headers: asynchdl = confluent.asynchttp.get_async(env, querydict) termrel = asynchdl.set_term_relation(env) datacallback = termrel.got_data @@ -821,35 +832,35 @@ def resourcehandler_backend(env, start_response): datacallback=datacallback, width=width, height=height ) except exc.NotFoundException: - start_response("404 Not found", headers) - yield "404 - Request Path not recognized" - return + rsp = await make_response('text/plain', 404) + await rsp.write(b"404 - Request Path not recognized") + return rsp if not consession: - start_response("500 Internal Server Error", headers) - return + rsp = await make_response('', 500) + return rsp sessid = _assign_consessionid(consession) if asynchdl: asynchdl.add_console_session(sessid) - start_response('200 OK', headers) - yield '{"session":"%s","data":""}' % sessid - return + rsp = await make_response('application/json', 200, cookies=cookies) + rsp.write(b'{"session":"%s","data":""}' % sessid) + return rsp elif 'bytes' in querydict.keys(): # not keycodes... myinput = querydict['bytes'] sessid = querydict['session'] if sessid not in consolesessions: start_response('400 Expired Session', headers) - return + return rsp consolesessions[sessid]['expiry'] = time.time() + 90 consolesessions[sessid]['session'].write(myinput) start_response('200 OK', headers) - yield json.dumps({'session': querydict['session']}) - return # client has requests to send or receive, not both... + rsp.write(json.dumps({'session': querydict['session']})) + return rsp # client has requests to send or receive, not both... elif 'closesession' in querydict: consolesessions[querydict['session']]['session'].destroy() del consolesessions[querydict['session']] start_response('200 OK', headers) - yield '{"sessionclosed": true}' - return + rsp.write(b'{"sessionclosed": true}') + return rsp elif 'action' in querydict: if querydict['action'] == 'break': consolesessions[querydict['session']]['session'].send_break() @@ -860,16 +871,16 @@ def resourcehandler_backend(env, start_response): consolesessions[querydict['session']]['session'].reopen() else: start_response('400 Bad Request') - yield 'Unrecognized action ' + querydict['action'] - return + rsp.write(b'Unrecognized action ' + querydict['action']) + return rsp start_response('200 OK', headers) - yield json.dumps({'session': querydict['session']}) + rsp.write(json.dumps({'session': querydict['session']})) + return rsp else: # no keys, but a session, means it's hooking to receive data sessid = querydict['session'] if sessid not in consolesessions: start_response('400 Expired Session', headers) - yield '' - return + return rsp consolesessions[sessid]['expiry'] = time.time() + 90 # add our thread to the 'inflight' to have a hook to terminate # a long polling request @@ -885,13 +896,12 @@ def resourcehandler_backend(env, start_response): mythreadid) if sessid not in consolesessions: start_response('400 Expired Session', headers) - yield '' - return + return rsp if loggedout is not None: consolesessions[sessid]['session'].destroy() start_response('401 Logged out', headers) - yield '{"loggedout": 1}' - return + rsp.write(b'{"loggedout": 1}') + return rsp bufferage = False if 'stampsent' not in consolesessions[sessid]: consolesessions[sessid]['stampsent'] = True @@ -905,69 +915,70 @@ def resourcehandler_backend(env, start_response): if bufferage is not False: rspdata['bufferage'] = bufferage try: - rsp = json.dumps(rspdata) + rspj = json.dumps(rspdata) except UnicodeDecodeError: try: - rsp = json.dumps(rspdata, encoding='cp437') + rspj = json.dumps(rspdata, encoding='cp437') except UnicodeDecodeError: - rsp = json.dumps({'session': querydict['session'], + rspj = json.dumps({'session': querydict['session'], 'data': 'DECODEERROR'}) start_response('200 OK', headers) - yield rsp - return + rsp.write(rspj) + return rsp else: # normal request - url = env['PATH_INFO'] + url = reqpath url = url.replace('.json', '') url = url.replace('.html', '') if url == '/sessions/current/info': - start_response('200 OK', headers) + rsp = await make_response('application/json', 200, cookies=cookies) sessinfo = {'username': authorized['username']} if 'authtoken' in authorized: sessinfo['authtoken'] = authorized['authtoken'] if 'sessionid' in authorized: sessinfo['sessionid'] = authorized['sessionid'] tlvdata.unicode_dictvalues(sessinfo) - yield json.dumps(sessinfo) - return + await rsp.write(json.dumps(sessinfo).encode('utf8')) + return rsp elif url.startswith('/sessions/current/webauthn/'): if not webauthn: start_response('501 Not Implemented', headers) - yield '' - return - for rsp in webauthn.handle_api_request(url, env, start_response, authorized['username'], cfgmgr, headers, reqbody, authorized): - yield rsp - return + return rsp + for rspd in webauthn.handle_api_request(url, env, start_response, authorized['username'], cfgmgr, headers, reqbody, authorized): + rsp.write(rspd) + return rsp resource = '.' + url[url.rindex('/'):] lquerydict = copy.deepcopy(querydict) try: hdlr = pluginapi.handle_path(url, operation, cfgmgr, querydict) - if 'HTTP_CONFLUENTASYNCID' in env: + if 'ConfluentAsyncId' in req.headers: confluent.asynchttp.run_handler(hdlr, env) - start_response('202 Accepted', headers) - yield 'Request queued' - return + await make_response('text/plain', 202, cookies=cookies) + rsp.write(b'Request queued') + return rsp pagecontent = "" if mimetype == 'text/html': for datum in _assemble_html(hdlr, resource, lquerydict, url, extension): pagecontent += datum else: - for datum in _assemble_json(hdlr, resource, url, extension): + async for datum in _assemble_json(hdlr, resource, url, extension): pagecontent += datum - start_response('200 OK', headers) + rsp = await make_response(mimetype, 200, cookies=cookies) if not isinstance(pagecontent, bytes): pagecontent = pagecontent.encode('utf-8') - yield pagecontent + await rsp.write(pagecontent) + return rsp except exc.ConfluentException as e: if ((not isinstance(e, exc.LockedCredentials)) and e.apierrorcode == 500): # raise generics to trigger the tracelog raise - start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr), - headers) - yield e.get_error_body() + rsp = await make_response(mimetype, e.apierrorcode, e.apierrorstr, + cookies=cookies) + await rsp.write(e.get_error_body().encode('utf8')) + return rsp def _assemble_html(responses, resource, querydict, url, extension): yield '