#!/usr/bin/python3
"""Command line tool to manage confluent OS deployment resources.

Subcommands (see ``main``): initialize, importcheck, import, updateboot,
list and rebase.
"""

__author__ = 'jjohnson2,bfinley'

import argparse
import asyncio
import glob
import os
import os.path
import pwd
import shutil
import socket
import sys
import time

# When running out of an /opt install, make the bundled python libraries
# importable before pulling in the confluent modules below.
path = os.path.dirname(os.path.realpath(__file__))
path = os.path.realpath(os.path.join(path, '..', 'lib', 'python'))
if path.startswith('/opt'):
    sys.path.append(path)

import confluent.collective.manager as collective
import eventlet.green.subprocess as subprocess
import confluent.selfservice as selfservice
import confluent.util as util
import confluent.client as client
import confluent.sshutil as sshutil
import confluent.certutil as certutil
import confluent.netutil as netutil

try:
    input = raw_input
except NameError:
    pass


def emprint(txt):
    """Print txt, emphasized with ANSI bold/underline when stdout is a tty."""
    if sys.stdout.isatty():
        print('\x1b[1m\x1b[4m' + txt + '\x1b[0m')
    else:
        print(txt)


# 64 filename-safe characters, indexed by the top six bits of a random byte
fnamechars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789.^'


async def main(args):
    """Parse the command line and dispatch to the selected subcommand.

    :param args: Full argument vector, including the program name
                 (i.e. ``sys.argv``).
    :returns: Whatever the selected subcommand returns (an exit code or
              None); None after printing help when no command is given.
    """
    ap = argparse.ArgumentParser(description='Manage OS deployment resources')
    sp = ap.add_subparsers(dest='command')
    wiz = sp.add_parser('initialize', help='Do OS deployment preparation')
    wiz.add_argument('-a', help='Initialize SSH access by confluent to nodes for automation such as ansible playbook execution or syncfiles', action='store_true')
    wiz.add_argument('-g', help='Initialize a Genesis profile to boot systems into a rescue or staging environment', action='store_true')
    wiz.add_argument('-u', help='Pull in root user key for node deployment', action='store_true')
    wiz.add_argument('-s', help='Set up SSH CA for managing node to node ssh and known hosts', action='store_true')
    wiz.add_argument('-k', help='Update local global known hosts file with confluent CA', action='store_true')
    wiz.add_argument('-t', help='Generate new TLS key for HTTPS operation and register with confluent repository', action='store_true')
    wiz.add_argument('-p', help='Copy in TFTP contents required for PXE support', action='store_true')
    wiz.add_argument('-i', help='Interactively prompt for behaviors', action='store_true')
    wiz.add_argument('-l', help='Set up local management node to allow login from managed nodes', action='store_true')
    osip = sp.add_parser('importcheck', help='Check import of an OS image from an ISO image')
    osip.add_argument('imagefile', help='File to use for source of importing')
    osip = sp.add_parser('import', help='Import an OS image from an ISO image')
    osip.add_argument('imagefile', help='File to use for source of importing')
    osip.add_argument('-n', help='Specific a custom distribution name')
    upb = sp.add_parser(
        'updateboot',
        help='Push profile.yaml of the named profile data into boot assets as appropriate')
    upb.add_argument('profile', help='Profile to update boot assets')
    sp.add_parser('list', help='List OS images available for deployment')
    ubp = sp.add_parser('rebase', help='Update stock profile content from packaged updates')
    ubp.add_argument('profile', help='Profile to rebase from packaged content')
    # Honor the argument vector we were handed rather than silently
    # re-reading sys.argv; args[0] is the program name.
    cmdset = ap.parse_args(args[1:])

    if cmdset.command == 'list':
        return oslist()
    if cmdset.command == 'import':
        return osimport(cmdset.imagefile, custname=cmdset.n)
    if cmdset.command == 'importcheck':
        return osimport(cmdset.imagefile, checkonly=True)
    if cmdset.command == 'initialize':
        return await initialize(cmdset)
    if cmdset.command == 'updateboot':
        return updateboot(cmdset.profile)
    if cmdset.command == 'rebase':
        return rebase(cmdset.profile)
    ap.print_help()


def symlinkp(src, trg):
    """Create symlink trg pointing at src, tolerating an existing trg.

    Any failure other than "file exists" is propagated to the caller.
    """
    try:
        os.symlink(src, trg)
    except FileExistsError:
        pass


def _exit_code_from_status(status):
    """Translate a raw os.waitpid() status word into a 0-255 exit code."""
    if os.WIFEXITED(status):
        return os.WEXITSTATUS(status)
    if os.WIFSIGNALED(status):
        # Same convention shells use for a signal-terminated child
        return 128 + os.WTERMSIG(status)
    return 1


def initialize_genesis():
    """Populate the genesis-x86_64 profile under /var/lib/confluent/public/os.

    The work is done in a forked child so that the child can drop to the
    'confluent' user (when that account exists) without affecting the
    privileges of the calling process.

    :returns: 0 on success, non-zero on failure (1 if the genesis package
              content is missing, otherwise the exit code of the child).
    """
    if not os.path.exists('/opt/confluent/genesis/x86_64/boot/kernel'):
        emprint('Install the confluent-genesis package to have the '
                'resources for a genesis profile')
        return 1
    hasconfluentuser = None
    try:
        hasconfluentuser = pwd.getpwnam('confluent')
    except KeyError:
        pass
    pid = os.fork()
    if pid:
        # Parent: report the child's exit code.  The raw waitpid status
        # (e.g. 256 for "exit 1") must not be handed to sys.exit(), as it
        # would be truncated to 0 and mask the failure.
        _, status = os.waitpid(pid, 0)
        return _exit_code_from_status(status)
    # Child from here on; always leaves through os._exit() in the finally
    retcode = 0
    try:
        util.mkdirp('/var/lib/confluent', 0o755)
        if hasconfluentuser:
            os.chown('/var/lib/confluent', hasconfluentuser.pw_uid, -1)
            os.setgid(hasconfluentuser.pw_gid)
            os.setuid(hasconfluentuser.pw_uid)
        os.umask(0o22)
        util.mkdirp('/var/lib/confluent/public/os/genesis-x86_64/boot/efi/boot', 0o755)
        util.mkdirp('/var/lib/confluent/public/os/genesis-x86_64/boot/initramfs', 0o755)
        symlinkp('/opt/confluent/genesis/x86_64/boot/efi/boot/BOOTX64.EFI',
                 '/var/lib/confluent/public/os/genesis-x86_64/boot/efi/boot/BOOTX64.EFI')
        symlinkp('/opt/confluent/genesis/x86_64/boot/efi/boot/grubx64.efi',
                 '/var/lib/confluent/public/os/genesis-x86_64/boot/efi/boot/grubx64.efi')
        symlinkp('/opt/confluent/genesis/x86_64/boot/initramfs/distribution',
                 '/var/lib/confluent/public/os/genesis-x86_64/boot/initramfs/distribution')
        symlinkp('/var/lib/confluent/public/site/initramfs.cpio',
                 '/var/lib/confluent/public/os/genesis-x86_64/boot/initramfs/site.cpio')
        symlinkp('/opt/confluent/lib/osdeploy/genesis/initramfs/addons.cpio',
                 '/var/lib/confluent/public/os/genesis-x86_64/boot/initramfs/addons.cpio')
        symlinkp('/opt/confluent/genesis/x86_64/boot/kernel',
                 '/var/lib/confluent/public/os/genesis-x86_64/boot/kernel')
        # Only seed the editable profile content once, so that an existing
        # profile directory is left untouched on re-run.
        if not os.path.exists('/var/lib/confluent/public/os/genesis-x86_64/ansible/'):
            shutil.copytree('/opt/confluent/lib/osdeploy/genesis/profiles/default/ansible/',
                            '/var/lib/confluent/public/os/genesis-x86_64/ansible/')
            shutil.copytree('/opt/confluent/lib/osdeploy/genesis/profiles/default/scripts/',
                            '/var/lib/confluent/public/os/genesis-x86_64/scripts/')
            shutil.copyfile('/opt/confluent/lib/osdeploy/genesis/profiles/default/profile.yaml',
                            '/var/lib/confluent/public/os/genesis-x86_64/profile.yaml')
    except Exception as e:
        sys.stderr.write(str(e) + '\n')
        retcode = 1
    finally:
        os._exit(retcode)


mynamedone = False


def init_confluent_myname():
    """Call collective.get_myname() once as the 'confluent' user.

    Does nothing on repeat calls, when /etc/confluent is owned by root, or
    when no 'confluent' account exists.  The call is made in a forked
    child that has switched to the confluent uid/gid.
    """
    global mynamedone
    if mynamedone:
        return
    mynamedone = True
    hasconfluentuser = None
    neededuid = os.stat('/etc/confluent').st_uid
    if neededuid == 0:
        return
    try:
        hasconfluentuser = pwd.getpwnam('confluent')
    except KeyError:
        pass
    if hasconfluentuser:
        print("Ok")
        pid = os.fork()
        if pid:
            os.waitpid(pid, 0)
        else:
            os.setgid(hasconfluentuser.pw_gid)
            os.setuid(hasconfluentuser.pw_uid)
            collective.get_myname()
            os._exit(0)


def local_node_trust_setup():
    """Let managed nodes ssh into this management node via host based auth.

    Signs every local ssh host key, makes sure sshd_config carries the
    host based authentication settings and HostCertificate lines, and
    writes the node list to /etc/ssh/shosts.equiv and /root/.shosts.
    """
    init_confluent_myname()
    allnodes, domain = selfservice.get_cluster_list()
    myname = collective.get_myname()
    myprincipals = set([myname])
    restorecon = os.path.exists('/usr/sbin/restorecon')
    neededlines = set([
        'HostbasedAuthentication yes', 'HostbasedUsesNameFromPacketOnly yes',
        'IgnoreRhosts no'])
    myshortname = myname.split('.')[0]
    myprincipals.add(myshortname)
    if domain:
        myprincipals.add('{0}.{1}'.format(myshortname, domain))
    for addr in netutil.get_my_addresses():
        addr = socket.inet_ntop(addr[0], addr[1])
        myprincipals.add(addr)
    for pubkey in glob.glob('/etc/ssh/ssh_host_*_key.pub'):
        with open(pubkey, 'rb') as keyin:
            currpubkey = keyin.read()
        cert = sshutil.sign_host_key(currpubkey, myname, myprincipals)
        certfile = pubkey.replace('key.pub', 'key-cert.pub')
        neededlines.add('HostCertificate {0}'.format(certfile))
        if os.path.exists(certfile):
            os.unlink(certfile)
        with open(certfile, 'w') as certout:
            certout.write(cert)
        if restorecon:
            subprocess.check_call(['/usr/sbin/restorecon', certfile])
    # Drop every directive that is already present, leaving only what
    # still has to be appended.
    with open('/etc/ssh/sshd_config', 'r') as sshconf:
        currconfig = sshconf.read().split('\n')
    for conline in currconfig:
        neededlines.discard(conline.strip())
    if neededlines:
        with open('/etc/ssh/sshd_config', 'a') as cfgout:
            # sorted() keeps the appended directives in a stable order
            for currline in sorted(neededlines):
                cfgout.write(currline)
                cfgout.write('\n')
    sortednodes = list(util.natural_sort(allnodes))
    for equivfile in ('/etc/ssh/shosts.equiv', '/root/.shosts'):
        with open(equivfile, 'w') as equivout:
            for node in sortednodes:
                equivout.write(node + '\n')
    if restorecon:
        subprocess.check_call(
            ['/usr/sbin/restorecon',
             '/etc/ssh/shosts.equiv', '/root/.shosts'])


def install_tftp_content():
    """Copy the iPXE binaries into the tftp directory.

    The first existing directory among the well known tftp roots is used.
    When a systemd tftp.socket unit is present (and the root is not
    /tftpboot) an attempt is made to enable and start it.

    :returns: 1 when no tftp directory could be found, otherwise None.
    """
    tftplocation = None
    candidates = ('/tftpboot', '/var/lib/tftpboot', '/srv/tftpboot', '/srv/tftp')
    for cand in candidates:
        if os.path.isdir(cand):
            tftplocation = cand
            break
    if not tftplocation:
        emprint('Unable to detect a directory for tftp content (check that tftp server is installed)')
        return 1
    if os.path.exists('/usr/lib/systemd/system/tftp.socket'):
        if tftplocation == '/tftpboot':
            emprint('/tftpboot is detected as tftp directory, will not try to automatically enable tftp, as it is presumed to be externally managed')
        else:
            try:
                subprocess.check_call(['systemctl', 'enable', 'tftp.socket', '--now'])
                print('TFTP service is enabled and running')
            except Exception:
                emprint('Unable to automatically enable and start tftp.socket, tftp server may already be running outside of systemd control')
    else:
        emprint(
            'Detected {0} as tftp directory, but unable to determine tftp service, ensure that a tftp server is installed and enabled manually'.format(tftplocation))
    x86tftplocation = '{0}/confluent/x86_64'.format(tftplocation)
    armtftplocation = '{0}/confluent/aarch64'.format(tftplocation)
    os.makedirs(x86tftplocation, exist_ok=True)
    os.makedirs(armtftplocation, exist_ok=True)
    shutil.copy('/opt/confluent/lib/ipxe/ipxe.efi', x86tftplocation)
    shutil.copy('/opt/confluent/lib/ipxe/ipxe.kkpxe', x86tftplocation)
    if os.path.exists('/opt/confluent/lib/ipxe/ipxe-aarch64.efi'):
        shutil.copy('/opt/confluent/lib/ipxe/ipxe-aarch64.efi',
                    os.path.join(armtftplocation, 'ipxe.efi'))


def _prompt_yes(question):
    """Show question on stdout and return True if the reply starts with 'y'."""
    sys.stdout.write(question)
    sys.stdout.flush()
    return input().strip().lower().startswith('y')


def _interactive_selection(cmdset):
    """Fill in the initialize option flags on cmdset by asking the user."""
    cmdset.u = _prompt_yes('Add root user key to be authorized to log into nodes (-u)? (y/N): ')
    cmdset.g = _prompt_yes('Initialize a profile to boot Genesis on target systems (a small Linux environment for rescue and staging use)? (y/N): ')
    cmdset.s = _prompt_yes('Set up an SSH authority to help manage known_hosts and node to node ssh for all users (-s)? (y/N): ')
    cmdset.k = _prompt_yes('Update global known hosts on this server to trust local CA certificates (-k)? (y/N): ')
    cmdset.l = _prompt_yes('Allow managed nodes to ssh to this management node without a password (-l)? (y/N): ')
    cmdset.p = _prompt_yes('Update tftp directory with binaries to support PXE (-p) (y/N): ')
    cmdset.a = _prompt_yes('Initialize confluent ssh user key so confluent can execute remote automation (e.g. Ansible plays) (-a) (y/N): ')
    cmdset.t = _prompt_yes('Generate new TLS certificates for HTTP, replacing any existing certificate (-t)? (y/N): ')
    if not cmdset.t:
        print(
            'In order to use your own certificate authority, make sure '
            'to put the certificate authority into '
            '/var/lib/confluent/public/site/tls/ directory as a .pem file '
            'as well as named (hash).0 where (hash) is the hash of the '
            'subject.')


def _restart_web_server():
    """Try-restart httpd or apache2 via systemd after a certificate change."""
    manualmsg = 'New HTTPS certificates generated, restart the web server manually'
    for service in ('httpd', 'apache2'):
        unitfile = '/usr/lib/systemd/system/{0}.service'.format(service)
        if not os.path.exists(unitfile):
            continue
        try:
            subprocess.check_call(['systemctl', 'try-restart', service])
            print('HTTP server has been restarted if it was running')
        except Exception:
            emprint(manualmsg)
        return
    emprint(manualmsg)


def _update_known_hosts():
    """Add the site SSH CA keys to /etc/ssh/ssh_known_hosts.

    Existing non-empty lines are kept (first occurrence, original order);
    a @cert-authority entry is appended for every
    /var/lib/confluent/public/site/ssh/*.ca key not already listed.
    """
    knownhosts = '/etc/ssh/ssh_known_hosts'
    entries = []
    seen = set()
    cakeys = set()
    try:
        with open(knownhosts, 'rb') as skh:
            for line in skh.read().split(b'\n'):
                if not line:
                    continue
                fields = line.split()
                # "@cert-authority <pattern> <keytype> <key>": key is 4th
                if len(fields) > 3:
                    cakeys.add(fields[3])
                if line not in seen:
                    seen.add(line)
                    entries.append(line)
    except IOError:
        # No readable known hosts file yet, start from scratch
        pass
    with open(knownhosts, 'wb') as skh:
        for entry in entries:
            skh.write(entry)
            skh.write(b'\n')
        for cafile in glob.glob('/var/lib/confluent/public/site/ssh/*.ca'):
            with open(cafile, 'rb') as cain:
                cacert = cain.read()
            fields = cacert.split()
            if len(fields) < 2:
                sys.stderr.write(
                    'Skipping malformed SSH CA file {0}\n'.format(cafile))
                continue
            cakey = fields[1]
            if cakey in cakeys:
                continue
            cakeys.add(cakey)
            if not cacert.endswith(b'\n'):
                # keep one entry per line even if the CA file lacks a newline
                cacert += b'\n'
            skh.write(b'@cert-authority * ' + cacert)


def _add_tree(dirname, totar, topack):
    """Queue dirname (if present) for the site tarball and cpio archive."""
    if not os.path.exists(dirname):
        return
    totar.append(dirname)
    topack.append(dirname + '/')
    for currd, _, files in os.walk(dirname):
        for fname in files:
            topack.append(os.path.join(currd, fname))


async def initialize(cmdset):
    """Carry out the 'initialize' subcommand.

    Performs each preparation step selected on cmdset (interactively
    prompting first when -i was given) and, when site content may have
    changed, repacks /var/lib/confluent/public/site/initramfs.cpio and
    initramfs.tgz.

    Exits the process directly (sys.exit) when not run as root, when the
    confluent service has not been started, when nothing was requested, or
    when a step fails.

    :param cmdset: Parsed argparse namespace for the initialize command.
    """
    if os.getuid() != 0:
        sys.stderr.write('This command must run as root user\n')
        sys.exit(1)
    # didsomething tracks whether the site initramfs needs to be repacked
    didsomething = bool(cmdset.i)
    if cmdset.i:
        _interactive_selection(cmdset)
    if not os.path.exists('/etc/confluent/cfg'):
        sys.stderr.write('Start confluent service prior to initializng OS deployment\n')
        sys.exit(1)
    if cmdset.t or cmdset.s or cmdset.a or cmdset.u:
        neededuid = os.stat('/etc/confluent').st_uid
        util.mkdirp('/var/lib/confluent')
        os.chown('/var/lib/confluent', neededuid, -1)
    if cmdset.u:
        didsomething = True
        if not glob.glob('/root/.ssh/*.pub'):
            sys.stderr.write('No user keys for root detected, it is recommended '
                             'to run ssh-keygen -t ed25519 to generate a user '
                             'key. For optimal security, a passphrase should be '
                             'used. ssh-agent may be used to make use of a '
                             'passphrase protected ssh key easier.\n')
            sys.exit(1)
        init_confluent_myname()
        sshutil.initialize_root_key(False)
    if cmdset.t:
        didsomething = True
        init_confluent_myname()
        certutil.create_certificate()
        _restart_web_server()
    if cmdset.s:
        didsomething = True
        init_confluent_myname()
        try:
            sshutil.initialize_ca()
        except sshutil.AlreadyExists:
            emprint('Skipping generation of SSH CA, already present and would likely be more problematic to regenerate than to reuse (if absolutely sure you want to discard old CA, then delete /etc/confluent/ssh/ca* and restart confluent)')
    if cmdset.a:
        didsomething = True
        init_confluent_myname()
        try:
            sshutil.initialize_root_key(True, True)
        except sshutil.AlreadyExists:
            emprint('Skipping generation of new automation key, already present and regeneration usually causes more problems. (If absolutely certain, delete /etc/confluent/ssh/automation* and restart confluent)')
    if cmdset.p:
        install_tftp_content()
    if cmdset.l:
        local_node_trust_setup()
    if cmdset.k:
        _update_known_hosts()
    if cmdset.g:
        rc = initialize_genesis()
        if rc != 0:
            sys.exit(rc)
    if not didsomething and (cmdset.k or cmdset.l or cmdset.g or cmdset.p):
        # Only steps that leave the site content alone ran, no repack needed
        if cmdset.g:
            updateboot('genesis-x86_64')
        sys.exit(0)
    if not didsomething:
        sys.stderr.write('Nothing was done, use initialize -i for '
                         'interactive mode, or see initialize -h for more options\n')
        sys.exit(1)
    # Pack into a randomly named temporary file, then rename into place
    tmpname = '/var/lib/confluent/public/site/initramfs.cpio.' + ''.join(
        fnamechars[x >> 2] for x in bytearray(os.urandom(22)))
    topack = []
    totar = []
    opath = os.getcwd()
    os.chdir('/var/lib/confluent/public/site')
    if not os.path.exists('confluent_uuid'):
        c = client.Command()
        # NOTE(review): read() is consumed with 'async for' here while the
        # synchronous helpers in this file use a plain 'for' on the same
        # client API -- confirm which form the client actually supports.
        async for rsp in c.read('/uuid'):
            uuid = rsp.get('uuid', {}).get('value', None)
            if uuid:
                oum = os.umask(0o11)
                try:
                    with open('confluent_uuid', 'w') as uuidout:
                        uuidout.write(uuid)
                        uuidout.write('\n')
                    os.chmod('confluent_uuid', 0o644)
                finally:
                    os.umask(oum)
    totar.append('confluent_uuid')
    topack.append('confluent_uuid')
    _add_tree('ssh', totar, topack)
    _add_tree('tls', totar, topack)
    with open(tmpname, 'wb') as initramfs:
        packit = subprocess.Popen(['cpio', '-H', 'newc', '-o'],
                                  stdout=initramfs, stdin=subprocess.PIPE)
        # cpio -o reads the list of files to archive from stdin
        for packfile in topack:
            if not isinstance(packfile, bytes):
                packfile = packfile.encode('utf8')
            packit.stdin.write(packfile)
            packit.stdin.write(b'\n')
        packit.stdin.close()
        res = packit.wait()
        if res:
            sys.stderr.write('Error occurred while packing site initramfs')
            sys.exit(1)
    oum = os.umask(0o22)
    try:
        os.rename(tmpname, '/var/lib/confluent/public/site/initramfs.cpio')
        os.chmod('/var/lib/confluent/public/site/initramfs.cpio', 0o644)
    finally:
        os.umask(oum)
    if cmdset.g:
        updateboot('genesis-x86_64')
    if totar:
        tmptarname = tmpname.replace('cpio', 'tgz')
        tarcmd = ['tar', '-czf', tmptarname] + totar
        subprocess.check_call(tarcmd)
        os.rename(tmptarname, '/var/lib/confluent/public/site/initramfs.tgz')
        oum = os.umask(0o22)
        try:
            os.chmod('/var/lib/confluent/public/site/initramfs.tgz', 0o644)
        finally:
            # restore the caller's umask rather than leaving 0o22 behind
            os.umask(oum)
    os.chdir(opath)
    print('Site initramfs content packed successfully')
    if not os.path.exists('/etc/confluent/srvcert.pem'):
        subprocess.check_call(['collective', 'gencert'])
    # TODO: check selinux and segetbool for httpd_can_network_connect
    # httpd available and enabled?


def updateboot(profilename):
    """Ask confluent to regenerate the boot assets of a deployment profile.

    :param profilename: Name of the profile under /deployment/profiles.
    :returns: 1 if the site initramfs has not been generated yet,
              otherwise None.
    """
    if not os.path.exists('/var/lib/confluent/public/site/initramfs.cpio'):
        emprint('Must generate site content first (TLS (-t) and/or SSH (-s))')
        return 1
    c = client.Command()
    for rsp in c.update('/deployment/profiles/{0}'.format(profilename),
                        {'updateboot': 1}):
        if 'updated' in rsp:
            print('Updated: {0}'.format(rsp['updated']))
        else:
            print(repr(rsp))


def rebase(profilename):
    """Ask confluent to refresh a profile from the packaged stock content.

    Exits the process with the reported error code (1 if none is given)
    when the service returns an error.

    :param profilename: Name of the profile under /deployment/profiles.
    """
    c = client.Command()
    for rsp in c.update('/deployment/profiles/{0}'.format(profilename),
                        {'rebase': 1}):
        if 'updated' in rsp:
            print('Updated: {0}'.format(rsp['updated']))
        elif 'customized' in rsp:
            print('Skipping update of {0} as current copy was customized or no manifest data was available'.format(rsp['customized']))
        elif 'error' in rsp:
            sys.stderr.write(rsp['error'] + '\n')
            sys.exit(rsp.get('errorcode', 1))
        else:
            print(repr(rsp))


def _print_collection(c, apipath):
    """Print the item names under apipath; return True if an error was seen."""
    failed = False
    for rsp in c.read(apipath):
        if 'error' in rsp:
            sys.stderr.write(rsp['error'] + '\n')
            failed = True
        else:
            print(" " + rsp['item']['href'].replace('/', ''))
    print("")
    return failed


def oslist():
    """List the available distributions and deployment profiles.

    :returns: 0 on success, 1 if the service reported any error.
    """
    c = client.Command()
    exitcode = 0
    print("Distributions:")
    if _print_collection(c, '/deployment/distributions'):
        exitcode = 1
    print("Profiles:")
    if _print_collection(c, '/deployment/profiles'):
        exitcode = 1
    return exitcode


def osimport(imagefile, checkonly=False, custname=None):
    """Import (or merely fingerprint) an OS image through confluent.

    :param imagefile: Path of the image file; made absolute before use.
    :param checkonly: When True, only report what the image is detected
                      as instead of importing it.
    :param custname: Optional custom distribution name for the import.

    Exits the process if the service reports an error for the request.
    While an import runs, progress is polled and printed until the phase
    is 'complete' or 'error'; the import tracking entry is deleted
    afterwards.
    """
    c = client.Command()
    imagefile = os.path.abspath(imagefile)
    if c.unixdomain:
        # The descriptor is handed to the client by number, so the file
        # object is deliberately left open for the rest of the run.
        ofile = open(imagefile, 'rb')
        try:
            c.add_file(imagefile, ofile.fileno(), 'rb')
        except Exception:
            # best effort, the request still carries the filename
            pass
    importing = False
    shortname = None
    apipath = '/deployment/importing/'
    if checkonly:
        apipath = '/deployment/fingerprint/'
    apiargs = {'filename': imagefile}
    if custname:
        apiargs['custname'] = custname
    for rsp in c.create(apipath, apiargs):
        if 'target' in rsp:
            importing = True
            shortname = rsp['name']
            print('Importing from {0} to {1}'.format(imagefile, rsp['target']))
        elif 'targetpath' in rsp:
            tpath = rsp.get('targetpath', None)
            tname = rsp.get('name', None)
            oscat = rsp.get('oscategory', None)
            if tpath:
                print('Detected target directory: ' + tpath)
            if tname:
                print('Detected distribution name: ' + tname)
            if oscat:
                print('Detected OS category: ' + oscat)
            for err in rsp.get('errors', []):
                sys.stderr.write('Error: ' + err + '\n')
        elif 'error' in rsp:
            sys.stderr.write(rsp['error'] + '\n')
            sys.exit(rsp.get('errorcode', 1))
        else:
            print(repr(rsp))
    try:
        while importing:
            for rsp in c.read('/deployment/importing/{0}'.format(shortname)):
                if 'progress' in rsp:
                    sys.stdout.write('{0}: {1:.2f}% \r'.format(rsp['phase'],
                                                               rsp['progress']))
                    if rsp['phase'] == 'complete':
                        importing = False
                        sys.stdout.write('\n')
                        for profile in rsp['profiles']:
                            print('Deployment profile created: {0}'.format(profile))
                    if rsp['phase'] == 'error':
                        sys.stderr.write('{0}\n'.format(rsp['error']))
                        if 'Permission denied' in rsp['error']:
                            sys.stderr.write('Confluent service unable to write to destination, check that confluent user has access to target\n')
                        return
                    sys.stdout.flush()
                else:
                    print(repr(rsp))
                # polling interval between progress checks
                time.sleep(0.5)
    finally:
        if shortname:
            # drain the generator so the delete request is actually issued
            list(c.delete('/deployment/importing/{0}'.format(shortname)))


if __name__ == '__main__':
    # Propagate the subcommand's return value (None means success) as the
    # process exit status.
    sys.exit(asyncio.get_event_loop().run_until_complete(main(sys.argv)))