From 4be41000145d69d0646b968093d449b56d2b049c Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 29 Jan 2020 09:24:57 -0500 Subject: [PATCH] Fix configmanager msgpack msgpack method had some regressions. For one, python2 strings became bytes on mixed collective, fix by using raw=False on the receiver. Additionally, del_nodes tends to use sets, and that's not viable for msgpack. Guard against that. --- confluent_server/confluent/config/configmanager.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bf3c0ded..bc282660 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -607,7 +607,7 @@ def relay_slaved_requests(name, listener): if not nrpc: raise Exception('Truncated client error') rpc += nrpc - rpc = msgpack.unpackb(rpc) + rpc = msgpack.unpackb(rpc, raw=False) exc = None if not (rpc['function'].startswith('_rpc_') or rpc['function'].endswith('_collective_member')): raise Exception('Unsupported function {0} called'.format(rpc['function'])) @@ -763,7 +763,7 @@ def follow_channel(channel): if not nrpc: raise Exception('Truncated message error') rpc += nrpc - rpc = msgpack.unpackb(rpc) + rpc = msgpack.unpackb(rpc, raw=False) if 'txcount' in rpc: _txcount = rpc['txcount'] if 'function' in rpc: @@ -1886,6 +1886,8 @@ class ConfigManager(object): eventlet.spawn_n(_do_notifier, self, watcher, callback) def del_nodes(self, nodes): + if isinstance(nodes, set): + nodes = list(nodes) # msgpack can't handle set if cfgleader: # slaved to a collective return exec_on_leader('_rpc_master_del_nodes', self.tenant, nodes)