diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 7f779601..4580bd39 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -159,6 +159,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None): raise currentleader = leader #spawn this as a thread... + remote.settimeout(90) follower = eventlet.spawn(follow_leader, remote, leader) return True diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index dd012679..1816b2b5 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -57,9 +57,11 @@ except ImportError: # Only required for collective mode crypto = None import confluent.util as util +import eventlet import eventlet.greenpool as greenpool import eventlet.green.ssl as ssl import eventlet.queue as queue +import eventlet.semaphore as semaphore import itertools import msgpack import os @@ -745,6 +747,12 @@ def abbreviate_noderange(configmanager, inputdata, operation): return (msg.KeyValueData({'noderange': noderange.ReverseNodeRange(inputdata['nodes'], configmanager).noderange}),) +def _keepalivefn(connection, xmitlock): + while True: + eventlet.sleep(30) + with xmitlock: + connection.sendall(b'\x00\x00\x00\x00\x00\x00\x00\x01\x00') + def handle_dispatch(connection, cert, dispatch, peername): cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) if not util.cert_matches( @@ -757,6 +765,8 @@ def handle_dispatch(connection, cert, dispatch, peername): # under 0x20 or so. connection.close() return + xmitlock = semaphore.Semaphore() + keepalive = eventlet.spawn(_keepalivefn, connection, xmitlock) dispatch = msgpack.unpackb(dispatch[2:], raw=False) configmanager = cfm.ConfigManager(dispatch['tenant']) nodes = dispatch['nodes'] @@ -796,9 +806,12 @@ def handle_dispatch(connection, cert, dispatch, peername): configmanager=configmanager, inputdata=inputdata)) for res in itertools.chain(*passvalues): - _forward_rsp(connection, res) + with xmitlock: + _forward_rsp(connection, res) except Exception as res: - _forward_rsp(connection, res) + with xmitlock: + _forward_rsp(connection, res) + keepalive.kill() connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00') @@ -1104,6 +1117,8 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata, a['name'])) return rsp += nrsp + if rsp == b'\x00': + continue try: rsp = msg.msg_deserialize(rsp) except Exception: