2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-28 11:57:37 +00:00

Fix collective on rpc exception

Exceptions on collective calls were not correctly handled, fix
the handling so that collective continues and also the calling function
is correctly given the exception.
This commit is contained in:
Jarrod Johnson 2018-07-30 09:33:24 -04:00
parent 6a8e24dd0e
commit 36a202842a

View File

@ -524,9 +524,14 @@ def relay_slaved_requests(name, listener):
raise Exception('Truncated client error')
rpc += nrpc
rpc = cPickle.loads(rpc)
globals()[rpc['function']](*rpc['args'])
exc = None
try:
globals()[rpc['function']](*rpc['args'])
except Exception as e:
exc = e
if 'xid' in rpc:
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid'],
'exc': exc}))
try:
msg = lh.get_next_msg()
except Exception:
@ -670,9 +675,15 @@ def follow_channel(channel):
if 'txcount' in rpc:
_txcount = rpc['txcount']
if 'function' in rpc:
globals()[rpc['function']](*rpc['args'])
try:
globals()[rpc['function']](*rpc['args'])
except Exception as e:
print(repr(e))
if 'xid' in rpc and rpc['xid']:
_pendingchangesets[rpc['xid']].send()
if rpc['exc']:
_pendingchangesets[rpc['xid']].send_exception(rpc['exc'])
else:
_pendingchangesets[rpc['xid']].send()
if 'quorum' in rpc:
_hasquorum = rpc['quorum']
_push_rpc(channel, b'') # use null as ACK