2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 01:22:00 +00:00

Fix keepalive/logout behavior in ws async

Use the websocket liveness as the keepalive, so
the reaper is not scheduled for such sockets.

Additionally, register the async thread as to be killed on logout.
This commit is contained in:
Jarrod Johnson 2022-04-05 16:56:36 -04:00
parent 105536656e
commit 7a0dee8af8
2 changed files with 12 additions and 2 deletions

View File

@ -81,7 +81,8 @@ class AsyncSession(object):
self._evt = None
self.termrelations = []
self.consoles = set([])
self.reaper = eventlet.spawn_after(15, self.destroy)
if not wshandler:
self.reaper = eventlet.spawn_after(15, self.destroy)
def add(self, requestid, rsp):
if self.wshandler:
@ -172,6 +173,8 @@ def handle_async(env, querydict, threadset, wshandler=None, wsin=None):
currsess = AsyncSession(wshandler)
if wshandler: # websocket mode, block until exited
# just stay alive until client goes away
mythreadid = greenlet.getcurrent()
threadset.add(mythreadid)
wsin.send(' ASYNCID: {0}'.format(currsess.asyncid))
clientmsg = True
while clientmsg:
@ -182,6 +185,7 @@ def handle_async(env, querydict, threadset, wshandler=None, wsin=None):
elif clientmsg:
print(repr(clientmsg))
currsess.destroy()
threadset.discard(mythreadid)
return
yield messages.AsyncSession(currsess.asyncid)
return

View File

@ -395,6 +395,8 @@ def _assign_consessionid(consolesession):
supported_protocols=['confluent.console', 'confluent.asyncweb'])
def wsock_handler(ws):
sessid = ws.wait()
if not sessid:
return
sessid = sessid.replace('ConfluentSessionId:', '')
sessid = sessid[:-1]
currsess = httpsessions.get(sessid, None)
@ -415,7 +417,8 @@ def wsock_handler(ws):
def asyncwscallback(rsp):
rsp = json.dumps(rsp.raw())
ws.send(u'!' + rsp)
for res in confluent.asynchttp.handle_async({}, {}, None, asyncwscallback, ws):
for res in confluent.asynchttp.handle_async(
{}, {}, currsess['inflight'], asyncwscallback, ws):
pass
return
if '/console/session' in ws.path or '/shell/sessions/' in ws.path:
@ -460,6 +463,8 @@ def wsock_handler(ws):
)
except exc.NotFoundException:
return
mythreadid = greenlet.getcurrent()
currsess['inflight'].add(mythreadid)
clientmsg = ws.wait()
try:
while clientmsg is not None:
@ -477,6 +482,7 @@ def wsock_handler(ws):
ws.send(u'?')
clientmsg = ws.wait()
finally:
currsess['inflight'].discard(mythreadid)
consession.destroy()