From 285a159ba57a5de64a5201eef8ecde74162621f0 Mon Sep 17 00:00:00 2001
From: Jarrod Johnson
Date: Mon, 24 Jul 2023 11:11:39 -0400
Subject: [PATCH] Implement a number of improvements for collective

For one, exclude 'non-voting' members from serving as leader. A large
number of leader candidates creates long delays in converging on a
valid organization. Further, some environments treat 'non-voting'
members more roughly, inducing the worst-case convergence scenario of
an unclean shutdown of the leader. Convergence now happens fairly
quickly for collectives with a large number of non-voting members.

During the initial DB transfer, the leader would be tied up for an
unreasonably long time handling the jsonification of a large
configuration. Offload that work to a worker process so the leader can
continue operating while this intensive, rare operation runs.

Reliably run a reassimilation procedure for the lifetime of the
leader. This allows orphaned members to be prompted to join the
correct leader.

Serialize the onboarding of connecting members, and pause redundancy
more gracefully. This avoids excessive waiting on the lock and gives
more deterministic timing with respect to the connecting system's
timeout expectations.
---
 .../confluent/collective/manager.py           | 53 ++++++++++++++---
 .../confluent/config/configmanager.py         | 58 ++++++++++++++++---
 2 files changed, 95 insertions(+), 16 deletions(-)
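A quick illustration of the serialized onboarding this patch introduces:
when the leader is already servicing a connection, it answers with
'waitinline' and the connecting member backs off briefly and retries. A
minimal sketch of that loop from the connecting side, with try_connect
and recv_keydata as hypothetical stand-ins for the real tlvdata/socket
plumbing (the patch itself retries inside connect_to_leader):

    import time

    def join_leader(try_connect, recv_keydata, max_attempts=100):
        for _ in range(max_attempts):
            remote = try_connect()
            keydata = recv_keydata(remote)
            if keydata and 'waitinline' in keydata:
                # Leader is busy onboarding another member; back off and
                # retry so members are admitted one at a time.
                remote.close()
                time.sleep(0.1)  # mirrors the patch's eventlet.sleep(0.1)
                continue
            return remote, keydata
        raise Exception('Leader stayed busy for too long')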
diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py
index 3549a9e2..ebd90c7c 100644
--- a/confluent_server/confluent/collective/manager.py
+++ b/confluent_server/confluent/collective/manager.py
@@ -84,6 +84,8 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
     with connecting:
         with cfm._initlock:
             banner = tlvdata.recv(remote)  # the banner
+            if not banner:
+                return
             vers = banner.split()[2]
             if vers not in (b'v2', b'v3'):
                 raise Exception('This instance only supports protocol 2 or 3, synchronize versions between collective members')
@@ -103,7 +105,12 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
                            '{0}'.format(leader),
                            'subsystem': 'collective'})
                 return False
+            if 'waitinline' in keydata:
+                eventlet.sleep(0.1)
+                return connect_to_leader(cert, name, leader, remote)
             if 'leader' in keydata:
+                if keydata['leader'] is None:
+                    return None
                 log.log(
                     {'info': 'Prospective leader {0} has redirected this '
                              'member to {1}'.format(leader, keydata['leader']),
@@ -271,7 +278,11 @@ def handle_connection(connection, cert, request, local=False):
             return
     if follower is not None:
         linfo = cfm.get_collective_member_by_address(currentleader)
-        remote = socket.create_connection((currentleader, 13001), 15)
+        try:
+            remote = socket.create_connection((currentleader, 13001), 15)
+        except Exception:
+            cfm.stop_following()
+            return
         remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
                                  keyfile='/etc/confluent/privkey.pem',
                                  certfile='/etc/confluent/srvcert.pem')
@@ -537,6 +548,11 @@ def handle_connection(connection, cert, request, local=False):
                                       'backoff': True})
             connection.close()
             return
+        if leader_init.active:
+            tlvdata.send(connection, {'error': 'Servicing a connection',
+                                      'waitinline': True})
+            connection.close()
+            return
         if myself != get_leader(connection):
             tlvdata.send(
                 connection,
@@ -569,9 +585,15 @@ def handle_connection(connection, cert, request, local=False):
         tlvdata.send(connection, cfm._cfgstore['collective'])
         tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')})  # cfm.get_globals())
         cfgdata = cfm.ConfigManager(None)._dump_to_json()
-        tlvdata.send(connection, {'txcount': cfm._txcount,
-                                  'dbsize': len(cfgdata)})
-        connection.sendall(cfgdata)
+        try:
+            tlvdata.send(connection, {'txcount': cfm._txcount,
+                                      'dbsize': len(cfgdata)})
+            connection.sendall(cfgdata)
+        except Exception:
+            try:
+                connection.close()
+            finally:
+                return None
         #tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
         # so far unused anyway
         connection.settimeout(90)
@@ -660,7 +682,11 @@ def get_leader(connection):

 def retire_as_leader(newleader=None):
     global currentleader
+    global reassimilate
     cfm.stop_leading(newleader)
+    if reassimilate is not None:
+        reassimilate.kill()
+        reassimilate = None
     currentleader = None

 def become_leader(connection):
@@ -668,6 +694,10 @@ def become_leader(connection):
     global follower
     global retrythread
     global reassimilate
+    if cfm.get_collective_member(get_myname()).get('role', None) == 'nonvoting':
+        log.log({'info': 'Refraining from being leader of collective (nonvoting)',
+                 'subsystem': 'collective'})
+        return False
     log.log({'info': 'Becoming leader of collective',
              'subsystem': 'collective'})
     if follower is not None:
@@ -679,15 +709,20 @@ def become_leader(connection):
         retrythread = None
     currentleader = connection.getsockname()[0]
     skipaddr = connection.getpeername()[0]
+    if reassimilate is not None:
+        reassimilate.kill()
+    reassimilate = eventlet.spawn(reassimilate_missing)
     if _assimilate_missing(skipaddr):
         schedule_rebalance()
-    if reassimilate is not None:
-        reassimilate.kill()
-    reassimilate = eventlet.spawn(reassimilate_missing)
+

 def reassimilate_missing():
     eventlet.sleep(30)
-    while cfm.cfgstreams and _assimilate_missing():
+    while True:
+        try:
+            _assimilate_missing()
+        except Exception as e:
+            cfm.logException()
         eventlet.sleep(30)

 def _assimilate_missing(skipaddr=None):
@@ -801,6 +836,8 @@ def start_collective():
     for member in sorted(list(cfm.list_collective())):
         if member == myname:
             continue
+        if cfm.get_collective_member(member).get('role', None) == 'nonvoting':
+            continue
         if cfm.cfgleader is None:
             cfm.stop_following(True)
         ldrcandidate = cfm.get_collective_member(member)['address']
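For context on the txcount/dbsize handoff above: the leader announces
the snapshot size up front, then streams the raw JSON bytes, so the
receiving member must loop on recv() until the full payload has
arrived. A minimal sketch of that receive loop, with recv_tlv as a
hypothetical stand-in for tlvdata.recv:

    def receive_snapshot(sock, recv_tlv):
        # The header announces transaction count and payload size,
        # e.g. {'txcount': 42, 'dbsize': 1048576}
        header = recv_tlv(sock)
        dbsize = header['dbsize']
        cfgdata = b''
        while len(cfgdata) < dbsize:
            # A single recv() may return fewer bytes than requested
            chunk = sock.recv(dbsize - len(cfgdata))
            if not chunk:
                raise Exception('Connection lost during DB transfer')
            cfgdata += chunk
        return header['txcount'], cfgdata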
diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py
index a2d789c9..2eac06db 100644
--- a/confluent_server/confluent/config/configmanager.py
+++ b/confluent_server/confluent/config/configmanager.py
@@ -59,6 +59,14 @@ except ModuleNotFoundError:
 import ast
 import base64
 from binascii import hexlify
+import os
+import sys
+
+if __name__ == '__main__':
+    path = os.path.dirname(os.path.realpath(__file__))
+    path = os.path.realpath(os.path.join(path, '..', '..'))
+    if path.startswith('/opt'):
+        sys.path.append(path)
 import confluent.config.attributes as allattributes
 import confluent.config.conf as conf
 import confluent.log
@@ -77,17 +85,16 @@ import eventlet
 import eventlet.event as event
 import eventlet.green.select as select
 import eventlet.green.threading as gthread
+import eventlet.green.subprocess as subprocess
 import fnmatch
 import hashlib
 import json
 import msgpack
 import operator
-import os
 import random
 import re
 import string
 import struct
-import sys
 import threading
 import time
 import traceback
@@ -422,7 +429,10 @@ def _push_rpc(stream, payload):
             pass
         if membership_callback:
             membership_callback()
-        stream.close()
+        try:
+            stream.close()
+        except Exception:
+            pass


 def decrypt_value(cryptvalue,
@@ -690,7 +700,9 @@ def relay_slaved_requests(name, listener):
             if name not in cfgstreams:
                 raise Exception("Unexpected loss of node in followers: " + name)
             sz = struct.unpack('!Q', msg)[0]
-            if sz != 0:
+            if sz == 0:
+                _push_rpc(listener, b'')
+            else:
                 rpc = b''
                 while len(rpc) < sz:
                     nrpc = listener.recv(sz - len(rpc))
@@ -740,6 +752,16 @@ def relay_slaved_requests(name, listener):
             return False
     return True

+lastheartbeat = None
+def check_leader():
+    _push_rpc(cfgleader, b'')
+    tries = 0
+    while tries < 30:
+        eventlet.sleep(0.1)
+        tries += 1
+        if lastheartbeat and lastheartbeat > (confluent.util.monotonic_time() - 3):
+            return True
+    raise Exception("Leader has disappeared")

 class StreamHandler(object):
     def __init__(self, sock):
@@ -761,10 +783,13 @@ class StreamHandler(object):
                 res = _push_rpc(self.sock, b'')  # nulls are a keepalive
                 if not res:
                     return None
+                #TODO: this test can work fine even if the other end is
+                # gone, go to a more affirmative test to more quickly
+                # detect outage to peer
                 self.keepalive = confluent.util.monotonic_time() + 20
             self.expiry = confluent.util.monotonic_time() + 60
             msg = self.sock.recv(8)
-        except Exception:
+        except Exception as e:
             msg = None
         return msg
@@ -858,6 +883,7 @@ cfgleader = None
 def follow_channel(channel):
     global _txcount
     global _hasquorum
+    global lastheartbeat
     try:
         stop_leading()
         stop_following(channel)
@@ -865,7 +891,9 @@ def follow_channel(channel):
         msg = lh.get_next_msg()
         while msg:
             sz = struct.unpack('!Q', msg)[0]
-            if sz != 0:
+            if sz == 0:
+                lastheartbeat = confluent.util.monotonic_time()
+            else:
                 rpc = b''
                 while len(rpc) < sz:
                     nrpc = channel.recv(sz - len(rpc))
@@ -2540,6 +2568,15 @@ class ConfigManager(object):
         data.
         """
+        with open(os.devnull, 'w+') as devnull:
+            worker = subprocess.Popen(
+                [sys.executable, __file__, '-r' if redact else ''],
+                stdin=devnull, stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE)
+            stdout, stderr = worker.communicate()
+        return stdout
+
+    def _real_dump_to_json(self, redact=None):
         dumpdata = {}
         for confarea in _config_areas:
             if confarea not in self._cfgstore:
@@ -2871,9 +2908,9 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False):
     with open(os.path.join(location, 'keys.json'), 'w') as cfgfile:
         cfgfile.write(_dump_keys(password))
         cfgfile.write('\n')
-    with open(os.path.join(location, 'main.json'), 'w') as cfgfile:
+    with open(os.path.join(location, 'main.json'), 'wb') as cfgfile:
         cfgfile.write(ConfigManager(tenant=None)._dump_to_json(redact=redact))
-        cfgfile.write('\n')
+        cfgfile.write(b'\n')
     if 'collective' in _cfgstore:
         with open(os.path.join(location, 'collective.json'), 'w') as cfgfile:
             cfgfile.write(json.dumps(_cfgstore['collective']))
@@ -2914,6 +2951,11 @@ def init(stateless=False):
         _cfgstore = {}


+if __name__ == '__main__':
+    redact = None
+    if '-r' in sys.argv:
+        redact = True
+    sys.stdout.write(ConfigManager(None)._real_dump_to_json(redact))
 # some unit tests worth implementing:
 # set group attribute on lower priority group, result is that node should not
 # change
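The worker-process offload in _dump_to_json amounts to re-invoking this
module as a script and capturing its stdout. A standalone sketch of the
same pattern, under the assumption of a hypothetical dump_in_worker
wrapper; the returncode check is illustrative and not part of the patch:

    import os
    import subprocess
    import sys

    def dump_in_worker(redact=False):
        # Serialize the configuration in a separate process so a large
        # dump does not stall the (green-threaded) leader process.
        args = [sys.executable, __file__]
        if redact:
            args.append('-r')
        with open(os.devnull, 'w+') as devnull:
            worker = subprocess.Popen(args, stdin=devnull,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
            stdout, stderr = worker.communicate()
        if worker.returncode != 0:
            raise Exception(stderr.decode('utf-8', 'replace'))
        return stdout  # raw JSON bytes, matching the binary write above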