2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-07 12:06:00 +00:00

Support getting message extention in plugin

This commit is contained in:
Penghui Cui 2021-11-25 10:01:35 +08:00
parent 2e10289807
commit 079a04b28b

View File

@ -948,14 +948,18 @@ def handle_node_request(configmanager, inputdata, operation,
del pathcomponents[0:2]
passvalues = queue.Queue()
plugroute = routespec.routeinfo
msginputdata = msg.get_input_message(
pathcomponents, operation, inputdata, nodes, isnoderange,
configmanager)
_plugin = None
if 'handler' in plugroute: # fixed handler definition, easy enough
if isinstance(plugroute['handler'], str):
hfunc = getattr(pluginmap[plugroute['handler']], operation)
_plugin = pluginmap[plugroute['handler']]
else:
hfunc = getattr(plugroute['handler'], operation)
_plugin = plugroute['handler']
msginputdata = _get_input_data(_plugin, pathcomponents, operation,
inputdata, nodes, isnoderange,
configmanager)
passvalue = hfunc(
nodes=nodes, element=pathcomponents,
configmanager=configmanager,
@ -995,6 +999,7 @@ def handle_node_request(configmanager, inputdata, operation,
continue
if plugpath:
try:
_plugin = pluginmap[plugpath]
hfunc = getattr(pluginmap[plugpath], operation)
except KeyError:
nodesbyhandler[BadPlugin(node, plugpath).error] = [node]
@ -1012,7 +1017,9 @@ def handle_node_request(configmanager, inputdata, operation,
workers.spawn(addtoqueue, passvalues, hfunc, {'nodes': nodesbyhandler[hfunc],
'element': pathcomponents,
'configmanager': configmanager,
'inputdata': msginputdata})
'inputdata': _get_input_data(_plugin, pathcomponents,
operation, inputdata,nodes,
isnoderange, configmanager)})
for manager in nodesbymanager:
numworkers += 1
workers.spawn(addtoqueue, passvalues, dispatch_request, {
@ -1033,6 +1040,17 @@ def handle_node_request(configmanager, inputdata, operation,
# return stripnode(passvalues[0], nodes[0])
def _get_input_data(plugin_ext, pathcomponents, operation, inputdata,
nodes, isnoderange, configmanager):
if plugin_ext is not None and hasattr(plugin_ext, 'get_input_message'):
return plugin_ext.get_input_message(pathcomponents, operation,
inputdata, nodes, isnoderange,
configmanager)
else:
return msg.get_input_message(pathcomponents, operation, inputdata,
nodes, isnoderange,configmanager)
def iterate_queue(numworkers, passvalues, strip=False):
completions = 0
while completions < numworkers: