diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index ab12c9be..31f3cff8 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -110,9 +110,12 @@ class AsyncSession(object): del _asyncsessions[self.asyncid] def run_handler(self, handler, requestid): - for rsp in handler: - self.add(requestid, rsp) - self.add(requestid, messages.AsyncCompletion()) + try: + for rsp in handler: + self.add(requestid, rsp) + self.add(requestid, messages.AsyncCompletion()) + except Exception as e: + self.add(requestid, e) def get_responses(self, timeout=25): self.reaper.cancel() diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index 1c778f54..4f43abe1 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -880,6 +880,11 @@ class AsyncMessage(ConfluentMessage): if (isinstance(rsp, ConfluentMessage) or isinstance(rsp, ConfluentNodeError)): rspdict = rsp.raw() + elif isinstance(rsp, exc.ConfluentException): + rspdict = {'exceptioncode': rsp.apierrorcode, + 'exception': rsp.get_error_body()} + elif isinstance(rsp, Exception): + rspdict = {'exceptioncode': 500, 'exception': str(rsp)} elif isinstance(rsp, dict): # console metadata rspdict = rsp else: # terminal text