2
0
mirror of https://opendev.org/x/pyghmi synced 2025-07-08 21:41:31 +00:00

Fix all pep8 errors except E128 and E501

Ran autopep8 on the four .py files in this module.
Manully fixed many additional pep8 errors as well,
Turned some long-line-comments into proper NOTE lines,
but there are more to do...

Change-Id: I657ba037863860ec3956150931c2c0e41085bd63
This commit is contained in:
Devananda van der Veen
2013-07-02 06:09:21 -07:00
parent 08d2e54d68
commit ca22a48de5
4 changed files with 846 additions and 788 deletions

View File

@ -1,27 +1,22 @@
"""
@author: Jarrod Johnson <jbjohnso@us.ibm.com>
# vim: tabstop=4 shiftwidth=4 softtabstop=4
Copyright 2013 IBM Corporation
# Copyright 2013 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This represents the low layer message framing portion of IPMI
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
from ipmi.private import session
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from ipmi.private.session import Session, call_with_optional_args
def _raiseorcall(callback,response,args=None):
if callback is None:
if 'error' in response:
raise Exception(response['error'])
else:
call_with_optional_args(callback,args)
boot_devices = {
'net': 4,
@ -48,21 +43,32 @@ power_states = {
"reset": 3,
"softoff": 5,
"shutdown": 5,
"boot": -1, #not a valid direct boot state, but here for convenience of 'in' statement
# NOTE(jbjohnso): -1 is not a valid direct boot state,
# but here for convenience of 'in' statement
"boot": -1,
}
def _raiseorcall(callback, response, args=None):
if callback is None:
if 'error' in response:
raise Exception(response['error'])
else:
session.call_with_optional_args(callback, args)
class Command(object):
"""Send IPMI commands to BMCs.
This object represents a persistent session to an IPMI device (bmc) and
This object represents a persistent session to an IPMI device (bmc) and
allows the caller to reuse a single session to issue multiple commands.
This class can be used in a synchronous (wait for answer and return) or
asynchronous fashion (return immediately and provide responses by
This class can be used in a synchronous (wait for answer and return) or
asynchronous fashion (return immediately and provide responses by
callbacks). Synchronous mode is the default behavior.
For asynchronous mode, simply pass in a callback function. It is
recommended to pass in an instance method to callback and ignore the
callback_args parameter. However, callback_args can optionally be populated
For asynchronous mode, simply pass in a callback function. It is
recommended to pass in an instance method to callback and ignore the
callback_args parameter. However, callback_args can optionally be populated
if desired.
:param bmc: hostname or ip address of the BMC
@ -71,44 +77,44 @@ class Command(object):
:param kg: Optional parameter to use if BMC has a particular Kg configured
"""
def __init__(self,bmc,userid,password,kg=None):
#TODO(jbjohnso): accept tuples and lists of each parameter for mass
#operations without pushing the async complexities up the stack
self.ipmi_session=Session(bmc=bmc,
userid=userid,
password=password,
kg=kg)
def __init__(self, bmc, userid, password, kg=None):
# TODO(jbjohnso): accept tuples and lists of each parameter for mass
# operations without pushing the async complexities up the stack
self.ipmi_session = session.Session(bmc=bmc,
userid=userid,
password=password,
kg=kg)
def get_bootdev(self,callback=None,callback_args=None):
def get_bootdev(self, callback=None, callback_args=None):
"""Get current boot device override information.
Provides the current requested boot device. Be aware that not all IPMI
devices support this. Even in BMCs that claim to, occasionally the BIOS
or UEFI fail to honor it. This is usually only applicable to the next
or UEFI fail to honor it. This is usually only applicable to the next
reboot.
:param callback: optional callback
:param callback_args: optional arguments to callback
:param callback_args: optional arguments to callback
:returns: dict or True -- If callback is not provided, the response
will be provided in the return
"""
self.commandcallback=callback
self.commandcallbackargs=callback_args
self.commandcallback = callback
self.commandcallbackargs = callback_args
self.ipmi_session.raw_command(netfn=0,
command=9,
data=(5,0,0),
data=(5, 0, 0),
callback=self._got_bootdev)
return self._waitifsync()
def _waitifsync(self):
self.requestpending=True
self.requestpending = True
if self.commandcallback is None:
while self.requestpending:
Session.wait_for_rsp()
session.Session.wait_for_rsp()
return self.lastresponse
return True
def set_power(self,powerstate,wait=False,callback=None,callback_args=None):
def set_power(self, powerstate, wait=False, callback=None, callback_args=None):
"""Request power state change
:param powerstate:
@ -121,67 +127,70 @@ class Command(object):
:param wait: If True, do not return or callback until system actually
completes requested state change
:param callback: optional callback
:param callback_args: optional arguments to callback
:param callback_args: optional arguments to callback
:returns: dict or True -- If callback is not provided, the response
"""
self.commandcallback=callback
self.commandcallbackargs=callback_args
self.commandcallback = callback
self.commandcallbackargs = callback_args
if powerstate not in power_states:
_raiseorcall(self.commandcallback,
{'error':
"Unknown power state %s requested"%powerstate},
{'error':
"Unknown power state %s requested" % powerstate},
self.commandcallbackargs)
self.newpowerstate=powerstate
self.wait_for_power=wait
self.newpowerstate = powerstate
self.wait_for_power = wait
self.ipmi_session.raw_command(netfn=0,
command=1,
callback=self._set_power_with_chassis_info
)
)
return self._waitifsync()
def _set_power_with_chassis_info(self,response):
def _set_power_with_chassis_info(self, response):
if 'error' in response:
_raiseorcall(self.commandcallback,response,self.commandcallbackargs)
_raiseorcall(
self.commandcallback, response, self.commandcallbackargs)
return
self.powerstate = 'on' if (response['data'][0] & 1) else 'off'
if self.newpowerstate=='boot':
self.newpowerstate = 'on' if self.powerstate=='off' else 'reset'
if self.newpowerstate == 'boot':
self.newpowerstate = 'on' if self.powerstate == 'off' else 'reset'
self.ipmi_session.raw_command(netfn=0,
command=2,
data=[power_states[self.newpowerstate]],
callback=self._power_set)
def _power_set(self,response):
def _power_set(self, response):
if 'error' in response:
_raiseorcall(self.commandcallback,response,self.commandcallbackargs)
_raiseorcall(
self.commandcallback, response, self.commandcallbackargs)
return
self.lastresponse={'pendingpowerstate': self.newpowerstate}
if (self.wait_for_power and
self.newpowerstate in ('on','off','shutdown','softoff')):
if self.newpowerstate in ('softoff','shutdown'):
self.waitpowerstate='off'
self.lastresponse = {'pendingpowerstate': self.newpowerstate}
if (self.wait_for_power and
self.newpowerstate in ('on', 'off', 'shutdown', 'softoff')):
if self.newpowerstate in ('softoff', 'shutdown'):
self.waitpowerstate = 'off'
else:
self.waitpowerstate=self.newpowerstate
self.waitpowerstate = self.newpowerstate
self.ipmi_session.raw_command(netfn=0,
command=1,
callback=self._power_wait)
else:
self.requestpending=False
self.requestpending = False
if self.commandcallback:
call_with_optional_args(self.commandcallback,
session.call_with_optional_args(self.commandcallback,
self.lastresponse,
self.commandcallbackargs)
def _power_wait(self,response):
def _power_wait(self, response):
if 'error' in response:
_raiseorcall(self.commandcallback,response,self.commandcallbackargs)
_raiseorcall(
self.commandcallback, response, self.commandcallbackargs)
return
self.powerstate = 'on' if (response['data'][0] & 1) else 'off'
if self.powerstate==self.waitpowerstate:
self.requestpending=False
self.lastresponse={'powerstate': self.powerstate}
if self.powerstate == self.waitpowerstate:
self.requestpending = False
self.lastresponse = {'powerstate': self.powerstate}
if self.commandcallback:
call_with_optional_args(self.commandcallback,
session.call_with_optional_args(self.commandcallback,
self.lastresponse,
self.commandcallbackargs)
return
@ -211,51 +220,53 @@ class Command(object):
In practice, this flag not being set does not preclude
UEFI boot on any system I've encountered.
:param callback: optional callback
:param callback_args: optional arguments to callback
:param callback_args: optional arguments to callback
:returns: dict or True -- If callback is not provided, the response
"""
self.commandcallback=callback
self.commandcallbackargs=callback_args
self.commandcallback = callback
self.commandcallbackargs = callback_args
if bootdev not in boot_devices:
_raiseorcall(self.commandcallback,
{'error': "Unknown bootdevice %s requested"%bootdev},
self.commandcallbackargs)
self.bootdev=boot_devices[bootdev]
self.persistboot=persist
self.uefiboot=uefiboot
#first, we disable timer by way of set system boot options,
#then move on to set chassis capabilities
self.requestpending=True
#Set System Boot Options is netfn=0, command=8, data
{'error': "Unknown bootdevice %s requested" %
bootdev},
self.commandcallbackargs)
self.bootdev = boot_devices[bootdev]
self.persistboot = persist
self.uefiboot = uefiboot
# first, we disable timer by way of set system boot options,
# then move on to set chassis capabilities
self.requestpending = True
# Set System Boot Options is netfn=0, command=8, data
self.ipmi_session.raw_command(netfn=0,
command=8,data=(3,8),
command=8, data=(3, 8),
callback=self._bootdev_timer_disabled)
if callback is None:
while self.requestpending:
Session.wait_for_rsp()
session.Session.wait_for_rsp()
return self.lastresponse
def _bootdev_timer_disabled(self,response):
self.requestpending=False
self.lastresponse=response
def _bootdev_timer_disabled(self, response):
self.requestpending = False
self.lastresponse = response
if 'error' in response:
_raiseorcall(self.commandcallback,response,self.commandcallbackargs)
_raiseorcall(
self.commandcallback, response, self.commandcallbackargs)
return
bootflags=0x80
bootflags = 0x80
if self.uefiboot:
bootflags = bootflags | 1<<5
bootflags = bootflags | 1 << 5
if self.persistboot:
bootflags = bootflags | 1<<6
if self.bootdev==0:
bootflags=0
data=(5,bootflags,self.bootdev,0,0,0)
bootflags = bootflags | 1 << 6
if self.bootdev == 0:
bootflags = 0
data = (5, bootflags, self.bootdev, 0, 0, 0)
self.ipmi_session.raw_command(netfn=0,
command=8,
data=data,
callback=self.commandcallback,
callback_args=self.commandcallbackargs)
def raw_command(self,
netfn,
command,
@ -273,85 +284,73 @@ class Command(object):
:param command: Command value
:param data: Command data as a tuple or list
:param callback: optional callback
:param callback_args: optional arguments to callback
:param callback_args: optional arguments to callback
:returns: dict or True -- If callback is not provided, the response
"""
response=self.ipmi_session.raw_command(netfn=0,
command=1,
callback=callback,
callback_args=callback_args)
if response: #this means there was no callback
response = self.ipmi_session.raw_command(netfn=0,
command=1,
callback=callback,
callback_args=callback_args)
if response: # this means there was no callback
if 'error' in response:
raise Exception(response['error'])
return response
def _got_bootdev(self,response):
#interpret response per 'get system boot options'
self.requestpending=False
def _got_bootdev(self, response):
# interpret response per 'get system boot options'
self.requestpending = False
if 'error' in response:
_raiseorcall(self.commandcallback,response,self.commandcallbackargs)
_raiseorcall(
self.commandcallback, response, self.commandcallbackargs)
return
#this should only be invoked for get system boot option complying to
#ipmi spec and targeting the 'boot flags' parameter
assert (response['command'] == 9 and
response['netfn'] == 1 and
response['data'][0]==1 and
(response['data'][1]&0b1111111)==5)
if (response['data'][1] & 0b10000000 or
not response['data'][2] & 0b10000000):
self.lastresponse={ 'bootdev': 'default' }
else: #will consult data2 of the boot flags parameter for the data
# this should only be invoked for get system boot option complying to
# ipmi spec and targeting the 'boot flags' parameter
assert (response['command'] == 9 and
response['netfn'] == 1 and
response['data'][0] == 1 and
(response['data'][1] & 0b1111111) == 5)
if (response['data'][1] & 0b10000000 or
not response['data'][2] & 0b10000000):
self.lastresponse = {'bootdev': 'default'}
else: # will consult data2 of the boot flags parameter for the data
bootnum = (response['data'][3] & 0b111100) >> 2
bootdev = boot_devices[bootnum]
if (bootdev):
self.lastresponse={'bootdev': bootdev}
self.lastresponse = {'bootdev': bootdev}
else:
self.lastresponse={'bootdev': bootnum}
self.lastresponse = {'bootdev': bootnum}
if self.commandcallback:
call_with_optional_args(self.commandcallback,
session.call_with_optional_args(self.commandcallback,
self.lastresponse,
self.commandcallbackargs)
def get_power(self,callback=None,callback_args=None):
"""
Get current power state of the managed system
The response, if successful, should contain 'powerstate' key and
def get_power(self, callback=None, callback_args=None):
"""Get current power state of the managed system
The response, if successful, should contain 'powerstate' key and
either 'on' or 'off' to indicate current state.
:param callback: optional callback
:param callback_args: optional arguments to callback
:param callback_args: optional arguments to callback
:returns: dict or True -- If callback is not provided, the response
"""
self.commandcallback=callback
self.commandcallbackargs=callback_args
self.commandcallback = callback
self.commandcallbackargs = callback_args
self.ipmi_session.raw_command(netfn=0,
command=1,
callback=self._got_power)
return self._waitifsync()
def _got_power(self,response):
self.requestpending=False
def _got_power(self, response):
self.requestpending = False
if 'error' in response:
_raiseorcall(self.commandcallback,response,self.commandcallbackargs)
_raiseorcall(
self.commandcallback, response, self.commandcallbackargs)
return
assert(response['command'] == 1 and response['netfn'] == 1)
self.powerstate = 'on' if (response['data'][0] & 1) else 'off'
self.lastresponse={'powerstate': self.powerstate}
self.lastresponse = {'powerstate': self.powerstate}
if self.commandcallback:
call_with_optional_args(self.commandcallback,
session.call_with_optional_args(self.commandcallback,
self.lastresponse,
self.commandcallbackargs)
if __name__ == "__main__":
import sys
import os
ipmicmd = ipmi_command(bmc=sys.argv[1],
userid=sys.argv[2],
password=os.environ['IPMIPASS'])
print ipmicmd.get_power()
print ipmicmd.set_power('on',wait=True)
print ipmicmd.get_bootdev()
print ipmicmd.set_bootdev('network')
print ipmicmd.get_bootdev()
print ipmicmd.set_bootdev('default')
print ipmicmd.get_bootdev()

View File

@ -1,25 +1,23 @@
"""
@author: Jarrod Johnson <jbjohnso@us.ibm.com>
# vim: tabstop=4 shiftwidth=4 softtabstop=4
Copyright 2013 IBM Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Copyright 2013 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
payload_types = {
'ipmi': 0x0,
'sol' : 0x1,
'sol': 0x1,
'rmcpplusopenreq': 0x10,
'rmcpplusopenresponse': 0x11,
'rakp1': 0x12,
@ -29,50 +27,51 @@ payload_types = {
}
rmcp_codes = {
1: 'Insufficient resources to create new session (wait for existing sessions to timeout)',
2: 'Invalid Session ID',
3: 'Invalid payload type',
4: 'Invalid authentication algorithm',
5: 'Invalid integrity algorithm',
6: 'No matching integrity payload',
7: 'No matching integrity payload',
8: 'Inactive Session ID',
9: 'Invalid role',
0xa: 'Unauthorized role or privilege level requested',
0xb: 'Insufficient resources tocreate a session at the requested role',
0xc: 'Invalid username length',
0xd: 'Unauthorized name',
0xe: 'Unauthorized GUID',
0xf: 'Invalid integrity check value',
0x10: 'Invalid confidentiality algorithm',
0x11: 'No Cipher suite match with proposed security algorithms',
0x12: 'Illegal or unrecognized parameter',
1: ("Insufficient resources to create new session (wait for existing "
"sessions to timeout)"),
2: "Invalid Session ID",
3: "Invalid payload type",
4: "Invalid authentication algorithm",
5: "Invalid integrity algorithm",
6: "No matching integrity payload",
7: "No matching integrity payload",
8: "Inactive Session ID",
9: "Invalid role",
0xa: "Unauthorized role or privilege level requested",
0xb: "Insufficient resources tocreate a session at the requested role",
0xc: "Invalid username length",
0xd: "Unauthorized name",
0xe: "Unauthorized GUID",
0xf: "Invalid integrity check value",
0x10: "Invalid confidentiality algorithm",
0x11: "No Cipher suite match with proposed security algorithms",
0x12: "Illegal or unrecognized parameter",
}
command_completion_codes = {
(7,0x39): {
(7, 0x39): {
0x81: "Invalid user name",
0x82: "Null user disabled",
},
(7,0x3a): {
(7, 0x3a): {
0x81: "No available login slots",
0x82: "No available login slots for requested user",
0x83: "No slot available with requested privilege level",
0x84: "Session sequence number out of range",
0x85: "Invalid session ID",
0x86: "Requested privilege level exceeds requested user permissions on this channel",
0x86: ("Requested privilege level exceeds requested user permissions "
"on this channel"),
},
(7,0x3b): { #Set session privilege level
(7, 0x3b): { # Set session privilege level
0x80: "User is not allowed requested priveleg level",
0x81: "Requested privilege level is not allowed over this channel",
0x82: "Cannot disable user level authentication",
},
(1,8): { #set system boot options
(1, 8): { # set system boot options
0x80: "Parameter not supported",
0x81: "Attempt to set set 'set in progress' when not 'set complete'",
0x82: "Attempt to write read-only parameter",
}
}
ipmi_completion_codes = {
@ -102,4 +101,3 @@ ipmi_completion_codes = {
0xd6: "Cannot execute command because subfunction disabled or unavailable",
0xff: "Unspecified",
}

File diff suppressed because it is too large Load Diff

View File

@ -24,22 +24,22 @@ import os
import sys
from ipmi.command import Command
password=os.environ['IPMIPASSWORD']
os.environ['IPMIPASSWORD']=""
password = os.environ['IPMIPASSWORD']
os.environ['IPMIPASSWORD'] = ""
if (len(sys.argv) < 3):
print "Usage:"
print " IPMIPASSWORD=password %s bmc username <cmd> <optarg>"%sys.argv[0]
print " IPMIPASSWORD=password %s bmc username <cmd> <optarg>" % sys.argv[0]
sys.exit(1)
bmc=sys.argv[1]
userid=sys.argv[2]
command=sys.argv[3]
arg=None
if len(sys.argv)==5:
arg=sys.argv[4]
ipmicmd = Command(bmc=bmc,userid=userid,password=password)
bmc = sys.argv[1]
userid = sys.argv[2]
command = sys.argv[3]
arg = None
if len(sys.argv) == 5:
arg = sys.argv[4]
ipmicmd = Command(bmc=bmc, userid=userid, password=password)
if command == 'power':
if arg:
print ipmicmd.set_power(arg,wait=True)
print ipmicmd.set_power(arg, wait=True)
else:
print ipmicmd.get_power()
elif command == 'bootdev':
@ -47,4 +47,3 @@ elif command == 'bootdev':
print ipmicmd.set_bootdev(arg)
else:
print ipmicmd.get_bootdev()