2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-28 03:48:35 +00:00

Change webauthn to use confluent user db

This allows persistence with the rest of the
configuration.
This commit is contained in:
Jarrod Johnson 2022-05-26 17:41:16 -04:00
parent 7072a85d79
commit b638bb69f9
2 changed files with 23 additions and 25 deletions

View File

@ -2447,10 +2447,10 @@ class ConfigManager(object):
uid = tmpconfig[confarea].get('id', None)
displayname = tmpconfig[confarea].get('displayname', None)
self.create_user(user, uid=uid, displayname=displayname)
if 'cryptpass' in tmpconfig[confarea][user]:
self._cfgstore['users'][user]['cryptpass'] = \
tmpconfig[confarea][user]['cryptpass']
_mark_dirtykey('users', user, self.tenant)
for attrname in ('authid', 'authenticators', 'cryptpass'):
if attrname in tmpconfig[confarea][user]:
self._cfgstore['users'][user][attrname] = tmpconfig[confarea][user][attrname]
_mark_dirtykey('users', user, self.tenant)
if sync:
self._bg_sync_to_file()

View File

@ -6,36 +6,36 @@ import pywarp
import pywarp.backends
import pywarp.credentials
creds = {}
challenges = {}
class TestBackend(pywarp.backends.CredentialStorageBackend):
def __init__(self):
global creds
try:
with open('/tmp/mycreds.json', 'r') as ji:
creds = json.load(ji)
except Exception:
pass
class ConfluentBackend(pywarp.backends.CredentialStorageBackend):
def __init__(self, cfg):
self.cfg = cfg
def get_credential_ids_by_email(self, email):
if not isinstance(email, str):
email = email.decode('utf8')
for cid in creds[email]:
authenticators = self.cfg.get_user(email).get('authenticators', {})
if not authenticators:
raise Exception('No authenticators found')
for cid in authenticators:
yield base64.b64decode(cid)
def get_credential_by_email_id(self, email, id):
if not isinstance(email, str):
email = email.decode('utf8')
authenticators = self.cfg.get_user(email).get('authenticators', {})
cid = base64.b64encode(id).decode('utf8')
pk = creds[email][cid]['cpk']
pk = authenticators[cid]['cpk']
pk = base64.b64decode(pk)
return pywarp.credentials.Credential(credential_id=id, credential_public_key=pk)
def get_credential_by_email(self, email):
if not isinstance(email, str):
email = email.decode('utf8')
cred = creds[email]
authenticators = self.cfg.get_user(email)
cid = list(authenticators)[0]
cred = authenticators[cid]
cid = base64.b64decode(cred['cid'])
cpk = base64.b64decode(cred['cpk'])
return pywarp.credentials.Credential(credential_id=cid, credential_public_key=cpk)
@ -45,11 +45,9 @@ class TestBackend(pywarp.backends.CredentialStorageBackend):
email = email.decode('utf8')
cid = base64.b64encode(credential.id).decode('utf8')
credential = {'cid': cid, 'cpk': base64.b64encode(bytes(credential.public_key)).decode('utf8')}
if email not in creds:
creds[email] = {}
creds[email][cid] = credential
with open('/tmp/mycreds.json', 'w') as jo:
json.dump(creds, jo)
authenticators = self.cfg.get_user(email).get('authenticators', {})
authenticators[cid] = credential
self.cfg.set_user(email, {'authenticators': authenticators})
def save_challenge_for_user(self, email, challenge, type):
if not isinstance(email, str):
@ -67,7 +65,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
raise Exception('Only POST supported for webauthn operations')
url = url.replace('/sessions/current/webauthn', '')
if url == '/registration_options':
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=TestBackend(), require_attestation=False)
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=ConfluentBackend(cfm), require_attestation=False)
userinfo = cfm.get_user(username)
if not userinfo:
cfm.create_user(username, role='Stub')
@ -89,7 +87,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
yield json.dumps(opts)
elif url.startswith('/registered_credentials/'):
username = url.rsplit('/', 1)[-1]
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=TestBackend())
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=ConfluentBackend(cfm))
if not isinstance(username, bytes):
username = username.encode('utf8')
opts = rp.get_authentication_options(username)
@ -100,7 +98,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
username = url.rsplit('/', 1)[-1]
if not isinstance(username, bytes):
username = username.encode('utf8')
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=TestBackend())
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=ConfluentBackend(cfm))
req = json.loads(reqbody)
for x in req:
req[x] = base64.b64decode(req[x].replace('-', '+').replace('_', '/'))
@ -118,7 +116,7 @@ def handle_api_request(url, env, start_response, username, cfm, headers, reqbody
else:
yield rsp
elif url == '/register_credential':
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=TestBackend(), require_attestation=False)
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=ConfluentBackend(cfm), require_attestation=False)
req = json.loads(reqbody)
for x in req:
req[x] = base64.b64decode(req[x].replace('-', '+').replace('_', '/'))