def decodestr(value):
    """Best-effort decode of a byte string to unicode.

    Tries utf-8 first, then cp437 (the classic PC/BIOS codepage, common in
    firmware/SOL console output).  If neither succeeds, the raw value is
    returned unchanged rather than raising.
    """
    for codec in ('utf-8', 'cp437'):
        try:
            return value.decode(codec)
        except UnicodeDecodeError:
            continue
    return value
+
+# Overall, the result of this shall be:
+# - Web clients can create the same out-of-order responsiveness as socket
+# clients (but with more complexity on their end)
+# - Web clients can share a single request among console sessions
+# - Web clients can get async notify of things like node add/remove, events
+
+# This provides an async strategy to http clients. The design is that a http
+# session may have an 'async' resource. In such a case, any requests are
+# queued and immediately the response is given accepting the queued request.
+# A request flags itself as queue-compatible through an HTTP header indicating
+# the identifier of the async thread. As responses happen to the queued
+# request, data is dispatched to the first registered poller for data on
+# the session. This way, a client may elect to provide multiple pollers
+# to mitigate general choppiness of http network pattern. It may not be
+# worth it, but it's possible.
+
+# Additionally support console session multiplexing, to mitigate needed
+# connection count.
+
+# Also, this should allow a client to register for notifications of things
+# like node add/delete or an event firing, ultimately.
+
+# Much like console sessions, these will be reaped if a client stays away
+# for too long.
import collections
import confluent.exceptions as exc
import confluent.messages as messages
import confluent.util as util
import eventlet
import greenlet
import time

# Registry of live async sessions, keyed by the random session id handed
# to the client; each value wraps the AsyncSession object.
_asyncsessions = {}
_cleanthread = None
# Injected by httpapi via set_console_sessions() so destroy()/get_responses()
# can reach the shared console session table.
_consolesessions = None


def _assign_asyncid(asyncsession):
    """Register *asyncsession* under a fresh random id and return the id."""
    sessid = util.randomstring(32)
    while sessid in _asyncsessions:
        sessid = util.randomstring(32)
    _asyncsessions[sessid] = {'asyncsession': asyncsession}
    return sessid


class AsyncTermRelation(object):
    # Keep an association of a term object to an async session.  The console
    # data callback carries no context of its own, so this relation tags
    # outgoing data with the request id the client used, letting the client
    # sort out which terminal each chunk belongs to.
    def __init__(self, termid, asynchdl):
        # NOTE: parameter/attribute renamed from 'async' -- 'async' is a
        # reserved keyword as of Python 3.7 and was a SyntaxError there.
        # Callers construct this positionally, so the rename is compatible.
        self.asynchdl = asynchdl
        self.termid = termid

    def got_data(self, data):
        # Forward console output into the owning async session's queue,
        # tagged with this terminal's request id.
        self.asynchdl.add(self.termid, data)


class AsyncSession(object):
    """A multiplexed response queue shared by one http client.

    Responses from any number of queued requests and console sessions are
    accumulated in a deque and drained by long-polling get_responses().
    The session reaps itself if the client does not poll in time.
    """

    def __init__(self):
        self.asyncid = _assign_asyncid(self)
        self.responses = collections.deque()
        self._evt = None
        self.termrelations = []
        self.consoles = set([])
        # Client must poll within 15 seconds or the session is destroyed.
        self.reaper = eventlet.spawn_after(15, self.destroy)

    def add(self, requestid, rsp):
        """Queue (requestid, rsp) and wake a poller blocked in get_responses."""
        self.responses.append((requestid, rsp))
        if self._evt:
            self._evt.send()
            self._evt = None

    def set_term_relation(self, env):
        """Create and track a term relation for the request id in *env*.

        Needed to keep track of which data belongs to which object, since the
        console data callback provides no context; ultimately the client is
        responsible for sorting out which is which.
        """
        termrel = AsyncTermRelation(env['HTTP_CONFLUENTREQUESTID'], self)
        self.termrelations.append(termrel)
        return termrel

    def add_console_session(self, sessionid):
        """Track a console session so its expiry follows this async session."""
        self.consoles.add(sessionid)

    def destroy(self):
        """Tear down: wake any poller, close owned consoles, deregister."""
        if self._evt:
            self._evt.send()
            self._evt = None
        # NOTE(review): assumes every tracked console id is still present in
        # _consolesessions; get_responses() prunes closed ones, but a close
        # racing destroy() would KeyError here -- confirm upstream.
        for console in self.consoles:
            _consolesessions[console]['session'].destroy()
        self.consoles = None
        del _asyncsessions[self.asyncid]

    def run_handler(self, handler, requestid):
        """Drain *handler* into the queue, then mark *requestid* complete."""
        for rsp in handler:
            self.add(requestid, rsp)
        self.add(requestid, messages.AsyncCompletion())

    def get_responses(self, timeout=25):
        """Long-poll generator yielding queued (requestid, rsp) pairs.

        Blocks up to *timeout* seconds waiting for data, refreshing the
        reaper and the expiry of all associated console sessions.
        """
        self.reaper.cancel()
        # Allow the poll window plus grace before reaping.
        self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
        nextexpiry = time.time() + 90
        for csess in list(self.consoles):
            try:
                _consolesessions[csess]['expiry'] = nextexpiry
            except KeyError:  # session has been closed elsewhere
                self.consoles.discard(csess)
        if self._evt:
            # TODO(jjohnson2): This precludes the goal of 'double barreled'
            # access.... revisit if this could matter
            raise Exception('get_responses is not re-entrant')
        if not self.responses:  # wait to accumulate some
            self._evt = eventlet.event.Event()
            with eventlet.Timeout(timeout, False):
                self._evt.wait()
            self._evt = None
        while self.responses:
            yield self.responses.popleft()


def run_handler(hdlr, env):
    """Queue generator *hdlr* onto the client's async session.

    Returns the request id immediately; results stream out through the
    async session rather than this request's own response.
    Raises InvalidArgumentException for an unknown session or missing id.
    """
    asyncsessid = env['HTTP_CONFLUENTASYNCID']
    try:
        asyncsession = _asyncsessions[asyncsessid]['asyncsession']
        requestid = env['HTTP_CONFLUENTREQUESTID']
    except KeyError:
        raise exc.InvalidArgumentException(
            'Invalid Session ID or missing request id')
    eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid)
    return requestid


def get_async(env, querydict):
    """Look up the AsyncSession named by the CONFLUENTASYNCID header."""
    global _cleanthread
    return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession']


def handle_async(env, querydict, threadset):
    """Service the /sessions/current/async resource (generator).

    This may be one of two things: a request for a new async stream, or a
    poll for the next data from an existing stream.  httpapi otherwise
    handles requests and injects them into the queue.
    """
    global _cleanthread
    if 'asyncid' not in querydict or not querydict['asyncid']:
        # This is a new request; create a new multiplexer.
        currsess = AsyncSession()
        yield messages.AsyncSession(currsess.asyncid)
        return
    # Register this greenlet so a logout can kill in-flight pollers.
    mythreadid = greenlet.getcurrent()
    threadset.add(mythreadid)
    loggedout = None
    currsess = None
    try:
        currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
        for rsp in currsess.get_responses():
            yield messages.AsyncMessage(rsp)
    except greenlet.GreenletExit as ge:
        loggedout = ge
    threadset.discard(mythreadid)
    if loggedout is not None:
        currsess.destroy()
        raise exc.LoggedOut()


def set_console_sessions(consolesessions):
    """Inject the shared console session table from httpapi."""
    global _consolesessions
    _consolesessions = consolesessions
def resourcehandler_backend(env, start_response): ("Set-Cookie", m.OutputString()) for m in authorized['cookie'].values()) cfgmgr = authorized['cfgmgr'] - if (operation == 'create' and ('/console/session' in env['PATH_INFO'] or + if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async': + pagecontent = "" + try: + for rsp in _assemble_json( + confluent.asynchttp.handle_async( + env, querydict, + httpsessions[authorized['sessionid']]['inflight'])): + pagecontent += rsp + start_response("200 OK", headers) + yield pagecontent + return + except exc.ConfluentException as e: + if e.apierrorcode == 500: + # raise generics to trigger the tracelog + raise + start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr), + headers) + yield e.get_error_body() + elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or '/shell/sessions/' in env['PATH_INFO'])): #hard bake JSON into this path, do not support other incarnations if '/console/session' in env['PATH_INFO']: @@ -375,15 +395,25 @@ def resourcehandler_backend(env, start_response): skipreplay = False if 'skipreplay' in querydict and querydict['skipreplay']: skipreplay = True + datacallback = None + async = None + if 'HTTP_CONFLUENTASYNCID' in env: + async = confluent.asynchttp.get_async(env, querydict) + termrel = async.set_term_relation(env) + datacallback = termrel.got_data try: if shellsession: consession = shellserver.ShellSession( node=nodename, configmanager=cfgmgr, - username=authorized['username'], skipreplay=skipreplay) + username=authorized['username'], skipreplay=skipreplay, + datacallback=datacallback + ) else: consession = consoleserver.ConsoleSession( node=nodename, configmanager=cfgmgr, - username=authorized['username'], skipreplay=skipreplay) + username=authorized['username'], skipreplay=skipreplay, + datacallback=datacallback + ) except exc.NotFoundException: start_response("404 Not found", headers) yield "404 - Request Path not recognized" @@ -392,6 +422,8 @@ def 
resourcehandler_backend(env, start_response): start_response("500 Internal Server Error", headers) return sessid = _assign_consessionid(consession) + if async: + async.add_console_session(sessid) start_response('200 OK', headers) yield '{"session":"%s","data":""}' % sessid return @@ -477,7 +509,11 @@ def resourcehandler_backend(env, start_response): try: hdlr = pluginapi.handle_path(url, operation, cfgmgr, querydict) - + if 'HTTP_CONFLUENTASYNCID' in env: + confluent.asynchttp.run_handler(hdlr, env) + start_response('202 Accepted', headers) + yield 'Request queued' + return pagecontent = "" if mimetype == 'text/html': for datum in _assemble_html(hdlr, resource, lquerydict, url, @@ -569,21 +605,21 @@ def _assemble_html(responses, resource, querydict, url, extension): '') -def _assemble_json(responses, resource, url, extension): +def _assemble_json(responses, resource=None, url=None, extension=None): #NOTE(jbjohnso) I'm considering giving up on yielding bit by bit #in json case over http. Notably, duplicate key values from plugin #overwrite, but we'd want to preserve them into an array instead. 
#the downside is that http would just always blurt it ll out at #once and hold on to all the data in memory - links = { - 'self': {"href": resource + extension}, - } - if url == '/': - pass - elif resource[-1] == '/': - links['collection'] = {"href": "../" + extension} - else: - links['collection'] = {"href": "./" + extension} + links = {} + if resource is not None: + links['self'] = {"href": resource + extension} + if url == '/': + pass + elif resource[-1] == '/': + links['collection'] = {"href": "../" + extension} + else: + links['collection'] = {"href": "./" + extension} rspdata = {} for rsp in responses: if isinstance(rsp, confluent.messages.LinkRelation): @@ -611,7 +647,7 @@ def _assemble_json(responses, resource, url, extension): else: rspdata[dk] = [rspdata[dk], rsp[dk]] else: - if dk == 'databynode': + if dk == 'databynode' or dk == 'asyncresponse': # a quirk, databynode suggests noderange # multi response. This should *always* be a list, # even if it will be length 1 diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py index 9acc7cb2..2eaee4da 100644 --- a/confluent_server/confluent/main.py +++ b/confluent_server/confluent/main.py @@ -50,6 +50,8 @@ try: except ImportError: havefcntl = False #import multiprocessing +import gc +from greenlet import greenlet import sys import os import signal @@ -129,6 +131,13 @@ def dumptrace(signalname, frame): ht = open('/var/log/confluent/hangtraces', 'a') ht.write('Dumping active trace on ' + time.strftime('%X %x\n')) ht.write(''.join(traceback.format_stack(frame))) + for o in gc.get_objects(): + if not isinstance(o, greenlet): + continue + if not o: + continue + ht.write('Thread trace:\n') + ht.write(''.join(traceback.format_stack(o.gr_frame))) ht.close() def doexit(): diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index 4af17429..573ce738 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -1,7 
class AsyncCompletion(ConfluentMessage):
    """Sentinel message marking a queued async request as finished."""

    def __init__(self):
        self.stripped = True
        self.notnode = True

    def raw(self):
        return {'_requestdone': True}


class AsyncMessage(ConfluentMessage):
    """Wrap one (requestid, response) pair for delivery to an async poller."""

    def __init__(self, pair):
        self.stripped = True
        self.notnode = True
        self.msgpair = pair

    def raw(self):
        requestid, rsp = self.msgpair
        if isinstance(rsp, ConfluentMessage):
            payload = rsp.raw()
        elif isinstance(rsp, dict):  # console metadata
            payload = rsp
        else:  # terminal text
            payload = {'data': rsp}
        return {'asyncresponse':
                    {'requestid': requestid,
                     'response': payload}}


class AsyncSession(ConfluentMessage):
    """Announce a freshly created async session id to the client."""

    def __init__(self, id):
        self.desc = 'foo'
        self.notnode = True
        self.stripped = True
        self.kvpairs = {'asyncid': id}