diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 15ef2582..34d6accc 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -1,6 +1,7 @@ #!/usr/bin/env python import argparse +import errno import os import socket import subprocess @@ -22,6 +23,13 @@ except NameError: def make_certificate(): umask = os.umask(0077) + try: + os.makedirs('/etc/confluent/cfg') + except OSError as e: + if e.errno == errno.EEXIST and os.path.isdir('/etc/confluent/cfg'): + pass + else: + raise if subprocess.check_call( 'openssl ecparam -name secp384r1 -genkey -out ' '/etc/confluent/privkey.pem'.split(' ')): @@ -62,11 +70,18 @@ def join_collective(server, invitation): {'status': 'Unknown response: ' + repr(res)})['status']) +def show_collective(): + s = client.Command().connection + tlvdata.send(s, {'collective': {'operation': 'show'}}) + res = tlvdata.recv(s) + print(repr(res)) + def main(): a = argparse.ArgumentParser(description='Confluent server utility') sp = a.add_subparsers(dest='command') gc = sp.add_parser('gencert', help='Generate Confluent Certificates for ' 'collective mode and remote CLI access') + sl = sp.add_parser('show', help='Show information about the collective') ic = sp.add_parser('invite', help='Generate a invitation to allow a new ' 'confluent instance to join as a ' 'collective member') @@ -83,6 +98,8 @@ def main(): show_invitation(cmdset.name) elif cmdset.command == 'join': join_collective(cmdset.server, cmdset.i) + elif cmdset.command == 'show': + show_collective() if __name__ == '__main__': main() diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index c15fc743..d8b87b0d 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -160,6 +160,21 @@ def handle_connection(connection, cert, request, local=False): else: if not local: return + + if 'show' == operation: + try: + cfm.check_quorum() + except exc.DegradedCollective: + tlvdata.send(connection, + {'collective': + {'error': 'Collective does not have quorum'}}) + if follower: + myleader = cfm.get_collective_member_by_address( + currentleader)['name'] + else: + myleader = get_myname() + tlvdata.send(connection, {'collective': {'leader': myleader}}) + return if 'invite' == operation: if follower: tlvdata.send(connection,