diff --git a/confluent_server/confluent/discovery/protocols/ssdp.py b/confluent_server/confluent/discovery/protocols/ssdp.py index 3c1edc74..c5b89679 100644 --- a/confluent_server/confluent/discovery/protocols/ssdp.py +++ b/confluent_server/confluent/discovery/protocols/ssdp.py @@ -28,6 +28,7 @@ # NTS: ssdp:alive +import asyncio import confluent.config.configmanager as cfm import confluent.collective.manager as collective import confluent.neighutil as neighutil @@ -35,16 +36,13 @@ import confluent.noderange as noderange import confluent.util as util import confluent.log as log import confluent.netutil as netutil -import eventlet -import eventlet.green.select as select -import eventlet.green.socket as socket -import eventlet.greenpool as gp +import socket import os import time import struct import traceback -webclient = eventlet.import_patched('pyghmi.util.webclient') +import aiohmi.util.webclient as webclient mcastv4addr = '239.255.255.250' mcastv6addr = 'ff02::c' @@ -56,9 +54,9 @@ smsg = ('M-SEARCH * HTTP/1.1\r\n' 'MX: 3\r\n\r\n') -def active_scan(handler, protocol=None): +async def active_scan(handler, protocol=None): known_peers = set([]) - for scanned in scan(['urn:dmtf-org:service:redfish-rest:1', 'urn::service:affluent']): + async for scanned in scan(['urn:dmtf-org:service:redfish-rest:1', 'urn::service:affluent']): for addr in scanned['addresses']: if addr in known_peers: break @@ -72,9 +70,9 @@ def active_scan(handler, protocol=None): scanned['protocol'] = protocol handler(scanned) -def scan(services, target=None): +async def scan(services, target=None): for service in services: - for rply in _find_service(service, target): + async for rply in _find_service(service, target): yield rply @@ -108,10 +106,10 @@ def _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byeha if not value.endswith('/DeviceDescription.json'): return if handler: - eventlet.spawn_n(check_fish_handler, handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer) + util.spawn(check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer)) -def check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer): - retdata = check_fish(('/DeviceDescription.json', peerdata)) +async def check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress, machandlers, mac, peer): + retdata = await check_fish(('/DeviceDescription.json', peerdata)) if retdata: known_peers.add(peer) newmacs.add(mac) @@ -119,7 +117,7 @@ def check_fish_handler(handler, peerdata, known_peers, newmacs, peerbymacaddress machandlers[mac] = handler -def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): +async def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): """Watch for SSDP notify messages The handler shall be called on any service coming online. @@ -136,8 +134,9 @@ def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): # dabbling in multicast wizardry here, such sockets can cause big problems, # so we will have two distinct sockets tracelog = log.Logger('trace') + cloop = asyncio.get_running_loop() try: - active_scan(handler, protocol) + await active_scan(handler, protocol) except Exception as e: tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) @@ -163,125 +162,136 @@ def snoop(handler, byehandler=None, protocol=None, uuidlookup=None): net4.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) net4.bind(('', 1900)) net6.bind(('', 1900)) + pktq = asyncio.Queue() + cloop = asyncio.get_running_loop() + cloop.add_reader(net4, _relay_pkt, net4, pktq) + cloop.add_reader(net6, _relay_pkt, net6, pktq) peerbymacaddress = {} while True: try: newmacs = set([]) deferrednotifies = [] machandlers = {} - r = select.select((net4, net6), (), (), 60) - if r: - r = r[0] + timeout = None + srp = await pktq.get() recent_peers = set([]) - while r and len(deferrednotifies) < 256: - for s in r: - (rsp, peer) = s.recvfrom(9000) - if rsp[:4] == b'PING': + while srp and len(deferrednotifies) < 256: + srp = None + if timeout is None: + srp = await pktq.get() + else: + try: + srp = await asyncio.wait_for(pktq.get(), timeout=timeout) + except asyncio.exceptions.TimeoutError: + break + timeout = 0.2 + s, rsp, peer = srp + if rsp[:4] == b'PING': + continue + if peer in recent_peers: + continue + rsp = rsp.split(b'\r\n') + if b' ' not in rsp[0]: + continue + method, _ = rsp[0].split(b' ', 1) + if method == b'NOTIFY': + if peer in known_peers: continue - if peer in recent_peers: + recent_peers.add(peer) + mac = neighutil.get_hwaddr(peer[0]) + if mac == False: + # neighutil determined peer ip is not local, skip attempt + # to probe and critically, skip growing deferrednotifiers continue - rsp = rsp.split(b'\r\n') - if b' ' not in rsp[0]: - continue - method, _ = rsp[0].split(b' ', 1) - if method == b'NOTIFY': - if peer in known_peers: + if not mac: + probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] + try: + s.setblocking(1) + s.sendto(b'\x00', probepeer) + except Exception: continue - recent_peers.add(peer) - mac = neighutil.get_hwaddr(peer[0]) - if mac == False: - # neighutil determined peer ip is not local, skip attempt - # to probe and critically, skip growing deferrednotifiers + deferrednotifies.append((peer, rsp)) + continue + _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler) + elif method == b'M-SEARCH': + if not uuidlookup: + continue + #ip = peer[0].partition('%')[0] + for headline in rsp[1:]: + if not headline: continue - if not mac: - probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] + headline = util.stringify(headline) + headline = headline.partition(':') + if len(headline) < 3: + continue + forcereply = False + if headline[0] == 'ST' and headline[-1].startswith(' urn:xcat.org:service:confluent:'): try: - s.sendto(b'\x00', probepeer) + cfm.check_quorum() except Exception: continue - deferrednotifies.append((peer, rsp)) - continue - _process_snoop(peer, rsp, mac, known_peers, newmacs, peerbymacaddress, byehandler, machandlers, handler) - elif method == b'M-SEARCH': - if not uuidlookup: - continue - #ip = peer[0].partition('%')[0] - for headline in rsp[1:]: - if not headline: - continue - headline = util.stringify(headline) - headline = headline.partition(':') - if len(headline) < 3: - continue - forcereply = False - if headline[0] == 'ST' and headline[-1].startswith(' urn:xcat.org:service:confluent:'): - try: - cfm.check_quorum() - except Exception: - continue - for query in headline[-1].split('/'): - node = None - if query.startswith('confluentuuid='): - myuuid = cfm.get_global('confluent_uuid') - curruuid = query.split('=', 1)[1].lower() - if curruuid != myuuid: - break - forcereply = True - elif query.startswith('allconfluent=1'): - reply = 'HTTP/1.1 200 OK\r\n\r\nCONFLUENT: PRESENT\r\n' - if not isinstance(reply, bytes): - reply = reply.encode('utf8') - s.sendto(reply, peer) - elif query.startswith('uuid='): - curruuid = query.split('=', 1)[1].lower() - node = uuidlookup(curruuid) - elif query.startswith('mac='): - currmac = query.split('=', 1)[1].lower() - node = uuidlookup(currmac) - if node: - cfg = cfm.ConfigManager(None) - cfd = cfg.get_node_attributes( - node, ['deployment.pendingprofile', 'collective.managercandidates']) - if not forcereply: - # Do not bother replying to a node that - # we have no deployment activity - # planned for - if not cfd.get(node, {}).get( - 'deployment.pendingprofile', {}).get('value', None): - break - candmgrs = cfd.get(node, {}).get('collective.managercandidates', {}).get('value', None) - if candmgrs: - candmgrs = noderange.NodeRange(candmgrs, cfg).nodes - if collective.get_myname() not in candmgrs: - break - currtime = time.time() - seconds = int(currtime) - msecs = int(currtime * 1000 % 1000) - reply = 'HTTP/1.1 200 OK\r\nNODENAME: {0}\r\nCURRTIME: {1}\r\nCURRMSECS: {2}\r\n'.format(node, seconds, msecs) - theip = peer[0].split('%', 1)[0] - if netutil.ip_on_same_subnet(theip, 'fe80::', 64): - if '%' in peer[0]: - ifidx = peer[0].split('%', 1)[1] - iface = socket.getaddrinfo(peer[0], 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][-1] - else: - ifidx = '{}'.format(peer[-1]) - iface = peer[-1] - reply += 'MGTIFACE: {0}\r\n'.format(ifidx) - ncfg = netutil.get_nic_config( - cfg, node, ifidx=iface) - if ncfg.get('matchesnodename', None): - reply += 'DEFAULTNET: 1\r\n' - elif not netutil.address_is_local(peer[0]): - continue - if not isinstance(reply, bytes): - reply = reply.encode('utf8') - s.sendto(reply, peer) + for query in headline[-1].split('/'): + node = None + if query.startswith('confluentuuid='): + myuuid = cfm.get_global('confluent_uuid') + curruuid = query.split('=', 1)[1].lower() + if curruuid != myuuid: break - r = select.select((net4, net6), (), (), 0.2) - if r: - r = r[0] + forcereply = True + elif query.startswith('allconfluent=1'): + reply = 'HTTP/1.1 200 OK\r\n\r\nCONFLUENT: PRESENT\r\n' + if not isinstance(reply, bytes): + reply = reply.encode('utf8') + s.setblocking(1) + s.sendto(reply, peer) + elif query.startswith('uuid='): + curruuid = query.split('=', 1)[1].lower() + node = uuidlookup(curruuid) + elif query.startswith('mac='): + currmac = query.split('=', 1)[1].lower() + node = uuidlookup(currmac) + if node: + cfg = cfm.ConfigManager(None) + cfd = cfg.get_node_attributes( + node, ['deployment.pendingprofile', 'collective.managercandidates']) + if not forcereply: + # Do not bother replying to a node that + # we have no deployment activity + # planned for + if not cfd.get(node, {}).get( + 'deployment.pendingprofile', {}).get('value', None): + break + candmgrs = cfd.get(node, {}).get('collective.managercandidates', {}).get('value', None) + if candmgrs: + candmgrs = noderange.NodeRange(candmgrs, cfg).nodes + if collective.get_myname() not in candmgrs: + break + currtime = time.time() + seconds = int(currtime) + msecs = int(currtime * 1000 % 1000) + reply = 'HTTP/1.1 200 OK\r\nNODENAME: {0}\r\nCURRTIME: {1}\r\nCURRMSECS: {2}\r\n'.format(node, seconds, msecs) + theip = peer[0].split('%', 1)[0] + if netutil.ip_on_same_subnet(theip, 'fe80::', 64): + if '%' in peer[0]: + ifidx = peer[0].split('%', 1)[1] + iface = await cloop.getaddrinfo(peer[0], 0, socket.AF_INET6, socket.SOCK_DGRAM)[0][-1][-1] + else: + ifidx = '{}'.format(peer[-1]) + iface = peer[-1] + reply += 'MGTIFACE: {0}\r\n'.format(ifidx) + ncfg = netutil.get_nic_config( + cfg, node, ifidx=iface) + if ncfg.get('matchesnodename', None): + reply += 'DEFAULTNET: 1\r\n' + elif not netutil.address_is_local(peer[0]): + continue + if not isinstance(reply, bytes): + reply = reply.encode('utf8') + s.setblocking(1) + s.sendto(reply, peer) + break if deferrednotifies: - eventlet.sleep(2.2) + await asyncio.sleep(2.2) for peerrsp in deferrednotifies: peer, rsp = peerrsp mac = neighutil.get_hwaddr(peer[0]) @@ -305,12 +315,24 @@ def _get_svrip(peerdata): return addr[0] return peerdata['addresses'][0][0] -def _find_service(service, target): +def _relay_pkt(sock, pktq): + sock.setblocking(0) + try: + rsp, peer = sock.recvfrom(9000) + except socket.error as se: + return + pktq.put_nowait((sock, rsp, peer)) + +async def _find_service(service, target): + cloop = asyncio.get_running_loop() net4 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) net6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) net6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) + pktq = asyncio.Queue() + cloop.add_reader(net4, _relay_pkt, net4, pktq) + cloop.add_reader(net6, _relay_pkt, net6, pktq) if target: - addrs = socket.getaddrinfo(target, 1900, 0, socket.SOCK_DGRAM) + addrs = await cloop.getaddrinfo(target, 1900, 0, socket.SOCK_DGRAM) for addr in addrs: host = addr[4][0] if addr[0] == socket.AF_INET: @@ -362,31 +384,35 @@ def _find_service(service, target): # SSDP by spec encourages responses to spread out over a 3 second interval # hence we must be a bit more patient deadline = util.monotonic_time() + 4 - r, _, _ = select.select((net4, net6), (), (), 4) peerdata = {} deferparse = [] - while r: - for s in r: - (rsp, peer) = s.recvfrom(9000) - if not neighutil.get_hwaddr(peer[0]): - probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] - try: - s.sendto(b'\x00', probepeer) - except Exception: - continue - deferparse.append((rsp, peer)) + srp = True + timeout = 4 + while timeout and srp and len(deferparse) < 256: + try: + srp = await asyncio.wait_for(pktq.get(), timeout) + except asyncio.exceptions.TimeoutError: + break + s, rsp, peer = srp + if not neighutil.get_hwaddr(peer[0]): + probepeer = (peer[0], struct.unpack('H', os.urandom(2))[0] | 1025) + peer[2:] + try: + s.sendto(b'\x00', probepeer) + except Exception: + srp = True continue - _parse_ssdp(peer, rsp, peerdata) + deferparse.append((rsp, peer)) + srp = True + continue + _parse_ssdp(peer, rsp, peerdata) timeout = deadline - util.monotonic_time() if timeout < 0: timeout = 0 - r, _, _ = select.select((net4, net6), (), (), timeout) if deferparse: - eventlet.sleep(2.2) + await asyncio.sleep(2.2) for dp in deferparse: rsp, peer = dp _parse_ssdp(peer, rsp, peerdata) - querypool = gp.GreenPool() pooltargs = [] for nid in peerdata: if peerdata[nid].get('services', [None])[0] == 'urn::service:affluent:1': @@ -418,17 +444,24 @@ def _find_service(service, target): # or we drop support for those devices #else: # pooltargs.append(('/redfish/v1/', peerdata[nid])) - for pi in querypool.imap(check_fish, pooltargs): - if pi is not None: - yield pi + tsks = [] + for targ in pooltargs: + tsks.append(util.spawn(check_fish(targ))) + while tsks: + done, tsks = await asyncio.wait(tsks, return_when=asyncio.FIRST_COMPLETED) + for dt in done: + dt = await dt + if dt is None: + continue + yield dt -def check_fish(urldata, port=443, verifycallback=None): +async def check_fish(urldata, port=443, verifycallback=None): if not verifycallback: verifycallback = lambda x: True url, data = urldata try: - wc = webclient.SecureHTTPConnection(_get_svrip(data), port, verifycallback=verifycallback, timeout=1.5) - peerinfo = wc.grab_json_response(url) + wc = webclient.WebConnection(_get_svrip(data), port, verifycallback=verifycallback) + peerinfo = await wc.grab_json_response(url) except socket.error: return None if url == '/DeviceDescription.json': @@ -444,7 +477,7 @@ def check_fish(urldata, port=443, verifycallback=None): except (IndexError, KeyError): return None url = '/redfish/v1/' - peerinfo = wc.grab_json_response('/redfish/v1/') + peerinfo = await wc.grab_json_response('/redfish/v1/') if url == '/redfish/v1/': if 'UUID' in peerinfo: data['services'] = ['service:redfish-bmc']