From e1e3244af676d637381f08f57fbb81e0c5be298a Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 9 May 2024 09:40:03 -0400 Subject: [PATCH] Port PXE to asyncio and re-enable --- confluent_server/confluent/discovery/core.py | 1 + .../confluent/discovery/protocols/pxe.py | 83 +++++++------------ 2 files changed, 31 insertions(+), 53 deletions(-) diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index 3dad9f71..66055a40 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -1663,6 +1663,7 @@ def stop_autosense(): def start_autosense(): autosensors.add(util.spawn(slp.snoop(safe_detected, slp))) #autosensors.add(eventlet.spawn(mdns.snoop, safe_detected, mdns)) + util.spawn(pxe.snoop(safe_detected, pxe, get_node_guess_by_uuid)) #autosensors.add(eventlet.spawn(pxe.snoop, safe_detected, pxe, get_node_guess_by_uuid)) util.spawn(remotescan()) diff --git a/confluent_server/confluent/discovery/protocols/pxe.py b/confluent_server/confluent/discovery/protocols/pxe.py index b4c98627..becf24b9 100644 --- a/confluent_server/confluent/discovery/protocols/pxe.py +++ b/confluent_server/confluent/discovery/protocols/pxe.py @@ -388,6 +388,31 @@ def start_proxydhcp(handler, nodeguess=None): util.spawn(proxydhcp(handler, nodeguess)) +def new_dhcp_packet(handler, nodeguess, cfg, net4): + data, cmsgs, flags, peer = net4.recvmsg(9000, 9000) + if len(data) < 64: + return + for cmsg in cmsgs: + level, typ, cdata = cmsg + if level == socket.IPPROTO_IP and typ == IP_PKTINFO: + idx, recv = struct.unpack('II', cdata[:8]) + recv = ipfromint(recv) + rqv = memoryview(data) + if rqv[0] == 1: + process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv) + + +def new_dhcp6_packet(handler, net6, cfg, nodeguess): + recv = 'ff02::1:2' + pkt, addr = net6.recvfrom(2048) + idx = addr[-1] + if len(pkt) < 64: + return + rqv = memoryview(pkt) + if rqv[0] in (1, 3): + process_dhcp6req(handler, rqv, addr, net6, cfg, nodeguess) + + async def snoop(handler, protocol=None, nodeguess=None): #TODO(jjohnson2): ipv6 socket and multicast for DHCPv6, should that be #prominent @@ -417,60 +442,12 @@ async def snoop(handler, protocol=None, nodeguess=None): v6grp = v6addr + struct.pack('=I', ifidx) net6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, v6grp) net6.bind(('', 547)) - clientaddr = sockaddr_in() - rawbuffer = bytearray(2048) - data = pkttype.from_buffer(rawbuffer) - msg = msghdr() - cmsgarr = bytearray(cmsgsize) - cmsg = cmsgtype.from_buffer(cmsgarr) - iov = iovec() - iov.iov_base = ctypes.addressof(data) - iov.iov_len = 2048 - msg.msg_iov = ctypes.pointer(iov) - msg.msg_iovlen = 1 - msg.msg_control = ctypes.addressof(cmsg) - msg.msg_controllen = ctypes.sizeof(cmsg) - msg.msg_name = ctypes.addressof(clientaddr) - msg.msg_namelen = ctypes.sizeof(clientaddr) - # We'll leave name and namelen blank for now - while True: - try: - # Just need some delay, picked a prime number so that overlap with other - # timers might be reduced, though it really is probably nothing - ready = select.select([net4, net6], [], [], None) - if not ready or not ready[0]: - continue - for netc in ready[0]: - idx = None - if netc == net4: - i = recvmsg(netc.fileno(), ctypes.pointer(msg), 0) - # if we have a small packet, just skip, it can't possible hold enough - # data and avoids some downstream IndexErrors that would be messy - # with try/except - if i < 64: - continue - _, level, typ = struct.unpack('QII', cmsgarr[:16]) - if level == socket.IPPROTO_IP and typ == IP_PKTINFO: - idx, recv = struct.unpack('II', cmsgarr[16:24]) - recv = ipfromint(recv) - rqv = memoryview(rawbuffer)[:i] - if rawbuffer[0] == 1: # Boot request - process_dhcp4req(handler, nodeguess, cfg, net4, idx, recv, rqv) - elif netc == net6: - recv = 'ff02::1:2' - pkt, addr = netc.recvfrom(2048) - idx = addr[-1] - i = len(pkt) - if i < 64: - continue - rqv = memoryview(pkt) - rq = bytearray(rqv[:2]) - if rq[0] in (1, 3): # dhcpv6 solicit - process_dhcp6req(handler, rqv, addr, netc, cfg, nodeguess) + net6.settimeout(0) + net4.settimeout(0) + cloop = asyncio.get_running_loop() + cloop.add_reader(net4, new_dhcp_packet, handler, nodeguess, cfg, net4) + cloop.add_reader(net6, new_dhcp6_packet, handler, net6, cfg, nodeguess) - except Exception as e: - tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, - event=log.Events.stacktrace) def process_dhcp6req(handler, rqv, addr, net, cfg, nodeguess): ip = addr[0]