From 7a0dee8af83716ff8ddfa226cb5668c34c6d16c5 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 5 Apr 2022 16:56:36 -0400 Subject: [PATCH] Fix keepalive/logout behavior in ws async Use the websocket liveness as the keepalive, so the reaper is not scheduled for such sockets. Additionally, register the async thread as to be killed on logout. --- confluent_server/confluent/asynchttp.py | 6 +++++- confluent_server/confluent/httpapi.py | 8 +++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 9b7a1ba9..9516b223 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -81,7 +81,8 @@ class AsyncSession(object): self._evt = None self.termrelations = [] self.consoles = set([]) - self.reaper = eventlet.spawn_after(15, self.destroy) + if not wshandler: + self.reaper = eventlet.spawn_after(15, self.destroy) def add(self, requestid, rsp): if self.wshandler: @@ -172,6 +173,8 @@ def handle_async(env, querydict, threadset, wshandler=None, wsin=None): currsess = AsyncSession(wshandler) if wshandler: # websocket mode, block until exited # just stay alive until client goes away + mythreadid = greenlet.getcurrent() + threadset.add(mythreadid) wsin.send(' ASYNCID: {0}'.format(currsess.asyncid)) clientmsg = True while clientmsg: @@ -182,6 +185,7 @@ def handle_async(env, querydict, threadset, wshandler=None, wsin=None): elif clientmsg: print(repr(clientmsg)) currsess.destroy() + threadset.discard(mythreadid) return yield messages.AsyncSession(currsess.asyncid) return diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index aa869a0f..d09688cb 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -395,6 +395,8 @@ def _assign_consessionid(consolesession): supported_protocols=['confluent.console', 'confluent.asyncweb']) def wsock_handler(ws): sessid = ws.wait() + if not sessid: + return sessid = sessid.replace('ConfluentSessionId:', '') sessid = sessid[:-1] currsess = httpsessions.get(sessid, None) @@ -415,7 +417,8 @@ def wsock_handler(ws): def asyncwscallback(rsp): rsp = json.dumps(rsp.raw()) ws.send(u'!' + rsp) - for res in confluent.asynchttp.handle_async({}, {}, None, asyncwscallback, ws): + for res in confluent.asynchttp.handle_async( + {}, {}, currsess['inflight'], asyncwscallback, ws): pass return if '/console/session' in ws.path or '/shell/sessions/' in ws.path: @@ -460,6 +463,8 @@ def wsock_handler(ws): ) except exc.NotFoundException: return + mythreadid = greenlet.getcurrent() + currsess['inflight'].add(mythreadid) clientmsg = ws.wait() try: while clientmsg is not None: @@ -477,6 +482,7 @@ def wsock_handler(ws): ws.send(u'?') clientmsg = ws.wait() finally: + currsess['inflight'].discard(mythreadid) consession.destroy()