mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-23 08:04:09 +00:00
baa365fcac
Provide for applications where only a small subset of collective members should be considered to count toward whether the collective can proceed. Commonly, 'service' nodes may be numerous to do work, but may all want to go offline during a maintenance window.
156 lines
6.1 KiB
Python
156 lines
6.1 KiB
Python
#!/usr/bin/python2
|
|
|
|
import argparse
|
|
import errno
|
|
import os
|
|
import pwd
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
|
|
# Work out where the confluent python library lives relative to the real
# (symlink-resolved) location of this script: <script dir>/../lib/python
path = os.path.realpath(
    os.path.join(os.path.dirname(os.path.realpath(__file__)),
                 '..', 'lib', 'python'))
# Only extend the module search path for an /opt style install;
# if installed into system path, do not muck with things
if path.startswith('/opt'):
    sys.path.append(path)
|
|
|
|
import confluent.client as client
|
|
import confluent.sortutil as sortutil
|
|
import confluent.tlvdata as tlvdata
|
|
|
|
# Python 2/3 compatibility shim: where raw_input exists (Python 2), rebind
# input to it so that prompts return the typed text as a string.  Where it
# does not exist the lookup raises NameError and the builtin input is kept.
try:
    input = raw_input
except NameError:
    pass
|
|
|
|
def make_certificate():
    """Generate the confluent private key and self-signed certificate.

    Creates /etc/confluent/cfg if it does not already exist, then runs
    openssl to write a secp384r1 private key to /etc/confluent/privkey.pem
    and a self-signed certificate (7300 days, CN set to the local hostname)
    to /etc/confluent/srvcert.pem.  If a 'confluent' user exists, both
    files are chowned to that user.

    The process umask is tightened to 0o77 while the files are created and
    is always restored afterwards, including when an error occurs.

    Raises:
        OSError: if the directory cannot be created, openssl cannot be
            executed, or ownership cannot be changed.
        subprocess.CalledProcessError: if openssl exits with a non-zero
            status.
    """
    # Everything created below must be inaccessible to group and other.
    umask = os.umask(0o77)
    try:
        try:
            os.makedirs('/etc/confluent/cfg')
        except OSError as e:
            # An already existing directory is fine, anything else is not.
            if not (e.errno == errno.EEXIST
                    and os.path.isdir('/etc/confluent/cfg')):
                raise
        # check_call raises CalledProcessError on a non-zero exit status,
        # so there is no return value worth examining.
        subprocess.check_call([
            'openssl', 'ecparam', '-name', 'secp384r1', '-genkey', '-out',
            '/etc/confluent/privkey.pem'])
        # The argument list is spelled out rather than produced by
        # splitting a formatted string, so the hostname can never change
        # how the command line is divided into arguments.
        subprocess.check_call([
            'openssl', 'req', '-new', '-x509', '-key',
            '/etc/confluent/privkey.pem', '-days', '7300', '-out',
            '/etc/confluent/srvcert.pem', '-subj',
            '/CN={0}'.format(socket.gethostname())])
        try:
            uid = pwd.getpwnam('confluent').pw_uid
        except KeyError:
            # No 'confluent' user on this system; leave ownership alone.
            pass
        else:
            os.chown('/etc/confluent/privkey.pem', uid, -1)
            os.chown('/etc/confluent/srvcert.pem', uid, -1)
        print('Certificate generated successfully')
    finally:
        # Never leave the restrictive umask in place for the caller.
        os.umask(umask)
|
|
|
|
|
|
def show_invitation(name, nonvoting=False):
    """Request an invitation for a new collective member and print it.

    A certificate is generated first if /etc/confluent/srvcert.pem does not
    exist.  An 'invite' operation is then sent over the local confluent
    connection and the returned invitation is printed to stdout.

    Args:
        name: Name of the server being invited to join the collective.
        nonvoting: When true, the invitation requests the 'nonvoting'
            role; otherwise no role is requested.

    On failure the error is written to stderr and the process exits with
    status 1, consistent with the join and delete operations.
    """
    if not os.path.exists('/etc/confluent/srvcert.pem'):
        make_certificate()
    s = client.Command().connection
    role = 'nonvoting' if nonvoting else None
    tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name,
                                    'role': role}})
    res = tlvdata.recv(s)
    invite = res.get('collective')
    if invite is None:
        # The reply carried no collective data at all; report whatever
        # error it did carry rather than failing with a KeyError.
        invite = {'error': res.get('error',
                                   'Unknown response: ' + repr(res))}
    elif 'error' not in invite and 'invitation' not in invite:
        invite = {'error': 'Unknown response: ' + repr(res)}
    if 'error' in invite:
        sys.stderr.write(invite['error'] + '\n')
        sys.exit(1)
    print('{0}'.format(invite['invitation']))
|
|
|
|
|
|
def join_collective(server, invitation):
    """Join the local confluent instance to an existing collective.

    A certificate is generated first if /etc/confluent/srvcert.pem does not
    exist.  If no invitation was supplied, the user is prompted until a
    non-empty one is entered.  A 'join' operation is then sent over the
    local confluent connection and the resulting status (or error) is
    printed.  The process exits with status 1 when an error is reported.

    Args:
        server: Existing collective member that produced the invitation.
        invitation: Invitation text, or a false value to prompt for it.
    """
    if not os.path.exists('/etc/confluent/srvcert.pem'):
        make_certificate()
    conn = client.Command().connection
    while not invitation:
        invitation = input('Paste the invitation here: ')
    request = {
        'operation': 'join',
        'invitation': invitation,
        'server': server,
    }
    tlvdata.send(conn, {'collective': request})
    reply = tlvdata.recv(conn)
    # A reply without a 'collective' section is reported verbatim.
    fallback = {'status': 'Unknown response: ' + repr(reply)}
    outcome = reply.get('collective', fallback)
    message = outcome.get('status', outcome.get('error', repr(outcome)))
    print(message)
    if 'error' in outcome:
        sys.exit(1)
|
|
|
|
def delete_member(name):
    """Request removal of a member from the collective.

    Sends a 'delete' operation for the named member over the local
    confluent connection and prints the resulting status (or error).  The
    process exits with status 1 when an error is reported.

    Args:
        name: Name of the collective member to delete.
    """
    conn = client.Command().connection
    request = {'operation': 'delete', 'member': name}
    tlvdata.send(conn, {'collective': request})
    reply = tlvdata.recv(conn)
    # A reply without a 'collective' section is reported verbatim.
    fallback = {'status': 'Unknown response: ' + repr(reply)}
    outcome = reply.get('collective', fallback)
    message = outcome.get('status', outcome.get('error', repr(outcome)))
    print(message)
    if 'error' in outcome:
        sys.exit(1)
|
|
|
|
|
|
def show_collective():
    """Print the collective state reported by the local confluent instance.

    Sends a 'show' operation and prints, as available: the quorum value,
    the leader, and the naturally sorted lists of active and offline
    members.  The leader and any member found in the reply's 'nonvoting'
    collection are marked as such.  An error in the reply is printed
    instead and nothing else is shown.  When the reply has no 'active'
    entry, a hint to run the command on the leader is printed.
    """
    conn = client.Command().connection
    tlvdata.send(conn, {'collective': {'operation': 'show'}})
    reply = tlvdata.recv(conn)
    if 'error' in reply:
        print(reply['error'])
        return
    info = reply['collective']
    if 'error' in info:
        print(info['error'])
        return
    if 'quorum' in info:
        print('Quorum: {0}'.format(info['quorum']))
    leader = info['leader']
    nonvoting = info.get('nonvoting', ())
    leader_tag = ' (non-voting)' if leader in nonvoting else ''
    print('Leader: {0}{1}'.format(leader, leader_tag))
    if 'active' not in info:
        print('Run collective show on leader for more data')
        return
    # Active and offline members are presented the same way, in that order.
    sections = (
        ('Active collective members:', 'active'),
        ('Offline collective members:', 'offline'),
    )
    for heading, key in sections:
        members = info[key]
        if not members:
            continue
        print(heading)
        for member in sortutil.natural_sort(members):
            member_tag = ' (nonvoting)' if member in nonvoting else ''
            print(' {0}{1}'.format(member, member_tag))
|
|
|
|
def main():
    """Parse the command line and run the requested collective command.

    Supported commands are gencert, show, invite, delete and join.  When no
    command is given, a usage error is reported and the process exits with
    status 2.  Python 3 argparse treats sub-commands as optional, so
    without this check the program would silently do nothing.
    """
    a = argparse.ArgumentParser(description='Confluent server utility')
    sp = a.add_subparsers(dest='command')
    sp.add_parser('gencert', help='Generate Confluent Certificates for '
                                  'collective mode and remote CLI access')
    sp.add_parser('show', help='Show information about the collective')
    ic = sp.add_parser('invite', help='Generate an invitation to allow a new '
                                      'confluent instance to join as a '
                                      'collective member. Run collective invite -h for more information')
    ic.add_argument('name', help='Name of server to invite to join the '
                                 'collective')
    ic.add_argument('-n', help='Join as a non-voting member, do not have this member contribute to quorum', action='store_true')
    dc = sp.add_parser('delete', help='Delete a member of a collective')
    dc.add_argument('name', help='Name of server to delete from collective')
    jc = sp.add_parser('join', help='Join a collective. Run collective join -h for more information')
    jc.add_argument('server', help='Existing collective member that ran invite and generated a token')
    jc.add_argument('-i', help='Invitation provided by running invite on an '
                               'existing collective member')
    cmdset = a.parse_args()
    if cmdset.command is None:
        # error() prints the usage summary to stderr and exits with 2.
        a.error('a command is required')
    if cmdset.command == 'gencert':
        make_certificate()
    elif cmdset.command == 'invite':
        show_invitation(cmdset.name, cmdset.n)
    elif cmdset.command == 'join':
        join_collective(cmdset.server, cmdset.i)
    elif cmdset.command == 'show':
        show_collective()
    elif cmdset.command == 'delete':
        delete_member(cmdset.name)
|
|
|
|
# Run the command line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()
|