diff --git a/confluent_server/confluent/asynchttp.py b/confluent_server/confluent/asynchttp.py index 65d70818..64dca6a9 100644 --- a/confluent_server/confluent/asynchttp.py +++ b/confluent_server/confluent/asynchttp.py @@ -66,7 +66,7 @@ class AsyncSession(object): self.reaper = eventlet.spawn_after(15, self.destroy) def add(self, rsp, requestid): - self.responses.append(rsp, requestid) + self.responses.append((rsp, requestid)) if self._evt: self._evt.send() self._evt = None @@ -79,19 +79,19 @@ class AsyncSession(object): def run_handler(self, handler, requestid): for rsp in handler: - self.add(rsp, requestid) - self.add({'_requestdone': True}, requestid) + self.add(requestid, rsp) + self.add(requestid, messages.AsyncCompletion()) def get_responses(self, timeout=25): self.reaper.cancel() self.reaper = eventlet.spawn_after(timeout + 15, self.destroy) - if self._evt(): + if self._evt: # TODO(jjohnson2): This precludes the goal of 'double barreled' # access.... revisit if this could matter raise Exception('get_responses is not re-entrant') if not self.responses: # wait to accumulate some self._evt = eventlet.event.Event() - with eventlet.Timout(timeout, False): + with eventlet.Timeout(timeout, False): self._evt.wait() self._evt = None while self.responses: @@ -101,7 +101,7 @@ class AsyncSession(object): def run_handler(hdlr, env): asyncsessid = env['HTTP_CONFLUENTASYNCID'] try: - asyncsession = _asyncsessions[asyncsessid] + asyncsession = _asyncsessions[asyncsessid]['asyncsession'] requestid = env['HTTP_CONFLUENTREQUESTID'] except KeyError: raise exc.InvalidArgumentException( @@ -122,7 +122,7 @@ def handle_async(env, querydict): return currsess = _asyncsessions[querydict['asyncid']]['asyncsession'] for rsp in currsess.get_responses(): - yield rsp + yield messages.AsyncMessage(rsp) diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 4769bad8..755c42e9 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -487,7 +487,7 @@ def resourcehandler_backend(env, start_response): hdlr = pluginapi.handle_path(url, operation, cfgmgr, querydict) if 'HTTP_CONFLUENTASYNCID' in env: - asynchttp.run_handler(hdlr, env) + confluent.asynchttp.run_handler(hdlr, env) start_response('202 Accepted', headers) yield 'Request queued' return diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index 78845ee4..8233ae89 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -835,6 +835,24 @@ class EventCollection(ConfluentMessage): self.kvpairs = {name: {'events': eventdata}} +class AsyncCompletion(ConfluentMessage): + def __init__(self): + self.stripped = True + self.notnode = True + + def raw(self): + return {'_requestdone': True} + +class AsyncMessage(ConfluentMessage): + def __init__(self, pair): + self.stripped = True + self.notnode = True + self.msgpair = pair + + def raw(self): + return {'requestid': self.msgpair[0], + 'response': self.msgpair[1].raw()} + class AsyncSession(ConfluentMessage): def __init__(self, id): self.desc = 'foo'