diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 50d2b32a..4575a46a 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -831,6 +831,7 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, a = configmanager.get_collective_member(manager) try: remote = socket.create_connection((a['address'], 13001)) + remote.settimeout(90) remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', certfile='/etc/confluent/srvcert.pem') @@ -851,20 +852,48 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, tlvdata.send(remote, {'dispatch': {'name': myname, 'length': len(dreq)}}) remote.sendall(dreq) while True: - rlen = remote.recv(8) + try: + rlen = remote.recv(8) + except Exception: + for node in nodes: + yield msg.ConfluentResourceUnavailable( + node, 'Collective member {0} went unreachable'.format( + a['name'])) + return while len(rlen) < 8: - nlen = remote.recv(8 - len(rlen)) + try: + nlen = remote.recv(8 - len(rlen)) + except Exception: + nlen = 0 if not nlen: - raise Exception('Error receiving data') + for node in nodes: + yield msg.ConfluentResourceUnavailable( + node, 'Collective member {0} went unreachable'.format( + a['name'])) + return rlen += nlen rlen = struct.unpack('!Q', rlen)[0] if rlen == 0: break - rsp = remote.recv(rlen) + try: + rsp = remote.recv(rlen) + except Exception: + for node in nodes: + yield msg.ConfluentResourceUnavailable( + node, 'Collective member {0} went unreachable'.format( + a['name'])) + return while len(rsp) < rlen: - nrsp = remote.recv(rlen - len(rsp)) + try: + nrsp = remote.recv(rlen - len(rsp)) + except Exception: + nrsp = 0 if not nrsp: - raise Exception('Error receving data') + for node in nodes: + yield msg.ConfluentResourceUnavailable( + node, 'Collective member {0} went unreachable'.format( + a['name'])) + return rsp += nrsp rsp = pickle.loads(rsp) if isinstance(rsp, Exception):