From 207cc3471e77cd0331e727033f14725dfc645e4e Mon Sep 17 00:00:00 2001
From: Jarrod Johnson
Date: Thu, 16 May 2024 15:40:43 -0400
Subject: [PATCH] Fix closing sockets in various contexts

With asyncio, we must close the writer half of a reader/writer pair.
Also rework get_next_msg to work better. Still need to allow
stop_following to interrupt get_next_msg.
---
 confluent_server/bin/confluent          |  6 +-
 .../confluent/collective/manager.py     | 82 ++++++++++++-------
 .../confluent/config/configmanager.py   | 31 ++++++-
 confluent_server/confluent/sockapi.py   | 12 ++-
 4 files changed, 94 insertions(+), 37 deletions(-)

diff --git a/confluent_server/bin/confluent b/confluent_server/bin/confluent
index 751208a2..88b3ea81 100755
--- a/confluent_server/bin/confluent
+++ b/confluent_server/bin/confluent
@@ -37,8 +37,12 @@ import confluent.main
 #p.enable()
 #try:
 import multiprocessing
+
+async def main():
+    await confluent.main.run(sys.argv)
 if __name__ == '__main__':
-    multiprocessing.freeze_support()
+    #multiprocessing.freeze_support()
+    #asyncio.get_event_loop().run_until_complete(main())
     gt = spawn_for_awaitable(confluent.main.run(sys.argv))
     gt.wait()
 #except:
diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py
index 173add9d..453acd15 100644
--- a/confluent_server/confluent/collective/manager.py
+++ b/confluent_server/confluent/collective/manager.py
@@ -33,8 +33,6 @@ import random
 import time
 import sys
 
-import OpenSSL.crypto as crypto
-
 
 class PyObject_HEAD(ctypes.Structure):
     _fields_ = [
@@ -101,7 +99,6 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
                           ''.format(leader, str(e)),
                  'subsystem': 'collective'})
         return False
-    print("connecting to leader")
    async with connecting:
        async with cfm._initlock:
            # remote is a socket...
@@ -169,7 +166,8 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
                 ndata = await remote[0].read(dbsize - len(dbjson))
                 if not ndata:
                     try:
-                        remote[0].close()
+                        remote[1].close()
+                        await remote[1].wait_closed()
                     except Exception:
                         pass
                     log.log({'error': 'Retrying connection, error during initial sync', 'subsystem': 'collective'})
@@ -190,7 +188,7 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
                                   sync=False)
                 cfm.commit_clear()
             except Exception:
-                print("huh????")
+                print(repr(e))
                 await cfm.stop_following()
                 cfm.rollback_clear()
                 raise
@@ -328,8 +326,11 @@ async def handle_connection(connection, cert, request, local=False):
         linfo = cfm.get_collective_member_by_address(currentleader)
         try:
             _, remote = await create_connection(currentleader)
-        except Exception:
-            cfm.stop_following()
+            if isinstance(remote, Exception):
+                raise remote
+        except Exception as e:
+            print(repr(e))
+            await cfm.stop_following()
             return
         #remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
         #                         keyfile='/etc/confluent/privkey.pem',
@@ -338,11 +339,13 @@ async def handle_connection(connection, cert, request, local=False):
 
         if not (linfo and util.cert_matches(
                 linfo['fingerprint'], cert)):
-            #remote.close()
+            remote[1].close()
+            await remote[1].wait_closed()
             await tlvdata.send(connection,
                                {'error': 'Invalid certificate, '
                                          'redo invitation process'})
-            # connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         await tlvdata.recv(remote)  # ignore banner
         await tlvdata.recv(remote)  # ignore authpassed: 0
@@ -374,7 +377,8 @@ async def handle_connection(connection, cert, request, local=False):
             await cfm.del_collective_member(todelete)
             await tlvdata.send(connection,
                 {'collective': {'status': 'Successfully deleted {0}'.format(todelete)}})
-            connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         if 'invite' == operation:
             try:
@@ -391,7 +395,8 @@ async def handle_connection(connection, cert, request, local=False):
             invitation = invites.create_server_invitation(name, role)
             await tlvdata.send(connection,
                                {'collective': {'invitation': invitation}})
-            connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
         if 'join' == operation:
             invitation = request['invitation']
             try:
@@ -403,7 +408,8 @@ async def handle_connection(connection, cert, request, local=False):
                     connection,
                     {'collective':
                         {'status': 'Invalid token format'}})
-                connection.close()
+                connection[1].close()
+                await connection[1].wait_closed()
                 return
             host = request['server']
             try:
@@ -441,7 +447,8 @@ async def handle_connection(connection, cert, request, local=False):
                     connection,
                     {'collective':
                         {'status': 'Failed to connect to {0}'.format(host)}})
-                connection.close()
+                connection[1].close()
+                await connection[1].wait_closed()
                 raise
                 return
             mycert = util.get_certificate_from_file(
@@ -462,12 +469,14 @@ async def handle_connection(connection, cert, request, local=False):
             proof = base64.b64decode(proof)
             j = invites.check_server_proof(invitation, mycert, cert, proof)
             if not j:
-                remote.close()
+                remote[1].close()
+                await remote[1].wait_closed()
                 await tlvdata.send(connection,
                                    {'collective': {'status': 'Bad server token'}})
                 return
             await tlvdata.send(connection, {'collective': {'status': 'Success'}})
-            # connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             currentleader = rsp['collective']['leader']
             f = open('/etc/confluent/cfg/myname', 'w')
             f.write(name)
@@ -561,12 +570,14 @@ async def handle_connection(connection, cert, request, local=False):
                 connection,
                 {'error': 'Already following, assimilate leader first',
                  'leader': currentleader})
-            connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         if connecting.active:
             # don't try to connect while actively already trying to connect
             await tlvdata.send(connection, {'status': 0})
-            #connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         cnn = connection[1].get_extra_info('socket')
         if (currentleader == cnn.getpeername()[0] and
@@ -574,7 +585,8 @@ async def handle_connection(connection, cert, request, local=False):
             # if we are happily following this leader already, don't stir
             # the pot
             await tlvdata.send(connection, {'status': 0})
-            connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         log.log({'info': 'Connecting in response to assimilation',
                  'subsystem': 'collective'})
@@ -582,7 +594,8 @@ async def handle_connection(connection, cert, request, local=False):
         if cfm.cfgstreams:
             await retire_as_leader(newleader)
         await tlvdata.send(connection, {'status': 0})
-        cnn.close()
+        connection[1].close()
+        await connection[1].wait_closed()
         if not await connect_to_leader(None, None, leader=newleader):
             if retrythread is None:
                 retrythread = util.spawn_after(random.random(),
@@ -595,7 +608,8 @@ async def handle_connection(connection, cert, request, local=False):
             await tlvdata.send(connection,
                                {'error': 'Invalid certificate, '
                                          'redo invitation process'})
-            connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         collinfo = {}
         populate_collinfo(collinfo)
@@ -608,26 +622,31 @@ async def handle_connection(connection, cert, request, local=False):
             await tlvdata.send(connection,
                                {'error': 'Invalid certificate, '
                                          'redo invitation process'})
-            #connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         cnn = connection[1].transport.get_extra_info('socket')
         myself = cnn.getsockname()[0]
         if connecting.active or initting:
             await tlvdata.send(connection, {'error': 'Connecting right now',
                                             'backoff': True})
-            #connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         if leader_init.active:
+            print("initting leader....")
             await tlvdata.send(connection, {'error': 'Servicing a connection',
                                             'waitinline': True})
-            #connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         if myself != await get_leader(connection):
             await tlvdata.send(
                 connection,
                 {'error': 'Cannot assimilate, our leader is '
                           'in another castle', 'leader': currentleader})
-            #connection.close()
+            connection[1].close()
+            await connection[1].wait_closed()
             return
         if request['txcount'] > cfm._txcount:
             await retire_as_leader()
@@ -639,7 +658,8 @@ async def handle_connection(connection, cert, request, local=False):
                  'transaction count', 'subsystem': 'collective'})
         cnn = connection[1].transport.get_extra_info('socket')
         peername = cnn.getpeername()[0]
-        cnn.close()
+        connection[1].close()
+        await connection[1].wait_closed()
         if not await connect_to_leader(
                 None, None, peername):
             if retrythread is None:
@@ -662,9 +682,11 @@ async def handle_connection(connection, cert, request, local=False):
                                          'dbsize': len(cfgdata)})
             connection[1].write(cfgdata)
             await connection[1].drain()
-        except Exception:
+        except Exception as e:
+            print(repr(e))
             try:
-                connection.close()
+                connection[1].close()
+                await connection[1].wait_closed()
             finally:
                 raise
     return None
@@ -936,9 +958,11 @@ async def start_collective():
             log.log({'info': 'Performing startup attempt to {0}'.format(
                 member), 'subsystem': 'collective'})
             if not await connect_to_leader(name=myname, leader=member, remote=remote):
-                remote.close()
+                remote[1].close()
+                await remote[1].wait_closed()
             else:
-                remote.close()
+                remote[1].close()
+                await remote[1].wait_closed()
         except Exception as e:
             pass
         finally:
diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py
index 1d6e9b23..b3f87818 100644
--- a/confluent_server/confluent/config/configmanager.py
+++ b/confluent_server/confluent/config/configmanager.py
@@ -434,7 +434,8 @@ async def _push_rpc(stream, payload):
     if membership_callback:
         membership_callback()
     try:
-        stream.close()
+        stream[1].close()
+        await stream[1].wait_closed()
     except Exception:
         pass
 
@@ -778,12 +779,19 @@ class StreamHandler(object):
         try:
             while not msg:
                 try:
+                    # TODO: add an asyncio event or similar to allow stop_leading to shorten this loop,
+                    # since the read will not abort just because the associated writer closes, unfortunately
                     msg = await asyncio.wait_for(self.sock[0].read(8), timeout=self.keepalive - confluent.util.monotonic_time())
-                except TimeoutError:
+                    if msg:
+                        self.expiry = confluent.util.monotonic_time() + 60
+                except asyncio.exceptions.TimeoutError:
+                    print("timeout")
                     msg = None
                 if confluent.util.monotonic_time() > self.expiry:
+                    print("expired")
                     return None
                 if confluent.util.monotonic_time() > self.keepalive:
+                    print("kept alive")
                     res = await _push_rpc(self.sock, b'')  # nulls are a keepalive
                     if not res:
                         return None
@@ -805,6 +813,7 @@ class StreamHandler(object):
                 #self.expiry = confluent.util.monotonic_time() + 60
                 #msg = self.sock.recv(8)
         except Exception as e:
+            print(repr(e))
             msg = None
         return msg
 
@@ -813,12 +822,16 @@ class StreamHandler(object):
 
 
 async def stop_following(replacement=None):
+    print("discontinuing following")
     async with _leaderlock:
         global cfgleader
         if cfgleader and not isinstance(cfgleader, bool):
             try:
-                cfgleader.close()
-            except Exception:
+                cfgleader[1].close()
+                await cfgleader[1].wait_closed()
+            except Exception as e:
+                print(repr(cfgleader))
+                print(repr(e))
                 pass
         cfgleader = replacement
 
@@ -901,15 +914,24 @@ def commit_clear():
     cfgleader = None
 
 
+killswitch = None
+
 async def follow_channel(channel):
     global _txcount
     global _hasquorum
     global lastheartbeat
+    global killswitch
+    print("channel to follow now")
     try:
         await stop_leading()
         await stop_following(channel)
         lh = StreamHandler(channel)
 
+        # TODO: add killswitch mechanism. stop_following closes the writer, but
+        # that seems to have no impact on the reader, causing a senseless delay
+        # to timeout.
+        # to trigger this, kill the leader and immediately run 'collective show'
+        # on a follower
         msg = await lh.get_next_msg()
         while msg:
             msg, rpc = msg[:8], msg[8:]
@@ -954,6 +976,7 @@ async def follow_channel(channel):
             msg = await lh.get_next_msg()
     finally:
         # mark the connection as broken
+        print("I'm out...")
         if cfgstreams:
             await stop_following(None)
         else:
diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py
index 71cffe55..47bfbdd3 100644
--- a/confluent_server/confluent/sockapi.py
+++ b/confluent_server/confluent/sockapi.py
@@ -39,7 +39,7 @@ import confluent.auth as auth
 import confluent.credserver as credserver
 import confluent.config.conf as conf
 import confluent.tlvdata as tlvdata
-import confluent.consoleserver as consoleserver
+#import confluent.consoleserver as consoleserver
 import confluent.config.configmanager as configmanager
 import confluent.exceptions as exc
 import confluent.log as log
@@ -192,8 +192,14 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
     if cfm:
         cfm.close_client_files()
     try:
-        connection.close()
-    except Exception:
+        if isinstance(connection, tuple):
+            connection[1].close()
+            await connection[1].wait_closed()
+            connection = connection[1].get_extra_info('socket')
+        else:
+            connection.close()
+    except Exception as e:
+        print(repr(e))
         pass
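
Below is a minimal sketch (not part of the patch) of the two asyncio patterns this change is concerned with: close_stream() mirrors the writer close()/wait_closed() pairing applied throughout the diff, and read_or_stop() is one hypothetical way to let stop_following interrupt a pending read, which the commit message notes is still missing. The helper names, the stop_event parameter, and the Event-based approach are assumptions for illustration, not existing confluent APIs.

import asyncio

async def close_stream(writer):
    # asyncio streams come as a (StreamReader, StreamWriter) pair; only the
    # writer has close(), and wait_closed() should follow so the transport is
    # actually torn down instead of being left to the garbage collector.
    writer.close()
    await writer.wait_closed()

async def read_or_stop(reader, stop_event, timeout):
    # Race the read against an asyncio.Event so a local stop/killswitch can
    # interrupt the wait; per the TODO in get_next_msg, closing the writer
    # alone did not abort the pending read.
    read_task = asyncio.create_task(reader.read(8))
    stop_task = asyncio.create_task(stop_event.wait())
    done, pending = await asyncio.wait({read_task, stop_task}, timeout=timeout,
                                       return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    if read_task in done:
        return read_task.result()
    return None  # stopped locally or timed out

If something along these lines were adopted, the killswitch added in this patch could become an asyncio.Event that stop_following() sets, and get_next_msg() could call read_or_stop(self.sock[0], killswitch, ...) in place of the bare asyncio.wait_for().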