diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index bf3c0ded..bc282660 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -607,7 +607,7 @@ def relay_slaved_requests(name, listener): if not nrpc: raise Exception('Truncated client error') rpc += nrpc - rpc = msgpack.unpackb(rpc) + rpc = msgpack.unpackb(rpc, raw=False) exc = None if not (rpc['function'].startswith('_rpc_') or rpc['function'].endswith('_collective_member')): raise Exception('Unsupported function {0} called'.format(rpc['function'])) @@ -763,7 +763,7 @@ def follow_channel(channel): if not nrpc: raise Exception('Truncated message error') rpc += nrpc - rpc = msgpack.unpackb(rpc) + rpc = msgpack.unpackb(rpc, raw=False) if 'txcount' in rpc: _txcount = rpc['txcount'] if 'function' in rpc: @@ -1886,6 +1886,8 @@ class ConfigManager(object): eventlet.spawn_n(_do_notifier, self, watcher, callback) def del_nodes(self, nodes): + if isinstance(nodes, set): + nodes = list(nodes) # msgpack can't handle set if cfgleader: # slaved to a collective return exec_on_leader('_rpc_master_del_nodes', self.tenant, nodes)