From 485c3236081ca0efd6627413ab5d6c0c3bc50433 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 27 Jan 2022 17:29:17 -0500 Subject: [PATCH] Stage uploads in memory The strategy of duping file descriptors is inadequate. The copies share identical offsets. Fix this by reading the file once into memory, and using BytesIO to fake a file. This is relatively memory intensive in theory, but in practice pyghmi library had been duping everything to memory anyway, so it is a wash for now. --- confluent_server/confluent/firmwaremanager.py | 123 ++++++++++-------- 1 file changed, 69 insertions(+), 54 deletions(-) diff --git a/confluent_server/confluent/firmwaremanager.py b/confluent_server/confluent/firmwaremanager.py index 11c73ffc..33f14a80 100644 --- a/confluent_server/confluent/firmwaremanager.py +++ b/confluent_server/confluent/firmwaremanager.py @@ -22,6 +22,7 @@ import confluent.exceptions as exc import confluent.log as log import confluent.messages as msg import eventlet +import io import os import pwd import socket @@ -32,63 +33,71 @@ uploadsbytarget = {} downloadsbytarget = {} updatepool = eventlet.greenpool.GreenPool(256) _tracelog = None -filecontentbyname = {} +sharedfiles = {} def execupdate(handler, filename, updateobj, type, owner, node, datfile): global _tracelog - if type != 'ffdc' and not datfile: - errstr = False - if not os.path.exists(filename): - errstr = '{0} does not appear to exist on {1}, or is in a directory with permissions forbidding confluent user/group access'.format( - filename, socket.gethostname()) - elif not os.access(filename, os.R_OK): - errstr = '{0} is not readable by confluent on {1} (ensure confluent user or group can access file and parent directories)'.format( - filename, socket.gethostname()) - if errstr: - updateobj.handle_progress({'phase': 'error', 'progress': 0.0, - 'detail': errstr}) - return - if type == 'ffdc' and os.path.isdir(filename): - filename += '/' + node - if 'type' == 'ffdc': - errstr = False - if os.path.exists(filename): - errstr = '{0} already exists on {1}, cannot overwrite'.format( - filename, socket.gethostname()) - elif not os.access(os.path.dirname(filename), os.W_OK): - errstr = '{0} directory not writable by confluent user/group on {1}, check the directory and parent directory ownership and permissions'.format(filename, socket.gethostname()) - if errstr: - updateobj.handle_progress({'phase': 'error', 'progress': 0.0, - 'detail': errstr}) - return try: - if type == 'firmware': - completion = handler(filename, progress=updateobj.handle_progress, - data=datfile, bank=updateobj.bank) - else: - completion = handler(filename, progress=updateobj.handle_progress, - data=datfile) - if type == 'ffdc' and completion: - filename = completion - completion = None - if completion is None: - completion = 'complete' - if owner: - pwent = pwd.getpwnam(owner) - os.chown(filename, pwent.pw_uid, pwent.pw_gid) - updateobj.handle_progress({'phase': completion, 'progress': 100.0}) - except exc.PubkeyInvalid as pi: - errstr = 'Certificate mismatch detected, does not match value in ' \ - 'attribute {0}'.format(pi.attrname) - updateobj.handle_progress({'phase': 'error', 'progress': 0.0, - 'detail': errstr}) - except Exception as e: - if _tracelog is None: - _tracelog = log.Logger('trace') - _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) - updateobj.handle_progress({'phase': 'error', 'progress': 0.0, - 'detail': str(e)}) + if type != 'ffdc' and not datfile: + errstr = False + if not os.path.exists(filename): + errstr = '{0} does not appear to exist on {1}, or is in a directory with permissions forbidding confluent user/group access'.format( + filename, socket.gethostname()) + elif not os.access(filename, os.R_OK): + errstr = '{0} is not readable by confluent on {1} (ensure confluent user or group can access file and parent directories)'.format( + filename, socket.gethostname()) + if errstr: + updateobj.handle_progress({'phase': 'error', 'progress': 0.0, + 'detail': errstr}) + return + if type == 'ffdc' and os.path.isdir(filename): + filename += '/' + node + if 'type' == 'ffdc': + errstr = False + if os.path.exists(filename): + errstr = '{0} already exists on {1}, cannot overwrite'.format( + filename, socket.gethostname()) + elif not os.access(os.path.dirname(filename), os.W_OK): + errstr = '{0} directory not writable by confluent user/group on {1}, check the directory and parent directory ownership and permissions'.format(filename, socket.gethostname()) + if errstr: + updateobj.handle_progress({'phase': 'error', 'progress': 0.0, + 'detail': errstr}) + return + try: + if type == 'firmware': + completion = handler(filename, progress=updateobj.handle_progress, + data=datfile, bank=updateobj.bank) + else: + completion = handler(filename, progress=updateobj.handle_progress, + data=datfile) + if type == 'ffdc' and completion: + filename = completion + completion = None + if completion is None: + completion = 'complete' + if owner: + pwent = pwd.getpwnam(owner) + os.chown(filename, pwent.pw_uid, pwent.pw_gid) + updateobj.handle_progress({'phase': completion, 'progress': 100.0}) + except exc.PubkeyInvalid as pi: + errstr = 'Certificate mismatch detected, does not match value in ' \ + 'attribute {0}'.format(pi.attrname) + updateobj.handle_progress({'phase': 'error', 'progress': 0.0, + 'detail': errstr}) + except Exception as e: + if _tracelog is None: + _tracelog = log.Logger('trace') + _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) + updateobj.handle_progress({'phase': 'error', 'progress': 0.0, + 'detail': str(e)}) + finally: + if filename in sharedfiles: + if sharedfiles[filename][0] == 1: + del sharedfiles[filename] + else: + sharedfiles[filename][0] -= 1 + class Updater(object): def __init__(self, node, handler, filename, tenant=None, name=None, @@ -99,8 +108,14 @@ class Updater(object): self.detail = '' self.percent = 0.0 if configmanager and filename in configmanager.clientfiles: - cf = configmanager.clientfiles[filename] - datfile = os.fdopen(os.dup(cf.fileno()), cf.mode) + if filename in sharedfiles: + sharedfiles[filename][0] += 1 + else: + cf = configmanager.clientfiles[filename] + datfile = os.fdopen(os.dup(cf.fileno()), cf.mode) + sharedfiles[filename] = [1, datfile.read()] + datfile.close() + datfile = io.BytesIO(sharedfiles[filename][1]) else: datfile = None self.datfile = datfile