2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-23 01:53:28 +00:00

Force cleanup if follow thread dies of exception

If something killed a follow thread, it was not always able to fire the
recovery off.  Wrap the risky code in a try statement.
This commit is contained in:
Jarrod Johnson 2018-08-20 15:02:34 -04:00
parent df7cba00fd
commit 784e4bed2f
2 changed files with 43 additions and 39 deletions

View File

@ -134,11 +134,13 @@ def connect_to_leader(cert=None, name=None, leader=None):
def follow_leader(remote):
global currentleader
cfm.follow_channel(remote)
# The leader has folded, time to startup again...
cfm.stop_following()
currentleader = None
eventlet.spawn_n(start_collective)
try:
cfm.follow_channel(remote)
finally:
# The leader has folded, time to startup again...
cfm.stop_following()
currentleader = None
eventlet.spawn_n(start_collective)
def connect_to_collective(cert, member):

View File

@ -658,41 +658,43 @@ cfgleader = None
def follow_channel(channel):
global _txcount
global _hasquorum
stop_leading()
stop_following(channel)
lh = StreamHandler(channel)
msg = lh.get_next_msg()
while msg:
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
rpc = ''
while len(rpc) < sz:
nrpc = channel.recv(sz - len(rpc))
if not nrpc:
raise Exception('Truncated message error')
rpc += nrpc
rpc = cPickle.loads(rpc)
if 'txcount' in rpc:
_txcount = rpc['txcount']
if 'function' in rpc:
try:
globals()[rpc['function']](*rpc['args'])
except Exception as e:
print(repr(e))
if 'xid' in rpc and rpc['xid']:
if rpc.get('exc', None):
_pendingchangesets[rpc['xid']].send_exception(rpc['exc'])
else:
_pendingchangesets[rpc['xid']].send()
if 'quorum' in rpc:
_hasquorum = rpc['quorum']
_push_rpc(channel, b'') # use null as ACK
try:
stop_leading()
stop_following(channel)
lh = StreamHandler(channel)
msg = lh.get_next_msg()
# mark the connection as broken
if cfgstreams:
stop_following(None)
else:
stop_following(True)
while msg:
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
rpc = ''
while len(rpc) < sz:
nrpc = channel.recv(sz - len(rpc))
if not nrpc:
raise Exception('Truncated message error')
rpc += nrpc
rpc = cPickle.loads(rpc)
if 'txcount' in rpc:
_txcount = rpc['txcount']
if 'function' in rpc:
try:
globals()[rpc['function']](*rpc['args'])
except Exception as e:
print(repr(e))
if 'xid' in rpc and rpc['xid']:
if rpc.get('exc', None):
_pendingchangesets[rpc['xid']].send_exception(rpc['exc'])
else:
_pendingchangesets[rpc['xid']].send()
if 'quorum' in rpc:
_hasquorum = rpc['quorum']
_push_rpc(channel, b'') # use null as ACK
msg = lh.get_next_msg()
finally:
# mark the connection as broken
if cfgstreams:
stop_following(None)
else:
stop_following(True)
def add_collective_member(name, address, fingerprint):