mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-15 04:07:51 +00:00
Begin work on webauthn support
Provide appropriate registration options as a first step.
This commit is contained in:
parent
48d46bcfae
commit
9b39c96135
@ -162,6 +162,8 @@ def authorize(name, element, tenant=False, operation='create',
|
||||
return False
|
||||
manager = configmanager.ConfigManager(tenant, username=user)
|
||||
userobj = manager.get_user(user)
|
||||
if userobj and userobj.get('role', None) == 'Stub':
|
||||
userobj = None
|
||||
if not userobj:
|
||||
for group in userutil.grouplist(user):
|
||||
userobj = manager.get_usergroup(group)
|
||||
|
@ -113,7 +113,7 @@ _attraliases = {
|
||||
'bmcpass': 'secret.hardwaremanagementpassword',
|
||||
'switchpass': 'secret.hardwaremanagementpassword',
|
||||
}
|
||||
_validroles = ('Administrator', 'Operator', 'Monitor')
|
||||
_validroles = ('Administrator', 'Operator', 'Monitor', 'Stub')
|
||||
|
||||
membership_callback = None
|
||||
|
||||
@ -2777,8 +2777,8 @@ def dump_db_to_directory(location, password, redact=None, skipkeys=False):
|
||||
cfgfile.write('\n')
|
||||
bkupglobals = get_globals()
|
||||
if bkupglobals:
|
||||
json.dump(bkupglobals, open(os.path.join(location, 'globals.json'),
|
||||
'w'))
|
||||
with open(os.path.join(location, 'globals.json'), 'w') as globout:
|
||||
json.dump(bkupglobals, globout)
|
||||
try:
|
||||
for tenant in os.listdir(
|
||||
os.path.join(ConfigManager._cfgdir, '/tenants/')):
|
||||
|
@ -22,9 +22,9 @@ try:
|
||||
except ModuleNotFoundError:
|
||||
import http.cookies as Cookie
|
||||
try:
|
||||
import pywarp
|
||||
import confluent.webauthn as webauthn
|
||||
except ImportError:
|
||||
pywarp = None
|
||||
webauthn = None
|
||||
import confluent.auth as auth
|
||||
import confluent.config.attributes as attribs
|
||||
import confluent.consoleserver as consoleserver
|
||||
@ -834,6 +834,14 @@ def resourcehandler_backend(env, start_response):
|
||||
tlvdata.unicode_dictvalues(sessinfo)
|
||||
yield json.dumps(sessinfo)
|
||||
return
|
||||
elif url.startswith('/sessions/current/webauthn/'):
|
||||
if not webauthn:
|
||||
start_response('501 Not Implemented', headers)
|
||||
yield ''
|
||||
return
|
||||
for rsp in webauthn.handle_api_request(url, env, start_response, authorized['username'], cfgmgr, headers):
|
||||
yield rsp
|
||||
return
|
||||
resource = '.' + url[url.rindex('/'):]
|
||||
lquerydict = copy.deepcopy(querydict)
|
||||
try:
|
||||
|
51
confluent_server/confluent/webauthn.py
Normal file
51
confluent_server/confluent/webauthn.py
Normal file
@ -0,0 +1,51 @@
|
||||
import confluent.util as util
|
||||
import json
|
||||
import pywarp
|
||||
import pywarp.backends
|
||||
|
||||
creds = {}
|
||||
challenges = {}
|
||||
|
||||
class TestBackend(pywarp.backends.CredentialStorageBackend):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def get_credential_by_email(self, email):
|
||||
return creds[email]
|
||||
|
||||
def save_credential_for_user(self, email, credential):
|
||||
creds[email] = credential
|
||||
|
||||
def save_challenge_for_user(self, email, challenge, type):
|
||||
challenges[email] = challenge
|
||||
|
||||
def get_challenge_for_user(self, email, type):
|
||||
return challenges[email]
|
||||
|
||||
|
||||
def handle_api_request(url, env, start_response, username, cfm, headers):
|
||||
if env['REQUEST_METHOD'] != 'POST':
|
||||
raise Exception('Only POST supported for webauthn operations')
|
||||
url = url.replace('/sessions/current/webauthn', '')
|
||||
if'CONTENT_LENGTH' in env and int(env['CONTENT_LENGTH']) > 0:
|
||||
reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
|
||||
reqtype = env['CONTENT_TYPE']
|
||||
if url == '/registration_options':
|
||||
rp = pywarp.RelyingPartyManager('Confluent Web UI', credential_storage_backend=TestBackend())
|
||||
userinfo = cfm.get_user(username)
|
||||
if not userinfo:
|
||||
cfm.create_user(username, role='Stub')
|
||||
userinfo = cfm.get_user(username)
|
||||
authid = userinfo.get('authid', None)
|
||||
if not authid:
|
||||
authid = util.randomstring(64)
|
||||
cfm.set_user(username, {'authid': authid})
|
||||
opts = rp.get_registration_options(username)
|
||||
# pywarp generates an id derived
|
||||
# from username, which is a 'must not' in the spec
|
||||
# we replace that with a complying approach
|
||||
opts['user']['id'] = authid
|
||||
if 'icon' in opts['user']:
|
||||
del opts['user']['icon']
|
||||
start_response('200 OK', headers)
|
||||
yield json.dumps(opts)
|
Loading…
x
Reference in New Issue
Block a user