From baa365fcac93cbb414e50975be51630aa4273246 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 6 Mar 2023 11:56:15 -0500 Subject: [PATCH] Implement non-voting collective members Provide for applications where only a small subset of collective members should be considered to count toward whether the collective can proceed. Commonly, 'service' nodes may be numerous to do work, but may all want to go offline during a maintenance window. --- confluent_server/bin/collective | 17 +++-- .../confluent/collective/invites.py | 13 ++-- .../confluent/collective/manager.py | 14 ++-- .../confluent/config/configmanager.py | 74 +++++++++++++------ 4 files changed, 81 insertions(+), 37 deletions(-) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 4f761ca0..b5ebab5a 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -52,11 +52,12 @@ def make_certificate(): os.umask(umask) -def show_invitation(name): +def show_invitation(name, nonvoting=False): if not os.path.exists('/etc/confluent/srvcert.pem'): make_certificate() s = client.Command().connection - tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name}}) + role = 'nonvoting' if nonvoting else None + tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name, 'role': role}}) invite = tlvdata.recv(s)['collective'] if 'error' in invite: sys.stderr.write(invite['error'] + '\n') @@ -104,16 +105,19 @@ def show_collective(): return if 'quorum' in res['collective']: print('Quorum: {0}'.format(res['collective']['quorum'])) - print('Leader: {0}'.format(res['collective']['leader'])) + leadernonvoting = res['collective']['leader'] in res['collective'].get('nonvoting', ()) + print('Leader: {0}{1}'.format(res['collective']['leader'], ' (non-voting)' if leadernonvoting else '')) if 'active' in res['collective']: if res['collective']['active']: print('Active collective members:') for member in sortutil.natural_sort(res['collective']['active']): - 
print(' {0}'.format(member)) + nonvoter = member in res['collective'].get('nonvoting', ()) + print(' {0}{1}'.format(member, ' (non-voting)' if nonvoter else '')) if res['collective']['offline']: print('Offline collective members:') for member in sortutil.natural_sort(res['collective']['offline']): - print(' {0}'.format(member)) + nonvoter = member in res['collective'].get('nonvoting', ()) + print(' {0}{1}'.format(member, ' (non-voting)' if nonvoter else '')) else: print('Run collective show on leader for more data') @@ -128,6 +132,7 @@ def main(): 'collective member. Run collective invite -h for more information') ic.add_argument('name', help='Name of server to invite to join the ' 'collective') + ic.add_argument('-n', help='Join as a non-voting member, do not have this member contribute to quorum', action='store_true') dc = sp.add_parser('delete', help='Delete a member of a collective') dc.add_argument('name', help='Name of server to delete from collective') jc = sp.add_parser('join', help='Join a collective. 
Run collective join -h for more information') @@ -138,7 +143,7 @@ def main(): if cmdset.command == 'gencert': make_certificate() elif cmdset.command == 'invite': - show_invitation(cmdset.name) + show_invitation(cmdset.name, cmdset.n) elif cmdset.command == 'join': join_collective(cmdset.server, cmdset.i) elif cmdset.command == 'show': diff --git a/confluent_server/confluent/collective/invites.py b/confluent_server/confluent/collective/invites.py index b635dbeb..71693b1d 100644 --- a/confluent_server/confluent/collective/invites.py +++ b/confluent_server/confluent/collective/invites.py @@ -22,12 +22,13 @@ import hmac import os pending_invites = {} -def create_server_invitation(servername): +def create_server_invitation(servername, role): servername = servername.encode('utf-8') randbytes = (3 - ((len(servername) + 2) % 3)) % 3 + 64 invitation = os.urandom(randbytes) - pending_invites[servername] = invitation - return base64.b64encode(servername + b'@' + invitation) + pending_invites[servername] = {'invitation': invitation, 'role': role} + invite = servername + b'@' + invitation + return base64.b64encode(invite) def create_client_proof(invitation, mycert, peercert): return hmac.new(invitation, peercert + mycert, hashlib.sha256).digest() @@ -42,6 +43,8 @@ def check_client_proof(servername, mycert, peercert, proof): if servername not in pending_invites: - return False + return False, None invitation = pending_invites[servername] + role = invitation['role'] + invitation = invitation['invitation'] validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 ).digest() if proof == validproof: @@ -54,7 +57,7 @@ def check_client_proof(servername, mycert, peercert, proof): # Now to generate an answer...., reverse the cert order so our answer # is different, but still proving things return hmac.new(invitation, peercert + mycert, hashlib.sha256 - ).digest() + ).digest(), role # The given proof did not verify the invitation - return False + return False, None diff --git 
a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 07217049..7b369ff5 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -153,7 +153,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None): for c in colldata: cfm._true_add_collective_member(c, colldata[c]['address'], colldata[c]['fingerprint'], - sync=False) + sync=False, role=colldata[c].get('role', None)) for globvar in globaldata: cfm.set_global(globvar, globaldata[globvar], False) cfm._txcount = dbi.get('txcount', 0) @@ -328,7 +328,8 @@ def handle_connection(connection, cert, request, local=False): #TODO(jjohnson2): Cannot do the invitation if not the head node, the certificate hand-carrying #can't work in such a case. name = request['name'] - invitation = invites.create_server_invitation(name) + role = request.get('role', '') + invitation = invites.create_server_invitation(name, role) tlvdata.send(connection, {'collective': {'invitation': invitation}}) connection.close() @@ -401,8 +402,8 @@ def handle_connection(connection, cert, request, local=False): cfm.check_quorum() mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) - myrsp = invites.check_client_proof(request['name'], mycert, - cert, proof) + myrsp, role = invites.check_client_proof(request['name'], mycert, + cert, proof) if not myrsp: tlvdata.send(connection, {'error': 'Invalid token'}) connection.close() @@ -418,7 +419,7 @@ def handle_connection(connection, cert, request, local=False): cfm.add_collective_member(get_myname(), connection.getsockname()[0], myfprint) cfm.add_collective_member(request['name'], - connection.getpeername()[0], fprint) + connection.getpeername()[0], fprint, role) myleader = get_leader(connection) ldrfprint = cfm.get_collective_member_by_address( myleader)['fingerprint'] @@ -590,9 +591,12 @@ def populate_collinfo(collinfo): 
activemembers = set(cfm.cfgstreams) activemembers.add(iam) collinfo['offline'] = [] + collinfo['nonvoting'] = [] for member in cfm.list_collective(): if member not in activemembers: collinfo['offline'].append(member) + if cfm.get_collective_member(member).get('role', None) == 'nonvoting': + collinfo['nonvoting'].append(member) def try_assimilate(drone, followcount, remote): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index e980a8be..46984055 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -89,6 +89,7 @@ import string import struct import sys import threading +import time import traceback try: unicode @@ -320,7 +321,7 @@ def _rpc_set_group_attributes(tenant, attribmap, autocreate): def check_quorum(): if isinstance(cfgleader, bool): raise exc.DegradedCollective() - if (not cfgleader) and len(cfgstreams) < (len(_cfgstore.get('collective', {})) // 2): + if (not cfgleader) and (not has_quorum()): # the leader counts in addition to registered streams raise exc.DegradedCollective() if cfgleader and not _hasquorum: @@ -348,9 +349,9 @@ def exec_on_followers(fnname, *args): pushes = eventlet.GreenPool() # Check health of collective prior to attempting for _ in pushes.starmap( - _push_rpc, [(cfgstreams[s], b'') for s in cfgstreams]): + _push_rpc, [(cfgstreams[s]['stream'], b'') for s in cfgstreams]): pass - if len(cfgstreams) < (len(_cfgstore['collective']) // 2): + if not has_quorum(): # the leader counts in addition to registered streams raise exc.DegradedCollective() exec_on_followers_unconditional(fnname, *args) @@ -363,7 +364,7 @@ def exec_on_followers_unconditional(fnname, *args): payload = msgpack.packb({'function': fnname, 'args': args, 'txcount': _txcount}, use_bin_type=False) for _ in pushes.starmap( - _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): + _push_rpc, [(cfgstreams[s]['stream'], payload) for s in 
cfgstreams]): pass @@ -615,6 +616,37 @@ def set_global(globalname, value, sync=True): if sync: ConfigManager._bg_sync_to_file() +mycachedname = [None, 0] +def get_myname(): + if mycachedname[1] > time.time() - 15: + return mycachedname[0] + try: + with open('/etc/confluent/cfg/myname', 'r') as f: + mycachedname[0] = f.read().strip() + mycachedname[1] = time.time() + return mycachedname[0] + except IOError: + myname = socket.gethostname().split('.')[0] + with open('/etc/confluent/cfg/myname', 'w') as f: + f.write(myname) + mycachedname[0] = myname + mycachedname[1] = time.time() + return myname + +def has_quorum(): + voters = 0 + for follower in cfgstreams: + if cfgstreams[follower].get('role', None) != 'nonvoting': + voters += 1 + myrole = get_collective_member(get_myname()).get('role', None) + if myrole != 'nonvoting': + voters += 1 + allvoters = 0 + for ghost in list_collective(): + if get_collective_member(ghost).get('role', None) != 'nonvoting': + allvoters += 1 + return voters > allvoters // 2 + cfgstreams = {} def relay_slaved_requests(name, listener): global cfgleader @@ -622,21 +654,21 @@ def relay_slaved_requests(name, listener): pushes = eventlet.GreenPool() if name not in _followerlocks: _followerlocks[name] = gthread.RLock() + meminfo = get_collective_member(name) with _followerlocks[name]: try: stop_following() if name in cfgstreams: try: - cfgstreams[name].close() + cfgstreams[name]['stream'].close() except Exception: pass del cfgstreams[name] if membership_callback: membership_callback() - cfgstreams[name] = listener + cfgstreams[name] = {'stream': listener, 'role': meminfo.get('role', None)} lh = StreamHandler(listener) - _hasquorum = len(cfgstreams) >= ( - len(_cfgstore['collective']) // 2) + _hasquorum = has_quorum() _newquorum = None while _hasquorum != _newquorum: if _newquorum is not None: @@ -644,10 +676,9 @@ def relay_slaved_requests(name, listener): payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False) for _ in pushes.starmap( 
_push_rpc, - [(cfgstreams[s], payload) for s in cfgstreams]): + [(cfgstreams[s]['stream'], payload) for s in cfgstreams]): pass - _newquorum = len(cfgstreams) >= ( - len(_cfgstore['collective']) // 2) + _newquorum = has_quorum() _hasquorum = _newquorum if _hasquorum and _pending_collective_updates: apply_pending_collective_updates() @@ -693,12 +724,11 @@ def relay_slaved_requests(name, listener): except KeyError: pass # May have already been closed/deleted... if cfgstreams: - _hasquorum = len(cfgstreams) >= ( - len(_cfgstore['collective']) // 2) + _hasquorum = has_quorum() payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False) for _ in pushes.starmap( _push_rpc, - [(cfgstreams[s], payload) for s in cfgstreams]): + [(cfgstreams[s]['stream'], payload) for s in cfgstreams]): pass if membership_callback: membership_callback() @@ -756,8 +786,8 @@ def stop_leading(newleader=None): for stream in list(cfgstreams): try: if rpcpayload is not None: - _push_rpc(cfgstreams[stream], rpcpayload) - cfgstreams[stream].close() + _push_rpc(cfgstreams[stream]['stream'], rpcpayload) + cfgstreams[stream]['stream'].close() except Exception: pass try: @@ -876,12 +906,12 @@ def follow_channel(channel): return {} -def add_collective_member(name, address, fingerprint): +def add_collective_member(name, address, fingerprint, role=None): if cfgleader: - return exec_on_leader('add_collective_member', name, address, fingerprint) + return exec_on_leader('add_collective_member', name, address, fingerprint, role) if cfgstreams: - exec_on_followers('_true_add_collective_member', name, address, fingerprint) - _true_add_collective_member(name, address, fingerprint) + exec_on_followers('_true_add_collective_member', name, address, fingerprint, True, role) + _true_add_collective_member(name, address, fingerprint, role=role) def del_collective_member(name): if cfgleader and not isinstance(cfgleader, bool): @@ -934,7 +964,7 @@ def apply_pending_collective_updates(): del 
_pending_collective_updates[name] -def _true_add_collective_member(name, address, fingerprint, sync=True): +def _true_add_collective_member(name, address, fingerprint, sync=True, role=None): name = confluent.util.stringify(name) if _cfgstore is None: init(not sync) # use not sync to avoid read from disk @@ -942,6 +972,8 @@ def _true_add_collective_member(name, address, fingerprint, sync=True): _cfgstore['collective'] = {} _cfgstore['collective'][name] = {'name': name, 'address': address, 'fingerprint': fingerprint} + if role: + _cfgstore['collective'][name]['role'] = role with _dirtylock: if 'collectivedirty' not in _cfgstore: _cfgstore['collectivedirty'] = set([])