mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-17 21:23:18 +00:00
Implement a multiplex facility (WIP)
Allow an arbitrary number of HTTP requests using a small pool of connections, as is likely in a common web browser.
This commit is contained in:
parent
6504acecad
commit
bcb9c2660f
@ -25,6 +25,7 @@ import confluent.exceptions as exc
|
||||
import confluent.log as log
|
||||
import confluent.messages
|
||||
import confluent.core as pluginapi
|
||||
import confluent.requestmultiplexer
|
||||
import confluent.shellserver as shellserver
|
||||
import confluent.tlvdata
|
||||
import confluent.util as util
|
||||
@ -352,7 +353,9 @@ def resourcehandler_backend(env, start_response):
|
||||
("Set-Cookie", m.OutputString())
|
||||
for m in authorized['cookie'].values())
|
||||
cfgmgr = authorized['cfgmgr']
|
||||
if (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
|
||||
if (operation == 'create') and env['PATH_INFO'] == '/multiplexer':
|
||||
confluent.multiplexer.handle_http(env, querydict)
|
||||
elif (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
|
||||
'/shell/sessions/' in env['PATH_INFO'])):
|
||||
#hard bake JSON into this path, do not support other incarnations
|
||||
if '/console/session' in env['PATH_INFO']:
|
||||
|
@ -1,7 +1,7 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2014 IBM Corporation
|
||||
# Copyright 2015 Lenovo
|
||||
# Copyright 2015-2016 Lenovo
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
80
confluent_server/confluent/multiplexer.py
Normal file
80
confluent_server/confluent/multiplexer.py
Normal file
@ -0,0 +1,80 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2016 Lenovo
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# This module handles the task of multplexing console and any watchers.
|
||||
# For example, 3 console windows may share a single http long poller
|
||||
# It can additionally add watchers for certain messages
|
||||
# messages.py will then check in for any watchers for the relevant resource
|
||||
# and trigger notifications on watchers.
|
||||
# This will allow a request to watch each individual nodes power state ond/or
|
||||
# health results will async come
|
||||
# over the watcher. A client may only request to monitor a resource
|
||||
# if it would normally be allowed to actually request it. Tho monitoring
|
||||
# continues, meaning any request, related or not, will send a notification
|
||||
# to a watching client
|
||||
# This enables, for example, for a web page to react on the fly to anyone
|
||||
# noticing the health, power state, add, or delete of a node (any message
|
||||
# suitably instrumented in messages.py).
|
||||
|
||||
# This is broken out so that messages and httpapi can both import it.
|
||||
# This could be added to the socket api as well, but for now the focus shall
|
||||
# be on httpapi to enable dynamic web behavior.
|
||||
|
||||
import confluent.util as util
|
||||
import eventlet
|
||||
import time
|
||||
|
||||
_multiplexers = {}
|
||||
_cleanthread = None
|
||||
|
||||
|
||||
def _assaign_multiplexid(multiplexer):
|
||||
sessid = util.randomstring(32)
|
||||
while sessid in _multiplexers:
|
||||
sessid = util.randomstring(32)
|
||||
_multiplexers[sessid] = {'multiplexer': multiplexer,
|
||||
'expiry': time.time() + 60}
|
||||
return sessid
|
||||
|
||||
|
||||
def _expire_multiplexers():
|
||||
global _cleanthread
|
||||
while multiplexers:
|
||||
currtime = time.time()
|
||||
for session in _multiplexers:
|
||||
if _multiplexers[session]['expiry'] < currtime:
|
||||
del _multiplexers[session]
|
||||
if multiplexers:
|
||||
_cleanthread = eventlet.spawn_after(15, _expire_multiplexers)
|
||||
else:
|
||||
_cleanthread = None
|
||||
|
||||
|
||||
class Multiplexer(object):
|
||||
def __init__(self):
|
||||
_assign_multiplexid(self)
|
||||
|
||||
|
||||
def handle_http(env, querydict):
|
||||
global _cleanthread
|
||||
if _cleanthread is None:
|
||||
_cleanthread = eventlet.spawn_after(60, _expire_multiplexers)
|
||||
if 'multiplexid' not in querydict or not querydict['multiplexid']:
|
||||
# This is a new request, create a new multiplexer
|
||||
multiplexer = Multiplexer()
|
||||
else:
|
||||
multiplexer = _multiplexers['multiplexid']['multiplexer']
|
||||
|
Loading…
x
Reference in New Issue
Block a user