mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-22 17:43:14 +00:00
More explicit follower health checking
Be more careful about collective member health assessment: explicitly close a follower when it has a failure, and preface collective commands with a health check prior to performing the actual changes.
This commit is contained in:
parent
0ed7e4eac3
commit
30398fc434
@ -328,6 +328,11 @@ def exec_on_leader(function, *args):
|
||||
|
||||
|
||||
def exec_on_followers(fnname, *args):
|
||||
pushes = eventlet.GreenPool()
|
||||
# Check health of collective prior to attempting
|
||||
for _ in pushes.starmap(
|
||||
_push_rpc, [(cfgstreams[s], b'') for s in cfgstreams]):
|
||||
pass
|
||||
if len(cfgstreams) < (len(_cfgstore['collective']) // 2):
|
||||
# the leader counts in addition to registered streams
|
||||
raise exc.DegradedCollective()
|
||||
@ -386,9 +391,15 @@ def init_masterkey(password=None, autogen=True):
|
||||
|
||||
def _push_rpc(stream, payload):
|
||||
with _rpclock:
|
||||
stream.sendall(struct.pack('!Q', len(payload)))
|
||||
if len(payload):
|
||||
stream.sendall(payload)
|
||||
try:
|
||||
stream.sendall(struct.pack('!Q', len(payload)))
|
||||
if len(payload):
|
||||
stream.sendall(payload)
|
||||
return True
|
||||
except Exception:
|
||||
logException()
|
||||
del cfgstreams[stream]
|
||||
stream.close()
|
||||
|
||||
|
||||
def decrypt_value(cryptvalue,
|
||||
@ -578,11 +589,18 @@ def relay_slaved_requests(name, listener, vers):
|
||||
lh = StreamHandler(listener)
|
||||
_hasquorum = len(cfgstreams) >= (
|
||||
len(_cfgstore['collective']) // 2)
|
||||
payload = cPickle.dumps({'quorum': _hasquorum}, protocol=lowestver)
|
||||
for _ in pushes.starmap(
|
||||
_push_rpc,
|
||||
[(cfgstreams[s], payload) for s in cfgstreams]):
|
||||
pass
|
||||
_newquorum = None
|
||||
while _hasquorum != _newquorum:
|
||||
if _newquorum is not None:
|
||||
_hasquorum = _newquorum
|
||||
payload = cPickle.dumps({'quorum': _hasquorum}, protocol=lowestver)
|
||||
for _ in pushes.starmap(
|
||||
_push_rpc,
|
||||
[(cfgstreams[s], payload) for s in cfgstreams]):
|
||||
pass
|
||||
_newquorum = len(cfgstreams) >= (
|
||||
len(_cfgstore['collective']) // 2)
|
||||
_hasquorum = _newquorum
|
||||
if _hasquorum and _pending_collective_updates:
|
||||
apply_pending_collective_updates()
|
||||
msg = lh.get_next_msg()
|
||||
@ -604,8 +622,10 @@ def relay_slaved_requests(name, listener, vers):
|
||||
except Exception as e:
|
||||
exc = e
|
||||
if 'xid' in rpc:
|
||||
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid'],
|
||||
res = _push_rpc(listener, cPickle.dumps({'xid': rpc['xid'],
|
||||
'exc': exc}, protocol=vers))
|
||||
if not res:
|
||||
break
|
||||
try:
|
||||
msg = lh.get_next_msg()
|
||||
except Exception:
|
||||
@ -650,7 +670,9 @@ class StreamHandler(object):
|
||||
if confluent.util.monotonic_time() > self.expiry:
|
||||
return None
|
||||
if confluent.util.monotonic_time() > self.keepalive:
|
||||
_push_rpc(self.sock, b'') # nulls are a keepalive
|
||||
res = _push_rpc(self.sock, b'') # nulls are a keepalive
|
||||
if not res:
|
||||
return None
|
||||
self.keepalive = confluent.util.monotonic_time() + 20
|
||||
self.expiry = confluent.util.monotonic_time() + 60
|
||||
msg = self.sock.recv(8)
|
||||
@ -767,7 +789,9 @@ def follow_channel(channel, proto=2):
|
||||
_pendingchangesets[rpc['xid']].send()
|
||||
if 'quorum' in rpc:
|
||||
_hasquorum = rpc['quorum']
|
||||
_push_rpc(channel, b'') # use null as ACK
|
||||
res = _push_rpc(channel, b'') # use null as ACK
|
||||
if not res:
|
||||
break
|
||||
msg = lh.get_next_msg()
|
||||
finally:
|
||||
# mark the connection as broken
|
||||
|
Loading…
Reference in New Issue
Block a user