2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-09-03 08:48:29 +00:00

Purge eventlet and greenlet and long-polling support

Rather than try to support long deprecated http api behavior,
purge it for simpler code and remove eventlet/greenlet from the http
stack.
This commit is contained in:
Jarrod Johnson
2024-04-11 09:09:02 -04:00
parent fb8ac158cb
commit c3cafd9bf8
2 changed files with 17 additions and 144 deletions

View File

@@ -14,21 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Overall, the result of this shall be:
# - Web clients can create the same out-of-order responsiveness as socket
# clients (but with more complexity on their end)
# - Web clients can share single request among console sessions
# - Web clients can get async notify of things like node add/remove, events
# This provides an async strategy to http clients. The design is that a http
# session may have an 'async' resource. In such a case, any requests are
# queued and immediately the response is given accepting the queued request.
# A request flags itself as queue-compatible through an HTTP header indicating
# the identifier of the async thread. As responses happen to the queued
# request, data is dispatched to the first registered poller for data on
# the session. This way, a client may elect to provide multiple pollers
# to mitigate general choppiness of http network pattern. It may not be
# worth it, but it's possible.
# This handles ownership of asynchronous behavior driving sessions
# with websockets. There was a long-polling HTTP mechanism but that is removed
# Now it's possible to have asynchronous requests multiplexed over a single websockets
# with none of the "choppiness" inherent to multiple long-polling requests
# Additionally support console session multiplexing, to mitigate needed
# connection count.
@@ -44,12 +33,9 @@ import collections
import confluent.exceptions as exc
import confluent.messages as messages
import confluent.util as util
import eventlet
import greenlet
import time
_asyncsessions = {}
_cleanthread = None
_consolesessions = None
@@ -75,27 +61,14 @@ class AsyncTermRelation(object):
class AsyncSession(object):
def __init__(self, wshandler=None):
def __init__(self, wshandler):
self.asyncid = _assign_asyncid(self)
self.responses = collections.deque()
self.wshandler = wshandler
self._evt = None
self.termrelations = []
self.consoles = set([])
if not wshandler:
self.reaper = eventlet.spawn_after(15, self.destroy)
async def add(self, requestid, rsp):
if self.wshandler:
await self.wshandler(messages.AsyncMessage((requestid, rsp)))
if self.responses is None:
return
self.responses.append((requestid, rsp))
if self._evt:
self._evt.send()
self._evt = None
await self.wshandler(messages.AsyncMessage((requestid, rsp)))
def set_term_relation(self, env):
# need a term relation to keep track of what data belongs
@@ -110,13 +83,9 @@ class AsyncSession(object):
self.consoles.add(sessionid)
def destroy(self):
if self._evt:
self._evt.send()
self._evt = None
for console in self.consoles:
_consolesessions[console]['session'].destroy()
self.consoles = set([])
self.responses = None
del _asyncsessions[self.asyncid]
async def run_handler(self, handler, requestid):
@@ -129,28 +98,6 @@ class AsyncSession(object):
print(repr(e))
await self.add(requestid, e)
def get_responses(self, timeout=25):
self.reaper.cancel()
self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
nextexpiry = time.time() + 90
for csess in list(self.consoles):
try:
_consolesessions[csess]['expiry'] = nextexpiry
except KeyError: # session has been closed elsewhere
self.consoles.discard(csess)
if self._evt:
# TODO(jjohnson2): This precludes the goal of 'double barreled'
# access.... revisit if this could matter
raise Exception('get_responses is not re-entrant')
if not self.responses: # wait to accumulate some
self._evt = eventlet.event.Event()
with eventlet.Timeout(timeout, False):
self._evt.wait()
self._evt = None
while self.responses:
yield self.responses.popleft()
async def run_handler(hdlr, req):
asyncsessid = req.headers['ConfluentAsyncId']
try:
@@ -161,17 +108,14 @@ async def run_handler(hdlr, req):
'Invalid Session ID or missing request id')
cloop = asyncio.get_event_loop()
cloop.create_task(asyncsession.run_handler(hdlr, requestid))
#eventlet.spawn_n(asyncsession.run_handler, hdlr, requestid)
return requestid
def get_async(env, querydict):
global _cleanthread
return _asyncsessions[env['HTTP_CONFLUENTASYNCID']]['asyncsession']
def handle_async(env, querydict, threadset, wshandler=None):
global _cleanthread
# This may be one of two things, a request for a new async stream
# or a request for next data from async stream
# httpapi otherwise handles requests an injecting them to queue
@@ -181,25 +125,7 @@ def handle_async(env, querydict, threadset, wshandler=None):
if wshandler:
yield currsess
return
yield messages.AsyncSession(currsess.asyncid)
return
if querydict['asyncid'] not in _asyncsessions:
raise exc.InvalidArgumentException(
'Invalid or expired async id')
mythreadid = greenlet.getcurrent()
threadset.add(mythreadid)
loggedout = None
currsess = None
try:
currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
for rsp in currsess.get_responses():
yield messages.AsyncMessage(rsp)
except greenlet.GreenletExit as ge:
loggedout = ge
threadset.discard(mythreadid)
if loggedout is not None:
currsess.destroy()
raise exc.LoggedOut()
raise Exception("Long polling asynchttp is discontinued")
def set_console_sessions(consolesessions):

View File

@@ -43,9 +43,6 @@ import confluent.shellserver as shellserver
import confluent.tlvdata
import confluent.util as util
import copy
import eventlet
import eventlet.greenthread
import greenlet
import json
import socket
import sys
@@ -55,11 +52,11 @@ try:
import urlparse
except ModuleNotFoundError:
import urllib.parse as urlparse
import eventlet.websocket
import eventlet.wsgi
tlvdata = confluent.tlvdata
_cleaner = None
auditlog = None
tracelog = None
consolesessions = {}
@@ -149,7 +146,7 @@ create_resource_functions = {
}
def _sessioncleaner():
async def _sessioncleaner():
while True:
currtime = time.time()
targsessions = []
@@ -165,7 +162,7 @@ def _sessioncleaner():
targsessions.append(session)
for session in targsessions:
del consolesessions[session]
eventlet.sleep(10)
await asyncio.sleep(10)
def _get_query_dict(req, reqbody, reqtype):
@@ -300,7 +297,7 @@ async def _authorize_request(req, operation, reqbody):
for mythread in httpsessions[sessionid]['inflight']:
targets.append(mythread)
for mythread in targets:
eventlet.greenthread.kill(mythread)
print(repr(mythread))
forwarder.close_session(sessionid)
del httpsessions[sessionid]
return ('logout',)
@@ -791,7 +788,6 @@ async def resourcehandler_backend(req, make_response):
return rsp
elif (operation == 'create' and ('/console/session' in reqpath or
'/shell/sessions/' in reqpath)):
#hard bake JSON into this path, do not support other incarnations
if '/console/session' in reqpath:
prefix, _, _ = reqpath.partition('/console/session')
shellsession = False
@@ -879,54 +875,7 @@ async def resourcehandler_backend(req, make_response):
rsp.write(json.dumps({'session': querydict['session']}))
return rsp
else: # no keys, but a session, means it's hooking to receive data
sessid = querydict['session']
if sessid not in consolesessions:
start_response('400 Expired Session', headers)
return rsp
consolesessions[sessid]['expiry'] = time.time() + 90
# add our thread to the 'inflight' to have a hook to terminate
# a long polling request
loggedout = None
mythreadid = greenlet.getcurrent()
httpsessions[authorized['sessionid']]['inflight'].add(mythreadid)
try:
outdata = consolesessions[sessid]['session'].get_next_output(
timeout=25)
except greenlet.GreenletExit as ge:
loggedout = ge
httpsessions[authorized['sessionid']]['inflight'].discard(
mythreadid)
if sessid not in consolesessions:
start_response('400 Expired Session', headers)
return rsp
if loggedout is not None:
consolesessions[sessid]['session'].destroy()
start_response('401 Logged out', headers)
rsp.write(b'{"loggedout": 1}')
return rsp
bufferage = False
if 'stampsent' not in consolesessions[sessid]:
consolesessions[sessid]['stampsent'] = True
bufferage = consolesessions[sessid]['session'].get_buffer_age()
if isinstance(outdata, dict):
rspdata = outdata
rspdata['session'] = querydict['session']
else:
rspdata = {'session': querydict['session'],
'data': outdata}
if bufferage is not False:
rspdata['bufferage'] = bufferage
try:
rspj = json.dumps(rspdata)
except UnicodeDecodeError:
try:
rspj = json.dumps(rspdata, encoding='cp437')
except UnicodeDecodeError:
rspj = json.dumps({'session': querydict['session'],
'data': 'DECODEERROR'})
start_response('200 OK', headers)
rsp.write(rspj)
return rsp
raise Exception("long polling console sessions are discontinued")
else:
# normal request
url = reqpath
@@ -1128,8 +1077,6 @@ async def serve(bind_host, bind_port):
await runner.setup()
site = web.SockSite(runner, sock)
await site.start()
# eventlet.wsgi.server(sock, resourcehandler, log=False, log_output=False,
# debug=False, socket_timeout=60, keepalive=False)
@@ -1142,13 +1089,13 @@ class HttpApi(object):
self.bind_port = bind_port or 4005
def start(self):
global _cleaner
global auditlog
global tracelog
if _cleaner is None:
_cleaner = asyncio.get_event_loop().create_task(
_sessioncleaner())
tracelog = log.Logger('trace')
auditlog = log.Logger('audit')
self.server = asyncio.get_event_loop().create_task(
serve(self.bind_host, self.bind_port))
# self.server = eventlet.spawn(serve, self.bind_host, self.bind_port)
_cleaner = eventlet.spawn(_sessioncleaner)