2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-04-22 04:55:53 +00:00

Port some of the collective management to asyncio

This commit is contained in:
Jarrod Johnson 2024-04-15 17:19:27 -04:00
parent bfe7529d21
commit e8110551db
4 changed files with 52 additions and 42 deletions

View File

@ -1,6 +1,7 @@
#!/usr/bin/python2
#!/usr/bin/python3
import argparse
import asyncio
import errno
import os
import pwd
@ -55,37 +56,44 @@ def make_certificate():
async def show_invitation(name, nonvoting=False):
if not os.path.exists('/etc/confluent/srvcert.pem'):
make_certificate()
s = client.Command().connection
clicmd = client.Command()
await clicmd.ensure_connected()
s = clicmd.connection
role = 'nonvoting' if nonvoting else None
tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name, 'role': role}})
invite = tlvdata.recv(s)['collective']
await tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name, 'role': role}})
invite = await tlvdata.recv(s)
invite = invite['collective']
if 'error' in invite:
sys.stderr.write(invite['error'] + '\n')
return
print('{0}'.format(invite['invitation']))
def join_collective(server, invitation):
async def join_collective(server, invitation):
if not os.path.exists('/etc/confluent/srvcert.pem'):
make_certificate()
s = client.Command().connection
clicmd = client.Command()
await clicmd.ensure_connected()
s = clicmd.connection
while not invitation:
invitation = input('Paste the invitation here: ')
tlvdata.send(s, {'collective': {'operation': 'join',
await tlvdata.send(s, {'collective': {'operation': 'join',
'invitation': invitation,
'server': server}})
res = tlvdata.recv(s)
res = await tlvdata.recv(s)
res = res.get('collective',
{'status': 'Unknown response: ' + repr(res)})
print(res.get('status', res.get('error', repr(res))))
if 'error' in res:
sys.exit(1)
def delete_member(name):
s = client.Command().connection
tlvdata.send(s, {'collective': {'operation': 'delete',
async def delete_member(name):
clicmd = client.Command()
await clicmd.ensure_connected()
s = clicmd.connection
await tlvdata.send(s, {'collective': {'operation': 'delete',
'member': name}})
res = tlvdata.recv(s)
res = await tlvdata.recv(s)
res = res.get('collective',
{'status': 'Unknown response: ' + repr(res)})
print(res.get('status', res.get('error', repr(res))))
@ -93,10 +101,12 @@ def delete_member(name):
sys.exit(1)
def show_collective():
s = client.Command().connection
tlvdata.send(s, {'collective': {'operation': 'show'}})
res = tlvdata.recv(s)
async def show_collective():
clicmd = client.Command()
await clicmd.ensure_connected()
s = clicmd.connection
await tlvdata.send(s, {'collective': {'operation': 'show'}})
res = await tlvdata.recv(s)
if 'error' in res:
print(res['error'])
return
@ -143,13 +153,13 @@ async def main():
if cmdset.command == 'gencert':
make_certificate()
elif cmdset.command == 'invite':
show_invitation(cmdset.name, cmdset.n)
await show_invitation(cmdset.name, cmdset.n)
elif cmdset.command == 'join':
join_collective(cmdset.server, cmdset.i)
await join_collective(cmdset.server, cmdset.i)
elif cmdset.command == 'show':
show_collective()
await show_collective()
elif cmdset.command == 'delete':
delete_member(cmdset.name)
await delete_member(cmdset.name)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

View File

@ -259,7 +259,7 @@ def get_myname():
def in_collective():
return bool(list(cfm.list_collective()))
def handle_connection(connection, cert, request, local=False):
async def handle_connection(connection, cert, request, local=False):
global currentleader
global retrythread
global initting
@ -272,7 +272,7 @@ def handle_connection(connection, cert, request, local=False):
return
if operation in ('show', 'delete'):
if not list(cfm.list_collective()):
tlvdata.send(connection,
await tlvdata.send(connection,
{'collective': {'error': 'Collective mode not '
'enabled on this '
'system'}})
@ -292,17 +292,17 @@ def handle_connection(connection, cert, request, local=False):
linfo['fingerprint'],
cert)):
remote.close()
tlvdata.send(connection,
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
connection.close()
return
tlvdata.recv(remote) # ignore banner
tlvdata.recv(remote) # ignore authpassed: 0
tlvdata.send(remote,
await tlvdata.recv(remote) # ignore banner
await tlvdata.recv(remote) # ignore authpassed: 0
await tlvdata.send(remote,
{'collective': {'operation': 'getinfo',
'name': get_myname()}})
collinfo = tlvdata.recv(remote)
collinfo = await tlvdata.recv(remote)
else:
collinfo = {}
populate_collinfo(collinfo)
@ -312,20 +312,20 @@ def handle_connection(connection, cert, request, local=False):
except exc.DegradedCollective:
collinfo['quorum'] = False
if operation == 'show':
tlvdata.send(connection, {'collective': collinfo})
await tlvdata.send(connection, {'collective': collinfo})
elif operation == 'delete':
todelete = request['member']
if (todelete == collinfo['leader'] or
todelete in collinfo['active']):
tlvdata.send(connection, {'collective':
await tlvdata.send(connection, {'collective':
{'error': '{0} is still active, stop the confluent service to remove it'.format(todelete)}})
return
if todelete not in collinfo['offline']:
tlvdata.send(connection, {'collective':
await tlvdata.send(connection, {'collective':
{'error': '{0} is not a recognized collective member'.format(todelete)}})
return
cfm.del_collective_member(todelete)
tlvdata.send(connection,
await tlvdata.send(connection,
{'collective': {'status': 'Successfully deleted {0}'.format(todelete)}})
connection.close()
return
@ -342,7 +342,7 @@ def handle_connection(connection, cert, request, local=False):
name = request['name']
role = request.get('role', '')
invitation = invites.create_server_invitation(name, role)
tlvdata.send(connection,
await tlvdata.send(connection,
{'collective': {'invitation': invitation}})
connection.close()
if 'join' == operation:
@ -352,7 +352,7 @@ def handle_connection(connection, cert, request, local=False):
name, invitation = invitation.split(b'@', 1)
name = util.stringify(name)
except Exception:
tlvdata.send(
await tlvdata.send(
connection,
{'collective':
{'status': 'Invalid token format'}})
@ -369,7 +369,7 @@ def handle_connection(connection, cert, request, local=False):
keyfile='/etc/confluent/privkey.pem',
certfile='/etc/confluent/srvcert.pem')
except Exception:
tlvdata.send(
await tlvdata.send(
connection,
{'collective':
{'status': 'Failed to connect to {0}'.format(host)}})
@ -380,11 +380,11 @@ def handle_connection(connection, cert, request, local=False):
cert = remote.getpeercert(binary_form=True)
proof = base64.b64encode(invites.create_client_proof(
invitation, mycert, cert))
tlvdata.recv(remote) # ignore banner
tlvdata.recv(remote) # ignore authpassed: 0
tlvdata.send(remote, {'collective': {'operation': 'enroll',
await tlvdata.recv(remote) # ignore banner
await tlvdata.recv(remote) # ignore authpassed: 0
await tlvdata.send(remote, {'collective': {'operation': 'enroll',
'name': name, 'hmac': proof}})
rsp = tlvdata.recv(remote)
rsp = await tlvdata.recv(remote)
if 'error' in rsp:
tlvdata.send(connection, {'collective':
{'status': rsp['error']}})
@ -395,7 +395,7 @@ def handle_connection(connection, cert, request, local=False):
j = invites.check_server_proof(invitation, mycert, cert, proof)
if not j:
remote.close()
tlvdata.send(connection, {'collective':
await tlvdata.send(connection, {'collective':
{'status': 'Bad server token'}})
connection.close()
return

View File

@ -750,7 +750,7 @@ def create_node(inputdata, configmanager):
yield msg.CreatedResource(nodename)
def create_noderange(inputdata, configmanager):
async def create_noderange(inputdata, configmanager):
try:
noder = inputdata['name']
del inputdata['name']

View File

@ -122,7 +122,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
if not response:
return
if 'collective' in response:
return collective.handle_connection(connection, cert,
return await collective.handle_connection(connection, cert,
response['collective'])
while not configmanager.config_is_ready():
await asyncio.sleep(1)
@ -152,7 +152,7 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
request = await tlvdata.recv(connection)
if request and isinstance(request, dict) and 'collective' in request:
if skipauth:
return collective.handle_connection(
return await collective.handle_connection(
connection, None, request['collective'], local=True)
else:
tlvdata.send(