mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-22 17:43:14 +00:00
Fix attribute synchronization
Specify a finite read to actually return from the buffer. Convert some functions to async/await as appropriate.
This commit is contained in:
parent
d2edcb62c6
commit
a5dc10debf
@ -82,8 +82,6 @@ try:
|
||||
except ModuleNotFoundError:
|
||||
import pickle as cPickle
|
||||
import errno
|
||||
import eventlet.event as event
|
||||
import eventlet.green.select as select
|
||||
import socket
|
||||
import fnmatch
|
||||
import hashlib
|
||||
@ -241,8 +239,8 @@ def _rpc_set_user(tenant, name, attributemap):
|
||||
ConfigManager(tenant)._true_set_user(name, attributemap)
|
||||
|
||||
|
||||
def _rpc_master_set_node_attributes(tenant, attribmap, autocreate):
|
||||
ConfigManager(tenant).set_node_attributes(attribmap, autocreate)
|
||||
async def _rpc_master_set_node_attributes(tenant, attribmap, autocreate):
|
||||
await ConfigManager(tenant).set_node_attributes(attribmap, autocreate)
|
||||
|
||||
|
||||
def _rpc_master_rename_nodes(tenant, renamemap):
|
||||
@ -306,12 +304,12 @@ def _rpc_del_groups(tenant, groups):
|
||||
ConfigManager(tenant)._true_del_groups(groups)
|
||||
|
||||
|
||||
def _rpc_master_del_nodes(tenant, nodes):
|
||||
ConfigManager(tenant).del_nodes(nodes)
|
||||
async def _rpc_master_del_nodes(tenant, nodes):
|
||||
await ConfigManager(tenant).del_nodes(nodes)
|
||||
|
||||
|
||||
def _rpc_del_nodes(tenant, nodes):
|
||||
ConfigManager(tenant)._true_del_nodes(nodes)
|
||||
async def _rpc_del_nodes(tenant, nodes):
|
||||
await ConfigManager(tenant)._true_del_nodes(nodes)
|
||||
|
||||
|
||||
def _rpc_set_node_attributes(tenant, attribmap, autocreate):
|
||||
@ -425,7 +423,6 @@ async def _push_rpc(stream, payload):
|
||||
stream[1].write(struct.pack('!Q', len(payload)))
|
||||
if len(payload):
|
||||
stream[1].write(payload)
|
||||
print("sent payload: " + repr(payload))
|
||||
await stream[1].drain()
|
||||
return True
|
||||
except Exception:
|
||||
@ -704,11 +701,11 @@ async def relay_slaved_requests(name, listener):
|
||||
raise Exception("Unexpected loss of node in followers: " + name)
|
||||
sz = struct.unpack('!Q', msg)[0]
|
||||
if sz == 0:
|
||||
_push_rpc(listener, b'')
|
||||
await _push_rpc(listener, b'')
|
||||
else:
|
||||
rpc = b''
|
||||
while len(rpc) < sz:
|
||||
nrpc = listener.recv(sz - len(rpc))
|
||||
nrpc = await listener[0].read(sz - len(rpc))
|
||||
if not nrpc:
|
||||
raise Exception('Truncated client error')
|
||||
rpc += nrpc
|
||||
@ -719,14 +716,17 @@ async def relay_slaved_requests(name, listener):
|
||||
retv = None
|
||||
try:
|
||||
retv = globals()[rpc['function']](*rpc['args'])
|
||||
if asyncio.iscoroutine(retv):
|
||||
retv = await retv
|
||||
except ValueError as ve:
|
||||
exc = ['ValueError', str(ve)]
|
||||
except Exception as e:
|
||||
logException()
|
||||
exc = ['Exception', str(e)]
|
||||
if 'xid' in rpc:
|
||||
res = _push_rpc(listener, msgpack.packb({'xid': rpc['xid'],
|
||||
'exc': exc, 'ret': retv}, use_bin_type=False))
|
||||
res = await _push_rpc(listener,
|
||||
msgpack.packb({'xid': rpc['xid'],
|
||||
'exc': exc, 'ret': retv}, use_bin_type=False))
|
||||
if not res:
|
||||
break
|
||||
try:
|
||||
@ -778,7 +778,7 @@ class StreamHandler(object):
|
||||
try:
|
||||
while not msg:
|
||||
try:
|
||||
msg = await asyncio.wait_for(self.sock[0].read(), timeout=self.keepalive - confluent.util.monotonic_time())
|
||||
msg = await asyncio.wait_for(self.sock[0].read(8), timeout=self.keepalive - confluent.util.monotonic_time())
|
||||
except TimeoutError:
|
||||
msg = None
|
||||
if confluent.util.monotonic_time() > self.expiry:
|
||||
@ -931,7 +931,9 @@ async def follow_channel(channel):
|
||||
if not (rpc['function'].startswith('_true') or rpc['function'].startswith('_rpc')):
|
||||
raise Exception("Received unsupported function call: {0}".format(rpc['function']))
|
||||
try:
|
||||
globals()[rpc['function']](*rpc['args'])
|
||||
retv = globals()[rpc['function']](*rpc['args'])
|
||||
if asyncio.iscoroutine(retv):
|
||||
retv = await retv
|
||||
except Exception as e:
|
||||
print(repr(e))
|
||||
if 'xid' in rpc and rpc['xid']:
|
||||
@ -2279,7 +2281,7 @@ class ConfigManager(object):
|
||||
|
||||
async def rename_nodes(self, renamemap):
|
||||
if cfgleader:
|
||||
return exec_on_leader('_rpc_master_rename_nodes', self.tenant,
|
||||
return await exec_on_leader('_rpc_master_rename_nodes', self.tenant,
|
||||
renamemap)
|
||||
if cfgstreams:
|
||||
await exec_on_followers('_rpc_rename_nodes', self.tenant, renamemap)
|
||||
@ -2378,7 +2380,7 @@ class ConfigManager(object):
|
||||
if 'value' in curr[attrib]:
|
||||
del curr[attrib]['value']
|
||||
if cfgleader: # currently config slave to another
|
||||
return exec_on_leader('_rpc_master_set_node_attributes',
|
||||
return await exec_on_leader('_rpc_master_set_node_attributes',
|
||||
self.tenant, attribmap, autocreate)
|
||||
if cfgstreams:
|
||||
await exec_on_followers('_rpc_set_node_attributes',
|
||||
|
Loading…
Reference in New Issue
Block a user