mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 09:32:21 +00:00

Fix closing sockets in various contexts

With asyncio, we must close the writer half of a (reader, writer) pair and await wait_closed().

Also rework get_next_msg to handle read timeouts and keepalives more robustly.

Still need to allow stop_following to interrupt get_next_msg.
Jarrod Johnson 2024-05-16 15:40:43 -04:00
parent 5a9f608451
commit 207cc3471e
4 changed files with 94 additions and 37 deletions
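
For reference, a minimal sketch of the close pattern this commit applies throughout, assuming the connection tuples are (StreamReader, StreamWriter) pairs as returned by asyncio.open_connection; the helper name and arguments below are illustrative, not taken from confluent.

import asyncio

async def send_and_close(host, port, payload):
    # open_connection returns a (reader, writer) pair; only the writer owns
    # the transport, so it is the half that has to be closed.
    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write(payload)
        await writer.drain()            # flush buffered data
    finally:
        writer.close()                  # schedule the transport close
        await writer.wait_closed()      # wait until it is actually closed

This is the shape behind the repeated connection[1].close() / await connection[1].wait_closed() pairs in the diff, where index 0 is the reader and index 1 the writer.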

View File

@@ -37,8 +37,12 @@ import confluent.main
#p.enable()
#try:
import multiprocessing
async def main():
await confluent.main.run(sys.argv)
if __name__ == '__main__':
multiprocessing.freeze_support()
#multiprocessing.freeze_support()
#asyncio.get_event_loop().run_until_complete(main())
gt = spawn_for_awaitable(confluent.main.run(sys.argv))
gt.wait()
#except:
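
The launcher now hands the async entry point to spawn_for_awaitable instead of the commented-out run_until_complete call, presumably to bridge into a greenlet-based hub. For comparison, a plain-asyncio sketch of the same startup (run() here is a stand-in for confluent.main.run, which this sketch does not import):

import asyncio
import multiprocessing
import sys

async def run(argv):
    # placeholder for the real async confluent.main.run(argv)
    print('would start confluent with', argv)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    # asyncio.run() owns the event loop for the lifetime of the coroutine;
    # the commit instead wraps the coroutine so it can be waited on from a
    # greenlet context via spawn_for_awaitable(...).wait().
    asyncio.run(run(sys.argv))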

View File

@@ -33,8 +33,6 @@ import random
import time
import sys
import OpenSSL.crypto as crypto
class PyObject_HEAD(ctypes.Structure):
_fields_ = [
@@ -101,7 +99,6 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
''.format(leader, str(e)),
'subsystem': 'collective'})
return False
print("connecting to leader")
async with connecting:
async with cfm._initlock:
# remote is a socket...
@@ -169,7 +166,8 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
ndata = await remote[0].read(dbsize - len(dbjson))
if not ndata:
try:
remote[0].close()
remote[1].close()
await remote[1].wait_closed()
except Exception:
pass
log.log({'error': 'Retrying connection, error during initial sync', 'subsystem': 'collective'})
@@ -190,7 +188,7 @@ async def connect_to_leader(cert=None, name=None, leader=None, remote=None, isre
sync=False)
cfm.commit_clear()
except Exception:
print("huh????")
print(repr(e))
await cfm.stop_following()
cfm.rollback_clear()
raise
@@ -328,8 +326,11 @@ async def handle_connection(connection, cert, request, local=False):
linfo = cfm.get_collective_member_by_address(currentleader)
try:
_, remote = await create_connection(currentleader)
except Exception:
cfm.stop_following()
if isinstance(remote, Exception):
raise remote
except Exception as e:
print(repr(e))
await cfm.stop_following()
return
#remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE,
# keyfile='/etc/confluent/privkey.pem',
@@ -338,11 +339,13 @@ async def handle_connection(connection, cert, request, local=False):
if not (linfo and util.cert_matches(
linfo['fingerprint'],
cert)):
#remote.close()
remote[1].close()
await remote[1].wait_closed()
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
# connection.close()
connection[1].close()
await connection[1].wait_closed()
return
await tlvdata.recv(remote) # ignore banner
await tlvdata.recv(remote) # ignore authpassed: 0
@@ -374,7 +377,8 @@ async def handle_connection(connection, cert, request, local=False):
await cfm.del_collective_member(todelete)
await tlvdata.send(connection,
{'collective': {'status': 'Successfully deleted {0}'.format(todelete)}})
connection.close()
connection[1].close()
await connection[1].wait_closed()
return
if 'invite' == operation:
try:
@@ -391,7 +395,8 @@ async def handle_connection(connection, cert, request, local=False):
invitation = invites.create_server_invitation(name, role)
await tlvdata.send(connection,
{'collective': {'invitation': invitation}})
connection.close()
connection[1].close()
await connection[1].wait_closed()
if 'join' == operation:
invitation = request['invitation']
try:
@@ -403,7 +408,8 @@ async def handle_connection(connection, cert, request, local=False):
connection,
{'collective':
{'status': 'Invalid token format'}})
connection.close()
connection[1].close()
await connection[1].wait_closed()
return
host = request['server']
try:
@@ -441,7 +447,8 @@ async def handle_connection(connection, cert, request, local=False):
connection,
{'collective':
{'status': 'Failed to connect to {0}'.format(host)}})
connection.close()
connection[1].close()
await connection[1].wait_closed()
raise
return
mycert = util.get_certificate_from_file(
@@ -462,12 +469,14 @@ async def handle_connection(connection, cert, request, local=False):
proof = base64.b64decode(proof)
j = invites.check_server_proof(invitation, mycert, cert, proof)
if not j:
remote.close()
remote[1].close()
await remote[1].wait_closed()
await tlvdata.send(connection, {'collective':
{'status': 'Bad server token'}})
return
await tlvdata.send(connection, {'collective': {'status': 'Success'}})
# connection.close()
connection[1].close()
await connection[1].wait_closed()
currentleader = rsp['collective']['leader']
f = open('/etc/confluent/cfg/myname', 'w')
f.write(name)
@@ -561,12 +570,14 @@ async def handle_connection(connection, cert, request, local=False):
connection,
{'error': 'Already following, assimilate leader first',
'leader': currentleader})
connection.close()
connection[1].close()
await connection[1].wait_closed()
return
if connecting.active:
# don't try to connect while actively already trying to connect
await tlvdata.send(connection, {'status': 0})
#connection.close()
connection[1].close()
await connection[1].wait_closed()
return
cnn = connection[1].get_extra_info('socket')
if (currentleader == cnn.getpeername()[0] and
@@ -574,7 +585,8 @@ async def handle_connection(connection, cert, request, local=False):
# if we are happily following this leader already, don't stir
# the pot
await tlvdata.send(connection, {'status': 0})
connection.close()
connection[1].close()
await connection[1].wait_closed()
return
log.log({'info': 'Connecting in response to assimilation',
'subsystem': 'collective'})
@@ -582,7 +594,8 @@ async def handle_connection(connection, cert, request, local=False):
if cfm.cfgstreams:
await retire_as_leader(newleader)
await tlvdata.send(connection, {'status': 0})
cnn.close()
connection[1].close()
await connection[1].wait_closed()
if not await connect_to_leader(None, None, leader=newleader):
if retrythread is None:
retrythread = util.spawn_after(random.random(),
@@ -595,7 +608,8 @@ async def handle_connection(connection, cert, request, local=False):
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
connection.close()
connection[1].close()
await connection[1].wait_closed()
return
collinfo = {}
populate_collinfo(collinfo)
@@ -608,26 +622,31 @@ async def handle_connection(connection, cert, request, local=False):
await tlvdata.send(connection,
{'error': 'Invalid certificate, '
'redo invitation process'})
#connection.close()
connection[1].close()
await connection[1].wait_closed()
return
cnn = connection[1].transport.get_extra_info('socket')
myself = cnn.getsockname()[0]
if connecting.active or initting:
await tlvdata.send(connection, {'error': 'Connecting right now',
'backoff': True})
#connection.close()
connection[1].close()
await connection[1].wait_closed()
return
if leader_init.active:
print("initting leader....")
await tlvdata.send(connection, {'error': 'Servicing a connection',
'waitinline': True})
#connection.close()
connection[1].close()
await connection[1].wait_closed()
return
if myself != await get_leader(connection):
await tlvdata.send(
connection,
{'error': 'Cannot assimilate, our leader is '
'in another castle', 'leader': currentleader})
#connection.close()
connection[1].close()
await connection[1].wait_closed()
return
if request['txcount'] > cfm._txcount:
await retire_as_leader()
@@ -639,7 +658,8 @@ async def handle_connection(connection, cert, request, local=False):
'transaction count', 'subsystem': 'collective'})
cnn = connection[1].transport.get_extra_info('socket')
peername = cnn.getpeername()[0]
cnn.close()
connection[1].close()
await connection[1].wait_closed()
if not await connect_to_leader(
None, None, peername):
if retrythread is None:
@@ -662,9 +682,11 @@ async def handle_connection(connection, cert, request, local=False):
'dbsize': len(cfgdata)})
connection[1].write(cfgdata)
await connection[1].drain()
except Exception:
except Exception as e:
print(repr(e))
try:
connection.close()
connection[1].close()
await connection[1].wait_closed()
finally:
raise
return None
@@ -936,9 +958,11 @@ async def start_collective():
log.log({'info': 'Performing startup attempt to {0}'.format(
member), 'subsystem': 'collective'})
if not await connect_to_leader(name=myname, leader=member, remote=remote):
remote.close()
remote[1].close()
await remote[1].wait_closed()
else:
remote.close()
remote[1].close()
await remote[1].wait_closed()
except Exception as e:
pass
finally:
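
Several of the rewritten checks in this file also pull the underlying socket back out of the writer to inspect addresses, via connection[1].get_extra_info('socket') or connection[1].transport.get_extra_info('socket'). A small illustrative sketch of that pattern, with a made-up host and port:

import asyncio

async def peer_address(host, port):
    reader, writer = await asyncio.open_connection(host, port)
    try:
        # StreamWriter.get_extra_info() simply forwards to the transport,
        # so both spellings used in the diff resolve to the same data.
        sock = writer.get_extra_info('socket')
        peer = sock.getpeername()[0]
        # 'peername' gives the same answer without touching the raw socket.
        assert peer == writer.get_extra_info('peername')[0]
        return peer
    finally:
        writer.close()
        await writer.wait_closed()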

View File

@@ -434,7 +434,8 @@ async def _push_rpc(stream, payload):
if membership_callback:
membership_callback()
try:
stream.close()
stream[1].close()
await stream[1].wait_closed()
except Exception:
pass
@@ -778,12 +779,19 @@ class StreamHandler(object):
try:
while not msg:
try:
# TODO: add an asyncio event or similar to allow stop_leading to shorten this loop,
# since the read will not abort just because the associated writer closes, unfortunately
msg = await asyncio.wait_for(self.sock[0].read(8), timeout=self.keepalive - confluent.util.monotonic_time())
except TimeoutError:
if msg:
self.expiry = confluent.util.monotonic_time() + 60
except asyncio.exceptions.TimeoutError:
print("timeout")
msg = None
if confluent.util.monotonic_time() > self.expiry:
print("expired")
return None
if confluent.util.monotonic_time() > self.keepalive:
print("kept alive")
res = await _push_rpc(self.sock, b'') # nulls are a keepalive
if not res:
return None
@@ -805,6 +813,7 @@ class StreamHandler(object):
#self.expiry = confluent.util.monotonic_time() + 60
#msg = self.sock.recv(8)
except Exception as e:
print(repr(e))
msg = None
return msg
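
As reworked above, get_next_msg bounds each read with asyncio.wait_for against the keepalive deadline and pushes an empty RPC when the timer fires. A condensed sketch of that loop, with simplified bookkeeping (the intervals and the push_keepalive callable are placeholders, not confluent's own helpers):

import asyncio
import time

async def get_next_msg(reader, push_keepalive, keepalive_interval=5.0, expiry=60.0):
    # Read up to 8 header bytes, sending keepalives while the peer is quiet
    # and giving up once the expiry window passes with no traffic at all.
    deadline = time.monotonic() + expiry
    while True:
        try:
            return await asyncio.wait_for(reader.read(8),
                                          timeout=keepalive_interval)
        except asyncio.TimeoutError:
            if time.monotonic() > deadline:
                return None                    # peer silent too long
            if not await push_keepalive():     # e.g. _push_rpc(sock, b'')
                return None                    # keepalive failed, give up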
@@ -813,12 +822,16 @@
async def stop_following(replacement=None):
print("discontinuing following")
async with _leaderlock:
global cfgleader
if cfgleader and not isinstance(cfgleader, bool):
try:
cfgleader.close()
except Exception:
cfgleader[1].close()
await cfgleader[1].wait_closed()
except Exception as e:
print(repr(cfgleader))
print(repr(e))
pass
cfgleader = replacement
@@ -901,15 +914,24 @@ def commit_clear():
cfgleader = None
killswitch = None
async def follow_channel(channel):
global _txcount
global _hasquorum
global lastheartbeat
global killswitch
print("channel to follow now")
try:
await stop_leading()
await stop_following(channel)
lh = StreamHandler(channel)
# TODO: add killswitch mechanism. stop_following closes the writer, but
# that seems to have no impact on the reader, causing a senseless delay
# to timeout.
# to trigger this, kill the leader and immediately run 'collective show'
# on a follower
msg = await lh.get_next_msg()
while msg:
msg, rpc = msg[:8], msg[8:]
@@ -954,6 +976,7 @@ async def follow_channel(channel):
msg = await lh.get_next_msg()
finally:
# mark the connection as broken
print("I'm out...")
if cfgstreams:
await stop_following(None)
else:
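
Both TODOs above point at the same gap: closing the writer does not wake a coroutine blocked in reader.read(), so stop_following currently has to wait out the timeout. One way to get the interruptible wait the comments describe, sketched with plain asyncio primitives (the stop event and function name are hypothetical, not existing confluent attributes):

import asyncio

async def read_or_stop(reader, stop, timeout):
    # Race the read against a stop event; whichever finishes first wins and
    # the loser is cancelled, so stop.set() wakes a blocked follower at once.
    read_task = asyncio.ensure_future(reader.read(8))
    stop_task = asyncio.ensure_future(stop.wait())
    done, pending = await asyncio.wait({read_task, stop_task},
                                       timeout=timeout,
                                       return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        # Note: cancelling a partially completed read can drop bytes; a real
        # implementation would need to guard against that.
        task.cancel()
    if read_task in done:
        return read_task.result()
    return None  # stopped or timed out

stop_following (or a killswitch) would then only need to call stop.set() to break the follower out of its read loop immediately.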

View File

@@ -39,7 +39,7 @@ import confluent.auth as auth
import confluent.credserver as credserver
import confluent.config.conf as conf
import confluent.tlvdata as tlvdata
import confluent.consoleserver as consoleserver
#import confluent.consoleserver as consoleserver
import confluent.config.configmanager as configmanager
import confluent.exceptions as exc
import confluent.log as log
@@ -192,8 +192,14 @@ async def sessionhdl(connection, authname, skipauth=False, cert=None):
if cfm:
cfm.close_client_files()
try:
connection.close()
except Exception:
if isinstance(connection, tuple):
connection[1].close()
await connection[1].wait_closed()
connection = connection[1].get_extra_info('socket')
else:
connection.close()
except Exception as e:
print(repr(e))
pass