From c03aa728cc2cfb0991793ddbeaa9d371622eec1d Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 17 May 2024 16:03:37 -0400 Subject: [PATCH] Properly detect killed leader If leader closes connection, then have get_next_msg return None as it did before. --- .../confluent/collective/manager.py | 1 - .../confluent/config/configmanager.py | 65 ++++++------------- 2 files changed, 20 insertions(+), 46 deletions(-) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 453acd15..752636ff 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -207,7 +207,6 @@ async def follow_leader(remote, leader): newleader = None try: exitcause = await cfm.follow_channel(remote) - print(repr(exitcause)) newleader = exitcause.get('newleader', None) except greenlet.GreenletExit: cleanexit = True diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index b3f87818..34b48a45 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -774,55 +774,33 @@ class StreamHandler(object): async def get_next_msg(self): - r = (False,) - msg = None - try: - while not msg: - try: - # TODO: add an asyncio event or similar to allow stop_leading to shorten this loop, - # since the read will not abort just because the associated writer closes, unfortunately - msg = await asyncio.wait_for(self.sock[0].read(8), timeout=self.keepalive - confluent.util.monotonic_time()) - if msg: - self.expiry = confluent.util.monotonic_time() + 60 - except asyncio.exceptions.TimeoutError: - print("timeout") - msg = None - if confluent.util.monotonic_time() > self.expiry: - print("expired") + while True: + try: + msg = await asyncio.wait_for(self.sock[0].read(8), timeout=self.keepalive - confluent.util.monotonic_time()) + if msg: + self.expiry = confluent.util.monotonic_time() + 60 + self.keepalive = confluent.util.monotonic_time() + 20 + return msg + else: + return None + except asyncio.exceptions.TimeoutError: + if confluent.util.monotonic_time() > self.expiry: + return None + if confluent.util.monotonic_time() >= self.keepalive: + res = await _push_rpc(self.sock, b'') # nulls are a keepalive + if not res: return None - if confluent.util.monotonic_time() > self.keepalive: - print("kept alive") - res = await _push_rpc(self.sock, b'') # nulls are a keepalive - if not res: - return None - self.keepalive = confluent.util.monotonic_time() + 20 - # while not r[0]: - # r = select.select( - # (self.sock,), (), (), - # self.keepalive - confluent.util.monotonic_time()) - # if confluent.util.monotonic_time() > self.expiry: - # return None - # if confluent.util.monotonic_time() > self.keepalive: - # res = await _push_rpc(self.sock, b'') # nulls are a keepalive - # if not res: - # return None - # #TODO: this test can work fine even if the other end is - # # gone, go to a more affirmative test to more quickly - # # detect outage to peer - # self.keepalive = confluent.util.monotonic_time() + 20 - #self.expiry = confluent.util.monotonic_time() + 60 - #msg = self.sock.recv(8) - except Exception as e: - print(repr(e)) - msg = None - return msg + self.keepalive = confluent.util.monotonic_time() + 20 + except Exception as e: + print(repr(e)) + return None + def close(self): self.sock = None async def stop_following(replacement=None): - print("discontinuing following") async with _leaderlock: global cfgleader if cfgleader and not isinstance(cfgleader, bool): @@ -830,7 +808,6 @@ async def stop_following(replacement=None): cfgleader[1].close() await cfgleader[1].wait_closed() except Exception as e: - print(repr(cfgleader)) print(repr(e)) pass cfgleader = replacement @@ -922,7 +899,6 @@ async def follow_channel(channel): global _hasquorum global lastheartbeat global killswitch - print("channel to follow now") try: await stop_leading() await stop_following(channel) @@ -976,7 +952,6 @@ async def follow_channel(channel): msg = await lh.get_next_msg() finally: # mark the connection as broken - print("I'm out...") if cfgstreams: await stop_following(None) else: