diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 9516b223..d9353c16 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -163,7 +163,7 @@ def get_async(env, querydict): return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession'] -def handle_async(env, querydict, threadset, wshandler=None, wsin=None): +def handle_async(env, querydict, threadset, wshandler=None): global _cleanthread # This may be one of two things, a request for a new async stream # or a request for next data from async stream @@ -171,21 +171,8 @@ def handle_async(env, querydict, threadset, wshandler=None, wsin=None): if 'asyncid' not in querydict or not querydict['asyncid']: # This is a new request, create a new multiplexer currsess = AsyncSession(wshandler) - if wshandler: # websocket mode, block until exited - # just stay alive until client goes away - mythreadid = greenlet.getcurrent() - threadset.add(mythreadid) - wsin.send(' ASYNCID: {0}'.format(currsess.asyncid)) - clientmsg = True - while clientmsg: - # get input from ws... - clientmsg = wsin.wait() - if clientmsg and clientmsg[0] == '?': - wsin.send('?') - elif clientmsg: - print(repr(clientmsg)) - currsess.destroy() - threadset.discard(mythreadid) + if wshandler: + yield currsess return yield messages.AsyncSession(currsess.asyncid) return diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index d09688cb..89af5cdc 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -391,6 +391,25 @@ def _assign_consessionid(consolesession): 'expiry': time.time() + 60} return sessid + +def websockify_data(data): + if isinstance(data, dict): + data = json.dumps(data) + data = u'!' + data + else: + try: + data = data.decode('utf8') + except UnicodeDecodeError: + data = data.decode('cp437') + data = u' ' + data + return data + +def datacallback_bound(clientsessid, ws): + def datacallback(data): + data = websockify_data(data) + ws.send('${0}$'.format(clientsessid) + data) + return datacallback + @eventlet.websocket.WebSocketWSGI.configured( supported_protocols=['confluent.console', 'confluent.asyncweb']) def wsock_handler(ws): @@ -413,15 +432,85 @@ def wsock_handler(ws): authdata = auth.authorize(name, ws.path) if not authdata: return + cfgmgr = httpsessions[sessid]['cfgmgr'] + username = httpsessions[sessid]['name'] if ws.path == '/sessions/current/async': + myconsoles = {} def asyncwscallback(rsp): rsp = json.dumps(rsp.raw()) ws.send(u'!' + rsp) - for res in confluent.asynchttp.handle_async( - {}, {}, currsess['inflight'], asyncwscallback, ws): - pass + mythreadid = greenlet.getcurrent() + currsess['inflight'].add(mythreadid) + asess = None + try: + for asess in confluent.asynchttp.handle_async( + {}, {}, currsess['inflight'], asyncwscallback): + ws.send(u' ASYNCID: {0}'.format(asess.asyncid)) + clientmsg = True + while clientmsg: + clientmsg = ws.wait() + if clientmsg: + if clientmsg[0] == '?': + ws.send('?') + elif clientmsg[0] == '$': + targid, data = clientmsg[1:].split('$', 1) + if data[0] == ' ': + myconsoles[targid].write(data[1:]) + elif clientmsg[0] == '!': + msg = json.loads(clientmsg[1:]) + action = msg.get('operation', None) + if action == 'start': + targ = msg['target'] + if '/console/session' in targ or '/shell/sessions' in targ: + width = msg['width'] + height = msg['height'] + clientsessid = '{0}'.format(msg['sessid']) + skipreplay = msg.get('skipreplay', False) + delimit = None + if '/console/session' in targ: + delimit = '/console/session' + shellsession = False + else: + delimit = '/shell/sessions' + shellsession = True + node = targ.split(delimit, 1)[0] + node = node.rsplit('/', 1)[-1] + auditmsg = {'operation': 'start', 'target': targ, + 'user': util.stringify(username)} + auditlog.log(auditmsg) + datacallback = datacallback_bound(clientsessid, ws) + if shellsession: + consession = shellserver.ShellSession( + node=node, configmanager=cfgmgr, + username=username, skipreplay=skipreplay, + datacallback=datacallback, + width=width, height=height) + else: + consession = consoleserver.ConsoleSession( + node=node, configmanager=cfgmgr, + username=username, skipreplay=skipreplay, + datacallback=datacallback, + width=width, height=height) + myconsoles[clientsessid] = consession + + else: + print(repr(clientmsg)) + finally: + for cons in myconsoles: + myconsoles[cons].destroy() + if asess: + asess.destroy() + currsess['inflight'].discard(mythreadid) return if '/console/session' in ws.path or '/shell/sessions/' in ws.path: + def datacallback(data): + ws.send(websockify_data(data)) + geom = ws.wait() + geom = geom[1:] + geom = json.loads(geom) + width = geom['width'] + height = geom['height'] + skipreplay = geom.get('skipreplay', False) #hard bake JSON into this path, do not support other incarnations if '/console/session' in ws.path: prefix, _, _ = ws.path.partition('/console/session') @@ -430,24 +519,7 @@ def wsock_handler(ws): prefix, _, _ = ws.path.partition('/shell/sessions') shellsession = True _, _, nodename = prefix.rpartition('/') - geom = ws.wait() - geom = geom[1:] - geom = json.loads(geom) - width = geom['width'] - height = geom['height'] - skipreplay = geom.get('skipreplay', False) - cfgmgr = httpsessions[sessid]['cfgmgr'] - username = httpsessions[sessid]['name'] - def datacallback(data): - if isinstance(data, dict): - data = json.dumps(data) - ws.send(u'!' + data) - else: - try: - data = data.decode('utf8') - except UnicodeDecodeError: - data = data.decode('cp437') - ws.send(u' ' + data) + try: if shellsession: consession = shellserver.ShellSession(