diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index efc5f630..ae52e287 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -89,6 +89,7 @@ statelessmode = False _cfgstore = None _pendingchangesets = {} _txcount = 0 +_hasquorum = True _attraliases = { 'bmc': 'hardwaremanagement.manager', @@ -240,6 +241,8 @@ def check_quorum(): if cfgstreams and len(cfgstreams) < (len(_cfgstore['collective']) // 2): # the leader counts in addition to registered streams raise exc.DegradedCollective() + if cfgleader and not _hasquorum: + raise exc.DegradedCollective() def exec_on_leader(function, *args): @@ -484,6 +487,8 @@ def set_global(globalname, value, sync=True): cfgstreams = {} def relay_slaved_requests(name, listener): global cfgleader + global _hasquorum + pushes = eventlet.GreenPool() if name not in _followerlocks: _followerlocks[name] = gthread.RLock() with _followerlocks[name]: @@ -494,8 +499,16 @@ def relay_slaved_requests(name, listener): cfgstreams[name].close() except Exception: pass + del cfgstreams[name] cfgstreams[name] = listener lh = StreamHandler(listener) + _hasquorum = len(cfgstreams) >= ( + len(_cfgstore['collective']) // 2) + payload = cPickle.dumps({'quorum': _hasquorum}) + for _ in pushes.starmap( + _push_rpc, + [(cfgstreams[s], payload) for s in cfgstreams]): + pass msg = lh.get_next_msg() while msg: if name not in cfgstreams: @@ -525,6 +538,14 @@ def relay_slaved_requests(name, listener): del cfgstreams[name] except KeyError: pass # May have already been closed/deleted... + if cfgstreams: + _hasquorum = len(cfgstreams) >= ( + len(_cfgstore['collective']) // 2) + payload = cPickle.dumps({'quorum': _hasquorum}) + for _ in pushes.starmap( + _push_rpc, + [(cfgstreams[s], payload) for s in cfgstreams]): + pass if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective stop_following(True) @@ -627,6 +648,7 @@ cfgleader = None def follow_channel(channel): global _txcount + global _hasquorum stop_leading() stop_following(channel) lh = StreamHandler(channel) @@ -647,6 +669,8 @@ def follow_channel(channel): globals()[rpc['function']](*rpc['args']) if 'xid' in rpc and rpc['xid']: _pendingchangesets[rpc['xid']].send() + if 'quorum' in rpc: + _hasquorum = rpc['quorum'] _push_rpc(channel, b'') # use null as ACK msg = lh.get_next_msg() # mark the connection as broken