2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-08-24 12:10:26 +00:00

Cleanup leftover sockets and more reliably be following or leading

Before there was a chance to be in a half state, leading to an inability
to reach consensus on leader.
This commit is contained in:
Jarrod Johnson
2018-07-13 21:20:42 -04:00
parent 80a1bd72e7
commit fd7c428d1f
2 changed files with 47 additions and 29 deletions

View File

@@ -100,9 +100,12 @@ def connect_to_leader(cert=None, name=None, leader=None):
while (len(dbjson) < dbsize):
ndata = remote.recv(dbsize - len(dbjson))
if not ndata:
try:
remote.close()
except Exception:
pass
raise Exception("Error doing initial DB transfer")
dbjson += ndata
cfm.stop_following()
cfm.clear_configuration()
try:
cfm._restore_keys(keydata, None, sync=False)
@@ -117,6 +120,7 @@ def connect_to_leader(cert=None, name=None, leader=None):
sync=False)
cfm.commit_clear()
except Exception:
cfm.stop_following()
cfm.rollback_clear()
raise
currentleader = leader

View File

@@ -463,34 +463,42 @@ def set_global(globalname, value, sync=True):
cfgstreams = {}
def relay_slaved_requests(name, listener):
global cfgleader
stop_following()
if name in cfgstreams:
cfgstreams[name].close()
cfgstreams[name] = listener
msg = listener.recv(8)
while msg:
if name not in cfgstreams:
raise Exception("Unexpected loss of node in followers: " + name)
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
rpc = ''
while len(rpc) < sz:
nrpc = listener.recv(sz - len(rpc))
if not nrpc:
raise Exception('Truncated client error')
rpc += nrpc
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
if 'xid' in rpc:
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
msg = listener.recv(8)
listener.close()
try:
del cfgstreams[name]
except KeyError:
pass # May have already been closed/deleted...
if not cfgstreams and not cfgleader:
stop_following(True)
stop_following()
if name in cfgstreams:
try:
cfgstreams[name].close()
except Exception:
pass
cfgstreams[name] = listener
msg = listener.recv(8)
while msg:
if name not in cfgstreams:
raise Exception("Unexpected loss of node in followers: " + name)
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
rpc = ''
while len(rpc) < sz:
nrpc = listener.recv(sz - len(rpc))
if not nrpc:
raise Exception('Truncated client error')
rpc += nrpc
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
if 'xid' in rpc:
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
msg = listener.recv(8)
finally:
try:
listener.close()
except Exception:
pass
try:
del cfgstreams[name]
except KeyError:
pass # May have already been closed/deleted...
if not cfgstreams and not cfgleader: # last one out, set cfgleader to boolean to mark dead collective
stop_following(True)
def stop_following(replacement=None):
@@ -505,7 +513,10 @@ def stop_following(replacement=None):
def stop_leading():
for stream in list(cfgstreams):
cfgstreams[stream].close()
try:
cfgstreams[stream].close()
except Exception:
pass
try:
del cfgstreams[stream]
except KeyError:
@@ -533,6 +544,8 @@ def clear_configuration():
global _txcount
global _oldcfgstore
global _oldtxcount
stop_leading()
stop_following()
_oldcfgstore = _cfgstore
_oldtxcount = _txcount
_cfgstore = {}
@@ -557,6 +570,7 @@ cfgleader = None
def follow_channel(channel):
global _txcount
stop_leading()
stop_following(channel)
msg = channel.recv(8)
while msg: