From 25d4d13a9680399fbf6cd0e15acabe9196ddb904 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 8 May 2024 11:35:33 -0400 Subject: [PATCH] Finish conversion of slp to asyncio. Make process_peer async, with socket connection being async, and dependency. Have getaddrinfo use the asyncio version. Rework the snoop to be more effective. Rework the scan to be less convoluted. --- .../confluent/discovery/protocols/slp.py | 146 ++++++++---------- 1 file changed, 67 insertions(+), 79 deletions(-) diff --git a/confluent_server/confluent/discovery/protocols/slp.py b/confluent_server/confluent/discovery/protocols/slp.py index 460dba17..a6bbb40e 100644 --- a/confluent_server/confluent/discovery/protocols/slp.py +++ b/confluent_server/confluent/discovery/protocols/slp.py @@ -20,7 +20,6 @@ import confluent.util as util import confluent.log as log import os import random -#import eventlet.green.socket as socket import socket import struct import traceback @@ -199,7 +198,7 @@ def _generate_request_payload(srvtype, multicast, xid, prlist=''): return header + payload -def _find_srvtype(net, net4, srvtype, addresses, xid): +async def _find_srvtype(net, net4, srvtype, addresses, xid): """Internal function to find a single service type Helper to do singleton requests to srvtype @@ -209,10 +208,11 @@ def _find_srvtype(net, net4, srvtype, addresses, xid): :param addresses: Pass through of addresses argument from find_targets :return: """ + cloop = asyncio.get_running_loop() data = _generate_request_payload(srvtype, True, xid) if addresses is not None: for addr in addresses: - for saddr in socket.getaddrinfo(addr, 427): + for saddr in await cloop.getaddrinfo(addr, 427): if saddr[0] == socket.AF_INET: net4.sendto(data, saddr[4]) elif saddr[0] == socket.AF_INET6: @@ -265,42 +265,6 @@ def sock_read(fut, sock, cloop, allsocks): fut.set_result(sock) allsocks.discard(sock) -async def _bulk_recvfrom(socks, timeout): - allsocks = set([]) - cloop = asyncio.get_running_loop() - done = True - while done: - currfutures = [] - for sock in socks: - sock.setblocking(0) - currfut = asyncio.Future() - cloop.add_reader(sock, sock_read, currfut, sock, cloop, allsocks) - currfutures.append(currfut) - done, dumbfutures = await asyncio.wait(currfutures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED) - for sk in allsocks: - cloop.remove_reader(sk) - for dumbfuture in dumbfutures: - dumbfuture.cancel() - socks = [] - for currfut in done: - socks.append(await currfut) - for sock in socks: - sock.setblocking(0) - try: - yield (sock,) + sock.recvfrom(9000) - except socket.error: - print("shouldn't happen...") - continue - print("done recv") - - - -async def _grab_rsps(socks, rsps, interval, xidmap, deferrals): - async for srp in _bulk_recvfrom(socks, interval): - sock, rsp, peer = srp - _parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock) - - def _parse_attrlist(attrstr): attribs = {} @@ -414,11 +378,12 @@ async def _add_attributes(parsed): def unicast_scan(address): pass -def query_srvtypes(target): +async def query_srvtypes(target): """Query the srvtypes advertised by the target :param target: A sockaddr tuple (if you get the peer info) """ + cloop = asyncio.get_running_loop() payload = b'\x00\x00\xff\xff\x00\x07DEFAULT' header = _generate_slp_header(payload, False, functionid=9, xid=1) packet = header + payload @@ -433,20 +398,18 @@ def query_srvtypes(target): while tries and not connected: tries -= 1 try: - net.settimeout(1.0) - net.connect(target) + net.settimeout(0) + await asyncio.wait_for(cloop.sock_connect(net, target), 2.0) connected = True - except socket.error: - pass - if not connected: - return [u''] - net.sendall(packet) - rs = net.recv(8192) + except (socket.error, asyncio.exceptions.TimeoutError) as te: + return [u''] + await cloop.sock_sendall(net, packet) + rs = await cloop.sock_recv(net, 8192) net.close() parsed = _parse_slp_header(rs) if parsed: payload = parsed['payload'] - if payload[:2] != '\x00\x00': + if payload[:2] != b'\x00\x00': return stypelen = struct.unpack('!H', bytes(payload[2:4]))[0] stypes = payload[4:4+stypelen].decode('utf-8') @@ -466,6 +429,14 @@ async def rescan(handler): handler(scanned) +def relay_packet(sock, pktq): + sock.setblocking(0) + try: + rsp, peer = sock.recvfrom(9000) + except socket.error as se: + return + pktq.put_nowait((sock, rsp, peer)) + async def snoop(handler, protocol=None): """Watch for SLP activity @@ -506,7 +477,10 @@ async def snoop(handler, protocol=None): # socket in use can occur when aliased ipv4 are encountered net.bind(('', 427)) net4.bind(('', 427)) - + pktq = asyncio.Queue() + cloop = asyncio.get_running_loop() + cloop.add_reader(net, relay_packet, net, pktq) + cloop.add_reader(net4, relay_packet, net4, pktq) while True: try: newmacs = set([]) @@ -521,29 +495,36 @@ async def snoop(handler, protocol=None): deferpeers = [] timeo = 60 rdy = True - while rdy and len(deferpeers) < 256: - rdy = False - async for srp in _bulk_recvfrom((net, net4), timeo): - rdy = True - s, rsp, peer = srp - if peer in known_peers: - continue - mac = neighutil.get_hwaddr(peer[0]) - if not mac: - probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] + srp = await pktq.get() + while srp and len(deferpeers) < 256: + s, rsp, peer = srp + if peer in known_peers: + continue + mac = neighutil.get_hwaddr(peer[0]) + if not mac: + probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] + try: + s.setblocking(1) + s.sendto(b'\x00', probepeer) + except Exception as e: try: - s.setblocking(1) - s.sendto(b'\x00', probepeer) - except Exception: - continue - deferpeers.append(peer) + srp = await asyncio.wait_for(pktq.get(), 0.2) + except asyncio.exceptions.TimeoutError: + break continue - process_peer(newmacs, known_peers, peerbymacaddress, peer) - timeo = 0.2 + deferpeers.append(peer) + continue + await process_peer(newmacs, known_peers, peerbymacaddress, peer) + if len(deferpeers) >= 256: + break + try: + srp = await asyncio.wait_for(pktq.get(), 0.2) + except asyncio.exceptions.TimeoutError: + break if deferpeers: await asyncio.sleep(2.2) for peer in deferpeers: - process_peer(newmacs, known_peers, peerbymacaddress, peer) + await process_peer(newmacs, known_peers, peerbymacaddress, peer) for mac in newmacs: peerbymacaddress[mac]['xid'] = 1 await _add_attributes(peerbymacaddress[mac]) @@ -554,7 +535,6 @@ async def snoop(handler, protocol=None): srvurl = srvurl[:-3] if srvurl.endswith('://Athena:'): continue - print(repr(peerbymacaddress[mac])) if 'service:ipmi' in peerbymacaddress[mac]['services']: continue if 'service:lightttpd' in peerbymacaddress[mac]['services']: @@ -569,7 +549,7 @@ async def snoop(handler, protocol=None): tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) -def process_peer(newmacs, known_peers, peerbymacaddress, peer): +async def process_peer(newmacs, known_peers, peerbymacaddress, peer): mac = neighutil.get_hwaddr(peer[0]) if not mac: return @@ -578,7 +558,7 @@ def process_peer(newmacs, known_peers, peerbymacaddress, peer): peerbymacaddress[mac]['addresses'].append(peer) else: try: - q = query_srvtypes(peer) + q = await query_srvtypes(peer) except Exception as e: q = None if not q or not q[0]: @@ -602,12 +582,10 @@ def process_peer(newmacs, known_peers, peerbymacaddress, peer): async def active_scan(handler, protocol=None): known_peers = set([]) - toprocess = [] # Implement a warmup, inducing neighbor table activity # by kernel and giving 2 seconds for a retry or two if # needed async for scanned in scan(): - print('fun with: ' + repr(scanned['services'])) for addr in scanned['addresses']: if addr in known_peers: break @@ -647,6 +625,10 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False): # too, so force it #net.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) # we are going to do broadcast, so allow that... + cloop = asyncio.get_running_loop() + pktq = asyncio.Queue() + cloop.add_reader(net, relay_packet, net, pktq) + cloop.add_reader(net4, relay_packet, net, pktq) initxid = random.randint(0, 32768) xididx = 0 xidmap = {} @@ -655,16 +637,22 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False): rsps = {} deferrals = [] print('commence') + rcvq = asyncio.Queue() for srvtype in srvtypes: xididx += 1 - _find_srvtype(net, net4, srvtype, addresses, initxid + xididx) + await _find_srvtype(net, net4, srvtype, addresses, initxid + xididx) xidmap[initxid + xididx] = srvtype - await _grab_rsps((net, net4), rsps, 0.1, xidmap, deferrals) - # now do a more slow check to work to get stragglers, - # but fortunately the above should have taken the brunt of volume, so - # reduced chance of many responses overwhelming receive buffer. + await asyncio.sleep(0) # give async a chance to move things off buffer to queue print('waity') - await _grab_rsps((net, net4), rsps, 1, xidmap, deferrals) + while True: + try: + srp = await asyncio.wait_for(pktq.get(), 1.0) + sock, rsp, peer = srp + _parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock) + except asyncio.exceptions.TimeoutError: + break + cloop.remove_reader(net) + cloop.remove_reader(net4) print(len(rsps)) if deferrals: await asyncio.sleep(1.2) # already have a one second pause from select above