2
0
mirror of https://opendev.org/x/pyghmi synced 2025-08-22 02:50:23 +00:00

Report/Configure LAN alert destinations

Provide a facility to manipulate the LAN alert destinations.
Additionally update the LAN configuration functions to have
a function to intelligently find the correct LAN channel.
For now the 'check current channel' preference should always
activate, but the function should be able to cover a hypothetical
in-band scenario should session become so enabled.

Change-Id: I45ce81addb882110e04d0526997a96b6b2ab0c2e
This commit is contained in:
Jarrod Johnson
2015-05-22 14:34:33 -04:00
parent 8b94506ce9
commit 9efd878fd5

View File

@@ -16,6 +16,7 @@
# limitations under the License.
# This represents the low layer message framing portion of IPMI
from itertools import chain
import pyghmi.constants as const
import pyghmi.exceptions as exc
@@ -25,6 +26,7 @@ from pyghmi.ipmi.oem.lookup import get_oem_handler
from pyghmi.ipmi.private import session
import pyghmi.ipmi.private.util as pygutil
import pyghmi.ipmi.sdr as sdr
import socket
import struct
@@ -93,6 +95,7 @@ class Command(object):
self.bmc = bmc
self._sdr = None
self._oem = None
self._netchannel = None
if onlogon is not None:
self.ipmi_session = session.Session(bmc=bmc,
userid=userid,
@@ -556,7 +559,164 @@ class Command(object):
yield {'name': self._sdr.sensors[sensor].name,
'type': self._sdr.sensors[sensor].sensor_type}
def set_channel_access(self, channel=14, access_update_mode='non_volatile',
def get_network_channel(self):
"""Get a reasonable 'default' network channel.
When configuring/examining network configuration, it's desirable to
find the correct channel. Here we run with the 'real' number of the
current channel if it is a LAN channel, otherwise it evaluates
all of the channels to find the first workable LAN channel and returns
that
"""
if self._netchannel is None:
for channel in chain((0xe, ), xrange(1, 0xc)):
try:
rsp = self.xraw_command(
netfn=6, command=0x42, data=(channel,))
except exc.IpmiException as ie:
if ie.ipmicode == 0xcc:
# We have hit an invalid channel, move on to next
# candidate
continue
else:
raise
chantype = ord(rsp['data'][1]) & 0b1111111
if chantype in (4, 6):
try:
# Some implementations denote an inactive channel
# by refusing to do parameter retrieval
self.xraw_command(
netfn=0xc, command=2, data=(channel, 5, 0, 0))
# If still here, the channel seems serviceable...
# However some implementations may still have
# ambiguous channel info, that will need to be
# picked up on an OEM extension...
self._netchannel = ord(rsp['data'][0]) & 0b1111
break
except exc.IpmiException as ie:
# This means the attempt to fetch parameter 5 failed,
# therefore move on to next candidate channel
continue
return self._netchannel
def get_alert_destination_count(self, channel=None):
"""Get the number of supported alert destinations
:param channel: Channel for alerts to be examined, defaults to current
"""
if channel is None:
channel = self.get_network_channel()
rqdata = (channel, 0x11, 0, 0)
rsp = self.xraw_command(netfn=0xc, command=2, data=rqdata)
return ord(rsp['data'][1])
def get_alert_destination(self, destination=0, channel=None):
"""Get alert destination
Get a specified alert destination. Returns a dictionary of relevant
configuration. The following keys may be present:
acknowledge_required - Indicates whether the target expects an
acknowledgement
acknowledgement_timeout - How long it will wait for an acknowledgment
before retrying
retries - How many attempts will be made to deliver the alert to this
destination
address_format - 'ipv4' or 'ipv6'
address - The IP address of the target
:param destination: The destination number. Defaults to 0
:param channel: The channel for alerting. Defaults to current channel
"""
destinfo = {}
if channel is None:
channel = self.get_network_channel()
rqdata = (channel, 18, destination, 0)
rsp = self.xraw_command(netfn=0xc, command=2, data=rqdata)
dtype, acktimeout, retries = struct.unpack('BBB', rsp['data'][2:])
destinfo['acknowledge_required'] = dtype & 0b10000000 == 0b10000000
# Ignore destination type for now...
if destinfo['acknowledge_required']:
destinfo['acknowledgement_timeout'] = acktimeout
destinfo['retries'] = retries
rqdata = (channel, 19, destination, 0)
rsp = self.xraw_command(netfn=0xc, command=2, data=rqdata)
if ord(rsp['data'][2]) & 0b11110000 == 0:
destinfo['address_format'] = 'ipv4'
destinfo['address'] = socket.inet_ntop(socket.AF_INET,
rsp['data'][4:8])
elif ord(rsp['data'][2]) & 0b11110000 == 0b10000:
destinfo['address_format'] = 'ipv6'
destinfo['address'] = socket.inet_ntop(socket.AF_INET6,
rsp['data'][3:])
return destinfo
def clear_alert_destination(self, destination=0, channel=None):
"""Clear an alert destination
Remove the specified alert destination configuration.
:param destination: The destination to clear (defaults to 0)
"""
if channel is None:
channel = self.get_network_channel()
self.set_alert_destination(
'0.0.0.0', False, 0, 0, destination, channel)
def set_alert_destination(self, ip=None, acknowledge_required=None,
acknowledge_timeout=None, retries=None,
destination=0, channel=None):
"""Configure one or more parameters of an alert destination
If any parameter is 'None' (default), that parameter is left unchanged.
Otherwise, all given parameters are set by this command.
:param ip: IP address of the destination. It is currently expected
that the calling code will handle any name lookup and
present this data as IP address.
:param acknowledge_required: Whether or not the target should expect
an acknowledgement from this alert target.
:param acknowledge_timeout: Time to wait for acknowledgement if enabled
:param retries: How many times to attempt transmit of an alert.
:param destination: Destination index, defaults to 0.
:param channel: The channel to configure the alert on. Defaults to
current
"""
if channel is None:
channel = self.get_network_channel()
if ip is not None:
destdata = bytearray((channel, 19, destination))
try:
parsedip = socket.inet_pton(socket.AF_INET, ip)
destdata.extend((0, 0))
destdata.extend(parsedip)
destdata.extend('\x00\x00\x00\x00\x00\x00')
except socket.error:
parsedip = socket.inet_pton(socket.AF_INET6, ip)
destdata.append(0b10000000)
destdata.extend(parsedip)
self.xraw_command(netfn=0xc, command=1, data=destdata)
if (acknowledge_required is not None or retries is not None or
acknowledge_timeout is not None):
currtype = self.xraw_command(netfn=0xc, command=2, data=(
channel, 18, destination, 0))
if currtype['data'][0] != '\x11':
raise exc.PyghmiException("Unknown parameter format")
currtype = bytearray(currtype['data'][1:])
if acknowledge_required is not None:
if acknowledge_required:
currtype[1] |= 0b10000000
else:
currtype[1] &= 0b1111111
if acknowledge_timeout is not None:
currtype[2] = acknowledge_timeout
if retries is not None:
currtype[3] = retries
destreq = bytearray((channel, 18))
destreq.extend(currtype)
self.xraw_command(netfn=0xc, command=1, data=destreq)
def set_channel_access(self, channel=None,
access_update_mode='non_volatile',
alerting=False, per_msg_auth=False,
user_level_auth=False, access_mode='always',
privilege_update_mode='non_volatile',
@@ -618,6 +778,8 @@ class Command(object):
* administrator
* proprietary = used by OEM
"""
if channel is None:
channel = self.get_network_channel()
data = []
data.append(channel & 0b00001111)
access_update_modes = {
@@ -666,7 +828,7 @@ class Command(object):
raise Exception(response['error'])
return True
def get_channel_access(self, channel=14, read_mode='volatile'):
def get_channel_access(self, channel=None, read_mode='volatile'):
"""Get channel access
:param channel: number [1:7]
@@ -694,6 +856,8 @@ class Command(object):
}
}
"""
if channel is None:
channel = self.get_network_channel()
data = []
data.append(channel & 0b00001111)
b = 0
@@ -735,7 +899,7 @@ class Command(object):
r['privilege_level'] = privilege_levels[data[1] & 0b00001111]
return r
def get_channel_info(self, channel=14):
def get_channel_info(self, channel=None):
"""Get channel info
:param channel: number [1:7]
@@ -749,6 +913,8 @@ class Command(object):
single- and multi-session operation, as can occur with a
serial/modem channel that supports connection mode auto-detect)
"""
if channel is None:
channel = self.get_network_channel()
data = []
data.append(channel & 0b00001111)
response = self.raw_command(netfn=0x06, command=0x42, data=data)
@@ -794,8 +960,8 @@ class Command(object):
r['Auxiliary Channel Info'] = [data[7], data[8]]
return r
def set_user_access(self, uid, channel=14, callback=False, link_auth=True,
ipmi_msg=True, privilege_level='user'):
def set_user_access(self, uid, channel=None, callback=False,
link_auth=True, ipmi_msg=True, privilege_level='user'):
"""Set user access
:param uid: user number [1:16]
@@ -841,6 +1007,8 @@ class Command(object):
* proprietary
* no_access
"""
if channel is None:
channel = self.get_network_channel()
b = 0b10000000
if callback:
b |= 0b01000000
@@ -865,7 +1033,7 @@ class Command(object):
raise Exception(response['error'])
return True
def get_user_access(self, uid, channel=14):
def get_user_access(self, uid, channel=None):
"""Get user access
:param uid: user number [1:16]
@@ -885,6 +1053,8 @@ class Command(object):
operatorm administrator, proprietary, no_access]
"""
## user access available during call-in or callback direct connection
if channel is None:
channel = self.get_network_channel()
data = [channel, uid]
response = self.raw_command(netfn=0x06, command=0x44, data=data)
if 'error' in response:
@@ -990,16 +1160,18 @@ class Command(object):
raise Exception(response['error'])
return True
def get_channel_max_user_count(self, channel=14):
def get_channel_max_user_count(self, channel=None):
"""Get max users in channel (helper)
:param channel: number [1:7]
:return: int -- often 16
"""
if channel is None:
channel = self.get_network_channel()
access = self.get_user_access(channel=channel, uid=1)
return access['channel_info']['max_user_count']
def get_user(self, uid, channel=14):
def get_user(self, uid, channel=None):
"""Get user (helper)
:param uid: user number [1:16]
@@ -1016,19 +1188,23 @@ class Command(object):
privilege_level: (str)[callback, user, operatorm administrator,
proprietary, no_access]
"""
if channel is None:
channel = self.get_network_channel()
name = self.get_user_name(uid)
access = self.get_user_access(uid, channel)
data = {'name': name, 'uid': uid, 'channel': channel,
'access': access['access']}
return data
def get_name_uids(self, name, channel=14):
def get_name_uids(self, name, channel=None):
"""get list of users (helper)
:param channel: number [1:7]
:return: list of users
"""
if channel is None:
channel = self.get_network_channel()
uid_list = []
max_ids = self.get_channel_max_user_count(channel)
for uid in range(1, max_ids):
@@ -1036,7 +1212,7 @@ class Command(object):
uid_list.append(uid)
return uid_list
def get_users(self, channel=14):
def get_users(self, channel=None):
"""get list of users and channel access information (helper)
:param channel: number [1:7]
@@ -1052,6 +1228,8 @@ class Command(object):
privilege_level: (str)[callback, user, operatorm administrator,
proprietary, no_access]
"""
if channel is None:
channel = self.get_network_channel()
names = {}
max_ids = self.get_channel_max_user_count(channel)
for uid in range(1, max_ids):
@@ -1060,7 +1238,7 @@ class Command(object):
names[uid] = self.get_user(uid=uid, channel=channel)
return names
def create_user(self, uid, name, password, channel=14, callback=False,
def create_user(self, uid, name, password, channel=None, callback=False,
link_auth=True, ipmi_msg=True,
privilege_level='user'):
"""create/ensure a user is created with provided settings (helper)
@@ -1077,6 +1255,8 @@ class Command(object):
"""
# current user might be trying to update.. dont disable
# set_user_password(uid, password, mode='disable')
if channel is None:
channel = self.get_network_channel()
self.set_user_name(uid, name)
self.set_user_access(uid, channel, callback=callback,
link_auth=link_auth, ipmi_msg=ipmi_msg,
@@ -1085,12 +1265,14 @@ class Command(object):
self.set_user_password(uid, mode='enable', password=password)
return True
def user_delete(self, uid, channel=14):
def user_delete(self, uid, channel=None):
"""Delete user (helper)
:param uid: user number [1:16]
:param channel: number [1:7]
"""
if channel is None:
channel = self.get_network_channel()
self.set_user_password(uid, mode='disable', password=None)
self.set_user_name(uid, '')
# TODO(steveweber) perhaps should set user access on all channels