diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 731d8726..9b7a1ba9 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -74,15 +74,18 @@ class AsyncTermRelation(object): class AsyncSession(object): - def __init__(self): + def __init__(self, wshandler=None): self.asyncid = _assign_asyncid(self) self.responses = collections.deque() + self.wshandler = wshandler self._evt = None self.termrelations = [] self.consoles = set([]) self.reaper = eventlet.spawn_after(15, self.destroy) def add(self, requestid, rsp): + if self.wshandler: + self.wshandler(messages.AsyncMessage((requestid, rsp))) if self.responses is None: return self.responses.append((requestid, rsp)) @@ -108,7 +111,7 @@ class AsyncSession(object): self._evt = None for console in self.consoles: _consolesessions[console]['session'].destroy() - self.consoles = None + self.consoles = set([]) self.responses = None del _asyncsessions[self.asyncid] @@ -159,14 +162,27 @@ def get_async(env, querydict): return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession'] -def handle_async(env, querydict, threadset): +def handle_async(env, querydict, threadset, wshandler=None, wsin=None): global _cleanthread # This may be one of two things, a request for a new async stream # or a request for next data from async stream # httpapi otherwise handles requests an injecting them to queue if 'asyncid' not in querydict or not querydict['asyncid']: # This is a new request, create a new multiplexer - currsess = AsyncSession() + currsess = AsyncSession(wshandler) + if wshandler: # websocket mode, block until exited + # just stay alive until client goes away + wsin.send(' ASYNCID: {0}'.format(currsess.asyncid)) + clientmsg = True + while clientmsg: + # get input from ws... + clientmsg = wsin.wait() + if clientmsg and clientmsg[0] == '?': + wsin.send('?') + elif clientmsg: + print(repr(clientmsg)) + currsess.destroy() + return yield messages.AsyncSession(currsess.asyncid) return if querydict['asyncid'] not in _asyncsessions: @@ -190,4 +206,4 @@ def handle_async(env, querydict, threadset): def set_console_sessions(consolesessions): global _consolesessions - _consolesessions = consolesessions \ No newline at end of file + _consolesessions = consolesessions diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index a287971c..aa869a0f 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -392,7 +392,7 @@ def _assign_consessionid(consolesession): return sessid @eventlet.websocket.WebSocketWSGI.configured( - supported_protocols=['confluent.console']) + supported_protocols=['confluent.console', 'confluent.asyncweb']) def wsock_handler(ws): sessid = ws.wait() sessid = sessid.replace('ConfluentSessionId:', '') @@ -411,6 +411,13 @@ def wsock_handler(ws): authdata = auth.authorize(name, ws.path) if not authdata: return + if ws.path == '/sessions/current/async': + def asyncwscallback(rsp): + rsp = json.dumps(rsp.raw()) + ws.send(u'!' + rsp) + for res in confluent.asynchttp.handle_async({}, {}, None, asyncwscallback, ws): + pass + return if '/console/session' in ws.path or '/shell/sessions/' in ws.path: #hard bake JSON into this path, do not support other incarnations if '/console/session' in ws.path: