2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-16 10:39:23 +00:00

Properly detect killed leader

If the leader closes the connection, have get_next_msg return None,
as it did before.
This commit is contained in:
Jarrod Johnson 2024-05-17 16:03:37 -04:00
parent fbdb35e33d
commit c03aa728cc
2 changed files with 20 additions and 46 deletions

View File

@ -207,7 +207,6 @@ async def follow_leader(remote, leader):
newleader = None
try:
exitcause = await cfm.follow_channel(remote)
print(repr(exitcause))
newleader = exitcause.get('newleader', None)
except greenlet.GreenletExit:
cleanexit = True

View File

@ -774,55 +774,33 @@ class StreamHandler(object):
async def get_next_msg(self):
r = (False,)
msg = None
try:
while not msg:
try:
# TODO: add an asyncio event or similar to allow stop_leading to shorten this loop,
# since the read will not abort just because the associated writer closes, unfortunately
msg = await asyncio.wait_for(self.sock[0].read(8), timeout=self.keepalive - confluent.util.monotonic_time())
if msg:
self.expiry = confluent.util.monotonic_time() + 60
except asyncio.exceptions.TimeoutError:
print("timeout")
msg = None
if confluent.util.monotonic_time() > self.expiry:
print("expired")
while True:
try:
msg = await asyncio.wait_for(self.sock[0].read(8), timeout=self.keepalive - confluent.util.monotonic_time())
if msg:
self.expiry = confluent.util.monotonic_time() + 60
self.keepalive = confluent.util.monotonic_time() + 20
return msg
else:
return None
except asyncio.exceptions.TimeoutError:
if confluent.util.monotonic_time() > self.expiry:
return None
if confluent.util.monotonic_time() >= self.keepalive:
res = await _push_rpc(self.sock, b'') # nulls are a keepalive
if not res:
return None
if confluent.util.monotonic_time() > self.keepalive:
print("kept alive")
res = await _push_rpc(self.sock, b'') # nulls are a keepalive
if not res:
return None
self.keepalive = confluent.util.monotonic_time() + 20
# while not r[0]:
# r = select.select(
# (self.sock,), (), (),
# self.keepalive - confluent.util.monotonic_time())
# if confluent.util.monotonic_time() > self.expiry:
# return None
# if confluent.util.monotonic_time() > self.keepalive:
# res = await _push_rpc(self.sock, b'') # nulls are a keepalive
# if not res:
# return None
# #TODO: this test can work fine even if the other end is
# # gone, go to a more affirmative test to more quickly
# # detect outage to peer
# self.keepalive = confluent.util.monotonic_time() + 20
#self.expiry = confluent.util.monotonic_time() + 60
#msg = self.sock.recv(8)
except Exception as e:
print(repr(e))
msg = None
return msg
self.keepalive = confluent.util.monotonic_time() + 20
except Exception as e:
print(repr(e))
return None
def close(self):
self.sock = None
async def stop_following(replacement=None):
print("discontinuing following")
async with _leaderlock:
global cfgleader
if cfgleader and not isinstance(cfgleader, bool):
@ -830,7 +808,6 @@ async def stop_following(replacement=None):
cfgleader[1].close()
await cfgleader[1].wait_closed()
except Exception as e:
print(repr(cfgleader))
print(repr(e))
pass
cfgleader = replacement
@ -922,7 +899,6 @@ async def follow_channel(channel):
global _hasquorum
global lastheartbeat
global killswitch
print("channel to follow now")
try:
await stop_leading()
await stop_following(channel)
@ -976,7 +952,6 @@ async def follow_channel(channel):
msg = await lh.get_next_msg()
finally:
# mark the connection as broken
print("I'm out...")
if cfgstreams:
await stop_following(None)
else: