2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-11 08:15:24 +00:00

Stage uploads in memory

The strategy of duping file descriptors
is inadequate. The copies share
identical offsets.

Fix this by reading the file once into
memory, and using BytesIO to fake a file.

This is relatively memory intensive in theory, but in practice
pyghmi library had been duping everything to memory
anyway, so it is a wash for now.
This commit is contained in:
Jarrod Johnson 2022-01-27 17:29:17 -05:00
parent 2bc080f4e0
commit 485c323608

View File

@ -22,6 +22,7 @@ import confluent.exceptions as exc
import confluent.log as log
import confluent.messages as msg
import eventlet
import io
import os
import pwd
import socket
@ -32,63 +33,71 @@ uploadsbytarget = {}
downloadsbytarget = {}
updatepool = eventlet.greenpool.GreenPool(256)
_tracelog = None
filecontentbyname = {}
sharedfiles = {}
def execupdate(handler, filename, updateobj, type, owner, node, datfile):
global _tracelog
if type != 'ffdc' and not datfile:
errstr = False
if not os.path.exists(filename):
errstr = '{0} does not appear to exist on {1}, or is in a directory with permissions forbidding confluent user/group access'.format(
filename, socket.gethostname())
elif not os.access(filename, os.R_OK):
errstr = '{0} is not readable by confluent on {1} (ensure confluent user or group can access file and parent directories)'.format(
filename, socket.gethostname())
if errstr:
updateobj.handle_progress({'phase': 'error', 'progress': 0.0,
'detail': errstr})
return
if type == 'ffdc' and os.path.isdir(filename):
filename += '/' + node
if 'type' == 'ffdc':
errstr = False
if os.path.exists(filename):
errstr = '{0} already exists on {1}, cannot overwrite'.format(
filename, socket.gethostname())
elif not os.access(os.path.dirname(filename), os.W_OK):
errstr = '{0} directory not writable by confluent user/group on {1}, check the directory and parent directory ownership and permissions'.format(filename, socket.gethostname())
if errstr:
updateobj.handle_progress({'phase': 'error', 'progress': 0.0,
'detail': errstr})
return
try:
if type == 'firmware':
completion = handler(filename, progress=updateobj.handle_progress,
data=datfile, bank=updateobj.bank)
else:
completion = handler(filename, progress=updateobj.handle_progress,
data=datfile)
if type == 'ffdc' and completion:
filename = completion
completion = None
if completion is None:
completion = 'complete'
if owner:
pwent = pwd.getpwnam(owner)
os.chown(filename, pwent.pw_uid, pwent.pw_gid)
updateobj.handle_progress({'phase': completion, 'progress': 100.0})
except exc.PubkeyInvalid as pi:
errstr = 'Certificate mismatch detected, does not match value in ' \
'attribute {0}'.format(pi.attrname)
updateobj.handle_progress({'phase': 'error', 'progress': 0.0,
'detail': errstr})
except Exception as e:
if _tracelog is None:
_tracelog = log.Logger('trace')
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace)
updateobj.handle_progress({'phase': 'error', 'progress': 0.0,
'detail': str(e)})
if type != 'ffdc' and not datfile:
errstr = False
if not os.path.exists(filename):
errstr = '{0} does not appear to exist on {1}, or is in a directory with permissions forbidding confluent user/group access'.format(
filename, socket.gethostname())
elif not os.access(filename, os.R_OK):
errstr = '{0} is not readable by confluent on {1} (ensure confluent user or group can access file and parent directories)'.format(
filename, socket.gethostname())
if errstr:
updateobj.handle_progress({'phase': 'error', 'progress': 0.0,
'detail': errstr})
return
if type == 'ffdc' and os.path.isdir(filename):
filename += '/' + node
if 'type' == 'ffdc':
errstr = False
if os.path.exists(filename):
errstr = '{0} already exists on {1}, cannot overwrite'.format(
filename, socket.gethostname())
elif not os.access(os.path.dirname(filename), os.W_OK):
errstr = '{0} directory not writable by confluent user/group on {1}, check the directory and parent directory ownership and permissions'.format(filename, socket.gethostname())
if errstr:
updateobj.handle_progress({'phase': 'error', 'progress': 0.0,
'detail': errstr})
return
try:
if type == 'firmware':
completion = handler(filename, progress=updateobj.handle_progress,
data=datfile, bank=updateobj.bank)
else:
completion = handler(filename, progress=updateobj.handle_progress,
data=datfile)
if type == 'ffdc' and completion:
filename = completion
completion = None
if completion is None:
completion = 'complete'
if owner:
pwent = pwd.getpwnam(owner)
os.chown(filename, pwent.pw_uid, pwent.pw_gid)
updateobj.handle_progress({'phase': completion, 'progress': 100.0})
except exc.PubkeyInvalid as pi:
errstr = 'Certificate mismatch detected, does not match value in ' \
'attribute {0}'.format(pi.attrname)
updateobj.handle_progress({'phase': 'error', 'progress': 0.0,
'detail': errstr})
except Exception as e:
if _tracelog is None:
_tracelog = log.Logger('trace')
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace)
updateobj.handle_progress({'phase': 'error', 'progress': 0.0,
'detail': str(e)})
finally:
if filename in sharedfiles:
if sharedfiles[filename][0] == 1:
del sharedfiles[filename]
else:
sharedfiles[filename][0] -= 1
class Updater(object):
def __init__(self, node, handler, filename, tenant=None, name=None,
@ -99,8 +108,14 @@ class Updater(object):
self.detail = ''
self.percent = 0.0
if configmanager and filename in configmanager.clientfiles:
cf = configmanager.clientfiles[filename]
datfile = os.fdopen(os.dup(cf.fileno()), cf.mode)
if filename in sharedfiles:
sharedfiles[filename][0] += 1
else:
cf = configmanager.clientfiles[filename]
datfile = os.fdopen(os.dup(cf.fileno()), cf.mode)
sharedfiles[filename] = [1, datfile.read()]
datfile.close()
datfile = io.BytesIO(sharedfiles[filename][1])
else:
datfile = None
self.datfile = datfile