#!/usr/bin/python
"""Identify OS installation media and import it into the distribution tree.

An image is fingerprinted either as an ISO9660 filesystem (by looking at the
names and contents of selected files) or as a raw image (by SHA256 of its
content).  Recognized media is then extracted and/or copied underneath
/var/lib/confluent/distributions/.

When run as a script, the first argument is the image to import; any
additional argument selects "backend" mode, which suppresses the
human-oriented banner so a parent process can parse the progress output.
"""
import eventlet
import eventlet.green.select as select
import eventlet.green.subprocess as subprocess
import logging
# Installed before libarchive is imported, exactly as the import order
# requires, so the 'libarchive' logger always has a handler.
logging.getLogger('libarchive').addHandler(logging.NullHandler())
import libarchive
import errno
import hashlib
import os
import shutil
import sys
import time

from ctypes import byref, c_longlong, c_size_t, c_void_p

from libarchive.ffi import (
    write_disk_new, write_disk_set_options, write_free, write_header,
    read_data_block, write_data_block, write_finish_entry, ARCHIVE_EOF
)

# Import methods.  These are bit flags: they are tested with '&' and may be
# combined with '|' (see check_ubuntu).
COPY = 1
EXTRACT = 2

# Files whose contents scan_iso() retains for the check_* functions.
READFILES = set([
    'README.diskdefines',
    'media.1/products',
    'media.2/products',
    '.discinfo',
])

# SHA256 digests of the first 32768 bytes of known non-ISO images.  Only an
# image whose header matches is hashed in full.
HEADERSUMS = set([b'\x85\xeddW\x86\xc5\xbdhx\xbe\x81\x18X\x1e\xb4O\x14\x9d\x11\xb7C8\x9b\x97R\x0c-\xb8Ht\xcb\xb3'])

# Full-image SHA256 hex digest -> identity for known non-ISO images.
HASHPRINTS = {
    '69d5f1c5e4474d70b0fb5374bfcb29bf57ba828ff00a55237cd757e61ed71048': {'name': 'cumulus-broadcom-amd64-4.0.0', 'method': COPY},
}

# Root directory that imported distributions are placed under.
DISTRIBUTION_ROOT = '/var/lib/confluent/distributions/'


def _progress_fraction(sizedone, totalsize):
    """Return sizedone / totalsize as a float, or 0.0 if the total is unknown.

    A total of None or 0 means no size information was supplied; dividing
    would raise, so progress is reported as 0.0 in that case.
    """
    if not totalsize:
        return 0.0
    return float(sizedone) / float(totalsize)


def _ensure_directory(path):
    """Create path (and parents), tolerating it already existing."""
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


def _distribution_path(identity):
    """Return the target directory for an identity dict from fingerprint()."""
    targpath = identity['name']
    if identity.get('subname', None):
        targpath += '/' + identity['subname']
    return DISTRIBUTION_ROOT + targpath


def extract_entries(entries, flags=0, callback=None, totalsize=None,
                    extractlist=None):
    """Extracts the given archive entries into the current directory.

    :param entries: iterable of libarchive entries to write to disk
    :param flags: flags passed to libarchive's disk writer
    :param callback: optional callable receiving {'progress': fraction};
                     invoked at most about twice a second while extracting
                     and once more when extraction is finished
    :param totalsize: expected total number of bytes, used as the progress
                      denominator; if None or 0 progress is reported as 0.0
    :param extractlist: optional collection of entry names; when given and
                        non-empty, only those entries are extracted
    """
    buff, size, offset = c_void_p(), c_size_t(), c_longlong()
    buff_p, size_p, offset_p = byref(buff), byref(size), byref(offset)
    sizedone = 0
    printat = 0
    with libarchive.extract.new_archive_write_disk(flags) as write_p:
        for entry in entries:
            entryname = str(entry)
            if entryname.endswith('TRANS.TBL'):
                continue
            if extractlist and entryname not in extractlist:
                continue
            write_header(write_p, entry._entry_p)
            read_p = entry._archive_p
            while 1:
                r = read_data_block(read_p, buff_p, size_p, offset_p)
                # Check for end of entry before touching the out-parameters
                # so that no stale block size is ever counted or written.
                if r == ARCHIVE_EOF:
                    break
                sizedone += size.value
                if callback and time.time() > printat:
                    callback({'progress': _progress_fraction(sizedone,
                                                             totalsize)})
                    printat = time.time() + 0.5
                write_data_block(write_p, buff, size, offset)
            write_finish_entry(write_p)
    if callback:
        callback({'progress': _progress_fraction(sizedone, totalsize)})


def extract_file(filepath, flags=0, callback=lambda x: None, imginfo=(),
                 extractlist=None):
    """Extracts an archive from a file into the current directory.

    :param filepath: path of the archive to read
    :param flags: flags passed to libarchive's disk writer
    :param callback: callable receiving {'progress': fraction}
    :param imginfo: mapping of entry name to size (as returned by
                    scan_iso()); the sizes are summed to form the progress
                    denominator.  Empty or None means the total is unknown.
    :param extractlist: optional collection of entry names to restrict
                        extraction to
    """
    totalsize = 0
    for img in imginfo or ():
        entrysize = imginfo[img]
        if not entrysize:
            continue
        totalsize += entrysize
    with libarchive.file_reader(filepath) as archive:
        extract_entries(archive, flags, callback, totalsize, extractlist)


def _el_release_info(filelist, distro):
    """Find the '<distro>-release-7/8' package and parse version and arch.

    :param filelist: iterable of file names found on the media
    :param distro: package name prefix, e.g. 'centos' or 'redhat'
    :returns: (version, arch) tuple, or None if no release package is found
    """
    el7marker = '{0}-release-7'.format(distro)
    el8marker = '{0}-release-8'.format(distro)
    for entry in filelist:
        if el7marker in entry:
            # e.g. ...-release-7-6.1810.2.el7.centos.x86_64.rpm -> '7.6'
            dotsplit = entry.split('.')
            arch = dotsplit[-2]
            ver = dotsplit[0].split('release-')[-1].replace('-', '.')
            return ver, arch
        if el8marker in entry:
            # e.g. ...-release-8.0-0.1905.0.9.el8.x86_64.rpm -> '8.0'
            # Parse relative to 'release-' so that a '-' in the directory
            # part of the path cannot shift the version field.
            ver = entry.split('release-', 1)[1].split('-')[0]
            arch = entry.split('.')[-2]
            return ver, arch
    return None


def check_centos(isoinfo):
    """Identify CentOS 7/8 media.

    :param isoinfo: (filesizes, filecontents) tuple from scan_iso()
    :returns: identity dict, or None if the media is not recognized
    """
    release = _el_release_info(isoinfo[0], 'centos')
    if release is None:
        return None
    ver, arch = release
    major = ver.split('.', 1)[0]
    # The category matches check_rhel so that MediaImporter also generates
    # profiles for CentOS media.
    return {'name': 'centos-{0}-{1}'.format(ver, arch), 'method': EXTRACT,
            'category': 'el{0}'.format(major)}


def check_ubuntu(isoinfo):
    """Identify Ubuntu Server media from README.diskdefines.

    :param isoinfo: (filesizes, filecontents) tuple from scan_iso()
    :returns: identity dict, or None if the media is not Ubuntu Server or
              the file lacks the DISKNAME/ARCH definitions
    """
    if 'README.diskdefines' not in isoinfo[1]:
        return None
    arch = None
    variant = None
    ver = None
    diskdefs = isoinfo[1]['README.diskdefines']
    for info in diskdefs.split(b'\n'):
        if not info:
            continue
        fields = info.split(b' ', 2)
        if len(fields) < 3:
            # Not a '<directive> <key> <value>' line; nothing to learn
            continue
        key = fields[1]
        val = fields[2].strip()
        if key == b'ARCH':
            arch = val
            if arch == b'amd64':
                arch = b'x86_64'
        elif key == b'DISKNAME':
            nameparts = val.split(b' ', 2)
            if len(nameparts) < 2:
                return None
            variant, ver = nameparts[0], nameparts[1]
            if variant != b'Ubuntu-Server':
                return None
    if not (variant and ver and arch):
        return None
    if not isinstance(ver, str):
        ver = ver.decode('utf8')
    if not isinstance(arch, str):
        arch = arch.decode('utf8')
    major = '.'.join(ver.split('.', 2)[:2])
    return {'name': 'ubuntu-{0}-{1}'.format(ver, arch),
            'method': EXTRACT|COPY,
            'extractlist': ['casper/vmlinuz', 'casper/initrd',
                            'EFI/BOOT/BOOTx64.EFI', 'EFI/BOOT/grubx64.efi'
                            ],
            'copyto': 'install.iso',
            'category': 'ubuntu{0}'.format(major)}


def check_sles(isoinfo):
    """Identify SUSE Linux Enterprise 12/15 media from media.N/products.

    :param isoinfo: (filesizes, filecontents) tuple from scan_iso()
    :returns: identity dict (with 'subname' set to the disk number), or
              None if the media is not recognized
    """
    arch = 'x86_64'
    disk = None
    distro = ''
    if 'media.1/products' in isoinfo[1]:
        medianame = 'media.1/products'
    elif 'media.2/products' in isoinfo[1]:
        medianame = 'media.2/products'
    else:
        return None
    prodinfo = isoinfo[1][medianame]
    if not isinstance(prodinfo, str):
        prodinfo = prodinfo.decode('utf8')
    prodinfo = prodinfo.split('\n')
    # Only the first line is examined; its last space-separated field
    # carries the version.
    hline = prodinfo[0].split(' ')
    ver = hline[-1].split('-')[0]
    major = ver.split('.', 2)[0]
    if hline[-1].startswith('15'):
        distro = 'sle'
        if hline[0] == '/':
            disk = '1'
        elif hline[0].startswith('/Module'):
            disk = '2'
    elif hline[-1].startswith('12'):
        if len(hline) > 1 and 'SLES' in hline[1]:
            distro = 'sles'
        if '.1' in medianame:
            disk = '1'
        elif '.2' in medianame:
            disk = '2'
    if disk and distro:
        return {'name': '{0}-{1}-{2}'.format(distro, ver, arch),
                'method': EXTRACT, 'subname': disk,
                'category': 'suse{0}'.format(major)}
    return None


def check_rhel(isoinfo):
    """Identify Red Hat Enterprise Linux 7/8 media.

    :param isoinfo: (filesizes, filecontents) tuple from scan_iso()
    :returns: identity dict, or None if the media is not recognized
    """
    release = _el_release_info(isoinfo[0], 'redhat')
    if release is None:
        return None
    ver, arch = release
    major = ver.split('.', 1)[0]
    return {'name': 'rhel-{0}-{1}'.format(ver, arch), 'method': EXTRACT,
            'category': 'el{0}'.format(major)}


def scan_iso(filename):
    """Walk an ISO image, collecting file sizes and selected file contents.

    :param filename: path to the image
    :returns: (filesizes, filecontents) where filesizes maps every entry
              name (TRANS.TBL files excluded) to its size and filecontents
              maps each entry named in READFILES to its bytes
    """
    filesizes = {}
    filecontents = {}
    with libarchive.file_reader(filename) as reader:
        for ent in reader:
            entname = str(ent)
            if entname.endswith('TRANS.TBL'):
                continue
            # Yield to other greenthreads between entries
            eventlet.sleep(0)
            filesizes[entname] = ent.size
            if entname in READFILES:
                filecontents[entname] = b''.join(
                    bytes(block) for block in ent.get_blocks())
    return filesizes, filecontents


def fingerprint(filename):
    """Identify the OS media contained in filename.

    ISO images are scanned and offered to every module-level function whose
    name starts with 'check_'; the first one to return an identity wins.
    Other images are matched by SHA256 against HEADERSUMS/HASHPRINTS.

    :param filename: path to the image
    :returns: (identity, filesizes) for a recognized ISO, (identity, None)
              for a recognized non-ISO image, or None if unrecognized
    """
    with open(filename, 'rb') as archive:
        header = archive.read(32768)
        archive.seek(32769)
        if archive.read(6) == b'CD001\x01':
            # ISO image
            isoinfo = scan_iso(filename)
            # Iterate over a snapshot so the scan cannot be disturbed by
            # module globals changing while the checks run.
            for funname, fun in list(globals().items()):
                if not funname.startswith('check_'):
                    continue
                identity = fun(isoinfo)
                if identity:
                    return identity, isoinfo[0]
            return None
        hasher = hashlib.sha256(header)
        if hasher.digest() not in HEADERSUMS:
            return None
        # Header is known; hash the remainder to identify the exact image
        archive.seek(32768)
        chunk = archive.read(32768)
        while chunk:
            hasher.update(chunk)
            chunk = archive.read(32768)
        imginfo = HASHPRINTS.get(hasher.hexdigest(), None)
        if imginfo:
            return imginfo, None
    return None


def import_image(filename, callback, backend=False):
    """Import an OS image into the distribution tree.

    Changes the process working directory to the target directory, since
    extraction writes into the current directory.  The initial and final
    progress reports are always written to stdout through printit().

    :param filename: path to the image
    :param callback: callable receiving {'progress': fraction} during
                     extraction
    :param backend: when true, the 'Importing OS to ...' banner is omitted
    :returns: -1 if the media is not recognized, otherwise None
    """
    identity = fingerprint(filename)
    if not identity:
        return -1
    identity, imginfo = identity
    targpath = _distribution_path(identity)
    _ensure_directory(targpath)
    # Resolve before chdir so a relative filename keeps working
    filename = os.path.abspath(filename)
    os.chdir(targpath)
    if not backend:
        print('Importing OS to ' + targpath + ':')
    printit({'progress': 0.0})
    if EXTRACT & identity['method']:
        extract_file(filename, callback=callback, imginfo=imginfo,
                     extractlist=identity.get('extractlist', None))
    if COPY & identity['method']:
        basename = identity.get('copyto', os.path.basename(filename))
        targpath = os.path.join(targpath, basename)
        shutil.copyfile(filename, targpath)
    printit({'progress': 1.0})
    sys.stdout.write('\n')


def printit(info):
    """Write info['progress'] (a 0..1 fraction) to stdout as a percentage.

    Each report begins with a carriage return; MediaImporter.importmedia
    relies on that to split the output into individual reports.
    """
    sys.stdout.write(' \r{:.2f}%'.format(100 * info['progress']))
    sys.stdout.flush()


# Imports that have been started, keyed by '<name>,<subname>'.
importing = {}


class MediaImporter(object):
    """Import OS media in a worker subprocess and create its profiles.

    Attributes set by the constructor:
      osname     - identity name of the media
      oscategory - identity category, or None if the identity has none
      targpath   - directory the media is imported into
      filename   - absolute path of the media
      percent    - last progress percentage reported by the worker
      profiles   - names of the profiles created so far
      importer   - greenthread running importmedia()
    """

    def __init__(self, media):
        """Fingerprint media, register the import and start it.

        :param media: path to the image to import
        :raises Exception: if the media is not recognized, or an import
                           with the same name/subname is already registered
        """
        identity = fingerprint(media)
        # fingerprint() returns None for unknown media; check before
        # unpacking so the intended error is raised.
        if not identity:
            raise Exception('Unrecognized OS Media')
        identity, imginfo = identity
        importkey = ','.join((identity['name'], identity.get('subname', '')))
        if importkey in importing:
            raise Exception('Media import already in progress for this media')
        importing[importkey] = self
        self.osname = identity['name']
        self.oscategory = identity.get('category', None)
        self.targpath = _distribution_path(identity)
        _ensure_directory(self.targpath)
        self.filename = os.path.abspath(media)
        # Initialize all state importmedia() touches before spawning it
        self.percent = 0.0
        self.profiles = []
        self.importer = eventlet.spawn(self.importmedia)

    def _record_progress(self, report):
        """Update self.percent from one chunk of worker output.

        The text before the first '%' is taken as the percentage; chunks
        without a parseable number are ignored.
        """
        val = report.split(b'%')[0].strip()
        if not val:
            return
        try:
            self.percent = float(val)
        except ValueError:
            # Not a progress report; leave the last known value in place
            pass

    def importmedia(self):
        """Run the import worker, track its progress, then create profiles.

        The worker is this same file executed with the media path and '-b'.
        Its stdout is consumed a byte at a time and split on carriage
        returns, each piece being one progress report.
        """
        wkr = subprocess.Popen(
            [sys.executable, __file__, self.filename, '-b'],
            stdin=subprocess.DEVNULL, stdout=subprocess.PIPE)
        currline = b''
        while wkr.poll() is None:
            currline += wkr.stdout.read(1)
            if b'\r' in currline:
                self._record_progress(currline)
                currline = b''
        # Drain whatever the worker wrote just before exiting
        a = wkr.stdout.read(1)
        while a:
            currline += a
            if b'\r' in currline:
                self._record_progress(currline)
                currline = b''
            a = wkr.stdout.read(1)
        # The last report is terminated by a newline rather than '\r'
        self._record_progress(currline)
        if self.oscategory:
            self._create_profiles()

    def _create_profiles(self):
        """Instantiate every default profile of the OS category."""
        defprofile = '/opt/confluent/lib/osdeploy/{0}'.format(
            self.oscategory)
        osd, osversion, arch = self.osname.split('-')
        for prof in os.listdir('{0}/profiles'.format(defprofile)):
            srcname = '{0}/profiles/{1}'.format(defprofile, prof)
            profname = '{0}-{1}'.format(self.osname, prof)
            dirname = '/var/lib/confluent/public/os/{0}'.format(profname)
            shutil.copytree(srcname, dirname)
            _ensure_directory('{0}/boot/initramfs'.format(dirname))
            # Fill the placeholders in the copied profile.yaml
            with open('{0}/profile.yaml'.format(dirname)) as yin:
                profdata = yin.read()
            profdata = profdata.replace('%%DISTRO%%', osd)
            profdata = profdata.replace('%%VERSION%%', osversion)
            profdata = profdata.replace('%%ARCH%%', arch)
            if profdata:
                with open('{0}/profile.yaml'.format(dirname), 'w') as yout:
                    yout.write(profdata)
            for initrd in os.listdir('{0}/initramfs'.format(defprofile)):
                fullpath = '{0}/initramfs/{1}'.format(defprofile, initrd)
                os.symlink(fullpath,
                           '{0}/boot/initramfs/{1}'.format(dirname, initrd))
            os.symlink(
                '/var/lib/confluent/public/site/initramfs.cpio',
                '{0}/boot/initramfs/site.cpio'.format(dirname))
            subprocess.check_call(
                ['sh', '{0}/initprofile.sh'.format(dirname), self.targpath,
                 dirname])
            self.profiles.append(profname)


if __name__ == '__main__':
    if len(sys.argv) < 2:
        sys.stderr.write(
            'Usage: {0} <image file> [-b]\n'.format(sys.argv[0]))
        sys.exit(1)
    # Any argument after the image path selects backend mode
    if len(sys.argv) > 2:
        sys.exit(import_image(sys.argv[1], callback=printit, backend=True))
    else:
        sys.exit(import_image(sys.argv[1], callback=printit))