2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 17:43:14 +00:00

Do a keepalive to track idle, but alive collective

When relaying a command, use a keepalive to indicate that
the connection is alive, just not sending data.
This commit is contained in:
Jarrod Johnson 2021-04-09 17:27:00 -04:00
parent b6fb91b228
commit 23dffe882e
2 changed files with 18 additions and 2 deletions

View File

@ -159,6 +159,7 @@ def connect_to_leader(cert=None, name=None, leader=None, remote=None):
raise
currentleader = leader
#spawn this as a thread...
remote.settimeout(90)
follower = eventlet.spawn(follow_leader, remote, leader)
return True

View File

@ -57,9 +57,11 @@ except ImportError:
# Only required for collective mode
crypto = None
import confluent.util as util
import eventlet
import eventlet.greenpool as greenpool
import eventlet.green.ssl as ssl
import eventlet.queue as queue
import eventlet.semaphore as semaphore
import itertools
import msgpack
import os
@ -745,6 +747,12 @@ def abbreviate_noderange(configmanager, inputdata, operation):
return (msg.KeyValueData({'noderange': noderange.ReverseNodeRange(inputdata['nodes'], configmanager).noderange}),)
def _keepalivefn(connection, xmitlock):
while True:
eventlet.sleep(30)
with xmitlock:
connection.sendall(b'\x00\x00\x00\x00\x00\x00\x00\x01\x00')
def handle_dispatch(connection, cert, dispatch, peername):
cert = crypto.dump_certificate(crypto.FILETYPE_ASN1, cert)
if not util.cert_matches(
@ -757,6 +765,8 @@ def handle_dispatch(connection, cert, dispatch, peername):
# under 0x20 or so.
connection.close()
return
xmitlock = semaphore.Semaphore()
keepalive = eventlet.spawn(_keepalivefn, connection, xmitlock)
dispatch = msgpack.unpackb(dispatch[2:], raw=False)
configmanager = cfm.ConfigManager(dispatch['tenant'])
nodes = dispatch['nodes']
@ -796,9 +806,12 @@ def handle_dispatch(connection, cert, dispatch, peername):
configmanager=configmanager,
inputdata=inputdata))
for res in itertools.chain(*passvalues):
_forward_rsp(connection, res)
with xmitlock:
_forward_rsp(connection, res)
except Exception as res:
_forward_rsp(connection, res)
with xmitlock:
_forward_rsp(connection, res)
keepalive.kill()
connection.sendall('\x00\x00\x00\x00\x00\x00\x00\x00')
@ -1104,6 +1117,8 @@ def dispatch_request(nodes, manager, element, configmanager, inputdata,
a['name']))
return
rsp += nrsp
if rsp == b'\x00':
continue
try:
rsp = msg.msg_deserialize(rsp)
except Exception: