2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-04-15 17:49:34 +00:00

Move input handling to destination

It is tricky to serialize a configmanager object, and
probably was making the requests gigantic anyway.

Serialize the parameters to let the target use its local copy instead
of serializing an entire config manager.
This commit is contained in:
Jarrod Johnson 2020-01-27 15:26:54 -05:00
parent 8a9e9aa7b3
commit 09582d7597

View File

@ -697,6 +697,9 @@ def handle_dispatch(connection, cert, dispatch, peername):
operation = dispatch['operation']
pathcomponents = dispatch['path']
routespec = nested_lookup(noderesources, pathcomponents)
inputdata = msg.get_input_message(
pathcomponents, operation, inputdata, nodes, dispatch['isnoderange'],
configmanager)
plugroute = routespec.routeinfo
plugpath = None
nodesbyhandler = {}
@ -836,7 +839,7 @@ def handle_node_request(configmanager, inputdata, operation,
del pathcomponents[0:2]
passvalues = queue.Queue()
plugroute = routespec.routeinfo
inputdata = msg.get_input_message(
msginputdata = msg.get_input_message(
pathcomponents, operation, inputdata, nodes, isnoderange,
configmanager)
if 'handler' in plugroute: # fixed handler definition, easy enough
@ -847,7 +850,7 @@ def handle_node_request(configmanager, inputdata, operation,
passvalue = hfunc(
nodes=nodes, element=pathcomponents,
configmanager=configmanager,
inputdata=inputdata)
inputdata=msginputdata)
if isnoderange:
return passvalue
elif isinstance(passvalue, console.Console):
@ -900,13 +903,13 @@ def handle_node_request(configmanager, inputdata, operation,
workers.spawn(addtoqueue, passvalues, hfunc, {'nodes': nodesbyhandler[hfunc],
'element': pathcomponents,
'configmanager': configmanager,
'inputdata': inputdata})
'inputdata': msginputdata})
for manager in nodesbymanager:
numworkers += 1
workers.spawn(addtoqueue, passvalues, dispatch_request, {
'nodes': nodesbymanager[manager], 'manager': manager,
'element': pathcomponents, 'configmanager': configmanager,
'inputdata': inputdata, 'operation': operation})
'inputdata': inputdata, 'operation': operation, 'isnoderange': isnoderange})
if isnoderange or not autostrip:
return iterate_queue(numworkers, passvalues)
else:
@ -950,7 +953,7 @@ def addtoqueue(theq, fun, kwargs):
def dispatch_request(nodes, manager, element, configmanager, inputdata,
operation):
operation, isnoderange):
a = configmanager.get_collective_member(manager)
try:
remote = socket.create_connection((a['address'], 13001))
@ -987,7 +990,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
dreq = b'\x01\x03' + msgpack.packb(
{'name': myname, 'nodes': list(nodes),
'path': element,'tenant': configmanager.tenant,
'operation': operation, 'inputdata': inputdata}, use_bin_type=False)
'operation': operation, 'inputdata': inputdata, 'isnoderange': isnoderange}, use_bin_type=False)
tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}})
remote.sendall(dreq)
while True: