2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 17:43:14 +00:00

Migrate intra-collective requests to asyncio

Update dispatch to be asyncio based, remove eventlet from core

Clean up some overly verbose print statements.
This commit is contained in:
Jarrod Johnson 2024-04-30 13:56:00 -04:00
parent 553916340e
commit b967c552fd
4 changed files with 67 additions and 78 deletions

View File

@ -55,20 +55,11 @@ try:
import confluent.shellmodule as shellmodule
except ImportError:
pass
try:
import OpenSSL.crypto as crypto
except ImportError:
# Only required for collective mode
crypto = None
import confluent.util as util
import eventlet
import eventlet.green.ssl as ssl
import eventlet.semaphore as semaphore
import inspect
import itertools
import msgpack
import os
import eventlet.green.socket as socket
import struct
import sys
import yaml
@ -867,14 +858,13 @@ def abbreviate_noderange(configmanager, inputdata, operation):
return (msg.KeyValueData({'noderange': noderange.ReverseNodeRange(inputdata['nodes'], configmanager).noderange}),)
def _keepalivefn(connection, xmitlock):
async def _keepalivefn(connection, xmitlock):
while True:
eventlet.sleep(30)
with xmitlock:
await asyncio.sleep(30)
async with xmitlock:
connection.sendall(b'\x00\x00\x00\x00\x00\x00\x00\x01\x00')
def handle_dispatch(connection, cert, dispatch, peername):
cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
async def handle_dispatch(connection, cert, dispatch, peername):
if not util.cert_matches(
cfm.get_collective_member(peername)['fingerprint'], cert):
connection.close()
@ -883,10 +873,11 @@ def handle_dispatch(connection, cert, dispatch, peername):
# We only support msgpack now
# The magic should preclude any pickle, as the first byte can never be
# under 0x20 or so.
connection.close()
connection[1].close()
await connection[1].wait_closed()
return
xmitlock = semaphore.Semaphore()
keepalive = eventlet.spawn(_keepalivefn, connection, xmitlock)
xmitlock = asyncio.Lock()
keepalive = util.spawn(_keepalivefn(connection, xmitlock))
dispatch = msgpack.unpackb(dispatch[2:], raw=False)
configmanager = cfm.ConfigManager(dispatch['tenant'])
nodes = dispatch['nodes']
@ -899,15 +890,16 @@ def handle_dispatch(connection, cert, dispatch, peername):
pathcomponents, operation, inputdata, nodes, dispatch['isnoderange'],
configmanager)
except Exception as res:
with xmitlock:
_forward_rsp(connection, res)
keepalive.kill()
connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00')
connection.close()
async with xmitlock:
await _forward_rsp(connection, res)
keepalive.cancel()
connection[1].write('\x00\x00\x00\x00\x00\x00\x00\x00')
await connection[1].drain()
connection[1].close()
await connection[1].wait_closed()
return
plugroute = routespec.routeinfo
nodesbyhandler = {}
passvalues = []
nodeattr = configmanager.get_node_attributes(
nodes, plugroute['pluginattrs'])
for node in nodes:
@ -928,25 +920,32 @@ def handle_dispatch(connection, cert, dispatch, peername):
else:
nodesbyhandler[hfunc] = [node]
try:
passvalues = asyncio.Queue()
numworkers = 0
for hfunc in nodesbyhandler:
passvalues.append(hfunc(
nodes=nodesbyhandler[hfunc], element=pathcomponents,
configmanager=configmanager,
inputdata=inputdata))
for res in itertools.chain(*passvalues):
with xmitlock:
_forward_rsp(connection, res)
numworkers += 1
asyncio.create_task(addtoqueue(passvalues, hfunc, {
'nodes': nodesbyhandler[hfunc],
'element': pathcomponents,
'configmanager': configmanager,
'inputdata': inputdata}))
async for res in iterate_queue(numworkers, passvalues):
async with xmitlock:
await _forward_rsp(connection, res)
except Exception as res:
with xmitlock:
_forward_rsp(connection, res)
keepalive.kill()
connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00')
connection.close()
print("oh noes, " + repr(res))
async with xmitlock:
await _forward_rsp(connection, res)
keepalive.cancel()
connection[1].write(b'\x00\x00\x00\x00\x00\x00\x00\x00')
await connection[1].drain()
connection[1].close()
await connection[1].wait_closed()
def _forward_rsp(connection, res):
async def _forward_rsp(connection, res):
try:
r = res.serialize()
r = res.serialize()
except AttributeError:
if isinstance(res, Exception):
r = msgpack.packb(['Exception', str(res)], use_bin_type=False)
@ -956,13 +955,15 @@ def _forward_rsp(connection, res):
use_bin_type=False)
except Exception as e:
r = msgpack.packb(
['Exception', 'Unable to serialize response ' + repr(res) + ' due to ' + str(e)],
use_bin_type=False)
['Exception',
'Unable to serialize response ' + repr(res) + ' due to ' + str(e)],
use_bin_type=False)
rlen = len(r)
if not rlen:
return
connection.sendall(struct.pack('!Q', rlen))
connection.sendall(r)
connection[1].write(struct.pack('!Q', rlen))
connection[1].write(r)
await connection[1].drain()
async def handle_node_request(configmanager, inputdata, operation,
@ -1170,30 +1171,21 @@ async def addtoqueue(theq, fun, kwargs):
if isinstance(result, console.Console):
await theq.put(result)
else:
if isinstance(result, types.AsyncGeneratorType):
async for pv in result:
await theq.put(pv)
else:
print(repr(result))
for pv in result:
await theq.put(pv)
async for pv in iterate_responses(result):
await theq.put(pv)
except Exception as e:
await theq.put(e)
finally:
await theq.put('theend')
def dispatch_request(nodes, manager, element, configmanager, inputdata,
async def dispatch_request(nodes, manager, element, configmanager, inputdata,
operation, isnoderange):
a = configmanager.get_collective_member(manager)
try:
remote = socket.create_connection((a['address'], 13001))
remote.settimeout(180)
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
keyfile='/etc/confluent/privkey.pem',
certfile='/etc/confluent/srvcert.pem')
remote = await collective.connect_to_collective(a['fingerprint'], a['address'])
except Exception as e:
raise
for node in nodes:
if a:
yield msg.ConfluentResourceUnavailable(
@ -1206,10 +1198,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
manager))
return
if not util.cert_matches(a['fingerprint'], remote.getpeercert(
binary_form=True)):
raise Exception("Invalid certificate on peer")
banner = tlvdata.recv(remote)
banner = await tlvdata.recv(remote)
vers = banner.split()[2]
if vers == b'v0':
pvers = 2
@ -1217,17 +1206,18 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
pvers = 4
if sys.version_info[0] < 3:
pvers = 2
tlvdata.recv(remote)
await tlvdata.recv(remote)
myname = collective.get_myname()
dreq = b'\x01\x03' + msgpack.packb(
{'name': myname, 'nodes': list(nodes),
'path': element,'tenant': configmanager.tenant,
'operation': operation, 'inputdata': inputdata, 'isnoderange': isnoderange}, use_bin_type=False)
tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}})
remote.sendall(dreq)
await tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}})
remote[1].write(dreq)
await remote[1].drain()
while True:
try:
rlen = remote.recv(8)
rlen = await remote[0].read(8)
except Exception:
for node in nodes:
yield msg.ConfluentResourceUnavailable(
@ -1236,7 +1226,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
return
while len(rlen) < 8:
try:
nlen = remote.recv(8 - len(rlen))
nlen = await remote[0].read(8 - len(rlen))
except Exception:
nlen = 0
if not nlen:
@ -1250,7 +1240,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
if rlen == 0:
break
try:
rsp = remote.recv(rlen)
rsp = await remote[0].read(rlen)
except Exception:
for node in nodes:
yield msg.ConfluentResourceUnavailable(
@ -1259,7 +1249,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
return
while len(rsp) < rlen:
try:
nrsp = remote.recv(rlen - len(rsp))
nrsp = await remote[0].read(rlen - len(rsp))
except Exception:
nrsp = 0
if not nrsp:

View File

@ -411,7 +411,7 @@ async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
except asyncio.QueueEmpty:
pass
except asyncio.TimeoutError:
print("whoopsie?")
print("odd timeout?")
pass
finally:
for datum in sorted(

View File

@ -127,12 +127,12 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
while not configmanager.config_is_ready():
await asyncio.sleep(1)
if 'dispatch' in response:
dreq = tlvdata.recvall(
dreq = await tlvdata.recvall(
connection, response['dispatch']['length'])
return pluginapi.handle_dispatch(connection, cert, dreq,
return await pluginapi.handle_dispatch(connection, cert, dreq,
response['dispatch']['name'])
if 'proxyconsole' in response:
return start_proxy_term(connection, cert,
return await start_proxy_term(connection, cert,
response['proxyconsole'])
authname = response['username']
passphrase = response['password']
@ -238,7 +238,7 @@ async def process_request(
auditlog.log(auditmsg)
try:
if operation == 'start':
return start_term(authname, cfm, connection, params, path,
return await start_term(authname, cfm, connection, params, path,
authdata, skipauth)
elif operation == 'shutdown' and skipauth:
configmanager.ConfigManager.shutdown()
@ -256,7 +256,7 @@ async def process_request(
return
def start_proxy_term(connection, cert, request):
async def start_proxy_term(connection, cert, request):
droneinfo = configmanager.get_collective_member(request['name'])
if not util.cert_matches(droneinfo['fingerprint'], cert):
connection.close()
@ -268,10 +268,10 @@ def start_proxy_term(connection, cert, request):
datacallback=ccons.sendall, skipreplay=request['skipreplay'],
direct=False, width=request.get('width', 80), height=request.get(
'height', 24))
term_interact(None, None, ccons, None, connection, consession, None)
await term_interact(None, None, ccons, None, connection, consession, None)
def start_term(authname, cfm, connection, params, path, authdata, skipauth):
async def start_term(authname, cfm, connection, params, path, authdata, skipauth):
elems = path.split('/')
if len(elems) < 4 or elems[1] != 'nodes':
raise exc.InvalidArgumentException('Invalid path {0}'.format(path))
@ -298,11 +298,11 @@ def start_term(authname, cfm, connection, params, path, authdata, skipauth):
raise exc.InvalidArgumentException('Invalid path {0}'.format(path))
if consession is None:
raise Exception("TODO")
term_interact(authdata, authname, ccons, cfm, connection, consession,
await term_interact(authdata, authname, ccons, cfm, connection, consession,
skipauth)
def term_interact(authdata, authname, ccons, cfm, connection, consession,
async def term_interact(authdata, authname, ccons, cfm, connection, consession,
skipauth):
send_data(connection, {'started': 1})
ccons.startsending()
@ -310,7 +310,7 @@ def term_interact(authdata, authname, ccons, cfm, connection, consession,
if bufferage is not False:
send_data(connection, {'bufferage': bufferage})
while consession is not None:
data = tlvdata.recv(connection)
data = await tlvdata.recv(connection)
if type(data) == dict:
if data['operation'] == 'stop':
consession.destroy()

View File

@ -40,7 +40,6 @@ def mkdirp(path, mode=0o777):
async def _sleep_and_run(sleeptime, func, args):
await asyncio.sleep(sleeptime)
print(repr(func))
await func(*args)