From 079a04b28b74f45bcfe090d4129a715c4491c723 Mon Sep 17 00:00:00 2001 From: Penghui Cui Date: Thu, 25 Nov 2021 10:01:35 +0800 Subject: [PATCH] Support getting message extention in plugin --- confluent_server/confluent/core.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 8b833587..fd9aa58e 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -948,14 +948,18 @@ def handle_node_request(configmanager, inputdata, operation, del pathcomponents[0:2] passvalues = queue.Queue() plugroute = routespec.routeinfo - msginputdata = msg.get_input_message( - pathcomponents, operation, inputdata, nodes, isnoderange, - configmanager) + _plugin = None + if 'handler' in plugroute: # fixed handler definition, easy enough if isinstance(plugroute['handler'], str): hfunc = getattr(pluginmap[plugroute['handler']], operation) + _plugin = pluginmap[plugroute['handler']] else: hfunc = getattr(plugroute['handler'], operation) + _plugin = plugroute['handler'] + msginputdata = _get_input_data(_plugin, pathcomponents, operation, + inputdata, nodes, isnoderange, + configmanager) passvalue = hfunc( nodes=nodes, element=pathcomponents, configmanager=configmanager, @@ -995,6 +999,7 @@ def handle_node_request(configmanager, inputdata, operation, continue if plugpath: try: + _plugin = pluginmap[plugpath] hfunc = getattr(pluginmap[plugpath], operation) except KeyError: nodesbyhandler[BadPlugin(node, plugpath).error] = [node] @@ -1012,7 +1017,9 @@ def handle_node_request(configmanager, inputdata, operation, workers.spawn(addtoqueue, passvalues, hfunc, {'nodes': nodesbyhandler[hfunc], 'element': pathcomponents, 'configmanager': configmanager, - 'inputdata': msginputdata}) + 'inputdata': _get_input_data(_plugin, pathcomponents, + operation, inputdata,nodes, + isnoderange, configmanager)}) for manager in nodesbymanager: numworkers += 1 workers.spawn(addtoqueue, passvalues, dispatch_request, { @@ -1033,6 +1040,17 @@ def handle_node_request(configmanager, inputdata, operation, # return stripnode(passvalues[0], nodes[0]) +def _get_input_data(plugin_ext, pathcomponents, operation, inputdata, + nodes, isnoderange, configmanager): + if plugin_ext is not None and hasattr(plugin_ext, 'get_input_message'): + return plugin_ext.get_input_message(pathcomponents, operation, + inputdata, nodes, isnoderange, + configmanager) + else: + return msg.get_input_message(pathcomponents, operation, inputdata, + nodes, isnoderange,configmanager) + + def iterate_queue(numworkers, passvalues, strip=False): completions = 0 while completions < numworkers: