2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-08-28 05:50:30 +00:00

Merge pull request #75 from jjohnson42/reworkworkers

Add a recourse to drain queue
This commit is contained in:
Jarrod Johnson
2017-04-25 16:51:32 -04:00
committed by GitHub

View File

@@ -290,6 +290,15 @@ def perform_requests(operator, nodes, element, cfg, inputdata):
for t in list(livingthreads):
if t.dead:
livingthreads.discard(t)
try:
# drain queue if a thread put something on the queue and died
while True:
datum = resultdata.get_nowait()
if datum != 'Done':
yield datum
except queue.Empty:
pass
def perform_request(operator, node, element,