diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index ef537889..1d6e9b23 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -82,8 +82,6 @@ try: except ModuleNotFoundError: import pickle as cPickle import errno -import eventlet.event as event -import eventlet.green.select as select import socket import fnmatch import hashlib @@ -241,8 +239,8 @@ def _rpc_set_user(tenant, name, attributemap): ConfigManager(tenant)._true_set_user(name, attributemap) -def _rpc_master_set_node_attributes(tenant, attribmap, autocreate): - ConfigManager(tenant).set_node_attributes(attribmap, autocreate) +async def _rpc_master_set_node_attributes(tenant, attribmap, autocreate): + await ConfigManager(tenant).set_node_attributes(attribmap, autocreate) def _rpc_master_rename_nodes(tenant, renamemap): @@ -306,12 +304,12 @@ def _rpc_del_groups(tenant, groups): ConfigManager(tenant)._true_del_groups(groups) -def _rpc_master_del_nodes(tenant, nodes): - ConfigManager(tenant).del_nodes(nodes) +async def _rpc_master_del_nodes(tenant, nodes): + await ConfigManager(tenant).del_nodes(nodes) -def _rpc_del_nodes(tenant, nodes): - ConfigManager(tenant)._true_del_nodes(nodes) +async def _rpc_del_nodes(tenant, nodes): + await ConfigManager(tenant)._true_del_nodes(nodes) def _rpc_set_node_attributes(tenant, attribmap, autocreate): @@ -425,7 +423,6 @@ async def _push_rpc(stream, payload): stream[1].write(struct.pack('!Q', len(payload))) if len(payload): stream[1].write(payload) - print("sent payload: " + repr(payload)) await stream[1].drain() return True except Exception: @@ -704,11 +701,11 @@ async def relay_slaved_requests(name, listener): raise Exception("Unexpected loss of node in followers: " + name) sz = struct.unpack('!Q', msg)[0] if sz == 0: - _push_rpc(listener, b'') + await _push_rpc(listener, b'') else: rpc = b'' while len(rpc) < sz: - nrpc = listener.recv(sz - len(rpc)) + nrpc = await listener[0].read(sz - len(rpc)) if not nrpc: raise Exception('Truncated client error') rpc += nrpc @@ -719,14 +716,17 @@ async def relay_slaved_requests(name, listener): retv = None try: retv = globals()[rpc['function']](*rpc['args']) + if asyncio.iscoroutine(retv): + retv = await retv except ValueError as ve: exc = ['ValueError', str(ve)] except Exception as e: logException() exc = ['Exception', str(e)] if 'xid' in rpc: - res = _push_rpc(listener, msgpack.packb({'xid': rpc['xid'], - 'exc': exc, 'ret': retv}, use_bin_type=False)) + res = await _push_rpc(listener, + msgpack.packb({'xid': rpc['xid'], + 'exc': exc, 'ret': retv}, use_bin_type=False)) if not res: break try: @@ -778,7 +778,7 @@ class StreamHandler(object): try: while not msg: try: - msg = await asyncio.wait_for(self.sock[0].read(), timeout=self.keepalive - confluent.util.monotonic_time()) + msg = await asyncio.wait_for(self.sock[0].read(8), timeout=self.keepalive - confluent.util.monotonic_time()) except TimeoutError: msg = None if confluent.util.monotonic_time() > self.expiry: @@ -931,7 +931,9 @@ async def follow_channel(channel): if not (rpc['function'].startswith('_true') or rpc['function'].startswith('_rpc')): raise Exception("Received unsupported function call: {0}".format(rpc['function'])) try: - globals()[rpc['function']](*rpc['args']) + retv = globals()[rpc['function']](*rpc['args']) + if asyncio.iscoroutine(retv): + retv = await retv except Exception as e: print(repr(e)) if 'xid' in rpc and rpc['xid']: @@ -2279,7 +2281,7 @@ class ConfigManager(object): async def rename_nodes(self, renamemap): if cfgleader: - return exec_on_leader('_rpc_master_rename_nodes', self.tenant, + return await exec_on_leader('_rpc_master_rename_nodes', self.tenant, renamemap) if cfgstreams: await exec_on_followers('_rpc_rename_nodes', self.tenant, renamemap) @@ -2378,7 +2380,7 @@ class ConfigManager(object): if 'value' in curr[attrib]: del curr[attrib]['value'] if cfgleader: # currently config slave to another - return exec_on_leader('_rpc_master_set_node_attributes', + return await exec_on_leader('_rpc_master_set_node_attributes', self.tenant, attribmap, autocreate) if cfgstreams: await exec_on_followers('_rpc_set_node_attributes',