diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 6544aef4..610867e0 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -524,9 +524,14 @@ def relay_slaved_requests(name, listener): raise Exception('Truncated client error') rpc += nrpc rpc = cPickle.loads(rpc) - globals()[rpc['function']](*rpc['args']) + exc = None + try: + globals()[rpc['function']](*rpc['args']) + except Exception as e: + exc = e if 'xid' in rpc: - _push_rpc(listener, cPickle.dumps({'xid': rpc['xid']})) + _push_rpc(listener, cPickle.dumps({'xid': rpc['xid'], + 'exc': exc})) try: msg = lh.get_next_msg() except Exception: @@ -670,9 +675,15 @@ def follow_channel(channel): if 'txcount' in rpc: _txcount = rpc['txcount'] if 'function' in rpc: - globals()[rpc['function']](*rpc['args']) + try: + globals()[rpc['function']](*rpc['args']) + except Exception as e: + print(repr(e)) if 'xid' in rpc and rpc['xid']: - _pendingchangesets[rpc['xid']].send() + if rpc['exc']: + _pendingchangesets[rpc['xid']].send_exception(rpc['exc']) + else: + _pendingchangesets[rpc['xid']].send() if 'quorum' in rpc: _hasquorum = rpc['quorum'] _push_rpc(channel, b'') # use null as ACK