2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-23 01:53:28 +00:00

Finish conversion of slp to asyncio.

Make process_peer async, with socket connection being async,
and dependency.

Have getaddrinfo use the asyncio version.

Rework the snoop to be more effective.

Rework the scan to be less convoluted.
This commit is contained in:
Jarrod Johnson 2024-05-08 11:35:33 -04:00
parent 23658680a5
commit 25d4d13a96

View File

@ -20,7 +20,6 @@ import confluent.util as util
import confluent.log as log
import os
import random
#import eventlet.green.socket as socket
import socket
import struct
import traceback
@ -199,7 +198,7 @@ def _generate_request_payload(srvtype, multicast, xid, prlist=''):
return header + payload
def _find_srvtype(net, net4, srvtype, addresses, xid):
async def _find_srvtype(net, net4, srvtype, addresses, xid):
"""Internal function to find a single service type
Helper to do singleton requests to srvtype
@ -209,10 +208,11 @@ def _find_srvtype(net, net4, srvtype, addresses, xid):
:param addresses: Pass through of addresses argument from find_targets
:return:
"""
cloop = asyncio.get_running_loop()
data = _generate_request_payload(srvtype, True, xid)
if addresses is not None:
for addr in addresses:
for saddr in socket.getaddrinfo(addr, 427):
for saddr in await cloop.getaddrinfo(addr, 427):
if saddr[0] == socket.AF_INET:
net4.sendto(data, saddr[4])
elif saddr[0] == socket.AF_INET6:
@ -265,42 +265,6 @@ def sock_read(fut, sock, cloop, allsocks):
fut.set_result(sock)
allsocks.discard(sock)
async def _bulk_recvfrom(socks, timeout):
allsocks = set([])
cloop = asyncio.get_running_loop()
done = True
while done:
currfutures = []
for sock in socks:
sock.setblocking(0)
currfut = asyncio.Future()
cloop.add_reader(sock, sock_read, currfut, sock, cloop, allsocks)
currfutures.append(currfut)
done, dumbfutures = await asyncio.wait(currfutures, timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
for sk in allsocks:
cloop.remove_reader(sk)
for dumbfuture in dumbfutures:
dumbfuture.cancel()
socks = []
for currfut in done:
socks.append(await currfut)
for sock in socks:
sock.setblocking(0)
try:
yield (sock,) + sock.recvfrom(9000)
except socket.error:
print("shouldn't happen...")
continue
print("done recv")
async def _grab_rsps(socks, rsps, interval, xidmap, deferrals):
async for srp in _bulk_recvfrom(socks, interval):
sock, rsp, peer = srp
_parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock)
def _parse_attrlist(attrstr):
attribs = {}
@ -414,11 +378,12 @@ async def _add_attributes(parsed):
def unicast_scan(address):
pass
def query_srvtypes(target):
async def query_srvtypes(target):
"""Query the srvtypes advertised by the target
:param target: A sockaddr tuple (if you get the peer info)
"""
cloop = asyncio.get_running_loop()
payload = b'\x00\x00\xff\xff\x00\x07DEFAULT'
header = _generate_slp_header(payload, False, functionid=9, xid=1)
packet = header + payload
@ -433,20 +398,18 @@ def query_srvtypes(target):
while tries and not connected:
tries -= 1
try:
net.settimeout(1.0)
net.connect(target)
net.settimeout(0)
await asyncio.wait_for(cloop.sock_connect(net, target), 2.0)
connected = True
except socket.error:
pass
if not connected:
return [u'']
net.sendall(packet)
rs = net.recv(8192)
except (socket.error, asyncio.exceptions.TimeoutError) as te:
return [u'']
await cloop.sock_sendall(net, packet)
rs = await cloop.sock_recv(net, 8192)
net.close()
parsed = _parse_slp_header(rs)
if parsed:
payload = parsed['payload']
if payload[:2] != '\x00\x00':
if payload[:2] != b'\x00\x00':
return
stypelen = struct.unpack('!H', bytes(payload[2:4]))[0]
stypes = payload[4:4+stypelen].decode('utf-8')
@ -466,6 +429,14 @@ async def rescan(handler):
handler(scanned)
def relay_packet(sock, pktq):
sock.setblocking(0)
try:
rsp, peer = sock.recvfrom(9000)
except socket.error as se:
return
pktq.put_nowait((sock, rsp, peer))
async def snoop(handler, protocol=None):
"""Watch for SLP activity
@ -506,7 +477,10 @@ async def snoop(handler, protocol=None):
# socket in use can occur when aliased ipv4 are encountered
net.bind(('', 427))
net4.bind(('', 427))
pktq = asyncio.Queue()
cloop = asyncio.get_running_loop()
cloop.add_reader(net, relay_packet, net, pktq)
cloop.add_reader(net4, relay_packet, net4, pktq)
while True:
try:
newmacs = set([])
@ -521,29 +495,36 @@ async def snoop(handler, protocol=None):
deferpeers = []
timeo = 60
rdy = True
while rdy and len(deferpeers) < 256:
rdy = False
async for srp in _bulk_recvfrom((net, net4), timeo):
rdy = True
s, rsp, peer = srp
if peer in known_peers:
continue
mac = neighutil.get_hwaddr(peer[0])
if not mac:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
srp = await pktq.get()
while srp and len(deferpeers) < 256:
s, rsp, peer = srp
if peer in known_peers:
continue
mac = neighutil.get_hwaddr(peer[0])
if not mac:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.setblocking(1)
s.sendto(b'\x00', probepeer)
except Exception as e:
try:
s.setblocking(1)
s.sendto(b'\x00', probepeer)
except Exception:
continue
deferpeers.append(peer)
srp = await asyncio.wait_for(pktq.get(), 0.2)
except asyncio.exceptions.TimeoutError:
break
continue
process_peer(newmacs, known_peers, peerbymacaddress, peer)
timeo = 0.2
deferpeers.append(peer)
continue
await process_peer(newmacs, known_peers, peerbymacaddress, peer)
if len(deferpeers) >= 256:
break
try:
srp = await asyncio.wait_for(pktq.get(), 0.2)
except asyncio.exceptions.TimeoutError:
break
if deferpeers:
await asyncio.sleep(2.2)
for peer in deferpeers:
process_peer(newmacs, known_peers, peerbymacaddress, peer)
await process_peer(newmacs, known_peers, peerbymacaddress, peer)
for mac in newmacs:
peerbymacaddress[mac]['xid'] = 1
await _add_attributes(peerbymacaddress[mac])
@ -554,7 +535,6 @@ async def snoop(handler, protocol=None):
srvurl = srvurl[:-3]
if srvurl.endswith('://Athena:'):
continue
print(repr(peerbymacaddress[mac]))
if 'service:ipmi' in peerbymacaddress[mac]['services']:
continue
if 'service:lightttpd' in peerbymacaddress[mac]['services']:
@ -569,7 +549,7 @@ async def snoop(handler, protocol=None):
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
def process_peer(newmacs, known_peers, peerbymacaddress, peer):
async def process_peer(newmacs, known_peers, peerbymacaddress, peer):
mac = neighutil.get_hwaddr(peer[0])
if not mac:
return
@ -578,7 +558,7 @@ def process_peer(newmacs, known_peers, peerbymacaddress, peer):
peerbymacaddress[mac]['addresses'].append(peer)
else:
try:
q = query_srvtypes(peer)
q = await query_srvtypes(peer)
except Exception as e:
q = None
if not q or not q[0]:
@ -602,12 +582,10 @@ def process_peer(newmacs, known_peers, peerbymacaddress, peer):
async def active_scan(handler, protocol=None):
known_peers = set([])
toprocess = []
# Implement a warmup, inducing neighbor table activity
# by kernel and giving 2 seconds for a retry or two if
# needed
async for scanned in scan():
print('fun with: ' + repr(scanned['services']))
for addr in scanned['addresses']:
if addr in known_peers:
break
@ -647,6 +625,10 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False):
# too, so force it
#net.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
# we are going to do broadcast, so allow that...
cloop = asyncio.get_running_loop()
pktq = asyncio.Queue()
cloop.add_reader(net, relay_packet, net, pktq)
cloop.add_reader(net4, relay_packet, net, pktq)
initxid = random.randint(0, 32768)
xididx = 0
xidmap = {}
@ -655,16 +637,22 @@ async def scan(srvtypes=_slp_services, addresses=None, localonly=False):
rsps = {}
deferrals = []
print('commence')
rcvq = asyncio.Queue()
for srvtype in srvtypes:
xididx += 1
_find_srvtype(net, net4, srvtype, addresses, initxid + xididx)
await _find_srvtype(net, net4, srvtype, addresses, initxid + xididx)
xidmap[initxid + xididx] = srvtype
await _grab_rsps((net, net4), rsps, 0.1, xidmap, deferrals)
# now do a more slow check to work to get stragglers,
# but fortunately the above should have taken the brunt of volume, so
# reduced chance of many responses overwhelming receive buffer.
await asyncio.sleep(0) # give async a chance to move things off buffer to queue
print('waity')
await _grab_rsps((net, net4), rsps, 1, xidmap, deferrals)
while True:
try:
srp = await asyncio.wait_for(pktq.get(), 1.0)
sock, rsp, peer = srp
_parse_slp_packet(rsp, peer, rsps, xidmap, deferrals, sock)
except asyncio.exceptions.TimeoutError:
break
cloop.remove_reader(net)
cloop.remove_reader(net4)
print(len(rsps))
if deferrals:
await asyncio.sleep(1.2) # already have a one second pause from select above