From 8cab591a8b8d705e61f1be0b1599f195882d1091 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 10 Oct 2019 11:30:03 -0400 Subject: [PATCH] Add collective member deletion This allows deletion of a dead member, down to deleting down to non-collective mode. --- confluent_server/bin/collective | 17 ++++++ .../confluent/collective/manager.py | 21 ++++++- .../confluent/config/configmanager.py | 58 ++++++++++++------- 3 files changed, 71 insertions(+), 25 deletions(-) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 297401a0..fee56ab5 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -69,6 +69,19 @@ def join_collective(server, invitation): res = res.get('collective', {'status': 'Unknown response: ' + repr(res)}) print(res.get('status', res.get('error', repr(res)))) + if 'error' in res: + sys.exit(1) + +def delete_member(name): + s = client.Command().connection + tlvdata.send(s, {'collective': {'operation': 'delete', + 'member': name}}) + res = tlvdata.recv(s) + res = res.get('collective', + {'status': 'Unknown response: ' + repr(res)}) + print(res.get('status', res.get('error', repr(res)))) + if 'error' in res: + sys.exit(1) def show_collective(): @@ -104,6 +117,8 @@ def main(): 'collective member. Run collective invite -h for more information') ic.add_argument('name', help='Name of server to invite to join the ' 'collective') + dc = sp.add_parser('delete', help='Delete a member of a collective') + dc.add_argument('name', help='Name of server to delete from collective') jc = sp.add_parser('join', help='Join a collective. Run collective join -h for more information') jc.add_argument('server', help='Existing collective member that ran invite and generated a token') jc.add_argument('-i', help='Invitation provided by runniing invite on an ' @@ -117,6 +132,8 @@ def main(): join_collective(cmdset.server, cmdset.i) elif cmdset.command == 'show': show_collective() + elif cmdset.command == 'delete': + delete_member(cmdset.name) if __name__ == '__main__': main() diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 789858e4..ccefc0ae 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -220,8 +220,7 @@ def handle_connection(connection, cert, request, local=False): else: if not local: return - - if 'show' == operation: + if operation in ('show', 'delete'): if not list(cfm.list_collective()): tlvdata.send(connection, {'collective': {'error': 'Collective mode not ' @@ -258,7 +257,23 @@ def handle_connection(connection, cert, request, local=False): collinfo['quorum'] = True except exc.DegradedCollective: collinfo['quorum'] = False - tlvdata.send(connection, {'collective': collinfo}) + if operation == 'show': + tlvdata.send(connection, {'collective': collinfo}) + elif operation == 'delete': + todelete = request['member'] + if (todelete == collinfo['leader'] or + todelete in collinfo['active']): + tlvdata.send(connection, {'collective': + {'error': '{0} is still active, stop the confluent service to remove it'.format(todelete)}}) + return + if todelete not in collinfo['offline']: + tlvdata.send(connection, {'collective': + {'error': '{0} is not a recognized collective member'.format(todelete)}}) + return + cfm.del_collective_member(todelete) + tlvdata.send(connection, + {'collective': {'status': 'Successfully deleted {0}'.format(todelete)}}) + connection.close() return if 'invite' == operation: try: diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 8a43a44c..e1fb2046 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -328,15 +328,19 @@ def exec_on_leader(function, *args): def exec_on_followers(fnname, *args): - global _txcount if len(cfgstreams) < (len(_cfgstore['collective']) // 2): # the leader counts in addition to registered streams raise exc.DegradedCollective() + exec_on_followers_unconditional(fnname, *args) + + +def exec_on_followers_unconditional(fnname, *args): + global _txcount pushes = eventlet.GreenPool() _txcount += 1 payload = cPickle.dumps({'function': fnname, 'args': args, 'txcount': _txcount}, protocol=lowestver) - for res in pushes.starmap( + for _ in pushes.starmap( _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): pass @@ -781,13 +785,14 @@ def add_collective_member(name, address, fingerprint): _true_add_collective_member(name, address, fingerprint) def del_collective_member(name): - if cfgleader: - return exec_on_leader('add_collective_member', name) + if cfgleader and not isinstance(cfgleader, bool): + return exec_on_leader('del_collective_member', name) if cfgstreams: - exec_on_followers('_true_add_collective_member', name) + exec_on_followers_unconditional('_true_del_collective_member', name) _true_del_collective_member(name) def _true_del_collective_member(name, sync=True): + global cfgleader name = confluent.util.stringify(name) if _cfgstore is None: return @@ -800,6 +805,9 @@ def _true_del_collective_member(name, sync=True): if 'collectivedirty' not in _cfgstore: _cfgstore['collectivedirty'] = set([]) _cfgstore['collectivedirty'].add(name) + if len(_cfgstore['collective']) < 2: + del _cfgstore['collective'] + cfgleader = None if sync: ConfigManager._bg_sync_to_file() @@ -2371,24 +2379,30 @@ class ConfigManager(object): finally: globalf.close() if fullsync or 'collectivedirty' in _cfgstore: - collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"), - 'c', 384) - try: - if fullsync: - colls = _cfgstore['collective'] - else: - with _dirtylock: - colls = copy.deepcopy(_cfgstore['collectivedirty']) - del _cfgstore['collectivedirty'] - for coll in colls: - if coll in _cfgstore['collective']: - collectivef[coll] = cPickle.dumps( - _cfgstore['collective'][coll], protocol=cPickle.HIGHEST_PROTOCOL) + if len(_cfgstore.get('collective', ())) > 1: + collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"), + 'c', 384) + try: + if fullsync: + colls = _cfgstore['collective'] else: - if coll in collectivef: - del globalf[coll] - finally: - collectivef.close() + with _dirtylock: + colls = copy.deepcopy(_cfgstore['collectivedirty']) + del _cfgstore['collectivedirty'] + for coll in colls: + if coll in _cfgstore['collective']: + collectivef[coll] = cPickle.dumps( + _cfgstore['collective'][coll], protocol=cPickle.HIGHEST_PROTOCOL) + else: + if coll in collectivef: + del collectivef[coll] + finally: + collectivef.close() + else: + try: + os.remove(os.path.join(cls._cfgdir, "collective")) + except OSError: + pass if fullsync: pathname = cls._cfgdir currdict = _cfgstore['main']