From 6d2918ed4507556bbe95644e10d00413bc7a5534 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 6 May 2020 14:22:25 -0400 Subject: [PATCH] Extend osimport Prepare osimport for inclusion in confluent server, by making it break off intense activity to a subprocess. Additionally, get it ready to track long running task in a way that can map to api. Finally, have it combine osdeploy stock profiles with image into install profiles. --- librarian/osimport | 136 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 118 insertions(+), 18 deletions(-) diff --git a/librarian/osimport b/librarian/osimport index 13e7d3a8..ecf472c7 100644 --- a/librarian/osimport +++ b/librarian/osimport @@ -1,4 +1,7 @@ #!/usr/bin/python +import eventlet +import eventlet.green.select as select +import eventlet.green.subprocess as subprocess import logging logging.getLogger('libarchive').addHandler(logging.NullHandler()) import libarchive @@ -6,9 +9,10 @@ import hashlib import os import shutil import sys +import time -COPY = 0 -EXTRACT = 1 +COPY = 1 +EXTRACT = 2 READFILES = set([ 'README.diskdefines', 'media.1/products', @@ -34,6 +38,7 @@ def extract_entries(entries, flags=0, callback=None, totalsize=None, extractlist buff, size, offset = c_void_p(), c_size_t(), c_longlong() buff_p, size_p, offset_p = byref(buff), byref(size), byref(offset) sizedone = 0 + printat = 0 with libarchive.extract.new_archive_write_disk(flags) as write_p: for entry in entries: if str(entry).endswith('TRANS.TBL'): @@ -45,12 +50,15 @@ def extract_entries(entries, flags=0, callback=None, totalsize=None, extractlist while 1: r = read_data_block(read_p, buff_p, size_p, offset_p) sizedone += size.value - if callback: + if callback and time.time() > printat: callback({'progress': float(sizedone) / float(totalsize)}) + printat = time.time() + 0.5 if r == ARCHIVE_EOF: break write_data_block(write_p, buff, size, offset) write_finish_entry(write_p) + if callback: + callback({'progress': float(sizedone) / float(totalsize)}) def extract_file(filepath, flags=0, callback=lambda x: None, imginfo=(), extractlist=None): """Extracts an archive from a file into the current directory.""" @@ -104,12 +112,14 @@ def check_ubuntu(isoinfo): ver = ver.decode('utf8') if not isinstance(arch, str): arch = arch.decode('utf8') + major = '.'.join(ver.split('.', 2)[:2]) return {'name': 'ubuntu-{0}-{1}'.format(ver, arch), - 'method': 'EXTRACTANDCOPY', + 'method': EXTRACT|COPY, 'extractlist': ['casper/vmlinuz', 'casper/initrd', 'EFI/BOOT/BOOTx64.EFI', 'EFI/BOOT/grubx64.efi' ], - 'copyto': 'install.iso'} + 'copyto': 'install.iso', + 'category': 'ubuntu{0}'.format(major)} def check_sles(isoinfo): @@ -129,6 +139,7 @@ def check_sles(isoinfo): prodinfo = prodinfo.split('\n') hline = prodinfo[0].split(' ') ver = hline[-1].split('-')[0] + major = ver.split('.', 2)[0] if hline[-1].startswith('15'): distro = 'sle' if hline[0] == '/': @@ -144,7 +155,8 @@ def check_sles(isoinfo): disk = '2' if disk and distro: return {'name': '{0}-{1}-{2}'.format(distro, ver, arch), - 'method': EXTRACT, 'subname': disk} + 'method': EXTRACT, 'subname': disk, + 'category': 'suse{0}'.format(major)} return None @@ -163,7 +175,8 @@ def check_rhel(isoinfo): break else: return None - return {'name': 'rhel-{0}-{1}'.format(ver, arch), 'method': EXTRACT} + major = ver.split('.', 1)[0] + return {'name': 'rhel-{0}-{1}'.format(ver, arch), 'method': EXTRACT, 'category': 'el{0}'.format(major)} def scan_iso(filename): filesizes = {} @@ -172,6 +185,7 @@ def scan_iso(filename): for ent in reader: if str(ent).endswith('TRANS.TBL'): continue + eventlet.sleep(0) filesizes[str(ent)] = ent.size if str(ent) in READFILES: filecontents[str(ent)] = b'' @@ -180,7 +194,7 @@ def scan_iso(filename): return filesizes, filecontents def fingerprint(filename): - with open(sys.argv[1], 'rb') as archive: + with open(filename, 'rb') as archive: header = archive.read(32768) archive.seek(32769) if archive.read(6) == b'CD001\x01': @@ -206,11 +220,7 @@ def fingerprint(filename): return imginfo, None -def printit(info): - sys.stdout.write(' \r{:.2f}%'.format(100 * info['progress'])) - sys.stdout.flush() - -def import_image(filename): +def import_image(filename, callback, backend=False): identity = fingerprint(filename) if not identity: return -1 @@ -226,17 +236,107 @@ def import_image(filename): raise filename = os.path.abspath(filename) os.chdir(targpath) - print('Importing OS to ' + targpath + ':') + if not backend: + print('Importing OS to ' + targpath + ':') printit({'progress': 0.0}) - if 'EXTRACT' in identity['method']: - extract_file(filename, callback=printit, imginfo=imginfo, extractlist=identity.get('extractlist', None)) - if 'COPY' in identity['method']: + if EXTRACT & identity['method']: + extract_file(filename, callback=callback, imginfo=imginfo, extractlist=identity.get('extractlist', None)) + if COPY & identity['method']: basename = identity.get('copyto', os.path.basename(filename)) targpath = os.path.join(targpath, basename) shutil.copyfile(filename, targpath) printit({'progress': 1.0}) sys.stdout.write('\n') +def printit(info): + sys.stdout.write(' \r{:.2f}%'.format(100 * info['progress'])) + sys.stdout.flush() + + +importing = {} +class MediaImporter(object): + + def __init__(self, media): + identity = fingerprint(media) + identity, imginfo = identity + if not identity: + raise Exception('Unrecognized OS Media') + importkey = ','.join((identity['name'], identity.get('subname', ''))) + if importkey in importing: + raise Exception('Media import already in progress for this media') + importing[importkey] = self + self.osname = identity['name'] + self.oscategory = identity.get('category', None) + targpath = identity['name'] + if identity.get('subname', None): + targpath += '/' + identity['subname'] + self.targpath = '/var/lib/confluent/distributions/' + targpath + try: + os.makedirs(self.targpath) + except OSError as e: + if e.errno != 17: + raise + self.filename = os.path.abspath(media) + self.importer = eventlet.spawn(self.importmedia) + self.profiles = [] + + def importmedia(self): + wkr = subprocess.Popen( + [sys.executable, __file__, self.filename, '-b'], + stdin=subprocess.DEVNULL, stdout=subprocess.PIPE) + currline = b'' + while wkr.poll() is None: + currline += wkr.stdout.read(1) + if b'\r' in currline: + print(repr(currline)) + currline = b'' + a = wkr.stdout.read(1) + while a: + currline += a + if b'\r' in currline: + val = currline.split(b'%')[0] + if val: + self.percent = float(val) + currline = b'' + a = wkr.stdout.read(1) + if self.oscategory: + defprofile = '/opt/confluent/lib/osdeploy/{0}'.format( + self.oscategory) + osd, osversion, arch = self.osname.split('-') + for prof in os.listdir('{0}/profiles'.format(defprofile)): + srcname = '{0}/profiles/{1}'.format(defprofile, prof) + profname = '{0}-{1}'.format(self.osname, prof) + dirname = '/var/lib/confluent/public/os/{0}'.format(profname) + shutil.copytree(srcname, dirname) + profdata = None + try: + os.makedirs('{0}/boot/initramfs'.format(dirname)) + except OSError as e: + if e.errno != 17: + raise + with open('{0}/profile.yaml'.format(dirname)) as yin: + profdata = yin.read() + profdata = profdata.replace('%%DISTRO%%', osd) + profdata = profdata.replace('%%VERSION%%', osversion) + profdata = profdata.replace('%%ARCH%%', arch) + if profdata: + with open('{0}/profile.yaml'.format(dirname), 'w') as yout: + yout.write(profdata) + for initrd in os.listdir('{0}/initramfs'.format(defprofile)): + fullpath = '{0}/initramfs/{1}'.format(defprofile, initrd) + os.symlink(fullpath, '{0}/boot/initramfs/{1}'.format(dirname, initrd)) + os.symlink( + '/var/lib/confluent/public/site/initramfs.cpio', + '{0}/boot/initramfs/site.cpio'.format(dirname)) + subprocess.check_call( + ['sh', '{0}/initprofile.sh'.format(dirname), + self.targpath, dirname]) + self.profiles.append(profname) + + if __name__ == '__main__': - sys.exit(import_image(sys.argv[1])) + if len(sys.argv) > 2: + sys.exit(import_image(sys.argv[1], callback=printit, backend=True)) + else: + sys.exit(import_image(sys.argv[1], callback=printit))