2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-21 17:11:58 +00:00

Merge pull request #170 from tkucherera-lenovo/webauth_update

use webauthn instead of webauthn-rp
This commit is contained in:
Jarrod Johnson 2024-11-19 10:38:24 -05:00 committed by GitHub
commit ebecc9c844
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 93 additions and 226 deletions

View File

@ -1,26 +1,22 @@
from webauthn_rp.registrars import CredentialData
import confluent.tlvdata as tlvdata
import confluent.util as util
import json
import copy
import base64
import secrets, time
from typing import Any, Optional
from webauthn_rp.backends import CredentialsBackend
from webauthn_rp.builders import *
from webauthn_rp.converters import cose_key, jsonify
from webauthn_rp.errors import WebAuthnRPError
from webauthn_rp.parsers import parse_cose_key, parse_public_key_credential
from webauthn_rp.registrars import *
from webauthn_rp.types import (
AttestationObject, AttestationType, AuthenticatorAssertionResponse,
AuthenticatorAttestationResponse, AuthenticatorData,
COSEAlgorithmIdentifier, PublicKeyCredential,
PublicKeyCredentialDescriptor, PublicKeyCredentialParameters,
PublicKeyCredentialRpEntity, PublicKeyCredentialType,
PublicKeyCredentialUserEntity, TrustedPath)
from webauthn import (
generate_registration_options,
options_to_json,
generate_authentication_options,
)
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
UserVerificationRequirement,
)
from webauthn import verify_registration_response
from webauthn import verify_authentication_response
challenges = {}
@ -34,15 +30,12 @@ class Credential():
self.credential_public_key = public_key
class Challenge():
def __init__(self, request, timstamp_ms, id=None) -> None:
def __init__(self, request, id=None) -> None:
if id is None:
self.id = util.randomstring(16)
else:
self.id = id
self.request = request
self.timestamp_ms = timstamp_ms
def _load_credentials(creds):
if creds is None:
@ -55,7 +48,8 @@ def _load_credentials(creds):
def _load_authenticators(authenticators):
ret = authenticators
if 'challenges' in ret:
ret['challenges']['request'] = base64.b64decode(ret['challenges']['request'])
if not ret['challenges'] is None:
ret['challenges']['request'] = base64.b64decode(ret['challenges']['request'])
if 'credentials' in ret:
ret['credentials'] = _load_credentials(ret['credentials'])
return ret
@ -78,7 +72,7 @@ class User():
def __parse_challenges(self):
if self.challenges:
request = base64.b64encode(self.challenges.request).decode()
return {"id": self.challenges.id, 'request': request, 'timestamp_ms': self.challenges.timestamp_ms}
return {"id": self.challenges.id, 'request': request}
@staticmethod
@ -97,8 +91,7 @@ class User():
#for now leaving signature count as None
return (Credential(id=credential["id"], signature_count=None, public_key=credential["credential_public_key"]), username)
return None
@staticmethod
def get_credential(credential_id, username):
@ -112,13 +105,11 @@ class User():
if credential_id is None:
return Credential(id=credential["id"], signature_count=credential["signature_count"], public_key=credential["credential_public_key"])
if credential["id"] == credential_id:
return Credential(id=credential["id"], signature_count=credential["signature_count"], public_key=credential["credential_public_key"])
return None
@staticmethod
def get_challenge(challengeID, username):
def get_challenge(username):
if not isinstance(username, str):
username = username.decode('utf8')
authuser = CONFIG_MANAGER.get_user(username)
@ -127,10 +118,8 @@ class User():
authenticators = authuser.get('authenticators', {})
authenticators = _load_authenticators(authenticators)
challenge = authenticators['challenges']
if challenge["id"] == challengeID:
return Challenge(request=challenge["request"], timstamp_ms=challenge["timestamp_ms"], id=challenge["id"])
return None
return Challenge(request=challenge["request"], id=challenge["id"])
@staticmethod
def get(username):
@ -155,7 +144,7 @@ class User():
authid = base64.b64decode(b64authid)
challenge = authenticators.get("challenges", None)
if challenge:
challenges_return = Challenge(challenge['request'], challenge['timestamp_ms'], id=challenge["id"])
challenges_return = Challenge(challenge['request'], id=challenge["id"])
credential = authenticators.get("credentials", None)
if credential:
@ -187,221 +176,102 @@ class User():
#raise Exception("Credential item not found")
def timestamp_ms():
return int(time.time() * 1000)
class RegistrarImpl(CredentialsRegistrar):
def register_credential_attestation(
self,
credential: PublicKeyCredential,
att: AttestationObject,
att_type: AttestationType,
user: PublicKeyCredentialUserEntity,
rp: PublicKeyCredentialRpEntity,
trusted_path: Optional[TrustedPath] = None) -> Any:
assert att.auth_data is not None
assert att.auth_data.attested_credential_data is not None
cpk = att.auth_data.attested_credential_data.credential_public_key
user_model = User.get(user.name)
if user_model is None:
return 'No user found'
credential_model = Credential(id=credential.raw_id, signature_count=None, public_key=cose_key(cpk))
user_model.add(credential_model)
user_model.save()
def register_credential_assertion(
self,
credential: PublicKeyCredential,
authenticator_data: AuthenticatorData,
user: PublicKeyCredentialUserEntity,
rp: PublicKeyCredentialRpEntity) -> Any:
user_model = User.get(user.name)
credential_model = User.get_credential(credential_id=credential.raw_id, username=user.name)
credential_model.signature_count = None
user_model.update(credential_model)
user_model.save()
def get_credential_data(
self,
credential_id: bytes) -> Optional[CredentialData]:
#credential_model = User.get_credential(credential_id=credential_id, username=username)
(credential_model, username) = User.seek_credential_by_id(credential_id)
user_model = User.get(username)
return CredentialData(
parse_cose_key(credential_model.credential_public_key),
credential_model.signature_count,
PublicKeyCredentialUserEntity(
name=user_model.username,
id=user_model.user_handle,
display_name=user_model.username
)
)
APP_TIMEOUT = 60000
APP_CREDENTIALS_BACKEND = CredentialsBackend(RegistrarImpl())
def registration_request(username, cfg, APP_RELYING_PARTY):
APP_CCO_BUILDER = CredentialCreationOptionsBuilder(
rp=APP_RELYING_PARTY,
pub_key_cred_params=[
PublicKeyCredentialParameters(type=PublicKeyCredentialType.PUBLIC_KEY,
alg=COSEAlgorithmIdentifier.Value.ES256)
],
timeout=APP_TIMEOUT,
)
user_model = User.get(username)
if user_model is None:
raise Exception("User not foud")
challenge_bytes = secrets.token_bytes(64)
challenge = Challenge(request=challenge_bytes, timstamp_ms=timestamp_ms())
user_model.add(challenge)
user_model.save()
options = APP_CCO_BUILDER.build(
user=PublicKeyCredentialUserEntity(
name=username,
id=user_model.user_handle,
display_name=username
options = generate_registration_options(
rp_name=APP_RELYING_PARTY.name,
rp_id=APP_RELYING_PARTY.id,
user_id=user_model.user_handle,
user_name=username,
authenticator_selection=AuthenticatorSelectionCriteria(
user_verification=UserVerificationRequirement.REQUIRED,
),
challenge=challenge_bytes
)
options_json = jsonify(options)
return {
'challengeID': challenge.id,
'creationOptions': options_json
}
challenge = Challenge(options.challenge)
user_model.add(challenge)
user_model.save()
options_json = options_to_json(options)
return options_json
def registration_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
try:
challengeID = request["challengeID"]
credential = parse_public_key_credential(json.loads(request["credential"]))
except Exception:
raise Exception("Could not parse input data")
if type(credential.response) is not AuthenticatorAttestationResponse:
raise Exception("Invalid response type")
challenge_model = User.get_challenge(challengeID, username)
challenge_model = User.get_challenge(username)
if not challenge_model:
raise Exception("Could not find challenge matching given id")
user_model = User.get(username)
if not user_model:
raise Exception("Invalid Username")
current_timestamp = timestamp_ms()
if current_timestamp - challenge_model.timestamp_ms > APP_TIMEOUT:
return "Timeout"
user_entity = PublicKeyCredentialUserEntity(name=user_model.username, id=user_model.user_handle, display_name=user_model.username)
try:
APP_CREDENTIALS_BACKEND.handle_credential_attestation(
credential=credential,
user=user_entity,
rp=APP_RELYING_PARTY,
expected_challenge=challenge_model.request,
expected_origin=APP_ORIGIN
registration_verification = verify_registration_response(
credential=request,
expected_challenge=challenge_model.request,
expected_rp_id=APP_RELYING_PARTY.id,
expected_origin=APP_ORIGIN,
require_user_verification=True,
)
except WebAuthnRPError as wrp:
except Exception as err:
raise Exception("Could not handle credential attestation")
return True
credential = Credential(id=registration_verification.credential_id, signature_count=registration_verification.sign_count, public_key=registration_verification.credential_public_key)
user_model.add(credential)
user_model.save()
return {"verified": True}
def authentication_request(username, APP_RELYING_PARTY):
APP_CRO_BUILDER = CredentialRequestOptionsBuilder(
rp_id=APP_RELYING_PARTY.id,
timeout=APP_TIMEOUT,
)
user_model = User.get(username)
if user_model is None:
return 'User not registered'
credential = user_model.get_credential(None, username)
if credential is None:
return 'No credential found'
challenge_bytes = secrets.token_bytes(64)
challenge = Challenge(request=challenge_bytes, timstamp_ms=timestamp_ms())
user_model.add(challenge)
user_model.save()
options = APP_CRO_BUILDER.build(
challenge=challenge_bytes,
allow_credentials=[
PublicKeyCredentialDescriptor(
id=credential.id,
type=PublicKeyCredentialType.PUBLIC_KEY
)
]
)
options_json = jsonify(options)
return {
'challengeID': challenge.id,
'requestOptions': options_json
}
def authentication_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
try:
challengeID = request["challengeID"]
credential = parse_public_key_credential(json.loads(request["credential"]))
except Exception:
raise Exception("Could not parse input data")
if type(credential.response) is not AuthenticatorAssertionResponse:
raise Exception('Invalid response type')
challenge_model = User.get_challenge(challengeID, username)
if not challenge_model:
raise Exception("Could not find challenge matching given id")
user_model = User.get(username)
if not user_model:
raise Exception("Invalid Username")
current_timestamp = timestamp_ms()
if current_timestamp - challenge_model.timestamp_ms > APP_TIMEOUT:
return "Timeout"
user_entity = PublicKeyCredentialUserEntity(name=user_model.username, id=user_model.user_handle, display_name=user_model.username)
try:
APP_CREDENTIALS_BACKEND.handle_credential_assertion(
credential=credential,
user=user_entity,
rp=APP_RELYING_PARTY,
expected_challenge=challenge_model.request,
expected_origin=APP_ORIGIN
)
except WebAuthnRPError:
raise Exception('Could not handle credential assertion')
options = generate_authentication_options(
rp_id=APP_RELYING_PARTY.id,
user_verification=UserVerificationRequirement.REQUIRED,
)
challenge = Challenge(options.challenge)
user_model.add(challenge)
user_model.save()
opts = options_to_json(options)
return opts
def authentication_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
user_model = User.get(username)
if not user_model:
raise Exception("Invalid Username")
challenge_model = User.get_challenge(username)
if not challenge_model:
raise Exception("Could not find challenge matching given id")
credential_model = User.get_credential(credential_id=None, username=username)
if not credential_model:
raise Exception("No credential for user")
verification = verify_authentication_response(
credential=request,
expected_challenge=challenge_model.request,
expected_rp_id=APP_RELYING_PARTY.id,
expected_origin=APP_ORIGIN,
credential_public_key = credential_model.credential_public_key,
credential_current_sign_count = 0,
require_user_verification = True
)
return {"verified": True}
class RpEntity(object):
def __init__(self, name, id):
self.name = name
self.id = id
def handle_api_request(url, env, start_response, username, cfm, headers, reqbody, authorized):
"""
@ -413,7 +283,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
APP_ORIGIN = 'https://' + env['HTTP_X_FORWARDED_HOST']
HOST = env['HTTP_X_FORWARDED_HOST']
APP_RELYING_PARTY = PublicKeyCredentialRpEntity(name='Confluent Web UI', id=HOST)
APP_RELYING_PARTY = RpEntity(name='Confluent Web UI', id=HOST)
if env['REQUEST_METHOD'] != 'POST':
raise Exception('Only POST supported for webauthn operations')
@ -465,10 +335,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
if not isinstance(username, bytes):
username = username.encode('utf8')
rsp = registration_response(req, username, APP_RELYING_PARTY, APP_ORIGIN)
if rsp == 'Timeout':
start_response('408 Timeout', headers)
else:
print('worked out')
if rsp.get('verified', False):
start_response('200 OK', headers)
yield json.dumps({'status': 'Success'})

View File

@ -19,15 +19,15 @@ BuildArch: noarch
Requires: confluent_vtbufferd
%if "%{dist}" == ".el7"
Requires: python-pyghmi >= 1.5.71, python-eventlet, python-greenlet, python-pycryptodomex >= 3.4.7, confluent_client == %{version}, python-pyparsing, python-paramiko, python-webauthn-rp, python-dnspython, python-netifaces, python2-pyasn1 >= 0.2.3, python-pysnmp >= 4.3.4, python-lxml, python-eficompressor, python-setuptools, python-dateutil, python-websocket-client python2-msgpack python-libarchive-c python-yaml python-monotonic
Requires: python-pyghmi >= 1.5.71, python-eventlet, python-greenlet, python-pycryptodomex >= 3.4.7, confluent_client == %{version}, python-pyparsing, python-paramiko, python-dnspython, python-netifaces, python2-pyasn1 >= 0.2.3, python-pysnmp >= 4.3.4, python-lxml, python-eficompressor, python-setuptools, python-dateutil, python-websocket-client python2-msgpack python-libarchive-c python-yaml python-monotonic
%else
%if "%{dist}" == ".el8"
Requires: python3-pyghmi >= 1.5.71, python3-eventlet, python3-greenlet, python3-pycryptodomex >= 3.4.7, confluent_client == %{version}, python3-pyparsing, python3-paramiko, python3-webauthn-rp, python3-dns, python3-netifaces, python3-pyasn1 >= 0.2.3, python3-pysnmp >= 4.3.4, python3-lxml, python3-eficompressor, python3-setuptools, python3-dateutil, python3-enum34, python3-asn1crypto, python3-cffi, python3-pyOpenSSL, python3-websocket-client python3-msgpack python3-libarchive-c python3-yaml openssl iproute
Requires: python3-pyghmi >= 1.5.71, python3-eventlet, python3-greenlet, python3-pycryptodomex >= 3.4.7, confluent_client == %{version}, python3-pyparsing, python3-paramiko, python3-dns, python3-netifaces, python3-pyasn1 >= 0.2.3, python3-pysnmp >= 4.3.4, python3-lxml, python3-eficompressor, python3-setuptools, python3-dateutil, python3-enum34, python3-asn1crypto, python3-cffi, python3-pyOpenSSL, python3-websocket-client python3-msgpack python3-libarchive-c python3-yaml openssl iproute
%else
%if "%{dist}" == ".el9"
Requires: python3-pyghmi >= 1.5.71, python3-eventlet, python3-greenlet, python3-pycryptodomex >= 3.4.7, confluent_client == %{version}, python3-pyparsing, python3-paramiko, python3-dns, python3-webauthn-rp, python3-netifaces, python3-pyasn1 >= 0.2.3, python3-pysnmp >= 4.3.4, python3-lxml, python3-eficompressor, python3-setuptools, python3-dateutil, python3-cffi, python3-pyOpenSSL, python3-websocket-client python3-msgpack python3-libarchive-c python3-yaml openssl iproute
Requires: python3-pyghmi >= 1.5.71, python3-eventlet, python3-greenlet, python3-pycryptodomex >= 3.4.7, confluent_client == %{version}, python3-pyparsing, python3-paramiko, python3-dns, python3-webauthn, python3-netifaces, python3-pyasn1 >= 0.2.3, python3-pysnmp >= 4.3.4, python3-lxml, python3-eficompressor, python3-setuptools, python3-dateutil, python3-cffi, python3-pyOpenSSL, python3-websocket-client python3-msgpack python3-libarchive-c python3-yaml openssl iproute
%else
Requires: python3-dbm,python3-pyghmi >= 1.5.71, python3-eventlet, python3-greenlet, python3-pycryptodome >= 3.4.7, confluent_client == %{version}, python3-pyparsing, python3-paramiko, python3-webauthn-rp, python3-dnspython, python3-netifaces, python3-pyasn1 >= 0.2.3, python3-pysnmp >= 4.3.4, python3-lxml, python3-eficompressor, python3-setuptools, python3-dateutil, python3-cffi, python3-pyOpenSSL, python3-websocket-client python3-msgpack python3-libarchive-c python3-PyYAML openssl iproute
Requires: python3-dbm,python3-pyghmi >= 1.5.71, python3-eventlet, python3-greenlet, python3-pycryptodome >= 3.4.7, confluent_client == %{version}, python3-pyparsing, python3-paramiko, python3-dnspython, python3-netifaces, python3-pyasn1 >= 0.2.3, python3-pysnmp >= 4.3.4, python3-lxml, python3-eficompressor, python3-setuptools, python3-dateutil, python3-cffi, python3-pyOpenSSL, python3-websocket-client python3-msgpack python3-libarchive-c python3-PyYAML openssl iproute
%endif
%endif