#!/usr/bin/python2 import argparse import errno import os import socket import subprocess import sys path = os.path.dirname(os.path.realpath(__file__)) path = os.path.realpath(os.path.join(path, '..', 'lib', 'python')) if path.startswith('/opt'): # if installed into system path, do not muck with things sys.path.append(path) import confluent.client as client import confluent.tlvdata as tlvdata try: input = raw_input except NameError: pass def make_certificate(): umask = os.umask(0o77) try: os.makedirs('/etc/confluent/cfg') except OSError as e: if e.errno == errno.EEXIST and os.path.isdir('/etc/confluent/cfg'): pass else: raise if subprocess.check_call( 'openssl ecparam -name secp384r1 -genkey -out ' '/etc/confluent/privkey.pem'.split(' ')): raise Exception('Error generating private key') if subprocess.check_call('openssl req -new -x509 -key ' '/etc/confluent/privkey.pem -days 7300 -out ' '/etc/confluent/srvcert.pem -subj /CN=' '{0}'.format(socket.gethostname()).split(' ')): raise Exception('Error generating certificate') try: uid = pwd.getpwnam('confluent').pw_uid os.chown('/etc/confluent/privkey.pem', uid, -1) os.chown('/etc/confluent/srvcert.pem', uid, -1) except KeyError: pass print('Certificate generated successfully') os.umask(umask) def show_invitation(name): if not os.path.exists('/etc/confluent/srvcert.pem'): make_certificate() s = client.Command().connection tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name}}) invite = tlvdata.recv(s)['collective'] if 'error' in invite: sys.stderr.write(invite['error'] + '\n') return print('{0}'.format(invite['invitation'])) def join_collective(server, invitation): if not os.path.exists('/etc/confluent/srvcert.pem'): make_certificate() s = client.Command().connection while not invitation: invitation = input('Paste the invitation here: ') tlvdata.send(s, {'collective': {'operation': 'join', 'invitation': invitation, 'server': server}}) res = tlvdata.recv(s) res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) print(res.get('status', res.get('error', repr(res)))) if 'error' in res: sys.exit(1) def delete_member(name): s = client.Command().connection tlvdata.send(s, {'collective': {'operation': 'delete', 'member': name}}) res = tlvdata.recv(s) res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) print(res.get('status', res.get('error', repr(res)))) if 'error' in res: sys.exit(1) def show_collective(): s = client.Command().connection tlvdata.send(s, {'collective': {'operation': 'show'}}) res = tlvdata.recv(s) if 'error' in res: print(res['error']) return if 'error' in res['collective']: print(res['collective']['error']) return if 'quorum' in res['collective']: print('Quorum: {0}'.format(res['collective']['quorum'])) print('Leader: {0}'.format(res['collective']['leader'])) if 'active' in res['collective']: if res['collective']['active']: print('Active collective members:') for member in res['collective']['active']: print(' {0}'.format(member)) if res['collective']['offline']: print('Offline collective members:') for member in res['collective']['offline']: print(' {0}'.format(member)) else: print('Run collective show on leader for more data') def main(): a = argparse.ArgumentParser(description='Confluent server utility') sp = a.add_subparsers(dest='command') gc = sp.add_parser('gencert', help='Generate Confluent Certificates for ' 'collective mode and remote CLI access') sl = sp.add_parser('show', help='Show information about the collective') ic = sp.add_parser('invite', help='Generate a invitation to allow a new ' 'confluent instance to join as a ' 'collective member. Run collective invite -h for more information') ic.add_argument('name', help='Name of server to invite to join the ' 'collective') dc = sp.add_parser('delete', help='Delete a member of a collective') dc.add_argument('name', help='Name of server to delete from collective') jc = sp.add_parser('join', help='Join a collective. Run collective join -h for more information') jc.add_argument('server', help='Existing collective member that ran invite and generated a token') jc.add_argument('-i', help='Invitation provided by runniing invite on an ' 'existing collective member') cmdset = a.parse_args() if cmdset.command == 'gencert': make_certificate() elif cmdset.command == 'invite': show_invitation(cmdset.name) elif cmdset.command == 'join': join_collective(cmdset.server, cmdset.i) elif cmdset.command == 'show': show_collective() elif cmdset.command == 'delete': delete_member(cmdset.name) if __name__ == '__main__': main()