2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 01:22:00 +00:00

Implement a number of improvements for collective

For one, remove 'non-voting' members from being leaders.
A large number of leader candidates creates long delays for
converging on a valid organization.  Further, some code paths treat 'non-voting'
members more roughly, inducing the worst-case convergence scenario of an unclean
shutdown of the leader.
Convergence now happens fairly quickly for collectives with large
number of non-voting members.

During initial DB transfer, the leader would be tied up unreasonably
long handling the jsonification of a large configuration.  Offload to a worker
process to allow the leader to continue operation while this intensive, rare
operation occurs.

Reliably run a reassimilation procedure for the lifetime of the leader.
This allows orphaned members to be prompted to join the correct leader.

Serialize the onboarding of a connecting member, and pause redundancy more
gracefully. This avoids excessive waiting on the lock and provides more deterministic
timing with respect to the timeout expectations of the connecting system.
This commit is contained in:
Jarrod Johnson 2023-07-24 11:11:39 -04:00
parent 8ea2ba046e
commit 285a159ba5
2 changed files with 95 additions and 16 deletions

View File

@ -84,6 +84,8 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
with connecting:
with cfm._initlock:
banner = tlvdata.recv(remote) # the banner
if not banner:
return
vers = banner.split()[2]
if vers not in (b'v2', b'v3'):
raise Exception('This instance only supports protocol 2 or 3, synchronize versions between collective members')
@ -103,7 +105,12 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
'{0}'.format(leader),
'subsystem': 'collective'})
return False
if 'waitinline' in keydata:
eventlet.sleep(0.1)
return connect_to_leader(cert, name, leader, remote)
if 'leader' in keydata:
if keydata['leader'] == None:
return None
log.log(
{'info': 'Prospective leader {0} has redirected this '
'member to {1}'.format(leader, keydata['leader']),
@ -271,7 +278,11 @@ def handle_connection(connection, cert, request, local=False):
return
if follower is not None:
linfo = cfm.get_collective_member_by_address(currentleader)
remote = socket.create_connection((currentleader, 13001), 15)
try:
remote = socket.create_connection((currentleader, 13001), 15)
except Exception:
cfm.stop_following()
return
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
keyfile='/etc/confluent/privkey.pem',
certfile='/etc/confluent/srvcert.pem')
@ -537,6 +548,11 @@ def handle_connection(connection, cert, request, local=False):
'backoff': True})
connection.close()
return
if leader_init.active:
tlvdata.send(connection, {'error': 'Servicing a connection',
'waitinline': True})
connection.close()
return
if myself != get_leader(connection):
tlvdata.send(
connection,
@ -569,9 +585,15 @@ def handle_connection(connection, cert, request, local=False):
tlvdata.send(connection, cfm._cfgstore['collective'])
tlvdata.send(connection, {'confluent_uuid': cfm.get_global('confluent_uuid')}) # cfm.get_globals())
cfgdata = cfm.ConfigManager(None)._dump_to_json()
tlvdata.send(connection, {'txcount': cfm._txcount,
'dbsize': len(cfgdata)})
connection.sendall(cfgdata)
try:
tlvdata.send(connection, {'txcount': cfm._txcount,
'dbsize': len(cfgdata)})
connection.sendall(cfgdata)
except Exception:
try:
connection.close()
finally:
return None
#tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now,
# so far unused anyway
connection.settimeout(90)
@ -660,7 +682,11 @@ def get_leader(connection):
def retire_as_leader(newleader=None):
    """Step down as leader of the collective.

    Stops leading (optionally deferring to *newleader*), cancels the
    background reassimilation greenthread, and clears the cached current
    leader address.

    :param newleader: optional member to hand leadership off to; passed
                      through to cfm.stop_leading().
    """
    global currentleader
    global reassimilate
    cfm.stop_leading(newleader)
    if reassimilate is not None:
        # Only the leader should prompt orphans to rejoin; stop doing so now.
        reassimilate.kill()
        reassimilate = None
    currentleader = None
def become_leader(connection):
@ -668,6 +694,10 @@ def become_leader(connection):
global follower
global retrythread
global reassimilate
if cfm.get_collective_member(get_myname()).get('role', None) == 'nonvoting':
log.log({'info': 'Refraining from being leader of collective (nonvoting)',
'subsystem': 'collective'})
return False
log.log({'info': 'Becoming leader of collective',
'subsystem': 'collective'})
if follower is not None:
@ -679,15 +709,20 @@ def become_leader(connection):
retrythread = None
currentleader = connection.getsockname()[0]
skipaddr = connection.getpeername()[0]
if reassimilate is not None:
reassimilate.kill()
reassimilate = eventlet.spawn(reassimilate_missing)
if _assimilate_missing(skipaddr):
schedule_rebalance()
if reassimilate is not None:
reassimilate.kill()
reassimilate = eventlet.spawn(reassimilate_missing)
def reassimilate_missing():
    """Periodically prompt orphaned members to rejoin the current leader.

    Spawned as a greenthread when this node becomes leader and killed when it
    retires, so it runs for the lifetime of the leadership. Each pass calls
    _assimilate_missing(); any failure is logged and the loop continues, so a
    transient error cannot permanently stop reassimilation.
    """
    # Give the newly-elected leader a moment to settle before the first pass.
    eventlet.sleep(30)
    while True:
        try:
            _assimilate_missing()
        except Exception:
            # Best effort: record the failure and retry on the next cycle.
            # (The exception object itself was previously bound but unused.)
            cfm.logException()
        eventlet.sleep(30)
def _assimilate_missing(skipaddr=None):
@ -801,6 +836,8 @@ def start_collective():
for member in sorted(list(cfm.list_collective())):
if member == myname:
continue
if cfm.get_collective_member(member).get('role', None) == 'nonvoting':
continue
if cfm.cfgleader is None:
cfm.stop_following(True)
ldrcandidate = cfm.get_collective_member(member)['address']

View File

@ -59,6 +59,14 @@ except ModuleNotFoundError:
import ast
import base64
from binascii import hexlify
import os
import sys
if __name__ == '__main__':
path = os.path.dirname(os.path.realpath(__file__))
path = os.path.realpath(os.path.join(path, '..', '..'))
if path.startswith('/opt'):
sys.path.append(path)
import confluent.config.attributes as allattributes
import confluent.config.conf as conf
import confluent.log
@ -77,17 +85,16 @@ import eventlet
import eventlet.event as event
import eventlet.green.select as select
import eventlet.green.threading as gthread
import eventlet.green.subprocess as subprocess
import fnmatch
import hashlib
import json
import msgpack
import operator
import os
import random
import re
import string
import struct
import sys
import threading
import time
import traceback
@ -422,7 +429,10 @@ def _push_rpc(stream, payload):
pass
if membership_callback:
membership_callback()
stream.close()
try:
stream.close()
except Exception:
pass
def decrypt_value(cryptvalue,
@ -690,7 +700,9 @@ def relay_slaved_requests(name, listener):
if name not in cfgstreams:
raise Exception("Unexpected loss of node in followers: " + name)
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
if sz == 0:
_push_rpc(listener, b'')
else:
rpc = b''
while len(rpc) < sz:
nrpc = listener.recv(sz - len(rpc))
@ -740,6 +752,16 @@ def relay_slaved_requests(name, listener):
return False
return True
lastheartbeat = None
def check_leader():
    """Affirmatively verify the leader is still alive.

    Sends an empty (keepalive) RPC to the leader, then polls for up to three
    seconds for a recent heartbeat. Returns True when a heartbeat newer than
    three seconds is observed; raises if the leader never responds.
    """
    _push_rpc(cfgleader, b'')
    deadline_checks = 30  # 30 polls at 0.1s each == 3 second window
    for _ in range(deadline_checks):
        eventlet.sleep(0.1)
        if lastheartbeat and lastheartbeat > (confluent.util.monotonic_time() - 3):
            return True
    raise Exception("Leader has disappeared")
class StreamHandler(object):
def __init__(self, sock):
@ -761,10 +783,13 @@ class StreamHandler(object):
res = _push_rpc(self.sock, b'') # nulls are a keepalive
if not res:
return None
#TODO: this test can work fine even if the other end is
# gone, go to a more affirmative test to more quickly
# detect outage to peer
self.keepalive = confluent.util.monotonic_time() + 20
self.expiry = confluent.util.monotonic_time() + 60
msg = self.sock.recv(8)
except Exception:
except Exception as e:
msg = None
return msg
@ -858,6 +883,7 @@ cfgleader = None
def follow_channel(channel):
global _txcount
global _hasquorum
global lastheartbeat
try:
stop_leading()
stop_following(channel)
@ -865,7 +891,9 @@ def follow_channel(channel):
msg = lh.get_next_msg()
while msg:
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
if sz == 0:
lastheartbeat = confluent.util.monotonic_time()
else:
rpc = b''
while len(rpc) < sz:
nrpc = channel.recv(sz - len(rpc))
@ -2540,6 +2568,15 @@ class ConfigManager(object):
data.
"""
with open(os.devnull, 'w+') as devnull:
worker = subprocess.Popen(
[sys.executable, __file__, '-r' if redact else ''],
stdin=devnull, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = worker.communicate()
return stdout
def _real_dump_to_json(self, redact=None):
dumpdata = {}
for confarea in _config_areas:
if confarea not in self._cfgstore:
@ -2871,9 +2908,9 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False):
with open(os.path.join(location, 'keys.json'), 'w') as cfgfile:
cfgfile.write(_dump_keys(password))
cfgfile.write('\n')
with open(os.path.join(location, 'main.json'), 'w') as cfgfile:
with open(os.path.join(location, 'main.json'), 'wb') as cfgfile:
cfgfile.write(ConfigManager(tenant=None)._dump_to_json(redact=redact))
cfgfile.write('\n')
cfgfile.write(b'\n')
if 'collective' in _cfgstore:
with open(os.path.join(location, 'collective.json'), 'w') as cfgfile:
cfgfile.write(json.dumps(_cfgstore['collective']))
@ -2914,6 +2951,11 @@ def init(stateless=False):
_cfgstore = {}
if __name__ == '__main__':
    # Worker-process entry point: dump the configuration as JSON to stdout.
    # Invoked via subprocess by _dump_to_json so the leader greenthread is not
    # tied up; a '-r' argument requests redaction of sensitive values.
    redact = True if '-r' in sys.argv else None
    sys.stdout.write(ConfigManager(None)._real_dump_to_json(redact))
# some unit tests worth implementing:
# set group attribute on lower priority group, result is that node should not
# change