2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-03-19 18:07:48 +00:00

Implement async session on websocket

This eliminates long polling and sets the stage to restore
socket sharing by terminals and shells.
This commit is contained in:
Jarrod Johnson 2022-04-05 16:33:03 -04:00
parent 8ef91c16c0
commit 105536656e
2 changed files with 29 additions and 6 deletions

View File

@ -74,15 +74,18 @@ class AsyncTermRelation(object):
class AsyncSession(object):
def __init__(self):
def __init__(self, wshandler=None):
self.asyncid = _assign_asyncid(self)
self.responses = collections.deque()
self.wshandler = wshandler
self._evt = None
self.termrelations = []
self.consoles = set([])
self.reaper = eventlet.spawn_after(15, self.destroy)
def add(self, requestid, rsp):
if self.wshandler:
self.wshandler(messages.AsyncMessage((requestid, rsp)))
if self.responses is None:
return
self.responses.append((requestid, rsp))
@ -108,7 +111,7 @@ class AsyncSession(object):
self._evt = None
for console in self.consoles:
_consolesessions[console]['session'].destroy()
self.consoles = None
self.consoles = set([])
self.responses = None
del _asyncsessions[self.asyncid]
@ -159,14 +162,27 @@ def get_async(env, querydict):
return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession']
def handle_async(env, querydict, threadset):
def handle_async(env, querydict, threadset, wshandler=None, wsin=None):
global _cleanthread
# This may be one of two things, a request for a new async stream
# or a request for next data from async stream
# httpapi otherwise handles requests an injecting them to queue
if 'asyncid' not in querydict or not querydict['asyncid']:
# This is a new request, create a new multiplexer
currsess = AsyncSession()
currsess = AsyncSession(wshandler)
if wshandler: # websocket mode, block until exited
# just stay alive until client goes away
wsin.send(' ASYNCID: {0}'.format(currsess.asyncid))
clientmsg = True
while clientmsg:
# get input from ws...
clientmsg = wsin.wait()
if clientmsg and clientmsg[0] == '?':
wsin.send('?')
elif clientmsg:
print(repr(clientmsg))
currsess.destroy()
return
yield messages.AsyncSession(currsess.asyncid)
return
if querydict['asyncid'] not in _asyncsessions:
@ -190,4 +206,4 @@ def handle_async(env, querydict, threadset):
def set_console_sessions(consolesessions):
global _consolesessions
_consolesessions = consolesessions
_consolesessions = consolesessions

View File

@ -392,7 +392,7 @@ def _assign_consessionid(consolesession):
return sessid
@eventlet.websocket.WebSocketWSGI.configured(
supported_protocols=['confluent.console'])
supported_protocols=['confluent.console', 'confluent.asyncweb'])
def wsock_handler(ws):
sessid = ws.wait()
sessid = sessid.replace('ConfluentSessionId:', '')
@ -411,6 +411,13 @@ def wsock_handler(ws):
authdata = auth.authorize(name, ws.path)
if not authdata:
return
if ws.path == '/sessions/current/async':
def asyncwscallback(rsp):
rsp = json.dumps(rsp.raw())
ws.send(u'!' + rsp)
for res in confluent.asynchttp.handle_async({}, {}, None, asyncwscallback, ws):
pass
return
if '/console/session' in ws.path or '/shell/sessions/' in ws.path:
#hard bake JSON into this path, do not support other incarnations
if '/console/session' in ws.path: