diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index d27c5e9b..7bcf4e49 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -211,6 +211,15 @@ def _rpc_set_node_attributes(tenant, attribmap, autocreate): def _rpc_set_group_attributes(tenant, attribmap, autocreate): ConfigManager(tenant)._true_set_group_attributes(attribmap, autocreate) + +def check_quorum(): + if isinstance(cfgleader, bool): + raise exc.DegradedCollective() + if cfgstreams and len(cfgstreams) < (len(_cfgstore['collective']) // 2): + # the leader counts in addition to registered streams + raise exc.DegradedCollective() + + def exec_on_leader(function, *args): if isinstance(cfgleader, bool): raise exc.DegradedCollective() @@ -231,9 +240,9 @@ def exec_on_leader(function, *args): def exec_on_followers(fnname, *args): global _txcount _txcount += 1 - if len(cfgstreams) < (len(_cfgstore['collective']) // 2) : + if len(cfgstreams) < (len(_cfgstore['collective']) // 2): # the leader counts in addition to registered streams - raise Exception("collective does not have quorum") + raise exc.DegradedCollective() pushes = eventlet.GreenPool() payload = cPickle.dumps({'function': fnname, 'args': args, 'txcount': _txcount}) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 26a7f9da..d70cfc7f 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -744,6 +744,7 @@ def handle_node_request(configmanager, inputdata, operation, elif 'default' in plugroute: plugpath = plugroute['default'] if plugpath in dispatch_plugins: + cfm.check_quorum() manager = nodeattr[node].get('collective.manager', {}).get( 'value', None) if manager: