From 820d255a1ab7a08aca2799460946365736fc1de4 Mon Sep 17 00:00:00 2001 From: tkucherera Date: Mon, 2 Oct 2023 11:23:23 -0400 Subject: [PATCH 1/2] staging feature --- confluent_server/confluent/core.py | 86 +++++++++++++++++++++++++- confluent_server/confluent/httpapi.py | 39 ++++++++++++ confluent_server/confluent/messages.py | 12 ++++ 3 files changed, 136 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index a9ee1dba..e94d2abb 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -69,6 +69,7 @@ import os import eventlet.green.socket as socket import struct import sys +import uuid pluginmap = {} dispatch_plugins = (b'ipmi', u'ipmi', b'redfish', u'redfish', b'tsmsol', u'tsmsol', b'geist', u'geist', b'deltapdu', u'deltapdu', b'eatonpdu', u'eatonpdu', b'affluent', u'affluent', b'cnos', u'cnos') @@ -160,7 +161,7 @@ def _merge_dict(original, custom): rootcollections = ['deployment/', 'discovery/', 'events/', 'networking/', 'noderange/', 'nodes/', 'nodegroups/', 'usergroups/' , - 'users/', 'uuid', 'version'] + 'users/', 'uuid', 'version', 'staging/'] class PluginRoute(object): @@ -1211,6 +1212,87 @@ def handle_discovery(pathcomponents, operation, configmanager, inputdata): if pathcomponents[0] == 'detected': pass +class Staging: + def __init__(self, user, uuid): + self.uuid_str = uuid + self.storage_folder = '/var/lib/confluent/client_assets/' + self.uuid_str + self.filename = None + self.user = user + self.base_folder = os.path.exists('/var/lib/confluent/client_assets/') + + if not self.base_folder: + try: + os.mkdir('/var/lib/confluent/client_assets/') + except Exception as e: + raise OSError(str(e)) + + def getUUID(self): + return self.uuid_str + + def get_push_url(self): + return 'staging/{0}/{1}'.format(self.user,self.uuid_str) + + def create_directory(self): + try: + os.mkdir(self.storage_folder) + return True + except OSError as e: + raise exc.InvalidArgumentException(str(e)) + + def get_file_name(self): + stage_file = '{}/filename.txt'.format(self.storage_folder) + try: + with open(stage_file, 'r') as f: + filename = f.readline() + os.remove(stage_file) + return self.storage_folder + '/{}'.format(filename) + except FileNotFoundError: + file = None + return False + + def deldirectory(self): + pass + +def handle_staging(pathcomponents, operation, configmanager, inputdata): + ''' + e.g push_url: /confluent-api/staging/user/ + ''' + if operation == 'create': + if len(pathcomponents) == 1: + stage = Staging(inputdata['user'],str(uuid.uuid1())) + if stage.create_directory(): + if 'filename' in inputdata: + data_file = stage.storage_folder + '/filename.txt' + with open(data_file, 'w') as f: + f.write(inputdata['filename']) + else: + raise Exception('Error: Missing filename arg') + push_url = stage.get_push_url() + yield msg.CreatedResource(push_url) + + elif len(pathcomponents) == 3: + stage = Staging(pathcomponents[1], pathcomponents[2]) + file = stage.get_file_name() + if 'filedata' in inputdata and file: + content_length = inputdata['content_length'] + remaining_length = content_length + filedata = inputdata['filedata'] + chunk_size = 16384 + progress = 0.0 + with open(file, 'wb') as f: + while remaining_length > 0: + progress = (1 - (remaining_length/content_length)) * 100 + datachunk = filedata['wsgi.input'].read(min(chunk_size, remaining_length)) + f.write(datachunk) + remaining_length -= len(datachunk) + yield msg.FileUploadProgress(progress) + yield msg.FileUploadProgress(100) + + + elif operation == 'retrieve': + pass + return + def handle_path(path, operation, configmanager, inputdata=None, autostrip=True): """Given a full path request, return an object. @@ -1316,5 +1398,7 @@ def handle_path(path, operation, configmanager, inputdata=None, autostrip=True): elif pathcomponents[0] == 'discovery': return handle_discovery(pathcomponents[1:], operation, configmanager, inputdata) + elif pathcomponents[0] == 'staging': + return handle_staging(pathcomponents, operation, configmanager, inputdata) else: raise exc.NotFoundException() diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index 5a145a0c..baf40606 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -887,6 +887,45 @@ def resourcehandler_backend(env, start_response): start_response('200 OK', headers) yield rsp return + + elif (operation == 'create' and ('/staging' in env['PATH_INFO'])): + url = env['PATH_INFO'] + args_dict = {} + content_length = int(env.get('CONTENT_LENGTH', 0)) + if content_length > 0 and (len(url.split('/')) > 2): + # check if the user and the url defined user are the same + if authorized['username'] == url.split('/')[2]: + args_dict.update({'filedata':env, 'content_length': content_length}) + hdlr = pluginapi.handle_path(url, operation, cfgmgr, args_dict) + for resp in hdlr: + if isinstance(resp, confluent.messages.FileUploadProgress): + if resp.kvpairs['progress']['value'] == 100: + progress = resp.kvpairs['progress']['value'] + start_response('200 OK', headers) + yield json.dumps({'data': 'done'}) + return + else: + start_response('401 Unauthorized', headers) + yield json.dumps({'data': 'You do not have permission to write to file'}) + return + elif 'application/json' in reqtype and (len(url.split('/')) == 2): + if not isinstance(reqbody, str): + reqbody = reqbody.decode('utf8') + pbody = json.loads(reqbody) + args = pbody['args'] + args_dict.update({'filename': args, 'user': authorized['username']}) + try: + args_dict.update({'bank': pbody['bank']}) + except KeyError: + pass + hdlr = pluginapi.handle_path(url, operation, cfgmgr, args_dict) + for res in hdlr: + if isinstance(res, confluent.messages.CreatedResource): + stageurl = res.kvpairs['created'] + start_response('200 OK', headers) + yield json.dumps({'data': stageurl}) + return + else: # normal request url = env['PATH_INFO'] diff --git a/confluent_server/confluent/messages.py b/confluent_server/confluent/messages.py index a24a4d78..d0e720be 100644 --- a/confluent_server/confluent/messages.py +++ b/confluent_server/confluent/messages.py @@ -622,6 +622,18 @@ class SavedFile(ConfluentMessage): self.myargs = (node, file) self.kvpairs = {node: {'filename': file}} +class FileUploadProgress(ConfluentMessage): + readonly = True + + def __init__(self, progress, name=None): + self.myargs = (progress) + self.stripped = False + self.notnode = name is None + if self.notnode: + self.kvpairs = {'progress': {'value': progress}} + else: + self.kvpairs = {name: {'progress': {'value': progress}}} + class InputAlertData(ConfluentMessage): def __init__(self, path, inputdata, nodes=None): From d553ab864bbcc3da82db8a316b825aa76b133014 Mon Sep 17 00:00:00 2001 From: tkucherera Date: Thu, 12 Sep 2024 10:27:05 -0400 Subject: [PATCH 2/2] resolve merge conflicts --- confluent_server/confluent/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 55456453..ec62c57d 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -70,6 +70,7 @@ import eventlet.green.socket as socket import struct import sys import uuid +import yaml pluginmap = {} dispatch_plugins = (b'ipmi', u'ipmi', b'redfish', u'redfish', b'tsmsol', u'tsmsol', b'geist', u'geist', b'deltapdu', u'deltapdu', b'eatonpdu', u'eatonpdu', b'affluent', u'affluent', b'cnos', u'cnos') @@ -160,7 +161,7 @@ def _merge_dict(original, custom): rootcollections = ['deployment/', 'discovery/', 'events/', 'networking/', - 'noderange/', 'nodes/', 'nodegroups/', 'usergroups/' , + 'noderange/', 'nodes/', 'nodegroups/', 'storage/', 'usergroups/' , 'users/', 'uuid', 'version', 'staging/']