diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 64dca6a9..43fe5362 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -44,6 +44,7 @@ import confluent.exceptions as exc import confluent.messages as messages import confluent.util as util import eventlet +import greenlet _asyncsessions = {} _cleanthread = None @@ -110,7 +111,7 @@ def run_handler(hdlr, env): return requestid -def handle_async(env, querydict): +def handle_async(env, querydict, threadset): global _cleanthread # This may be one of two things, a request for a new async stream # or a request for next data from async stream @@ -120,9 +121,17 @@ def handle_async(env, querydict): currsess = AsyncSession() yield messages.AsyncSession(currsess.asyncid) return - currsess = _asyncsessions[querydict['asyncid']]['asyncsession'] - for rsp in currsess.get_responses(): - yield messages.AsyncMessage(rsp) - - - + mythreadid = greenlet.getcurrent() + threadset.add(mythreadid) + loggedout = None + currsess = None + try: + currsess = _asyncsessions[querydict['asyncid']]['asyncsession'] + for rsp in currsess.get_responses(): + yield messages.AsyncMessage(rsp) + except greenlet.GreenletExit as ge: + loggedout = ge + threadset.discard(mythreadid) + if loggedout is not None: + currsess.destroy() + raise exc.LoggedOut() \ No newline at end of file diff --git a/confluent_server/confluent/exceptions.py b/confluent_server/confluent/exceptions.py index 9bf7b623..bea78955 100644 --- a/confluent_server/confluent/exceptions.py +++ b/confluent_server/confluent/exceptions.py @@ -89,3 +89,10 @@ class PubkeyInvalid(ConfluentException): def get_error_body(self): return self.errorbody + +class LoggedOut(ConfluentException): + apierrorcode = 401 + apierrorstr = '401 - Logged out' + + def get_error_body(self): + return '{"loggedout": 1}' diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 755c42e9..7a1159d9 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -355,12 +355,22 @@ def resourcehandler_backend(env, start_response): cfgmgr = authorized['cfgmgr'] if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async': pagecontent = "" - for rsp in _assemble_json( - confluent.asynchttp.handle_async(env, querydict)): - pagecontent += rsp - start_response("200 OK", headers) - yield pagecontent - return + try: + for rsp in _assemble_json( + confluent.asynchttp.handle_async( + env, querydict, + httpsessions[authorized['sessionid']]['inflight'])): + pagecontent += rsp + start_response("200 OK", headers) + yield pagecontent + return + except exc.ConfluentException as e: + if e.apierrorcode == 500: + # raise generics to trigger the tracelog + raise + start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr), + headers) + yield e.get_error_body() elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or '/shell/sessions/' in env['PATH_INFO'])): #hard bake JSON into this path, do not support other incarnations