2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-14 19:57:50 +00:00

Begin draft work to support websocket

First use case will be consoles.

Then we can consider the async case
for smoother async operation as well
though that is less critical.
This commit is contained in:
Jarrod Johnson 2021-04-19 17:04:26 -04:00
parent 0fae615756
commit ab6bf82da6

View File

@ -47,6 +47,7 @@ try:
import urlparse
except ModuleNotFoundError:
import urllib.parse as urlparse
import eventlet.websocket
import eventlet.wsgi
#scgi = eventlet.import_patched('flup.server.scgi')
tlvdata = confluent.tlvdata
@ -379,9 +380,48 @@ def _assign_consessionid(consolesession):
'expiry': time.time() + 60}
return sessid
@eventlet.websocket.WebSocketWSGI
def wsock_handler(ws):
sessid = ws.wait()
sessid = sessid.replace('ConfluentSessionId:', '')
sessid = sessid[:-1]
currsess = httpsessions.get(sessid, None)
if not currsess:
return
authtoken = ws.wait()
authtoken = authtoken.replace('ConfluentAuthToken:', '')
authtoken = authtoken[:-1]
if currsess['csrftoken'] != authtoken:
return
pathrequested = ws.wait()
mythreadid = greenlet.getcurrent()
httpsessions[sessid]['inflight'].add(mythreadid)
name = httpsessions[sessid]['name']
authdata = auth.authorize(name, ws.path)
if not authdata:
return
if '/console/session' in ws.path or '/shell/sessions/' in ws.path:
#hard bake JSON into this path, do not support other incarnations
if '/console/session' in ws.path:
prefix, _, _ = ws.path.partition('/console/session')
shellsession = False
elif '/shell/sessions/' in ws.path:
prefix, _, _ = ws.path.partition('/shell/sessions')
shellsession = True
_, _, nodename = prefix.rpartition('/')
# ok, here we need to set up a callback handler
# note that currently, it's always json wrapped
# it may be wise to send text back straight
# for example, could assume our json always has '{'
# as the first character, else plain text since it is console
# we will loop on ws.wait() to input data, and the data handler
# callback will send
def resourcehandler(env, start_response):
try:
if 'HTTP_SEC_WEBSOCKET_VERSION' in env:
return wsock_handler(env, start_response)
for rsp in resourcehandler_backend(env, start_response):
yield rsp
except Exception as e:
@ -632,6 +672,8 @@ def resourcehandler_backend(env, start_response):
sessinfo = {'username': authorized['username']}
if 'authtoken' in authorized:
sessinfo['authtoken'] = authorized['authtoken']
if 'sessionid' in authorized:
sessinfo['sessionid'] = authorized['sessionid']
tlvdata.unicode_dictvalues(sessinfo)
yield json.dumps(sessinfo)
return