2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-27 11:30:06 +00:00

Stage 3 of msgpack for dispatch

This may complete the dispatch portion of the msgpack migration.
This commit is contained in:
Jarrod Johnson 2020-01-21 14:15:14 -05:00
parent 9d770632ce
commit 3bf083deb3

View File

@ -60,19 +60,14 @@ import eventlet.greenpool as greenpool
import eventlet.green.ssl as ssl
import eventlet.queue as queue
import itertools
import msgpack
import os
try:
import cPickle as pickle
pargs = {}
except ImportError:
import pickle
pargs = {'encoding': 'utf-8'}
import socket
import struct
import sys
pluginmap = {}
dispatch_plugins = (b'ipmi', u'ipmi')
dispatch_plugins = (b'ipmi', u'ipmi', b'redfish', u'redfish')
def seek_element(currplace, currkey):
@ -689,10 +684,13 @@ def handle_dispatch(connection, cert, dispatch, peername):
cfm.get_collective_member(peername)['fingerprint'], cert):
connection.close()
return
pversion = 0
if bytearray(dispatch)[0] == 0x80:
pversion = bytearray(dispatch)[1]
dispatch = pickle.loads(dispatch, **pargs)
if dispatch[0:2] != b'\x01\x03': # magic value to indicate msgpack
# We only support msgpack now
# The magic should preclude any pickle, as the first byte can never be
# under 0x20 or so.
connection.close()
return
dispatch = msgpack.unpackb(dispatch[2:])
configmanager = cfm.ConfigManager(dispatch['tenant'])
nodes = dispatch['nodes']
inputdata = dispatch['inputdata']
@ -736,10 +734,16 @@ def handle_dispatch(connection, cert, dispatch, peername):
def _forward_rsp(connection, res, pversion):
try:
r = pickle.dumps(res, protocol=pversion)
except TypeError:
r = pickle.dumps(Exception(
'Cannot serialize error, check collective.manager error logs for details' + str(res)), protocol=pversion)
r = res.serialize()
except AttributeError:
if isinstance(res, Exception):
r = msgpack.packb(['Exception', str(res)])
else:
r = msgpack.packb(
['Exception', 'Unable to serialize response ' + repr(res)])
except Exception:
r = msgpack.packb(
['Exception', 'Unable to serialize response ' + repr(res)])
rlen = len(r)
if not rlen:
return
@ -978,10 +982,10 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
pvers = 2
tlvdata.recv(remote)
myname = collective.get_myname()
dreq = pickle.dumps({'name': myname, 'nodes': list(nodes),
'path': element,'tenant': configmanager.tenant,
'operation': operation, 'inputdata': inputdata},
protocol=pvers)
dreq = b'\x01\x03' + msgpack.packb(
{'name': myname, 'nodes': list(nodes),
'path': element,'tenant': configmanager.tenant,
'operation': operation, 'inputdata': inputdata})
tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}})
remote.sendall(dreq)
while True:
@ -1029,9 +1033,9 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
return
rsp += nrsp
try:
rsp = pickle.loads(rsp, **pargs)
except UnicodeDecodeError:
rsp = pickle.loads(rsp, encoding='latin1')
rsp = msg.msg_deserialize(rsp)
except Exception:
rsp = exc.deserialize_exc(rsp)
if isinstance(rsp, Exception):
raise rsp
yield rsp