2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-19 20:16:04 +00:00

Add support for consolidated term websocket

Since browsers can be stingy with websockets, have
the consoles all share a single websocket.
This commit is contained in:
Jarrod Johnson 2022-04-13 16:08:13 -04:00
parent 272398884c
commit 2bf9a6d415
2 changed files with 96 additions and 37 deletions

View File

@ -163,7 +163,7 @@ def get_async(env, querydict):
return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession']
def handle_async(env, querydict, threadset, wshandler=None, wsin=None):
def handle_async(env, querydict, threadset, wshandler=None):
global _cleanthread
# This may be one of two things, a request for a new async stream
# or a request for next data from async stream
@ -171,21 +171,8 @@ def handle_async(env, querydict, threadset, wshandler=None, wsin=None):
if 'asyncid' not in querydict or not querydict['asyncid']:
# This is a new request, create a new multiplexer
currsess = AsyncSession(wshandler)
if wshandler: # websocket mode, block until exited
# just stay alive until client goes away
mythreadid = greenlet.getcurrent()
threadset.add(mythreadid)
wsin.send(' ASYNCID: {0}'.format(currsess.asyncid))
clientmsg = True
while clientmsg:
# get input from ws...
clientmsg = wsin.wait()
if clientmsg and clientmsg[0] == '?':
wsin.send('?')
elif clientmsg:
print(repr(clientmsg))
currsess.destroy()
threadset.discard(mythreadid)
if wshandler:
yield currsess
return
yield messages.AsyncSession(currsess.asyncid)
return

View File

@ -391,6 +391,25 @@ def _assign_consessionid(consolesession):
'expiry': time.time() + 60}
return sessid
def websockify_data(data):
if isinstance(data, dict):
data = json.dumps(data)
data = u'!' + data
else:
try:
data = data.decode('utf8')
except UnicodeDecodeError:
data = data.decode('cp437')
data = u' ' + data
return data
def datacallback_bound(clientsessid, ws):
def datacallback(data):
data = websockify_data(data)
ws.send('${0}$'.format(clientsessid) + data)
return datacallback
@eventlet.websocket.WebSocketWSGI.configured(
supported_protocols=['confluent.console', 'confluent.asyncweb'])
def wsock_handler(ws):
@ -413,15 +432,85 @@ def wsock_handler(ws):
authdata = auth.authorize(name, ws.path)
if not authdata:
return
cfgmgr = httpsessions[sessid]['cfgmgr']
username = httpsessions[sessid]['name']
if ws.path == '/sessions/current/async':
myconsoles = {}
def asyncwscallback(rsp):
rsp = json.dumps(rsp.raw())
ws.send(u'!' + rsp)
for res in confluent.asynchttp.handle_async(
{}, {}, currsess['inflight'], asyncwscallback, ws):
pass
mythreadid = greenlet.getcurrent()
currsess['inflight'].add(mythreadid)
asess = None
try:
for asess in confluent.asynchttp.handle_async(
{}, {}, currsess['inflight'], asyncwscallback):
ws.send(u' ASYNCID: {0}'.format(asess.asyncid))
clientmsg = True
while clientmsg:
clientmsg = ws.wait()
if clientmsg:
if clientmsg[0] == '?':
ws.send('?')
elif clientmsg[0] == '$':
targid, data = clientmsg[1:].split('$', 1)
if data[0] == ' ':
myconsoles[targid].write(data[1:])
elif clientmsg[0] == '!':
msg = json.loads(clientmsg[1:])
action = msg.get('operation', None)
if action == 'start':
targ = msg['target']
if '/console/session' in targ or '/shell/sessions' in targ:
width = msg['width']
height = msg['height']
clientsessid = '{0}'.format(msg['sessid'])
skipreplay = msg.get('skipreplay', False)
delimit = None
if '/console/session' in targ:
delimit = '/console/session'
shellsession = False
else:
delimit = '/shell/sessions'
shellsession = True
node = targ.split(delimit, 1)[0]
node = node.rsplit('/', 1)[-1]
auditmsg = {'operation': 'start', 'target': targ,
'user': util.stringify(username)}
auditlog.log(auditmsg)
datacallback = datacallback_bound(clientsessid, ws)
if shellsession:
consession = shellserver.ShellSession(
node=node, configmanager=cfgmgr,
username=username, skipreplay=skipreplay,
datacallback=datacallback,
width=width, height=height)
else:
consession = consoleserver.ConsoleSession(
node=node, configmanager=cfgmgr,
username=username, skipreplay=skipreplay,
datacallback=datacallback,
width=width, height=height)
myconsoles[clientsessid] = consession
else:
print(repr(clientmsg))
finally:
for cons in myconsoles:
myconsoles[cons].destroy()
if asess:
asess.destroy()
currsess['inflight'].discard(mythreadid)
return
if '/console/session' in ws.path or '/shell/sessions/' in ws.path:
def datacallback(data):
ws.send(websockify_data(data))
geom = ws.wait()
geom = geom[1:]
geom = json.loads(geom)
width = geom['width']
height = geom['height']
skipreplay = geom.get('skipreplay', False)
#hard bake JSON into this path, do not support other incarnations
if '/console/session' in ws.path:
prefix, _, _ = ws.path.partition('/console/session')
@ -430,24 +519,7 @@ def wsock_handler(ws):
prefix, _, _ = ws.path.partition('/shell/sessions')
shellsession = True
_, _, nodename = prefix.rpartition('/')
geom = ws.wait()
geom = geom[1:]
geom = json.loads(geom)
width = geom['width']
height = geom['height']
skipreplay = geom.get('skipreplay', False)
cfgmgr = httpsessions[sessid]['cfgmgr']
username = httpsessions[sessid]['name']
def datacallback(data):
if isinstance(data, dict):
data = json.dumps(data)
ws.send(u'!' + data)
else:
try:
data = data.decode('utf8')
except UnicodeDecodeError:
data = data.decode('cp437')
ws.send(u' ' + data)
try:
if shellsession:
consession = shellserver.ShellSession(