2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-28 20:39:40 +00:00

Persist passkeys as text

confluentdbutil and collective would choke
on the binary.  Have the binary bits be
safely converted to/from base64.
This commit is contained in:
Jarrod Johnson 2024-09-20 17:26:02 -04:00
parent 65ae52782e
commit a8df3692b6

View File

@ -2,6 +2,8 @@ from webauthn_rp.registrars import CredentialData
import confluent.tlvdata as tlvdata
import confluent.util as util
import json
import copy
import base64
import secrets, time
@ -42,6 +44,22 @@ class Challenge():
def _load_credentials(creds):
if creds is None:
return None
ret = copy.deepcopy(creds)
ret['credential_public_key'] = base64.b64decode(creds['credential_public_key'])
ret['id'] = base64.b64decode(creds['id'])
return ret
def _load_authenticators(authenticators):
ret = authenticators
if 'challenges' in ret:
ret['challenges']['request'] = base64.b64decode(ret['challenges']['request'])
if 'credentials' in ret:
ret['credentials'] = _load_credentials(ret['credentials'])
return ret
class User():
def __init__(self, id, username, user_handle, challenge: Challenge = None, credential: Credential = None):
self.id = id
@ -52,12 +70,15 @@ class User():
def __parse_credentials(self):
if self.credentials:
return {"id": self.credentials.id, "signature_count": self.credentials.signature_count, "credential_public_key": self.credentials.credential_public_key}
credid = base64.b64encode(self.credentials.id).decode()
pubkey = base64.b64encode(self.credentials.credential_public_key).decode()
return {"id": credid, "signature_count": self.credentials.signature_count, "credential_public_key": pubkey}
def __parse_challenges(self):
if self.challenges:
return {"id": self.challenges.id, 'request': self.challenges.request, 'timestamp_ms': self.challenges.timestamp_ms}
request = base64.b64encode(self.challenges.request).decode()
return {"id": self.challenges.id, 'request': request, 'timestamp_ms': self.challenges.timestamp_ms}
@staticmethod
@ -67,8 +88,9 @@ class User():
"""
for username in CONFIG_MANAGER.list_users():
authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {})
authenticators = _load_authenticators(authenticators)
try:
credential = authenticators['credentials']
credential = authenticators['credentials']
except KeyError:
continue
if "id" in credential.keys() and credential["id"] == credential_id:
@ -83,7 +105,8 @@ class User():
if not isinstance(username, str):
username = username.decode('utf8')
authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {})
credential = authenticators.get('credentials', None)
authenticators = _load_authenticators(authenticators)
credential = authenticators.get('credentials', None)
if credential is None:
return None
@ -98,7 +121,11 @@ class User():
def get_challenge(challengeID, username):
if not isinstance(username, str):
username = username.decode('utf8')
authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {})
authuser = CONFIG_MANAGER.get_user(username)
if not authuser:
return None
authenticators = authuser.get('authenticators', {})
authenticators = _load_authenticators(authenticators)
challenge = authenticators['challenges']
if challenge["id"] == challengeID:
return Challenge(request=challenge["request"], timstamp_ms=challenge["timestamp_ms"], id=challenge["id"])
@ -114,10 +141,18 @@ class User():
if not isinstance(username, str):
username = username.decode('utf8')
userinfo = CONFIG_MANAGER.get_user(username)
authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {})
try:
authenticators = CONFIG_MANAGER.get_user(username).get('authenticators', {})
except AttributeError:
return None
if userinfo is None:
return None
authid = userinfo.get('webauthid', None)
authenticators = _load_authenticators(authenticators)
b64authid = userinfo.get('webauthid', None)
if b64authid is None:
authid = None
else:
authid = base64.b64decode(b64authid)
challenge = authenticators.get("challenges", None)
if challenge:
challenges_return = Challenge(challenge['request'], challenge['timestamp_ms'], id=challenge["id"])
@ -130,6 +165,7 @@ class User():
def save(self):
authenticators = CONFIG_MANAGER.get_user(self.username).get('authenticators', {})
authenticators = _load_authenticators(authenticators)
authenticators['challenges'] = self.__parse_challenges() # Looks like the bigger the array we encounter problems changing to just save one challenge
authenticators['credentials'] = self.__parse_credentials()
@ -286,7 +322,7 @@ def registration_response(request, username, APP_RELYING_PARTY, APP_ORIGIN):
expected_challenge=challenge_model.request,
expected_origin=APP_ORIGIN
)
except WebAuthnRPError:
except WebAuthnRPError as wrp:
raise Exception("Could not handle credential attestation")
return True
@ -390,7 +426,8 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
authid = userinfo.get('webauthid', None)
if not authid:
authid = secrets.token_bytes(64)
cfm.set_user(username, {'webauthid': authid})
b64authid = base64.b64encode(authid).decode()
cfm.set_user(username, {'webauthid': b64authid})
opts = registration_request(username, cfm, APP_RELYING_PARTY)
start_response('200 OK', headers)
yield json.dumps(opts)