From bcb9c2660fb95f9318ea67c2b093ffd617837e73 Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Tue, 9 Feb 2016 09:38:47 -0500
Subject: [PATCH 01/12] Implement a multiplex facility (WIP)

Allow an arbitrary number of HTTP requests to share a
small pool of connections, as is typical of a
common web browser.
---
 confluent_server/confluent/httpapi.py     |  5 +-
 confluent_server/confluent/messages.py    |  2 +-
 confluent_server/confluent/multiplexer.py | 80 +++++++++++++++++++++++
 3 files changed, 85 insertions(+), 2 deletions(-)
 create mode 100644 confluent_server/confluent/multiplexer.py

diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py
index 5560836a..8a18ea77 100644
--- a/confluent_server/confluent/httpapi.py
+++ b/confluent_server/confluent/httpapi.py
@@ -25,6 +25,7 @@ import confluent.exceptions as exc
 import confluent.log as log
 import confluent.messages
 import confluent.core as pluginapi
+import confluent.requestmultiplexer
 import confluent.shellserver as shellserver
 import confluent.tlvdata
 import confluent.util as util
@@ -352,7 +353,9 @@ def resourcehandler_backend(env, start_response):
         ("Set-Cookie", m.OutputString())
         for m in authorized['cookie'].values())
     cfgmgr = authorized['cfgmgr']
-    if (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
+    if (operation == 'create') and env['PATH_INFO'] == '/multiplexer':
+        confluent.multiplexer.handle_http(env, querydict)
+    elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
             '/shell/sessions/' in env['PATH_INFO'])):
         #hard bake JSON into this path, do not support other incarnations
         if '/console/session' in env['PATH_INFO']:
diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py
index 4af17429..3ef4128c 100644
--- a/confluent_server/confluent/messages.py
+++ b/confluent_server/confluent/messages.py
@@ -1,7 +1,7 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
 
 # Copyright 2014 IBM Corporation
-# Copyright 2015 Lenovo
+# Copyright 2015-2016 Lenovo
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
diff --git a/confluent_server/confluent/multiplexer.py b/confluent_server/confluent/multiplexer.py
new file mode 100644
index 00000000..a1e6ce07
--- /dev/null
+++ b/confluent_server/confluent/multiplexer.py
@@ -0,0 +1,80 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2016 Lenovo
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This module handles the task of multplexing console and any watchers.
+# For example, 3 console windows may share a single http long poller
+# It can additionally add watchers for certain messages
+# messages.py will then check in for any watchers for the relevant resource
+# and trigger notifications on watchers.
+# This will allow a request to watch each individual nodes power state ond/or
+# health results will async come
+# over the watcher.  A client may only request to monitor a resource
+# if it would normally be allowed to actually request it.  Tho monitoring
+# continues, meaning any request, related or not, will send a notification
+# to a watching client
+# This enables, for example, for a web page to react on the fly to anyone
+# noticing the health, power state, add, or delete of a node (any message
+# suitably instrumented in messages.py).
+
+# This is broken out so that messages and httpapi can both import it.
+# This could be added to the socket api as well, but for now the focus shall
+# be on httpapi to enable dynamic web behavior.
+
+import confluent.util as util
+import eventlet
+import time
+
+_multiplexers = {}
+_cleanthread = None
+
+
+def _assaign_multiplexid(multiplexer):
+    sessid = util.randomstring(32)
+    while sessid in _multiplexers:
+        sessid = util.randomstring(32)
+    _multiplexers[sessid] = {'multiplexer': multiplexer,
+                               'expiry': time.time() + 60}
+    return sessid
+
+
+def _expire_multiplexers():
+    global _cleanthread
+    while multiplexers:
+        currtime = time.time()
+        for session in _multiplexers:
+            if _multiplexers[session]['expiry'] < currtime:
+                del _multiplexers[session]
+    if multiplexers:
+        _cleanthread = eventlet.spawn_after(15, _expire_multiplexers)
+    else:
+        _cleanthread = None
+
+
+class Multiplexer(object):
+    def __init__(self):
+        _assign_multiplexid(self)
+
+
+def handle_http(env, querydict):
+    global _cleanthread
+    if _cleanthread is None:
+        _cleanthread = eventlet.spawn_after(60, _expire_multiplexers)
+    if 'multiplexid' not in querydict or not querydict['multiplexid']:
+        # This is a new request, create a new multiplexer
+        multiplexer = Multiplexer()
+    else:
+        multiplexer = _multiplexers['multiplexid']['multiplexer']
+

From 7d67ea06859ddae3be9138dd7764725b1de9892f Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Fri, 18 Mar 2016 17:04:05 -0400
Subject: [PATCH 02/12] Refine asyncsupport

Async support progress continues.  Renaming from 'multiplex'
to 'async', as that seems to describe the pattern better.
---
 confluent_server/confluent/asynchttp.py   | 128 ++++++++++++++++++++++
 confluent_server/confluent/httpapi.py     |  38 ++++---
 confluent_server/confluent/messages.py    |   7 ++
 confluent_server/confluent/multiplexer.py |  80 --------------
 4 files changed, 159 insertions(+), 94 deletions(-)
 create mode 100644 confluent_server/confluent/asynchttp.py
 delete mode 100644 confluent_server/confluent/multiplexer.py

diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py
new file mode 100644
index 00000000..65d70818
--- /dev/null
+++ b/confluent_server/confluent/asynchttp.py
@@ -0,0 +1,128 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2016 Lenovo
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Overall, the result of this shall be:
+# - Web clients can create the same out-of-order responsiveness as socket
+#   clients (but with more complexity on their end)
+# - Web clients can share single request among console sessions
+# - Web clients can get async notify of things like node add/remove, events
+
+# This provides an async strategy to http clients.  The design is that a http
+# session may have an 'async' resource.  In such a case, any requests are
+# queued and immediately the response is given accepting the queued request.
+# A request flags itself as queue-compatible through an HTTP header indicating
+# the identifier of the async thread.  As responses happen to the queued
+# request, data is dispatched to the first registered poller for data on
+# the session.  This way, a client may elect to provide multiple pollers
+# to mitigate general choppiness of http network pattern.  It may not be
+# worth it, but it's possible.
+
+# Additionally support console session multiplexing, to mitigate needed
+# connection count.
+
+# Also, this should allow a client to register for notifications of things
+# like node add/delete or an event firing, ultimately.
+
+# Much like console sessions, these will be reaped if a client spends too
+# far away.
+
+import collections
+import confluent.exceptions as exc
+import confluent.messages as messages
+import confluent.util as util
+import eventlet
+
+_asyncsessions = {}
+_cleanthread = None
+
+
+def _assign_asyncid(asyncsession):
+    sessid = util.randomstring(32)
+    while sessid in _asyncsessions:
+        sessid = util.randomstring(32)
+    _asyncsessions[sessid] = {'asyncsession': asyncsession}
+    return sessid
+
+
+class AsyncSession(object):
+
+    def __init__(self):
+        self.asyncid = _assign_asyncid(self)
+        self.responses = collections.deque()
+        self._evt = None
+        self.reaper = eventlet.spawn_after(15, self.destroy)
+
+    def add(self, rsp, requestid):
+        self.responses.append(rsp, requestid)
+        if self._evt:
+            self._evt.send()
+            self._evt = None
+
+    def destroy(self):
+        if self._evt:
+            self._evt.send()
+            self._evt = None
+        del _asyncsessions[self.asyncid]
+
+    def run_handler(self, handler, requestid):
+        for rsp in handler:
+            self.add(rsp, requestid)
+        self.add({'_requestdone': True}, requestid)
+
+    def get_responses(self, timeout=25):
+        self.reaper.cancel()
+        self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
+        if self._evt():
+            # TODO(jjohnson2): This precludes the goal of 'double barreled'
+            # access....  revisit if this could matter
+            raise Exception('get_responses is not re-entrant')
+        if not self.responses:  # wait to accumulate some
+            self._evt = eventlet.event.Event()
+            with eventlet.Timout(timeout, False):
+                self._evt.wait()
+            self._evt = None
+        while self.responses:
+            yield self.responses.popleft()
+
+
+def run_handler(hdlr, env):
+    asyncsessid = env['HTTP_CONFLUENTASYNCID']
+    try:
+        asyncsession = _asyncsessions[asyncsessid]
+        requestid = env['HTTP_CONFLUENTREQUESTID']
+    except KeyError:
+        raise exc.InvalidArgumentException(
+                'Invalid Session ID or missing request id')
+    eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid)
+    return requestid
+
+
+def handle_async(env, querydict):
+    global _cleanthread
+    # This may be one of two things, a request for a new async stream
+    # or a request for next data from async stream
+    # httpapi otherwise handles requests an injecting them to queue
+    if 'asyncid' not in querydict or not querydict['asyncid']:
+        # This is a new request, create a new multiplexer
+        currsess = AsyncSession()
+        yield messages.AsyncSession(currsess.asyncid)
+        return
+    currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
+    for rsp in currsess.get_responses():
+        yield rsp
+
+
+
diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py
index 8a18ea77..4769bad8 100644
--- a/confluent_server/confluent/httpapi.py
+++ b/confluent_server/confluent/httpapi.py
@@ -25,7 +25,7 @@ import confluent.exceptions as exc
 import confluent.log as log
 import confluent.messages
 import confluent.core as pluginapi
-import confluent.requestmultiplexer
+import confluent.asynchttp
 import confluent.shellserver as shellserver
 import confluent.tlvdata
 import confluent.util as util
@@ -353,8 +353,14 @@ def resourcehandler_backend(env, start_response):
         ("Set-Cookie", m.OutputString())
         for m in authorized['cookie'].values())
     cfgmgr = authorized['cfgmgr']
-    if (operation == 'create') and env['PATH_INFO'] == '/multiplexer':
-        confluent.multiplexer.handle_http(env, querydict)
+    if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async':
+        pagecontent = ""
+        for rsp in _assemble_json(
+                confluent.asynchttp.handle_async(env, querydict)):
+            pagecontent += rsp
+        start_response("200 OK", headers)
+        yield pagecontent
+        return
     elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
             '/shell/sessions/' in env['PATH_INFO'])):
         #hard bake JSON into this path, do not support other incarnations
@@ -480,7 +486,11 @@ def resourcehandler_backend(env, start_response):
         try:
             hdlr = pluginapi.handle_path(url, operation,
                                          cfgmgr, querydict)
-
+            if 'HTTP_CONFLUENTASYNCID' in env:
+                asynchttp.run_handler(hdlr, env)
+                start_response('202 Accepted', headers)
+                yield 'Request queued'
+                return
             pagecontent = ""
             if mimetype == 'text/html':
                 for datum in _assemble_html(hdlr, resource, lquerydict, url,
@@ -572,21 +582,21 @@ def _assemble_html(responses, resource, querydict, url, extension):
                '</form></body></html>')
 
 
-def _assemble_json(responses, resource, url, extension):
+def _assemble_json(responses, resource=None, url=None, extension=None):
     #NOTE(jbjohnso) I'm considering giving up on yielding bit by bit
     #in json case over http.  Notably, duplicate key values from plugin
     #overwrite, but we'd want to preserve them into an array instead.
     #the downside is that http would just always blurt it ll out at
     #once and hold on to all the data in memory
-    links = {
-        'self': {"href": resource + extension},
-    }
-    if url == '/':
-        pass
-    elif resource[-1] == '/':
-        links['collection'] = {"href": "../" + extension}
-    else:
-        links['collection'] = {"href": "./" + extension}
+    links = {}
+    if resource is not None:
+        links['self'] = {"href": resource + extension}
+        if url == '/':
+            pass
+        elif resource[-1] == '/':
+            links['collection'] = {"href": "../" + extension}
+        else:
+            links['collection'] = {"href": "./" + extension}
     rspdata = {}
     for rsp in responses:
         if isinstance(rsp, confluent.messages.LinkRelation):
diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py
index 3ef4128c..78845ee4 100644
--- a/confluent_server/confluent/messages.py
+++ b/confluent_server/confluent/messages.py
@@ -835,6 +835,13 @@ class EventCollection(ConfluentMessage):
             self.kvpairs = {name: {'events': eventdata}}
 
 
+class AsyncSession(ConfluentMessage):
+    def __init__(self, id):
+        self.desc = 'foo'
+        self.notnode = True
+        self.stripped = True
+        self.kvpairs = {'asyncid': id}
+
 class User(ConfluentMessage):
     def __init__(self, uid, username, privilege_level, name=None):
         self.desc = 'foo'
diff --git a/confluent_server/confluent/multiplexer.py b/confluent_server/confluent/multiplexer.py
deleted file mode 100644
index a1e6ce07..00000000
--- a/confluent_server/confluent/multiplexer.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2016 Lenovo
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# This module handles the task of multplexing console and any watchers.
-# For example, 3 console windows may share a single http long poller
-# It can additionally add watchers for certain messages
-# messages.py will then check in for any watchers for the relevant resource
-# and trigger notifications on watchers.
-# This will allow a request to watch each individual nodes power state ond/or
-# health results will async come
-# over the watcher.  A client may only request to monitor a resource
-# if it would normally be allowed to actually request it.  Tho monitoring
-# continues, meaning any request, related or not, will send a notification
-# to a watching client
-# This enables, for example, for a web page to react on the fly to anyone
-# noticing the health, power state, add, or delete of a node (any message
-# suitably instrumented in messages.py).
-
-# This is broken out so that messages and httpapi can both import it.
-# This could be added to the socket api as well, but for now the focus shall
-# be on httpapi to enable dynamic web behavior.
-
-import confluent.util as util
-import eventlet
-import time
-
-_multiplexers = {}
-_cleanthread = None
-
-
-def _assaign_multiplexid(multiplexer):
-    sessid = util.randomstring(32)
-    while sessid in _multiplexers:
-        sessid = util.randomstring(32)
-    _multiplexers[sessid] = {'multiplexer': multiplexer,
-                               'expiry': time.time() + 60}
-    return sessid
-
-
-def _expire_multiplexers():
-    global _cleanthread
-    while multiplexers:
-        currtime = time.time()
-        for session in _multiplexers:
-            if _multiplexers[session]['expiry'] < currtime:
-                del _multiplexers[session]
-    if multiplexers:
-        _cleanthread = eventlet.spawn_after(15, _expire_multiplexers)
-    else:
-        _cleanthread = None
-
-
-class Multiplexer(object):
-    def __init__(self):
-        _assign_multiplexid(self)
-
-
-def handle_http(env, querydict):
-    global _cleanthread
-    if _cleanthread is None:
-        _cleanthread = eventlet.spawn_after(60, _expire_multiplexers)
-    if 'multiplexid' not in querydict or not querydict['multiplexid']:
-        # This is a new request, create a new multiplexer
-        multiplexer = Multiplexer()
-    else:
-        multiplexer = _multiplexers['multiplexid']['multiplexer']
-

From 8fac1ce5dabef8f42d25630a1f95f7c05f344c4e Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Fri, 18 Mar 2016 20:46:37 -0400
Subject: [PATCH 03/12] Fix up the async http to actually function

Still need to review the return data to determine the best format.
---
 confluent_server/confluent/asynchttp.py | 14 +++++++-------
 confluent_server/confluent/httpapi.py   |  2 +-
 confluent_server/confluent/messages.py  | 18 ++++++++++++++++++
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py
index 65d70818..64dca6a9 100644
--- a/confluent_server/confluent/asynchttp.py
+++ b/confluent_server/confluent/asynchttp.py
@@ -66,7 +66,7 @@ class AsyncSession(object):
         self.reaper = eventlet.spawn_after(15, self.destroy)
 
     def add(self, rsp, requestid):
-        self.responses.append(rsp, requestid)
+        self.responses.append((rsp, requestid))
         if self._evt:
             self._evt.send()
             self._evt = None
@@ -79,19 +79,19 @@ class AsyncSession(object):
 
     def run_handler(self, handler, requestid):
         for rsp in handler:
-            self.add(rsp, requestid)
-        self.add({'_requestdone': True}, requestid)
+            self.add(requestid, rsp)
+        self.add(requestid, messages.AsyncCompletion())
 
     def get_responses(self, timeout=25):
         self.reaper.cancel()
         self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
-        if self._evt():
+        if self._evt:
             # TODO(jjohnson2): This precludes the goal of 'double barreled'
             # access....  revisit if this could matter
             raise Exception('get_responses is not re-entrant')
         if not self.responses:  # wait to accumulate some
             self._evt = eventlet.event.Event()
-            with eventlet.Timout(timeout, False):
+            with eventlet.Timeout(timeout, False):
                 self._evt.wait()
             self._evt = None
         while self.responses:
@@ -101,7 +101,7 @@ class AsyncSession(object):
 def run_handler(hdlr, env):
     asyncsessid = env['HTTP_CONFLUENTASYNCID']
     try:
-        asyncsession = _asyncsessions[asyncsessid]
+        asyncsession = _asyncsessions[asyncsessid]['asyncsession']
         requestid = env['HTTP_CONFLUENTREQUESTID']
     except KeyError:
         raise exc.InvalidArgumentException(
@@ -122,7 +122,7 @@ def handle_async(env, querydict):
         return
     currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
     for rsp in currsess.get_responses():
-        yield rsp
+        yield messages.AsyncMessage(rsp)
 
 
 
diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py
index 4769bad8..755c42e9 100644
--- a/confluent_server/confluent/httpapi.py
+++ b/confluent_server/confluent/httpapi.py
@@ -487,7 +487,7 @@ def resourcehandler_backend(env, start_response):
             hdlr = pluginapi.handle_path(url, operation,
                                          cfgmgr, querydict)
             if 'HTTP_CONFLUENTASYNCID' in env:
-                asynchttp.run_handler(hdlr, env)
+                confluent.asynchttp.run_handler(hdlr, env)
                 start_response('202 Accepted', headers)
                 yield 'Request queued'
                 return
diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py
index 78845ee4..8233ae89 100644
--- a/confluent_server/confluent/messages.py
+++ b/confluent_server/confluent/messages.py
@@ -835,6 +835,24 @@ class EventCollection(ConfluentMessage):
             self.kvpairs = {name: {'events': eventdata}}
 
 
+class AsyncCompletion(ConfluentMessage):
+    def __init__(self):
+        self.stripped = True
+        self.notnode = True
+
+    def raw(self):
+        return {'_requestdone': True}
+
+class AsyncMessage(ConfluentMessage):
+    def __init__(self, pair):
+        self.stripped = True
+        self.notnode = True
+        self.msgpair = pair
+
+    def raw(self):
+        return {'requestid': self.msgpair[0],
+                'response': self.msgpair[1].raw()}
+
 class AsyncSession(ConfluentMessage):
     def __init__(self, id):
         self.desc = 'foo'

From 75a747a6a2c08c218145d4bdee4d90c582bdaa93 Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Sat, 19 Mar 2016 17:25:47 -0400
Subject: [PATCH 04/12] Amend structure of AsyncMessage

This is an easier structure to traverse for a client.
---
 confluent_server/confluent/messages.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py
index 8233ae89..cb824755 100644
--- a/confluent_server/confluent/messages.py
+++ b/confluent_server/confluent/messages.py
@@ -850,8 +850,9 @@ class AsyncMessage(ConfluentMessage):
         self.msgpair = pair
 
     def raw(self):
-        return {'requestid': self.msgpair[0],
-                'response': self.msgpair[1].raw()}
+        return {'asyncresponse':
+            {'requestid': self.msgpair[0],
+             'response': self.msgpair[1].raw()}}
 
 class AsyncSession(ConfluentMessage):
     def __init__(self, id):

From 2b3d5f7b626270abd41f25cd7e4cfcd909352c80 Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Sun, 20 Mar 2016 08:19:33 -0400
Subject: [PATCH 05/12] Have async sessions detect logout

---
 confluent_server/confluent/asynchttp.py  | 23 ++++++++++++++++-------
 confluent_server/confluent/exceptions.py |  7 +++++++
 confluent_server/confluent/httpapi.py    | 22 ++++++++++++++++------
 3 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py
index 64dca6a9..43fe5362 100644
--- a/confluent_server/confluent/asynchttp.py
+++ b/confluent_server/confluent/asynchttp.py
@@ -44,6 +44,7 @@ import confluent.exceptions as exc
 import confluent.messages as messages
 import confluent.util as util
 import eventlet
+import greenlet
 
 _asyncsessions = {}
 _cleanthread = None
@@ -110,7 +111,7 @@ def run_handler(hdlr, env):
     return requestid
 
 
-def handle_async(env, querydict):
+def handle_async(env, querydict, threadset):
     global _cleanthread
     # This may be one of two things, a request for a new async stream
     # or a request for next data from async stream
@@ -120,9 +121,17 @@ def handle_async(env, querydict):
         currsess = AsyncSession()
         yield messages.AsyncSession(currsess.asyncid)
         return
-    currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
-    for rsp in currsess.get_responses():
-        yield messages.AsyncMessage(rsp)
-
-
-
+    mythreadid = greenlet.getcurrent()
+    threadset.add(mythreadid)
+    loggedout = None
+    currsess = None
+    try:
+        currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
+        for rsp in currsess.get_responses():
+            yield messages.AsyncMessage(rsp)
+    except greenlet.GreenletExit as ge:
+        loggedout = ge
+    threadset.discard(mythreadid)
+    if loggedout is not None:
+        currsess.destroy()
+        raise exc.LoggedOut()
\ No newline at end of file
diff --git a/confluent_server/confluent/exceptions.py b/confluent_server/confluent/exceptions.py
index 9bf7b623..bea78955 100644
--- a/confluent_server/confluent/exceptions.py
+++ b/confluent_server/confluent/exceptions.py
@@ -89,3 +89,10 @@ class PubkeyInvalid(ConfluentException):
 
     def get_error_body(self):
         return self.errorbody
+
+class LoggedOut(ConfluentException):
+    apierrorcode = 401
+    apierrorstr = '401 - Logged out'
+
+    def get_error_body(self):
+        return '{"loggedout": 1}'
diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py
index 755c42e9..7a1159d9 100644
--- a/confluent_server/confluent/httpapi.py
+++ b/confluent_server/confluent/httpapi.py
@@ -355,12 +355,22 @@ def resourcehandler_backend(env, start_response):
     cfgmgr = authorized['cfgmgr']
     if (operation == 'create') and env['PATH_INFO'] == '/sessions/current/async':
         pagecontent = ""
-        for rsp in _assemble_json(
-                confluent.asynchttp.handle_async(env, querydict)):
-            pagecontent += rsp
-        start_response("200 OK", headers)
-        yield pagecontent
-        return
+        try:
+            for rsp in _assemble_json(
+                    confluent.asynchttp.handle_async(
+                            env, querydict,
+                            httpsessions[authorized['sessionid']]['inflight'])):
+                pagecontent += rsp
+            start_response("200 OK", headers)
+            yield pagecontent
+            return
+        except exc.ConfluentException as e:
+            if e.apierrorcode == 500:
+                # raise generics to trigger the tracelog
+                raise
+            start_response('{0} {1}'.format(e.apierrorcode, e.apierrorstr),
+                           headers)
+            yield e.get_error_body()
     elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
             '/shell/sessions/' in env['PATH_INFO'])):
         #hard bake JSON into this path, do not support other incarnations

From 3cd96a4f598c679b6443c35dd9fbdfbe2313107a Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Sun, 20 Mar 2016 13:53:06 -0400
Subject: [PATCH 06/12] Force asyncresponse http to be JSON array

Rather than let it be ambiguous, force it to provide a JSON array.
---
 confluent_server/confluent/httpapi.py  | 2 +-
 confluent_server/confluent/messages.py | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py
index 7a1159d9..7a9f0c32 100644
--- a/confluent_server/confluent/httpapi.py
+++ b/confluent_server/confluent/httpapi.py
@@ -634,7 +634,7 @@ def _assemble_json(responses, resource=None, url=None, extension=None):
                     else:
                         rspdata[dk] = [rspdata[dk], rsp[dk]]
                 else:
-                    if dk == 'databynode':
+                    if dk == 'databynode' or dk == 'asyncresponse':
                         # a quirk, databynode suggests noderange
                         # multi response.  This should *always* be a list,
                         # even if it will be length 1
diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py
index cb824755..8c13c9ac 100644
--- a/confluent_server/confluent/messages.py
+++ b/confluent_server/confluent/messages.py
@@ -489,8 +489,8 @@ class InputCredential(ConfluentMessage):
         if node not in self.credentials:
             return {}
         credential = self.credentials[node]
-        for attr in credentials:
-            if type(credentials[attr]) in (str, unicode):
+        for attr in credential:
+            if type(credential[attr]) in (str, unicode):
                 try:
                     # as above, use format() to see if string follows
                     # expression, store value back in case of escapes
@@ -843,6 +843,7 @@ class AsyncCompletion(ConfluentMessage):
     def raw(self):
         return {'_requestdone': True}
 
+
 class AsyncMessage(ConfluentMessage):
     def __init__(self, pair):
         self.stripped = True
@@ -851,8 +852,8 @@ class AsyncMessage(ConfluentMessage):
 
     def raw(self):
         return {'asyncresponse':
-            {'requestid': self.msgpair[0],
-             'response': self.msgpair[1].raw()}}
+                    {'requestid': self.msgpair[0],
+                      'response': self.msgpair[1].raw()}}
 
 class AsyncSession(ConfluentMessage):
     def __init__(self, id):

From d753ac28331748d70a5f559f730322b7184da8bd Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Fri, 25 Mar 2016 14:50:47 -0400
Subject: [PATCH 07/12] Add terminal sessions to async http

This functionality enables a browser to hold more terminals open
than its maximum connection limit would normally allow.
---
 confluent_server/confluent/asynchttp.py | 43 ++++++++++++++++++++++++-
 confluent_server/confluent/httpapi.py   | 17 ++++++++--
 confluent_server/confluent/messages.py  | 10 +++++-
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py
index 43fe5362..9e53e3bd 100644
--- a/confluent_server/confluent/asynchttp.py
+++ b/confluent_server/confluent/asynchttp.py
@@ -45,9 +45,11 @@ import confluent.messages as messages
 import confluent.util as util
 import eventlet
 import greenlet
+import time
 
 _asyncsessions = {}
 _cleanthread = None
+_consolesessions = None
 
 
 def _assign_asyncid(asyncsession):
@@ -58,12 +60,26 @@ def _assign_asyncid(asyncsession):
     return sessid
 
 
+class AsyncTermRelation(object):
+    # Need to keep an association of term object to async
+    # This allows the async handler to know the context of
+    # outgoing data to provide to calling code
+    def __init__(self, termid, async):
+        self.async = async
+        self.termid = termid
+
+    def got_data(self, data):
+        self.async.add(data, self.termid)
+
+
 class AsyncSession(object):
 
     def __init__(self):
         self.asyncid = _assign_asyncid(self)
         self.responses = collections.deque()
         self._evt = None
+        self.termrelations = []
+        self.consoles = set([])
         self.reaper = eventlet.spawn_after(15, self.destroy)
 
     def add(self, rsp, requestid):
@@ -72,6 +88,18 @@ class AsyncSession(object):
             self._evt.send()
             self._evt = None
 
+    def set_term_relation(self, env):
+        # need a term relation to keep track of what data belongs
+        # to what object (since the callback does not provide context
+        # for data, and here ultimately the client is responsible
+        # for sorting out which is which.
+        termrel = AsyncTermRelation(['HTTP_CONFLUENTREQUESTID'], self)
+        self.termrelations.append(termrel)
+        return termrel
+
+    def add_console_session(self, sessionid):
+        self.consoles.add(sessionid)
+
     def destroy(self):
         if self._evt:
             self._evt.send()
@@ -85,6 +113,9 @@ class AsyncSession(object):
 
     def get_responses(self, timeout=25):
         self.reaper.cancel()
+        nextexpiry = time.time() + 90
+        for csess in self.consoles:
+            _consolesessions[csess]['expiry'] = nextexpiry
         self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
         if self._evt:
             # TODO(jjohnson2): This precludes the goal of 'double barreled'
@@ -111,6 +142,11 @@ def run_handler(hdlr, env):
     return requestid
 
 
+def get_async(env, querydict):
+    global _cleanthread
+    return _asyncsessions[querydict['asyncid']]['asyncsession']
+
+
 def handle_async(env, querydict, threadset):
     global _cleanthread
     # This may be one of two things, a request for a new async stream
@@ -134,4 +170,9 @@ def handle_async(env, querydict, threadset):
     threadset.discard(mythreadid)
     if loggedout is not None:
         currsess.destroy()
-        raise exc.LoggedOut()
\ No newline at end of file
+        raise exc.LoggedOut()
+
+
+def set_console_sessions(consolesessions):
+    global _consolesessions
+    _consolesessions = consolesessions
\ No newline at end of file
diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py
index 7a9f0c32..a5ce66e3 100644
--- a/confluent_server/confluent/httpapi.py
+++ b/confluent_server/confluent/httpapi.py
@@ -46,6 +46,7 @@ tlvdata = confluent.tlvdata
 auditlog = None
 tracelog = None
 consolesessions = {}
+confluent.asynchttp.set_console_sessions(consolesessions)
 httpsessions = {}
 opmap = {
     'POST': 'create',
@@ -394,15 +395,25 @@ def resourcehandler_backend(env, start_response):
             skipreplay = False
             if 'skipreplay' in querydict and querydict['skipreplay']:
                 skipreplay = True
+            datacallback = None
+            async = None
+            if 'HTTP_CONFLUENTASYNCID' in env:
+                async = confluent.asynchttp.get_async(env, querydict)
+                termrel = async.set_term_relation(env)
+                datacallback = termrel.got_data
             try:
                 if shellsession:
                     consession = shellserver.ShellSession(
                         node=nodename, configmanager=cfgmgr,
-                        username=authorized['username'], skipreplay=skipreplay)
+                        username=authorized['username'], skipreplay=skipreplay,
+                        datacallback=datacallback
+                    )
                 else:
                     consession = consoleserver.ConsoleSession(
                         node=nodename, configmanager=cfgmgr,
-                        username=authorized['username'], skipreplay=skipreplay)
+                        username=authorized['username'], skipreplay=skipreplay,
+                        datacallback=datacallback
+                    )
             except exc.NotFoundException:
                 start_response("404 Not found", headers)
                 yield "404 - Request Path not recognized"
@@ -411,6 +422,8 @@ def resourcehandler_backend(env, start_response):
                 start_response("500 Internal Server Error", headers)
                 return
             sessid = _assign_consessionid(consession)
+            if async:
+                async.add_consolesession(sessid)
             start_response('200 OK', headers)
             yield '{"session":"%s","data":""}' % sessid
             return
diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py
index 8c13c9ac..573ce738 100644
--- a/confluent_server/confluent/messages.py
+++ b/confluent_server/confluent/messages.py
@@ -851,9 +851,17 @@ class AsyncMessage(ConfluentMessage):
         self.msgpair = pair
 
     def raw(self):
+        rsp = self.msgpair[1]
+        rspdict = None
+        if isinstance(rsp, ConfluentMessage):
+            rspdict = rsp.raw()
+        elif isinstance(rsp, dict):  # console metadata
+            rspdict = rsp
+        else: # terminal text
+            rspdict = {'data': rsp}
         return {'asyncresponse':
                     {'requestid': self.msgpair[0],
-                      'response': self.msgpair[1].raw()}}
+                      'response': rspdict}}
 
 class AsyncSession(ConfluentMessage):
     def __init__(self, id):

From 50aefee72863989e74386f851f79faf6b23f7e90 Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Sat, 26 Mar 2016 09:34:46 -0400
Subject: [PATCH 08/12] Correct a number of issues

There were a number of careless mistakes in the feature; correct
the bad usage and the typos.
---
 confluent_server/confluent/asynchttp.py | 10 +++++-----
 confluent_server/confluent/httpapi.py   |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py
index 9e53e3bd..24c51a8b 100644
--- a/confluent_server/confluent/asynchttp.py
+++ b/confluent_server/confluent/asynchttp.py
@@ -69,7 +69,7 @@ class AsyncTermRelation(object):
         self.termid = termid
 
     def got_data(self, data):
-        self.async.add(data, self.termid)
+        self.async.add(self.termid, data)
 
 
 class AsyncSession(object):
@@ -82,8 +82,8 @@ class AsyncSession(object):
         self.consoles = set([])
         self.reaper = eventlet.spawn_after(15, self.destroy)
 
-    def add(self, rsp, requestid):
-        self.responses.append((rsp, requestid))
+    def add(self, requestid, rsp):
+        self.responses.append((requestid, rsp))
         if self._evt:
             self._evt.send()
             self._evt = None
@@ -93,7 +93,7 @@ class AsyncSession(object):
         # to what object (since the callback does not provide context
         # for data, and here ultimately the client is responsible
         # for sorting out which is which.
-        termrel = AsyncTermRelation(['HTTP_CONFLUENTREQUESTID'], self)
+        termrel = AsyncTermRelation(env['HTTP_CONFLUENTREQUESTID'], self)
         self.termrelations.append(termrel)
         return termrel
 
@@ -144,7 +144,7 @@ def run_handler(hdlr, env):
 
 def get_async(env, querydict):
     global _cleanthread
-    return _asyncsessions[querydict['asyncid']]['asyncsession']
+    return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession']
 
 
 def handle_async(env, querydict, threadset):
diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py
index a5ce66e3..9be9a515 100644
--- a/confluent_server/confluent/httpapi.py
+++ b/confluent_server/confluent/httpapi.py
@@ -423,7 +423,7 @@ def resourcehandler_backend(env, start_response):
                 return
             sessid = _assign_consessionid(consession)
             if async:
-                async.add_consolesession(sessid)
+                async.add_console_session(sessid)
             start_response('200 OK', headers)
             yield '{"session":"%s","data":""}' % sessid
             return

From 79b1268a75be215725417178c3c38e6b56072b1e Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Sat, 26 Mar 2016 10:02:16 -0400
Subject: [PATCH 09/12] Tolerate cp437 format text

UEFI output may still be cp437.  Tolerate this by attempting
to decode it as cp437 when UTF-8 fails.  UTF-8 continues to be
preferred.
---
 confluent_client/confluent/tlvdata.py | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/confluent_client/confluent/tlvdata.py b/confluent_client/confluent/tlvdata.py
index 5a13f901..9676a35a 100644
--- a/confluent_client/confluent/tlvdata.py
+++ b/confluent_client/confluent/tlvdata.py
@@ -19,15 +19,25 @@ import confluent.tlv as tlv
 import json
 import struct
 
+def decodestr(value):
+    ret = None
+    try:
+        ret = value.decode('utf-8')
+    except UnicodeDecodeError:
+        try:
+            ret = value.decode('cp437')
+        except UnicodeDecodeError:
+            ret = value
+    return ret
 
 def unicode_dictvalues(dictdata):
     for key in dictdata:
         if isinstance(dictdata[key], str):
-            dictdata[key] = dictdata[key].decode('utf-8')
+            dictdata[key] = decodestr(dictdata[key])
         elif isinstance(dictdata[key], list):
             for i in xrange(len(dictdata[key])):
                 if isinstance(dictdata[key][i], str):
-                    dictdata[key][i] = dictdata[key][i].decode('utf-8')
+                    dictdata[key][i] = decodestr(dictdata[key][i])
                 elif isinstance(dictdata[key][i], dict):
                     unicode_dictvalues(dictdata[key][i])
         elif isinstance(dictdata[key], dict):

From 03b2cdab5a5c2cb5c12d7fbfb29b39f441119ac3 Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Sat, 26 Mar 2016 10:26:17 -0400
Subject: [PATCH 10/12] Assure console sessions get reaped

When an error (to be fixed separately) happened while updating the
expiry, an asyncsession failed to have a reaper scheduled for
cleanup.  Correct this by scheduling the reaper immediately after
the cancellation.

Further, an async being destroyed did not reap related console
sessions.  Add code to reap related console sessions when
the async session gets destroyed.
---
 confluent_server/confluent/asynchttp.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py
index 24c51a8b..f109377c 100644
--- a/confluent_server/confluent/asynchttp.py
+++ b/confluent_server/confluent/asynchttp.py
@@ -104,6 +104,9 @@ class AsyncSession(object):
         if self._evt:
             self._evt.send()
             self._evt = None
+        for console in self.consoles:
+            _consolesessions[console]['session'].destroy()
+        self.consoles = None
         del _asyncsessions[self.asyncid]
 
     def run_handler(self, handler, requestid):
@@ -113,10 +116,10 @@ class AsyncSession(object):
 
     def get_responses(self, timeout=25):
         self.reaper.cancel()
+        self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
         nextexpiry = time.time() + 90
         for csess in self.consoles:
             _consolesessions[csess]['expiry'] = nextexpiry
-        self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
         if self._evt:
             # TODO(jjohnson2): This precludes the goal of 'double barreled'
             # access....  revisit if this could matter

From 417e70e5c1e13c6d3fb171d93a1bdd0d3af6314b Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Sat, 26 Mar 2016 10:45:47 -0400
Subject: [PATCH 11/12] Tolerate terminal closure

When a terminal closes and notifies the server, it was
pulling the rug out from under the asyncsession's consoles.
Make the asyncsession aware that the console may be gone
and discard tracking it rather than returning a 500 error.
---
 confluent_server/confluent/asynchttp.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py
index f109377c..ab12c9be 100644
--- a/confluent_server/confluent/asynchttp.py
+++ b/confluent_server/confluent/asynchttp.py
@@ -118,8 +118,11 @@ class AsyncSession(object):
         self.reaper.cancel()
         self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
         nextexpiry = time.time() + 90
-        for csess in self.consoles:
-            _consolesessions[csess]['expiry'] = nextexpiry
+        for csess in list(self.consoles):
+            try:
+                _consolesessions[csess]['expiry'] = nextexpiry
+            except KeyError:  # session has been closed elsewhere
+                self.consoles.discard(csess)
         if self._evt:
             # TODO(jjohnson2): This precludes the goal of 'double barreled'
             # access....  revisit if this could matter

From 2ea7ee0dcb831eaa9e4a05c078011c455bb66148 Mon Sep 17 00:00:00 2001
From: Jarrod Johnson <jjohnson2@lenovo.com>
Date: Sat, 26 Mar 2016 13:34:21 -0400
Subject: [PATCH 12/12] Add thread traces to USR1 handler

When receiving a USR1 signal, the handler did usefully provide
'the' current stack, which helps in diagnosing really hard
hangs.  However, it is frequently informative to see all of
the thread stack traces, so add that data to the diagnostic
feature.
---
 confluent_server/confluent/main.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py
index 9acc7cb2..2eaee4da 100644
--- a/confluent_server/confluent/main.py
+++ b/confluent_server/confluent/main.py
@@ -50,6 +50,8 @@ try:
 except ImportError:
     havefcntl = False
 #import multiprocessing
+import gc
+from greenlet import greenlet
 import sys
 import os
 import signal
@@ -129,6 +131,13 @@ def dumptrace(signalname, frame):
     ht = open('/var/log/confluent/hangtraces', 'a')
     ht.write('Dumping active trace on ' + time.strftime('%X %x\n'))
     ht.write(''.join(traceback.format_stack(frame)))
+    for o in gc.get_objects():
+        if not isinstance(o, greenlet):
+            continue
+        if not o:
+            continue
+        ht.write('Thread trace:\n')
+        ht.write(''.join(traceback.format_stack(o.gr_frame)))
     ht.close()
 
 def doexit():