2
0
mirror of https://opendev.org/x/pyghmi synced 2025-08-25 20:40:19 +00:00

Use Lenovo OEM system configuration

For newer systems, provide access
to the richer configuration in
the proprietary Lenovo interface.

Fall back for standard for older platforms that still require IPMI
for that configuration.

Change-Id: Id66515f00116ce3c22fcba80c0003cfb9e6b1a5d
This commit is contained in:
Jarrod Johnson
2022-10-31 16:57:28 -04:00
parent 79ac63f409
commit 0eef3fcfba
2 changed files with 145 additions and 2 deletions

View File

@@ -44,6 +44,8 @@ WRITE_COMMAND = [0x03]
CLOSE_COMMAND = [0x05]
SIZE_COMMAND = [0x06]
class Unsupported(Exception):
pass
def fromstring(inputdata):
if b'!entity' in inputdata.lower():
@@ -133,11 +135,14 @@ def _eval_conditional(expression, cfg, setting):
class LenovoFirmwareConfig(object):
def __init__(self, xc):
def __init__(self, xc, useipmi=True):
if not etree:
raise Exception("python-lxml and python-eficompressor required "
"for this function")
self.connection = xc.ipmicmd
if useipmi:
self.connection = xc.ipmicmd
else:
self.connection = None
self.xc = xc
def imm_size(self, filename):
@@ -274,6 +279,8 @@ class LenovoFirmwareConfig(object):
data = base64.b64decode(data)
data = EfiCompressor.FrameworkDecompress(data, len(data))
else:
if self.connection is None:
raise Unsupported('Not Supported')
for _ in range(0, 30):
filehandle = self.imm_open(cfgfilename)
size = self.imm_size(cfgfilename)
@@ -566,6 +573,8 @@ class LenovoFirmwareConfig(object):
{'Action': 'DSWriteFile', 'Resize': len(data),
'FileName': 'asu_update.efi', 'Content': bdata})
if rsp[1] != 204:
if self.connection is None:
raise Unsupported('Not Supported')
filehandle = self.imm_open("asu_update.efi", write=True,
size=len(data))
self.imm_write(filehandle, len(data), data)

View File

@@ -26,6 +26,7 @@ import six
import pyghmi.exceptions as pygexc
import pyghmi.ipmi.private.util as util
import pyghmi.ipmi.oem.lenovo.config as config
import pyghmi.media as media
import pyghmi.redfish.oem.generic as generic
import pyghmi.storage as storage
@@ -111,6 +112,134 @@ class OEMHandler(generic.OEMHandler):
self.weblogging = False
self.updating = False
self.datacache = {}
self.fwc = None
self.fwo = None
def get_system_configuration(self, hideadvanced=True, fishclient=None,
fetchimm=False):
if not self.fwc:
self.fwc = config.LenovoFirmwareConfig(self, useipmi=False)
try:
self.fwo = self.fwc.get_fw_options(fetchimm=fetchimm)
except config.Unsupported:
return super(OEMHandler, self).get_system_configuration(
hideadvanced, fishclient)
except Exception:
raise Exception('%s failed to retrieve UEFI configuration'
% self.bmcname)
self.fwovintage = util._monotonic_time()
retcfg = {}
for opt in self.fwo:
if 'MegaRAIDConfigurationTool' in opt:
# Suppress the Avago configuration to be consistent with
# other tools.
continue
if (hideadvanced and self.fwo[opt]['lenovo_protect']
or self.fwo[opt]['hidden']):
# Do not enumerate hidden settings
continue
retcfg[opt] = {}
retcfg[opt]['value'] = self.fwo[opt]['current']
retcfg[opt]['default'] = self.fwo[opt]['default']
retcfg[opt]['help'] = self.fwo[opt]['help']
retcfg[opt]['possible'] = self.fwo[opt]['possible']
retcfg[opt]['sortid'] = self.fwo[opt]['sortid']
return retcfg
def set_system_configuration(self, changeset, fishclient):
if not self.fwc:
self.fwc = config.LenovoFirmwareConfig(self, useipmi=False)
fetchimm = False
if not self.fwo or util._monotonic_time() - self.fwovintage > 30:
try:
self.fwo = self.fwc.get_fw_options(fetchimm=fetchimm)
except config.Unsupported:
return super(OEMHandler, self).set_system_configuration(
changeset, fishclient)
self.fwovintage = util._monotonic_time()
for key in list(changeset):
if key not in self.fwo:
found = False
for rkey in self.fwo:
if fnmatch.fnmatch(rkey.lower(), key.lower()):
changeset[rkey] = changeset[key]
found = True
elif self.fwo[rkey].get('alias', None) != rkey:
calias = self.fwo[rkey]['alias']
if fnmatch.fnmatch(calias.lower(), key.lower()):
changeset[rkey] = changeset[key]
found = True
if not found and not fetchimm:
fetchimm = True
self.fwo = self.fwc.get_fw_options(fetchimm=fetchimm)
if key in self.fwo:
continue
else:
found = False
for rkey in self.fwo:
if fnmatch.fnmatch(rkey.lower(), key.lower()):
changeset[rkey] = changeset[key]
found = True
elif self.fwo[rkey].get('alias', None) != rkey:
calias = self.fwo[rkey]['alias']
if fnmatch.fnmatch(
calias.lower(), key.lower()):
changeset[rkey] = changeset[key]
found = True
if found:
del changeset[key]
else:
raise pygexc.InvalidParameterValue(
'{0} not a known setting'.format(key))
self.merge_changeset(changeset)
if changeset:
try:
self.fwc.set_fw_options(self.fwo)
finally:
self.fwo = None
self.fwovintage = 0
def merge_changeset(self, changeset):
for key in changeset:
if isinstance(changeset[key], six.string_types):
changeset[key] = {'value': changeset[key]}
newvalue = changeset[key]['value']
if self.fwo[key]['is_list'] and not isinstance(newvalue, list):
if '=' in newvalue:
# ASU set a precedent of = delimited settings
# for now, honor that delimiter as well
newvalues = newvalue.split('=')
else:
newvalues = newvalue.split(',')
else:
newvalues = [newvalue]
newnewvalues = []
for newvalue in newvalues:
newv = re.sub(r'\s+', ' ', newvalue)
if (self.fwo[key]['possible']
and newvalue not in self.fwo[key]['possible']):
candlist = []
for candidate in self.fwo[key]['possible']:
candid = re.sub(r'\s+', ' ', candidate)
if newv.lower().startswith(candid.lower()):
newvalue = candidate
break
if candid.lower().startswith(newv.lower()):
candlist.append(candidate)
else:
if len(candlist) == 1:
newvalue = candlist[0]
else:
raise pygexc.InvalidParameterValue(
'{0} is not a valid value for {1} '
'({2})'.format(
newvalue, key,
','.join(self.fwo[key]['possible'])))
newnewvalues.append(newvalue)
if len(newnewvalues) == 1:
self.fwo[key]['new_value'] = newnewvalues[0]
else:
self.fwo[key]['new_value'] = newnewvalues
def reseat_bay(self, bay):
if bay != -1:
@@ -842,6 +971,11 @@ class OEMHandler(generic.OEMHandler):
wc.set_header('X-XSRF-TOKEN', wc.cookies['_csrf_token'])
return wc
def grab_redfish_response_with_status(self, url, body=None, method=None):
wc = self.webclient.dupe()
res = wc.grab_json_response_with_status(url, body, method=method)
return res
def list_media(self, fishclient):
rt = self.wc.grab_json_response('/api/providers/rp_vm_remote_getdisk')
if 'items' in rt: