mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-16 12:47:50 +00:00
Place limits on discovery resource consumption
Situation of filehandle exhaustion was seen. In the scenario observed, multiple connections to the same target were seen. So: 1) Backout the recheck block and replace with more comprehensively placed semaphore. 2) Place a discovery pool limit of 500 to generally constrain things. 3) Further limit things to one detected thread per mac address
This commit is contained in:
parent
2a0b038c30
commit
cb00c5d35d
@ -78,6 +78,7 @@ import confluent.util as util
|
||||
import traceback
|
||||
|
||||
import eventlet
|
||||
import eventlet.greenpool
|
||||
import eventlet.semaphore
|
||||
|
||||
class nesteddict(dict):
|
||||
@ -106,6 +107,9 @@ servicebyname = {
|
||||
'lenovo-xcc': 'service:management-hardware.Lenovo:lenovo-xclarity-controller',
|
||||
'lenovo-imm2': 'service:management-hardware.IBM:integrated-management-module2',
|
||||
}
|
||||
|
||||
discopool = eventlet.greenpool.GreenPool(500)
|
||||
runningevals = {}
|
||||
# Passive-only auto-detection protocols:
|
||||
# PXE
|
||||
|
||||
@ -141,7 +145,6 @@ known_uuids = nesteddict()
|
||||
known_nodes = nesteddict()
|
||||
unknown_info = {}
|
||||
pending_nodes = {}
|
||||
blockrecheck = False
|
||||
|
||||
|
||||
def enrich_pxe_info(info):
|
||||
@ -386,8 +389,14 @@ def detected_models():
|
||||
|
||||
|
||||
def _recheck_nodes(nodeattribs, configmanager):
|
||||
if blockrecheck:
|
||||
if rechecklock.locked():
|
||||
# if already in progress, don't run again
|
||||
# it may make sense to schedule a repeat, but will try the easier and less redundant way first
|
||||
return
|
||||
with rechecklock:
|
||||
return _recheck_nodes_backend(nodeattribs, configmanager)
|
||||
|
||||
def _recheck_nodes_backend(nodeattribs, configmanager):
|
||||
global rechecker
|
||||
_map_unique_ids(nodeattribs)
|
||||
# for the nodes whose attributes have changed, consider them as potential
|
||||
@ -410,7 +419,7 @@ def _recheck_nodes(nodeattribs, configmanager):
|
||||
info = pending_nodes[nodename]
|
||||
try:
|
||||
handler = info['handler'].NodeHandler(info, configmanager)
|
||||
eventlet.spawn_n(eval_node, configmanager, handler, info, nodename)
|
||||
discopool.spawn_n(eval_node, configmanager, handler, info, nodename)
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
log.log({'error': 'Unexpected error during discovery of {0}, check debug '
|
||||
@ -466,25 +475,29 @@ def _recheck_single_unknown(configmanager, mac):
|
||||
known_nodes[nodename][info['hwaddr']] = info
|
||||
info['discostatus'] = 'discovered'
|
||||
return # already known, no need for more
|
||||
eventlet.spawn_n(eval_node, configmanager, handler, info, nodename)
|
||||
discopool.spawn_n(eval_node, configmanager, handler, info, nodename)
|
||||
|
||||
|
||||
def safe_detected(info):
|
||||
eventlet.spawn_n(eval_detected, info)
|
||||
if 'hwaddr' not in info:
|
||||
return
|
||||
if info['hwaddr'] in runningevals:
|
||||
# Do not evaluate the same mac multiple times at once
|
||||
return
|
||||
runningevals[info['hwaddr']] = discopool.spawn(eval_detected, info)
|
||||
|
||||
|
||||
def eval_detected(info):
|
||||
try:
|
||||
return detected(info)
|
||||
detected(info)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
del runningevals[info['hwaddr']]
|
||||
|
||||
|
||||
def detected(info):
|
||||
global rechecker
|
||||
global rechecktime
|
||||
if 'hwaddr' not in info:
|
||||
return # For now, require hwaddr field to proceed
|
||||
# later, manual and CMM discovery may act on SN and/or UUID
|
||||
for service in info['services']:
|
||||
if nodehandlers.get(service, None):
|
||||
@ -696,7 +709,6 @@ def eval_node(cfg, handler, info, nodename, manual=False):
|
||||
|
||||
|
||||
def discover_node(cfg, handler, info, nodename, manual):
|
||||
global blockrecheck
|
||||
known_nodes[nodename][info['hwaddr']] = info
|
||||
if info['hwaddr'] in unknown_info:
|
||||
del unknown_info[info['hwaddr']]
|
||||
@ -753,9 +765,7 @@ def discover_node(cfg, handler, info, nodename, manual):
|
||||
newnodeattribs['pubkeys.tls_hardwaremanager'] = \
|
||||
util.get_fingerprint(handler.https_cert)
|
||||
if newnodeattribs:
|
||||
blockrecheck = True
|
||||
cfg.set_node_attributes({nodename: newnodeattribs})
|
||||
blockrecheck = False
|
||||
log.log({'info': 'Discovered {0} ({1})'.format(nodename,
|
||||
handler.devname)})
|
||||
info['discostatus'] = 'discovered'
|
||||
@ -771,7 +781,6 @@ def do_pxe_discovery(cfg, handler, info, manual, nodename, policies):
|
||||
# use uuid based scheme in lieu of tls cert, ideally only
|
||||
# for stateless 'discovery' targets like pxe, where data does not
|
||||
# change
|
||||
global blockrecheck
|
||||
uuidinfo = cfg.get_node_attributes(nodename, ['id.uuid', 'id.serial', 'id.model', 'net*.bootable'])
|
||||
if manual or policies & set(('open', 'pxe')):
|
||||
enrich_pxe_info(info)
|
||||
@ -791,9 +800,7 @@ def do_pxe_discovery(cfg, handler, info, manual, nodename, policies):
|
||||
newattrname = attrname[:-8] + 'hwaddr'
|
||||
attribs[newattrname] = info['hwaddr']
|
||||
if attribs:
|
||||
blockrecheck = True
|
||||
cfg.set_node_attributes({nodename: attribs})
|
||||
blockrecheck = False
|
||||
if info['uuid'] in known_pxe_uuids:
|
||||
return True
|
||||
known_pxe_uuids[info['uuid']] = nodename
|
||||
@ -843,21 +850,18 @@ def _periodic_recheck(configmanager):
|
||||
global rechecker
|
||||
global rechecktime
|
||||
rechecker = None
|
||||
# There shouldn't be anything causing this to double up, but just in case
|
||||
# use a semaphore to absolutely guarantee this doesn't multiply
|
||||
with rechecklock:
|
||||
try:
|
||||
_recheck_nodes((), configmanager)
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
log.log({'error': 'Unexpected error during discovery, check debug '
|
||||
'logs'})
|
||||
# if rechecker is set, it means that an accelerated schedule
|
||||
# for rechecker was requested in the course of recheck_nodes
|
||||
if rechecker is None:
|
||||
rechecktime = util.monotonic_time() + 900
|
||||
rechecker = eventlet.spawn_after(900, _periodic_recheck,
|
||||
configmanager)
|
||||
try:
|
||||
_recheck_nodes((), configmanager)
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
log.log({'error': 'Unexpected error during discovery, check debug '
|
||||
'logs'})
|
||||
# if rechecker is set, it means that an accelerated schedule
|
||||
# for rechecker was requested in the course of recheck_nodes
|
||||
if rechecker is None:
|
||||
rechecktime = util.monotonic_time() + 900
|
||||
rechecker = eventlet.spawn_after(900, _periodic_recheck,
|
||||
configmanager)
|
||||
|
||||
|
||||
def rescan():
|
||||
|
Loading…
x
Reference in New Issue
Block a user