mirror of
https://github.com/xcat2/confluent.git
synced 2025-08-26 05:00:46 +00:00
Have async sessions detect logout
This commit is contained in:
@@ -44,6 +44,7 @@ import confluent.exceptions as exc
|
||||
import confluent.messages as messages
|
||||
import confluent.util as util
|
||||
import eventlet
|
||||
import greenlet
|
||||
|
||||
_asyncsessions = {}
|
||||
_cleanthread = None
|
||||
@@ -110,7 +111,7 @@ def run_handler(hdlr, env):
|
||||
return requestid
|
||||
|
||||
|
||||
def handle_async(env, querydict):
|
||||
def handle_async(env, querydict, threadset):
|
||||
global _cleanthread
|
||||
# This may be one of two things, a request for a new async stream
|
||||
# or a request for next data from async stream
|
||||
@@ -120,9 +121,17 @@ def handle_async(env, querydict):
|
||||
currsess = AsyncSession()
|
||||
yield messages.AsyncSession(currsess.asyncid)
|
||||
return
|
||||
currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
|
||||
for rsp in currsess.get_responses():
|
||||
yield messages.AsyncMessage(rsp)
|
||||
|
||||
|
||||
|
||||
mythreadid = greenlet.getcurrent()
|
||||
threadset.add(mythreadid)
|
||||
loggedout = None
|
||||
currsess = None
|
||||
try:
|
||||
currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
|
||||
for rsp in currsess.get_responses():
|
||||
yield messages.AsyncMessage(rsp)
|
||||
except greenlet.GreenletExit as ge:
|
||||
loggedout = ge
|
||||
threadset.discard(mythreadid)
|
||||
if loggedout is not None:
|
||||
currsess.destroy()
|
||||
raise exc.LoggedOut()
|
@@ -89,3 +89,10 @@ class PubkeyInvalid(ConfluentException):
|
||||
|
||||
def get_error_body(self):
|
||||
return self.errorbody
|
||||
|
||||
class LoggedOut(ConfluentException):
|
||||
apierrorcode = 401
|
||||
apierrorstr = '401 - Logged out'
|
||||
|
||||
def get_error_body(self):
|
||||
return '{"loggedout": 1}'
|
||||
|
@@ -355,12 +355,22 @@ def resourcehandler_backend(env, start_response):
|
||||
cfgmgr = authorized['cfgmgr']
|
||||
if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async':
|
||||
pagecontent = ""
|
||||
for rsp in _assemble_json(
|
||||
confluent.asynchttp.handle_async(env, querydict)):
|
||||
pagecontent += rsp
|
||||
start_response("200 OK", headers)
|
||||
yield pagecontent
|
||||
return
|
||||
try:
|
||||
for rsp in _assemble_json(
|
||||
confluent.asynchttp.handle_async(
|
||||
env, querydict,
|
||||
httpsessions[authorized['sessionid']]['inflight'])):
|
||||
pagecontent += rsp
|
||||
start_response("200 OK", headers)
|
||||
yield pagecontent
|
||||
return
|
||||
except exc.ConfluentException as e:
|
||||
if e.apierrorcode == 500:
|
||||
# raise generics to trigger the tracelog
|
||||
raise
|
||||
start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr),
|
||||
headers)
|
||||
yield e.get_error_body()
|
||||
elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
|
||||
'/shell/sessions/' in env['PATH_INFO'])):
|
||||
#hard bake JSON into this path, do not support other incarnations
|
||||
|
Reference in New Issue
Block a user