diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index eebfc332..81966754 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -251,6 +251,8 @@ def start_collective(): for member in cfm.list_collective(): if member == myname: continue + if cfm.cfgleader is None: + cfm.cfgleader = True ldrcandidate = cfm.get_collective_member(member)['address'] connect_to_leader(name=myname, leader=ldrcandidate) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 91f5074c..e6374bcd 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -212,6 +212,8 @@ def _rpc_set_group_attributes(tenant, attribmap, autocreate): ConfigManager(tenant)._true_set_group_attributes(attribmap, autocreate) def exec_on_leader(function, *args): + if isinstance(cfgleader, bool): + raise exc.DegradedCollective() xid = os.urandom(8) while xid in _pendingchangesets: xid = os.urandom(8) diff --git a/confluent_server/confluent/exceptions.py b/confluent_server/confluent/exceptions.py index 6364db7d..a2c67b68 100644 --- a/confluent_server/confluent/exceptions.py +++ b/confluent_server/confluent/exceptions.py @@ -68,6 +68,11 @@ class LockedCredentials(ConfluentException): _apierrorstr = 'Credential store locked' +class DegradedCollective(ConfluentException): + # We are in a collective with at least half of the member nodes missing + _apierrorstr = 'Collective does not have quorum' + + class ForbiddenRequest(ConfluentException): # The client request is not allowed by authorization engine apierrorcode = 403