2
0
mirror of https://opendev.org/x/pyghmi synced 2025-01-27 19:37:44 +00:00

Fix various python2/3 problems

Normalize as possible to str type, whether that is bytes or unicode.

Fix internal consumers of xraw_command that have bytes expectation that
get broken by memoryview being more bytearray like.

Use io.BytesIO as python3 stdlib doesn't like it's own StringIO.

Change-Id: I0140c8c19421786949a52b0e1fa4b392684a78de
This commit is contained in:
Jarrod Johnson 2019-10-01 15:34:09 -04:00
parent 33aee1ecdb
commit 1fe80e7960
6 changed files with 41 additions and 21 deletions

View File

@ -1910,8 +1910,8 @@ class Command(object):
"""
self.oem_init()
mcinfo = self.xraw_command(netfn=6, command=1)
bmcver = '{0}.{1}'.format(
ord(mcinfo['data'][2]), hex(ord(mcinfo['data'][3]))[2:])
major, minor = struct.unpack('BB', mcinfo['data'][2:4])
bmcver = '{0}.{1}'.format(major, hex(minor)[2:])
return self._oem.get_oem_firmware(bmcver, components)
def get_capping_enabled(self):

View File

@ -227,7 +227,7 @@ class FRU(object):
# to fru info, particularly the last values
# Additionally 0xfe has been observed, which should be a thorn, but
# again assuming termination of string is more likely than thorn.
retinfo = retinfo.rstrip('\xfe\xff\x10\x03\x00 ')
retinfo = retinfo.rstrip(b'\xfe\xff\x10\x03\x00 ')
if lang in (0, 25):
try:
retinfo = retinfo.decode('iso-8859-1')
@ -242,7 +242,7 @@ class FRU(object):
# removing trailing spaces and nulls like makes sense for text
# and rely on vendors to workaround deviations in their OEM
# module
retinfo = retinfo.rstrip('\x00 ')
#retinfo = retinfo.rstrip(b'\x00 ')
return retinfo, newoffset
elif currtype == 1: # BCD 'plus'
retdata = ''

View File

@ -25,10 +25,10 @@ class EnergyManager(object):
# the Lenovo, then fallback to IBM
# We start with a 'find firmware instance' to test the water and
# get the handle (which has always been the same, but just in case
self.iana = bytearray('\x66\x4a\x00')
self.iana = bytearray(b'\x66\x4a\x00')
try:
rsp = ipmicmd.xraw_command(netfn=0x2e, command=0x82,
data=self.iana + '\x00\x00\x01')
data=self.iana + b'\x00\x00\x01')
except pygexc.IpmiException as ie:
if ie.ipmicode == 193: # try again with IBM IANA
self.iana = bytearray('\x4d\x4f\x00')

View File

@ -16,6 +16,7 @@
# limitations under the License.
import base64
import binascii
from datetime import datetime
import errno
import fnmatch
@ -77,7 +78,10 @@ def natural_sort(iterable):
def fixup_uuid(uuidprop):
baduuid = ''.join(uuidprop.split())
uuidprefix = (baduuid[:8], baduuid[8:12], baduuid[12:16])
a = struct.pack('<IHH', *[int(x, 16) for x in uuidprefix]).encode('hex')
a = struct.pack('<IHH', *[int(x, 16) for x in uuidprefix])
a = binascii.hexlify(a)
if not isinstance(a, str):
a = a.decode('utf-8')
uuid = (a[:8], a[8:12], a[12:16], baduuid[16:20], baduuid[20:])
return '-'.join(uuid).upper()
@ -129,6 +133,8 @@ class IMMClient(object):
@staticmethod
def _parse_builddate(strval):
if not isinstance(strval, ,str):
strval = strval.decode('utf-8')
try:
return datetime.strptime(strval, '%Y/%m/%d %H:%M:%S')
except ValueError:
@ -157,8 +163,10 @@ class IMMClient(object):
@classmethod
def parse_imm_buildinfo(cls, buildinfo):
buildid = buildinfo[:9].rstrip(' \x00')
bdt = ' '.join(buildinfo[9:].replace('\x00', ' ').split())
buildid = bytes(buildinfo[:9]).rstrip(b' \x00')
if not isinstance(buildid, str):
buildid = buildid.decode('utf-8')
bdt = b' '.join(bytes(buildinfo[9:]).replace(b'\x00', b' ').split())
bdate = cls._parse_builddate(bdt)
return buildid, bdate
@ -294,7 +302,10 @@ class IMMClient(object):
return None
propdata = rsp['data'][3:] # second two bytes are size, don't need it
if propdata[0] & 0b10000000: # string, for now assume length valid
return str(propdata[1:]).rstrip(' \x00')
ret = bytes(propdata[1:]).rstrip(b' \x00')
if not isinstance(ret, str):
ret = ret.decode('utf-8')
return ret
else:
raise Exception('Unknown format for property: ' + repr(propdata))
@ -1455,7 +1466,7 @@ class XCCClient(IMMClient):
try:
fpga = self.ipmicmd.xraw_command(netfn=0x3a, command=0x6b,
data=(0,))
fpga = '{0}.{1}.{2}'.format(*[ord(x) for x in fpga['data']])
fpga = '{0}.{1}.{2}'.format(*struct.unpack('BBB', fpga['data']))
yield ('FPGA', {'version': fpga})
except pygexc.IpmiException as ie:
if ie.ipmicode != 193:
@ -1795,7 +1806,7 @@ class XCCClient(IMMClient):
def get_health(self, summary):
try:
wc = self.get_webclient(False)
except Exception:
except (socket.timeout, socket.error) as e:
wc = None
if not wc:
summary['health'] = pygconst.Health.Critical;

View File

@ -597,9 +597,14 @@ class SDREntry(object):
tstr += chr(((data[2] & 0b11) << 4) +
(data[1] >> 4) + 0x20)
tstr += chr((data[2] >> 2) + 0x20)
if not isinstance(tstr, str):
tstr = tstr.decode('utf-8')
return tstr
elif ipmitype == 3: # ACSII+LATIN1
return struct.pack("%dB" % len(data), *data)
ret = struct.pack("%dB" % len(data), *data)
if not isinstance(ret, str):
ret = ret.decode('utf-8')
return ret
class SDR(object):

View File

@ -1,4 +1,4 @@
# Copyright 2015-2017 Lenovo
# Copyright 2015-2019 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -28,11 +28,10 @@ import threading
try:
import Cookie
import httplib
import StringIO
except ImportError:
import http.client as httplib
import http.cookies as Cookie
import io as StringIO
import io
__author__ = 'jjohnson2'
@ -140,8 +139,13 @@ class SecureHTTPConnection(httplib.HTTPConnection, object):
self.stdheaders[key] = value
def set_basic_credentials(self, username, password):
self.stdheaders['Authorization'] = 'Basic {0}'.format(
base64.b64encode(':'.join((username, password))))
authinfo = ':'.join((username, password))
if not isinstance(authinfo, bytes):
authinfo = authinfo.encode('utf-8')
authinfo = base64.b64encode(authinfo)
if not isinstance(authinfo, str):
authinfo = authinfo.decode('utf-8')
self.stdheaders['Authorization'] = 'Basic {0}'.format(authinfo)
def connect(self):
addrinfo = socket.getaddrinfo(self.host, self.port)[0]
@ -203,7 +207,7 @@ class SecureHTTPConnection(httplib.HTTPConnection, object):
rsp = webclient.getresponse()
body = rsp.read()
if rsp.getheader('Content-Encoding', None) == 'gzip':
body = gzip.GzipFile(fileobj=StringIO.StringIO(body)).read()
body = gzip.GzipFile(fileobj=io.BytesIO(body)).read()
if rsp.status >= 200 and rsp.status < 300:
return json.loads(body) if body else {}, rsp.status
return body, rsp.status
@ -245,7 +249,7 @@ class SecureHTTPConnection(httplib.HTTPConnection, object):
"""
if data is None:
data = open(filename, 'rb')
self._upbuffer = StringIO.StringIO(get_upload_form(filename, data,
self._upbuffer = io.BytesIO(get_upload_form(filename, data,
formname,
otherfields))
ulheaders = self.stdheaders.copy()
@ -268,7 +272,7 @@ class SecureHTTPConnection(httplib.HTTPConnection, object):
rsp.read())
body = rsp.read()
if rsp.getheader('Content-Encoding', None) == 'gzip':
body = gzip.GzipFile(fileobj=StringIO.StringIO(body)).read()
body = gzip.GzipFile(fileobj=io.BytesIO(body)).read()
return body
def get_upload_progress(self):