2
0
mirror of https://opendev.org/x/pyghmi synced 2025-01-27 19:37:44 +00:00

Refactor Lenovo OEM support

Rearrange things such that in the future, distinct generations
of IMM that are very similar may be supported with common
code through inheritence while deviating in the ways that are
needed.

This also removes the redundancy of registering the certificate
 callback in the OEM layer and standard layer and instead just
has it in the standard layer.

Change-Id: Ic386553d080163ed63b6e4f2294839654e377269
This commit is contained in:
Jarrod Johnson 2017-01-04 14:31:03 -05:00
parent de59a8e2e7
commit 17fe93826f
4 changed files with 311 additions and 331 deletions

View File

@ -120,6 +120,7 @@ class Command(object):
self._oem = None
self._netchannel = None
self._ipv6support = None
self.certverify = None
if onlogon is not None:
self.ipmi_session = session.Session(bmc=bmc,
userid=userid,
@ -151,9 +152,7 @@ class Command(object):
defaults to 'tls'
"""
if type == 'tls':
self._certverify = callback
self.oem_init()
self._oem.register_key_handler(callback, type)
self.certverify = callback
def logged(self, response):
self.onlogon(response, self)

View File

@ -29,22 +29,6 @@ class OEMHandler(object):
def __init__(self, oemid, ipmicmd):
pass
def register_key_handler(self, callback, type='tls'):
"""Assign a verification handler for a public key
When the library attempts to communicate with the management target
using a non-IPMI protocol, it will try to verify a key. This
allows a caller to register a key handler for accepting or rejecting
a public key/certificate. The callback will be passed the peer public
key or certificate.
:param callback: The function to call with public key/certificate
:param type: Whether the callback is meant to handle 'tls' or 'ssh',
defaults to 'tls'
"""
if type == 'tls':
self._certverify = callback
def get_video_launchdata(self):
return {}

View File

@ -148,6 +148,8 @@ class OEMHandler(generic.OEMHandler):
self.oem_inventory_info = None
self._mrethidx = None
self._hasimm = None
if self.has_imm:
self.immhandler = imm.IMMClient(ipmicmd)
@property
def _megarac_eth_index(self):
@ -311,7 +313,7 @@ class OEMHandler(generic.OEMHandler):
self._collect_tsm_inventory()
return iter(self.oem_inventory_info)
elif self.has_imm:
return imm.get_hw_descriptions(self.ipmicmd, self._certverify)
return self.immhandler.get_hw_descriptions()
return ()
def get_oem_inventory(self):
@ -320,7 +322,7 @@ class OEMHandler(generic.OEMHandler):
for compname in self.oem_inventory_info:
yield (compname, self.oem_inventory_info[compname])
elif self.has_imm:
for inv in imm.get_hw_inventory(self.ipmicmd, self._certverify):
for inv in self.immhandler.get_hw_inventory():
yield inv
def get_sensor_data(self):
@ -343,8 +345,7 @@ class OEMHandler(generic.OEMHandler):
self._collect_tsm_inventory()
return self.oem_inventory_info.get(component, None)
if self.has_imm:
return imm.get_component_inventory(self.ipmicmd, self._certverify,
component)
return self.immhandler.get_component_inventory(component)
def _collect_tsm_inventory(self):
self.oem_inventory_info = {}
@ -503,8 +504,7 @@ class OEMHandler(generic.OEMHandler):
rsp = self.ipmicmd.xraw_command(**command["command"])
return command["parser"](rsp["data"])
elif self.has_imm:
return imm.get_firmware_inventory(self.ipmicmd, bmcver,
self._certverify)
return self.immhandler.get_firmware_inventory(bmcver)
return super(OEMHandler, self).get_oem_firmware(bmcver)
def get_oem_capping_enabled(self):
@ -577,10 +577,10 @@ class OEMHandler(generic.OEMHandler):
def _get_ts_remote_console(self, bmc, username, password):
# We don't establish non-secure connections without checking
# certificates
if not self._certverify:
if not self.ipmicmd.certverify:
return
conn = wc.SecureHTTPConnection(bmc, 443,
verifycallback=self._certverify)
verifycallback=self.ipmicmd.certverify)
conn.connect()
params = urllib.urlencode({
'WEBVAR_USERNAME': username,
@ -763,8 +763,7 @@ class OEMHandler(generic.OEMHandler):
def attach_remote_media(self, url, username, password):
if self.has_imm:
imm.attach_remote_media(self.ipmicmd, self._certverify, url,
username, password)
self.immhandler.attach_remote_media(url, username, password)
elif self.has_megarac:
proto, host, path = util.urlsplit(url)
if proto == 'smb':
@ -788,7 +787,7 @@ class OEMHandler(generic.OEMHandler):
def detach_remote_media(self):
if self.has_imm:
imm.detach_remote_media(self.ipmicmd, self._certverify)
self.immhandler.detach_remote_media()
elif self.has_megarac:
self.ipmicmd.xraw_command(
netfn=0x32, command=0x9f, data=(8, 10, 0, 0))

View File

@ -19,332 +19,330 @@ import json
from pyghmi.ipmi.private.session import _monotonic_time
import pyghmi.util.webclient as webclient
import urllib
import weakref
def _parse_builddate(strval):
try:
return datetime.strptime(strval, '%Y/%m/%d %H:%M:%S')
except ValueError:
pass
try:
return datetime.strptime(strval, '%Y-%m-%d %H:%M:%S')
except ValueError:
pass
try:
return datetime.strptime(strval, '%Y/%m/%d')
except ValueError:
pass
try:
return datetime.strptime(strval, '%m/%d/%Y')
except ValueError:
pass
try:
return datetime.strptime(strval, '%m %d %Y')
except ValueError:
pass
return None
class IMMClient(object):
def __init__(self, ipmicmd):
self.ipmicmd = weakref.proxy(ipmicmd)
self.imm = ipmicmd.bmc
self.username = ipmicmd.ipmi_session.userid
self.password = ipmicmd.ipmi_session.password
self._wc = None # The webclient shall be initiated on demand
self.datacache = {}
def get_imm_property(ipmicmd, propname):
propname = propname.encode('utf-8')
proplen = len(propname) | 0b10000000
cmdlen = len(propname) + 1
cdata = bytearray([0, 0, cmdlen, proplen]) + propname
rsp = ipmicmd.xraw_command(netfn=0x3a, command=0xc4, data=cdata)
rsp['data'] = bytearray(rsp['data'])
if rsp['data'][0] != 0:
@staticmethod
def _parse_builddate(strval):
try:
return datetime.strptime(strval, '%Y/%m/%d %H:%M:%S')
except ValueError:
pass
try:
return datetime.strptime(strval, '%Y-%m-%d %H:%M:%S')
except ValueError:
pass
try:
return datetime.strptime(strval, '%Y/%m/%d')
except ValueError:
pass
try:
return datetime.strptime(strval, '%m/%d/%Y')
except ValueError:
pass
try:
return datetime.strptime(strval, '%m %d %Y')
except ValueError:
pass
return None
propdata = rsp['data'][3:] # second two bytes are size, don't need it
if propdata[0] & 0b10000000: # string, for now assume length valid
return str(propdata[1:]).rstrip(' \x00')
else:
raise Exception('Unknown format for property: ' + repr(propdata))
@classmethod
def parse_imm_buildinfo(cls, buildinfo):
buildid = buildinfo[:9].rstrip(' \x00')
bdt = ' '.join(buildinfo[9:].replace('\x00', ' ').split())
bdate = cls._parse_builddate(bdt)
return buildid, bdate
def get_imm_webclient(imm, certverify, uid, password):
wc = webclient.SecureHTTPConnection(imm, 443,
verifycallback=certverify)
try:
wc.connect()
except Exception:
return None
adata = urllib.urlencode({'user': uid,
'password': password,
'SessionTimeout': 60
})
headers = {'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded'}
wc.request('POST', '/data/login', adata, headers)
rsp = wc.getresponse()
if rsp.status == 200:
rspdata = json.loads(rsp.read())
if rspdata['authResult'] == '0' and rspdata['status'] == 'ok':
if 'token2_name' in rspdata and 'token2_value' in rspdata:
wc.set_header(rspdata['token2_name'], rspdata['token2_value'])
return wc
@classmethod
def datefromprop(cls, propstr):
if propstr is None:
return None
return cls._parse_builddate(propstr)
def parse_imm_buildinfo(buildinfo):
buildid = buildinfo[:9].rstrip(' \x00')
bdt = ' '.join(buildinfo[9:].replace('\x00', ' ').split())
bdate = _parse_builddate(bdt)
return (buildid, bdate)
def datefromprop(propstr):
if propstr is None:
return None
return _parse_builddate(propstr)
def fetch_grouped_properties(ipmicmd, groupinfo):
retdata = {}
for keyval in groupinfo:
retdata[keyval] = get_imm_property(ipmicmd, groupinfo[keyval])
if keyval == 'date':
retdata[keyval] = datefromprop(retdata[keyval])
returnit = False
for keyval in list(retdata):
if retdata[keyval] in (None, ''):
del retdata[keyval]
def get_property(self, propname):
propname = propname.encode('utf-8')
proplen = len(propname) | 0b10000000
cmdlen = len(propname) + 1
cdata = bytearray([0, 0, cmdlen, proplen]) + propname
rsp = self.ipmicmd.xraw_command(netfn=0x3a, command=0xc4, data=cdata)
rsp['data'] = bytearray(rsp['data'])
if rsp['data'][0] != 0:
return None
propdata = rsp['data'][3:] # second two bytes are size, don't need it
if propdata[0] & 0b10000000: # string, for now assume length valid
return str(propdata[1:]).rstrip(' \x00')
else:
returnit = True
if returnit:
return retdata
raise Exception('Unknown format for property: ' + repr(propdata))
def get_webclient(self):
cv = self.ipmicmd.certverify
wc = webclient.SecureHTTPConnection(self.imm, 443, verifycallback=cv)
try:
wc.connect()
except Exception:
return None
adata = urllib.urlencode({'user': self.username,
'password': self.password,
'SessionTimeout': 60
})
headers = {'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded'}
wc.request('POST', '/data/login', adata, headers)
rsp = wc.getresponse()
if rsp.status == 200:
rspdata = json.loads(rsp.read())
if rspdata['authResult'] == '0' and rspdata['status'] == 'ok':
if 'token2_name' in rspdata and 'token2_value' in rspdata:
wc.set_header(rspdata['token2_name'],
rspdata['token2_value'])
return wc
def get_cached_data(ipmicmd, attribute):
try:
kv = getattr(ipmicmd.ipmi_session, attribute)
if kv[1] > _monotonic_time() - 30:
return kv[0]
except AttributeError:
return None
@property
def wc(self):
if not self._wc:
self._wc = self.get_webclient()
return self._wc
def fetch_grouped_properties(self, groupinfo):
retdata = {}
for keyval in groupinfo:
retdata[keyval] = self.get_property(groupinfo[keyval])
if keyval == 'date':
retdata[keyval] = self.datefromprop(retdata[keyval])
returnit = False
for keyval in list(retdata):
if retdata[keyval] in (None, ''):
del retdata[keyval]
else:
returnit = True
if returnit:
return retdata
def get_web_session(ipmicmd, certverify, wc):
if wc:
return wc
wc = get_imm_webclient(ipmicmd.bmc, certverify,
ipmicmd.ipmi_session.userid,
ipmicmd.ipmi_session.password)
return wc
def get_cached_data(self, attribute):
try:
kv = self.datacache[attribute]
if kv[1] > _monotonic_time() - 30:
return kv[0]
except KeyError:
return None
def attach_remote_media(ipmicmd, certverify, url, user, password):
wc = get_web_session(ipmicmd, certverify, None)
url = url.replace(':', '\:')
params = urllib.urlencode({
'RP_VmAllocateMountUrl({0},{1},1,,)'.format(
ipmicmd.ipmi_session.userid, url): ''
})
result = wc.grab_json_response('/data?set', params)
if result['return'] != 'Success':
raise Exception(result['reason'])
wc.grab_json_response('/data/logout')
def detach_remote_media(ipmicmd, certverify):
wc = get_web_session(ipmicmd, certverify, None)
mnt = wc.grab_json_response('/designs/imm/dataproviders/imm_rp_images.php')
removeurls = []
for item in mnt['items']:
if 'urls' in item:
for url in item['urls']:
removeurls.append(url['url'])
for url in removeurls:
def attach_remote_media(self, url, user, password):
url = url.replace(':', '\:')
params = urllib.urlencode({
'RP_VmAllocateUnMountUrl({0},{1},0,)'.format(
ipmicmd.ipmi_session.userid, url): ''})
result = wc.grab_json_response('/data?set', params)
'RP_VmAllocateMountUrl({0},{1},1,,)'.format(
self.username, url): ''
})
result = self.wc.grab_json_response('/data?set', params)
if result['return'] != 'Success':
raise Exception(result['reason'])
wc.grab_json_response('/data/logout')
self.wc.grab_json_response('/data/logout')
def detach_remote_media(self):
mnt = self.wc.grab_json_response(
'/designs/imm/dataproviders/imm_rp_images.php')
removeurls = []
for item in mnt['items']:
if 'urls' in item:
for url in item['urls']:
removeurls.append(url['url'])
for url in removeurls:
url = url.replace(':', '\:')
params = urllib.urlencode({
'RP_VmAllocateUnMountUrl({0},{1},0,)'.format(
self.username, url): ''})
result = self.wc.grab_json_response('/data?set', params)
if result['return'] != 'Success':
raise Exception(result['reason'])
self.wc.grab_json_response('/data/logout')
def fetch_agentless_firmware(ipmicmd, certverify):
wc = None
adapterdata = get_cached_data(ipmicmd, 'lenovo_cached_adapters')
if not adapterdata:
wc = get_web_session(ipmicmd, certverify, wc)
if wc:
adapterdata = wc.grab_json_response(
'/designs/imm/dataproviders/imm_adapters.php')
if adapterdata:
ipmicmd.ipmi_session.lenovo_cached_adapters = (
adapterdata, _monotonic_time())
if adapterdata and 'items' in adapterdata:
for adata in adapterdata['items']:
aname = adata['adapter.adapterName']
donenames = set([])
for fundata in adata['adapter.functions']:
fdata = fundata.get('firmwares', ())
for firm in fdata:
fname = firm['firmwareName'].rstrip()
if '.' in fname:
fname = firm['description'].rstrip()
if fname in donenames:
# ignore redundant entry
continue
donenames.add(fname)
def fetch_agentless_firmware(self):
adapterdata = self.get_cached_data('lenovo_cached_adapters')
if not adapterdata:
if self.wc:
adapterdata = self.wc.grab_json_response(
'/designs/imm/dataproviders/imm_adapters.php')
if adapterdata:
self.datacache['lenovo_cached_adapters'] = (
adapterdata, _monotonic_time())
if adapterdata and 'items' in adapterdata:
for adata in adapterdata['items']:
aname = adata['adapter.adapterName']
donenames = set([])
for fundata in adata['adapter.functions']:
fdata = fundata.get('firmwares', ())
for firm in fdata:
fname = firm['firmwareName'].rstrip()
if '.' in fname:
fname = firm['description'].rstrip()
if fname in donenames:
# ignore redundant entry
continue
donenames.add(fname)
bdata = {}
if 'versionStr' in firm and firm['versionStr']:
bdata['version'] = firm['versionStr']
if ('releaseDate' in firm and
firm['releaseDate'] and
firm['releaseDate'] != 'N/A'):
try:
bdata['date'] = self._parse_builddate(
firm['releaseDate'])
except ValueError:
pass
yield ('{0} {1}'.format(aname, fname), bdata)
storagedata = self.get_cached_data('lenovo_cached_storage')
if not storagedata:
if self.wc:
storagedata = self.wc.grab_json_response(
'/designs/imm/dataproviders/raid_alldevices.php')
if storagedata:
self.datacache['lenovo_cached_storage'] = (
storagedata, _monotonic_time())
if storagedata and 'items' in storagedata:
for adp in storagedata['items']:
if 'storage.vpd.productName' not in adp:
continue
adpname = adp['storage.vpd.productName']
if 'children' not in adp:
adp['children'] = ()
for diskent in adp['children']:
bdata = {}
if 'versionStr' in firm and firm['versionStr']:
bdata['version'] = firm['versionStr']
if ('releaseDate' in firm and
firm['releaseDate'] and
firm['releaseDate'] != 'N/A'):
try:
bdata['date'] = _parse_builddate(
firm['releaseDate'])
except ValueError:
pass
yield ('{0} {1}'.format(aname, fname), bdata)
storagedata = get_cached_data(ipmicmd, 'lenovo_cached_storage')
if not storagedata:
wc = get_web_session(ipmicmd, certverify, wc)
if wc:
storagedata = wc.grab_json_response(
'/designs/imm/dataproviders/raid_alldevices.php')
if storagedata:
ipmicmd.ipmi_session.lenovo_cached_storage = (
storagedata, _monotonic_time())
if storagedata and 'items' in storagedata:
for adp in storagedata['items']:
if 'storage.vpd.productName' not in adp:
continue
adpname = adp['storage.vpd.productName']
if 'children' not in adp:
adp['children'] = ()
for diskent in adp['children']:
bdata = {}
diskname = '{0} Disk {1}'.format(
adpname,
diskent['storage.slotNo'])
bdata['model'] = diskent['storage.vpd.productName'].rstrip()
bdata['version'] = diskent['storage.firmwares'][0][
'versionStr']
yield (diskname, bdata)
if wc:
wc.grab_json_response('/data/logout')
diskname = '{0} Disk {1}'.format(
adpname,
diskent['storage.slotNo'])
bdata['model'] = diskent[
'storage.vpd.productName'].rstrip()
bdata['version'] = diskent['storage.firmwares'][0][
'versionStr']
yield (diskname, bdata)
if self.wc:
self.wc.grab_json_response('/data/logout')
self._wc = None
def get_hw_inventory(self):
hwmap = self.hardware_inventory_map()
for key in hwmap:
yield (key, hwmap[key])
def get_hw_inventory(ipmicmd, certverify):
hwmap = hardware_inventory_map(ipmicmd, certverify)
for key in hwmap:
yield (key, hwmap[key])
def get_hw_descriptions(self):
hwmap = self.hardware_inventory_map()
for key in hwmap:
yield key
def get_component_inventory(self, compname):
hwmap = self.hardware_inventory_map()
try:
return hwmap[compname]
except KeyError:
return None
def get_hw_descriptions(ipmicmd, certverify):
hwmap = hardware_inventory_map(ipmicmd, certverify)
for key in hwmap:
yield key
def weblogout(self):
if self._wc:
self._wc.grab_json_response('/data/logout')
self._wc = None
def get_component_inventory(ipmicmd, certverify, compname):
hwmap = hardware_inventory_map(ipmicmd, certverify)
try:
return hwmap[compname]
except KeyError:
return None
def hardware_inventory_map(ipmicmd, certverify):
hwmap = get_cached_data(ipmicmd, 'lenovo_cached_hwmap')
if hwmap:
def hardware_inventory_map(self):
hwmap = self.get_cached_data('lenovo_cached_hwmap')
if hwmap:
return hwmap
hwmap = {}
adapterdata = self.get_cached_data('lenovo_cached_adapters')
if not adapterdata:
if self.wc:
adapterdata = self.wc.grab_json_response(
'/designs/imm/dataproviders/imm_adapters.php')
if adapterdata:
self.datacache['lenovo_cached_adapters'] = (
adapterdata, _monotonic_time())
if adapterdata and 'items' in adapterdata:
for adata in adapterdata['items']:
skipadapter = False
if not adata['adapter.oobSupported']:
continue
aname = adata['adapter.adapterName']
clabel = adata['adapter.connectorLabel']
if clabel == 'Unknown':
continue
if clabel != 'Onboard':
aslot = adata['adapter.slotNo']
if clabel == 'ML2':
clabel = 'ML2 (Slot {0})'.format(aslot)
else:
clabel = 'Slot {0}'.format(aslot)
bdata = {'location': clabel}
for fundata in adata['adapter.functions']:
bdata['pcislot'] = '{0:02x}:{1:02x}'.format(
fundata['generic.busNo'], fundata['generic.devNo'])
serialdata = fundata.get('vpd.serialNo', None)
if (serialdata and serialdata != 'N/A' and
'---' not in serialdata):
bdata['serial'] = serialdata
partnum = fundata.get('vpd.partNo', None)
if partnum and partnum != 'N/A':
bdata['partnumber'] = partnum
if 'network.pPorts' in fundata:
for portinfo in fundata['network.pPorts']:
for lp in portinfo['logicalPorts']:
ma = lp['networkAddr']
ma = ':'.join(
[ma[i:i+2] for i in range(
0, len(ma), 2)]).lower()
bdata['MAC Address {0}'.format(
portinfo['portIndex'])] = ma
elif clabel == 'Onboard': # skip the various non-nic
skipadapter = True
if not skipadapter:
hwmap[aname] = bdata
self.datacache['lenovo_cached_hwmap'] = (hwmap, _monotonic_time())
self.weblogout()
return hwmap
hwmap = {}
wc = None
adapterdata = get_cached_data(ipmicmd, 'lenovo_cached_adapters')
if not adapterdata:
wc = get_web_session(ipmicmd, certverify, wc)
if wc:
adapterdata = wc.grab_json_response(
'/designs/imm/dataproviders/imm_adapters.php')
if adapterdata:
ipmicmd.ipmi_session.lenovo_cached_adapters = (
adapterdata, _monotonic_time())
if adapterdata and 'items' in adapterdata:
for adata in adapterdata['items']:
skipadapter = False
if not adata['adapter.oobSupported']:
continue
aslot = None
aname = adata['adapter.adapterName']
clabel = adata['adapter.connectorLabel']
if clabel == 'Unknown':
continue
if clabel != 'Onboard':
aslot = adata['adapter.slotNo']
if clabel == 'ML2':
clabel = 'ML2 (Slot {0})'.format(aslot)
else:
clabel = 'Slot {0}'.format(aslot)
bdata = {'location': clabel}
for fundata in adata['adapter.functions']:
bdata['pcislot'] = '{0:02x}:{1:02x}'.format(
fundata['generic.busNo'], fundata['generic.devNo'])
serialdata = fundata.get('vpd.serialNo', None)
if (serialdata and serialdata != 'N/A' and
'---' not in serialdata):
bdata['serial'] = serialdata
partnum = fundata.get('vpd.partNo', None)
if partnum and partnum != 'N/A':
bdata['partnumber'] = partnum
if 'network.pPorts' in fundata:
for portinfo in fundata['network.pPorts']:
for lp in portinfo['logicalPorts']:
ma = lp['networkAddr']
ma = ':'.join(
[ma[i:i+2] for i in range(
0, len(ma), 2)]).lower()
bdata['MAC Address {0}'.format(
portinfo['portIndex'])] = ma
elif clabel == 'Onboard': # skip the various onboard non-nic
skipadapter = True
if not skipadapter:
hwmap[aname] = bdata
ipmicmd.ipmi_session.lenovo_cached_hwmap = (hwmap, _monotonic_time())
if wc:
wc.grab_json_response('/data/logout')
return hwmap
def get_firmware_inventory(ipmicmd, bmcver, certverify):
# First we fetch the system firmware found in imm properties
# then check for agentless, if agentless, get adapter info using
# https, using the caller TLS verification scheme
rsp = ipmicmd.xraw_command(netfn=0x3a, command=0x50)
immverdata = parse_imm_buildinfo(rsp['data'])
bdata = {'version': bmcver, 'build': immverdata[0], 'date': immverdata[1]}
yield ('IMM', bdata)
bdata = fetch_grouped_properties(ipmicmd, {
'build': '/v2/ibmc/dm/fw/imm2/backup_build_id',
'version': '/v2/ibmc/dm/fw/imm2/backup_build_version',
'date': '/v2/ibmc/dm/fw/imm2/backup_build_date'})
if bdata:
yield ('IMM Backup', bdata)
bdata = fetch_grouped_properties(ipmicmd, {
'build': '/v2/ibmc/trusted_buildid',
})
if bdata:
yield ('IMM Trusted Image', bdata)
bdata = fetch_grouped_properties(ipmicmd, {
'build': '/v2/bios/build_id',
'version': '/v2/bios/build_version',
'date': '/v2/bios/build_date'})
if bdata:
yield ('UEFI', bdata)
bdata = fetch_grouped_properties(ipmicmd, {
'build': '/v2/ibmc/dm/fw/bios/backup_build_id',
'version': '/v2/ibmc/dm/fw/bios/backup_build_version'})
if bdata:
yield ('UEFI Backup', bdata)
# Note that the next pending could be pending for either primary
# or backup, so can't promise where it will go
bdata = fetch_grouped_properties(ipmicmd, {
'build': '/v2/bios/pending_build_id'})
if bdata:
yield ('UEFI Pending Update', bdata)
for firm in fetch_agentless_firmware(ipmicmd, certverify):
yield firm
def get_firmware_inventory(self, bmcver):
# First we fetch the system firmware found in imm properties
# then check for agentless, if agentless, get adapter info using
# https, using the caller TLS verification scheme
rsp = self.ipmicmd.xraw_command(netfn=0x3a, command=0x50)
immverdata = self.parse_imm_buildinfo(rsp['data'])
bdata = {
'version': bmcver, 'build': immverdata[0], 'date': immverdata[1]}
yield ('IMM', bdata)
bdata = self.fetch_grouped_properties({
'build': '/v2/ibmc/dm/fw/imm2/backup_build_id',
'version': '/v2/ibmc/dm/fw/imm2/backup_build_version',
'date': '/v2/ibmc/dm/fw/imm2/backup_build_date'})
if bdata:
yield ('IMM Backup', bdata)
bdata = self.fetch_grouped_properties({
'build': '/v2/ibmc/trusted_buildid',
})
if bdata:
yield ('IMM Trusted Image', bdata)
bdata = self.fetch_grouped_properties({
'build': '/v2/bios/build_id',
'version': '/v2/bios/build_version',
'date': '/v2/bios/build_date'})
if bdata:
yield ('UEFI', bdata)
bdata = self.fetch_grouped_properties({
'build': '/v2/ibmc/dm/fw/bios/backup_build_id',
'version': '/v2/ibmc/dm/fw/bios/backup_build_version'})
if bdata:
yield ('UEFI Backup', bdata)
# Note that the next pending could be pending for either primary
# or backup, so can't promise where it will go
bdata = self.fetch_grouped_properties({
'build': '/v2/bios/pending_build_id'})
if bdata:
yield ('UEFI Pending Update', bdata)
for firm in self.fetch_agentless_firmware():
yield firm