2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 17:43:14 +00:00

Add collective member deletion

This allows deletion of a dead member, down to deleting down to non-collective
mode.
This commit is contained in:
Jarrod Johnson 2019-10-10 11:30:03 -04:00
parent 6ce6740b77
commit 8cab591a8b
3 changed files with 71 additions and 25 deletions

View File

@ -69,6 +69,19 @@ def join_collective(server, invitation):
res = res.get('collective',
{'status': 'Unknown response: ' + repr(res)})
print(res.get('status', res.get('error', repr(res))))
if 'error' in res:
sys.exit(1)
def delete_member(name):
s = client.Command().connection
tlvdata.send(s, {'collective': {'operation': 'delete',
'member': name}})
res = tlvdata.recv(s)
res = res.get('collective',
{'status': 'Unknown response: ' + repr(res)})
print(res.get('status', res.get('error', repr(res))))
if 'error' in res:
sys.exit(1)
def show_collective():
@ -104,6 +117,8 @@ def main():
'collective member. Run collective invite -h for more information')
ic.add_argument('name', help='Name of server to invite to join the '
'collective')
dc = sp.add_parser('delete', help='Delete a member of a collective')
dc.add_argument('name', help='Name of server to delete from collective')
jc = sp.add_parser('join', help='Join a collective. Run collective join -h for more information')
jc.add_argument('server', help='Existing collective member that ran invite and generated a token')
jc.add_argument('-i', help='Invitation provided by runniing invite on an '
@ -117,6 +132,8 @@ def main():
join_collective(cmdset.server, cmdset.i)
elif cmdset.command == 'show':
show_collective()
elif cmdset.command == 'delete':
delete_member(cmdset.name)
if __name__ == '__main__':
main()

View File

@ -220,8 +220,7 @@ def handle_connection(connection, cert, request, local=False):
else:
if not local:
return
if 'show' == operation:
if operation in ('show', 'delete'):
if not list(cfm.list_collective()):
tlvdata.send(connection,
{'collective': {'error': 'Collective mode not '
@ -258,7 +257,23 @@ def handle_connection(connection, cert, request, local=False):
collinfo['quorum'] = True
except exc.DegradedCollective:
collinfo['quorum'] = False
tlvdata.send(connection, {'collective': collinfo})
if operation == 'show':
tlvdata.send(connection, {'collective': collinfo})
elif operation == 'delete':
todelete = request['member']
if (todelete == collinfo['leader'] or
todelete in collinfo['active']):
tlvdata.send(connection, {'collective':
{'error': '{0} is still active, stop the confluent service to remove it'.format(todelete)}})
return
if todelete not in collinfo['offline']:
tlvdata.send(connection, {'collective':
{'error': '{0} is not a recognized collective member'.format(todelete)}})
return
cfm.del_collective_member(todelete)
tlvdata.send(connection,
{'collective': {'status': 'Successfully deleted {0}'.format(todelete)}})
connection.close()
return
if 'invite' == operation:
try:

View File

@ -328,15 +328,19 @@ def exec_on_leader(function, *args):
def exec_on_followers(fnname, *args):
global _txcount
if len(cfgstreams) < (len(_cfgstore['collective']) // 2):
# the leader counts in addition to registered streams
raise exc.DegradedCollective()
exec_on_followers_unconditional(fnname, *args)
def exec_on_followers_unconditional(fnname, *args):
global _txcount
pushes = eventlet.GreenPool()
_txcount += 1
payload = cPickle.dumps({'function': fnname, 'args': args,
'txcount': _txcount}, protocol=lowestver)
for res in pushes.starmap(
for _ in pushes.starmap(
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
pass
@ -781,13 +785,14 @@ def add_collective_member(name, address, fingerprint):
_true_add_collective_member(name, address, fingerprint)
def del_collective_member(name):
if cfgleader:
return exec_on_leader('add_collective_member', name)
if cfgleader and not isinstance(cfgleader, bool):
return exec_on_leader('del_collective_member', name)
if cfgstreams:
exec_on_followers('_true_add_collective_member', name)
exec_on_followers_unconditional('_true_del_collective_member', name)
_true_del_collective_member(name)
def _true_del_collective_member(name, sync=True):
global cfgleader
name = confluent.util.stringify(name)
if _cfgstore is None:
return
@ -800,6 +805,9 @@ def _true_del_collective_member(name, sync=True):
if 'collectivedirty' not in _cfgstore:
_cfgstore['collectivedirty'] = set([])
_cfgstore['collectivedirty'].add(name)
if len(_cfgstore['collective']) < 2:
del _cfgstore['collective']
cfgleader = None
if sync:
ConfigManager._bg_sync_to_file()
@ -2371,24 +2379,30 @@ class ConfigManager(object):
finally:
globalf.close()
if fullsync or 'collectivedirty' in _cfgstore:
collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"),
'c', 384)
try:
if fullsync:
colls = _cfgstore['collective']
else:
with _dirtylock:
colls = copy.deepcopy(_cfgstore['collectivedirty'])
del _cfgstore['collectivedirty']
for coll in colls:
if coll in _cfgstore['collective']:
collectivef[coll] = cPickle.dumps(
_cfgstore['collective'][coll], protocol=cPickle.HIGHEST_PROTOCOL)
if len(_cfgstore.get('collective', ())) > 1:
collectivef = dbm.open(os.path.join(cls._cfgdir, "collective"),
'c', 384)
try:
if fullsync:
colls = _cfgstore['collective']
else:
if coll in collectivef:
del globalf[coll]
finally:
collectivef.close()
with _dirtylock:
colls = copy.deepcopy(_cfgstore['collectivedirty'])
del _cfgstore['collectivedirty']
for coll in colls:
if coll in _cfgstore['collective']:
collectivef[coll] = cPickle.dumps(
_cfgstore['collective'][coll], protocol=cPickle.HIGHEST_PROTOCOL)
else:
if coll in collectivef:
del collectivef[coll]
finally:
collectivef.close()
else:
try:
os.remove(os.path.join(cls._cfgdir, "collective"))
except OSError:
pass
if fullsync:
pathname = cls._cfgdir
currdict = _cfgstore['main']