From 6e751c811e748c89d458d69a9f8d5bffc24842e7 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 17 May 2024 17:07:18 -0400 Subject: [PATCH] Begin rework of macmap.py Redo offload to asyncio subprocess, and replace eventlet Events with futures for messaging. --- .../confluent/networking/macmap.py | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/confluent_server/confluent/networking/macmap.py b/confluent_server/confluent/networking/macmap.py index c2f39e91..fa741071 100644 --- a/confluent_server/confluent/networking/macmap.py +++ b/confluent_server/confluent/networking/macmap.py @@ -190,17 +190,20 @@ async def _affluent_map_switch(args): else: _nodesbymac[mac] = (nodename, nummacs) -def _offload_map_switch(switch, password, user): +async def _offload_map_switch(switch, password, user): if _offloader is None: - _start_offloader() + await _start_offloader() evtid = random.randint(0, 4294967295) while evtid in _offloadevts: evtid = random.randint(0, 4294967295) - _offloadevts[evtid] = eventlet.Event() + _offloadevts[evtid] = asyncio.get_event_loop().create_future() _offloader.stdin.write(msgpack.packb((evtid, switch, password, user), - use_bin_type=True)) - _offloader.stdin.flush() - result = _offloadevts[evtid].wait() + use_bin_type=True)) + #_offloader.stdin.flush() + await _offloader.stdin.drain() + print("dispatched..") + result = await _offloadevts[evtid] + print("returned..") del _offloadevts[evtid] if len(result) == 2: if result[0] == 1: @@ -211,33 +214,35 @@ def _offload_map_switch(switch, password, user): -def _start_offloader(): +async def _start_offloader(): global _offloader - _offloader = subprocess.Popen( - [sys.executable, __file__, '-o'], bufsize=0, stdin=subprocess.PIPE, - stdout=subprocess.PIPE) - fl = fcntl.fcntl(_offloader.stdout.fileno(), fcntl.F_GETFL) - fcntl.fcntl(_offloader.stdout.fileno(), - fcntl.F_SETFL, fl | os.O_NONBLOCK) - eventlet.spawn_n(_recv_offload) - eventlet.sleep(0) + #_offloader = subprocess.Popen( + # [sys.executable, __file__, '-o'], bufsize=0, stdin=subprocess.PIPE, + # stdout=subprocess.PIPE) + _offloader = await asyncio.subprocess.create_subprocess_exec(sys.executable, __file__, '-o', stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE) + #fl = fcntl.fcntl(_offloader.stdout.fileno(), fcntl.F_GETFL) + #fcntl.fcntl(_offloader.stdout.fileno(), + # fcntl.F_SETFL, fl | os.O_NONBLOCK) + asyncio.get_event_loop().create_task(_recv_offload()) -def _recv_offload(): +async def _recv_offload(): try: upacker = msgpack.Unpacker(encoding='utf8') except TypeError: upacker = msgpack.Unpacker(raw=False, strict_map_key=False) - instream = _offloader.stdout.fileno() + #instream = _offloader.stdout.fileno() while True: - select.select([_offloader.stdout], [], []) - upacker.feed(os.read(instream, 128)) + #select.select([_offloader.stdout], [], []) + datum = await _offloader.stdout.read(512) + print(repr(datum)) + upacker.feed(datum) for result in upacker: if result[0] not in _offloadevts: print("Uh oh, unexpected event id... " + repr(result)) continue - _offloadevts[result[0]].send(result[1:]) - eventlet.sleep(0) + _offloadevts[result[0]].set_result(result[1:]) + await asyncio.sleep(0) async def _map_switch_backend(args): @@ -274,7 +279,7 @@ async def _map_switch_backend(args): 'expected due to reinstall or new certificate'.format(switch)}) except Exception: pass - mactobridge, ifnamemap, bridgetoifmap = _offload_map_switch( + mactobridge, ifnamemap, bridgetoifmap = await _offload_map_switch( switch, password, user) maccounts = {} bridgetoifvalid = False