2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-04-15 17:49:34 +00:00

Merge pull request #59 from jjohnson42/multiplex

Background HTTP Function
This commit is contained in:
Jarrod Johnson 2016-03-26 13:43:10 -04:00
commit 94d2be4a87
6 changed files with 301 additions and 20 deletions

View File

@ -19,15 +19,25 @@ import confluent.tlv as tlv
import json
import struct
def decodestr(value):
    """Best-effort decode of a byte string to text.

    Tries UTF-8 first, then falls back to code page 437 (the classic
    IBM PC character set); if neither decoding succeeds, the original
    value is returned unchanged.
    """
    for encoding in ('utf-8', 'cp437'):
        try:
            return value.decode(encoding)
        except UnicodeDecodeError:
            continue
    return value
def unicode_dictvalues(dictdata):
for key in dictdata:
if isinstance(dictdata[key], str):
dictdata[key] = dictdata[key].decode('utf-8')
dictdata[key] = decodestr(dictdata[key])
elif isinstance(dictdata[key], list):
for i in xrange(len(dictdata[key])):
if isinstance(dictdata[key][i], str):
dictdata[key][i] = dictdata[key][i].decode('utf-8')
dictdata[key][i] = decodestr(dictdata[key][i])
elif isinstance(dictdata[key][i], dict):
unicode_dictvalues(dictdata[key][i])
elif isinstance(dictdata[key], dict):

View File

@ -0,0 +1,184 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2016 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Overall, the result of this shall be:
# - Web clients can create the same out-of-order responsiveness as socket
# clients (but with more complexity on their end)
# - Web clients can share single request among console sessions
# - Web clients can get async notify of things like node add/remove, events
# This provides an async strategy to http clients. The design is that a http
# session may have an 'async' resource.  In such a case, any request is
# queued, and a response accepting the queued request is returned immediately.
# A request flags itself as queue-compatible through an HTTP header indicating
# the identifier of the async thread. As responses happen to the queued
# request, data is dispatched to the first registered poller for data on
# the session. This way, a client may elect to provide multiple pollers
# to mitigate general choppiness of http network pattern. It may not be
# worth it, but it's possible.
# Additionally support console session multiplexing, to mitigate needed
# connection count.
# Also, this should allow a client to register for notifications of things
# like node add/delete or an event firing, ultimately.
# Much like console sessions, these will be reaped if a client stays
# away for too long.
import collections
import confluent.exceptions as exc
import confluent.messages as messages
import confluent.util as util
import eventlet
import greenlet
import time
_asyncsessions = {}
_cleanthread = None
_consolesessions = None
def _assign_asyncid(asyncsession):
    """Register *asyncsession* under a fresh, unused 32-character id.

    Loops until a random id not already present in the module-level
    session table is found, stores the session there, and returns the id.
    """
    while True:
        sessid = util.randomstring(32)
        if sessid not in _asyncsessions:
            break
    _asyncsessions[sessid] = {'asyncsession': asyncsession}
    return sessid
class AsyncTermRelation(object):
    # Need to keep an association of term object to async
    # This allows the async handler to know the context of
    # outgoing data to provide to calling code
    # NOTE(review): 'async' is a reserved word from Python 3.7 on; this
    # code targets Python 2 (the file also relies on Py2-only idioms).

    def __init__(self, termid, async):
        # termid: the client-supplied request id identifying this terminal
        # async: the owning AsyncSession that multiplexes the data
        self.async = async
        self.termid = termid

    def got_data(self, data):
        # Console data callback; tag the data with our terminal id and
        # hand it to the async session for delivery to a poller.
        self.async.add(self.termid, data)
class AsyncSession(object):
    """A multiplexed async channel for one HTTP client.

    Queues responses from any number of outstanding requests and console
    sessions, and hands them out to long-poll requests as they arrive.
    The session self-destructs (via an eventlet reaper timer) if the
    client does not poll again in time.
    """

    def __init__(self):
        self.asyncid = _assign_asyncid(self)
        # (requestid, response) pairs awaiting pickup by a poller
        self.responses = collections.deque()
        # event used to wake a blocked poller when data arrives
        self._evt = None
        self.termrelations = []
        self.consoles = set([])
        # destroy self if the client never polls within 15 seconds
        self.reaper = eventlet.spawn_after(15, self.destroy)

    def add(self, requestid, rsp):
        # Queue a response and wake the waiting poller, if any.
        self.responses.append((requestid, rsp))
        if self._evt:
            self._evt.send()
            self._evt = None

    def set_term_relation(self, env):
        # need a term relation to keep track of what data belongs
        # to what object (since the callback does not provide context
        # for data, and here ultimately the client is responsible
        # for sorting out which is which.
        termrel = AsyncTermRelation(env['HTTP_CONFLUENTREQUESTID'], self)
        self.termrelations.append(termrel)
        return termrel

    def add_console_session(self, sessionid):
        # Track a console session so its lifetime is tied to this channel.
        self.consoles.add(sessionid)

    def destroy(self):
        # Wake any blocked poller, tear down owned console sessions and
        # deregister this session from the module-level table.
        if self._evt:
            self._evt.send()
            self._evt = None
        for console in self.consoles:
            _consolesessions[console]['session'].destroy()
        self.consoles = None
        del _asyncsessions[self.asyncid]

    def run_handler(self, handler, requestid):
        # Drain a response generator into the queue, then mark the
        # request complete so the client knows no more data is coming.
        for rsp in handler:
            self.add(requestid, rsp)
        self.add(requestid, messages.AsyncCompletion())

    def get_responses(self, timeout=25):
        """Yield queued responses, blocking up to *timeout* seconds.

        Also refreshes this session's reaper and the expiry of all
        attached console sessions, since a poll proves the client alive.
        """
        self.reaper.cancel()
        # allow timeout plus slack before considering the client gone
        self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
        nextexpiry = time.time() + 90
        for csess in list(self.consoles):
            try:
                _consolesessions[csess]['expiry'] = nextexpiry
            except KeyError:  # session has been closed elsewhere
                self.consoles.discard(csess)
        if self._evt:
            # TODO(jjohnson2): This precludes the goal of 'double barreled'
            # access.... revisit if this could matter
            raise Exception('get_responses is not re-entrant')
        if not self.responses:  # wait to accumulate some
            self._evt = eventlet.event.Event()
            # Timeout with False: give up silently rather than raising
            with eventlet.Timeout(timeout, False):
                self._evt.wait()
            self._evt = None
        while self.responses:
            yield self.responses.popleft()
def run_handler(hdlr, env):
    """Queue *hdlr* for background execution on the caller's async session.

    Looks up the async session and request id from the WSGI environment,
    spawns the handler into the session's background runner, and returns
    the request id so the caller can correlate later responses.

    Raises InvalidArgumentException when the async session id is unknown
    or the request id header is absent.
    """
    asyncsessid = env['HTTP_CONFLUENTASYNCID']
    try:
        requestid = env['HTTP_CONFLUENTREQUESTID']
        asyncsession = _asyncsessions[asyncsessid]['asyncsession']
    except KeyError:
        raise exc.InvalidArgumentException(
            'Invalid Session ID or missing request id')
    eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid)
    return requestid
def get_async(env, querydict):
    """Return the AsyncSession named by the request's async id header.

    *querydict* is accepted for signature parity with the other request
    handlers but is not consulted.  Raises KeyError when the id is not a
    registered session.
    """
    # The original declared ``global _cleanthread`` but never assigned it;
    # a global statement without an assignment is a no-op, so it is removed.
    return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession']
def handle_async(env, querydict, threadset):
    """Create or poll an async multiplexer, yielding messages to the client.

    This may be one of two things: a request for a new async stream, or a
    request for the next data from an existing stream.  httpapi otherwise
    handles requests and injects them into the queue.

    Registers the current greenlet in *threadset* while blocked so that a
    logout elsewhere can kill it; raises LoggedOut when that happens.
    """
    if 'asyncid' not in querydict or not querydict['asyncid']:
        # This is a new request, create a new multiplexer
        currsess = AsyncSession()
        yield messages.AsyncSession(currsess.asyncid)
        return
    mythreadid = greenlet.getcurrent()
    threadset.add(mythreadid)
    loggedout = None
    currsess = None
    try:
        currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
        for rsp in currsess.get_responses():
            yield messages.AsyncMessage(rsp)
    except greenlet.GreenletExit as ge:
        # We were killed by a logout elsewhere; note it and clean up below.
        loggedout = ge
    finally:
        # Always deregister this greenlet, even when the session lookup or
        # response iteration raises, so threadset does not leak entries.
        # (Previously the discard was skipped on unexpected exceptions.)
        threadset.discard(mythreadid)
    if loggedout is not None:
        currsess.destroy()
        raise exc.LoggedOut()
def set_console_sessions(consolesessions):
    # Dependency injection from httpapi: share its console session table so
    # async sessions can refresh expiries and destroy consoles on logout.
    global _consolesessions
    _consolesessions = consolesessions

View File

@ -89,3 +89,10 @@ class PubkeyInvalid(ConfluentException):
def get_error_body(self):
return self.errorbody
class LoggedOut(ConfluentException):
    # Raised when an async poller's session has been destroyed by a logout;
    # surfaces to the web client as HTTP 401 with a JSON hint body.
    apierrorcode = 401
    apierrorstr = '401 - Logged out'

    def get_error_body(self):
        # Machine-readable body so the client can detect the logout case
        return '{"loggedout": 1}'

View File

@ -25,6 +25,7 @@ import confluent.exceptions as exc
import confluent.log as log
import confluent.messages
import confluent.core as pluginapi
import confluent.asynchttp
import confluent.shellserver as shellserver
import confluent.tlvdata
import confluent.util as util
@ -45,6 +46,7 @@ tlvdata = confluent.tlvdata
auditlog = None
tracelog = None
consolesessions = {}
confluent.asynchttp.set_console_sessions(consolesessions)
httpsessions = {}
opmap = {
'POST': 'create',
@ -352,7 +354,25 @@ def resourcehandler_backend(env, start_response):
("Set-Cookie", m.OutputString())
for m in authorized['cookie'].values())
cfgmgr = authorized['cfgmgr']
if (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async':
pagecontent = ""
try:
for rsp in _assemble_json(
confluent.asynchttp.handle_async(
env, querydict,
httpsessions[authorized['sessionid']]['inflight'])):
pagecontent += rsp
start_response("200 OK", headers)
yield pagecontent
return
except exc.ConfluentException as e:
if e.apierrorcode == 500:
# raise generics to trigger the tracelog
raise
start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr),
headers)
yield e.get_error_body()
elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
'/shell/sessions/' in env['PATH_INFO'])):
#hard bake JSON into this path, do not support other incarnations
if '/console/session' in env['PATH_INFO']:
@ -375,15 +395,25 @@ def resourcehandler_backend(env, start_response):
skipreplay = False
if 'skipreplay' in querydict and querydict['skipreplay']:
skipreplay = True
datacallback = None
async = None
if 'HTTP_CONFLUENTASYNCID' in env:
async = confluent.asynchttp.get_async(env, querydict)
termrel = async.set_term_relation(env)
datacallback = termrel.got_data
try:
if shellsession:
consession = shellserver.ShellSession(
node=nodename, configmanager=cfgmgr,
username=authorized['username'], skipreplay=skipreplay)
username=authorized['username'], skipreplay=skipreplay,
datacallback=datacallback
)
else:
consession = consoleserver.ConsoleSession(
node=nodename, configmanager=cfgmgr,
username=authorized['username'], skipreplay=skipreplay)
username=authorized['username'], skipreplay=skipreplay,
datacallback=datacallback
)
except exc.NotFoundException:
start_response("404 Not found", headers)
yield "404 - Request Path not recognized"
@ -392,6 +422,8 @@ def resourcehandler_backend(env, start_response):
start_response("500 Internal Server Error", headers)
return
sessid = _assign_consessionid(consession)
if async:
async.add_console_session(sessid)
start_response('200 OK', headers)
yield '{"session":"%s","data":""}' % sessid
return
@ -477,7 +509,11 @@ def resourcehandler_backend(env, start_response):
try:
hdlr = pluginapi.handle_path(url, operation,
cfgmgr, querydict)
if 'HTTP_CONFLUENTASYNCID' in env:
confluent.asynchttp.run_handler(hdlr, env)
start_response('202 Accepted', headers)
yield 'Request queued'
return
pagecontent = ""
if mimetype == 'text/html':
for datum in _assemble_html(hdlr, resource, lquerydict, url,
@ -569,21 +605,21 @@ def _assemble_html(responses, resource, querydict, url, extension):
'</form></body></html>')
def _assemble_json(responses, resource, url, extension):
def _assemble_json(responses, resource=None, url=None, extension=None):
#NOTE(jbjohnso) I'm considering giving up on yielding bit by bit
#in json case over http. Notably, duplicate key values from plugin
#overwrite, but we'd want to preserve them into an array instead.
#the downside is that http would just always blurt it ll out at
#once and hold on to all the data in memory
links = {
'self': {"href": resource + extension},
}
if url == '/':
pass
elif resource[-1] == '/':
links['collection'] = {"href": "../" + extension}
else:
links['collection'] = {"href": "./" + extension}
links = {}
if resource is not None:
links['self'] = {"href": resource + extension}
if url == '/':
pass
elif resource[-1] == '/':
links['collection'] = {"href": "../" + extension}
else:
links['collection'] = {"href": "./" + extension}
rspdata = {}
for rsp in responses:
if isinstance(rsp, confluent.messages.LinkRelation):
@ -611,7 +647,7 @@ def _assemble_json(responses, resource, url, extension):
else:
rspdata[dk] = [rspdata[dk], rsp[dk]]
else:
if dk == 'databynode':
if dk == 'databynode' or dk == 'asyncresponse':
# a quirk, databynode suggests noderange
# multi response. This should *always* be a list,
# even if it will be length 1

View File

@ -50,6 +50,8 @@ try:
except ImportError:
havefcntl = False
#import multiprocessing
import gc
from greenlet import greenlet
import sys
import os
import signal
@ -129,6 +131,13 @@ def dumptrace(signalname, frame):
ht = open('/var/log/confluent/hangtraces', 'a')
ht.write('Dumping active trace on ' + time.strftime('%X %x\n'))
ht.write(''.join(traceback.format_stack(frame)))
for o in gc.get_objects():
if not isinstance(o, greenlet):
continue
if not o:
continue
ht.write('Thread trace:\n')
ht.write(''.join(traceback.format_stack(o.gr_frame)))
ht.close()
def doexit():

View File

@ -1,7 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
# Copyright 2015 Lenovo
# Copyright 2015-2016 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -489,8 +489,8 @@ class InputCredential(ConfluentMessage):
if node not in self.credentials:
return {}
credential = self.credentials[node]
for attr in credentials:
if type(credentials[attr]) in (str, unicode):
for attr in credential:
if type(credential[attr]) in (str, unicode):
try:
# as above, use format() to see if string follows
# expression, store value back in case of escapes
@ -835,6 +835,41 @@ class EventCollection(ConfluentMessage):
self.kvpairs = {name: {'events': eventdata}}
class AsyncCompletion(ConfluentMessage):
    """Sentinel message marking the end of responses for a queued request."""

    def __init__(self):
        self.notnode = True
        self.stripped = True

    def raw(self):
        # The leading underscore keeps the marker out of normal data keys.
        return {'_requestdone': True}
class AsyncMessage(ConfluentMessage):
    """Envelope for one (requestid, response) pair on an async channel."""

    def __init__(self, pair):
        self.msgpair = pair
        self.stripped = True
        self.notnode = True

    def raw(self):
        requestid, rsp = self.msgpair
        if isinstance(rsp, ConfluentMessage):
            payload = rsp.raw()
        elif isinstance(rsp, dict):  # console metadata
            payload = rsp
        else:  # terminal text
            payload = {'data': rsp}
        return {'asyncresponse':
                {'requestid': requestid,
                 'response': payload}}
class AsyncSession(ConfluentMessage):
    # Reply to a request for a new async channel: conveys the generated
    # async id back to the client so it can tag subsequent requests.
    def __init__(self, id):
        self.desc = 'foo'
        self.notnode = True
        self.stripped = True
        self.kvpairs = {'asyncid': id}
class User(ConfluentMessage):
def __init__(self, uid, username, privilege_level, name=None):
self.desc = 'foo'