2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-10-26 00:45:43 +00:00

Add keepalive and acks to collective

Detect unplugged condition (eventually).
This commit is contained in:
Jarrod Johnson
2018-07-18 13:45:03 -04:00
parent 2a34388d09
commit dcad9f5a75

View File

@@ -61,6 +61,7 @@ import cPickle
import errno
import eventlet
import eventlet.event as event
import eventlet.green.select as select
import eventlet.green.threading as gthread
import fnmatch
import json
@@ -80,6 +81,7 @@ _masterintegritykey = None
_dirtylock = threading.RLock()
_leaderlock = gthread.RLock()
_synclock = threading.RLock()
_rpclock = gthread.RLock()
_followerlocks = {}
_config_areas = ('nodegroups', 'nodes', 'usergroups', 'users')
tracelog = None
@@ -311,8 +313,10 @@ def init_masterkey(password=None, autogen=True):
def _push_rpc(stream, payload):
stream.sendall(struct.pack('!Q', len(payload)))
stream.sendall(payload)
with _rpclock:
stream.sendall(struct.pack('!Q', len(payload)))
if len(payload):
stream.sendall(payload)
def decrypt_value(cryptvalue,
@@ -491,10 +495,8 @@ def relay_slaved_requests(name, listener):
except Exception:
pass
cfgstreams[name] = listener
try:
msg = listener.recv(8)
except Exception:
msg = None
lh = StreamHandler(listener)
msg = lh.get_next_msg()
while msg:
if name not in cfgstreams:
raise Exception("Unexpected loss of node in followers: " + name)
@@ -511,7 +513,7 @@ def relay_slaved_requests(name, listener):
if 'xid' in rpc:
_push_rpc(listener, cPickle.dumps({'xid': rpc['xid']}))
try:
msg = listener.recv(8)
msg = lh.get_next_msg()
except Exception:
msg = None
finally:
@@ -527,6 +529,35 @@ def relay_slaved_requests(name, listener):
stop_following(True)
class StreamHandler(object):
def __init__(self, sock):
self.sock = sock
self.keepalive = confluent.util.monotonic_time() + 20
self.expiry = self.keepalive + 40
def get_next_msg(self):
r = (False,)
try:
while not r[0]:
r = select.select(
(self.sock,), (), (),
self.keepalive - confluent.util.monotonic_time())
if confluent.util.monotonic_time() > self.expiry:
return None
if confluent.util.monotonic_time() > self.keepalive:
_push_rpc(self.sock, b'') # nulls are a keepalive
self.keepalive = confluent.util.monotonic_time() + 20
self.expiry = confluent.util.monotonic_time() + 60
msg = self.sock.recv(8)
except Exception:
msg = None
return msg
def close(self):
self.sock = None
def stop_following(replacement=None):
with _leaderlock:
global cfgleader
@@ -598,10 +629,8 @@ def follow_channel(channel):
global _txcount
stop_leading()
stop_following(channel)
try:
msg = channel.recv(8)
except Exception:
msg = None
lh = StreamHandler(channel)
msg = lh.get_next_msg()
while msg:
sz = struct.unpack('!Q', msg)[0]
if sz != 0:
@@ -618,10 +647,8 @@ def follow_channel(channel):
globals()[rpc['function']](*rpc['args'])
if 'xid' in rpc and rpc['xid']:
_pendingchangesets[rpc['xid']].send()
try:
msg = channel.recv(8)
except Exception:
msg = None
_push_rpc(channel, b'') # use null as ACK
msg = lh.get_next_msg()
# mark the connection as broken
if cfgstreams:
stop_following(None)