2
0
mirror of https://opendev.org/x/pyghmi synced 2025-04-16 10:09:30 +00:00
pyghmi/pyghmi/redfish/command.py
Jarrod Johnson 66973bfd0c Add disk hardware information
Use the storage management capabilities
to provide information about disk hardware.

Change-Id: Ide2cc01ece07f7ca7839bc0bb5f1b877d65bc80d
2019-05-21 13:19:26 -04:00

1197 lines
48 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# coding=utf8
# Copyright 2019 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The command module for redfish systems. Provides https-only support
# for redfish compliant endpoints
from datetime import datetime, timedelta
from fnmatch import fnmatch
import json
import os
import socket
import struct
import time
import pyghmi.exceptions as exc
import pyghmi.constants as const
import pyghmi.util.webclient as webclient
import pyghmi.redfish.oem.lookup as oem
import re
from dateutil import tz
# Captures runs of decimal digits so a split keeps the numbers as separate
# elements (used by naturalize_string)
numregex = re.compile('([0-9]+)')
# Caller-facing power state names mapped to the Redfish ResetType value sent
# by set_power.  'boot' has no ResetType of its own; set_power resolves it to
# 'on' or 'reset' before consulting this table.
powerstates = {
'on': 'On',
'off': 'ForceOff',
'softoff': 'GracefulShutdown',
'shutdown': 'GracefulShutdown',
'reset': 'ForceRestart',
'boot': None,
}
# Caller-facing boot device names (including aliases) mapped to the value
# written to BootSourceOverrideTarget by set_bootdev.  The string 'None' is
# treated by set_bootdev as a request to disable the override.
boot_devices_write = {
'net': 'Pxe',
'network': 'Pxe',
'pxe': 'Pxe',
'hd': 'Hdd',
'cd': 'Cd',
'cdrom': 'Cd',
'optical': 'Cd',
'dvd': 'Cd',
'floppy': 'Floppy',
'default': 'None',
'setup': 'BiosSetup',
'bios': 'BiosSetup',
'f1': 'BiosSetup',
}
# BootSourceOverrideTarget values mapped back to the caller-facing name
# reported by get_bootdev
boot_devices_read = {
'BiosSetup': 'setup',
'Cd': 'optical',
'Floppy': 'floppy',
'Hdd': 'hd',
'None': 'default',
'Pxe': 'network',
'Usb': 'usb',
'SDCard': 'sdcard',
}
# Redfish Status.Health strings mapped to pyghmi health constants
_healthmap = {
'Critical': const.Health.Critical,
'Warning': const.Health.Warning,
'OK': const.Health.Ok,
}
def _parse_time(timeval):
if timeval is None:
return None
try:
retval = datetime.strptime(timeval, '%Y-%m-%dT%H:%M:%SZ')
return retval.replace(tzinfo=tz.tzutc())
except ValueError:
pass
try:
positive = None
offset = None
if '+' in timeval:
timeval, offset = timeval.split('+', 1)
positive = 1
elif len(timeval.split('-')) > 3:
timeval, offset = timeval.rsplit('-', 1)
positive = -1
if positive:
hrs, mins = offset.split(':', 1)
secs = int(hrs) * 60 + int(mins)
secs = secs * 60 * positive
ms = None
if '.' in timeval:
timeval, ms = timeval.split('.', 1)
ms = int(ms)
ms = timedelta(0, 0, 0, ms)
retval = datetime.strptime(timeval, '%Y-%m-%dT%H:%M:%S')
if ms:
retval += ms
return retval.replace(tzinfo=tz.tzoffset('', secs))
except ValueError:
pass
try:
return datetime.strptime(timeval, '%Y-%m-%dT%H:%M:%S')
except ValueError:
pass
try:
return datetime.strptime(timeval, '%Y-%m-%d')
except ValueError:
pass
return None
def _mask_to_cidr(mask):
maskn = socket.inet_pton(socket.AF_INET, mask)
maskn = struct.unpack('!I', maskn)[0]
cidr = 32
while maskn & 0b1 == 0 and cidr > 0:
cidr -= 1
maskn >>= 1
return cidr
def _cidr_to_mask(cidr):
return socket.inet_ntop(
socket.AF_INET, struct.pack(
'!I', (2**32 - 1) ^ (2**(32 - cidr) - 1)))
def naturalize_string(key):
    """Break a string into pieces suitable for natural ordering

    Digit runs become integers and everything else is lowercased, so that
    'n2' orders ahead of 'n10'.

    :param key: The string to analyze
    :returns: A list that can be used as a key by 'sorted'
    """
    pieces = []
    for text in numregex.split(key):
        if text.isdigit():
            pieces.append(int(text))
        else:
            pieces.append(text.lower())
    return pieces
def natural_sort(iterable):
    """Sort using natural ordering, with plain ordering as a fallback

    :param iterable: The items to sort
    :returns: A new sorted list
    """
    try:
        ordered = sorted(iterable, key=naturalize_string)
    except TypeError:
        # Natural keys could not be built or compared, use default ordering
        ordered = sorted(iterable)
    return ordered
class SensorReading(object):
    """A named reading built from a Redfish resource carrying a Status

    :param healthinfo: Resource dict; Status.Health is read from it, and
                       Name as well when no sensor is given
    :param sensor: Optional dict whose 'name' overrides the resource Name
    :param value: Optional value of the reading
    :param units: Optional units of the value
    """

    def __init__(self, healthinfo, sensor=None, value=None, units=None):
        self.name = sensor['name'] if sensor else healthinfo['Name']
        reported = healthinfo['Status']['Health']
        self.health = _healthmap[reported]
        self.states = [healthinfo['Status']['Health']]
        self.value = value
        self.state_ids = None
        self.imprecision = None
        self.units = units
class AttrDependencyHandler(object):
    """Evaluate 'Map' dependencies of a BIOS attribute registry

    :param dependencies: Registry entries with an 'Attributes' list and an
                         optional 'Dependencies' list; may be empty
    :param currsettings: Mapping of attribute name to current raw value
    :param pendingsettings: Mapping of attribute name to pending raw value,
                            which takes precedence over the current value
    """

    def __init__(self, dependencies, currsettings, pendingsettings):
        dependencies = dependencies or {}
        self.dependencymap = {}
        for dep in dependencies.get('Dependencies', []):
            if dep.get('Type') != 'Map':
                continue
            if 'Dependency' not in dep:
                continue
            self.dependencymap.setdefault(
                dep['DependencyFor'], []).append(dep['Dependency'])
        self.curr = currsettings
        self.pend = pendingsettings
        attributes = dependencies.get('Attributes', [])
        if isinstance(attributes, dict):
            self.reg = attributes
        else:
            # The registry carries a list of attribute entries, index it by
            # name so properties other than CurrentValue can be looked up
            self.reg = {}
            for attr in attributes:
                if 'AttributeName' in attr:
                    self.reg[attr['AttributeName']] = attr

    def get_overrides(self, setting):
        """Determine property overrides that currently apply to a setting

        :param setting: The attribute name to evaluate dependencies for
        :returns: Tuple of a dict mapping attribute name to a dict of
                  overridden properties, and the list of attribute names
                  that were consulted
        """
        overrides = {}
        blameattrs = []
        for depinfo in self.dependencymap.get(setting, ()):
            lastoper = None
            lastcond = None
            for mapfrom in depinfo.get('MapFrom', []):
                if lastcond is not None and not lastoper:
                    break  # MapTerm required to make sense of this, give up
                currattr = mapfrom['MapFromAttribute']
                blameattrs.append(currattr)
                currprop = mapfrom['MapFromProperty']
                if currprop == 'CurrentValue':
                    if currattr in self.pend:
                        currval = self.pend[currattr]
                    else:
                        currval = self.curr[currattr]
                else:
                    currval = self.reg[currattr][currprop]
                lastcond = self.process(currval, mapfrom, lastcond, lastoper)
                lastoper = mapfrom.get('MapTerms', None)
            if lastcond:
                overrides.setdefault(setting, {})
                # Accumulate per target attribute; earlier dependencies that
                # hit the same attribute must not be thrown away
                target = overrides.setdefault(depinfo['MapToAttribute'], {})
                target[depinfo['MapToProperty']] = depinfo['MapToValue']
        return overrides, blameattrs

    def process(self, currval, mapfrom, lastcond, lastoper):
        """Evaluate one MapFrom term and fold it into the running result

        :param currval: The value to test
        :param mapfrom: The MapFrom entry holding condition and value
        :param lastcond: Result of the preceding terms, or None if first
        :param lastoper: 'AND' or 'OR' joining this term to the preceding
        :returns: The combined truth value, or None if the condition or
                  the joining operator is not recognized
        """
        newcond = None
        mfc = mapfrom['MapFromCondition']
        if mfc == 'EQU':
            newcond = currval == mapfrom['MapFromValue']
        elif mfc == 'NEQ':
            newcond = currval != mapfrom['MapFromValue']
        elif mfc == 'GEQ':
            newcond = float(currval) >= float(mapfrom['MapFromValue'])
        elif mfc == 'GTR':
            newcond = float(currval) > float(mapfrom['MapFromValue'])
        elif mfc == 'LEQ':
            newcond = float(currval) <= float(mapfrom['MapFromValue'])
        elif mfc == 'LSS':
            newcond = float(currval) < float(mapfrom['MapFromValue'])
        if lastcond is not None:
            if lastoper == 'AND':
                return lastcond and newcond
            elif lastoper == 'OR':
                return lastcond or newcond
            return None
        return newcond
class Command(object):
def __init__(self, bmc, userid, password, verifycallback, sysurl=None,
             bmcurl=None, chassisurl=None, pool=None):
    """Connect to a redfish endpoint and select the system to manage

    :param bmc: Address of the endpoint, contacted over https on port 443
    :param userid: User for basic authentication
    :param password: Password for basic authentication
    :param verifycallback: Passed to the web client as its verifycallback
    :param sysurl: URL of the system to manage, mandatory when the
                   endpoint reports anything other than exactly one
    :param bmcurl: Optional manager URL, used instead of discovering one
    :param chassisurl: Optional chassis URL, stored for later use
    :param pool: Optional pool offering 'starmap', used for bulk requests
    :raises: PyghmiException on denied access, on a failed request for
             the system collection, or if no system can be selected
    """
    self.wc = webclient.SecureHTTPConnection(
        bmc, 443, verifycallback=verifycallback)
    self._hwnamemap = {}
    self._fwnamemap = {}
    self._urlcache = {}
    self._varsensormap = {}
    self._varbmcurl = bmcurl
    self._varchassisurl = chassisurl
    self._varbiosurl = None
    self._varbmcnicurl = None
    self._varsetbiosurl = None
    self._varresetbmcurl = None
    self._varupdateservice = None
    self._varfwinventory = None
    self._oem = None
    self._gpool = pool
    for header, content in (('Accept', 'application/json'),
                            ('User-Agent', 'pyghmi'),
                            ('Accept-Encoding', 'gzip'),
                            ('OData-Version', '4.0')):
        self.wc.set_header(header, content)
    # The service root is fetched before credentials are configured
    overview = self.wc.grab_json_response('/redfish/v1/')
    self.wc.set_basic_credentials(userid, password)
    self.wc.set_header('Content-Type', 'application/json')
    collection = overview['Systems']['@odata.id']
    body, status = self.wc.grab_json_response_with_status(collection)[:2]
    if status == 401:
        raise exc.PyghmiException('Access Denied')
    if not 200 <= status < 300:
        raise exc.PyghmiException(repr(body))
    systems = body['Members']
    if sysurl:
        if not any(entry['@odata.id'] == sysurl for entry in systems):
            raise exc.PyghmiException(
                'Specified sysurl not found: {0}'.format(sysurl))
        self.sysurl = sysurl
    else:
        if len(systems) != 1:
            raise exc.PyghmiException(
                'Multi system manager, sysurl is required parameter')
        self.sysurl = systems[0]['@odata.id']
    resetaction = self.sysinfo.get('Actions', {}).get(
        '#ComputerSystem.Reset', {})
    self.powerurl = resetaction.get('target', None)
@property
def _updateservice(self):
    """URL of the update service, looked up on first use

    :raises: UnsupportedFunctionality if the service root has none
    """
    if self._varupdateservice:
        return self._varupdateservice
    root = self._do_web_request('/redfish/v1/')
    svcurl = root.get('UpdateService', {}).get('@odata.id', None)
    if not svcurl:
        raise exc.UnsupportedFunctionality(
            'BMC does not implement extended firmware information')
    self._varupdateservice = svcurl
    return self._varupdateservice
@property
def _fwinventory(self):
    """URL of the firmware inventory collection, looked up on first use

    :raises: UnsupportedFunctionality if the update service has none
    """
    if self._varfwinventory:
        return self._varfwinventory
    svcinfo = self._do_web_request(self._updateservice)
    inventory = svcinfo.get('FirmwareInventory', {})
    self._varfwinventory = inventory.get('@odata.id', None)
    if not self._varfwinventory:
        raise exc.UnsupportedFunctionality(
            'BMC does not implement extended firmware information')
    return self._varfwinventory
@property
def sysinfo(self):
    """The system resource at sysurl, as returned by _do_web_request"""
    info = self._do_web_request(self.sysurl)
    return info
def get_power(self):
    """Get the current power state, bypassing the cache

    :returns: dict with 'powerstate' set to the lowercased PowerState
    """
    current = self._do_web_request(self.sysurl, cache=False)
    state = current['PowerState'].lower()
    return {'powerstate': str(state)}
def set_power(self, powerstate, wait=False):
    """Request a power state change

    :param powerstate: One of the keys of 'powerstates'.  'boot' turns
                       into 'on' when the system is off, 'reset' otherwise
    :param wait: If True and the request was on, off, softoff or
                 shutdown, poll once a second for up to 300 seconds until
                 the state is reached
    :raises: InvalidParameterValue for an unknown state, PyghmiException
             if the request fails or the wait is not satisfied
    :returns: dict with 'powerstate' if the change was waited for,
              otherwise with 'pendingpowerstate'
    """
    if powerstate == 'boot':
        if self.get_power()['powerstate'] == 'off':
            powerstate = 'on'
        else:
            powerstate = 'reset'
    reqpowerstate = powerstate
    if powerstate not in powerstates:
        raise exc.InvalidParameterValue(
            "Unknown power state %s requested" % powerstate)
    resettype = powerstates[powerstate]
    result = self.wc.grab_json_response_with_status(
        self.powerurl, {'ResetType': resettype})
    if not 200 <= result[1] < 300:
        raise exc.PyghmiException(result[0])
    if not wait or reqpowerstate not in (
            'on', 'off', 'softoff', 'shutdown'):
        return {'pendingpowerstate': reqpowerstate}
    if reqpowerstate in ('softoff', 'shutdown'):
        # Both are satisfied by the system reporting off
        reqpowerstate = 'off'
    deadline = os.times()[4] + 300
    while True:
        if self.get_power()['powerstate'] == reqpowerstate:
            break
        if os.times()[4] >= deadline:
            break
        time.sleep(1)
    if self.get_power()['powerstate'] != reqpowerstate:
        raise exc.PyghmiException(
            "System did not accomplish power state change")
    return {'powerstate': reqpowerstate}
def _get_cache(self, url, cache=30):
now = os.times()[4]
cachent = self._urlcache.get(url, None)
if cachent and cachent['vintage'] > now - cache:
return cachent['contents']
return None
def _do_bulk_requests(self, urls, cache=True):
if self._gpool:
urls = [(x, None, None, cache) for x in urls]
for res in self._gpool.starmap(self._do_web_request_withurl, urls):
yield res
else:
for url in urls:
yield self._do_web_request_withurl(url, cache=cache)
def _do_web_request_withurl(self, url, payload=None, method=None,
cache=True):
return self._do_web_request(url, payload, method, cache), url
def _do_web_request(self, url, payload=None, method=None, cache=True,
                    etag=None):
    """Perform a request against the endpoint and return the response body

    :param url: The URL to request
    :param payload: Optional data to send
    :param method: Optional explicit method
    :param cache: If truthy and neither payload nor method is given, a
                  cached response may be returned instead of requesting
    :param etag: Optional value sent as an If-Match header
    :raises: RedfishError with the messages of the error response if it
             can be interpreted, PyghmiException with the raw body if not
    :returns: The response body
    """
    res = None
    if cache and payload is None and method is None:
        res = self._get_cache(url, cache)
    if res:
        return res
    wc = self.wc.dupe()
    if etag:
        wc.stdheaders['If-Match'] = etag
    try:
        res = wc.grab_json_response_with_status(url, payload,
                                                method=method)
    finally:
        # Make sure the etag does not linger for some unrelated request
        if 'If-Match' in wc.stdheaders:
            del wc.stdheaders['If-Match']
    if res[1] < 200 or res[1] >= 300:
        try:
            info = json.loads(res[0])
            # Prefer the readable message; only consult MessageId when no
            # message is present so a missing id cannot mask the message
            errmsg = ','.join(
                x.get('Message') or x['MessageId'] for x in info.get(
                    'error', {}).get('@Message.ExtendedInfo', {}))
        except (ValueError, KeyError, TypeError, AttributeError):
            raise exc.PyghmiException(res[0])
        raise exc.RedfishError(errmsg)
    if payload is None and method is None:
        self._urlcache[url] = {'contents': res[0],
                               'vintage': os.times()[4]}
    return res[0]
def get_bootdev(self):
    """Get current boot device override information.

    :raises: PyghmiException if the override state, mode or target
             reported by the system is not recognized
    :returns: dict with 'bootdev' and 'persistent', plus 'uefimode'
              unless the override is disabled
    """
    bootinfo = self._do_web_request(self.sysurl).get('Boot', {})
    overridestate = bootinfo.get('BootSourceOverrideEnabled', None)
    if overridestate == 'Disabled':
        return {'bootdev': 'default', 'persistent': True}
    if overridestate == 'Once':
        persistent = False
    elif overridestate == 'Continuous':
        persistent = True
    else:
        raise exc.PyghmiException('Unrecognized Boot state: '
                                  + repr(overridestate))
    uefimode = bootinfo.get('BootSourceOverrideMode', None)
    if uefimode == 'UEFI':
        uefimode = True
    elif uefimode == 'Legacy':
        uefimode = False
    else:
        # repr() keeps this a PyghmiException when the mode is missing
        # (None) rather than failing on the string concatenation
        raise exc.PyghmiException('Unrecognized mode: ' + repr(uefimode))
    bootdev = bootinfo.get('BootSourceOverrideTarget', None)
    if bootdev not in boot_devices_read:
        raise exc.PyghmiException('Unrecognized boot target: '
                                  + repr(bootdev))
    bootdev = boot_devices_read[bootdev]
    return {'bootdev': bootdev, 'persistent': persistent,
            'uefimode': uefimode}
def set_bootdev(self, bootdev, persist=False, uefiboot=None):
    """Set boot device to use on next reboot

    :param bootdev: A key of boot_devices_write, for example:
                    *network -- Request network boot
                    *hd -- Boot from hard drive
                    *optical -- boot from CD/DVD/BD drive
                    *setup -- Boot into setup utility
                    *default -- remove any directed boot device request
                    A redfish target name listed in boot_devices_read is
                    also accepted and passed through unchanged
    :param persist: If true, ask that system firmware use this device
                    beyond next boot.  Be aware many systems do not honor
                    this
    :param uefiboot: If true, request UEFI boot explicitly.  If False,
                     request BIOS style boot.
                     None (default) does not modify the boot mode.
    :raises: InvalidParameterValue for an unsupported device,
             PyghmiException on an error.
    :returns: dict with 'bootdev' set to the requested device
    """
    reqbootdev = bootdev
    if (bootdev not in boot_devices_write
            and bootdev not in boot_devices_read):
        raise exc.InvalidParameterValue('Unsupported device '
                                        + repr(bootdev))
    bootdev = boot_devices_write.get(bootdev, bootdev)
    if bootdev == 'None':
        payload = {'Boot': {'BootSourceOverrideEnabled': 'Disabled'}}
    else:
        payload = {'Boot': {
            'BootSourceOverrideEnabled': 'Continuous' if persist
            else 'Once',
            'BootSourceOverrideTarget': bootdev,
        }}
    if uefiboot is not None:
        # The mode is a member of Boot, which is also where get_bootdev
        # reads it back from
        payload['Boot']['BootSourceOverrideMode'] = (
            'UEFI' if uefiboot else 'Legacy')
        try:
            self._do_web_request(self.sysurl, payload, method='PATCH')
            return {'bootdev': reqbootdev}
        except Exception:
            # Best effort: retry below without the mode, for systems
            # that refuse to have it set
            del payload['Boot']['BootSourceOverrideMode']
    self._do_web_request(self.sysurl, payload, method='PATCH')
    return {'bootdev': reqbootdev}
@property
def _biosurl(self):
    """URL of the Bios resource of the system, looked up on first use

    :raises: UnsupportedFunctionality if the system links no Bios
    """
    if not self._varbiosurl:
        biosref = self.sysinfo.get('Bios', {})
        self._varbiosurl = biosref.get('@odata.id', None)
    if self._varbiosurl is None:
        raise exc.UnsupportedFunctionality(
            'Bios management not detected on this platform')
    return self._varbiosurl
@property
def _setbiosurl(self):
    """URL of the settings object of the Bios resource

    :raises: UnsupportedFunctionality if Bios advertises no settings object
    """
    if self._varsetbiosurl is None:
        biosinfo = self._do_web_request(self._biosurl)
        settings = biosinfo.get('@Redfish.Settings', {})
        target = settings.get('SettingsObject', {})
        self._varsetbiosurl = target.get('@odata.id', None)
    if self._varsetbiosurl is None:
        raise exc.UnsupportedFunctionality('Ability to set BIOS settings '
                                           'not detected on this platform')
    return self._varsetbiosurl
@property
def _sensormap(self):
    """Map of sensor name to its name, type and the URL that reports it

    Built on first use from the Power and Thermal resources of every
    chassis linked from the system.  Entries without a Name are skipped.
    """
    if self._varsensormap:
        return self._varsensormap
    for chassis in self.sysinfo.get('Links', {}).get('Chassis', []):
        chassisinfo = self._do_web_request(chassis['@odata.id'])
        powurl = chassisinfo.get('Power', {}).get('@odata.id', '')
        if powurl:
            powinf = self._do_web_request(powurl)
            for voltage in powinf.get('Voltages', []):
                if 'Name' not in voltage:
                    continue
                self._varsensormap[voltage['Name']] = {
                    'name': voltage['Name'], 'url': powurl,
                    'type': 'Voltage'}
        thermurl = chassisinfo.get('Thermal', {}).get('@odata.id', '')
        if not thermurl:
            continue
        therminf = self._do_web_request(thermurl)
        for section, kind in (('Fans', 'Fan'),
                              ('Temperatures', 'Temperature')):
            for reading in therminf.get(section, []):
                if 'Name' not in reading:
                    continue
                self._varsensormap[reading['Name']] = {
                    'name': reading['Name'], 'type': kind,
                    'url': thermurl}
    return self._varsensormap
@property
def _bmcurl(self):
    """URL of the manager of this system

    Uses the value given at construction if any, otherwise the first
    entry of Links.ManagedBy of the system.

    :returns: The URL, or None if the system names no manager
    """
    if not self._varbmcurl:
        managers = self.sysinfo.get('Links', {}).get('ManagedBy', [])
        # Guard the index, the list may be present but empty
        if managers:
            self._varbmcurl = managers[0].get('@odata.id', None)
    return self._varbmcurl
@property
def _bmcnicurl(self):
    """URL of the single network interface of the manager

    Interfaces that link to a HostInterface are not considered.

    :raises: PyghmiException unless exactly one interface remains
    """
    if self._varbmcnicurl:
        return self._varbmcnicurl
    bmcinfo = self._do_web_request(self._bmcurl)
    nicurl = bmcinfo.get('EthernetInterfaces', {}).get('@odata.id',
                                                       None)
    niclist = self._do_web_request(nicurl)
    candidates = []
    for nic in niclist.get('Members', []):
        curl = nic.get('@odata.id', None)
        if not curl:
            continue
        nicinfo = self._do_web_request(curl)
        if nicinfo.get('Links', {}).get('HostInterface', None):
            # skip host interface
            continue
        candidates.append(curl)
    if len(candidates) != 1:
        raise exc.PyghmiException(
            'BMC does not have exactly one interface')
    self._varbmcnicurl = candidates[-1]
    return self._varbmcnicurl
@property
def _bmcreseturl(self):
    """Target URL of the Manager.Reset action, None if not offered"""
    if self._varresetbmcurl:
        return self._varresetbmcurl
    actions = self._do_web_request(self._bmcurl).get('Actions', {})
    resetaction = actions.get('#Manager.Reset', {})
    self._varresetbmcurl = resetaction.get('target', None)
    return self._varresetbmcurl
def reset_bmc(self):
    """Ask the manager to restart with a ForceRestart reset"""
    request = {'ResetType': 'ForceRestart'}
    self._do_web_request(self._bmcreseturl, request)
def set_identify(self, on=True, blink=None):
    """Set the indicator LED of the system

    :param on: Whether the LED is to be lit, ignored if blink is truthy
    :param blink: If truthy, request blinking
    """
    if blink:
        ledstate = 'Blinking'
    elif on:
        ledstate = 'Lit'
    else:
        ledstate = 'Off'
    self._do_web_request(
        self.sysurl, {'IndicatorLED': ledstate}, method='PATCH')
# IndicatorLED values of the system resource mapped to the identify state
# names reported by get_identify
_idstatemap = {
'Blinking': 'blink',
'Lit': 'on',
'Off': 'off',
}
def get_identify(self):
    """Get the state of the indicator LED

    :returns: dict with 'identifystate' set to 'blink', 'on' or 'off'
    """
    reported = self.sysinfo['IndicatorLED']
    state = self._idstatemap[reported]
    return {'identifystate': state}
def get_health(self, verbose=True):
    """Summarize the health of the system

    :param verbose: If True and the system is not healthy, examine
                    processors, memory, PCIe devices and PCIe functions
                    to report which of them are not OK
    :returns: dict with 'health' and a 'badreadings' list of
              SensorReading objects
    """
    sysinfo = self.sysinfo
    health = sysinfo.get('Status', {})
    health = health.get('HealthRollup', health.get('Health', 'Unknown'))
    health = _healthmap[health]
    summary = {'badreadings': [], 'health': health}
    if not (health > 0 and verbose):
        return summary
    badreadings = summary['badreadings']
    procurl = sysinfo.get('Processors', {}).get('@odata.id', None)
    if procurl:
        for cpu in self._do_web_request(procurl).get('Members', []):
            cinfo = self._do_web_request(cpu['@odata.id'])
            if cinfo['Status']['Health'] != 'OK':
                badreadings.append(SensorReading(cinfo))
    memsummary = sysinfo.get('MemorySummary', {})
    if memsummary.get('Status', {}).get(
            'HealthRollup', 'OK') not in ('OK', None):
        dimmfound = False
        memurl = sysinfo.get('Memory', {}).get('@odata.id', None)
        if memurl:
            for mem in self._do_web_request(memurl).get('Members', []):
                # Members are references, the URL is in '@odata.id'
                dimminfo = self._do_web_request(mem['@odata.id'])
                if dimminfo.get('Status', {}).get(
                        'Health', None) not in ('OK', None):
                    badreadings.append(SensorReading(dimminfo))
                    dimmfound = True
        if not dimmfound:
            # No module to blame, report the summary itself.  Work on
            # copies so the cached system data is left untouched.
            meminfo = dict(memsummary)
            status = dict(meminfo.get('Status', {}))
            if not status.get('Health', None):
                status['Health'] = status['HealthRollup']
            meminfo['Status'] = status
            meminfo['Name'] = 'Memory'
            badreadings.append(SensorReading(meminfo))
    for listname in ('PCIeDevices', 'PCIeFunctions'):
        # Either list may be absent from the system resource
        for entry in sysinfo.get(listname, []):
            entryinfo = self._do_web_request(entry['@odata.id'])
            if entryinfo.get('Status', {}).get(
                    'Health', None) not in ('OK', None):
                badreadings.append(SensorReading(entryinfo))
    return summary
def _get_biosreg(self, url):
addon = {}
valtodisplay = {}
displaytoval = {}
reg = self._do_web_request(url)
reg = reg['RegistryEntries']
for attr in reg['Attributes']:
vals = attr.get('Value', [])
if vals:
valtodisplay[attr['AttributeName']] = {}
displaytoval[attr['AttributeName']] = {}
for val in vals:
valtodisplay[
attr['AttributeName']][val['ValueName']] = val[
'ValueDisplayName']
displaytoval[
attr['AttributeName']][val['ValueDisplayName']] = val[
'ValueName']
defaultval = attr.get('DefaultValue', None)
defaultval = valtodisplay.get(attr['AttributeName'], {}).get(
defaultval, defaultval)
if attr['Type'] == 'Integer' and defaultval:
defaultval = int(defaultval)
addon[attr['AttributeName']] = {
'default': defaultval,
'help': attr.get('HelpText', None),
'sortid': attr.get('DisplayOrder', None),
'possible': [x['ValueDisplayName'] for x in vals],
}
return addon, valtodisplay, displaytoval, reg
def get_system_configuration(self, hideadvanced=True):
    """Get the BIOS/UEFI settings of the system

    Also records the attribute registry entries in self.attrdeps for use
    by set_system_configuration.

    :param hideadvanced: Accepted but not used by this implementation
    :returns: dict of setting name to a dict with 'value' (the pending
              value if there is one), 'active' if the value in effect
              differs, and registry supplied 'default', 'help', 'sortid'
              and 'possible' when a registry could be located
    """
    biosinfo = self._do_web_request(self._biosurl, cache=False)
    extrainfo = {}
    valtodisplay = {}
    # Keep the shape of registry entries even when no registry is found,
    # since this is later indexed by 'Attributes' and 'Dependencies'
    self.attrdeps = {'Attributes': [], 'Dependencies': []}
    if 'AttributeRegistry' in biosinfo:
        overview = self._do_web_request('/redfish/v1/')
        reglist = overview['Registries']['@odata.id']
        reglist = self._do_web_request(reglist)
        regurl = None
        for cand in reglist.get('Members', []):
            cand = cand.get('@odata.id', '')
            candname = cand.split('/')[-1]
            if candname == '':  # implementation uses trailing slash
                candname = cand.split('/')[-2]
            if candname == biosinfo['AttributeRegistry']:
                regurl = cand
                break
        if not regurl:
            # Workaround a vendor bug where they link to a non-existent
            # name, match on the part ahead of the first '.' instead
            wanted = biosinfo['AttributeRegistry'].split('.')[0]
            for cand in reglist.get('Members', []):
                cand = cand.get('@odata.id', '')
                candname = cand.split('/')[-1]
                candname = candname.split('.')[0]
                if candname == wanted:
                    regurl = cand
                    break
        if regurl:
            reginfo = self._do_web_request(regurl)
            for reg in reginfo.get('Location', []):
                if reg.get('Language', 'en').startswith('en'):
                    reguri = reg['Uri']
                    reginfo = self._get_biosreg(reguri)
                    extrainfo, valtodisplay, _, self.attrdeps = reginfo
    currsettings = {}
    pendingsettings = self._do_web_request(self._setbiosurl)
    pendingsettings = pendingsettings.get('Attributes', {})
    for setting in biosinfo.get('Attributes', {}):
        val = biosinfo['Attributes'][setting]
        currval = val
        if setting in pendingsettings:
            val = pendingsettings[setting]
        val = valtodisplay.get(setting, {}).get(val, val)
        currval = valtodisplay.get(setting, {}).get(currval, currval)
        val = {'value': val}
        if currval != val['value']:
            val['active'] = currval
        val.update(**extrainfo.get(setting, {}))
        currsettings[setting] = val
    return currsettings
def clear_system_configuration(self):
    """Clear the BIOS/UEFI configuration

    :raises: UnsupportedFunctionality if the Bios resource does not offer
             the Bios.ResetBios action
    """
    biosinfo = self._do_web_request(self._biosurl)
    rb = biosinfo.get('Actions', {}).get('#Bios.ResetBios', {})
    rb = rb.get('target', '')
    if not rb:
        # Same exception type as the other capability checks in this
        # class, so callers can handle a missing feature uniformly
        raise exc.UnsupportedFunctionality(
            'BIOS reset not detected on this system')
    self._do_web_request(rb, {'Action': 'Bios.ResetBios'})
def set_system_configuration(self, changeset):
    """Apply changes to the BIOS/UEFI settings

    Setting names that are not known are treated as case insensitive
    wildcards and expanded to every matching setting.  Values that are
    not among the possible values of a setting are matched against them
    as a case insensitive prefix.

    :param changeset: dict of setting name (or wildcard) to new value.
                      The dict passed in is not modified.
    :raises: InvalidParameterValue if a setting is read only or a value
             is not acceptable for its setting
    """
    currsettings = self.get_system_configuration()
    rawsettings = self._do_web_request(self._biosurl, cache=False)
    rawsettings = rawsettings.get('Attributes', {})
    pendingsettings = self._do_web_request(self._setbiosurl)
    pendingsettings = pendingsettings.get('Attributes', {})
    # Without a registry there may be nothing usable recorded
    attrdeps = self.attrdeps or {'Attributes': [], 'Dependencies': []}
    dephandler = AttrDependencyHandler(
        attrdeps, rawsettings, pendingsettings)
    # Wildcards are expanded in place below, do that on a private copy
    changeset = dict(changeset)
    for change in list(changeset):
        if change not in currsettings:
            found = False
            for attr in currsettings:
                if fnmatch(attr.lower(), change.lower()):
                    found = True
                    changeset[attr] = changeset[change]
            if found:
                del changeset[change]
    for change in changeset:
        changeval = changeset[change]
        overrides, blameattrs = dephandler.get_overrides(change)
        meta = {}
        for attr in attrdeps['Attributes']:
            if attr['AttributeName'] == change:
                meta = dict(attr)
                break
        meta.update(**overrides.get(change, {}))
        if meta.get('ReadOnly', False) or meta.get('GrayOut', False):
            errstr = '{0} is read only'.format(change)
            if blameattrs:
                errstr += ' due to one of the following settings: ' \
                          '{0}'.format(','.join(sorted(blameattrs)))
            raise exc.InvalidParameterValue(errstr)
        if (currsettings.get(change, {}).get('possible', [])
                and changeval not in currsettings[change]['possible']):
            normval = changeval.lower()
            normval = re.sub(r'\s+', ' ', normval)
            if not normval.endswith('*'):
                normval += '*'
            for cand in currsettings[change]['possible']:
                if fnmatch(cand.lower(), normval):
                    changeset[change] = cand
                    break
            else:
                raise exc.InvalidParameterValue(
                    '{0} is not a valid value for {1} '
                    '({2})'.format(
                        changeval, change,
                        ','.join(currsettings[change]['possible'])))
    redfishsettings = {'Attributes': changeset}
    self._do_web_request(self._setbiosurl, redfishsettings, 'PATCH')
def set_net_configuration(self, ipv4_address=None, ipv4_configuration=None,
                          ipv4_gateway=None):
    """Change the network configuration of the manager interface

    :param ipv4_address: Static address, optionally with a '/prefix'
                         suffix from which the netmask is derived
    :param ipv4_configuration: 'dhcp' or 'static', case insensitive.  If
                               omitted, DHCP is disabled only when a
                               static address or gateway is given
    :param ipv4_gateway: Static gateway address
    """
    patch = {}
    ipinfo = {}
    netmask = None
    if ipv4_address:
        if '/' in ipv4_address:
            ipv4_address, cidr = ipv4_address.split('/')
            netmask = _cidr_to_mask(int(cidr))
        patch['IPv4StaticAddresses'] = [ipinfo]
        ipinfo['Address'] = ipv4_address
        if netmask:
            ipinfo['SubnetMask'] = netmask
    if ipv4_gateway:
        patch['IPv4StaticAddresses'] = [ipinfo]
        ipinfo['Gateway'] = ipv4_gateway
    # The parameter defaults to None, which must not be dereferenced
    mode = (ipv4_configuration or '').lower()
    if mode == 'dhcp':
        patch['DHCPv4'] = {'DHCPEnabled': True}
    elif mode == 'static' or 'IPv4StaticAddresses' in patch:
        patch['DHCPv4'] = {'DHCPEnabled': False}
    if patch:
        self._do_web_request(self._bmcnicurl, patch, 'PATCH')
def get_net_configuration(self):
    """Get the network configuration of the manager interface

    Addresses of 0.0.0.0 are disregarded when more than one is reported.

    :raises: PyghmiException if no address information is present, or if
             there is not exactly one address to report
    :returns: dict with 'ipv4_address' (address/prefix), 'mac_address',
              'ipv4_gateway' (None when it is 0.0.0.0) and
              'ipv4_configuration'
    """
    netcfg = self._do_web_request(self._bmcnicurl, cache=False)
    if not netcfg.get('IPv4Addresses', {}):
        raise exc.PyghmiException('Unable to locate network information')
    if len(netcfg['IPv4Addresses']) != 1:
        usable = []
        for candidate in netcfg['IPv4Addresses']:
            if candidate['Address'] != '0.0.0.0':
                usable.append(candidate)
        netcfg['IPv4Addresses'] = usable
    if len(netcfg['IPv4Addresses']) != 1:
        raise exc.PyghmiException('Multiple IP addresses not supported')
    currip = netcfg['IPv4Addresses'][0]
    retval = {}
    prefix = _mask_to_cidr(currip['SubnetMask'])
    retval['ipv4_address'] = '{0}/{1}'.format(currip['Address'], prefix)
    retval['mac_address'] = netcfg['MACAddress']
    # A gateway of 0.0.0.0 has no bits set and so converts to zero
    if _mask_to_cidr(currip['Gateway']):
        retval['ipv4_gateway'] = currip['Gateway']
    else:
        retval['ipv4_gateway'] = None
    retval['ipv4_configuration'] = currip['AddressOrigin']
    return retval
def get_hostname(self):
    """Get the HostName reported by the manager network interface"""
    nicinfo = self._do_web_request(self._bmcnicurl)
    hostname = nicinfo['HostName']
    return hostname
def get_firmware(self, components=()):
    """Iterate over the firmware inventory of the endpoint

    :param components: Accepted but not used by this implementation
    :returns: Generator of (name, info) tuples as produced by
              _extract_fwinfo, leaving out entries it rejected
    """
    inventory = self._do_web_request(self._fwinventory)
    members = inventory.get('Members', [])
    fwurls = [member['@odata.id'] for member in members]
    self._fwnamemap = {}
    for response in self._do_bulk_requests(fwurls):
        extracted = self._extract_fwinfo(response)
        if extracted[0] is not None:
            yield extracted
def _extract_fwinfo(self, inf):
    """Turn a firmware inventory response into a name and description

    The name is recorded in self._fwnamemap along with its URL; a name
    that shows up a second time has its URL replaced by None.

    :param inf: Tuple of (firmware resource, url)
    :returns: Tuple of (name, info dict), or (None, None) if the resource
              carries neither a version nor a date
    """
    fwi, url = inf
    fwname = fwi.get('Name', 'Unknown')
    if fwname in self._fwnamemap:
        # Name already taken, see if the Id makes for a unique one
        fwname = fwi.get('Id', fwname)
    # Block duplicates for by name retrieval
    self._fwnamemap[fwname] = None if fwname in self._fwnamemap else url
    currinf = {
        'name': fwname,
        'id': fwi.get('Id', None),
        'version': fwi.get('Version', 'Unknown'),
        'date': _parse_time(fwi.get('ReleaseDate', '')),
    }
    if not currinf['version'] and not currinf['date']:
        return None, None
    # TODO: OEM extended data with buildid
    statenames = {
        'StandbyOffline': 'pending',
        'Enabled': 'active',
        'StandbySpare': 'backup',
    }
    currstate = fwi.get('Status', {}).get('State', 'Unknown')
    if currstate in statenames:
        currinf['state'] = statenames[currstate]
    return fwname, currinf
def get_inventory_descriptions(self, withids=False):
    """Iterate over the names of the inventory components

    :param withids: Passed through to the per category helpers
    :returns: Generator starting with "System", followed by what the
              processor, memory and adapter helpers yield in name only
              mode
    """
    yield "System"
    self._hwnamemap = {}
    for lister in (self._get_cpu_inventory, self._get_mem_inventory,
                   self._get_adp_inventory):
        for entry in lister(True, withids):
            yield entry
def get_inventory_of_component(self, component):
    """Get the inventory data of one named component

    :param component: Name of the component, compared case insensitively
    :returns: The information for the component, None if it is not found
    """
    wanted = component.lower()
    if wanted != 'system':
        for name, details in self.get_inventory():
            if name.lower() == wanted:
                return details
        return None
    return {
        'UUID': self.sysinfo.get('UUID', ''),
        'Serial Number': self.sysinfo.get('SerialNumber', ''),
        'Manufacturer': self.sysinfo.get('Manufacturer', ''),
        'Product Name': self.sysinfo.get('Model', ''),
        'Model': self.sysinfo.get(
            'SKU', self.sysinfo.get('PartNumber', '')),
    }
def get_inventory(self, withids=False):
    """Iterate over the hardware inventory of the system

    :param withids: Passed through to the per category helpers
    :returns: Generator of (name, info) tuples: the system itself, then
              processors, memory, adapters and disks
    """
    sysdata = self.sysinfo
    sysinfo = {
        'UUID': sysdata.get('UUID', ''),
        'Serial Number': sysdata.get('SerialNumber', ''),
        'Manufacturer': sysdata.get('Manufacturer', ''),
        'Product Name': sysdata.get('Model', ''),
        'Model': sysdata.get('SKU', sysdata.get('PartNumber', '')),
    }
    yield ('System', sysinfo)
    self._hwnamemap = {}
    memurl = sysdata.get('Memory', {}).get('@odata.id', None)
    cpurl = sysdata.get('Processors', {}).get('@odata.id', None)
    # Warm the cache with the collections, leaving out any the system
    # does not offer rather than requesting a URL of None
    collections = [url for url in (memurl, cpurl) if url]
    if collections:
        list(self._do_bulk_requests(collections))
    adpurls = self._get_adp_urls()
    cpurls = self._get_cpu_urls()
    memurls = self._get_mem_urls()
    diskurls = self._get_disk_urls()
    # Likewise fetch all the members in bulk ahead of walking them
    allurls = adpurls + cpurls + memurls + diskurls
    list(self._do_bulk_requests(allurls))
    for cpu in self._get_cpu_inventory(withids=withids, urls=cpurls):
        yield cpu
    for mem in self._get_mem_inventory(withids=withids, urls=memurls):
        yield mem
    for adp in self._get_adp_inventory(withids=withids, urls=adpurls):
        yield adp
    for disk in self._get_disk_inventory(withids=withids, urls=diskurls):
        yield disk
def _get_disk_inventory(self, onlyname=False, withids=False, urls=None):
if not urls:
urls = self._get_disk_urls()
for inf in self._do_bulk_requests(urls):
inf, _ = inf
ddata = {
'Model': inf.get('Model', None),
'Serial Number': inf.get('SerialNumber', None),
'Description': inf.get('Name'),
}
loc = inf.get('PhysicalLocation', {}).get('Info', None)
if loc:
dname = 'Disk {0}'.format(loc)
else:
dname = inf.get('Id', 'Disk')
yield (dname, ddata)
def _get_adp_inventory(self, onlyname=False, withids=False, urls=None):
if not urls:
urls = self._get_adp_urls()
if not urls:
# No PCIe device inventory, but *maybe* ethernet inventory...
aidx = 1
for nicinfo in self._get_eth_urls():
nicinfo = self._do_web_request(nicinfo)
nicname = nicinfo.get('Name', None)
nicinfo = nicinfo.get('MACAddress', None)
if not nicname:
nicname = 'NIC'
if nicinfo:
yield (nicname, {'MAC Address {0}'.format(aidx): nicinfo})
aidx += 1
return
for inf in self._do_bulk_requests(urls):
adpinfo, url = inf
aname = adpinfo.get('Name', 'Unknown')
if aname in self._hwnamemap:
aname = adpinfo.get('Id', aname)
if aname in self._hwnamemap:
self._hwnamemap[aname] = None
else:
self._hwnamemap[aname] = (url, self._get_adp_inventory)
if onlyname:
if withids:
yield aname, adpinfo.get('Id', aname)
else:
yield aname
continue
functions = adpinfo.get('Links', {}).get('PCIeFunctions', [])
nicidx = 1
if withids:
yieldinf = {'Id': adpinfo.get('Id', aname)}
else:
yieldinf = {}
funurls = [x['@odata.id'] for x in functions]
for fun in self._do_bulk_requests(funurls):
funinfo, url = fun
yieldinf['PCI Device ID'] = funinfo['DeviceId'].replace('0x',
'')
yieldinf['PCI Vendor ID'] = funinfo['VendorId'].replace('0x',
'')
yieldinf['PCI Subsystem Device ID'] = funinfo[
'SubsystemId'].replace('0x', '')
yieldinf['PCI Subsystem Vendor ID'] = funinfo[
'SubsystemVendorId'].replace('0x', '')
yieldinf['Type'] = funinfo['DeviceClass']
for nicinfo in funinfo.get('Links', {}).get(
'EthernetInterfaces', []):
nicinfo = self._do_web_request(nicinfo['@odata.id'])
macaddr = nicinfo.get('MACAddress', None)
if macaddr:
yieldinf['MAC Address {0}'.format(nicidx)] = macaddr
nicidx += 1
yield aname, yieldinf
def _get_eth_urls(self):
ethurls = self.sysinfo.get('EthernetInterfaces', {})
ethurls = ethurls.get('@odata.id', None)
if ethurls:
ethurls = self._do_web_request(ethurls)
ethurls = ethurls.get('Members', [])
urls = [x['@odata.id'] for x in ethurls]
else:
urls = []
return urls
def _get_adp_urls(self):
adpurls = self.sysinfo.get('PCIeDevices', [])
if adpurls:
urls = [x['@odata.id'] for x in adpurls]
else:
urls = []
return urls
@property
def oem(self):
    """The OEM handler for this system, obtained on first use"""
    if self._oem:
        return self._oem
    self._oem = oem.get_oem_handler(
        self.sysinfo, self.sysurl, self.wc, self._urlcache)
    return self._oem
def get_description(self):
    """Get the description of the system as provided by the OEM handler"""
    handler = self.oem
    return handler.get_description()
def _get_cpu_inventory(self, onlynames=False, withids=False, urls=None):
if not urls:
urls = self._get_cpu_urls()
if not urls:
return
for res in self._do_bulk_requests(urls):
currcpuinfo, url = res
name = currcpuinfo.get('Name', 'CPU')
if name in self._hwnamemap:
self._hwnamemap[name] = None
else:
self._hwnamemap[name] = (url, self._get_cpu_inventory)
if onlynames:
yield name
continue
cpuinfo = {'Model': currcpuinfo.get('Model', None)}
yield (name, cpuinfo)
def _get_disk_urls(self):
storurl = self.sysinfo.get('Storage', {}).get('@odata.id', None)
urls = []
if storurl:
storurl = self._do_web_request(storurl)
for url in storurl.get('Members', []):
url = url['@odata.id']
ctldata = self._do_web_request(url)
for durl in ctldata.get('Drives', []):
urls.append(durl['@odata.id'])
return urls
def _get_cpu_urls(self):
cpurl = self.sysinfo.get('Processors', {}).get('@odata.id', None)
if cpurl is None:
urls = []
else:
cpurl = self._do_web_request(cpurl)
urls = [x['@odata.id'] for x in cpurl.get('Members', [])]
return urls
def _get_mem_inventory(self, onlyname=False, withids=False, urls=None):
if not urls:
urls = self._get_mem_urls()
if not urls:
return
for mem in self._do_bulk_requests(urls):
currmeminfo, url = mem
name = currmeminfo.get('Name', 'Memory')
if name in self._hwnamemap:
self._hwnamemap[name] = None
else:
self._hwnamemap[name] = (url, self._get_mem_inventory)
if onlyname:
yield name
continue
if currmeminfo.get(
'Status', {}).get('State', 'Absent') == 'Absent':
yield (name, None)
continue
currspeed = currmeminfo.get('OperatingSpeedMhz', None)
if currspeed:
currspeed = int(currspeed)
currspeed = currspeed * 8 - (currspeed * 8 % 100)
meminfo = {
'capacity_mb': currmeminfo.get('CapacityMiB', None),
'manufacturer': currmeminfo.get('Manufacturer', None),
'memory_type': currmeminfo.get('MemoryDeviceType', None),
'model': currmeminfo.get('PartNumber', None),
'module_type': currmeminfo.get('BaseModuleType', None),
'serial': currmeminfo.get('SerialNumber', None),
'speed': currspeed,
}
yield (name, meminfo)
def _get_mem_urls(self):
    """Return the url of each member of the system's memory collection.

    :returns: List of ``@odata.id`` values; empty when ``self.sysinfo``
              carries no usable 'Memory' link.
    """
    collection_url = self.sysinfo.get('Memory', {}).get('@odata.id', None)
    if collection_url:
        collection = self._do_web_request(collection_url)
        return [member['@odata.id']
                for member in collection.get('Members', [])]
    return []
def get_event_log(self, clear=False):
    """Yield the entries of every log service advertised by the BMC.

    The BMC's reported 'DateTime' is compared with the current time to
    derive a correction, which is added to each entry's 'Created' time
    before it is converted to the local timezone.

    :param clear: If true, request the ClearLog action of each log
                  service once its entries have been fetched.  The
                  entries fetched before the clear are still yielded.
    :returns: Generator of dicts with the keys 'timestamp', 'message'
              and 'severity'.  'timestamp' is formatted as
              ``%Y-%m-%dT%H:%M:%S``; when an entry's 'Created' value is
              missing or cannot be parsed, the raw value is passed
              through unchanged instead.
    """
    bmcinfo = self._do_web_request(self._bmcurl)
    lsurl = bmcinfo.get('LogServices', {}).get('@odata.id', None)
    if not lsurl:
        return
    currtime = bmcinfo.get('DateTime', None)
    correction = timedelta(0)
    if currtime:
        currtime = _parse_time(currtime)
    if currtime:
        now = datetime.now(tz.tzoffset('', 0))
        try:
            correction = now - currtime
        except TypeError:
            # Aware and naive datetimes cannot be subtracted; a BMC time
            # without an offset is taken to be at offset zero
            correction = now - currtime.replace(tzinfo=tz.tzoffset('', 0))
    lurls = self._do_web_request(lsurl).get('Members', [])
    for lurl in lurls:
        lurl = lurl['@odata.id']
        loginfo = self._do_web_request(lurl, cache=(not clear))
        logtag = loginfo.get('@odata.etag', None)
        entriesurl = loginfo.get('Entries', {}).get('@odata.id', None)
        if not entriesurl:
            continue
        entries = self._do_web_request(entriesurl, cache=False)
        if clear:
            # The clear is against the log service etag, not entries
            # so we have to fetch service etag after we fetch entries
            # until we can verify that the etag is consistent to prove
            # that the clear is atomic
            newloginfo = self._do_web_request(lurl, cache=False)
            clearurl = newloginfo.get('Actions', {}).get(
                '#LogService.ClearLog', {}).get('target', '')
            while clearurl:
                while logtag != newloginfo.get('@odata.etag', None):
                    logtag = newloginfo.get('@odata.etag', None)
                    entries = self._do_web_request(entriesurl, cache=False)
                    newloginfo = self._do_web_request(lurl, cache=False)
                try:
                    self._do_web_request(clearurl, method='POST',
                                         etag=logtag)
                    clearurl = False
                except exc.PyghmiException as e:
                    if 'EtagPreconditionalFailed' not in str(e):
                        raise
                    # The service changed under us, refresh and retry
                    newloginfo = self._do_web_request(lurl, cache=False)
        for log in entries.get('Members', []):
            record = {}
            created = log.get('Created', '')
            # _parse_time may hand back a false value (the DateTime
            # handling above guards against the same), so never add the
            # correction to it blindly
            entime = _parse_time(created) if created else None
            if entime:
                entime = (entime + correction).astimezone(tz.gettz())
                record['timestamp'] = entime.strftime('%Y-%m-%dT%H:%M:%S')
            else:
                record['timestamp'] = created
            record['message'] = log.get('Message', None)
            # Severity is a property of the individual entry, not of the
            # collection that contains it
            record['severity'] = _healthmap.get(
                log.get('Severity', 'Warning'), const.Health.Critical)
            yield record
def get_sensor_descriptions(self):
    """Yield each entry of the sensor map, ordered by natural_sort.

    :returns: Generator of the values stored in ``self._sensormap``.
    """
    ordered = natural_sort(self._sensormap)
    for sensorname in ordered:
        yield self._sensormap[sensorname]
def get_sensor_reading(self, sensorname):
    """Fetch and return the current reading of a single sensor.

    :param sensorname: Key of the sensor in ``self._sensormap``.
    :returns: Whatever ``self._extract_reading`` produces for the sensor
              from the data fetched from the sensor's url.
    :raises Exception: If the sensor name is not in the sensor map.
    """
    known = self._sensormap
    if sensorname in known:
        descriptor = known[sensorname]
        payload = self._do_web_request(descriptor['url'], cache=1)
        return self._extract_reading(descriptor, payload)
    raise Exception('Sensor not found')
def get_sensor_data(self):
    """Yield a reading for every known sensor, ordered by natural_sort.

    :returns: Generator of ``self.get_sensor_reading`` results, one per
              key of ``self._sensormap``.
    """
    ordered = natural_sort(self._sensormap)
    for sensorname in ordered:
        yield self.get_sensor_reading(sensorname)
def _extract_reading(self, sensor, reading):
    """Pick the reading that belongs to one sensor out of fetched data.

    Entries that carry no reading value are skipped for every sensor
    type, so an incomplete entry yields no reading rather than an error.

    :param sensor: Sensor description; its 'type' selects which list of
                   ``reading`` is searched and its 'name' selects the
                   entry within that list.
    :param reading: The data fetched from the sensor's url.
    :returns: A SensorReading for the first entry with a matching name
              and a reading value, or None when there is no such entry
              or the sensor type is not handled.
    """
    # sensor type -> (list to search, key of the value, fixed units)
    # Fans report their own units per entry, so no fixed units apply
    layouts = {
        'Fan': ('Fans', 'Reading', None),
        'Temperature': ('Temperatures', 'ReadingCelsius', '°C'),
        'Voltage': ('Voltages', 'ReadingVolts', 'V'),
    }
    layout = layouts.get(sensor['type'])
    if layout is None:
        return None
    listkey, valuekey, fixedunits = layout
    for entry in reading.get(listkey, []):
        if entry.get('Name') != sensor['name'] or valuekey not in entry:
            continue
        units = fixedunits
        if units is None:
            units = entry.get('ReadingUnits', None)
        return SensorReading(None, sensor, value=entry[valuekey],
                             units=units)
    return None
if __name__ == '__main__':
    # Ad-hoc check: print the power state of the BMC named on the command
    # line, with credentials taken from BMCUSER and BMCPASS.  The verify
    # callback accepts whatever it is given.
    import os
    import sys
    bmc = Command(sys.argv[1], os.environ['BMCUSER'], os.environ['BMCPASS'],
                  verifycallback=lambda x: True)
    print(repr(bmc.get_power()))