2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-16 18:49:04 +00:00

Preferentially support HTTPS on Eaton PDU

While Eaton does not do HTTPS by default,
it can be configured to do so.

Support when available.

Mitigate downgrade attack by
stickying the cert fingerprint.
If fingerprint is present, then refuse
to even think about port 80.
This commit is contained in:
Jarrod Johnson 2023-02-15 17:03:35 -05:00
parent 90af99e864
commit abc639e32b

View File

@ -17,6 +17,9 @@ import confluent.util as util
import confluent.messages as msg
import confluent.exceptions as exc
import eventlet
import eventlet.green.socket as socket
import pyghmi.util.webclient as wc
import confluent.util as util
import re
import hashlib
import json
@ -59,12 +62,32 @@ class WebResponse(httplib.HTTPResponse):
def _check_close(self):
return True
class WebConnection(httplib.HTTPConnection):
class WebConnection(wc.SecureHTTPConnection):
response_class = WebResponse
def __init__(self, host):
httplib.HTTPConnection.__init__(self, host, 80)
def __init__(self, host, secure, verifycallback):
if secure:
port = 443
else:
port = 80
wc.SecureHTTPConnection.__init__(self, host, port, verifycallback=verifycallback)
self.secure = secure
self.cookies = {}
def connect(self):
if self.secure:
return super(WebConnection, self).connect()
addrinfo = socket.getaddrinfo(self.host, self.port)[0]
# workaround problems of too large mtu, moderately frequent occurance
# in this space
plainsock = socket.socket(addrinfo[0])
plainsock.settimeout(self.mytimeout)
try:
plainsock.setsockopt(socket.IPPROTO_TCP, socket.TCP_MAXSEG, 1456)
except socket.error:
pass
plainsock.connect(addrinfo[4])
self.sock = plainsock
def getresponse(self):
try:
rsp = super(WebConnection, self).getresponse()
@ -132,8 +155,18 @@ class PDUClient(object):
if not target:
target = self.node
target = target.split('/', 1)[0]
self._wc = WebConnection(target)
self.login(self.configmanager)
verifier = util.TLSCertVerifier(
self.configmanager, self.node, 'pubkeys.tls_hardwaremanager')
try:
self._wc = WebConnection(target, secure=True, verifycallback=verifier.verify_cert)
self.login(self.configmanager)
except socket.error as e:
pkey = self.configmanager.get_node_attributes(self.node, 'pubkeys.tls_hardwaremanager')
pkey = pkey.get(self.node, {}).get('pubkeys.tls_hardwaremanager', {}).get('value', None)
if pkey:
raise
self._wc = WebConnection(target, secure=False, verifycallback=verifier.verify_cert)
self.login(self.configmanager)
return self._wc
def login(self, configmanager):
@ -153,18 +186,27 @@ class PDUClient(object):
if not username or not passwd:
raise Exception('Missing username or password')
b64user = base64.b64encode(username.encode('utf8')).decode('utf8')
b64pass = base64.b64encode(passwd.encode('utf8')).decode('utf8')
rsp = self.wc.grab_response('/config/gateway?page=cgi_authentication&login={}&_dc={}'.format(b64user, int(time.time())))
rsp = json.loads(sanitize_json(rsp[0]))
parms = answer_challenge(username, passwd, rsp['data'][-1])
self.sessid = rsp['data'][0]
url = '/config/gateway?page=cgi_authenticationChallenge&sessionId={}&login={}&sessionKey={}&szResponse={}&szResponseValue={}&dc={}'.format(
rsp['data'][0],
b64user,
parms['sessionKey'],
parms['szResponse'],
parms['szResponseValue'],
int(time.time()),
)
if rsp['data'][-1] == 'password':
url = '/config/gateway?page=cgi_authenticationPassword&login={}&sessionId={}&password={}&dc={}'.format(
b64user,
rsp['data'][0],
b64pass,
int(time.time()),
)
else:
parms = answer_challenge(username, passwd, rsp['data'][-1])
url = '/config/gateway?page=cgi_authenticationChallenge&sessionId={}&login={}&sessionKey={}&szResponse={}&szResponseValue={}&dc={}'.format(
rsp['data'][0],
b64user,
parms['sessionKey'],
parms['szResponse'],
parms['szResponseValue'],
int(time.time()),
)
rsp = self.wc.grab_response(url)
rsp = json.loads(sanitize_json(rsp[0]))
if rsp['success'] != True:
@ -177,7 +219,7 @@ class PDUClient(object):
return wc.grab_response(url)
def logout(self):
print(repr(self.do_request('cgi_logout')))
self.do_request('cgi_logout')
def get_outlet(self, outlet):
rsp = self.do_request('cgi_pdu_outlets')