2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-25 11:01:09 +00:00

Merge pull request #164 from tkucherera-lenovo/webauth_update

remove hardcorded values
This commit is contained in:
Jarrod Johnson 2024-09-19 16:29:33 -04:00 committed by GitHub
commit 65ae52782e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -209,27 +209,27 @@ class RegistrarImpl(CredentialsRegistrar):
)
APP_ORIGIN = 'https://ndiamai'
APP_TIMEOUT = 60000
APP_RELYING_PARTY = PublicKeyCredentialRpEntity(name='Confluent Web UI', id="ndiamai")
APP_CCO_BUILDER = CredentialCreationOptionsBuilder(
APP_TIMEOUT = 60000
APP_CREDENTIALS_BACKEND = CredentialsBackend(RegistrarImpl())
def registration_request(username, cfg, APP_RELYING_PARTY):
APP_CCO_BUILDER = CredentialCreationOptionsBuilder(
rp=APP_RELYING_PARTY,
pub_key_cred_params=[
PublicKeyCredentialParameters(type=PublicKeyCredentialType.PUBLIC_KEY,
alg=COSEAlgorithmIdentifier.Value.ES256)
],
timeout=APP_TIMEOUT,
)
)
APP_CRO_BUILDER = CredentialRequestOptionsBuilder(
rp_id=APP_RELYING_PARTY.id,
timeout=APP_TIMEOUT,
)
APP_CREDENTIALS_BACKEND = CredentialsBackend(RegistrarImpl())
def registration_request(username, cfg):
user_model = User.get(username)
if user_model is None:
raise Exception("User not foud")
@ -254,7 +254,7 @@ def registration_request(username, cfg):
'creationOptions': options_json
}
def registration_response(request, username):
def registration_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
try:
challengeID = request["challengeID"]
credential = parse_public_key_credential(json.loads(request["credential"]))
@ -292,7 +292,12 @@ def registration_response(request, username):
return True
def authentication_request(username):
def authentication_request(username, APP_RELYING_PARTY):
APP_CRO_BUILDER = CredentialRequestOptionsBuilder(
rp_id=APP_RELYING_PARTY.id,
timeout=APP_TIMEOUT,
)
user_model = User.get(username)
if user_model is None:
@ -323,7 +328,7 @@ def authentication_request(username):
'requestOptions': options_json
}
def authentication_response(request, username):
def authentication_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
try:
challengeID = request["challengeID"]
credential = parse_public_key_credential(json.loads(request["credential"]))
@ -365,10 +370,15 @@ def authentication_response(request, username):
def handle_api_request(url, env, start_response, username, cfm, headers, reqbody, authorized):
"""
For now webauth is going to be limited to just one passkey per user
If you try to register a new passkey this will just clear the old one and regist the new passkey
If you try to register a new passkey this will just clear the old one and register the new passkey
"""
global CONFIG_MANAGER
CONFIG_MANAGER = cfm
APP_ORIGIN = 'https://' + env['HTTP_X_FORWARDED_HOST']
HOST = env['HTTP_X_FORWARDED_HOST']
APP_RELYING_PARTY = PublicKeyCredentialRpEntity(name='Confluent Web UI', id=HOST)
if env['REQUEST_METHOD'] != 'POST':
raise Exception('Only POST supported for webauthn operations')
url = url.replace('/sessions/current/webauthn', '')
@ -381,7 +391,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
if not authid:
authid = secrets.token_bytes(64)
cfm.set_user(username, {'webauthid': authid})
opts = registration_request(username, cfm)
opts = registration_request(username, cfm, APP_RELYING_PARTY)
start_response('200 OK', headers)
yield json.dumps(opts)
elif url.startswith('/registered_credentials/'):
@ -389,7 +399,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
userinfo = cfm.get_user(username)
if not isinstance(username, bytes):
username = username.encode('utf8')
opts = authentication_request(username)
opts = authentication_request(username, APP_RELYING_PARTY)
start_response('200 OK', headers)
yield json.dumps(opts)
elif url.startswith('/validate/'):
@ -398,7 +408,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
if not isinstance(username, bytes):
username = username.encode('utf8')
req = json.loads(reqbody)
rsp = authentication_response(req, username)
rsp = authentication_response(req, username, APP_RELYING_PARTY, APP_ORIGIN)
if rsp == 'Timeout':
start_response('408 Timeout', headers)
elif rsp['verified'] and start_response:
@ -417,7 +427,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
userinfo = cfm.get_user(username)
if not isinstance(username, bytes):
username = username.encode('utf8')
rsp = registration_response(req, username)
rsp = registration_response(req, username, APP_RELYING_PARTY, APP_ORIGIN)
if rsp == 'Timeout':
start_response('408 Timeout', headers)
else: