2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-10-25 08:25:36 +00:00

Push quorum state to followers

The followers need to know quorum state.
This commit is contained in:
Jarrod Johnson
2018-07-19 13:27:21 -04:00
parent 10c82a72b5
commit c3c0e1570a

View File

@@ -89,6 +89,7 @@ statelessmode = False
_cfgstore = None
_pendingchangesets = {}
_txcount = 0
_hasquorum = True
_attraliases = {
'bmc': 'hardwaremanagement.manager',
@@ -240,6 +241,8 @@ def check_quorum():
if cfgstreams and len(cfgstreams) < (len(_cfgstore['collective']) // 2):
# the leader counts in addition to registered streams
raise exc.DegradedCollective()
if cfgleader and not _hasquorum:
raise exc.DegradedCollective()
def exec_on_leader(function, *args):
@@ -484,6 +487,8 @@ def set_global(globalname, value, sync=True):
cfgstreams = {}
def relay_slaved_requests(name, listener):
global cfgleader
global _hasquorum
pushes = eventlet.GreenPool()
if name not in _followerlocks:
_followerlocks[name] = gthread.RLock()
with _followerlocks[name]:
@@ -494,8 +499,16 @@ def relay_slaved_requests(name, listener):
cfgstreams[name].close()
except Exception:
pass
del cfgstreams[name]
cfgstreams[name] = listener
lh = StreamHandler(listener)
_hasquorum = len(cfgstreams) >= (
len(_cfgstore['collective']) // 2)
payload = cPickle.dumps({'quorum': _hasquorum})
for _ in pushes.starmap(
_push_rpc,
[(cfgstreams[s], payload) for s in cfgstreams]):
pass
msg = lh.get_next_msg()
while msg:
if name not in cfgstreams:
@@ -525,6 +538,14 @@ def relay_slaved_requests(name, listener):
del cfgstreams[name]
except KeyError:
pass # May have already been closed/deleted...
if cfgstreams:
_hasquorum = len(cfgstreams) >= (
len(_cfgstore['collective']) // 2)
payload = cPickle.dumps({'quorum': _hasquorum})
for _ in pushes.starmap(
_push_rpc,
[(cfgstreams[s], payload) for s in cfgstreams]):
pass
if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective
stop_following(True)
@@ -627,6 +648,7 @@ cfgleader = None
def follow_channel(channel):
global _txcount
global _hasquorum
stop_leading()
stop_following(channel)
lh = StreamHandler(channel)
@@ -647,6 +669,8 @@ def follow_channel(channel):
globals()[rpc['function']](*rpc['args'])
if 'xid' in rpc and rpc['xid']:
_pendingchangesets[rpc['xid']].send()
if 'quorum' in rpc:
_hasquorum = rpc['quorum']
_push_rpc(channel, b'') # use null as ACK
msg = lh.get_next_msg()
# mark the connection as broken