diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 4f761ca0..b5ebab5a 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -52,11 +52,12 @@ def make_certificate(): os.umask(umask) -def show_invitation(name): +def show_invitation(name, nonvoting=False): if not os.path.exists('/etc/confluent/srvcert.pem'): make_certificate() s = client.Command().connection - tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name}}) + role = 'nonvoting' if nonvoting else None + tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name, 'role': role}}) invite = tlvdata.recv(s)['collective'] if 'error' in invite: sys.stderr.write(invite['error'] + '\n') @@ -104,16 +105,19 @@ def show_collective(): return if 'quorum' in res['collective']: print('Quorum: {0}'.format(res['collective']['quorum'])) - print('Leader: {0}'.format(res['collective']['leader'])) + leadernonvoting = res['collective']['leader'] in res['collective'].get('nonvoting', ()) + print('Leader: {0}{1}'.format(res['collective']['leader'], ' (non-voting)' if leadernonvoting else '')) if 'active' in res['collective']: if res['collective']['active']: print('Active collective members:') for member in sortutil.natural_sort(res['collective']['active']): - print(' {0}'.format(member)) + nonvoter = member in res['collective'].get('nonvoting', ()) + print(' {0}{1}'.format(member, ' (nonvoting)' if nonvoter else '')) if res['collective']['offline']: print('Offline collective members:') for member in sortutil.natural_sort(res['collective']['offline']): - print(' {0}'.format(member)) + nonvoter = member in res['collective'].get('nonvoting', ()) + print(' {0}{1}'.format(member, ' (nonvoting)' if nonvoter else '')) else: print('Run collective show on leader for more data') @@ -128,6 +132,7 @@ def main(): 'collective member. Run collective invite -h for more information') ic.add_argument('name', help='Name of server to invite to join the ' 'collective') + ic.add_argument('-n', help='Join as a non-voting member, do not have this member contribute to quorum', action='store_true') dc = sp.add_parser('delete', help='Delete a member of a collective') dc.add_argument('name', help='Name of server to delete from collective') jc = sp.add_parser('join', help='Join a collective. Run collective join -h for more information') @@ -138,7 +143,7 @@ def main(): if cmdset.command == 'gencert': make_certificate() elif cmdset.command == 'invite': - show_invitation(cmdset.name) + show_invitation(cmdset.name, cmdset.n) elif cmdset.command == 'join': join_collective(cmdset.server, cmdset.i) elif cmdset.command == 'show': diff --git a/confluent_server/confluent/collective/invites.py b/confluent_server/confluent/collective/invites.py index b635dbeb..71693b1d 100644 --- a/confluent_server/confluent/collective/invites.py +++ b/confluent_server/confluent/collective/invites.py @@ -22,12 +22,13 @@ import hmac import os pending_invites = {} -def create_server_invitation(servername): +def create_server_invitation(servername, role): servername = servername.encode('utf-8') randbytes = (3 - ((len(servername) + 2) % 3)) % 3 + 64 invitation = os.urandom(randbytes) - pending_invites[servername] = invitation - return base64.b64encode(servername + b'@' + invitation) + pending_invites[servername] = {'invitation': invitation, 'role': role} + invite = servername + b'@' + invitation + return base64.b64encode(invite) def create_client_proof(invitation, mycert, peercert): return hmac.new(invitation, peercert + mycert, hashlib.sha256).digest() @@ -42,6 +43,8 @@ def check_client_proof(servername, mycert, peercert, proof): if servername not in pending_invites: return False invitation = pending_invites[servername] + role = invitation['role'] + invitation = invitation['invitation'] validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256 ).digest() if proof == validproof: @@ -54,7 +57,7 @@ def check_client_proof(servername, mycert, peercert, proof): # Now to generate an answer...., reverse the cert order so our answer # is different, but still proving things return hmac.new(invitation, peercert + mycert, hashlib.sha256 - ).digest() + ).digest(), role # The given proof did not verify the invitation - return False + return False, None diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 07217049..7b369ff5 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -153,7 +153,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None): for c in colldata: cfm._true_add_collective_member(c, colldata[c]['address'], colldata[c]['fingerprint'], - sync=False) + sync=False, role=colldata[c].get('role', None)) for globvar in globaldata: cfm.set_global(globvar, globaldata[globvar], False) cfm._txcount = dbi.get('txcount', 0) @@ -328,7 +328,8 @@ def handle_connection(connection, cert, request, local=False): #TODO(jjohnson2): Cannot do the invitation if not the head node, the certificate hand-carrying #can't work in such a case. name = request['name'] - invitation = invites.create_server_invitation(name) + role = request.get('role', '') + invitation = invites.create_server_invitation(name, role) tlvdata.send(connection, {'collective': {'invitation': invitation}}) connection.close() @@ -401,8 +402,8 @@ def handle_connection(connection, cert, request, local=False): cfm.check_quorum() mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') proof = base64.b64decode(request['hmac']) - myrsp = invites.check_client_proof(request['name'], mycert, - cert, proof) + myrsp, role = invites.check_client_proof(request['name'], mycert, + cert, proof) if not myrsp: tlvdata.send(connection, {'error': 'Invalid token'}) connection.close() @@ -418,7 +419,7 @@ def handle_connection(connection, cert, request, local=False): cfm.add_collective_member(get_myname(), connection.getsockname()[0], myfprint) cfm.add_collective_member(request['name'], - connection.getpeername()[0], fprint) + connection.getpeername()[0], fprint, role) myleader = get_leader(connection) ldrfprint = cfm.get_collective_member_by_address( myleader)['fingerprint'] @@ -590,9 +591,12 @@ def populate_collinfo(collinfo): activemembers = set(cfm.cfgstreams) activemembers.add(iam) collinfo['offline'] = [] + collinfo['nonvoting'] = [] for member in cfm.list_collective(): if member not in activemembers: collinfo['offline'].append(member) + if cfm.get_collective_member(member).get('role', None) == 'nonvoting': + collinfo['nonvoting'].append(member) def try_assimilate(drone, followcount, remote): diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index e980a8be..46984055 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -89,6 +89,7 @@ import string import struct import sys import threading +import time import traceback try: unicode @@ -320,7 +321,7 @@ def _rpc_set_group_attributes(tenant, attribmap, autocreate): def check_quorum(): if isinstance(cfgleader, bool): raise exc.DegradedCollective() - if (not cfgleader) and len(cfgstreams) < (len(_cfgstore.get('collective', {})) // 2): + if (not cfgleader) and (not has_quorum()): # the leader counts in addition to registered streams raise exc.DegradedCollective() if cfgleader and not _hasquorum: @@ -348,9 +349,9 @@ def exec_on_followers(fnname, *args): pushes = eventlet.GreenPool() # Check health of collective prior to attempting for _ in pushes.starmap( - _push_rpc, [(cfgstreams[s], b'') for s in cfgstreams]): + _push_rpc, [(cfgstreams[s]['stream'], b'') for s in cfgstreams]): pass - if len(cfgstreams) < (len(_cfgstore['collective']) // 2): + if not has_quorum(): # the leader counts in addition to registered streams raise exc.DegradedCollective() exec_on_followers_unconditional(fnname, *args) @@ -363,7 +364,7 @@ def exec_on_followers_unconditional(fnname, *args): payload = msgpack.packb({'function': fnname, 'args': args, 'txcount': _txcount}, use_bin_type=False) for _ in pushes.starmap( - _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): + _push_rpc, [(cfgstreams[s]['stream'], payload) for s in cfgstreams]): pass @@ -615,6 +616,37 @@ def set_global(globalname, value, sync=True): if sync: ConfigManager._bg_sync_to_file() +mycachedname = [None, 0] +def get_myname(): + if mycachedname[1] > time.time() - 15: + return mycachedname[0] + try: + with open('/etc/confluent/cfg/myname', 'r') as f: + mycachedname[0] = f.read().strip() + mycachedname[1] = time.time() + return mycachedname[0] + except IOError: + myname = socket.gethostname().split('.')[0] + with open('/etc/confluent/cfg/myname', 'w') as f: + f.write(myname) + mycachedname[0] = myname + mycachedname[1] = time.time() + return myname + +def has_quorum(): + voters = 0 + for follower in cfgstreams: + if cfgstreams[follower].get('role', None) != 'nonvoting': + voters += 1 + myrole = get_collective_member(get_myname()).get('role', None) + if myrole != 'nonvoting': + voters += 1 + allvoters = 0 + for ghost in list_collective(): + if get_collective_member(ghost).get('role', None) != 'nonvoting': + allvoters += 1 + return voters > allvoters // 2 + cfgstreams = {} def relay_slaved_requests(name, listener): global cfgleader @@ -622,21 +654,21 @@ def relay_slaved_requests(name, listener): pushes = eventlet.GreenPool() if name not in _followerlocks: _followerlocks[name] = gthread.RLock() + meminfo = get_collective_member(name) with _followerlocks[name]: try: stop_following() if name in cfgstreams: try: - cfgstreams[name].close() + cfgstreams[name]['stream'].close() except Exception: pass del cfgstreams[name] if membership_callback: membership_callback() - cfgstreams[name] = listener + cfgstreams[name] = {'stream': listener, 'role': meminfo.get('role', None)} lh = StreamHandler(listener) - _hasquorum = len(cfgstreams) >= ( - len(_cfgstore['collective']) // 2) + _hasquorum = has_quorum() _newquorum = None while _hasquorum != _newquorum: if _newquorum is not None: @@ -644,10 +676,9 @@ def relay_slaved_requests(name, listener): payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False) for _ in pushes.starmap( _push_rpc, - [(cfgstreams[s], payload) for s in cfgstreams]): + [(cfgstreams[s]['stream'], payload) for s in cfgstreams]): pass - _newquorum = len(cfgstreams) >= ( - len(_cfgstore['collective']) // 2) + _newquorum = has_quorum() _hasquorum = _newquorum if _hasquorum and _pending_collective_updates: apply_pending_collective_updates() @@ -693,12 +724,11 @@ def relay_slaved_requests(name, listener): except KeyError: pass # May have already been closed/deleted... if cfgstreams: - _hasquorum = len(cfgstreams) >= ( - len(_cfgstore['collective']) // 2) + _hasquorum = has_quorum() payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False) for _ in pushes.starmap( _push_rpc, - [(cfgstreams[s], payload) for s in cfgstreams]): + [(cfgstreams[s]['stream'], payload) for s in cfgstreams]): pass if membership_callback: membership_callback() @@ -756,8 +786,8 @@ def stop_leading(newleader=None): for stream in list(cfgstreams): try: if rpcpayload is not None: - _push_rpc(cfgstreams[stream], rpcpayload) - cfgstreams[stream].close() + _push_rpc(cfgstreams[stream]['stream'], rpcpayload) + cfgstreams[stream]['stream'].close() except Exception: pass try: @@ -876,12 +906,12 @@ def follow_channel(channel): return {} -def add_collective_member(name, address, fingerprint): +def add_collective_member(name, address, fingerprint, role=None): if cfgleader: - return exec_on_leader('add_collective_member', name, address, fingerprint) + return exec_on_leader('add_collective_member', name, address, fingerprint, role) if cfgstreams: - exec_on_followers('_true_add_collective_member', name, address, fingerprint) - _true_add_collective_member(name, address, fingerprint) + exec_on_followers('_true_add_collective_member', name, address, fingerprint, True, role) + _true_add_collective_member(name, address, fingerprint, role=role) def del_collective_member(name): if cfgleader and not isinstance(cfgleader, bool): @@ -934,7 +964,7 @@ def apply_pending_collective_updates(): del _pending_collective_updates[name] -def _true_add_collective_member(name, address, fingerprint, sync=True): +def _true_add_collective_member(name, address, fingerprint, sync=True, role=None): name = confluent.util.stringify(name) if _cfgstore is None: init(not sync) # use not sync to avoid read from disk @@ -942,6 +972,8 @@ def _true_add_collective_member(name, address, fingerprint, sync=True): _cfgstore['collective'] = {} _cfgstore['collective'][name] = {'name': name, 'address': address, 'fingerprint': fingerprint} + if role: + _cfgstore['collective'][name]['role'] = role with _dirtylock: if 'collectivedirty' not in _cfgstore: _cfgstore['collectivedirty'] = set([])