2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-25 19:10:10 +00:00

Ensure no more than one in-flight slave connection from a given follower

This will prevent a connection from deregistering itself after the
replacement registers itself.
This commit is contained in:
Jarrod Johnson 2018-07-17 10:36:31 -04:00
parent a7b8f0ab0c
commit 0d936e0059

View File

@ -80,6 +80,7 @@ _masterintegritykey = None
_dirtylock = threading.RLock()
_leaderlock = gthread.RLock()
_synclock = threading.RLock()
_followerlocks = {}
_config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
tracelog = None
statelessmode = False
@ -463,48 +464,51 @@ def set_global(globalname, value, sync=True):
cfgstreams = {}
def relay_slaved_requests(name, listener):
global cfgleader
try:
stop_following()
if name in cfgstreams:
try:
cfgstreams[name].close()
except Exception:
pass
cfgstreams[name] = listener
if name not in _followerlocks:
_followerlocks[name] = gthread.RLock()
with _followerlocks[name]:
try:
msg = listener.recv(8)
except Exception:
msg = None
while msg:
if name not in cfgstreams:
raise Exception("Unexpected loss of node in followers: " + name)
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
rpc = ''
while len(rpc) < sz:
nrpc = listener.recv(sz - len(rpc))
if not nrpc:
raise Exception('Truncated client error')
rpc += nrpc
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
if 'xid' in rpc:
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
stop_following()
if name in cfgstreams:
try:
cfgstreams[name].close()
except Exception:
pass
cfgstreams[name] = listener
try:
msg = listener.recv(8)
except Exception:
msg = None
finally:
try:
listener.close()
except Exception:
pass
try:
del cfgstreams[name]
except KeyError:
pass # May have already been closed/deleted...
if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective
stop_following(True)
while msg:
if name not in cfgstreams:
raise Exception("Unexpected loss of node in followers: " + name)
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
rpc = ''
while len(rpc) < sz:
nrpc = listener.recv(sz - len(rpc))
if not nrpc:
raise Exception('Truncated client error')
rpc += nrpc
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
if 'xid' in rpc:
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
try:
msg = listener.recv(8)
except Exception:
msg = None
finally:
try:
listener.close()
except Exception:
pass
try:
del cfgstreams[name]
except KeyError:
pass # May have already been closed/deleted...
if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective
stop_following(True)
def stop_following(replacement=None):
@ -603,7 +607,10 @@ def follow_channel(channel):
except Exception:
msg = None
# mark the connection as broken
stop_following(True)
if cfgstreams:
stop_following(None)
else:
stop_following(True)
def add_collective_member(name, address, fingerprint):