mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-13 11:17:49 +00:00
Error on loss of manager in flight
This commit is contained in:
parent
200569e7af
commit
b053d41cd8
@ -831,6 +831,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
|
||||
a = configmanager.get_collective_member(manager)
|
||||
try:
|
||||
remote = socket.create_connection((a['address'], 13001))
|
||||
remote.settimeout(90)
|
||||
remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
|
||||
keyfile='/etc/confluent/privkey.pem',
|
||||
certfile='/etc/confluent/srvcert.pem')
|
||||
@ -851,20 +852,48 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
|
||||
tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}})
|
||||
remote.sendall(dreq)
|
||||
while True:
|
||||
rlen = remote.recv(8)
|
||||
try:
|
||||
rlen = remote.recv(8)
|
||||
except Exception:
|
||||
for node in nodes:
|
||||
yield msg.ConfluentResourceUnavailable(
|
||||
node, 'Collective member {0} went unreachable'.format(
|
||||
a['name']))
|
||||
return
|
||||
while len(rlen) < 8:
|
||||
nlen = remote.recv(8 - len(rlen))
|
||||
try:
|
||||
nlen = remote.recv(8 - len(rlen))
|
||||
except Exception:
|
||||
nlen = 0
|
||||
if not nlen:
|
||||
raise Exception('Error receiving data')
|
||||
for node in nodes:
|
||||
yield msg.ConfluentResourceUnavailable(
|
||||
node, 'Collective member {0} went unreachable'.format(
|
||||
a['name']))
|
||||
return
|
||||
rlen += nlen
|
||||
rlen = struct.unpack('!Q', rlen)[0]
|
||||
if rlen == 0:
|
||||
break
|
||||
rsp = remote.recv(rlen)
|
||||
try:
|
||||
rsp = remote.recv(rlen)
|
||||
except Exception:
|
||||
for node in nodes:
|
||||
yield msg.ConfluentResourceUnavailable(
|
||||
node, 'Collective member {0} went unreachable'.format(
|
||||
a['name']))
|
||||
return
|
||||
while len(rsp) < rlen:
|
||||
nrsp = remote.recv(rlen - len(rsp))
|
||||
try:
|
||||
nrsp = remote.recv(rlen - len(rsp))
|
||||
except Exception:
|
||||
nrsp = 0
|
||||
if not nrsp:
|
||||
raise Exception('Error receving data')
|
||||
for node in nodes:
|
||||
yield msg.ConfluentResourceUnavailable(
|
||||
node, 'Collective member {0} went unreachable'.format(
|
||||
a['name']))
|
||||
return
|
||||
rsp += nrsp
|
||||
rsp = pickle.loads(rsp)
|
||||
if isinstance(rsp, Exception):
|
||||
|
Loading…
x
Reference in New Issue
Block a user