From b967c552fd5fe828f3542edad49f2639c8134ef0 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 30 Apr 2024 13:56:00 -0400 Subject: [PATCH] Migrate intra-collective requests to asyncio Update dispatch to be asyncio based, remove eventlet from core Clean up some overly verbose print statements. --- confluent_server/confluent/core.py | 122 ++++++++---------- .../plugins/hardwaremanagement/ipmi.py | 2 +- confluent_server/confluent/sockapi.py | 20 +-- confluent_server/confluent/util.py | 1 - 4 files changed, 67 insertions(+), 78 deletions(-) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 12a382f0..17fb8757 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -55,20 +55,11 @@ try: import confluent.shellmodule as shellmodule except ImportError: pass -try: - import OpenSSL.crypto as crypto -except ImportError: - # Only required for collective mode - crypto = None import confluent.util as util -import eventlet -import eventlet.green.ssl as ssl -import eventlet.semaphore as semaphore import inspect import itertools import msgpack import os -import eventlet.green.socket as socket import struct import sys import yaml @@ -867,14 +858,13 @@ def abbreviate_noderange(configmanager, inputdata, operation): return (msg.KeyValueData({'noderange': noderange.ReverseNodeRange(inputdata['nodes'], configmanager).noderange}),) -def _keepalivefn(connection, xmitlock): +async def _keepalivefn(connection, xmitlock): while True: - eventlet.sleep(30) - with xmitlock: + await asyncio.sleep(30) + async with xmitlock: connection.sendall(b'\x00\x00\x00\x00\x00\x00\x00\x01\x00') -def handle_dispatch(connection, cert, dispatch, peername): - cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) +async def handle_dispatch(connection, cert, dispatch, peername): if not util.cert_matches( cfm.get_collective_member(peername)['fingerprint'], cert): connection.close() @@ -883,10 +873,11 @@ def handle_dispatch(connection, cert, dispatch, peername): # We only support msgpack now # The magic should preclude any pickle, as the first byte can never be # under 0x20 or so. - connection.close() + connection[1].close() + await connection[1].wait_closed() return - xmitlock = semaphore.Semaphore() - keepalive = eventlet.spawn(_keepalivefn, connection, xmitlock) + xmitlock = asyncio.Lock() + keepalive = util.spawn(_keepalivefn(connection, xmitlock)) dispatch = msgpack.unpackb(dispatch[2:], raw=False) configmanager = cfm.ConfigManager(dispatch['tenant']) nodes = dispatch['nodes'] @@ -899,15 +890,16 @@ def handle_dispatch(connection, cert, dispatch, peername): pathcomponents, operation, inputdata, nodes, dispatch['isnoderange'], configmanager) except Exception as res: - with xmitlock: - _forward_rsp(connection, res) - keepalive.kill() - connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00') - connection.close() + async with xmitlock: + await _forward_rsp(connection, res) + keepalive.cancel() + connection[1].write('\x00\x00\x00\x00\x00\x00\x00\x00') + await connection[1].drain() + connection[1].close() + await connection[1].wait_closed() return plugroute = routespec.routeinfo nodesbyhandler = {} - passvalues = [] nodeattr = configmanager.get_node_attributes( nodes, plugroute['pluginattrs']) for node in nodes: @@ -928,25 +920,32 @@ def handle_dispatch(connection, cert, dispatch, peername): else: nodesbyhandler[hfunc] = [node] try: + passvalues = asyncio.Queue() + numworkers = 0 for hfunc in nodesbyhandler: - passvalues.append(hfunc( - nodes=nodesbyhandler[hfunc], element=pathcomponents, - configmanager=configmanager, - inputdata=inputdata)) - for res in itertools.chain(*passvalues): - with xmitlock: - _forward_rsp(connection, res) + numworkers += 1 + asyncio.create_task(addtoqueue(passvalues, hfunc, { + 'nodes': nodesbyhandler[hfunc], + 'element': pathcomponents, + 'configmanager': configmanager, + 'inputdata': inputdata})) + async for res in iterate_queue(numworkers, passvalues): + async with xmitlock: + await _forward_rsp(connection, res) except Exception as res: - with xmitlock: - _forward_rsp(connection, res) - keepalive.kill() - connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00') - connection.close() + print("oh noes, " + repr(res)) + async with xmitlock: + await _forward_rsp(connection, res) + keepalive.cancel() + connection[1].write(b'\x00\x00\x00\x00\x00\x00\x00\x00') + await connection[1].drain() + connection[1].close() + await connection[1].wait_closed() -def _forward_rsp(connection, res): +async def _forward_rsp(connection, res): try: - r = res.serialize() + r = res.serialize() except AttributeError: if isinstance(res, Exception): r = msgpack.packb(['Exception', str(res)], use_bin_type=False) @@ -956,13 +955,15 @@ def _forward_rsp(connection, res): use_bin_type=False) except Exception as e: r = msgpack.packb( - ['Exception', 'Unable to serialize response ' + repr(res) + ' due to ' + str(e)], - use_bin_type=False) + ['Exception', + 'Unable to serialize response ' + repr(res) + ' due to ' + str(e)], + use_bin_type=False) rlen = len(r) if not rlen: return - connection.sendall(struct.pack('!Q', rlen)) - connection.sendall(r) + connection[1].write(struct.pack('!Q', rlen)) + connection[1].write(r) + await connection[1].drain() async def handle_node_request(configmanager, inputdata, operation, @@ -1170,30 +1171,21 @@ async def addtoqueue(theq, fun, kwargs): if isinstance(result, console.Console): await theq.put(result) else: - - if isinstance(result, types.AsyncGeneratorType): - async for pv in result: - await theq.put(pv) - else: - print(repr(result)) - for pv in result: - await theq.put(pv) + async for pv in iterate_responses(result): + await theq.put(pv) except Exception as e: await theq.put(e) finally: await theq.put('theend') -def dispatch_request(nodes, manager, element, configmanager, inputdata, +async def dispatch_request(nodes, manager, element, configmanager, inputdata, operation, isnoderange): a = configmanager.get_collective_member(manager) try: - remote = socket.create_connection((a['address'], 13001)) - remote.settimeout(180) - remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, - keyfile='/etc/confluent/privkey.pem', - certfile='/etc/confluent/srvcert.pem') + remote = await collective.connect_to_collective(a['fingerprint'], a['address']) except Exception as e: + raise for node in nodes: if a: yield msg.ConfluentResourceUnavailable( @@ -1206,10 +1198,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, manager)) return - if not util.cert_matches(a['fingerprint'], remote.getpeercert( - binary_form=True)): - raise Exception("Invalid certificate on peer") - banner = tlvdata.recv(remote) + banner = await tlvdata.recv(remote) vers = banner.split()[2] if vers == b'v0': pvers = 2 @@ -1217,17 +1206,18 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, pvers = 4 if sys.version_info[0] < 3: pvers = 2 - tlvdata.recv(remote) + await tlvdata.recv(remote) myname = collective.get_myname() dreq = b'\x01\x03' + msgpack.packb( {'name': myname, 'nodes': list(nodes), 'path': element,'tenant': configmanager.tenant, 'operation': operation, 'inputdata': inputdata, 'isnoderange': isnoderange}, use_bin_type=False) - tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}}) - remote.sendall(dreq) + await tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}}) + remote[1].write(dreq) + await remote[1].drain() while True: try: - rlen = remote.recv(8) + rlen = await remote[0].read(8) except Exception: for node in nodes: yield msg.ConfluentResourceUnavailable( @@ -1236,7 +1226,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, return while len(rlen) < 8: try: - nlen = remote.recv(8 - len(rlen)) + nlen = await remote[0].read(8 - len(rlen)) except Exception: nlen = 0 if not nlen: @@ -1250,7 +1240,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, if rlen == 0: break try: - rsp = remote.recv(rlen) + rsp = await remote[0].read(rlen) except Exception: for node in nodes: yield msg.ConfluentResourceUnavailable( @@ -1259,7 +1249,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, return while len(rsp) < rlen: try: - nrsp = remote.recv(rlen - len(rsp)) + nrsp = await remote[0].read(rlen - len(rsp)) except Exception: nrsp = 0 if not nrsp: diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index 602ce45b..46aebdaf 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -411,7 +411,7 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop): except asyncio.QueueEmpty: pass except asyncio.TimeoutError: - print("whoopsie?") + print("odd timeout?") pass finally: for datum in sorted( diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index c82d69b8..71cffe55 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -127,12 +127,12 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): while not configmanager.config_is_ready(): await asyncio.sleep(1) if 'dispatch' in response: - dreq = tlvdata.recvall( + dreq = await tlvdata.recvall( connection, response['dispatch']['length']) - return pluginapi.handle_dispatch(connection, cert, dreq, + return await pluginapi.handle_dispatch(connection, cert, dreq, response['dispatch']['name']) if 'proxyconsole' in response: - return start_proxy_term(connection, cert, + return await start_proxy_term(connection, cert, response['proxyconsole']) authname = response['username'] passphrase = response['password'] @@ -238,7 +238,7 @@ async def process_request( auditlog.log(auditmsg) try: if operation == 'start': - return start_term(authname, cfm, connection, params, path, + return await start_term(authname, cfm, connection, params, path, authdata, skipauth) elif operation == 'shutdown' and skipauth: configmanager.ConfigManager.shutdown() @@ -256,7 +256,7 @@ async def process_request( return -def start_proxy_term(connection, cert, request): +async def start_proxy_term(connection, cert, request): droneinfo = configmanager.get_collective_member(request['name']) if not util.cert_matches(droneinfo['fingerprint'], cert): connection.close() @@ -268,10 +268,10 @@ def start_proxy_term(connection, cert, request): datacallback=ccons.sendall, skipreplay=request['skipreplay'], direct=False, width=request.get('width', 80), height=request.get( 'height', 24)) - term_interact(None, None, ccons, None, connection, consession, None) + await term_interact(None, None, ccons, None, connection, consession, None) -def start_term(authname, cfm, connection, params, path, authdata, skipauth): +async def start_term(authname, cfm, connection, params, path, authdata, skipauth): elems = path.split('/') if len(elems) < 4 or elems[1] != 'nodes': raise exc.InvalidArgumentException('Invalid path {0}'.format(path)) @@ -298,11 +298,11 @@ def start_term(authname, cfm, connection, params, path, authdata, skipauth): raise exc.InvalidArgumentException('Invalid path {0}'.format(path)) if consession is None: raise Exception("TODO") - term_interact(authdata, authname, ccons, cfm, connection, consession, + await term_interact(authdata, authname, ccons, cfm, connection, consession, skipauth) -def term_interact(authdata, authname, ccons, cfm, connection, consession, +async def term_interact(authdata, authname, ccons, cfm, connection, consession, skipauth): send_data(connection, {'started': 1}) ccons.startsending() @@ -310,7 +310,7 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession, if bufferage is not False: send_data(connection, {'bufferage': bufferage}) while consession is not None: - data = tlvdata.recv(connection) + data = await tlvdata.recv(connection) if type(data) == dict: if data['operation'] == 'stop': consession.destroy() diff --git a/confluent_server/confluent/util.py b/confluent_server/confluent/util.py index e132ffff..a8a9f59f 100644 --- a/confluent_server/confluent/util.py +++ b/confluent_server/confluent/util.py @@ -40,7 +40,6 @@ def mkdirp(path, mode=0o777): async def _sleep_and_run(sleeptime, func, args): await asyncio.sleep(sleeptime) - print(repr(func)) await func(*args)