2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-28 03:48:35 +00:00
Jarrod Johnson f68f9f4693 Make syncfile step robust or pause
If syncfiles fails, keep it retrying.

Also, slow down sync checking to avoid hammering the system.

Further, add a randomized delay to spread out highly synchronized requestors.

Block attempts to do multiple concurrent syncfile runs.
2024-04-09 11:07:11 -04:00

303 lines
11 KiB
Python

#!/usr/bin/python3
import grp
import importlib
import json
import os
import pwd
import random
import shutil
import subprocess
import sys
import tempfile
import time
from importlib.machinery import SourceFileLoader
# Load the confluent apiclient script as a module named 'apiclient' from an
# explicit file path.  The /opt/confluent/bin copy is tried first; if that
# file does not exist, the copy under /etc/confluent is used instead.
try:
    apiclient = SourceFileLoader('apiclient', '/opt/confluent/bin/apiclient').load_module()
except FileNotFoundError:
    apiclient = SourceFileLoader('apiclient', '/etc/confluent/apiclient').load_module()
def partitionhostsline(line):
    """Split one hosts-file line into address, names and trailing comment.

    Args:
        line: A single line of text in hosts-file format, without its
            line terminator.

    Returns:
        A tuple ``(ipaddr, names, comment)``:

        * ``ipaddr`` - the first whitespace-delimited field before any
          ``#``, or ``''`` when there are no fields.
        * ``names`` - list of the remaining fields (possibly empty).
        * ``comment`` - everything from the first ``#`` to the end of the
          line, including the ``#`` itself, or ``''`` when there is none.
    """
    content, marker, remark = line.partition('#')
    comment = marker + remark
    # split() with no arguments ignores leading/trailing whitespace, so
    # blank, whitespace-only and address-only lines are all handled without
    # the unpacking error a fixed two-way split would raise.
    fields = content.split()
    if not fields:
        return '', [], comment
    return fields[0], fields[1:], comment
class HostMerger(object):
    """Merge a staged hosts file into an existing hosts file.

    Usage is ``read_source`` (the staged file), then ``read_target`` (the
    existing file), then ``write_out``.  A target line whose address or any
    of whose names also appears in the source is replaced by the matching
    source line; source lines that matched nothing are appended at the end.
    """

    def __init__(self):
        # Address -> index into self.sourcelines of the line defining it.
        self.byip = {}
        # Host name -> index into self.sourcelines of the line defining it.
        self.byname = {}
        # Lines of the staged file; an entry is set to None once it has
        # been copied into self.targlines.
        self.sourcelines = []
        # Output lines accumulated while walking the target file.
        self.targlines = []

    @staticmethod
    def _trim_trailing(lines):
        """Return *lines* without trailing empty or None entries.

        Safe on an empty list, and on a list consisting only of
        empty/None entries (returns ``[]``).
        """
        end = len(lines)
        while end and not lines[end - 1]:
            end -= 1
        return lines[:end]

    def read_source(self, sourcefile):
        """Load the staged hosts file and index its lines by address/name.

        Args:
            sourcefile: Path of the staged hosts file to read.

        When an address or name occurs on several lines, the last such
        line is the one recorded in the index.
        """
        with open(sourcefile, 'r') as hfile:
            rawlines = hfile.read().split('\n')
        self.sourcelines = self._trim_trailing(rawlines)
        for idx, line in enumerate(self.sourcelines):
            currip, names, _ = partitionhostsline(line)
            if currip:
                self.byip[currip] = idx
            for name in names:
                self.byname[name] = idx

    def read_target(self, targetfile):
        """Walk the existing hosts file, substituting matching source lines.

        Args:
            targetfile: Path of the existing hosts file to read.

        Must be called after ``read_source``.  Results accumulate in
        ``self.targlines``.
        """
        with open(targetfile, 'r') as hfile:
            lines = hfile.read().split('\n')
        # A file ending in a newline yields one trailing empty element.
        if not lines[-1]:
            lines = lines[:-1]
        for line in lines:
            currip, names, _ = partitionhostsline(line)
            if currip in self.byip:
                x = self.byip[currip]
                if self.sourcelines[x] is None:
                    # have already consumed this entry; drop the duplicate
                    continue
                self.targlines.append(self.sourcelines[x])
                self.sourcelines[x] = None
                continue
            for name in names:
                if name in self.byname:
                    x = self.byname[name]
                    # Only the first matching name decides: either the
                    # source line replaces this one, or (if already
                    # consumed) this target line is dropped.
                    if self.sourcelines[x] is not None:
                        self.targlines.append(self.sourcelines[x])
                        self.sourcelines[x] = None
                    break
            else:
                # No address or name matched: keep the target line as is.
                self.targlines.append(line)

    def write_out(self, targetfile):
        """Write the merged result to *targetfile*.

        Args:
            targetfile: Path to (over)write with the merged hosts data.

        Trailing blank lines are dropped from both the merged target lines
        and the leftover source lines; leftover (unconsumed) source lines
        follow the target lines.
        """
        self.targlines = self._trim_trailing(self.targlines)
        self.sourcelines = self._trim_trailing(self.sourcelines)
        with open(targetfile, 'w') as hosts:
            for line in self.targlines:
                hosts.write(line + '\n')
            for line in self.sourcelines:
                if line is not None:
                    hosts.write(line + '\n')
class CredMerger:
    """Merge staged passwd/group entries into the local account files.

    Entries of the existing (target) file whose numeric id lies outside
    the range taken from /etc/login.defs are kept; target entries inside
    the range are discarded and replaced by the in-range entries of the
    staged (source) file.  Writing '/etc/passwd' or '/etc/group' also
    rewrites '/etc/shadow' or '/etc/gshadow' respectively.
    """

    # Leading characters of entry names that are carried over from the
    # source file regardless of their numeric id.
    _SPECIAL_PREFIXES = ('+', '#', '@')

    def __init__(self):
        try:
            with open('/etc/login.defs', 'r') as ldefs:
                defs = ldefs.read().split('\n')
        except FileNotFoundError:
            defs = []
        lkup = {}
        # Names of in-range target entries being replaced.
        self.discardnames = {}
        # Names that already have a line in the shadow file.
        self.shadowednames = {}
        # Initialised here so write_out works even if only one side
        # (source or target) has been read.
        self.targdata = []
        self.sourcedata = []
        for line in defs:
            # Drop any trailing comment, then take "KEY VALUE".
            keyval = line.partition('#')[0].split()
            if len(keyval) < 2:
                continue
            lkup[keyval[0]] = keyval[1]
        self.uidmin = int(lkup.get('UID_MIN', 1000))
        self.uidmax = int(lkup.get('UID_MAX', 60000))
        self.gidmin = int(lkup.get('GID_MIN', 1000))
        self.gidmax = int(lkup.get('GID_MAX', 60000))
        # None means the shadow file has not been read yet.
        self.shadowlines = None

    def read_passwd(self, source, targfile=False):
        """Read a passwd-format file using the UID_MIN..UID_MAX range."""
        self.read_generic(source, self.uidmin, self.uidmax, targfile)

    def read_group(self, source, targfile=False):
        """Read a group-format file using the GID_MIN..GID_MAX range."""
        self.read_generic(source, self.gidmin, self.gidmax, targfile)

    def read_generic(self, source, minid, maxid, targfile):
        """Read a colon-separated account file and classify its entries.

        Args:
            source: Path of the file to read.
            minid: Lowest numeric id considered managed.
            maxid: Highest numeric id considered managed.
            targfile: True when *source* is the existing local file,
                False when it is the staged file.

        For the target file, out-of-range entries are stored in
        ``self.targdata`` and the names of in-range entries are recorded
        in ``self.discardnames``.  For the staged file, in-range entries
        and entries whose name starts with '+', '#' or '@' are stored in
        ``self.sourcedata``.  Lines lacking four fields or a numeric third
        field are ignored, as are staged entries with an empty name.
        """
        if targfile:
            self.targdata = []
        else:
            self.sourcedata = []
        with open(source, 'r') as inputfile:
            for line in inputfile.read().split('\n'):
                try:
                    name, _, uid, _ = line.split(':', 3)
                    uid = int(uid)
                except ValueError:
                    continue
                inrange = minid <= uid <= maxid
                if targfile:
                    if inrange:
                        self.discardnames[name] = 1
                    else:
                        self.targdata.append(line)
                elif not name:
                    # Malformed staged entry; skip it like other
                    # unparseable lines instead of failing on name[0].
                    continue
                elif name.startswith(self._SPECIAL_PREFIXES) or inrange:
                    self.sourcedata.append(line)

    def read_shadow(self, source):
        """Load shadow-style lines, dropping those for discarded names.

        Args:
            source: Path of the shadow/gshadow file.  A missing file
                leaves ``self.shadowlines`` as an empty list.
        """
        self.shadowlines = []
        try:
            with open(source, 'r') as inshadow:
                for line in inshadow.read().split('\n'):
                    try:
                        name, _ = line.split(':', 1)
                    except ValueError:
                        continue
                    if name in self.discardnames:
                        continue
                    self.shadowednames[name] = 1
                    self.shadowlines.append(line)
        except FileNotFoundError:
            return

    def _write_shadow(self, shadowfile, lockedsuffix, skipspecial):
        """Rewrite *shadowfile*, adding locked entries for new names.

        Existing lines (as filtered by ``read_shadow``) are written first,
        followed by ``name + lockedsuffix`` for every staged entry that
        has no line yet.
        """
        if self.shadowlines is None:
            self.read_shadow(shadowfile)
        with open(shadowfile, 'w') as shadout:
            for line in self.shadowlines:
                shadout.write(line + '\n')
            for line in self.sourcedata:
                name, _ = line.split(':', 1)
                if skipspecial and name.startswith(self._SPECIAL_PREFIXES):
                    continue
                if name in self.shadowednames:
                    continue
                shadout.write(name + lockedsuffix)

    def write_out(self, outfile):
        """Write kept target entries followed by staged entries.

        Args:
            outfile: Destination path.  When it is exactly '/etc/passwd'
                or '/etc/group', the matching shadow file is rewritten
                as well.
        """
        with open(outfile, 'w') as targ:
            for line in self.targdata:
                targ.write(line + '\n')
            for line in self.sourcedata:
                targ.write(line + '\n')
        if outfile == '/etc/passwd':
            self._write_shadow('/etc/shadow', ':!:::::::\n', True)
        if outfile == '/etc/group':
            # NOTE(review): unlike the passwd case, special-prefixed names
            # are not skipped for gshadow; kept as-is, confirm intent.
            self._write_shadow('/etc/gshadow', ':!::\n', False)
def appendonce(basepath, filename):
    """Append a staged file's content to its real target, at most once.

    Args:
        basepath: Staging directory prefix to strip from *filename* to
            obtain the target path.
        filename: Path of the staged file.

    The staged content is appended to the target only if the target does
    not already contain it as a contiguous byte sequence.  A target that
    cannot be read is treated as empty.
    """
    with open(filename, 'rb') as filehdl:
        thedata = filehdl.read()
    if filename.startswith(basepath):
        # Strip only the leading staging prefix; a blanket replace would
        # also remove later occurrences of the same text in the path.
        targname = filename[len(basepath):]
    else:
        # Not under basepath: keep the historical substitution behavior.
        targname = filename.replace(basepath, '')
    try:
        with open(targname, 'rb') as filehdl:
            targdata = filehdl.read()
    except IOError:
        targdata = b''
    if thedata in targdata:
        return
    with open(targname, 'ab') as targhdl:
        targhdl.write(thedata)
def _local_ipaddrs():
    """Return addresses of interfaces that `ip -br a` reports as UP.

    Addresses starting with 'fe80::' or '169.254' are omitted and any
    '/prefix' suffix is removed.
    """
    addrs = []
    output = subprocess.check_output(['ip', '-br', 'a'])
    for line in output.split(b'\n'):
        fields = line.split()
        # Second field is the link state; addresses start at the third.
        if len(fields) < 3 or fields[1] != b'UP':
            continue
        for addr in fields[2:]:
            if addr.startswith((b'fe80::', b'169.254')):
                continue
            addrs.append(addr.split(b'/')[0].decode('utf8'))
    return addrs


def _apply_file_options(opts):
    """Apply per-file 'owner', 'group' and 'permissions' options.

    *opts* maps a file path to a mapping of option name to value.  Owner
    and group are resolved by name locally, falling back to the numeric
    'id' supplied in the option when the name is unknown.
    """
    for fname, fileopts in opts.items():
        uid = -1
        gid = -1
        for opt, val in fileopts.items():
            if opt == 'owner':
                try:
                    uid = pwd.getpwnam(val['name']).pw_uid
                except KeyError:
                    uid = val['id']
            elif opt == 'group':
                try:
                    gid = grp.getgrnam(val['name']).gr_gid
                except KeyError:
                    gid = val['id']
            elif opt == 'permissions':
                # Permissions arrive as an octal string.
                os.chmod(fname, int(val, 8))
        # -1 leaves the respective id unchanged in os.chown.
        if uid != -1 or gid != -1:
            os.chown(fname, uid, gid)


def synchronize():
    """Run one syncfiles exchange with the deployment server.

    Requests a sync via '/confluent-api/self/remotesyncfiles', passing two
    fresh temporary directories (for merge and append-once content) and
    the local addresses.  If the request is accepted (202), polls until
    the server answers 204, then merges any staged etc/passwd, etc/group
    and etc/hosts into the system files, applies append-once files, and
    finally applies file options from the last 200 response body.

    Returns:
        The last HTTP status seen: a value >= 300 if the initial request
        failed, 204 after a completed 202 flow, otherwise the initial
        status unchanged.

    The temporary directories are removed on every exit path.
    """
    tmpdir = tempfile.mkdtemp()
    appendoncedir = tempfile.mkdtemp()
    try:
        ac = apiclient.HTTPSClient()
        myips = _local_ipaddrs()
        data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir, 'myips': myips})
        status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
        if status >= 300:
            sys.stderr.write("Error starting syncfiles - {}:\n".format(status))
            sys.stderr.write(repr(rsp) + '\n')
            return status
        if status == 202:
            lastrsp = ''
            # NOTE(review): any status other than 204 keeps polling, so a
            # persistent error response here loops indefinitely.
            while status != 204:
                # Randomized 1-3 second pause between polls.
                time.sleep(1 + (2 * random.random()))
                status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
                if not isinstance(rsp, str):
                    rsp = rsp.decode('utf8')
                if status == 200:
                    lastrsp = rsp
            pendpasswd = os.path.join(tmpdir, 'etc/passwd')
            if os.path.exists(pendpasswd):
                cm = CredMerger()
                cm.read_passwd(pendpasswd, targfile=False)
                cm.read_passwd('/etc/passwd', targfile=True)
                cm.write_out('/etc/passwd')
            pendgroup = os.path.join(tmpdir, 'etc/group')
            if os.path.exists(pendgroup):
                cm = CredMerger()
                cm.read_group(pendgroup, targfile=False)
                cm.read_group('/etc/group', targfile=True)
                cm.write_out('/etc/group')
            pendhosts = os.path.join(tmpdir, 'etc/hosts')
            if os.path.exists(pendhosts):
                cm = HostMerger()
                cm.read_source(pendhosts)
                cm.read_target('/etc/hosts')
                cm.write_out('/etc/hosts')
            for dirpath, _, filenames in os.walk(appendoncedir):
                for filen in filenames:
                    appendonce(appendoncedir, os.path.join(dirpath, filen))
            if lastrsp:
                lastrsp = json.loads(lastrsp)
                _apply_file_options(lastrsp.get('options', {}))
        return status
    finally:
        shutil.rmtree(tmpdir)
        shutil.rmtree(appendoncedir)
if __name__ == '__main__':
    # Keep retrying until a synchronize() run ends with 200 or 204.
    status = 202
    while status not in (204, 200):
        try:
            status = synchronize()
        except Exception as e:
            # Top-level boundary: report the failure and retry rather
            # than exit.  Newline-terminate so repeated errors from
            # successive attempts do not run together on stderr.
            sys.stderr.write(str(e) + '\n')
            status = 300
        if status not in (204, 200):
            # Randomized 2-5 second pause before the next attempt.
            time.sleep((random.random()*3)+2)