From e8110551db85cd639bb78710e16f84b1184b2778 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 15 Apr 2024 17:19:27 -0400 Subject: [PATCH] Port some of the collective management to asyncio --- confluent_server/bin/collective | 50 +++++++++++-------- .../confluent/collective/manager.py | 38 +++++++------- confluent_server/confluent/core.py | 2 +- confluent_server/confluent/sockapi.py | 4 +- 4 files changed, 52 insertions(+), 42 deletions(-) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 0168abfd..71a9daa6 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -1,6 +1,7 @@ -#!/usr/bin/python2 +#!/usr/bin/python3 import argparse +import asyncio import errno import os import pwd @@ -55,37 +56,44 @@ def make_certificate(): async def show_invitation(name, nonvoting=False): if not os.path.exists('/etc/confluent/srvcert.pem'): make_certificate() - s = client.Command().connection + clicmd = client.Command() + await clicmd.ensure_connected() + s = clicmd.connection role = 'nonvoting' if nonvoting else None - tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name, 'role': role}}) - invite = tlvdata.recv(s)['collective'] + await tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name, 'role': role}}) + invite = await tlvdata.recv(s) + invite = invite['collective'] if 'error' in invite: sys.stderr.write(invite['error'] + '\n') return print('{0}'.format(invite['invitation'])) -def join_collective(server, invitation): +async def join_collective(server, invitation): if not os.path.exists('/etc/confluent/srvcert.pem'): make_certificate() - s = client.Command().connection + clicmd = client.Command() + await clicmd.ensure_connected() + s = clicmd.connection while not invitation: invitation = input('Paste the invitation here: ') - tlvdata.send(s, {'collective': {'operation': 'join', + await tlvdata.send(s, {'collective': {'operation': 'join', 'invitation': invitation, 'server': server}}) - res = tlvdata.recv(s) + res = await tlvdata.recv(s) res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) print(res.get('status', res.get('error', repr(res)))) if 'error' in res: sys.exit(1) -def delete_member(name): - s = client.Command().connection - tlvdata.send(s, {'collective': {'operation': 'delete', +async def delete_member(name): + clicmd = client.Command() + await clicmd.ensure_connected() + s = clicmd.connection + await tlvdata.send(s, {'collective': {'operation': 'delete', 'member': name}}) - res = tlvdata.recv(s) + res = await tlvdata.recv(s) res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) print(res.get('status', res.get('error', repr(res)))) @@ -93,10 +101,12 @@ def delete_member(name): sys.exit(1) -def show_collective(): - s = client.Command().connection - tlvdata.send(s, {'collective': {'operation': 'show'}}) - res = tlvdata.recv(s) +async def show_collective(): + clicmd = client.Command() + await clicmd.ensure_connected() + s = clicmd.connection + await tlvdata.send(s, {'collective': {'operation': 'show'}}) + res = await tlvdata.recv(s) if 'error' in res: print(res['error']) return @@ -143,13 +153,13 @@ async def main(): if cmdset.command == 'gencert': make_certificate() elif cmdset.command == 'invite': - show_invitation(cmdset.name, cmdset.n) + await show_invitation(cmdset.name, cmdset.n) elif cmdset.command == 'join': - join_collective(cmdset.server, cmdset.i) + await join_collective(cmdset.server, cmdset.i) elif cmdset.command == 'show': - show_collective() + await show_collective() elif cmdset.command == 'delete': - delete_member(cmdset.name) + await delete_member(cmdset.name) if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main()) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 5c1264b1..77239d66 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -259,7 +259,7 @@ def get_myname(): def in_collective(): return bool(list(cfm.list_collective())) -def handle_connection(connection, cert, request, local=False): +async def handle_connection(connection, cert, request, local=False): global currentleader global retrythread global initting @@ -272,7 +272,7 @@ def handle_connection(connection, cert, request, local=False): return if operation in ('show', 'delete'): if not list(cfm.list_collective()): - tlvdata.send(connection, + await tlvdata.send(connection, {'collective': {'error': 'Collective mode not ' 'enabled on this ' 'system'}}) @@ -292,17 +292,17 @@ def handle_connection(connection, cert, request, local=False): linfo['fingerprint'], cert)): remote.close() - tlvdata.send(connection, + await tlvdata.send(connection, {'error': 'Invalid certificate, ' 'redo invitation process'}) connection.close() return - tlvdata.recv(remote) # ignore banner - tlvdata.recv(remote) # ignore authpassed: 0 - tlvdata.send(remote, + await tlvdata.recv(remote) # ignore banner + await tlvdata.recv(remote) # ignore authpassed: 0 + await tlvdata.send(remote, {'collective': {'operation': 'getinfo', 'name': get_myname()}}) - collinfo = tlvdata.recv(remote) + collinfo = await tlvdata.recv(remote) else: collinfo = {} populate_collinfo(collinfo) @@ -312,20 +312,20 @@ def handle_connection(connection, cert, request, local=False): except exc.DegradedCollective: collinfo['quorum'] = False if operation == 'show': - tlvdata.send(connection, {'collective': collinfo}) + await tlvdata.send(connection, {'collective': collinfo}) elif operation == 'delete': todelete = request['member'] if (todelete == collinfo['leader'] or todelete in collinfo['active']): - tlvdata.send(connection, {'collective': + await tlvdata.send(connection, {'collective': {'error': '{0} is still active, stop the confluent service to remove it'.format(todelete)}}) return if todelete not in collinfo['offline']: - tlvdata.send(connection, {'collective': + await tlvdata.send(connection, {'collective': {'error': '{0} is not a recognized collective member'.format(todelete)}}) return cfm.del_collective_member(todelete) - tlvdata.send(connection, + await tlvdata.send(connection, {'collective': {'status': 'Successfully deleted {0}'.format(todelete)}}) connection.close() return @@ -342,7 +342,7 @@ def handle_connection(connection, cert, request, local=False): name = request['name'] role = request.get('role', '') invitation = invites.create_server_invitation(name, role) - tlvdata.send(connection, + await tlvdata.send(connection, {'collective': {'invitation': invitation}}) connection.close() if 'join' == operation: @@ -352,7 +352,7 @@ def handle_connection(connection, cert, request, local=False): name, invitation = invitation.split(b'@', 1) name = util.stringify(name) except Exception: - tlvdata.send( + await tlvdata.send( connection, {'collective': {'status': 'Invalid token format'}}) @@ -369,7 +369,7 @@ def handle_connection(connection, cert, request, local=False): keyfile='/etc/confluent/privkey.pem', certfile='/etc/confluent/srvcert.pem') except Exception: - tlvdata.send( + await tlvdata.send( connection, {'collective': {'status': 'Failed to connect to {0}'.format(host)}}) @@ -380,11 +380,11 @@ def handle_connection(connection, cert, request, local=False): cert = remote.getpeercert(binary_form=True) proof = base64.b64encode(invites.create_client_proof( invitation, mycert, cert)) - tlvdata.recv(remote) # ignore banner - tlvdata.recv(remote) # ignore authpassed: 0 - tlvdata.send(remote, {'collective': {'operation': 'enroll', + await tlvdata.recv(remote) # ignore banner + await tlvdata.recv(remote) # ignore authpassed: 0 + await tlvdata.send(remote, {'collective': {'operation': 'enroll', 'name': name, 'hmac': proof}}) - rsp = tlvdata.recv(remote) + rsp = await tlvdata.recv(remote) if 'error' in rsp: tlvdata.send(connection, {'collective': {'status': rsp['error']}}) @@ -395,7 +395,7 @@ def handle_connection(connection, cert, request, local=False): j = invites.check_server_proof(invitation, mycert, cert, proof) if not j: remote.close() - tlvdata.send(connection, {'collective': + await tlvdata.send(connection, {'collective': {'status': 'Bad server token'}}) connection.close() return diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index fd7040bd..9b2d8698 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -750,7 +750,7 @@ def create_node(inputdata, configmanager): yield msg.CreatedResource(nodename) -def create_noderange(inputdata, configmanager): +async def create_noderange(inputdata, configmanager): try: noder = inputdata['name'] del inputdata['name'] diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index d75a5d4f..9bdd94b2 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -122,7 +122,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): if not response: return if 'collective' in response: - return collective.handle_connection(connection, cert, + return await collective.handle_connection(connection, cert, response['collective']) while not configmanager.config_is_ready(): await asyncio.sleep(1) @@ -152,7 +152,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None): request = await tlvdata.recv(connection) if request and isinstance(request, dict) and 'collective' in request: if skipauth: - return collective.handle_connection( + return await collective.handle_connection( connection, None, request['collective'], local=True) else: tlvdata.send(