2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-13 11:17:49 +00:00

Handle updating address of collective member on connect

If a collective member changes its IP address, update at the next
possible opportunity.
This commit is contained in:
Jarrod Johnson 2018-07-19 15:24:08 -04:00
parent b053d41cd8
commit 7d16c943a8
2 changed files with 26 additions and 0 deletions

View File

@ -340,6 +340,8 @@ def handle_connection(connection, cert, request, local=False):
connection.getpeername()[0])
connection.close()
return
cfm.update_collective_address(request['name'],
connection.getpeername()[0])
tlvdata.send(connection, cfm._dump_keys(None, False))
tlvdata.send(connection, cfm._cfgstore['collective'])
tlvdata.send(connection, cfm.get_globals())

View File

@ -509,6 +509,8 @@ def relay_slaved_requests(name, listener):
_push_rpc,
[(cfgstreams[s], payload) for s in cfgstreams]):
pass
if _hasquorum and _pending_collective_updates:
apply_pending_collective_updates()
msg = lh.get_next_msg()
while msg:
if name not in cfgstreams:
@ -687,6 +689,28 @@ def add_collective_member(name, address, fingerprint):
exec_on_followers('_true_add_collective_member', name, address, fingerprint)
_true_add_collective_member(name, address, fingerprint)
_pending_collective_updates = {}
def update_collective_address(name ,address):
fprint = _cfgstore['collective'][name]['fingerprint']
oldaddress = _cfgstore['collective'][name]['address']
if oldaddress == address:
return
try:
check_quorum()
add_collective_member(name, address, fprint)
except exc.DegradedCollective:
_pending_collective_updates[name] = address
def apply_pending_collective_updates():
for name in list(_pending_collective_updates):
fprint = _cfgstore['collective'][name]['fingerprint']
address = _pending_collective_updates[name]
add_collective_member(name, address, fprint)
del _pending_collective_updates[name]
def _true_add_collective_member(name, address, fingerprint, sync=True):
try: