From 30398fc434e3b6f169652c2c141c56c4a28971e4 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Mon, 11 Nov 2019 10:49:25 -0500 Subject: [PATCH] More explicit follower health checking Be more careful about collective member health assessment, explicitly closing a follower when it fails and prefacing collective commands with a health check prior to performing the actual changes. --- .../confluent/config/configmanager.py | 46 ++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index d258f981..77fd730e 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -328,6 +328,11 @@ def exec_on_leader(function, *args): def exec_on_followers(fnname, *args): + pushes = eventlet.GreenPool() + # Check health of collective prior to attempting + for _ in pushes.starmap( + _push_rpc, [(cfgstreams[s], b'') for s in cfgstreams]): + pass if len(cfgstreams) < (len(_cfgstore['collective']) // 2): # the leader counts in addition to registered streams raise exc.DegradedCollective() @@ -386,9 +391,15 @@ def init_masterkey(password=None, autogen=True): def _push_rpc(stream, payload): with _rpclock: - stream.sendall(struct.pack('!Q', len(payload))) - if len(payload): - stream.sendall(payload) + try: + stream.sendall(struct.pack('!Q', len(payload))) + if len(payload): + stream.sendall(payload) + return True + except Exception: + logException() + del cfgstreams[stream] + stream.close() def decrypt_value(cryptvalue, @@ -578,11 +589,18 @@ def relay_slaved_requests(name, listener, vers): lh = StreamHandler(listener) _hasquorum = len(cfgstreams) >= ( len(_cfgstore['collective']) // 2) - payload = cPickle.dumps({'quorum': _hasquorum}, protocol=lowestver) - for _ in pushes.starmap( - _push_rpc, - [(cfgstreams[s], payload) for s in cfgstreams]): - pass + _newquorum = None + while _hasquorum != 
_newquorum: + if _newquorum is not None: + _hasquorum = _newquorum + payload = cPickle.dumps({'quorum': _hasquorum}, protocol=lowestver) + for _ in pushes.starmap( + _push_rpc, + [(cfgstreams[s], payload) for s in cfgstreams]): + pass + _newquorum = len(cfgstreams) >= ( + len(_cfgstore['collective']) // 2) + _hasquorum = _newquorum if _hasquorum and _pending_collective_updates: apply_pending_collective_updates() msg = lh.get_next_msg() @@ -604,8 +622,10 @@ def relay_slaved_requests(name, listener, vers): except Exception as e: exc = e if 'xid' in rpc: - _push_rpc(listener, cPickle.dumps({'xid': rpc['xid'], + res = _push_rpc(listener, cPickle.dumps({'xid': rpc['xid'], 'exc': exc}, protocol=vers)) + if not res: + break try: msg = lh.get_next_msg() except Exception: @@ -650,7 +670,9 @@ class StreamHandler(object): if confluent.util.monotonic_time() > self.expiry: return None if confluent.util.monotonic_time() > self.keepalive: - _push_rpc(self.sock, b'') # nulls are a keepalive + res = _push_rpc(self.sock, b'') # nulls are a keepalive + if not res: + return None self.keepalive = confluent.util.monotonic_time() + 20 self.expiry = confluent.util.monotonic_time() + 60 msg = self.sock.recv(8) @@ -767,7 +789,9 @@ def follow_channel(channel, proto=2): _pendingchangesets[rpc['xid']].send() if 'quorum' in rpc: _hasquorum = rpc['quorum'] - _push_rpc(channel, b'') # use null as ACK + res = _push_rpc(channel, b'') # use null as ACK + if not res: + break msg = lh.get_next_msg() finally: # mark the connection as broken