diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index 3f66d95d..148cdac3 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -78,6 +78,7 @@ import confluent.util as util import traceback import eventlet +import eventlet.greenpool import eventlet.semaphore class nesteddict(dict): @@ -106,6 +107,9 @@ servicebyname = { 'lenovo-xcc': 'service:management-hardware.Lenovo:lenovo-xclarity-controller', 'lenovo-imm2': 'service:management-hardware.IBM:integrated-management-module2', } + +discopool = eventlet.greenpool.GreenPool(500) +runningevals = {} # Passive-only auto-detection protocols: # PXE @@ -141,7 +145,6 @@ known_uuids = nesteddict() known_nodes = nesteddict() unknown_info = {} pending_nodes = {} -blockrecheck = False def enrich_pxe_info(info): @@ -386,8 +389,14 @@ def detected_models(): def _recheck_nodes(nodeattribs, configmanager): - if blockrecheck: + if rechecklock.locked(): + # if already in progress, don't run again + # it may make sense to schedule a repeat, but will try the easier and less redundant way first return + with rechecklock: + return _recheck_nodes_backend(nodeattribs, configmanager) + +def _recheck_nodes_backend(nodeattribs, configmanager): global rechecker _map_unique_ids(nodeattribs) # for the nodes whose attributes have changed, consider them as potential @@ -410,7 +419,7 @@ def _recheck_nodes(nodeattribs, configmanager): info = pending_nodes[nodename] try: handler = info['handler'].NodeHandler(info, configmanager) - eventlet.spawn_n(eval_node, configmanager, handler, info, nodename) + discopool.spawn_n(eval_node, configmanager, handler, info, nodename) except Exception: traceback.print_exc() log.log({'error': 'Unexpected error during discovery of {0}, check debug ' @@ -466,25 +475,29 @@ def _recheck_single_unknown(configmanager, mac): known_nodes[nodename][info['hwaddr']] = info info['discostatus'] = 'discovered' return # already known, no need for more - eventlet.spawn_n(eval_node, configmanager, handler, info, nodename) + discopool.spawn_n(eval_node, configmanager, handler, info, nodename) def safe_detected(info): - eventlet.spawn_n(eval_detected, info) + if 'hwaddr' not in info: + return + if info['hwaddr'] in runningevals: + # Do not evaluate the same mac multiple times at once + return + runningevals[info['hwaddr']] = discopool.spawn(eval_detected, info) def eval_detected(info): try: - return detected(info) + detected(info) except Exception as e: traceback.print_exc() + del runningevals[info['hwaddr']] def detected(info): global rechecker global rechecktime - if 'hwaddr' not in info: - return # For now, require hwaddr field to proceed # later, manual and CMM discovery may act on SN and/or UUID for service in info['services']: if nodehandlers.get(service, None): @@ -696,7 +709,6 @@ def eval_node(cfg, handler, info, nodename, manual=False): def discover_node(cfg, handler, info, nodename, manual): - global blockrecheck known_nodes[nodename][info['hwaddr']] = info if info['hwaddr'] in unknown_info: del unknown_info[info['hwaddr']] @@ -753,9 +765,7 @@ def discover_node(cfg, handler, info, nodename, manual): newnodeattribs['pubkeys.tls_hardwaremanager'] = \ util.get_fingerprint(handler.https_cert) if newnodeattribs: - blockrecheck = True cfg.set_node_attributes({nodename: newnodeattribs}) - blockrecheck = False log.log({'info': 'Discovered {0} ({1})'.format(nodename, handler.devname)}) info['discostatus'] = 'discovered' @@ -771,7 +781,6 @@ def do_pxe_discovery(cfg, handler, info, manual, nodename, policies): # use uuid based scheme in lieu of tls cert, ideally only # for stateless 'discovery' targets like pxe, where data does not # change - global blockrecheck uuidinfo = cfg.get_node_attributes(nodename, ['id.uuid', 'id.serial', 'id.model', 'net*.bootable']) if manual or policies & set(('open', 'pxe')): enrich_pxe_info(info) @@ -791,9 +800,7 @@ def do_pxe_discovery(cfg, handler, info, manual, nodename, policies): newattrname = attrname[:-8] + 'hwaddr' attribs[newattrname] = info['hwaddr'] if attribs: - blockrecheck = True cfg.set_node_attributes({nodename: attribs}) - blockrecheck = False if info['uuid'] in known_pxe_uuids: return True known_pxe_uuids[info['uuid']] = nodename @@ -843,21 +850,18 @@ def _periodic_recheck(configmanager): global rechecker global rechecktime rechecker = None - # There shouldn't be anything causing this to double up, but just in case - # use a semaphore to absolutely guarantee this doesn't multiply - with rechecklock: - try: - _recheck_nodes((), configmanager) - except Exception: - traceback.print_exc() - log.log({'error': 'Unexpected error during discovery, check debug ' - 'logs'}) - # if rechecker is set, it means that an accelerated schedule - # for rechecker was requested in the course of recheck_nodes - if rechecker is None: - rechecktime = util.monotonic_time() + 900 - rechecker = eventlet.spawn_after(900, _periodic_recheck, - configmanager) + try: + _recheck_nodes((), configmanager) + except Exception: + traceback.print_exc() + log.log({'error': 'Unexpected error during discovery, check debug ' + 'logs'}) + # if rechecker is set, it means that an accelerated schedule + # for rechecker was requested in the course of recheck_nodes + if rechecker is None: + rechecktime = util.monotonic_time() + 900 + rechecker = eventlet.spawn_after(900, _periodic_recheck, + configmanager) def rescan():