diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index c45cb9b5..868b1aed 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -340,6 +340,8 @@ def handle_connection(connection, cert, request, local=False): connection.getpeername()[0]) connection.close() return + cfm.update_collective_address(request['name'], + connection.getpeername()[0]) tlvdata.send(connection, cfm._dump_keys(None, False)) tlvdata.send(connection, cfm._cfgstore['collective']) tlvdata.send(connection, cfm.get_globals()) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index ae52e287..5193d775 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -509,6 +509,8 @@ def relay_slaved_requests(name, listener): _push_rpc, [(cfgstreams[s], payload) for s in cfgstreams]): pass + if _hasquorum and _pending_collective_updates: + apply_pending_collective_updates() msg = lh.get_next_msg() while msg: if name not in cfgstreams: @@ -687,6 +689,28 @@ def add_collective_member(name, address, fingerprint): exec_on_followers('_true_add_collective_member', name, address, fingerprint) _true_add_collective_member(name, address, fingerprint) +_pending_collective_updates = {} + + +def update_collective_address(name ,address): + fprint = _cfgstore['collective'][name]['fingerprint'] + oldaddress = _cfgstore['collective'][name]['address'] + if oldaddress == address: + return + try: + check_quorum() + add_collective_member(name, address, fprint) + except exc.DegradedCollective: + _pending_collective_updates[name] = address + + +def apply_pending_collective_updates(): + for name in list(_pending_collective_updates): + fprint = _cfgstore['collective'][name]['fingerprint'] + address = _pending_collective_updates[name] + add_collective_member(name, address, fprint) + del _pending_collective_updates[name] + def _true_add_collective_member(name, address, fingerprint, sync=True): try: