diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 9b7a1ba9..9516b223 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -81,7 +81,8 @@ class AsyncSession(object): self._evt = None self.termrelations = [] self.consoles = set([]) - self.reaper = eventlet.spawn_after(15, self.destroy) + if not wshandler: + self.reaper = eventlet.spawn_after(15, self.destroy) def add(self, requestid, rsp): if self.wshandler: @@ -172,6 +173,8 @@ def handle_async(env, querydict, threadset, wshandler=None, wsin=None): currsess = AsyncSession(wshandler) if wshandler: # websocket mode, block until exited # just stay alive until client goes away + mythreadid = greenlet.getcurrent() + threadset.add(mythreadid) wsin.send(' ASYNCID: {0}'.format(currsess.asyncid)) clientmsg = True while clientmsg: @@ -182,6 +185,7 @@ def handle_async(env, querydict, threadset, wshandler=None, wsin=None): elif clientmsg: print(repr(clientmsg)) currsess.destroy() + threadset.discard(mythreadid) return yield messages.AsyncSession(currsess.asyncid) return diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index aa869a0f..d09688cb 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -395,6 +395,8 @@ def _assign_consessionid(consolesession): supported_protocols=['confluent.console', 'confluent.asyncweb']) def wsock_handler(ws): sessid = ws.wait() + if not sessid: + return sessid = sessid.replace('ConfluentSessionId:', '') sessid = sessid[:-1] currsess = httpsessions.get(sessid, None) @@ -415,7 +417,8 @@ def wsock_handler(ws): def asyncwscallback(rsp): rsp = json.dumps(rsp.raw()) ws.send(u'!' + rsp) - for res in confluent.asynchttp.handle_async({}, {}, None, asyncwscallback, ws): + for res in confluent.asynchttp.handle_async( + {}, {}, currsess['inflight'], asyncwscallback, ws): pass return if '/console/session' in ws.path or '/shell/sessions/' in ws.path: @@ -460,6 +463,8 @@ def wsock_handler(ws): ) except exc.NotFoundException: return + mythreadid = greenlet.getcurrent() + currsess['inflight'].add(mythreadid) clientmsg = ws.wait() try: while clientmsg is not None: @@ -477,6 +482,7 @@ def wsock_handler(ws): ws.send(u'?') clientmsg = ws.wait() finally: + currsess['inflight'].discard(mythreadid) consession.destroy()