2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-16 10:39:23 +00:00

Begin rework of macmap.py

Redo offload to asyncio subprocess, and
replace eventlet Events with futures for
messaging.
This commit is contained in:
Jarrod Johnson 2024-05-17 17:07:18 -04:00
parent c03aa728cc
commit 6e751c811e

View File

@ -190,17 +190,20 @@ async def _affluent_map_switch(args):
else:
_nodesbymac[mac] = (nodename, nummacs)
def _offload_map_switch(switch, password, user):
async def _offload_map_switch(switch, password, user):
if _offloader is None:
_start_offloader()
await _start_offloader()
evtid = random.randint(0, 4294967295)
while evtid in _offloadevts:
evtid = random.randint(0, 4294967295)
_offloadevts[evtid] = eventlet.Event()
_offloadevts[evtid] = asyncio.get_event_loop().create_future()
_offloader.stdin.write(msgpack.packb((evtid, switch, password, user),
use_bin_type=True))
_offloader.stdin.flush()
result = _offloadevts[evtid].wait()
use_bin_type=True))
#_offloader.stdin.flush()
await _offloader.stdin.drain()
print("dispatched..")
result = await _offloadevts[evtid]
print("returned..")
del _offloadevts[evtid]
if len(result) == 2:
if result[0] == 1:
@ -211,33 +214,35 @@ def _offload_map_switch(switch, password, user):
def _start_offloader():
async def _start_offloader():
global _offloader
_offloader = subprocess.Popen(
[sys.executable, __file__, '-o'], bufsize=0, stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
fl = fcntl.fcntl(_offloader.stdout.fileno(), fcntl.F_GETFL)
fcntl.fcntl(_offloader.stdout.fileno(),
fcntl.F_SETFL, fl | os.O_NONBLOCK)
eventlet.spawn_n(_recv_offload)
eventlet.sleep(0)
#_offloader = subprocess.Popen(
# [sys.executable, __file__, '-o'], bufsize=0, stdin=subprocess.PIPE,
# stdout=subprocess.PIPE)
_offloader = await asyncio.subprocess.create_subprocess_exec(sys.executable, __file__, '-o', stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
#fl = fcntl.fcntl(_offloader.stdout.fileno(), fcntl.F_GETFL)
#fcntl.fcntl(_offloader.stdout.fileno(),
# fcntl.F_SETFL, fl | os.O_NONBLOCK)
asyncio.get_event_loop().create_task(_recv_offload())
def _recv_offload():
async def _recv_offload():
try:
upacker = msgpack.Unpacker(encoding='utf8')
except TypeError:
upacker = msgpack.Unpacker(raw=False, strict_map_key=False)
instream = _offloader.stdout.fileno()
#instream = _offloader.stdout.fileno()
while True:
select.select([_offloader.stdout], [], [])
upacker.feed(os.read(instream, 128))
#select.select([_offloader.stdout], [], [])
datum = await _offloader.stdout.read(512)
print(repr(datum))
upacker.feed(datum)
for result in upacker:
if result[0] not in _offloadevts:
print("Uh oh, unexpected event id... " + repr(result))
continue
_offloadevts[result[0]].send(result[1:])
eventlet.sleep(0)
_offloadevts[result[0]].set_result(result[1:])
await asyncio.sleep(0)
async def _map_switch_backend(args):
@ -274,7 +279,7 @@ async def _map_switch_backend(args):
'expected due to reinstall or new certificate'.format(switch)})
except Exception:
pass
mactobridge, ifnamemap, bridgetoifmap = _offload_map_switch(
mactobridge, ifnamemap, bridgetoifmap = await _offload_map_switch(
switch, password, user)
maccounts = {}
bridgetoifvalid = False