From 0d936e0059db0ab6c5bd4cb6dd6c31aa9fa497d7 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 17 Jul 2018 10:36:31 -0400 Subject: [PATCH] Ensure no more than one in-flight slave connection from a given follower This will prevent a connection from deregistering itself after the replacement registers itself. --- .../confluent/config/configmanager.py | 83 ++++++++++--------- 1 file changed, 45 insertions(+), 38 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index c6946686..36491b8a 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -80,6 +80,7 @@ _masterintegritykey = None _dirtylock = threading.RLock() _leaderlock = gthread.RLock() _synclock = threading.RLock() +_followerlocks = {} _config_areas = ('nodegroups', 'nodes', 'usergroups', 'users') tracelog = None statelessmode = False @@ -463,48 +464,51 @@ def set_global(globalname, value, sync=True): cfgstreams = {} def relay_slaved_requests(name, listener): global cfgleader - try: - stop_following() - if name in cfgstreams: - try: - cfgstreams[name].close() - except Exception: - pass - cfgstreams[name] = listener + if name not in _followerlocks: + _followerlocks[name] = gthread.RLock() + with _followerlocks[name]: try: - msg = listener.recv(8) - except Exception: - msg = None - while msg: - if name not in cfgstreams: - raise Exception("Unexpected loss of node in followers: " + name) - sz = struct.unpack('!Q', msg)[0] - if sz != 0: - rpc = '' - while len(rpc) < sz: - nrpc = listener.recv(sz - len(rpc)) - if not nrpc: - raise Exception('Truncated client error') - rpc += nrpc - rpc = cPickle.loads(rpc) - globals()[rpc['function']](*rpc['args']) - if 'xid' in rpc: - _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) + stop_following() + if name in cfgstreams: + try: + cfgstreams[name].close() + except Exception: + pass + cfgstreams[name] = listener try: msg = listener.recv(8) except Exception: msg = None - finally: - try: - listener.close() - except Exception: - pass - try: - del cfgstreams[name] - except KeyError: - pass # May have already been closed/deleted... - if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective - stop_following(True) + while msg: + if name not in cfgstreams: + raise Exception("Unexpected loss of node in followers: " + name) + sz = struct.unpack('!Q', msg)[0] + if sz != 0: + rpc = '' + while len(rpc) < sz: + nrpc = listener.recv(sz - len(rpc)) + if not nrpc: + raise Exception('Truncated client error') + rpc += nrpc + rpc = cPickle.loads(rpc) + globals()[rpc['function']](*rpc['args']) + if 'xid' in rpc: + _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) + try: + msg = listener.recv(8) + except Exception: + msg = None + finally: + try: + listener.close() + except Exception: + pass + try: + del cfgstreams[name] + except KeyError: + pass # May have already been closed/deleted... + if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective + stop_following(True) def stop_following(replacement=None): @@ -603,7 +607,10 @@ def follow_channel(channel): except Exception: msg = None # mark the connection as broken - stop_following(True) + if cfgstreams: + stop_following(None) + else: + stop_following(True) def add_collective_member(name, address, fingerprint):