2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-24 18:41:55 +00:00

Implement non-voting collective members

Provide for deployments where only a small subset of collective
members should count toward whether the collective has quorum
and can proceed.

Commonly, 'service' nodes are numerous so that they can share the work,
but all of them may need to go offline during a maintenance window.
This commit is contained in:
Jarrod Johnson 2023-03-06 11:56:15 -05:00
parent a385b1e93d
commit baa365fcac
4 changed files with 81 additions and 37 deletions

View File

@ -52,11 +52,12 @@ def make_certificate():
os.umask(umask)
def show_invitation(name):
def show_invitation(name, nonvoting=False):
if not os.path.exists('/etc/confluent/srvcert.pem'):
make_certificate()
s = client.Command().connection
tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name}})
role = 'nonvoting' if nonvoting else None
tlvdata.send(s, {'collective': {'operation': 'invite', 'name': name, 'role': role}})
invite = tlvdata.recv(s)['collective']
if 'error' in invite:
sys.stderr.write(invite['error'] + '\n')
@ -104,16 +105,19 @@ def show_collective():
return
if 'quorum' in res['collective']:
print('Quorum: {0}'.format(res['collective']['quorum']))
print('Leader: {0}'.format(res['collective']['leader']))
leadernonvoting = res['collective']['leader'] in res['collective'].get('nonvoting', ())
print('Leader: {0}{1}'.format(res['collective']['leader'], ' (non-voting)' if leadernonvoting else ''))
if 'active' in res['collective']:
if res['collective']['active']:
print('Active collective members:')
for member in sortutil.natural_sort(res['collective']['active']):
print(' {0}'.format(member))
nonvoter = member in res['collective'].get('nonvoting', ())
print(' {0}{1}'.format(member, ' (nonvoting)' if nonvoter else ''))
if res['collective']['offline']:
print('Offline collective members:')
for member in sortutil.natural_sort(res['collective']['offline']):
print(' {0}'.format(member))
nonvoter = member in res['collective'].get('nonvoting', ())
print(' {0}{1}'.format(member, ' (nonvoting)' if nonvoter else ''))
else:
print('Run collective show on leader for more data')
@ -128,6 +132,7 @@ def main():
'collective member. Run collective invite -h for more information')
ic.add_argument('name', help='Name of server to invite to join the '
'collective')
ic.add_argument('-n', help='Join as a non-voting member, do not have this member contribute to quorum', action='store_true')
dc = sp.add_parser('delete', help='Delete a member of a collective')
dc.add_argument('name', help='Name of server to delete from collective')
jc = sp.add_parser('join', help='Join a collective. Run collective join -h for more information')
@ -138,7 +143,7 @@ def main():
if cmdset.command == 'gencert':
make_certificate()
elif cmdset.command == 'invite':
show_invitation(cmdset.name)
show_invitation(cmdset.name, cmdset.n)
elif cmdset.command == 'join':
join_collective(cmdset.server, cmdset.i)
elif cmdset.command == 'show':

View File

@ -22,12 +22,13 @@ import hmac
import os
pending_invites = {}
def create_server_invitation(servername):
def create_server_invitation(servername, role):
servername = servername.encode('utf-8')
randbytes = (3 - ((len(servername) + 2) % 3)) % 3 + 64
invitation = os.urandom(randbytes)
pending_invites[servername] = invitation
return base64.b64encode(servername + b'@' + invitation)
pending_invites[servername] = {'invitation': invitation, 'role': role}
invite = servername + b'@' + invitation
return base64.b64encode(invite)
def create_client_proof(invitation, mycert, peercert):
    """Return the HMAC-SHA256 digest proving possession of an invitation.

    The digest is keyed with ``invitation`` and computed over ``peercert``
    immediately followed by ``mycert``; swapping the two certificates
    yields a different digest.

    :param invitation: bytes used as the HMAC key
    :param mycert: bytes of the local certificate
    :param peercert: bytes of the remote certificate
    :returns: the raw 32-byte digest
    """
    message = peercert + mycert
    mac = hmac.new(invitation, message, hashlib.sha256)
    return mac.digest()
@ -42,6 +43,8 @@ def check_client_proof(servername, mycert, peercert, proof):
if servername not in pending_invites:
return False
invitation = pending_invites[servername]
role = invitation['role']
invitation = invitation['invitation']
validproof = hmac.new(invitation, mycert + peercert, hashlib.sha256
).digest()
if proof == validproof:
@ -54,7 +57,7 @@ def check_client_proof(servername, mycert, peercert, proof):
# Now to generate an answer...., reverse the cert order so our answer
# is different, but still proving things
return hmac.new(invitation, peercert + mycert, hashlib.sha256
).digest()
).digest(), role
# The given proof did not verify the invitation
return False
return False, None

View File

@ -153,7 +153,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
for c in colldata:
cfm._true_add_collective_member(c, colldata[c]['address'],
colldata[c]['fingerprint'],
sync=False)
sync=False, role=colldata[c].get('role', None))
for globvar in globaldata:
cfm.set_global(globvar, globaldata[globvar], False)
cfm._txcount = dbi.get('txcount', 0)
@ -328,7 +328,8 @@ def handle_connection(connection, cert, request, local=False):
#TODO(jjohnson2): Cannot do the invitation if not the head node, the certificate hand-carrying
#can't work in such a case.
name = request['name']
invitation = invites.create_server_invitation(name)
role = request.get('role', '')
invitation = invites.create_server_invitation(name, role)
tlvdata.send(connection,
{'collective': {'invitation': invitation}})
connection.close()
@ -401,8 +402,8 @@ def handle_connection(connection, cert, request, local=False):
cfm.check_quorum()
mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem')
proof = base64.b64decode(request['hmac'])
myrsp = invites.check_client_proof(request['name'], mycert,
cert, proof)
myrsp, role = invites.check_client_proof(request['name'], mycert,
cert, proof)
if not myrsp:
tlvdata.send(connection, {'error': 'Invalid token'})
connection.close()
@ -418,7 +419,7 @@ def handle_connection(connection, cert, request, local=False):
cfm.add_collective_member(get_myname(),
connection.getsockname()[0], myfprint)
cfm.add_collective_member(request['name'],
connection.getpeername()[0], fprint)
connection.getpeername()[0], fprint, role)
myleader = get_leader(connection)
ldrfprint = cfm.get_collective_member_by_address(
myleader)['fingerprint']
@ -590,9 +591,12 @@ def populate_collinfo(collinfo):
activemembers = set(cfm.cfgstreams)
activemembers.add(iam)
collinfo['offline'] = []
collinfo['nonvoting'] = []
for member in cfm.list_collective():
if member not in activemembers:
collinfo['offline'].append(member)
if cfm.get_collective_member(member).get('role', None) == 'nonvoting':
collinfo['nonvoting'].append(member)
def try_assimilate(drone, followcount, remote):

View File

@ -89,6 +89,7 @@ import string
import struct
import sys
import threading
import time
import traceback
try:
unicode
@ -320,7 +321,7 @@ def _rpc_set_group_attributes(tenant, attribmap, autocreate):
def check_quorum():
if isinstance(cfgleader, bool):
raise exc.DegradedCollective()
if (not cfgleader) and len(cfgstreams) < (len(_cfgstore.get('collective', {})) // 2):
if (not cfgleader) and (not has_quorum()):
# the leader counts in addition to registered streams
raise exc.DegradedCollective()
if cfgleader and not _hasquorum:
@ -348,9 +349,9 @@ def exec_on_followers(fnname, *args):
pushes = eventlet.GreenPool()
# Check health of collective prior to attempting
for _ in pushes.starmap(
_push_rpc, [(cfgstreams[s], b'') for s in cfgstreams]):
_push_rpc, [(cfgstreams[s]['stream'], b'') for s in cfgstreams]):
pass
if len(cfgstreams) < (len(_cfgstore['collective']) // 2):
if not has_quorum():
# the leader counts in addition to registered streams
raise exc.DegradedCollective()
exec_on_followers_unconditional(fnname, *args)
@ -363,7 +364,7 @@ def exec_on_followers_unconditional(fnname, *args):
payload = msgpack.packb({'function': fnname, 'args': args,
'txcount': _txcount}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]):
_push_rpc, [(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
pass
@ -615,6 +616,37 @@ def set_global(globalname, value, sync=True):
if sync:
ConfigManager._bg_sync_to_file()
# Two-slot cache for get_myname(): [name, time.time() value when stored]
mycachedname = [None, 0]


def get_myname():
    """Return this server's name, caching the answer for 15 seconds.

    While the cached entry is younger than 15 seconds it is returned
    without touching the filesystem.  Otherwise the name is read from
    /etc/confluent/cfg/myname (whitespace stripped).  If that read raises
    IOError, the short hostname (the part before the first '.') is used
    instead and written to that file.  In both cases the cache is
    refreshed before returning.
    """
    cachedname, stamp = mycachedname
    if stamp > time.time() - 15:
        return cachedname
    try:
        with open('/etc/confluent/cfg/myname', 'r') as namefile:
            name = namefile.read().strip()
    except IOError:
        # No readable name file: fall back to the short hostname and
        # record it in the file for subsequent lookups.
        name = socket.gethostname().split('.')[0]
        with open('/etc/confluent/cfg/myname', 'w') as namefile:
            namefile.write(name)
    mycachedname[0] = name
    mycachedname[1] = time.time()
    return name
def has_quorum():
    """Return True when a strict majority of voting members is present.

    Present voters are the entries of ``cfgstreams`` whose 'role' is not
    'nonvoting', plus this member itself (looked up via ``get_myname()``)
    unless its own role is 'nonvoting'.  That count is compared against
    every member from ``list_collective()`` whose role is not 'nonvoting';
    quorum requires more than half (integer division) of them.
    """
    def _votes(info):
        # Any role other than 'nonvoting' (including no role) counts.
        return info.get('role', None) != 'nonvoting'

    present = sum(1 for name in cfgstreams if _votes(cfgstreams[name]))
    if _votes(get_collective_member(get_myname())):
        present += 1
    eligible = sum(
        1 for name in list_collective()
        if _votes(get_collective_member(name)))
    return present > eligible // 2
cfgstreams = {}
def relay_slaved_requests(name, listener):
global cfgleader
@ -622,21 +654,21 @@ def relay_slaved_requests(name, listener):
pushes = eventlet.GreenPool()
if name not in _followerlocks:
_followerlocks[name] = gthread.RLock()
meminfo = get_collective_member(name)
with _followerlocks[name]:
try:
stop_following()
if name in cfgstreams:
try:
cfgstreams[name].close()
cfgstreams[name]['stream'].close()
except Exception:
pass
del cfgstreams[name]
if membership_callback:
membership_callback()
cfgstreams[name] = listener
cfgstreams[name] = {'stream': listener, 'role': meminfo.get('role', None)}
lh = StreamHandler(listener)
_hasquorum = len(cfgstreams) >= (
len(_cfgstore['collective']) // 2)
_hasquorum = has_quorum()
_newquorum = None
while _hasquorum != _newquorum:
if _newquorum is not None:
@ -644,10 +676,9 @@ def relay_slaved_requests(name, listener):
payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc,
[(cfgstreams[s], payload) for s in cfgstreams]):
[(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
pass
_newquorum = len(cfgstreams) >= (
len(_cfgstore['collective']) // 2)
_newquorum = has_quorum()
_hasquorum = _newquorum
if _hasquorum and _pending_collective_updates:
apply_pending_collective_updates()
@ -693,12 +724,11 @@ def relay_slaved_requests(name, listener):
except KeyError:
pass # May have already been closed/deleted...
if cfgstreams:
_hasquorum = len(cfgstreams) >= (
len(_cfgstore['collective']) // 2)
_hasquorum = has_quorum()
payload = msgpack.packb({'quorum': _hasquorum}, use_bin_type=False)
for _ in pushes.starmap(
_push_rpc,
[(cfgstreams[s], payload) for s in cfgstreams]):
[(cfgstreams[s]['stream'], payload) for s in cfgstreams]):
pass
if membership_callback:
membership_callback()
@ -756,8 +786,8 @@ def stop_leading(newleader=None):
for stream in list(cfgstreams):
try:
if rpcpayload is not None:
_push_rpc(cfgstreams[stream], rpcpayload)
cfgstreams[stream].close()
_push_rpc(cfgstreams[stream]['stream'], rpcpayload)
cfgstreams[stream]['stream'].close()
except Exception:
pass
try:
@ -876,12 +906,12 @@ def follow_channel(channel):
return {}
def add_collective_member(name, address, fingerprint):
def add_collective_member(name, address, fingerprint, role=None):
if cfgleader:
return exec_on_leader('add_collective_member', name, address, fingerprint)
return exec_on_leader('add_collective_member', name, address, fingerprint, role)
if cfgstreams:
exec_on_followers('_true_add_collective_member', name, address, fingerprint)
_true_add_collective_member(name, address, fingerprint)
exec_on_followers('_true_add_collective_member', name, address, fingerprint, True, role)
_true_add_collective_member(name, address, fingerprint, role=role)
def del_collective_member(name):
if cfgleader and not isinstance(cfgleader, bool):
@ -934,7 +964,7 @@ def apply_pending_collective_updates():
del _pending_collective_updates[name]
def _true_add_collective_member(name, address, fingerprint, sync=True):
def _true_add_collective_member(name, address, fingerprint, sync=True, role=None):
name = confluent.util.stringify(name)
if _cfgstore is None:
init(not sync) # use not sync to avoid read from disk
@ -942,6 +972,8 @@ def _true_add_collective_member(name, address, fingerprint, sync=True):
_cfgstore['collective'] = {}
_cfgstore['collective'][name] = {'name': name, 'address': address,
'fingerprint': fingerprint}
if role:
_cfgstore['collective'][name]['role'] = role
with _dirtylock:
if 'collectivedirty' not in _cfgstore:
_cfgstore['collectivedirty'] = set([])