2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-08-21 02:30:22 +00:00

Advance state of asyncio

Add a mechanism to close a session the right way
in tlvdata

Fix confluentdbutil/configmanager to restore/dump db to directory

Move auth to asyncio away from eventlet

Fix some issues with httpapi, enable reading body via aiohttp

Fix health from ipmi plugin

Fix user creation across a collective.
This commit is contained in:
Jarrod Johnson
2024-06-13 16:32:02 -04:00
parent bdb7f064d6
commit b6a0250e5c
8 changed files with 54 additions and 49 deletions

View File

@@ -198,6 +198,12 @@ async def sendall(handle, data):
cloop = asyncio.get_event_loop()
return await cloop.sock_sendall(handle, data)
async def close(handle):
if isinstance(handle, tuple):
handle[1].close()
await handle[1].wait_closed()
else:
handle.close()
async def send(handle, data, filehandle=None):
cloop = asyncio.get_event_loop()

View File

@@ -16,6 +16,7 @@
# limitations under the License.
import asyncio
import getpass
import optparse
import sys
@@ -71,7 +72,8 @@ if args[0] == 'restore':
try:
cfm.init(True)
cfm.statelessmode = True
cfm.restore_db_from_directory(dumpdir, password)
dp = cfm.restore_db_from_directory(dumpdir, password)
asyncio.get_event_loop().run_until_complete(dp)
cfm.statelessmode = False
cfm.ConfigManager.wait_for_sync(True)
if owner != 0:
@@ -101,8 +103,8 @@ elif args[0] == 'dump':
main._initsecurity(conf.get_config())
if not os.path.exists(dumpdir):
os.makedirs(dumpdir)
cfm.dump_db_to_directory(dumpdir, password, options.redact,
options.skipkeys)
dp = cfm.dump_db_to_directory(dumpdir, password, options.redact,
options.skipkeys)
asyncio.get_event_loop().run_until_complete(dp)

View File

@@ -21,12 +21,11 @@
import asyncio
import confluent.config.configmanager as configmanager
import eventlet
import eventlet.tpool
try:
import Cryptodome.Protocol.KDF as KDF
except ImportError:
import Crypto.Protocol.KDF as KDF
from concurrent.futures import ProcessPoolExecutor
from fnmatch import fnmatch
import hashlib
import hmac
@@ -314,11 +313,11 @@ async def check_user_passphrase(name, passphrase, operation=None, element=None,
global authworkers
global authcleaner
if authworkers is None:
authworkers = multiprocessing.Pool(processes=1)
authworkers = ProcessPoolExecutor(max_workers=1) # multiprocessing.Pool(processes=1)
else:
authcleaner.cancel()
authcleaner = eventlet.spawn_after(30, _clean_authworkers)
crypted = eventlet.tpool.execute(_do_pbkdf, passphrase, salt)
authcleaner = util.spawn_after(30, _clean_authworkers)
crypted = await _do_pbkdf(passphrase, salt)
del _passchecking[(user, tenant)]
await asyncio.sleep(
0.05) # either way, we want to stall so that client can't
@@ -391,9 +390,10 @@ def _clean_authworkers():
authcleaner = None
def _do_pbkdf(passphrase, salt):
async def _do_pbkdf(passphrase, salt):
# we must get it over to the authworkers pool or else get blocked in
# compute. However, we do want to wait for result, so we have
# one of the exceedingly rare sort of circumstances where 'apply'
# actually makes sense
return authworkers.apply(_apply_pbkdf, [passphrase, salt])
res = await asyncio.get_event_loop().run_in_executor(authworkers, _apply_pbkdf, passphrase, salt)
return res

View File

@@ -407,8 +407,7 @@ async def handle_connection(connection, cert, request, local=False):
connection,
{'collective':
{'status': 'Invalid token format'}})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
return
host = request['server']
try:
@@ -446,8 +445,7 @@ async def handle_connection(connection, cert, request, local=False):
connection,
{'collective':
{'status': 'Failed to connect to {0}'.format(host)}})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
raise
return
mycert = util.get_certificate_from_file(
@@ -474,8 +472,7 @@ async def handle_connection(connection, cert, request, local=False):
{'status': 'Bad server token'}})
return
await tlvdata.send(connection, {'collective': {'status': 'Success'}})
connection[1].close()
await connection[1].wait_closed()
await tlvdata.close(connection)
currentleader = rsp['collective']['leader']
f = open('/etc/confluent/cfg/myname', 'w')
f.write(name)

View File

@@ -281,12 +281,12 @@ def _rpc_del_user(tenant, name):
ConfigManager(tenant)._true_del_user(name)
def _rpc_master_create_user(tenant, *args):
ConfigManager(tenant).create_user(*args)
async def _rpc_master_create_user(tenant, *args):
await ConfigManager(tenant).create_user(*args)
def _rpc_master_create_usergroup(tenant, *args):
ConfigManager(tenant).create_usergroup(*args)
async def _rpc_master_create_usergroup(tenant, *args):
await ConfigManager(tenant).create_usergroup(*args)
def _rpc_create_user(tenant, *args):
@@ -1697,8 +1697,8 @@ class ConfigManager(object):
:param displayname: Optional long format name for UI consumption
"""
if cfgleader:
return exec_on_leader('_rpc_master_create_user', self.tenant,
name, role, uid, displayname, attributemap)
return await exec_on_leader('_rpc_master_create_user', self.tenant,
name, role, uid, displayname, attributemap)
if cfgstreams:
await exec_on_followers('_rpc_create_user', self.tenant, name,
role, uid, displayname, attributemap)
@@ -2942,8 +2942,8 @@ async def restore_db_from_directory(location, password):
collective = json.load(open(os.path.join(location, 'collective.json')))
_cfgstore['collective'] = {}
for coll in collective:
add_collective_member(coll, collective[coll]['address'],
collective[coll]['fingerprint'])
await add_collective_member(coll, collective[coll]['address'],
collective[coll]['fingerprint'])
except IOError as e:
if e.errno != 2:
raise
@@ -2953,13 +2953,13 @@ async def restore_db_from_directory(location, password):
ConfigManager.wait_for_sync(True)
def dump_db_to_directory(location, password, redact=None, skipkeys=False):
async def dump_db_to_directory(location, password, redact=None, skipkeys=False):
if not redact and not skipkeys:
with open(os.path.join(location, 'keys.json'), 'w') as cfgfile:
cfgfile.write(_dump_keys(password))
cfgfile.write('\n')
with open(os.path.join(location, 'main.json'), 'wb') as cfgfile:
cfgfile.write(ConfigManager(tenant=None)._dump_to_json(redact=redact))
cfgfile.write(await ConfigManager(tenant=None)._dump_to_json(redact=redact))
cfgfile.write(b'\n')
if 'collective' in _cfgstore:
with open(os.path.join(location, 'collective.json'), 'w') as cfgfile:

View File

@@ -586,7 +586,7 @@ def _init_core():
}
def create_user(inputdata, configmanager):
async def create_user(inputdata, configmanager):
try:
username = inputdata['name']
del inputdata['name']
@@ -594,10 +594,10 @@ def create_user(inputdata, configmanager):
del inputdata['role']
except (KeyError, ValueError):
raise exc.InvalidArgumentException('Missing user name or role')
configmanager.create_user(username, role, attributemap=inputdata)
await configmanager.create_user(username, role, attributemap=inputdata)
def create_usergroup(inputdata, configmanager):
async def create_usergroup(inputdata, configmanager):
try:
groupname = inputdata['name']
role = inputdata['role']
@@ -605,18 +605,18 @@ def create_usergroup(inputdata, configmanager):
del inputdata['role']
except (KeyError, ValueError):
raise exc.InvalidArgumentException("Missing user name or role")
configmanager.create_usergroup(groupname, role)
await configmanager.create_usergroup(groupname, role)
def update_usergroup(groupname, attribmap, configmanager):
async def update_usergroup(groupname, attribmap, configmanager):
try:
configmanager.set_usergroup(groupname, attribmap)
await configmanager.set_usergroup(groupname, attribmap)
except ValueError as e:
raise exc.InvalidArgumentException(str(e))
def update_user(name, attribmap, configmanager):
async def update_user(name, attribmap, configmanager):
try:
configmanager.set_user(name, attribmap)
await configmanager.set_user(name, attribmap)
except ValueError as e:
raise exc.InvalidArgumentException(str(e))
@@ -1334,7 +1334,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip=
inputdata = msg.get_input_message(
pathcomponents, operation, inputdata,
configmanager=configmanager)
create_usergroup(inputdata.attribs, configmanager)
await create_usergroup(inputdata.attribs, configmanager)
return iterate_collections(configmanager.list_usergroups(),
forcecollection=False)
if usergroup not in configmanager.list_usergroups():
@@ -1347,7 +1347,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip=
inputdata = msg.get_input_message(
pathcomponents, operation, inputdata,
configmanager=configmanager)
update_usergroup(usergroup, inputdata.attribs, configmanager)
await update_usergroup(usergroup, inputdata.attribs, configmanager)
return show_usergroup(usergroup, configmanager)
elif pathcomponents[0] == 'users':
# TODO: when non-administrator accounts exist,
@@ -1359,7 +1359,7 @@ async def handle_path(path, operation, configmanager, inputdata=None, autostrip=
inputdata = msg.get_input_message(
pathcomponents, operation, inputdata,
configmanager=configmanager)
create_user(inputdata.attribs, configmanager)
await create_user(inputdata.attribs, configmanager)
return iterate_collections(configmanager.list_users(),
forcecollection=False)
if user not in configmanager.list_users():

View File

@@ -291,7 +291,7 @@ async def _authorize_request(req, operation, reqbody):
if sessionid:
if sessionid in httpsessions:
if _csrf_valid(req, httpsessions[sessionid]):
if element == '/sessions/current/logout':
if req.rel_url.path == '/sessions/current/logout':
targets = []
for mythread in httpsessions[sessionid]['inflight']:
targets.append(mythread)
@@ -305,8 +305,9 @@ async def _authorize_request(req, operation, reqbody):
authdata = auth.authorize(
name, element=element, operation=operation,
skipuserobj=httpsessions[sessionid]['skipuserobject'])
if (not authdata) and 'Authorization' in req.headers:
if element == '/sessions/current/logout':
if req.rel_url.path == '/sessions/current/logout':
if 'Referer' in req.headers:
# note that this doesn't actually do harm
# otherwise, but this way do not give appearance
@@ -331,7 +332,6 @@ async def _authorize_request(req, operation, reqbody):
return {'code': 403}
elif not authdata:
return {'code': 401}
print(repr(authdata))
sessid = _establish_http_session(req, authdata, name, cookie)
if authdata and element and element.startswith('/sessions/current/webauthn/validate/'):
if webauthn:
@@ -344,7 +344,7 @@ async def _authorize_request(req, operation, reqbody):
auditmsg = {
'user': util.stringify(name),
'operation': operation,
'target': element,
'target': req.rel_url.path,
}
authinfo = {'code': 200,
'cookie': cookie,
@@ -677,8 +677,8 @@ async def resourcehandler_backend(req, make_response):
rsp = make_response(mimetype, 302, 'Found', {'Location': redir})
return
if req.content_length:
reqbody = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
reqtype = env['CONTENT_TYPE']
reqbody = await req.read()
reqtype = req.content_type
operation = opmap[req.method]
querydict = _get_query_dict(req, reqbody, reqtype)
if operation != 'retrieve' and 'restexplorerop' in querydict:
@@ -688,7 +688,7 @@ async def resourcehandler_backend(req, make_response):
if 'logout' in authorized:
rsp = await make_response("application/json", 200, 'Successful logout')
await rsp.write(b'{"result": "200 - Successful logout"}')
return
return rsp
if 'SuppressAuthHeader' in req.headers or 'ConfluentAuthToken' in req.headers:
badauth = {'Content-type': 'text/plain'}
else:

View File

@@ -215,7 +215,7 @@ class IpmiCommandWrapper(ipmicommand.Command):
return self._lasthealth
self._inhealth = True
try:
self._lasthealth = super(IpmiCommandWrapper, self).get_health()
self._lasthealth = await super(IpmiCommandWrapper, self).get_health()
except Exception:
self._inhealth = False
raise
@@ -560,7 +560,7 @@ class IpmiHandler:
elif self.element == ['boot', 'nextdevice']:
self.bootdevice()
elif self.element == ['health', 'hardware']:
self.health()
await self.health()
elif self.element == ['identify']:
self.identify()
elif self.element[0] == 'sensors':
@@ -1323,14 +1323,14 @@ class IpmiHandler:
return
health = response['health']
health = _str_health(health)
self.output.put(msg.HealthSummary(health, self.node))
await self.output.put(msg.HealthSummary(health, self.node))
if 'badreadings' in response:
badsensors = []
for reading in response['badreadings']:
if hasattr(reading, 'health'):
reading.health = _str_health(reading.health)
badsensors.append(reading)
self.output.put(msg.SensorReadings(badsensors, name=self.node))
await self.output.put(msg.SensorReadings(badsensors, name=self.node))
else:
raise exc.InvalidArgumentException('health is read-only')