mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-09 04:56:12 +00:00
Parallelize cross-manager requests
Rather than doing it at one at a time, parallelize the requests for improved performance.
This commit is contained in:
parent
3ab4203104
commit
a7b8f0ab0c
@ -349,9 +349,9 @@ class ConsoleHandler(object):
|
||||
self.reconnect.cancel()
|
||||
self.reconnect = None
|
||||
try:
|
||||
self._console = plugin.handle_path(
|
||||
self._console = list(plugin.handle_path(
|
||||
self._plugin_path.format(self.node),
|
||||
"create", self.cfgmgr)
|
||||
"create", self.cfgmgr))[0]
|
||||
except (exc.NotImplementedException, exc.NotFoundException):
|
||||
self._console = None
|
||||
except:
|
||||
|
@ -55,7 +55,9 @@ except ImportError:
|
||||
# Only required for collective mode
|
||||
crypto = None
|
||||
import confluent.util as util
|
||||
import eventlet.greenpool as greenpool
|
||||
import eventlet.green.ssl as ssl
|
||||
import eventlet.queue as queue
|
||||
import itertools
|
||||
import os
|
||||
try:
|
||||
@ -711,7 +713,7 @@ def handle_node_request(configmanager, inputdata, operation,
|
||||
else:
|
||||
raise Exception("TODO here")
|
||||
del pathcomponents[0:2]
|
||||
passvalues = []
|
||||
passvalues = queue.Queue()
|
||||
plugroute = routespec.routeinfo
|
||||
inputdata = msg.get_input_message(
|
||||
pathcomponents, operation, inputdata, nodes, isnoderange,
|
||||
@ -764,24 +766,25 @@ def handle_node_request(configmanager, inputdata, operation,
|
||||
nodesbyhandler[hfunc].append(node)
|
||||
else:
|
||||
nodesbyhandler[hfunc] = [node]
|
||||
workers = greenpool.GreenPool()
|
||||
numworkers = 0
|
||||
for hfunc in nodesbyhandler:
|
||||
passvalues.append(hfunc(
|
||||
nodes=nodesbyhandler[hfunc], element=pathcomponents,
|
||||
configmanager=configmanager,
|
||||
inputdata=inputdata))
|
||||
numworkers += 1
|
||||
workers.spawn(addtoqueue, passvalues, hfunc, {'nodes': nodesbyhandler[hfunc],
|
||||
'element': pathcomponents,
|
||||
'configmanager': configmanager,
|
||||
'inputdata': inputdata})
|
||||
for manager in nodesbymanager:
|
||||
passvalues.append(dispatch_request(
|
||||
nodes=nodesbymanager[manager], manager=manager,
|
||||
element=pathcomponents, configmanager=configmanager,
|
||||
inputdata=inputdata, operation=operation))
|
||||
numworkers += 1
|
||||
workers.spawn(addtoqueue, passvalues, dispatch_request, {
|
||||
'nodes': nodesbymanager[manager], 'manager': manager,
|
||||
'element': pathcomponents, 'configmanager': configmanager,
|
||||
'inputdata': inputdata, 'operation': operation})
|
||||
if isnoderange or not autostrip:
|
||||
return itertools.chain(*passvalues)
|
||||
return iterate_queue(numworkers, passvalues)
|
||||
else:
|
||||
if len(passvalues) > 0:
|
||||
if isinstance(passvalues[0], console.Console):
|
||||
return passvalues[0]
|
||||
else:
|
||||
return stripnode(passvalues[0], nodes[0])
|
||||
if numworkers > 0:
|
||||
return iterate_queue(numworkers, passvalues, nodes[0])
|
||||
else:
|
||||
raise exc.NotImplementedException()
|
||||
|
||||
@ -791,6 +794,30 @@ def handle_node_request(configmanager, inputdata, operation,
|
||||
# return stripnode(passvalues[0], nodes[0])
|
||||
|
||||
|
||||
def iterate_queue(numworkers, passvalues, strip=False):
|
||||
completions = 0
|
||||
while completions < numworkers:
|
||||
nv = passvalues.get()
|
||||
if nv == 'theend':
|
||||
completions += 1
|
||||
else:
|
||||
if strip and not isinstance(nv, console.Console):
|
||||
nv.strip_node(strip)
|
||||
yield nv
|
||||
|
||||
|
||||
def addtoqueue(theq, fun, kwargs):
|
||||
try:
|
||||
result = fun(**kwargs)
|
||||
if isinstance(result, console.Console):
|
||||
theq.put(result)
|
||||
else:
|
||||
for pv in result:
|
||||
theq.put(pv)
|
||||
finally:
|
||||
theq.put('theend')
|
||||
|
||||
|
||||
def dispatch_request(nodes, manager, element, configmanager, inputdata,
|
||||
operation):
|
||||
a = configmanager.get_collective_member(manager)
|
||||
|
Loading…
Reference in New Issue
Block a user