2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 01:22:00 +00:00

In-progress Eaton PDU support

This commit is contained in:
Jarrod Johnson 2022-06-03 16:26:07 -04:00
parent 80186f2c77
commit 799050fea2

View File

@ -0,0 +1,229 @@
# Copyright 2022 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import confluent.util as util
import confluent.messages as msg
import confluent.exceptions as exc
import eventlet
import re
import hashlib
import json
import time
#eaton uses 'eval' rather than json, massage it to be valid json
def sanitize_json(data):
if not isinstance(data, str):
data = data.decode('utf8')
return re.sub(r'([^ {:,]*):', r'"\1":', data).replace("'", '"')
def answer_challenge(username, password, data):
realm = data[0]
nonce = data[1].encode('utf8')
cnonce = data[2].encode('utf8')
uri = data[3].encode('utf8')
operation = data[4].encode('utf8')
incvalue = '{:08d}'.format(int(data[5])).encode('utf8')
a1 = hashlib.md5(':'.join([username, realm, password]).encode('utf8')).digest()
a1 = b':'.join([a1, nonce, cnonce])
skey = hashlib.md5(a1).hexdigest().encode('utf8')
ac2 = b'AUTHENTICATE:' + uri
s2c = hashlib.md5(ac2).hexdigest().encode('utf8')
rsp = hashlib.md5(b':'.join([skey, nonce, incvalue, cnonce, operation, s2c])).hexdigest().encode('utf8')
a2server = b':' + uri
s2server = hashlib.md5(a2server).hexdigest().encode('utf8')
s2rsp = hashlib.md5(b':'.join([skey, nonce, incvalue, cnonce, operation, s2server])).hexdigest().encode('utf8')
return {'sessionKey': skey.decode('utf8'), 'szResponse': rsp.decode('utf8'), 'szResponseValue': s2rsp.decode('utf8')}
try:
import Cookie
httplib = eventlet.import_patched('httplib')
except ImportError:
httplib = eventlet.import_patched('http.client')
import http.cookies as Cookie
# Delta PDU webserver always closes connection,
# replace conditionals with always close
class WebResponse(httplib.HTTPResponse):
def _check_close(self):
return True
class WebConnection(httplib.HTTPConnection):
response_class = WebResponse
def __init__(self, host):
httplib.HTTPConnection.__init__(self, host, 80)
self.cookies = {}
def getresponse(self):
try:
rsp = super(WebConnection, self).getresponse()
try:
hdrs = [x.split(':', 1) for x in rsp.msg.headers]
except AttributeError:
hdrs = rsp.msg.items()
for hdr in hdrs:
if hdr[0] == 'Set-Cookie':
c = Cookie.BaseCookie(hdr[1])
for k in c:
self.cookies[k] = c[k].value
except httplib.BadStatusLine:
self.broken = True
raise
return rsp
def request(self, method, url, body=None):
headers = {}
if body:
headers['Content-Length'] = len(body)
cookies = []
for cookie in self.cookies:
cookies.append('{0}={1}'.format(cookie, self.cookies[cookie]))
headers['Cookie'] = ';'.join(cookies)
headers['Host'] = 'pdu.cluster.net'
headers['Accept'] = '*/*'
headers['Accept-Language'] = 'en-US,en;q=0.9'
headers['Connection'] = 'close'
headers['Referer'] = 'http://pdu.cluster.net/setting_admin.htm'
return super(WebConnection, self).request(method, url, body, headers)
def grab_response(self, url, body=None, method=None):
if method is None:
method = 'GET' if body is None else 'POST'
if body:
self.request(method, url, body)
else:
self.request(method, url)
rsp = self.getresponse()
body = rsp.read()
return body, rsp.status
class PDUClient(object):
def __init__(self, pdu, configmanager):
self.node = pdu
self.configmanager = configmanager
self._token = None
self._wc = None
self.username = None
self.sessid = None
@property
def wc(self):
if self._wc:
return self._wc
targcfg = self.configmanager.get_node_attributes(self.node,
['hardwaremanagement.manager'],
decrypt=True)
targcfg = targcfg.get(self.node, {})
target = targcfg.get(
'hardwaremanagement.manager', {}).get('value', None)
if not target:
target = self.node
self._wc = WebConnection(target)
self.login(self.configmanager)
return self._wc
def login(self, configmanager):
credcfg = configmanager.get_node_attributes(self.node,
['secret.hardwaremanagementuser',
'secret.hardwaremanagementpassword'],
decrypt=True)
credcfg = credcfg.get(self.node, {})
username = credcfg.get(
'secret.hardwaremanagementuser', {}).get('value', None)
passwd = credcfg.get(
'secret.hardwaremanagementpassword', {}).get('value', None)
if not isinstance(username, str):
username = username.decode('utf8')
if not isinstance(passwd, str):
passwd = passwd.decode('utf8')
if not username or not passwd:
raise Exception('Missing username or password')
b64user = base64.b64encode(username.encode('utf8')).decode('utf8')
rsp = self.wc.grab_response('/config/gateway?page=cgi_authentication&login={}&_dc={}'.format(b64user, int(time.time())))
rsp = json.loads(sanitize_json(rsp[0]))
parms = answer_challenge(username, passwd, rsp['data'][-1])
self.sessid = rsp['data'][0]
url = '/config/gateway?page=cgi_authenticationChallenge&sessionId={}&login={}&sessionKey={}&szResponse={}&szResponseValue={}&dc={}'.format(
rsp['data'][0],
b64user,
parms['sessionKey'],
parms['szResponse'],
parms['szResponseValue'],
int(time.time()),
)
rsp = self.wc.grab_response(url)
rsp = json.loads(sanitize_json(rsp[0]))
if rsp['success'] != True:
raise Exception('Failed to login to device')
rsp = self.wc.grab_response('/config/gateway?page=cgi_checkUserSession&sessionId={}&_dc={}'.format(self.sessid, int(time.time())))
def do_request(self, suburl):
wc = self.wc
url = '/config/gateway?page={}&sessionId={}&_dc={}'.format(suburl, self.sessid, int(time.time()))
return wc.grab_response(url)
def logout(self):
print(repr(self.do_request('cgi_logout')))
#print(repr(self.wc.grab_response('/config/gateway?page=cgi_logout&sessionId={}&_dc={}'.format(self.sessid, int(time.time())))))
def get_outlet(self, outlet):
rsp = self.do_request('cgi_pdu_outlets')
data = sanitize_json(rsp[0])
data = json.loads(data)
from pprint import pprint
pprint(data)
#self.wc.grab_response('/config/gateway?page=pdu_outlets&sessionId={}&_dc={}'.format(self.sessid, int(time.time())))
return
rsp = self.wc.grab_response('/setting_admin4.xml')
xd = fromstring(rsp[0])
for ch in xd:
if 'relay' not in ch.tag:
continue
outnum = ch.tag.split('relay')[-1]
if outnum == outlet:
return ch.text.lower()
def set_outlet(self, outlet, state):
state = 0 if state == 'off' else 1
outlet = int(outlet)
sitem = '/SetParm?item=s4r{:02d}?content={}'.format(outlet, state)
self.wc.grab_response(sitem)
def retrieve(nodes, element, configmanager, inputdata):
if 'outlets' not in element:
for node in nodes:
yield msg.ConfluentResourceUnavailable(node, 'Not implemented')
return
for node in nodes:
gc = PDUClient(node, configmanager)
try:
state = gc.get_outlet(element[-1])
#yield msg.PowerState(node=node, state=state)
finally:
gc.logout()
def update(nodes, element, configmanager, inputdata):
if 'outlets' not in element:
yield msg.ConfluentResourceUnavailable(node, 'Not implemented')
return
for node in nodes:
gc = PDUClient(node, configmanager)
newstate = inputdata.powerstate(node)
gc.set_outlet(element[-1], newstate)
gc.logout()
eventlet.sleep(2)
for res in retrieve(nodes, element, configmanager, inputdata):
yield res