2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-23 16:13:47 +00:00
Jarrod Johnson baa365fcac Implement non-voting collective members
Provide for applications
where only a small subset of collective
members should be
considered to count
toward whether the collective
can proceed.

Commonly, 'service' nodes may
be numerous to do work, but may all want to go offline
during a maintenance window.
2023-03-06 11:56:15 -05:00

156 lines
6.1 KiB
Python

#!/usr/bin/python2
import argparse
import errno
import os
import pwd
import socket
import subprocess
import sys
path = os.path.dirname(os.path.realpath(__file__))
path = os.path.realpath(os.path.join(path, '..', 'lib', 'python'))
if path.startswith('/opt'):
# if installed into system path, do not muck with things
sys.path.append(path)
import confluent.client as client
import confluent.sortutil as sortutil
import confluent.tlvdata as tlvdata
try:
input = raw_input
except NameError:
pass
def make_certificate():
umask = os.umask(0o77)
try:
os.makedirs('/etc/confluent/cfg')
except OSError as e:
if e.errno == errno.EEXIST and os.path.isdir('/etc/confluent/cfg'):
pass
else:
raise
if subprocess.check_call(
'openssl ecparam -name secp384r1 -genkey -out '
'/etc/confluent/privkey.pem'.split(' ')):
raise Exception('Error generating private key')
if subprocess.check_call('openssl req -new -x509 -key '
'/etc/confluent/privkey.pem -days 7300 -out '
'/etc/confluent/srvcert.pem -subj /CN='
'{0}'.format(socket.gethostname()).split(' ')):
raise Exception('Error generating certificate')
try:
uid = pwd.getpwnam('confluent').pw_uid
os.chown('/etc/confluent/privkey.pem', uid, -1)
os.chown('/etc/confluent/srvcert.pem', uid, -1)
except KeyError:
pass
print('Certificate generated successfully')
os.umask(umask)
def show_invitation(name, nonvoting=False):
if not os.path.exists('/etc/confluent/srvcert.pem'):
make_certificate()
s = client.Command().connection
role = 'nonvoting' if nonvoting else None
tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name, 'role': role}})
invite = tlvdata.recv(s)['collective']
if 'error' in invite:
sys.stderr.write(invite['error'] + '\n')
return
print('{0}'.format(invite['invitation']))
def join_collective(server, invitation):
if not os.path.exists('/etc/confluent/srvcert.pem'):
make_certificate()
s = client.Command().connection
while not invitation:
invitation = input('Paste the invitation here: ')
tlvdata.send(s, {'collective': {'operation': 'join',
'invitation': invitation,
'server': server}})
res = tlvdata.recv(s)
res = res.get('collective',
{'status': 'Unknown response: ' + repr(res)})
print(res.get('status', res.get('error', repr(res))))
if 'error' in res:
sys.exit(1)
def delete_member(name):
s = client.Command().connection
tlvdata.send(s, {'collective': {'operation': 'delete',
'member': name}})
res = tlvdata.recv(s)
res = res.get('collective',
{'status': 'Unknown response: ' + repr(res)})
print(res.get('status', res.get('error', repr(res))))
if 'error' in res:
sys.exit(1)
def show_collective():
s = client.Command().connection
tlvdata.send(s, {'collective': {'operation': 'show'}})
res = tlvdata.recv(s)
if 'error' in res:
print(res['error'])
return
if 'error' in res['collective']:
print(res['collective']['error'])
return
if 'quorum' in res['collective']:
print('Quorum: {0}'.format(res['collective']['quorum']))
leadernonvoting = res['collective']['leader'] in res['collective'].get('nonvoting', ())
print('Leader: {0}{1}'.format(res['collective']['leader'], ' (non-voting)' if leadernonvoting else ''))
if 'active' in res['collective']:
if res['collective']['active']:
print('Active collective members:')
for member in sortutil.natural_sort(res['collective']['active']):
nonvoter = member in res['collective'].get('nonvoting', ())
print(' {0}{1}'.format(member, ' (nonvoting)' if nonvoter else ''))
if res['collective']['offline']:
print('Offline collective members:')
for member in sortutil.natural_sort(res['collective']['offline']):
nonvoter = member in res['collective'].get('nonvoting', ())
print(' {0}{1}'.format(member, ' (nonvoting)' if nonvoter else ''))
else:
print('Run collective show on leader for more data')
def main():
a = argparse.ArgumentParser(description='Confluent server utility')
sp = a.add_subparsers(dest='command')
gc = sp.add_parser('gencert', help='Generate Confluent Certificates for '
'collective mode and remote CLI access')
sl = sp.add_parser('show', help='Show information about the collective')
ic = sp.add_parser('invite', help='Generate a invitation to allow a new '
'confluent instance to join as a '
'collective member. Run collective invite -h for more information')
ic.add_argument('name', help='Name of server to invite to join the '
'collective')
ic.add_argument('-n', help='Join as a non-voting member, do not have this member contribute to quorum', action='store_true')
dc = sp.add_parser('delete', help='Delete a member of a collective')
dc.add_argument('name', help='Name of server to delete from collective')
jc = sp.add_parser('join', help='Join a collective. Run collective join -h for more information')
jc.add_argument('server', help='Existing collective member that ran invite and generated a token')
jc.add_argument('-i', help='Invitation provided by runniing invite on an '
'existing collective member')
cmdset = a.parse_args()
if cmdset.command == 'gencert':
make_certificate()
elif cmdset.command == 'invite':
show_invitation(cmdset.name, cmdset.n)
elif cmdset.command == 'join':
join_collective(cmdset.server, cmdset.i)
elif cmdset.command == 'show':
show_collective()
elif cmdset.command == 'delete':
delete_member(cmdset.name)
if __name__ == '__main__':
main()