2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-04-13 16:57:59 +00:00

Add quorum check to non-config requests

This commit is contained in:
Jarrod Johnson 2018-06-26 13:52:20 -04:00
parent 7433dd3e38
commit 11e6145a46
2 changed files with 12 additions and 2 deletions

View File

@ -211,6 +211,15 @@ def _rpc_set_node_attributes(tenant, attribmap, autocreate):
def _rpc_set_group_attributes(tenant, attribmap, autocreate):
ConfigManager(tenant)._true_set_group_attributes(attribmap, autocreate)
def check_quorum():
if isinstance(cfgleader, bool):
raise exc.DegradedCollective()
if cfgstreams and len(cfgstreams) < (len(_cfgstore['collective']) // 2):
# the leader counts in addition to registered streams
raise exc.DegradedCollective()
def exec_on_leader(function, *args):
if isinstance(cfgleader, bool):
raise exc.DegradedCollective()
@ -231,9 +240,9 @@ def exec_on_leader(function, *args):
def exec_on_followers(fnname, *args):
global _txcount
_txcount += 1
if len(cfgstreams) < (len(_cfgstore['collective']) // 2) :
if len(cfgstreams) < (len(_cfgstore['collective']) // 2):
# the leader counts in addition to registered streams
raise Exception("collective does not have quorum")
raise exc.DegradedCollective()
pushes = eventlet.GreenPool()
payload = cPickle.dumps({'function': fnname, 'args': args,
'txcount': _txcount})

View File

@ -744,6 +744,7 @@ def handle_node_request(configmanager, inputdata, operation,
elif 'default' in plugroute:
plugpath = plugroute['default']
if plugpath in dispatch_plugins:
cfm.check_quorum()
manager = nodeattr[node].get('collective.manager', {}).get(
'value', None)
if manager: