mirror of
https://github.com/xcat2/confluent.git
synced 2025-04-15 17:49:34 +00:00
Fix up the async http to actually function
Still need to review the return data to determine best format
This commit is contained in:
parent
7d67ea0685
commit
8fac1ce5da
@ -66,7 +66,7 @@ class AsyncSession(object):
|
||||
self.reaper = eventlet.spawn_after(15, self.destroy)
|
||||
|
||||
def add(self, rsp, requestid):
|
||||
self.responses.append(rsp, requestid)
|
||||
self.responses.append((rsp, requestid))
|
||||
if self._evt:
|
||||
self._evt.send()
|
||||
self._evt = None
|
||||
@ -79,19 +79,19 @@ class AsyncSession(object):
|
||||
|
||||
def run_handler(self, handler, requestid):
|
||||
for rsp in handler:
|
||||
self.add(rsp, requestid)
|
||||
self.add({'_requestdone': True}, requestid)
|
||||
self.add(requestid, rsp)
|
||||
self.add(requestid, messages.AsyncCompletion())
|
||||
|
||||
def get_responses(self, timeout=25):
|
||||
self.reaper.cancel()
|
||||
self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
|
||||
if self._evt():
|
||||
if self._evt:
|
||||
# TODO(jjohnson2): This precludes the goal of 'double barreled'
|
||||
# access.... revisit if this could matter
|
||||
raise Exception('get_responses is not re-entrant')
|
||||
if not self.responses: # wait to accumulate some
|
||||
self._evt = eventlet.event.Event()
|
||||
with eventlet.Timout(timeout, False):
|
||||
with eventlet.Timeout(timeout, False):
|
||||
self._evt.wait()
|
||||
self._evt = None
|
||||
while self.responses:
|
||||
@ -101,7 +101,7 @@ class AsyncSession(object):
|
||||
def run_handler(hdlr, env):
|
||||
asyncsessid = env['HTTP_CONFLUENTASYNCID']
|
||||
try:
|
||||
asyncsession = _asyncsessions[asyncsessid]
|
||||
asyncsession = _asyncsessions[asyncsessid]['asyncsession']
|
||||
requestid = env['HTTP_CONFLUENTREQUESTID']
|
||||
except KeyError:
|
||||
raise exc.InvalidArgumentException(
|
||||
@ -122,7 +122,7 @@ def handle_async(env, querydict):
|
||||
return
|
||||
currsess = _asyncsessions[querydict['asyncid']]['asyncsession']
|
||||
for rsp in currsess.get_responses():
|
||||
yield rsp
|
||||
yield messages.AsyncMessage(rsp)
|
||||
|
||||
|
||||
|
||||
|
@ -487,7 +487,7 @@ def resourcehandler_backend(env, start_response):
|
||||
hdlr = pluginapi.handle_path(url, operation,
|
||||
cfgmgr, querydict)
|
||||
if 'HTTP_CONFLUENTASYNCID' in env:
|
||||
asynchttp.run_handler(hdlr, env)
|
||||
confluent.asynchttp.run_handler(hdlr, env)
|
||||
start_response('202 Accepted', headers)
|
||||
yield 'Request queued'
|
||||
return
|
||||
|
@ -835,6 +835,24 @@ class EventCollection(ConfluentMessage):
|
||||
self.kvpairs = {name: {'events': eventdata}}
|
||||
|
||||
|
||||
class AsyncCompletion(ConfluentMessage):
|
||||
def __init__(self):
|
||||
self.stripped = True
|
||||
self.notnode = True
|
||||
|
||||
def raw(self):
|
||||
return {'_requestdone': True}
|
||||
|
||||
class AsyncMessage(ConfluentMessage):
|
||||
def __init__(self, pair):
|
||||
self.stripped = True
|
||||
self.notnode = True
|
||||
self.msgpair = pair
|
||||
|
||||
def raw(self):
|
||||
return {'requestid': self.msgpair[0],
|
||||
'response': self.msgpair[1].raw()}
|
||||
|
||||
class AsyncSession(ConfluentMessage):
|
||||
def __init__(self, id):
|
||||
self.desc = 'foo'
|
||||
|
Loading…
x
Reference in New Issue
Block a user