2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-25 19:10:10 +00:00

Implement SSDP asyncio

This covers SSDP devices as well as confluent deployment
discovery.
This commit is contained in:
Jarrod Johnson 2024-05-08 13:17:43 -04:00
parent 96a43013b5
commit 42b7cbe421

View File

@ -28,6 +28,7 @@
# NTS: ssdp:alive
import asyncio
import confluent.config.configmanager as cfm
import confluent.collective.manager as collective
import confluent.neighutil as neighutil
@ -35,16 +36,13 @@ import confluent.noderange as noderange
import confluent.util as util
import confluent.log as log
import confluent.netutil as netutil
import eventlet
import eventlet.green.select as select
import eventlet.green.socket as socket
import eventlet.greenpool as gp
import socket
import os
import time
import struct
import traceback
webclient = eventlet.import_patched('pyghmi.util.webclient')
import aiohmi.util.webclient as webclient
mcastv4addr = '239.255.255.250'
mcastv6addr = 'ff02::c'
@ -56,9 +54,9 @@ smsg = ('M-SEARCH * HTTP/1.1\r\n'
'MX: 3\r\n\r\n')
def active_scan(handler, protocol=None):
async def active_scan(handler, protocol=None):
known_peers = set([])
for scanned in scan(['urn:dmtf-org:service:redfish-rest:1', 'urn::service:affluent']):
async for scanned in scan(['urn:dmtf-org:service:redfish-rest:1', 'urn::service:affluent']):
for addr in scanned['addresses']:
if addr in known_peers:
break
@ -72,9 +70,9 @@ def active_scan(handler, protocol=None):
scanned['protocol'] = protocol
handler(scanned)
def scan(services, target=None):
async def scan(services, target=None):
for service in services:
for rply in _find_service(service, target):
async for rply in _find_service(service, target):
yield rply
@ -108,10 +106,10 @@ def _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byeha
if not value.endswith('/DeviceDescription.json'):
return
if handler:
eventlet.spawn_n(check_fish_handler, handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer)
util.spawn(check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer))
def check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer):
retdata = check_fish(('/DeviceDescription.json', peerdata))
async def check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer):
retdata = await check_fish(('/DeviceDescription.json', peerdata))
if retdata:
known_peers.add(peer)
newmacs.add(mac)
@ -119,7 +117,7 @@ def check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress
machandlers[mac] = handler
def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
"""Watch for SSDP notify messages
The handler shall be called on any service coming online.
@ -136,8 +134,9 @@ def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
# dabbling in multicast wizardry here, such sockets can cause big problems,
# so we will have two distinct sockets
tracelog = log.Logger('trace')
cloop = asyncio.get_running_loop()
try:
active_scan(handler, protocol)
await active_scan(handler, protocol)
except Exception as e:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
@ -163,125 +162,136 @@ def snoop(handler, byehandler=None, protocol=None, uuidlookup=None):
net4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
net4.bind(('', 1900))
net6.bind(('', 1900))
pktq = asyncio.Queue()
cloop = asyncio.get_running_loop()
cloop.add_reader(net4, _relay_pkt, net4, pktq)
cloop.add_reader(net6, _relay_pkt, net6, pktq)
peerbymacaddress = {}
while True:
try:
newmacs = set([])
deferrednotifies = []
machandlers = {}
r = select.select((net4, net6), (), (), 60)
if r:
r = r[0]
timeout = None
srp = await pktq.get()
recent_peers = set([])
while r and len(deferrednotifies) < 256:
for s in r:
(rsp, peer) = s.recvfrom(9000)
if rsp[:4] == b'PING':
while srp and len(deferrednotifies) < 256:
srp = None
if timeout is None:
srp = await pktq.get()
else:
try:
srp = await asyncio.wait_for(pktq.get(), timeout=timeout)
except asyncio.exceptions.TimeoutError:
break
timeout = 0.2
s, rsp, peer = srp
if rsp[:4] == b'PING':
continue
if peer in recent_peers:
continue
rsp = rsp.split(b'\r\n')
if b' ' not in rsp[0]:
continue
method, _ = rsp[0].split(b' ', 1)
if method == b'NOTIFY':
if peer in known_peers:
continue
if peer in recent_peers:
recent_peers.add(peer)
mac = neighutil.get_hwaddr(peer[0])
if mac == False:
# neighutil determined peer ip is not local, skip attempt
# to probe and critically, skip growing deferrednotifiers
continue
rsp = rsp.split(b'\r\n')
if b' ' not in rsp[0]:
continue
method, _ = rsp[0].split(b' ', 1)
if method == b'NOTIFY':
if peer in known_peers:
if not mac:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.setblocking(1)
s.sendto(b'\x00', probepeer)
except Exception:
continue
recent_peers.add(peer)
mac = neighutil.get_hwaddr(peer[0])
if mac == False:
# neighutil determined peer ip is not local, skip attempt
# to probe and critically, skip growing deferrednotifiers
deferrednotifies.append((peer, rsp))
continue
_process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler)
elif method == b'M-SEARCH':
if not uuidlookup:
continue
#ip = peer[0].partition('%')[0]
for headline in rsp[1:]:
if not headline:
continue
if not mac:
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
headline = util.stringify(headline)
headline = headline.partition(':')
if len(headline) < 3:
continue
forcereply = False
if headline[0] == 'ST' and headline[-1].startswith(' urn:xcat.org:service:confluent:'):
try:
s.sendto(b'\x00', probepeer)
cfm.check_quorum()
except Exception:
continue
deferrednotifies.append((peer, rsp))
continue
_process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler)
elif method == b'M-SEARCH':
if not uuidlookup:
continue
#ip = peer[0].partition('%')[0]
for headline in rsp[1:]:
if not headline:
continue
headline = util.stringify(headline)
headline = headline.partition(':')
if len(headline) < 3:
continue
forcereply = False
if headline[0] == 'ST' and headline[-1].startswith(' urn:xcat.org:service:confluent:'):
try:
cfm.check_quorum()
except Exception:
continue
for query in headline[-1].split('/'):
node = None
if query.startswith('confluentuuid='):
myuuid = cfm.get_global('confluent_uuid')
curruuid = query.split('=', 1)[1].lower()
if curruuid != myuuid:
break
forcereply = True
elif query.startswith('allconfluent=1'):
reply = 'HTTP/1.1 200 OK\r\n\r\nCONFLUENT: PRESENT\r\n'
if not isinstance(reply, bytes):
reply = reply.encode('utf8')
s.sendto(reply, peer)
elif query.startswith('uuid='):
curruuid = query.split('=', 1)[1].lower()
node = uuidlookup(curruuid)
elif query.startswith('mac='):
currmac = query.split('=', 1)[1].lower()
node = uuidlookup(currmac)
if node:
cfg = cfm.ConfigManager(None)
cfd = cfg.get_node_attributes(
node, ['deployment.pendingprofile', 'collective.managercandidates'])
if not forcereply:
# Do not bother replying to a node that
# we have no deployment activity
# planned for
if not cfd.get(node, {}).get(
'deployment.pendingprofile', {}).get('value', None):
break
candmgrs = cfd.get(node, {}).get('collective.managercandidates', {}).get('value', None)
if candmgrs:
candmgrs = noderange.NodeRange(candmgrs, cfg).nodes
if collective.get_myname() not in candmgrs:
break
currtime = time.time()
seconds = int(currtime)
msecs = int(currtime * 1000 % 1000)
reply = 'HTTP/1.1 200 OK\r\nNODENAME: {0}\r\nCURRTIME: {1}\r\nCURRMSECS: {2}\r\n'.format(node, seconds, msecs)
theip = peer[0].split('%', 1)[0]
if netutil.ip_on_same_subnet(theip, 'fe80::', 64):
if '%' in peer[0]:
ifidx = peer[0].split('%', 1)[1]
iface = socket.getaddrinfo(peer[0], 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][-1]
else:
ifidx = '{}'.format(peer[-1])
iface = peer[-1]
reply += 'MGTIFACE: {0}\r\n'.format(ifidx)
ncfg = netutil.get_nic_config(
cfg, node, ifidx=iface)
if ncfg.get('matchesnodename', None):
reply += 'DEFAULTNET: 1\r\n'
elif not netutil.address_is_local(peer[0]):
continue
if not isinstance(reply, bytes):
reply = reply.encode('utf8')
s.sendto(reply, peer)
for query in headline[-1].split('/'):
node = None
if query.startswith('confluentuuid='):
myuuid = cfm.get_global('confluent_uuid')
curruuid = query.split('=', 1)[1].lower()
if curruuid != myuuid:
break
r = select.select((net4, net6), (), (), 0.2)
if r:
r = r[0]
forcereply = True
elif query.startswith('allconfluent=1'):
reply = 'HTTP/1.1 200 OK\r\n\r\nCONFLUENT: PRESENT\r\n'
if not isinstance(reply, bytes):
reply = reply.encode('utf8')
s.setblocking(1)
s.sendto(reply, peer)
elif query.startswith('uuid='):
curruuid = query.split('=', 1)[1].lower()
node = uuidlookup(curruuid)
elif query.startswith('mac='):
currmac = query.split('=', 1)[1].lower()
node = uuidlookup(currmac)
if node:
cfg = cfm.ConfigManager(None)
cfd = cfg.get_node_attributes(
node, ['deployment.pendingprofile', 'collective.managercandidates'])
if not forcereply:
# Do not bother replying to a node that
# we have no deployment activity
# planned for
if not cfd.get(node, {}).get(
'deployment.pendingprofile', {}).get('value', None):
break
candmgrs = cfd.get(node, {}).get('collective.managercandidates', {}).get('value', None)
if candmgrs:
candmgrs = noderange.NodeRange(candmgrs, cfg).nodes
if collective.get_myname() not in candmgrs:
break
currtime = time.time()
seconds = int(currtime)
msecs = int(currtime * 1000 % 1000)
reply = 'HTTP/1.1 200 OK\r\nNODENAME: {0}\r\nCURRTIME: {1}\r\nCURRMSECS: {2}\r\n'.format(node, seconds, msecs)
theip = peer[0].split('%', 1)[0]
if netutil.ip_on_same_subnet(theip, 'fe80::', 64):
if '%' in peer[0]:
ifidx = peer[0].split('%', 1)[1]
iface = await cloop.getaddrinfo(peer[0], 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][-1]
else:
ifidx = '{}'.format(peer[-1])
iface = peer[-1]
reply += 'MGTIFACE: {0}\r\n'.format(ifidx)
ncfg = netutil.get_nic_config(
cfg, node, ifidx=iface)
if ncfg.get('matchesnodename', None):
reply += 'DEFAULTNET: 1\r\n'
elif not netutil.address_is_local(peer[0]):
continue
if not isinstance(reply, bytes):
reply = reply.encode('utf8')
s.setblocking(1)
s.sendto(reply, peer)
break
if deferrednotifies:
eventlet.sleep(2.2)
await asyncio.sleep(2.2)
for peerrsp in deferrednotifies:
peer, rsp = peerrsp
mac = neighutil.get_hwaddr(peer[0])
@ -305,12 +315,24 @@ def _get_svrip(peerdata):
return addr[0]
return peerdata['addresses'][0][0]
def _find_service(service, target):
def _relay_pkt(sock, pktq):
sock.setblocking(0)
try:
rsp, peer = sock.recvfrom(9000)
except socket.error as se:
return
pktq.put_nowait((sock, rsp, peer))
async def _find_service(service, target):
cloop = asyncio.get_running_loop()
net4 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
net6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
net6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
pktq = asyncio.Queue()
cloop.add_reader(net4, _relay_pkt, net4, pktq)
cloop.add_reader(net6, _relay_pkt, net6, pktq)
if target:
addrs = socket.getaddrinfo(target, 1900, 0, socket.SOCK_DGRAM)
addrs = await cloop.getaddrinfo(target, 1900, 0, socket.SOCK_DGRAM)
for addr in addrs:
host = addr[4][0]
if addr[0] == socket.AF_INET:
@ -362,31 +384,35 @@ def _find_service(service, target):
# SSDP by spec encourages responses to spread out over a 3 second interval
# hence we must be a bit more patient
deadline = util.monotonic_time() + 4
r, _, _ = select.select((net4, net6), (), (), 4)
peerdata = {}
deferparse = []
while r:
for s in r:
(rsp, peer) = s.recvfrom(9000)
if not neighutil.get_hwaddr(peer[0]):
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.sendto(b'\x00', probepeer)
except Exception:
continue
deferparse.append((rsp, peer))
srp = True
timeout = 4
while timeout and srp and len(deferparse) < 256:
try:
srp = await asyncio.wait_for(pktq.get(), timeout)
except asyncio.exceptions.TimeoutError:
break
s, rsp, peer = srp
if not neighutil.get_hwaddr(peer[0]):
probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:]
try:
s.sendto(b'\x00', probepeer)
except Exception:
srp = True
continue
_parse_ssdp(peer, rsp, peerdata)
deferparse.append((rsp, peer))
srp = True
continue
_parse_ssdp(peer, rsp, peerdata)
timeout = deadline - util.monotonic_time()
if timeout < 0:
timeout = 0
r, _, _ = select.select((net4, net6), (), (), timeout)
if deferparse:
eventlet.sleep(2.2)
await asyncio.sleep(2.2)
for dp in deferparse:
rsp, peer = dp
_parse_ssdp(peer, rsp, peerdata)
querypool = gp.GreenPool()
pooltargs = []
for nid in peerdata:
if peerdata[nid].get('services', [None])[0] == 'urn::service:affluent:1':
@ -418,17 +444,24 @@ def _find_service(service, target):
# or we drop support for those devices
#else:
# pooltargs.append(('/redfish/v1/', peerdata[nid]))
for pi in querypool.imap(check_fish, pooltargs):
if pi is not None:
yield pi
tsks = []
for targ in pooltargs:
tsks.append(util.spawn(check_fish(targ)))
while tsks:
done, tsks = await asyncio.wait(tsks, return_when=asyncio.FIRST_COMPLETED)
for dt in done:
dt = await dt
if dt is None:
continue
yield dt
def check_fish(urldata, port=443, verifycallback=None):
async def check_fish(urldata, port=443, verifycallback=None):
if not verifycallback:
verifycallback = lambda x: True
url, data = urldata
try:
wc = webclient.SecureHTTPConnection(_get_svrip(data), port, verifycallback=verifycallback, timeout=1.5)
peerinfo = wc.grab_json_response(url)
wc = webclient.WebConnection(_get_svrip(data), port, verifycallback=verifycallback)
peerinfo = await wc.grab_json_response(url)
except socket.error:
return None
if url == '/DeviceDescription.json':
@ -444,7 +477,7 @@ def check_fish(urldata, port=443, verifycallback=None):
except (IndexError, KeyError):
return None
url = '/redfish/v1/'
peerinfo = wc.grab_json_response('/redfish/v1/')
peerinfo = await wc.grab_json_response('/redfish/v1/')
if url == '/redfish/v1/':
if 'UUID' in peerinfo:
data['services'] = ['service:redfish-bmc']