2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-15 01:59:48 +00:00

Implement syncfiles server side

This commit is contained in:
Jarrod Johnson 2021-03-24 16:00:54 -04:00
parent d650f11255
commit 35ef6170ba
3 changed files with 108 additions and 31 deletions

View File

@ -1,4 +1,5 @@
import confluent.runansible as runansible
import confluent.syncfiles as syncfiles
import confluent.config.configmanager as configmanager
import confluent.collective.manager as collective
import confluent.netutil as netutil
@ -281,6 +282,18 @@ def handle_request(env, start_response):
start_response('200 OK', ())
yield ''
return
elif env['PATH_INFO'].startswith('/self/remotesyncfiles'):
if 'POST' == operation:
result = syncfiles.start_syncfiles(
nodename, cfg, json.loads(reqbody))
start_response(result, ())
yield ''
return
if 'GET' == operation:
status, output = syncfiles.get_syncresult(nodename)
start_response(status, ())
yield output
return
elif env['PATH_INFO'].startswith('/self/remoteconfig/status'):
rst = runansible.running_status.get(nodename, None)
if not rst:

View File

@ -128,7 +128,8 @@ def initialize_root_key(generate, automation=False):
subprocess.check_call(
['ssh-keygen', '-t', 'ed25519',
'-f','/etc/confluent/ssh/automation', '-N', get_passphrase(),
'-C', 'Confluent Automation'], preexec_fn=normalize_uid)
'-C', 'Confluent Automation by {}'.format(myname)],
preexec_fn=normalize_uid)
authorized = ['/etc/confluent/ssh/automation.pub']
try:
os.makedirs('/var/lib/confluent/public/site/ssh', mode=0o755)

View File

@ -19,13 +19,14 @@ import os
import shutil
import tempfile
import confluent.sshutil as sshutil
import eventlet.green.subprocess as subprocess
import eventlet
def mkdirp(path):
try:
os.makedirs(path)
except OSError as e:
if errno != 17:
if e.errno != 17:
raise
class SyncList(object):
@ -39,11 +40,18 @@ class SyncList(object):
entries = slist.split('\n')
currmap = self.replacemap
for ent in entries:
try:
cmtidx = ent.index('#')
ent = ent[:cmtidx]
except ValueError:
pass
ent = ent.strip()
if not ent:
continue
if ent[-1] == ':':
if ent == 'APPEND':
if ent == 'APPEND:':
currmap = self.appendmap
elif ent == 'MERGE':
elif ent == 'MERGE:':
currmap = self.mergemap
else:
raise Exception(
@ -58,35 +66,90 @@ class SyncList(object):
currmap[k] = v
def sync_list_to_node(synclist, node):
def sync_list_to_node(synclist, node, suffixes):
targdir = tempfile.mkdtemp('.syncto{}'.format(node))
output = ''
try:
sl = SyncList(synclist)
for ent in sl.replacemap:
dst = sl.replacemap[ent]
everyfent = []
allfents = ent.split()
for tmpent in allfents:
fents = glob.glob(tmpent)
if fents:
everyfent.extend(fents)
else:
everyfent.extend(os.path.dirname(tmpent))
if not everyfent:
raise Exception('No matching files for "{}"'.format(ent))
fulltarg = os.path.join(targdir, dst)
if dst[-1] == '/' or len(everyfent) > 1 or os.path.isdir(everyfent[0]):
# target *must* be a directory
fulltargdir = fulltarg
else:
fulltargdir = os.path.join(targdir, os.path.dirname(dst))
mkdirp(fulltargdir)
for targ in everyfent:
if fulltargdir == fulltarg:
os.symlink(
targ, os.path.join(
fulltargdir, os.path.basename(targ)))
else:
os.symlink(targ, fulltarg)
stage_ent(sl.replacemap, ent, targdir)
if 'append' in suffixes:
while suffixes['append'] and suffixes['append'][0] == '/':
suffixes['append'] = suffixes['append'][1:]
for ent in sl.appendmap:
stage_ent(sl.appendmap, ent,
os.path.join(targdir, suffixes['append']))
if 'merge' in suffixes:
while suffixes['merge'] and suffixes['merge'][0] == '/':
suffixes['merge'] = suffixes['merge'][1:]
for ent in sl.appendmap:
stage_ent(sl.appendmap, ent,
os.path.join(targdir, suffixes['append']))
sshutil.prep_ssh_key('/etc/confluent/ssh/automation')
output = subprocess.check_output(
['rsync', '-aL', targdir + '/', 'root@{}:/'.format(node)])
finally:
shutil.rmtree(targdir)
return output
def stage_ent(currmap, ent, targdir):
dst = currmap[ent]
everyfent = []
allfents = ent.split()
for tmpent in allfents:
fents = glob.glob(tmpent)
if fents:
everyfent.extend(fents)
else:
everyfent.extend(os.path.dirname(tmpent))
if not everyfent:
raise Exception('No matching files for "{}"'.format(ent))
while dst and dst[0] == '/':
dst = dst[1:]
fulltarg = os.path.join(targdir, dst)
if dst[-1] == '/' or len(everyfent) > 1 or os.path.isdir(everyfent[0]):
# target *must* be a directory
fulltargdir = fulltarg
else:
fulltargdir = os.path.join(targdir, os.path.dirname(dst))
mkdirp(fulltargdir)
for targ in everyfent:
if fulltargdir == fulltarg:
os.symlink(
targ, os.path.join(
fulltargdir, os.path.basename(targ)))
else:
os.symlink(targ, fulltarg)
syncrunners = {}
def start_syncfiles(nodename, cfg, suffixes):
deployinfo = cfg.get_node_attributes(
nodename, ('deployment.*',))
deployinfo = deployinfo.get(nodename, {})
profile = deployinfo.get(
'deployment.pendingprofile', {}).get('value', '')
if not profile:
profile = deployinfo.get(
'deployment.stagedprofile', {}).get('value', '')
if not profile:
profile = deployinfo.get(
'deployment.profile', {}).get('value', '')
if not profile:
raise Exception('Cannot perform syncfiles without profile assigned')
synclist = '/var/lib/confluent/public/os/{}/syncfiles'.format(profile)
if not os.path.exists(synclist):
return '200 OK' # not running
syncrunners[nodename] = eventlet.spawn(
sync_list_to_node, synclist, nodename, suffixes)
return '202 Queued' # backgrounded
def get_syncresult(nodename):
if nodename not in syncrunners:
return ('204 Not Running', '')
if not syncrunners[nodename].dead:
return ('200 OK', '')
result = syncrunners[nodename].wait()
del syncrunners[nodename]
return ('200 OK', result)