From a8df3692b6296a426834090896ae4fe98fab02c3 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 20 Sep 2024 17:26:02 -0400 Subject: [PATCH] Persist passkeys as text confluentdbutil and collective would choke on the binary. Have the binary bits be safely converted to/from base64. --- confluent_server/confluent/webauthn.py | 55 +++++++++++++++++++++----- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/confluent_server/confluent/webauthn.py b/confluent_server/confluent/webauthn.py index b9699f5a..4920ac33 100644 --- a/confluent_server/confluent/webauthn.py +++ b/confluent_server/confluent/webauthn.py @@ -2,6 +2,8 @@ from webauthn_rp.registrars import CredentialData import confluent.tlvdata as tlvdata import confluent.util as util import json +import copy +import base64 import secrets, time @@ -42,6 +44,22 @@ class Challenge(): +def _load_credentials(creds): + if creds is None: + return None + ret = copy.deepcopy(creds) + ret['credential_public_key'] = base64.b64decode(creds['credential_public_key']) + ret['id'] = base64.b64decode(creds['id']) + return ret + +def _load_authenticators(authenticators): + ret = authenticators + if 'challenges' in ret: + ret['challenges']['request'] = base64.b64decode(ret['challenges']['request']) + if 'credentials' in ret: + ret['credentials'] = _load_credentials(ret['credentials']) + return ret + class User(): def __init__(self, id, username, user_handle, challenge: Challenge = None, credential: Credential = None): self.id = id @@ -52,12 +70,15 @@ class User(): def __parse_credentials(self): if self.credentials: - return {"id": self.credentials.id, "signature_count": self.credentials.signature_count, "credential_public_key": self.credentials.credential_public_key} + credid = base64.b64encode(self.credentials.id).decode() + pubkey = base64.b64encode(self.credentials.credential_public_key).decode() + return {"id": credid, "signature_count": self.credentials.signature_count, "credential_public_key": pubkey} def __parse_challenges(self): if self.challenges: - return {"id": self.challenges.id, 'request': self.challenges.request, 'timestamp_ms': self.challenges.timestamp_ms} + request = base64.b64encode(self.challenges.request).decode() + return {"id": self.challenges.id, 'request': request, 'timestamp_ms': self.challenges.timestamp_ms} @staticmethod @@ -67,8 +88,9 @@ class User(): """ for username in CONFIG_MANAGER.list_users(): authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {}) + authenticators = _load_authenticators(authenticators) try: - credential = authenticators['credentials'] + credential = authenticators['credentials'] except KeyError: continue if "id" in credential.keys() and credential["id"] == credential_id: @@ -83,7 +105,8 @@ class User(): if not isinstance(username, str): username = username.decode('utf8') authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {}) - credential = authenticators.get('credentials', None) + authenticators = _load_authenticators(authenticators) + credential = authenticators.get('credentials', None) if credential is None: return None @@ -98,7 +121,11 @@ class User(): def get_challenge(challengeID, username): if not isinstance(username, str): username = username.decode('utf8') - authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {}) + authuser = CONFIG_MANAGER.get_user(username) + if not authuser: + return None + authenticators = authuser.get('authenticators', {}) + authenticators = _load_authenticators(authenticators) challenge = authenticators['challenges'] if challenge["id"] == challengeID: return Challenge(request=challenge["request"], timstamp_ms=challenge["timestamp_ms"], id=challenge["id"]) @@ -114,10 +141,18 @@ class User(): if not isinstance(username, str): username = username.decode('utf8') userinfo = CONFIG_MANAGER.get_user(username) - authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {}) + try: + authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {}) + except AttributeError: + return None if userinfo is None: return None - authid = userinfo.get('webauthid', None) + authenticators = _load_authenticators(authenticators) + b64authid = userinfo.get('webauthid', None) + if b64authid is None: + authid = None + else: + authid = base64.b64decode(b64authid) challenge = authenticators.get("challenges", None) if challenge: challenges_return = Challenge(challenge['request'], challenge['timestamp_ms'], id=challenge["id"]) @@ -130,6 +165,7 @@ class User(): def save(self): authenticators = CONFIG_MANAGER.get_user(self.username).get('authenticators', {}) + authenticators = _load_authenticators(authenticators) authenticators['challenges'] = self.__parse_challenges() # Looks like the bigger the array we encounter problems changing to just save one challenge authenticators['credentials'] = self.__parse_credentials() @@ -286,7 +322,7 @@ def registration_response(request, username, APP_RELYING_PARTY, APP_ORIGIN): expected_challenge=challenge_model.request, expected_origin=APP_ORIGIN ) - except WebAuthnRPError: + except WebAuthnRPError as wrp: raise Exception("Could not handle credential attestation") return True @@ -390,7 +426,8 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody authid = userinfo.get('webauthid', None) if not authid: authid = secrets.token_bytes(64) - cfm.set_user(username, {'webauthid': authid}) + b64authid = base64.b64encode(authid).decode() + cfm.set_user(username, {'webauthid': b64authid}) opts = registration_request(username, cfm, APP_RELYING_PARTY) start_response('200 OK', headers) yield json.dumps(opts)