diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index a677309f..5d5bcd6b 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -697,6 +697,9 @@ def handle_dispatch(connection, cert, dispatch, peername): operation = dispatch['operation'] pathcomponents = dispatch['path'] routespec = nested_lookup(noderesources, pathcomponents) + inputdata = msg.get_input_message( + pathcomponents, operation, inputdata, nodes, dispatch['isnoderange'], + configmanager) plugroute = routespec.routeinfo plugpath = None nodesbyhandler = {} @@ -836,7 +839,7 @@ def handle_node_request(configmanager, inputdata, operation, del pathcomponents[0:2] passvalues = queue.Queue() plugroute = routespec.routeinfo - inputdata = msg.get_input_message( + msginputdata = msg.get_input_message( pathcomponents, operation, inputdata, nodes, isnoderange, configmanager) if 'handler' in plugroute: # fixed handler definition, easy enough @@ -847,7 +850,7 @@ def handle_node_request(configmanager, inputdata, operation, passvalue = hfunc( nodes=nodes, element=pathcomponents, configmanager=configmanager, - inputdata=inputdata) + inputdata=msginputdata) if isnoderange: return passvalue elif isinstance(passvalue, console.Console): @@ -900,13 +903,13 @@ def handle_node_request(configmanager, inputdata, operation, workers.spawn(addtoqueue, passvalues, hfunc, {'nodes': nodesbyhandler[hfunc], 'element': pathcomponents, 'configmanager': configmanager, - 'inputdata': inputdata}) + 'inputdata': msginputdata}) for manager in nodesbymanager: numworkers += 1 workers.spawn(addtoqueue, passvalues, dispatch_request, { 'nodes': nodesbymanager[manager], 'manager': manager, 'element': pathcomponents, 'configmanager': configmanager, - 'inputdata': inputdata, 'operation': operation}) + 'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange}) if isnoderange or not autostrip: return iterate_queue(numworkers, passvalues) else: @@ -950,7 +953,7 @@ def addtoqueue(theq, fun, kwargs): def dispatch_request(nodes, manager, element, configmanager, inputdata, - operation): + operation, isnoderange): a = configmanager.get_collective_member(manager) try: remote = socket.create_connection((a['address'], 13001)) @@ -987,7 +990,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, dreq = b'\x01\x03' + msgpack.packb( {'name': myname, 'nodes': list(nodes), 'path': element,'tenant': configmanager.tenant, - 'operation': operation, 'inputdata': inputdata}, use_bin_type=False) + 'operation': operation, 'inputdata': inputdata, 'isnoderange': isnoderange}, use_bin_type=False) tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}}) remote.sendall(dreq) while True: