diff --git a/confluent_server/confluent/selfservice.py b/confluent_server/confluent/selfservice.py index 5e4cee4c..619c383f 100644 --- a/confluent_server/confluent/selfservice.py +++ b/confluent_server/confluent/selfservice.py @@ -1,4 +1,5 @@ import confluent.runansible as runansible +import confluent.syncfiles as syncfiles import confluent.config.configmanager as configmanager import confluent.collective.manager as collective import confluent.netutil as netutil @@ -281,6 +282,18 @@ def handle_request(env, start_response): start_response('200 OK', ()) yield '' return + elif env['PATH_INFO'].startswith('/self/remotesyncfiles'): + if 'POST' == operation: + result = syncfiles.start_syncfiles( + nodename, cfg, json.loads(reqbody)) + start_response(result, ()) + yield '' + return + if 'GET' == operation: + status, output = syncfiles.get_syncresult(nodename) + start_response(status, ()) + yield output + return elif env['PATH_INFO'].startswith('/self/remoteconfig/status'): rst = runansible.running_status.get(nodename, None) if not rst: diff --git a/confluent_server/confluent/sshutil.py b/confluent_server/confluent/sshutil.py index c77f08f1..141f2c2a 100644 --- a/confluent_server/confluent/sshutil.py +++ b/confluent_server/confluent/sshutil.py @@ -128,7 +128,8 @@ def initialize_root_key(generate, automation=False): subprocess.check_call( ['ssh-keygen', '-t', 'ed25519', '-f','/etc/confluent/ssh/automation', '-N', get_passphrase(), - '-C', 'Confluent Automation'], preexec_fn=normalize_uid) + '-C', 'Confluent Automation by {}'.format(myname)], + preexec_fn=normalize_uid) authorized = ['/etc/confluent/ssh/automation.pub'] try: os.makedirs('/var/lib/confluent/public/site/ssh', mode=0o755) diff --git a/confluent_server/confluent/syncfiles.py b/confluent_server/confluent/syncfiles.py index daf29d33..77996b9a 100644 --- a/confluent_server/confluent/syncfiles.py +++ b/confluent_server/confluent/syncfiles.py @@ -19,13 +19,14 @@ import os import shutil import tempfile import confluent.sshutil as sshutil - +import eventlet.green.subprocess as subprocess +import eventlet def mkdirp(path): try: os.makedirs(path) except OSError as e: - if errno != 17: + if e.errno != 17: raise class SyncList(object): @@ -39,11 +40,18 @@ class SyncList(object): entries = slist.split('\n') currmap = self.replacemap for ent in entries: + try: + cmtidx = ent.index('#') + ent = ent[:cmtidx] + except ValueError: + pass ent = ent.strip() + if not ent: + continue if ent[-1] == ':': - if ent == 'APPEND': + if ent == 'APPEND:': currmap = self.appendmap - elif ent == 'MERGE': + elif ent == 'MERGE:': currmap = self.mergemap else: raise Exception( @@ -58,35 +66,90 @@ class SyncList(object): currmap[k] = v -def sync_list_to_node(synclist, node): +def sync_list_to_node(synclist, node, suffixes): targdir = tempfile.mkdtemp('.syncto{}'.format(node)) + output = '' try: sl = SyncList(synclist) for ent in sl.replacemap: - dst = sl.replacemap[ent] - everyfent = [] - allfents = ent.split() - for tmpent in allfents: - fents = glob.glob(tmpent) - if fents: - everyfent.extend(fents) - else: - everyfent.extend(os.path.dirname(tmpent)) - if not everyfent: - raise Exception('No matching files for "{}"'.format(ent)) - fulltarg = os.path.join(targdir, dst) - if dst[-1] == '/' or len(everyfent) > 1 or os.path.isdir(everyfent[0]): - # target *must* be a directory - fulltargdir = fulltarg - else: - fulltargdir = os.path.join(targdir, os.path.dirname(dst)) - mkdirp(fulltargdir) - for targ in everyfent: - if fulltargdir == fulltarg: - os.symlink( - targ, os.path.join( - fulltargdir, os.path.basename(targ))) - else: - os.symlink(targ, fulltarg) + stage_ent(sl.replacemap, ent, targdir) + if 'append' in suffixes: + while suffixes['append'] and suffixes['append'][0] == '/': + suffixes['append'] = suffixes['append'][1:] + for ent in sl.appendmap: + stage_ent(sl.appendmap, ent, + os.path.join(targdir, suffixes['append'])) + if 'merge' in suffixes: + while suffixes['merge'] and suffixes['merge'][0] == '/': + suffixes['merge'] = suffixes['merge'][1:] + for ent in sl.appendmap: + stage_ent(sl.appendmap, ent, + os.path.join(targdir, suffixes['append'])) + sshutil.prep_ssh_key('/etc/confluent/ssh/automation') + output = subprocess.check_output( + ['rsync', '-aL', targdir + '/', 'root@{}:/'.format(node)]) finally: shutil.rmtree(targdir) + return output + +def stage_ent(currmap, ent, targdir): + dst = currmap[ent] + everyfent = [] + allfents = ent.split() + for tmpent in allfents: + fents = glob.glob(tmpent) + if fents: + everyfent.extend(fents) + else: + everyfent.extend(os.path.dirname(tmpent)) + if not everyfent: + raise Exception('No matching files for "{}"'.format(ent)) + while dst and dst[0] == '/': + dst = dst[1:] + fulltarg = os.path.join(targdir, dst) + if dst[-1] == '/' or len(everyfent) > 1 or os.path.isdir(everyfent[0]): + # target *must* be a directory + fulltargdir = fulltarg + else: + fulltargdir = os.path.join(targdir, os.path.dirname(dst)) + mkdirp(fulltargdir) + for targ in everyfent: + if fulltargdir == fulltarg: + os.symlink( + targ, os.path.join( + fulltargdir, os.path.basename(targ))) + else: + os.symlink(targ, fulltarg) + +syncrunners = {} + + +def start_syncfiles(nodename, cfg, suffixes): + deployinfo = cfg.get_node_attributes( + nodename, ('deployment.*',)) + deployinfo = deployinfo.get(nodename, {}) + profile = deployinfo.get( + 'deployment.pendingprofile', {}).get('value', '') + if not profile: + profile = deployinfo.get( + 'deployment.stagedprofile', {}).get('value', '') + if not profile: + profile = deployinfo.get( + 'deployment.profile', {}).get('value', '') + if not profile: + raise Exception('Cannot perform syncfiles without profile assigned') + synclist = '/var/lib/confluent/public/os/{}/syncfiles'.format(profile) + if not os.path.exists(synclist): + return '200 OK' # not running + syncrunners[nodename] = eventlet.spawn( + sync_list_to_node, synclist, nodename, suffixes) + return '202 Queued' # backgrounded + +def get_syncresult(nodename): + if nodename not in syncrunners: + return ('204 Not Running', '') + if not syncrunners[nodename].dead: + return ('200 OK', '') + result = syncrunners[nodename].wait() + del syncrunners[nodename] + return ('200 OK', result)