# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2016-2018 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Overall, the result of this shall be:
# - Web clients can achieve the same out-of-order responsiveness as socket
#   clients (but with more complexity on their end)
# - Web clients can share a single request among console sessions
# - Web clients can get async notification of things like node add/remove
#   and events firing

# This provides an async strategy to http clients. The design is that an http
# session may have an 'async' resource. In that case, any requests are
# queued, and a response accepting the queued request is returned immediately.
# A request flags itself as queue-compatible through an HTTP header carrying
# the identifier of the async thread. As responses to the queued request
# arrive, data is dispatched to the first poller registered for data on
# the session. This way, a client may elect to provide multiple pollers
# to mitigate the general choppiness of the http network pattern. It may not
# be worth it, but it's possible.

# Additionally, support console session multiplexing, to reduce the number of
# connections needed.

# Also, this should ultimately allow a client to register for notifications
# of things like node add/delete or an event firing.

# Much like console sessions, these sessions will be reaped if a client stays
# away for too long.

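# As a rough sketch of the client side of this exchange (illustrative only:
# the resource path, the response payload shape, and the use of the
# 'requests' library are assumptions for the example; only the header names
# are defined by this module), a web client would do something like:
#
#     import requests
#     sess = requests.Session()
#     # 1. Ask for a new async channel (handle_async() with no asyncid)
#     asyncid = sess.post(base + '/sessions/current/async').json()['asyncid']
#     # 2. Tag ordinary requests so they are queued and acknowledged at once
#     #    (run_handler() below spawns the real work in the background)
#     sess.get(base + '/nodes/', headers={'ConfluentAsyncId': asyncid,
#                                         'ConfluentRequestId': 'req-1'})
#     # 3. Long-poll with the asyncid to drain the queued responses
#     #    (handle_async() yields them as AsyncMessage results)
#     sess.post(base + '/sessions/current/async', json={'asyncid': asyncid})
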
import collections
import confluent.exceptions as exc
import confluent.messages as messages
import confluent.util as util
import eventlet
import greenlet
import time

_asyncsessions = {}
_cleanthread = None
_consolesessions = None


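# Pick a random identifier (via util.randomstring) that is not already in
# use and register the session under it.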
def _assign_asyncid(asyncsession):
    sessid = util.randomstring(32)
    while sessid in _asyncsessions:
        sessid = util.randomstring(32)
    _asyncsessions[sessid] = {'asyncsession': asyncsession}
    return sessid


class AsyncTermRelation(object):
    # Need to keep an association of term object to async session.
    # This allows the async handler to know the context of
    # outgoing data to provide to calling code.
    def __init__(self, termid, asnc):
        # 'asnc' rather than 'async', since 'async' is a reserved keyword
        # in modern Python
        self.asnc = asnc
        self.termid = termid

    def got_data(self, data):
        self.asnc.add(self.termid, data)


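# An AsyncSession is the server side of one async channel.  It accumulates
# responses from queued requests and console sessions, hands them out to
# pollers via get_responses(), and reaps itself (and any associated console
# sessions) if no poller shows up before the reaper timer fires.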
class AsyncSession(object):

    def __init__(self):
        self.asyncid = _assign_asyncid(self)
        self.responses = collections.deque()
        self._evt = None
        self.termrelations = []
        self.consoles = set([])
        self.reaper = eventlet.spawn_after(15, self.destroy)

    def add(self, requestid, rsp):
        if self.responses is None:
            return
        self.responses.append((requestid, rsp))
        if self._evt:
            self._evt.send()
            self._evt = None

    def set_term_relation(self, env):
        # Need a term relation to keep track of which data belongs
        # to which object (the callback does not provide context for
        # the data, and ultimately the client is responsible for
        # sorting out which is which).
        termrel = AsyncTermRelation(env['HTTP_CONFLUENTREQUESTID'], self)
        self.termrelations.append(termrel)
        return termrel

    def add_console_session(self, sessionid):
        self.consoles.add(sessionid)

    def destroy(self):
        if self._evt:
            self._evt.send()
            self._evt = None
        for console in self.consoles:
            _consolesessions[console]['session'].destroy()
        self.consoles = None
        self.responses = None
        del _asyncsessions[self.asyncid]

    def run_handler(self, handler, requestid):
        try:
            for rsp in handler:
                self.add(requestid, rsp)
            self.add(requestid, messages.AsyncCompletion())
        except Exception as e:
            self.add(requestid, e)

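    # Long-poll interface: postpone the reaper, refresh the expiry of any
    # associated console sessions, wait up to 'timeout' seconds for data to
    # accumulate, then yield whatever responses are queued.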
    def get_responses(self, timeout=25):
        self.reaper.cancel()
        self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
        nextexpiry = time.time() + 90
        for csess in list(self.consoles):
            try:
                _consolesessions[csess]['expiry'] = nextexpiry
            except KeyError:  # session has been closed elsewhere
                self.consoles.discard(csess)
        if self._evt:
            # TODO(jjohnson2): This precludes the goal of 'double barreled'
            # access.... revisit if this could matter
            raise Exception('get_responses is not re-entrant')
        if not self.responses:  # wait to accumulate some
            self._evt = eventlet.event.Event()
            with eventlet.Timeout(timeout, False):
                self._evt.wait()
            self._evt = None
        while self.responses:
            yield self.responses.popleft()


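# Queue a request handler onto the caller's async session.  The handler runs
# in a background green thread and its output is delivered later through
# get_responses(); the request id is returned immediately so the caller can
# acknowledge the queued request right away.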
def run_handler(hdlr, env):
    asyncsessid = env['HTTP_CONFLUENTASYNCID']
    try:
        asyncsession = _asyncsessions[asyncsessid]['asyncsession']
        requestid = env['HTTP_CONFLUENTREQUESTID']
    except KeyError:
        raise exc.InvalidArgumentException(
            'Invalid Session ID or missing request id')
    eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid)
    return requestid


def get_async(env, querydict):
    global _cleanthread
    return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession']


def handle_async(env, querydict, threadset):
    global _cleanthread
    # This may be one of two things: a request for a new async stream,
    # or a request for the next data from an existing async stream.
    # httpapi otherwise handles requests and injects them into the queue.
    if 'asyncid' not in querydict or not querydict['asyncid']:
        # This is a new request, create a new multiplexer
        currsess = AsyncSession()
        yield messages.AsyncSession(currsess.asyncid)
        return
    if querydict['asyncid'] not in _asyncsessions:
        raise exc.InvalidArgumentException(
            'Invalid or expired async id')
    mythreadid = greenlet.getcurrent()
    threadset.add(mythreadid)
    loggedout = None
    currsess = None
    try:
        currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
        for rsp in currsess.get_responses():
            yield messages.AsyncMessage(rsp)
    except greenlet.GreenletExit as ge:
        loggedout = ge
    threadset.discard(mythreadid)
    if loggedout is not None:
        currsess.destroy()
        raise exc.LoggedOut()


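# Install the shared console session table, so that async sessions can
# refresh console expiries while polling and tear consoles down on destroy.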
def set_console_sessions(consolesessions):
    global _consolesessions
    _consolesessions = consolesessions