diff --git a/confluent_client/confluent/asynctlvdata.py b/confluent_client/confluent/asynctlvdata.py index 629687d6..7d05ecd8 100644 --- a/confluent_client/confluent/asynctlvdata.py +++ b/confluent_client/confluent/asynctlvdata.py @@ -198,6 +198,12 @@ async def sendall(handle, data): cloop = asyncio.get_event_loop() return await cloop.sock_sendall(handle, data) +async def close(handle): + if isinstance(handle, tuple): + handle[1].close() + await handle[1].wait_closed() + else: + handle.close() async def send(handle, data, filehandle=None): cloop = asyncio.get_event_loop() diff --git a/confluent_server/bin/confluentdbutil b/confluent_server/bin/confluentdbutil index 25a5acf8..6022f127 100755 --- a/confluent_server/bin/confluentdbutil +++ b/confluent_server/bin/confluentdbutil @@ -16,6 +16,7 @@ # limitations under the License. +import asyncio import getpass import optparse import sys @@ -71,7 +72,8 @@ if args[0] == 'restore': try: cfm.init(True) cfm.statelessmode = True - cfm.restore_db_from_directory(dumpdir, password) + dp = cfm.restore_db_from_directory(dumpdir, password) + asyncio.get_event_loop().run_until_complete(dp) cfm.statelessmode = False cfm.ConfigManager.wait_for_sync(True) if owner != 0: @@ -101,8 +103,8 @@ elif args[0] == 'dump': main._initsecurity(conf.get_config()) if not os.path.exists(dumpdir): os.makedirs(dumpdir) - cfm.dump_db_to_directory(dumpdir, password, options.redact, - options.skipkeys) - + dp = cfm.dump_db_to_directory(dumpdir, password, options.redact, + options.skipkeys) + asyncio.get_event_loop().run_until_complete(dp) diff --git a/confluent_server/confluent/auth.py b/confluent_server/confluent/auth.py index 9b6c6eda..cfcd54e9 100644 --- a/confluent_server/confluent/auth.py +++ b/confluent_server/confluent/auth.py @@ -21,12 +21,11 @@ import asyncio import confluent.config.configmanager as configmanager -import eventlet -import eventlet.tpool try: import Cryptodome.Protocol.KDF as KDF except ImportError: import Crypto.Protocol.KDF as KDF +from concurrent.futures import ProcessPoolExecutor from fnmatch import fnmatch import hashlib import hmac @@ -314,11 +313,11 @@ async def check_user_passphrase(name, passphrase, operation=None, element=None, global authworkers global authcleaner if authworkers is None: - authworkers = multiprocessing.Pool(processes=1) + authworkers = ProcessPoolExecutor(max_workers=1) # multiprocessing.Pool(processes=1) else: authcleaner.cancel() - authcleaner = eventlet.spawn_after(30, _clean_authworkers) - crypted = eventlet.tpool.execute(_do_pbkdf, passphrase, salt) + authcleaner = util.spawn_after(30, _clean_authworkers) + crypted = await _do_pbkdf(passphrase, salt) del _passchecking[(user, tenant)] await asyncio.sleep( 0.05) # either way, we want to stall so that client can't @@ -391,9 +390,10 @@ def _clean_authworkers(): authcleaner = None -def _do_pbkdf(passphrase, salt): +async def _do_pbkdf(passphrase, salt): # we must get it over to the authworkers pool or else get blocked in # compute. However, we do want to wait for result, so we have # one of the exceedingly rare sort of circumstances where 'apply' # actually makes sense - return authworkers.apply(_apply_pbkdf, [passphrase, salt]) + res = await asyncio.get_event_loop().run_in_executor(authworkers, _apply_pbkdf, passphrase, salt) + return res diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index edc9db88..c3d5457e 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -407,8 +407,7 @@ async def handle_connection(connection, cert, request, local=False): connection, {'collective': {'status': 'Invalid token format'}}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) return host = request['server'] try: @@ -446,8 +445,7 @@ async def handle_connection(connection, cert, request, local=False): connection, {'collective': {'status': 'Failed to connect to {0}'.format(host)}}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) raise return mycert = util.get_certificate_from_file( @@ -474,8 +472,7 @@ async def handle_connection(connection, cert, request, local=False): {'status': 'Bad server token'}}) return await tlvdata.send(connection, {'collective': {'status': 'Success'}}) - connection[1].close() - await connection[1].wait_closed() + await tlvdata.close(connection) currentleader = rsp['collective']['leader'] f = open('/etc/confluent/cfg/myname', 'w') f.write(name) diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 34b48a45..1aaf08bf 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -281,12 +281,12 @@ def _rpc_del_user(tenant, name): ConfigManager(tenant)._true_del_user(name) -def _rpc_master_create_user(tenant, *args): - ConfigManager(tenant).create_user(*args) +async def _rpc_master_create_user(tenant, *args): + await ConfigManager(tenant).create_user(*args) -def _rpc_master_create_usergroup(tenant, *args): - ConfigManager(tenant).create_usergroup(*args) +async def _rpc_master_create_usergroup(tenant, *args): + await ConfigManager(tenant).create_usergroup(*args) def _rpc_create_user(tenant, *args): @@ -1697,8 +1697,8 @@ class ConfigManager(object): :param displayname: Optional long format name for UI consumption """ if cfgleader: - return exec_on_leader('_rpc_master_create_user', self.tenant, - name, role, uid, displayname, attributemap) + return await exec_on_leader('_rpc_master_create_user', self.tenant, + name, role, uid, displayname, attributemap) if cfgstreams: await exec_on_followers('_rpc_create_user', self.tenant, name, role, uid, displayname, attributemap) @@ -2942,8 +2942,8 @@ async def restore_db_from_directory(location, password): collective = json.load(open(os.path.join(location, 'collective.json'))) _cfgstore['collective'] = {} for coll in collective: - add_collective_member(coll, collective[coll]['address'], - collective[coll]['fingerprint']) + await add_collective_member(coll, collective[coll]['address'], + collective[coll]['fingerprint']) except IOError as e: if e.errno != 2: raise @@ -2953,13 +2953,13 @@ async def restore_db_from_directory(location, password): ConfigManager.wait_for_sync(True) -def dump_db_to_directory(location, password, redact=None, skipkeys=False): +async def dump_db_to_directory(location, password, redact=None, skipkeys=False): if not redact and not skipkeys: with open(os.path.join(location, 'keys.json'), 'w') as cfgfile: cfgfile.write(_dump_keys(password)) cfgfile.write('\n') with open(os.path.join(location, 'main.json'), 'wb') as cfgfile: - cfgfile.write(ConfigManager(tenant=None)._dump_to_json(redact=redact)) + cfgfile.write(await ConfigManager(tenant=None)._dump_to_json(redact=redact)) cfgfile.write(b'\n') if 'collective' in _cfgstore: with open(os.path.join(location, 'collective.json'), 'w') as cfgfile: diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 161b43c3..6900ad68 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -586,7 +586,7 @@ def _init_core(): } -def create_user(inputdata, configmanager): +async def create_user(inputdata, configmanager): try: username = inputdata['name'] del inputdata['name'] @@ -594,10 +594,10 @@ def create_user(inputdata, configmanager): del inputdata['role'] except (KeyError, ValueError): raise exc.InvalidArgumentException('Missing user name or role') - configmanager.create_user(username, role, attributemap=inputdata) + await configmanager.create_user(username, role, attributemap=inputdata) -def create_usergroup(inputdata, configmanager): +async def create_usergroup(inputdata, configmanager): try: groupname = inputdata['name'] role = inputdata['role'] @@ -605,18 +605,18 @@ def create_usergroup(inputdata, configmanager): del inputdata['role'] except (KeyError, ValueError): raise exc.InvalidArgumentException("Missing user name or role") - configmanager.create_usergroup(groupname, role) + await configmanager.create_usergroup(groupname, role) -def update_usergroup(groupname, attribmap, configmanager): +async def update_usergroup(groupname, attribmap, configmanager): try: - configmanager.set_usergroup(groupname, attribmap) + await configmanager.set_usergroup(groupname, attribmap) except ValueError as e: raise exc.InvalidArgumentException(str(e)) -def update_user(name, attribmap, configmanager): +async def update_user(name, attribmap, configmanager): try: - configmanager.set_user(name, attribmap) + await configmanager.set_user(name, attribmap) except ValueError as e: raise exc.InvalidArgumentException(str(e)) @@ -1334,7 +1334,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip= inputdata = msg.get_input_message( pathcomponents, operation, inputdata, configmanager=configmanager) - create_usergroup(inputdata.attribs, configmanager) + await create_usergroup(inputdata.attribs, configmanager) return iterate_collections(configmanager.list_usergroups(), forcecollection=False) if usergroup not in configmanager.list_usergroups(): @@ -1347,7 +1347,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip= inputdata = msg.get_input_message( pathcomponents, operation, inputdata, configmanager=configmanager) - update_usergroup(usergroup, inputdata.attribs, configmanager) + await update_usergroup(usergroup, inputdata.attribs, configmanager) return show_usergroup(usergroup, configmanager) elif pathcomponents[0] == 'users': # TODO: when non-administrator accounts exist, @@ -1359,7 +1359,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip= inputdata = msg.get_input_message( pathcomponents, operation, inputdata, configmanager=configmanager) - create_user(inputdata.attribs, configmanager) + await create_user(inputdata.attribs, configmanager) return iterate_collections(configmanager.list_users(), forcecollection=False) if user not in configmanager.list_users(): diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index b62db319..3e1e7aa2 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -291,7 +291,7 @@ async def _authorize_request(req, operation, reqbody): if sessionid: if sessionid in httpsessions: if _csrf_valid(req, httpsessions[sessionid]): - if element == '/sessions/current/logout': + if req.rel_url.path == '/sessions/current/logout': targets = [] for mythread in httpsessions[sessionid]['inflight']: targets.append(mythread) @@ -305,8 +305,9 @@ async def _authorize_request(req, operation, reqbody): authdata = auth.authorize( name, element=element, operation=operation, skipuserobj=httpsessions[sessionid]['skipuserobject']) + if (not authdata) and 'Authorization' in req.headers: - if element == '/sessions/current/logout': + if req.rel_url.path == '/sessions/current/logout': if 'Referer' in req.headers: # note that this doesn't actually do harm # otherwise, but this way do not give appearance @@ -331,7 +332,6 @@ async def _authorize_request(req, operation, reqbody): return {'code': 403} elif not authdata: return {'code': 401} - print(repr(authdata)) sessid = _establish_http_session(req, authdata, name, cookie) if authdata and element and element.startswith('/sessions/current/webauthn/validate/'): if webauthn: @@ -344,7 +344,7 @@ async def _authorize_request(req, operation, reqbody): auditmsg = { 'user': util.stringify(name), 'operation': operation, - 'target': element, + 'target': req.rel_url.path, } authinfo = {'code': 200, 'cookie': cookie, @@ -677,8 +677,8 @@ async def resourcehandler_backend(req, make_response): rsp = make_response(mimetype, 302, 'Found', {'Location': redir}) return if req.content_length: - reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH'])) - reqtype = env['CONTENT_TYPE'] + reqbody = await req.read() + reqtype = req.content_type operation = opmap[req.method] querydict = _get_query_dict(req, reqbody, reqtype) if operation != 'retrieve' and 'restexplorerop' in querydict: @@ -688,7 +688,7 @@ async def resourcehandler_backend(req, make_response): if 'logout' in authorized: rsp = await make_response("application/json", 200, 'Successful logout') await rsp.write(b'{"result": "200 - Successful logout"}') - return + return rsp if 'SuppressAuthHeader' in req.headers or 'ConfluentAuthToken' in req.headers: badauth = {'Content-type': 'text/plain'} else: diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index db6ab48c..36aa2fc9 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -215,7 +215,7 @@ class IpmiCommandWrapper(ipmicommand.Command): return self._lasthealth self._inhealth = True try: - self._lasthealth = super(IpmiCommandWrapper, self).get_health() + self._lasthealth = await super(IpmiCommandWrapper, self).get_health() except Exception: self._inhealth = False raise @@ -560,7 +560,7 @@ class IpmiHandler: elif self.element == ['boot', 'nextdevice']: self.bootdevice() elif self.element == ['health', 'hardware']: - self.health() + await self.health() elif self.element == ['identify']: self.identify() elif self.element[0] == 'sensors': @@ -1323,14 +1323,14 @@ class IpmiHandler: return health = response['health'] health = _str_health(health) - self.output.put(msg.HealthSummary(health, self.node)) + await self.output.put(msg.HealthSummary(health, self.node)) if 'badreadings' in response: badsensors = [] for reading in response['badreadings']: if hasattr(reading, 'health'): reading.health = _str_health(reading.health) badsensors.append(reading) - self.output.put(msg.SensorReadings(badsensors, name=self.node)) + await self.output.put(msg.SensorReadings(badsensors, name=self.node)) else: raise exc.InvalidArgumentException('health is read-only')