2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-26 07:11:35 +00:00

sed to use single quotes

This commit is contained in:
Wera Grzeda 2024-04-23 18:14:48 +02:00
parent bf004fb7b9
commit d779f015d6

View File

@ -1,13 +1,13 @@
# Copyright 2022 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@ -23,7 +23,7 @@ import eventlet.greenpool as greenpool
def simplify_name(name):
return name.lower().replace(" ", "_").replace("/", "-").replace("_-_", "-")
return name.lower().replace(' ', '_').replace('/', '-').replace('_-_', '-')
pdupool = greenpool.GreenPool(128)
@ -33,15 +33,15 @@ def data_by_type(indata):
databytype = {}
for keyname in indata:
obj = indata[keyname]
objtype = obj.get("type", None)
objtype = obj.get('type', None)
if not objtype:
continue
if objtype in databytype:
raise Exception(
"Multiple instances of type {} not yet supported".format(objtype)
'Multiple instances of type {} not yet supported'.format(objtype)
)
databytype[objtype] = obj
obj["keyname"] = keyname
obj['keyname'] = keyname
return databytype
@ -64,15 +64,15 @@ class GeistClient(object):
if self._wc:
return self._wc
targcfg = self.configmanager.get_node_attributes(
self.node, ["hardwaremanagement.manager"], decrypt=True
self.node, ['hardwaremanagement.manager'], decrypt=True
)
targcfg = targcfg.get(self.node, {})
target = targcfg.get("hardwaremanagement.manager", {}).get("value", None)
target = targcfg.get('hardwaremanagement.manager', {}).get('value', None)
if not target:
target = self.node
target = target.split("/", 1)[0]
target = target.split('/', 1)[0]
cv = util.TLSCertVerifier(
self.configmanager, self.node, "pubkeys.tls_hardwaremanager"
self.configmanager, self.node, 'pubkeys.tls_hardwaremanager'
).verify_cert
self._wc = wc.SecureHTTPConnection(target, port=443, verifycallback=cv)
return self._wc
@ -80,68 +80,68 @@ class GeistClient(object):
def login(self, configmanager):
credcfg = configmanager.get_node_attributes(
self.node,
["secret.hardwaremanagementuser", "secret.hardwaremanagementpassword"],
['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'],
decrypt=True,
)
credcfg = credcfg.get(self.node, {})
username = credcfg.get("secret.hardwaremanagementuser", {}).get("value", None)
passwd = credcfg.get("secret.hardwaremanagementpassword", {}).get("value", None)
username = credcfg.get('secret.hardwaremanagementuser', {}).get('value', None)
passwd = credcfg.get('secret.hardwaremanagementpassword', {}).get('value', None)
if not isinstance(username, str):
username = username.decode("utf8")
username = username.decode('utf8')
if not isinstance(passwd, str):
passwd = passwd.decode("utf8")
passwd = passwd.decode('utf8')
if not username or not passwd:
raise Exception("Missing username or password")
raise Exception('Missing username or password')
self.username = username
rsp = self.wc.grab_json_response(
"/api/auth/{0}".format(username),
{"cmd": "login", "data": {"password": passwd}},
'/api/auth/{0}'.format(username),
{'cmd': 'login', 'data': {'password': passwd}},
)
token = rsp["data"]["token"]
token = rsp['data']['token']
return token
def logout(self):
if self._token:
self.wc.grab_json_response(
"/api/auth/{0}".format(self.username),
{"cmd": "logout", "token": self.token},
'/api/auth/{0}'.format(self.username),
{'cmd': 'logout', 'token': self.token},
)
self._token = None
def get_outlet(self, outlet):
rsp = self.wc.grab_json_response("/api/dev")
rsp = rsp["data"]
rsp = self.wc.grab_json_response('/api/dev')
rsp = rsp['data']
dbt = data_by_type(rsp)
if "t3hd" in dbt:
del dbt["t3hd"]
if 't3hd' in dbt:
del dbt['t3hd']
if len(dbt) != 1:
raise Exception("Multiple PDUs not supported per pdu")
raise Exception('Multiple PDUs not supported per pdu')
pdutype = list(dbt)[0]
outlet = dbt[pdutype]["outlet"][str(int(outlet) - 1)]
outlet = dbt[pdutype]['outlet'][str(int(outlet) - 1)]
state = outlet["state"].split("2")[-1]
state = outlet['state'].split('2')[-1]
return state
def set_outlet(self, outlet, state):
rsp = self.wc.grab_json_response("/api/dev")
dbt = data_by_type(rsp["data"])
if "t3hd" in dbt:
del dbt["t3hd"]
rsp = self.wc.grab_json_response('/api/dev')
dbt = data_by_type(rsp['data'])
if 't3hd' in dbt:
del dbt['t3hd']
if len(dbt) != 1:
self.logout()
raise Exception("Multiple PDUs per endpoint not supported")
pdu = dbt[list(dbt)[0]]["keyname"]
raise Exception('Multiple PDUs per endpoint not supported')
pdu = dbt[list(dbt)[0]]['keyname']
outlet = int(outlet) - 1
rsp = self.wc.grab_json_response(
"/api/dev/{0}/outlet/{1}".format(pdu, outlet),
'/api/dev/{0}/outlet/{1}'.format(pdu, outlet),
{
"cmd": "control",
"token": self.token,
"data": {"action": state, "delay": False},
'cmd': 'control',
'token': self.token,
'data': {'action': state, 'delay': False},
},
)
@ -149,51 +149,51 @@ class GeistClient(object):
def process_measurement(
keyname, name, enttype, entname, measurement, readings, category
):
if measurement["type"] == "realPower":
if category not in ("all", "power"):
if measurement['type'] == 'realPower':
if category not in ('all', 'power'):
return
readtype = "Real Power"
elif measurement["type"] == "apparentPower":
if category not in ("all", "power"):
readtype = 'Real Power'
elif measurement['type'] == 'apparentPower':
if category not in ('all', 'power'):
return
readtype = "Apparent Power"
elif measurement["type"] == "energy":
if category not in ("all", "energy"):
readtype = 'Apparent Power'
elif measurement['type'] == 'energy':
if category not in ('all', 'energy'):
return
readtype = "Energy"
elif measurement["type"] == "voltage":
if category not in ("all",):
readtype = 'Energy'
elif measurement['type'] == 'voltage':
if category not in ('all',):
return
readtype = "Voltage"
elif measurement["type"] == "current":
if category not in ("all",):
readtype = 'Voltage'
elif measurement['type'] == 'current':
if category not in ('all',):
return
readtype = "Current"
elif measurement["type"] == "temperature":
readtype = "Temperature"
elif measurement["type"] == "dewpoint":
readtype = "Dewpoint"
elif measurement["type"] == "humidity":
readtype = "Humidity"
readtype = 'Current'
elif measurement['type'] == 'temperature':
readtype = 'Temperature'
elif measurement['type'] == 'dewpoint':
readtype = 'Dewpoint'
elif measurement['type'] == 'humidity':
readtype = 'Humidity'
else:
return
myname = entname + " " + readtype
if name != "all" and simplify_name(myname) != name:
myname = entname + ' ' + readtype
if name != 'all' and simplify_name(myname) != name:
return
readings.append(
{
"name": myname,
"value": float(measurement["value"]),
"units": measurement["units"],
"type": readtype.split()[-1],
'name': myname,
'value': float(measurement['value']),
'units': measurement['units'],
'type': readtype.split()[-1],
}
)
def process_measurements(name, category, measurements, enttype, readings):
for measure in util.natural_sort(list(measurements)):
measurement = measurements[measure]["measurement"]
entname = measurements[measure]["name"]
measurement = measurements[measure]['measurement']
entname = measurements[measure]['name']
for measureid in measurement:
process_measurement(
measure,
@ -215,28 +215,28 @@ def read_sensors(element, node, configmanager):
if len(element) == 3:
# just get names
category = name
name = "all"
name = 'all'
justnames = True
if category in ("leds, fans", "temperature"):
if category in ('leds, fans', 'temperature'):
return
sn = _sensors_by_node.get(node, None)
if not sn or sn[1] < time.time():
gc = GeistClient(node, configmanager)
adev = gc.wc.grab_json_response("/api/dev")
adev = gc.wc.grab_json_response('/api/dev')
_sensors_by_node[node] = (adev, time.time() + 1)
sn = _sensors_by_node.get(node, None)
dbt = data_by_type(sn[0]["data"])
dbt = data_by_type(sn[0]['data'])
readings = []
for datatype in dbt:
datum = dbt[datatype]
process_measurements(name, category, datum["entity"], "entity", readings)
process_measurements(name, category, datum['entity'], 'entity', readings)
if "outlet" in datum:
process_measurements(name, category, datum["outlet"], "outlet", readings)
if 'outlet' in datum:
process_measurements(name, category, datum['outlet'], 'outlet', readings)
if justnames:
for reading in readings:
yield msg.ChildCollection(simplify_name(reading["name"]))
yield msg.ChildCollection(simplify_name(reading['name']))
else:
yield msg.SensorReadings(readings, name=node)
@ -250,42 +250,42 @@ def get_outlet(element, node, configmanager):
def read_firmware(node, configmanager):
gc = GeistClient(node, configmanager)
adev = gc.wc.grab_json_response("/api/sys")
myversion = adev["data"]["version"]
yield msg.Firmware([{"PDU Firmware": {"version": myversion}}], node)
adev = gc.wc.grab_json_response('/api/sys')
myversion = adev['data']['version']
yield msg.Firmware([{'PDU Firmware': {'version': myversion}}], node)
def read_inventory(element, node, configmanager):
_inventory = {}
inventory = {}
gc = GeistClient(node, configmanager)
adev = gc.wc.grab_json_response("/api/sys")
basedata = adev["data"]
inventory["present"] = True
inventory["name"] = "PDU"
adev = gc.wc.grab_json_response('/api/sys')
basedata = adev['data']
inventory['present'] = True
inventory['name'] = 'PDU'
for elem in basedata.items():
if (
elem[0] != "component"
and elem[0] != "locale"
and elem[0] != "state"
and elem[0] != "contact"
and elem[0] != "appVersion"
and elem[0] != "build"
and elem[0] != "version"
and elem[0] != "apiVersion"
elem[0] != 'component'
and elem[0] != 'locale'
and elem[0] != 'state'
and elem[0] != 'contact'
and elem[0] != 'appVersion'
and elem[0] != 'build'
and elem[0] != 'version'
and elem[0] != 'apiVersion'
):
temp = elem[0]
if elem[0] == "serialNumber":
temp = "Serial"
elif elem[0] == "partNumber":
temp = "P/N"
elif elem[0] == "modelNumber":
temp = "Lenovo P/N and Serial"
if elem[0] == 'serialNumber':
temp = 'Serial'
elif elem[0] == 'partNumber':
temp = 'P/N'
elif elem[0] == 'modelNumber':
temp = 'Lenovo P/N and Serial'
_inventory[temp] = elem[1]
elif elem[0] == "component":
tempname = ""
for component in basedata["component"].items():
elif elem[0] == 'component':
tempname = ''
for component in basedata['component'].items():
for item in component:
if type(item) == str:
@ -293,18 +293,18 @@ def read_inventory(element, node, configmanager):
else:
for entry in item.items():
temp = entry[0]
if temp == "sn":
temp = "Serial"
_inventory[tempname + " " + temp] = entry[1]
if temp == 'sn':
temp = 'Serial'
_inventory[tempname + ' ' + temp] = entry[1]
inventory["information"] = _inventory
inventory['information'] = _inventory
yield msg.KeyValueData({"inventory": [inventory]}, node)
yield msg.KeyValueData({'inventory': [inventory]}, node)
def retrieve(nodes, element, configmanager, inputdata):
if "outlets" in element:
if 'outlets' in element:
gp = greenpool.GreenPile(pdupool)
for node in nodes:
@ -313,7 +313,7 @@ def retrieve(nodes, element, configmanager, inputdata):
yield res
return
elif element[0] == "sensors":
elif element[0] == 'sensors':
gp = greenpool.GreenPile(pdupool)
for node in nodes:
gp.spawn(read_sensors, element, node, configmanager)
@ -321,7 +321,7 @@ def retrieve(nodes, element, configmanager, inputdata):
for datum in rsp:
yield datum
return
elif "/".join(element).startswith("inventory/firmware/all"):
elif '/'.join(element).startswith('inventory/firmware/all'):
gp = greenpool.GreenPile(pdupool)
for node in nodes:
gp.spawn(read_firmware, node, configmanager)
@ -329,7 +329,7 @@ def retrieve(nodes, element, configmanager, inputdata):
for datum in rsp:
yield datum
elif "/".join(element).startswith("inventory/hardware/all"):
elif '/'.join(element).startswith('inventory/hardware/all'):
gp = greenpool.GreenPile(pdupool)
for node in nodes:
gp.spawn(read_inventory, element, node, configmanager)
@ -338,13 +338,13 @@ def retrieve(nodes, element, configmanager, inputdata):
yield datum
else:
for node in nodes:
yield msg.ConfluentResourceUnavailable(node, "Not implemented")
yield msg.ConfluentResourceUnavailable(node, 'Not implemented')
return
def update(nodes, element, configmanager, inputdata):
if "outlets" not in element:
yield msg.ConfluentResourceUnavailable(node, "Not implemented")
if 'outlets' not in element:
yield msg.ConfluentResourceUnavailable(node, 'Not implemented')
return
for node in nodes:
gc = GeistClient(node, configmanager)