2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-26 11:30:23 +00:00

Add terminal sessions to async http

This functionality enables a browser to hold more terminals open
than their max connection rating would normally allow.
This commit is contained in:
Jarrod Johnson 2016-03-25 14:50:47 -04:00
parent 3cd96a4f59
commit d753ac2833
3 changed files with 66 additions and 4 deletions

View File

@ -45,9 +45,11 @@ import confluent.messages as messages
import confluent.util as util
import eventlet
import greenlet
import time
_asyncsessions = {}
_cleanthread = None
_consolesessions = None
def _assign_asyncid(asyncsession):
@ -58,12 +60,26 @@ def _assign_asyncid(asyncsession):
return sessid
class AsyncTermRelation(object):
# Need to keep an association of term object to async
# This allows the async handler to know the context of
# outgoing data to provide to calling code
def __init__(self, termid, async):
self.async = async
self.termid = termid
def got_data(self, data):
self.async.add(data, self.termid)
class AsyncSession(object):
def __init__(self):
self.asyncid = _assign_asyncid(self)
self.responses = collections.deque()
self._evt = None
self.termrelations = []
self.consoles = set([])
self.reaper = eventlet.spawn_after(15, self.destroy)
def add(self, rsp, requestid):
@ -72,6 +88,18 @@ class AsyncSession(object):
self._evt.send()
self._evt = None
def set_term_relation(self, env):
# need a term relation to keep track of what data belongs
# to what object (since the callback does not provide context
# for data, and here ultimately the client is responsible
# for sorting out which is which.
termrel = AsyncTermRelation(['HTTP_CONFLUENTREQUESTID'], self)
self.termrelations.append(termrel)
return termrel
def add_console_session(self, sessionid):
self.consoles.add(sessionid)
def destroy(self):
if self._evt:
self._evt.send()
@ -85,6 +113,9 @@ class AsyncSession(object):
def get_responses(self, timeout=25):
self.reaper.cancel()
nextexpiry = time.time() + 90
for csess in self.consoles:
_consolesessions[csess]['expiry'] = nextexpiry
self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
if self._evt:
# TODO(jjohnson2): This precludes the goal of 'double barreled'
@ -111,6 +142,11 @@ def run_handler(hdlr, env):
return requestid
def get_async(env, querydict):
global _cleanthread
return _asyncsessions[querydict['asyncid']]['asyncsession']
def handle_async(env, querydict, threadset):
global _cleanthread
# This may be one of two things, a request for a new async stream
@ -134,4 +170,9 @@ def handle_async(env, querydict, threadset):
threadset.discard(mythreadid)
if loggedout is not None:
currsess.destroy()
raise exc.LoggedOut()
raise exc.LoggedOut()
def set_console_sessions(consolesessions):
global _consolesessions
_consolesessions = consolesessions

View File

@ -46,6 +46,7 @@ tlvdata = confluent.tlvdata
auditlog = None
tracelog = None
consolesessions = {}
confluent.asynchttp.set_console_sessions(consolesessions)
httpsessions = {}
opmap = {
'POST': 'create',
@ -394,15 +395,25 @@ def resourcehandler_backend(env, start_response):
skipreplay = False
if 'skipreplay' in querydict and querydict['skipreplay']:
skipreplay = True
datacallback = None
async = None
if 'HTTP_CONFLUENTASYNCID' in env:
async = confluent.asynchttp.get_async(env, querydict)
termrel = async.set_term_relation(env)
datacallback = termrel.got_data
try:
if shellsession:
consession = shellserver.ShellSession(
node=nodename, configmanager=cfgmgr,
username=authorized['username'], skipreplay=skipreplay)
username=authorized['username'], skipreplay=skipreplay,
datacallback=datacallback
)
else:
consession = consoleserver.ConsoleSession(
node=nodename, configmanager=cfgmgr,
username=authorized['username'], skipreplay=skipreplay)
username=authorized['username'], skipreplay=skipreplay,
datacallback=datacallback
)
except exc.NotFoundException:
start_response("404 Not found", headers)
yield "404 - Request Path not recognized"
@ -411,6 +422,8 @@ def resourcehandler_backend(env, start_response):
start_response("500 Internal Server Error", headers)
return
sessid = _assign_consessionid(consession)
if async:
async.add_consolesession(sessid)
start_response('200 OK', headers)
yield '{"session":"%s","data":""}' % sessid
return

View File

@ -851,9 +851,17 @@ class AsyncMessage(ConfluentMessage):
self.msgpair = pair
def raw(self):
rsp = self.msgpair[1]
rspdict = None
if isinstance(rsp, ConfluentMessage):
rspdict = rsp.raw()
elif isinstance(rsp, dict): # console metadata
rspdict = rsp
else: # terminal text
rspdict = {'data': rsp}
return {'asyncresponse':
{'requestid': self.msgpair[0],
'response': self.msgpair[1].raw()}}
'response': rspdict}}
class AsyncSession(ConfluentMessage):
def __init__(self, id):