2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 09:32:21 +00:00

Merge pull request #133 from tkucherera-lenovo/staging

Staging
This commit is contained in:
Jarrod Johnson 2024-09-12 11:38:49 -04:00 committed by GitHub
commit 78dc6e31d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 138 additions and 2 deletions

View File

@ -70,6 +70,7 @@ import os
import eventlet.green.socket as socket
import struct
import sys
import uuid
import yaml
pluginmap = {}
@ -161,8 +162,9 @@ def _merge_dict(original, custom):
rootcollections = ['deployment/', 'discovery/', 'events/', 'networking/',
'noderange/', 'nodes/', 'nodegroups/', 'storage/', 'usergroups/' ,
'users/', 'uuid', 'version']
'noderange/', 'nodes/', 'nodegroups/', 'storage/', 'usergroups/',
'users/', 'uuid', 'version', 'staging/']
class PluginRoute(object):
@ -1267,6 +1269,87 @@ def handle_discovery(pathcomponents, operation, configmanager, inputdata):
if pathcomponents[0] == 'detected':
pass
class Staging:
def __init__(self, user, uuid):
self.uuid_str = uuid
self.storage_folder = '/var/lib/confluent/client_assets/' + self.uuid_str
self.filename = None
self.user = user
self.base_folder = os.path.exists('/var/lib/confluent/client_assets/')
if not self.base_folder:
try:
os.mkdir('/var/lib/confluent/client_assets/')
except Exception as e:
raise OSError(str(e))
def getUUID(self):
return self.uuid_str
def get_push_url(self):
return 'staging/{0}/{1}'.format(self.user,self.uuid_str)
def create_directory(self):
try:
os.mkdir(self.storage_folder)
return True
except OSError as e:
raise exc.InvalidArgumentException(str(e))
def get_file_name(self):
stage_file = '{}/filename.txt'.format(self.storage_folder)
try:
with open(stage_file, 'r') as f:
filename = f.readline()
os.remove(stage_file)
return self.storage_folder + '/{}'.format(filename)
except FileNotFoundError:
file = None
return False
def deldirectory(self):
pass
def handle_staging(pathcomponents, operation, configmanager, inputdata):
'''
e.g push_url: /confluent-api/staging/user/<unique_id>
'''
if operation == 'create':
if len(pathcomponents) == 1:
stage = Staging(inputdata['user'],str(uuid.uuid1()))
if stage.create_directory():
if 'filename' in inputdata:
data_file = stage.storage_folder + '/filename.txt'
with open(data_file, 'w') as f:
f.write(inputdata['filename'])
else:
raise Exception('Error: Missing filename arg')
push_url = stage.get_push_url()
yield msg.CreatedResource(push_url)
elif len(pathcomponents) == 3:
stage = Staging(pathcomponents[1], pathcomponents[2])
file = stage.get_file_name()
if 'filedata' in inputdata and file:
content_length = inputdata['content_length']
remaining_length = content_length
filedata = inputdata['filedata']
chunk_size = 16384
progress = 0.0
with open(file, 'wb') as f:
while remaining_length > 0:
progress = (1 - (remaining_length/content_length)) * 100
datachunk = filedata['wsgi.input'].read(min(chunk_size, remaining_length))
f.write(datachunk)
remaining_length -= len(datachunk)
yield msg.FileUploadProgress(progress)
yield msg.FileUploadProgress(100)
elif operation == 'retrieve':
pass
return
def handle_path(path, operation, configmanager, inputdata=None, autostrip=True):
"""Given a full path request, return an object.
@ -1375,5 +1458,7 @@ def handle_path(path, operation, configmanager, inputdata=None, autostrip=True):
elif pathcomponents[0] == 'discovery':
return handle_discovery(pathcomponents[1:], operation, configmanager,
inputdata)
elif pathcomponents[0] == 'staging':
return handle_staging(pathcomponents, operation, configmanager, inputdata)
else:
raise exc.NotFoundException()

View File

@ -935,6 +935,45 @@ def resourcehandler_backend(env, start_response):
start_response('200 OK', headers)
yield rsp
return
elif (operation == 'create' and ('/staging' in env['PATH_INFO'])):
url = env['PATH_INFO']
args_dict = {}
content_length = int(env.get('CONTENT_LENGTH', 0))
if content_length > 0 and (len(url.split('/')) > 2):
# check if the user and the url defined user are the same
if authorized['username'] == url.split('/')[2]:
args_dict.update({'filedata':env, 'content_length': content_length})
hdlr = pluginapi.handle_path(url, operation, cfgmgr, args_dict)
for resp in hdlr:
if isinstance(resp, confluent.messages.FileUploadProgress):
if resp.kvpairs['progress']['value'] == 100:
progress = resp.kvpairs['progress']['value']
start_response('200 OK', headers)
yield json.dumps({'data': 'done'})
return
else:
start_response('401 Unauthorized', headers)
yield json.dumps({'data': 'You do not have permission to write to file'})
return
elif 'application/json' in reqtype and (len(url.split('/')) == 2):
if not isinstance(reqbody, str):
reqbody = reqbody.decode('utf8')
pbody = json.loads(reqbody)
args = pbody['args']
args_dict.update({'filename': args, 'user': authorized['username']})
try:
args_dict.update({'bank': pbody['bank']})
except KeyError:
pass
hdlr = pluginapi.handle_path(url, operation, cfgmgr, args_dict)
for res in hdlr:
if isinstance(res, confluent.messages.CreatedResource):
stageurl = res.kvpairs['created']
start_response('200 OK', headers)
yield json.dumps({'data': stageurl})
return
else:
# normal request
url = env['PATH_INFO']

View File

@ -640,6 +640,18 @@ class SavedFile(ConfluentMessage):
self.myargs = (node, file)
self.kvpairs = {node: {'filename': file}}
class FileUploadProgress(ConfluentMessage):
readonly = True
def __init__(self, progress, name=None):
self.myargs = (progress)
self.stripped = False
self.notnode = name is None
if self.notnode:
self.kvpairs = {'progress': {'value': progress}}
else:
self.kvpairs = {name: {'progress': {'value': progress}}}
class InputAlertData(ConfluentMessage):
def __init__(self, path, inputdata, nodes=None):