mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-28 20:39:40 +00:00
f68f9f4693
If syncfiles fails, keep retrying it. Also, slow down sync checking to avoid hammering the system. Further, add a randomized delay to spread out highly synchronized requestors. Block attempts to run multiple concurrent syncfile runs.
282 lines
10 KiB
Python
282 lines
10 KiB
Python
#!/usr/bin/python
|
|
import importlib
|
|
import tempfile
|
|
import json
|
|
import os
|
|
import shutil
|
|
import pwd
|
|
import time
|
|
import grp
|
|
try:
    import sys
    import importlib.util
    from importlib.machinery import SourceFileLoader

    def load_source(mod, path):
        """Load the Python source file at *path* as a module named *mod*.

        The file does not need a ``.py`` extension.  The module is registered
        in ``sys.modules`` under *mod* and returned.  An unreadable or missing
        file raises ``OSError`` (``IOError``), as the previous implementation
        did.
        """
        loader = SourceFileLoader(mod, path)
        if not hasattr(importlib.util, 'module_from_spec'):
            # Very old Python 3 without module_from_spec: use the legacy API.
            return loader.load_module()
        # Loader.load_module() is deprecated; build the module from a spec
        # and execute it explicitly instead.
        spec = importlib.util.spec_from_loader(mod, loader)
        module = importlib.util.module_from_spec(spec)
        sys.modules[mod] = module
        try:
            loader.exec_module(module)
        except BaseException:
            # Do not leave a half-initialised module registered.
            sys.modules.pop(mod, None)
            raise
        return module
except ImportError:
    # importlib.machinery / importlib.util unavailable: use the imp module.
    from imp import load_source
|
|
|
|
# Load the apiclient helper as a module.  The copy under /opt/confluent/bin
# is tried first; if loading it raises IOError, the copy under /etc/confluent
# is loaded instead (an error from that second attempt propagates).
try:
    apiclient = load_source('apiclient', '/opt/confluent/bin/apiclient')
except IOError:
    apiclient = load_source('apiclient', '/etc/confluent/apiclient')
|
|
|
|
|
|
|
|
def partitionhostsline(line):
    """Split one hosts-file line into address, names and trailing comment.

    Returns a tuple ``(ipaddr, names, comment)``:

    * ``ipaddr`` is the first whitespace-separated field before any ``#``,
      or ``''`` when the line carries no entry.
    * ``names`` is the list of remaining fields (possibly empty).
    * ``comment`` is the text from the first ``#`` to the end of the line,
      including the ``#``, or ``''`` when there is no comment.

    Blank, whitespace-only and comment-only lines yield ``('', [], comment)``.
    A line holding only an address yields an empty names list instead of
    raising.
    """
    body, marker, remark = line.partition('#')
    comment = marker + remark
    fields = body.split()
    if not fields:
        return '', [], comment
    return fields[0], fields[1:], comment
|
|
|
|
class HostMerger(object):
    """Merge the entries of a source hosts file into a target hosts file.

    Typical use is ``read_source()``, then ``read_target()``, then
    ``write_out()``.  Target lines whose address or any name matches a source
    entry are replaced by that source line (each source line is used at most
    once); all other target lines are kept.  Source lines that were never
    matched are appended after the target lines.
    """

    def __init__(self):
        # Address -> index into self.sourcelines.
        self.byip = {}
        # Host name -> index into self.sourcelines.
        self.byname = {}
        # Lines of the source file; consumed entries are replaced by None.
        self.sourcelines = []
        # Output lines accumulated while reading the target file.
        self.targlines = []

    def read_source(self, sourcefile):
        """Read *sourcefile* and index its entries by address and by name."""
        with open(sourcefile, 'r') as hfile:
            self.sourcelines = hfile.read().split('\n')
        # Drop trailing blank lines; the emptiness guard keeps an empty or
        # blank-only file from raising IndexError.
        while self.sourcelines and not self.sourcelines[-1]:
            self.sourcelines.pop()
        for idx, line in enumerate(self.sourcelines):
            currip, names, _ = partitionhostsline(line)
            if currip:
                self.byip[currip] = idx
            for name in names:
                self.byname[name] = idx

    def read_target(self, targetfile):
        """Read *targetfile*, substituting matching source entries.

        Must be called after ``read_source()``.  Results accumulate in
        ``self.targlines``.
        """
        with open(targetfile, 'r') as hfile:
            lines = hfile.read().split('\n')
        if not lines[-1]:
            # A file ending in a newline yields one empty trailing element.
            lines = lines[:-1]
        for line in lines:
            currip, names, _ = partitionhostsline(line)
            if currip in self.byip:
                idx = self.byip[currip]
                if self.sourcelines[idx] is None:
                    # Already consumed this entry; drop the target line.
                    continue
                self.targlines.append(self.sourcelines[idx])
                self.sourcelines[idx] = None
                continue
            for name in names:
                if name in self.byname:
                    idx = self.byname[name]
                    if self.sourcelines[idx] is not None:
                        self.targlines.append(self.sourcelines[idx])
                        self.sourcelines[idx] = None
                    # Matched by name (consumed now or earlier): the target
                    # line itself is not kept.
                    break
            else:
                # No address or name matched: keep the target line as is.
                self.targlines.append(line)

    def write_out(self, targetfile):
        """Write the merged result to *targetfile*.

        Trailing blank target lines and trailing blank or consumed source
        lines are trimmed first; empty lists are handled without error.
        """
        while self.targlines and not self.targlines[-1]:
            self.targlines.pop()
        while self.sourcelines and not self.sourcelines[-1]:
            self.sourcelines.pop()
        with open(targetfile, 'w') as hosts:
            for line in self.targlines:
                hosts.write(line + '\n')
            for line in self.sourcelines:
                if line is not None:
                    hosts.write(line + '\n')
|
|
|
|
|
|
class CredMerger:
    """Merge account or group entries from a source file into a target file.

    ID ranges are taken from /etc/login.defs (UID_MIN, UID_MAX, GID_MIN,
    GID_MAX), defaulting to 1000..60000.  From the target file, entries whose
    ID lies outside the range are kept; from the source file, entries whose
    ID lies inside the range are taken.  One instance handles either passwd
    or group data, not both.
    """

    def __init__(self):
        try:
            with open('/etc/login.defs', 'r') as ldefs:
                defs = ldefs.read().split('\n')
        except FileNotFoundError:
            defs = []
        lkup = {}
        # Names of in-range target entries that are being dropped.
        self.discardnames = {}
        # Names that already have a line in the shadow file.
        self.shadowednames = {}
        for line in defs:
            # Strip any trailing comment before splitting key and value.
            line = line.partition('#')[0]
            keyval = line.split()
            if len(keyval) < 2:
                continue
            lkup[keyval[0]] = keyval[1]
        self.uidmin = int(lkup.get('UID_MIN', 1000))
        self.uidmax = int(lkup.get('UID_MAX', 60000))
        self.gidmin = int(lkup.get('GID_MIN', 1000))
        self.gidmax = int(lkup.get('GID_MAX', 60000))
        self.shadowlines = None
        # Initialised here so write_out() works even if a read was skipped.
        self.targdata = []
        self.sourcedata = []

    def read_passwd(self, source, targfile=False):
        """Read a passwd-format file using the UID range."""
        self.read_generic(source, self.uidmin, self.uidmax, targfile)

    def read_group(self, source, targfile=False):
        """Read a group-format file using the GID range."""
        self.read_generic(source, self.gidmin, self.gidmax, targfile)

    def read_generic(self, source, minid, maxid, targfile):
        """Read colon-separated entries from *source*.

        Lines lacking at least four fields, or whose third field is not an
        integer, are skipped.  With *targfile* true, out-of-range entries go
        to ``self.targdata`` and the names of in-range entries are recorded
        in ``self.discardnames``.  Otherwise, in-range entries (and entries
        whose name starts with ``+``, ``#`` or ``@``) go to
        ``self.sourcedata``; entries with an empty name are skipped.
        """
        if targfile:
            self.targdata = []
        else:
            self.sourcedata = []
        with open(source, 'r') as inputfile:
            for line in inputfile.read().split('\n'):
                try:
                    name, _, uid, _ = line.split(':', 3)
                    uid = int(uid)
                except ValueError:
                    continue
                if targfile:
                    if uid < minid or uid > maxid:
                        self.targdata.append(line)
                    else:
                        self.discardnames[name] = 1
                elif not name:
                    # Malformed source entry without a name: skip it rather
                    # than fail on name[0].
                    continue
                elif name.startswith(('+', '#', '@')) or minid <= uid <= maxid:
                    self.sourcedata.append(line)

    def read_shadow(self, source):
        """Read shadow-format lines from *source*, dropping discarded names.

        Kept lines go to ``self.shadowlines`` and their names are recorded in
        ``self.shadowednames``.  A missing file leaves ``self.shadowlines``
        empty.
        """
        self.shadowlines = []
        try:
            with open(source, 'r') as inshadow:
                for line in inshadow.read().split('\n'):
                    try:
                        name, _ = line.split(':', 1)
                    except ValueError:
                        continue
                    if name in self.discardnames:
                        continue
                    self.shadowednames[name] = 1
                    self.shadowlines.append(line)
        except FileNotFoundError:
            return

    def write_out(self, outfile):
        """Write kept target entries followed by source entries to *outfile*.

        When *outfile* is exactly '/etc/passwd' or '/etc/group', the matching
        /etc/shadow or /etc/gshadow is rewritten too: surviving lines are
        kept and a locked ('!') entry is added for each source name that has
        none.
        """
        with open(outfile, 'w') as targ:
            for line in self.targdata:
                targ.write(line + '\n')
            for line in self.sourcedata:
                targ.write(line + '\n')
        if outfile == '/etc/passwd':
            if self.shadowlines is None:
                self.read_shadow('/etc/shadow')
            with open('/etc/shadow', 'w') as shadout:
                for line in self.shadowlines:
                    shadout.write(line + '\n')
                for line in self.sourcedata:
                    name, _ = line.split(':', 1)
                    if name.startswith(('+', '#', '@')):
                        continue
                    if name in self.shadowednames:
                        continue
                    shadout.write(name + ':!:::::::\n')
        if outfile == '/etc/group':
            if self.shadowlines is None:
                self.read_shadow('/etc/gshadow')
            with open('/etc/gshadow', 'w') as shadout:
                for line in self.shadowlines:
                    shadout.write(line + '\n')
                for line in self.sourcedata:
                    name, _ = line.split(':', 1)
                    if name in self.shadowednames:
                        continue
                    shadout.write(name + ':!::\n')
|
|
|
|
def appendonce(basepath, filename):
    """Append the content of a staged file to its real target, at most once.

    *filename* is a file located under the staging directory *basepath*; the
    target path is *filename* with the leading *basepath* removed.  If the
    target already contains the staged bytes, nothing is written.  A target
    that cannot be read is treated as empty.
    """
    with open(filename, 'rb') as filehdl:
        thedata = filehdl.read()
    if filename.startswith(basepath):
        # Strip only the leading staging prefix; a plain str.replace would
        # also remove later occurrences of the same text inside the path.
        targname = filename[len(basepath):]
    else:
        # Not under basepath: keep the historical replace-based behavior.
        targname = filename.replace(basepath, '')
    try:
        with open(targname, 'rb') as filehdl:
            targdata = filehdl.read()
    except IOError:
        targdata = b''
    if thedata in targdata:
        return
    with open(targname, 'ab') as targhdl:
        targhdl.write(thedata)
|
|
|
|
def _apply_file_options(opts):
    """Apply the 'owner', 'group' and 'permissions' options to each file.

    *opts* maps a file name to a dict of options.  Owner and group are
    resolved by name first, falling back to the numeric 'id' given with the
    option.  Ownership is changed before the mode is set because chown can
    clear setuid/setgid bits, which would undo a requested mode.
    """
    for fname in opts:
        uid = -1
        gid = -1
        mode = None
        for opt in opts[fname]:
            if opt == 'owner':
                try:
                    uid = pwd.getpwnam(opts[fname][opt]['name']).pw_uid
                except KeyError:
                    uid = opts[fname][opt]['id']
            elif opt == 'group':
                try:
                    gid = grp.getgrnam(opts[fname][opt]['name']).gr_gid
                except KeyError:
                    gid = opts[fname][opt]['id']
            elif opt == 'permissions':
                mode = int(opts[fname][opt], 8)
        if uid != -1 or gid != -1:
            os.chown(fname, uid, gid)
        if mode is not None:
            os.chmod(fname, mode)


def synchronize():
    """Request a file sync and merge the staged results into the system.

    Two temporary directories are created and their paths are sent to
    '/confluent-api/self/remotesyncfiles' under the keys 'merge' and
    'appendonce'.  If the reply status is 202, the same URL is polled every
    two seconds until it returns 204, remembering the body of the last 200
    reply.  Staged etc/passwd, etc/group and etc/hosts files are then merged
    into /etc, every file under the append-once directory is passed to
    appendonce(), and any 'options' in the remembered reply are applied.
    Both temporary directories are always removed.
    NOTE(review): presumably the server fills the two directories while the
    request is pending -- confirm against the server side.
    """
    tmpdir = tempfile.mkdtemp()
    appendoncedir = tempfile.mkdtemp()
    try:
        ac = apiclient.HTTPSClient()
        data = json.dumps({'merge': tmpdir, 'appendonce': appendoncedir})
        status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles', data)
        if status != 202:
            # Request was not accepted; nothing to wait for or merge.
            return
        lastrsp = ''
        while status != 204:
            time.sleep(2)
            status, rsp = ac.grab_url_with_status('/confluent-api/self/remotesyncfiles')
            if not isinstance(rsp, str):
                rsp = rsp.decode('utf8')
            if status == 200:
                lastrsp = rsp
        pendpasswd = os.path.join(tmpdir, 'etc/passwd')
        if os.path.exists(pendpasswd):
            cm = CredMerger()
            cm.read_passwd(pendpasswd, targfile=False)
            cm.read_passwd('/etc/passwd', targfile=True)
            cm.write_out('/etc/passwd')
        pendgroup = os.path.join(tmpdir, 'etc/group')
        if os.path.exists(pendgroup):
            cm = CredMerger()
            cm.read_group(pendgroup, targfile=False)
            cm.read_group('/etc/group', targfile=True)
            cm.write_out('/etc/group')
        pendhosts = os.path.join(tmpdir, 'etc/hosts')
        if os.path.exists(pendhosts):
            cm = HostMerger()
            cm.read_source(pendhosts)
            cm.read_target('/etc/hosts')
            cm.write_out('/etc/hosts')
        for dirpath, _, filenames in os.walk(appendoncedir):
            for filen in filenames:
                appendonce(appendoncedir, os.path.join(dirpath, filen))
        if lastrsp:
            lastrsp = json.loads(lastrsp)
            _apply_file_options(lastrsp.get('options', {}))
    finally:
        # Nested so the second directory is removed even if removing the
        # first one raises.
        try:
            shutil.rmtree(tmpdir)
        finally:
            shutil.rmtree(appendoncedir)
|
|
|
|
|
|
# Run one synchronization pass when executed as a script (not on import).
if __name__ == '__main__':
    synchronize()
|