mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-15 12:17:47 +00:00
b809514ef9
Make sure confluent has made /etc/confluent, and further always initialize the encryption key, as it will almost certainly be needed and easiest to just always generate on first startup.
320 lines
9.5 KiB
Python
320 lines
9.5 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2014 IBM Corporation
|
|
# Copyright 2015-2017 Lenovo
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# This is the main application.
|
|
# It should check for existing UDP socket to negotiate socket listen takeover
|
|
# It will have three paths into it:
|
|
# -Unix domain socket
|
|
# -TLS socket
|
|
# -WSGI
|
|
# Additionally, it will be able to receive particular UDP packets to facilitate
|
|
# Things like heartbeating and discovery
|
|
# It also will optionally snoop SLP DA requests
|
|
|
|
import atexit
|
|
import confluent.auth as auth
|
|
import confluent.config.conf as conf
|
|
import confluent.config.configmanager as configmanager
|
|
import confluent.consoleserver as consoleserver
|
|
import confluent.core as confluentcore
|
|
import confluent.httpapi as httpapi
|
|
import confluent.log as log
|
|
import confluent.collective.manager as collective
|
|
import confluent.discovery.protocols.pxe as pxe
|
|
try:
|
|
import confluent.sockapi as sockapi
|
|
except ImportError:
|
|
#On platforms without pwd, give up on the sockapi in general and be http
|
|
#only for now
|
|
pass
|
|
import confluent.discovery.core as disco
|
|
import eventlet
|
|
dbgif = False
|
|
try:
|
|
import eventlet.backdoor as backdoor
|
|
dbgif = True
|
|
except Exception:
|
|
pass
|
|
havefcntl = True
|
|
try:
|
|
import fcntl
|
|
except ImportError:
|
|
havefcntl = False
|
|
#import multiprocessing
|
|
import gc
|
|
from greenlet import greenlet
|
|
import sys
|
|
import os
|
|
import glob
|
|
import signal
|
|
import socket
|
|
import time
|
|
import traceback
|
|
import uuid
|
|
|
|
|
|
def _daemonize():
    """Detach the process from its launcher using two successive forks.

    Does nothing when the os module offers no fork().  Otherwise the
    original process and the intermediate child both leave through
    os._exit(), and the surviving grandchild has its standard descriptors
    pointed at os.devnull before log.daemonized is set to True.
    """
    if 'fork' not in vars(os):
        return
    firstchild = os.fork()
    if firstchild > 0:
        # Original process: reap the intermediate child, then leave
        os.waitpid(firstchild, 0)
        os._exit(0)
    os.setsid()
    daemonpid = os.fork()
    if daemonpid > 0:
        # Intermediate child: announce the pid of the surviving process
        print('confluent server starting as pid {0}'.format(daemonpid))
        os._exit(0)
    # closerange excludes its upper bound, so this closes descriptors 0 and 1
    os.closerange(0, 2)
    # The devnull handle is expected to land on descriptor 0, which is
    # then duplicated over descriptors 1 and 2
    os.open(os.devnull, os.O_RDWR)
    for fd in (1, 2):
        os.dup2(0, fd)
    log.daemonized = True
|
|
|
|
|
|
def _redirectoutput():
    """Replace sys.stdout and sys.stderr with unbuffered log.Logger objects.

    The process umask is first set to 0o077 (decimal 63).
    """
    os.umask(0o077)
    for streamname in ('stdout', 'stderr'):
        setattr(sys, streamname, log.Logger(streamname, buffered=False))
|
|
|
|
|
|
def _updatepidfile():
    """Rewrite /var/run/confluent/pid with the pid of the current process.

    The write happens under an exclusive flock and is flushed before the
    lock is dropped, so another process that takes the lock afterwards
    does not read a still-empty file.  The file is closed even if the
    write fails.
    """
    with open('/var/run/confluent/pid', 'w+') as pidfile:
        fcntl.flock(pidfile, fcntl.LOCK_EX)
        try:
            pidfile.write(str(os.getpid()))
            # Push the buffered data out while the lock is still held
            pidfile.flush()
        finally:
            fcntl.flock(pidfile, fcntl.LOCK_UN)
|
|
|
|
|
|
def is_running():
    """Report whether the pid recorded in the confluent pid file is alive.

    Utility function for utilities to check if confluent is running.

    Returns the pid exactly as read from /var/run/confluent/pid (a string)
    when a process with that pid exists, otherwise None.  None is also
    returned when the pid file cannot be opened or locked, is empty, or
    does not contain a number.
    """
    import errno
    try:
        # The context manager guarantees the handle is closed on every
        # path, including the early "still running" return
        with open('/var/run/confluent/pid', 'r+') as pidfile:
            fcntl.flock(pidfile, fcntl.LOCK_SH)
            try:
                pid = pidfile.read()
            finally:
                fcntl.flock(pidfile, fcntl.LOCK_UN)
    except IOError:
        return None
    if pid == '':
        return None
    try:
        # Signal 0 performs the existence/permission check without
        # delivering anything to the target
        os.kill(int(pid), 0)
    except ValueError:
        # Garbage in the pid file cannot identify a running process
        return None
    except OSError as e:
        if e.errno == errno.EPERM:
            # The process exists but belongs to another user
            return pid
        # There is no process running by that pid, must be stale
        return None
    return pid
|
|
|
|
|
|
def _checkpidfile():
    """Claim /var/run/confluent/pid for this process, or exit if it is taken.

    When the pid file can be opened, its recorded pid is probed with
    signal 0.  If the probe succeeds a message is printed and the process
    exits with status 1; if it fails with OSError the entry is treated as
    stale and replaced by the current pid.

    When that first attempt fails with IOError, the file is created
    (creating /var/run/confluent as well if the open failed with errno 2),
    the current pid is written, and the file is read back; if the content
    is not the current pid the process exits with status 1.
    """
    try:
        pidfile = open('/var/run/confluent/pid', 'r+')
        fcntl.flock(pidfile, fcntl.LOCK_EX)
        pid = pidfile.read()
        if pid != '':
            try:
                os.kill(int(pid), 0)
                print ('/var/run/confluent/pid exists and indicates %s is still '
                       'running' % pid)
                sys.exit(1)
            except OSError:
                # There is no process running by that pid, must be stale
                pass
        pidfile.seek(0)
        pidfile.write(str(os.getpid()))
        # Cut the file at the end of the new pid; otherwise a longer stale
        # pid would leave trailing digits behind the one just written
        pidfile.truncate()
        # Make the data visible before other lock holders may read it
        pidfile.flush()
        fcntl.flock(pidfile, fcntl.LOCK_UN)
        pidfile.close()
    except IOError:
        try:
            pidfile = open('/var/run/confluent/pid', 'w')
        except IOError as e:
            # errno 2: a path component does not exist, so create the
            # directory and retry; anything else is not recoverable here
            if e.errno != 2:
                raise
            os.makedirs('/var/run/confluent')
            pidfile = open('/var/run/confluent/pid', 'w')
        fcntl.flock(pidfile, fcntl.LOCK_EX)
        pidfile.write(str(os.getpid()))
        # Make the data visible before other lock holders may read it
        pidfile.flush()
        fcntl.flock(pidfile, fcntl.LOCK_UN)
        pidfile.close()
        # Read the file back to confirm that our pid is the one recorded
        pidfile = open('/var/run/confluent/pid', 'r')
        fcntl.flock(pidfile, fcntl.LOCK_SH)
        pid = pidfile.read()
        if pid != str(os.getpid()):
            print ('/var/run/confluent/pid exists and indicates %s is still '
                   'running' % pid)
            sys.exit(1)
        fcntl.flock(pidfile, fcntl.LOCK_UN)
        pidfile.close()
|
|
|
|
|
|
def terminate(signalname, frame):
    """Signal handler that ends the process with exit status 0.

    Both arguments are accepted to fit the signal handler calling
    convention and are otherwise unused.
    """
    raise SystemExit(0)
|
|
|
|
def dumptrace(signalname, frame):
    """Signal handler that appends stack traces to the hangtraces log.

    Writes a timestamped header and the stack of ``frame`` to
    /var/log/confluent/hangtraces, followed by the stack of every greenlet
    found through gc.get_objects() that evaluates as true.  The file is
    closed even if producing one of the traces raises.

    signalname is accepted for the signal handler calling convention and
    is not used.
    """
    with open('/var/log/confluent/hangtraces', 'a') as ht:
        ht.write('Dumping active trace on ' + time.strftime('%X %x\n'))
        ht.write(''.join(traceback.format_stack(frame)))
        for o in gc.get_objects():
            if not isinstance(o, greenlet):
                continue
            if not o:
                # Skip greenlets that evaluate as false
                continue
            ht.write('Thread trace: ({0})\n'.format(id(o)))
            ht.write(''.join(traceback.format_stack(o.gr_frame)))
|
|
|
|
def doexit():
    """Remove the debug socket and, if it is ours, the pid file.

    Does nothing when fcntl is unavailable.  The pid file is only removed
    when it records the pid of the current process.  A pid file that is
    missing or unreadable is ignored, so that this cleanup routine does
    not raise on top of whatever error led to the shutdown.
    """
    if not havefcntl:
        return
    try:
        os.remove('/var/run/confluent/dbg.sock')
    except OSError:
        pass
    try:
        with open('/var/run/confluent/pid') as pidfile:
            pid = pidfile.read()
    except (IOError, OSError):
        # No readable pid file, hence nothing of ours to clean up
        return
    if pid == str(os.getpid()):
        os.remove('/var/run/confluent/pid')
|
|
|
|
|
|
def _initsecurity(config):
|
|
if config.has_option('security', 'externalcfgkey'):
|
|
keyfile = config.get('security', 'externalcfgkey')
|
|
with open(keyfile, 'r') as keyhandle:
|
|
key = keyhandle.read()
|
|
configmanager.init_masterkey(key)
|
|
# We don't want to os._exit() until sync finishes from
|
|
# init above
|
|
configmanager.ConfigManager.wait_for_sync()
|
|
|
|
|
|
def setlimits():
    """Raise the soft open-file limit to the hard limit, best effort.

    Any failure, including the resource module being unavailable, is
    silently ignored.
    """
    try:
        import resource
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        if soft < hard:
            resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
    except Exception:
        pass
|
|
|
|
def assure_ownership(path):
    """Exit with status 1 unless ``path`` is owned by the current user.

    A complaint is written to stderr before exiting.  An OSError with
    errno 13 (permission denied) while inspecting the path is treated the
    same as an ownership mismatch; any other OSError, such as the path not
    existing, is ignored.
    """
    try:
        badowner = os.getuid() != os.stat(path).st_uid
    except OSError as e:
        badowner = e.errno == 13
    if badowner:
        sys.stderr.write('{} is not owned by confluent user, change ownership\n'.format(path))
        sys.exit(1)
|
|
|
|
def sanity_check():
    """Verify ownership of the confluent configuration files.

    Returns True immediately when running as uid 0.  Otherwise each path
    is passed to assure_ownership() in turn: /etc/confluent,
    /etc/confluent/cfg, every entry matching /etc/confluent/cfg/*, then
    the private key and server certificate.  Nothing is returned in that
    case.
    """
    if os.getuid() == 0:
        return True
    for cfgdir in ('/etc/confluent', '/etc/confluent/cfg'):
        assure_ownership(cfgdir)
    for cfgentry in glob.glob('/etc/confluent/cfg/*'):
        assure_ownership(cfgentry)
    for pemfile in ('/etc/confluent/privkey.pem',
                    '/etc/confluent/srvcert.pem'):
        assure_ownership(pemfile)
|
|
|
|
|
|
def run(args):
    """Start the confluent service and keep it running.

    ``args`` is searched for two flags: '-f' suppresses daemonizing and
    '-o' suppresses replacing stdout/stderr with log objects.

    The function does not return normally; it ends in an endless sleep
    loop and the process leaves through sys.exit() (from a signal handler
    or one of the startup failure paths below).
    """
    setlimits()
    try:
        signal.signal(signal.SIGUSR1, dumptrace)
    except AttributeError:
        pass  # silly windows
    if havefcntl:
        _checkpidfile()
    conf.init_config()
    try:
        config = conf.get_config()
        _initsecurity(config)
    # NOTE(review): the bare except also intercepts SystemExit and
    # KeyboardInterrupt raised inside the two calls above
    except:
        sys.stderr.write("Error unlocking credential store\n")
        doexit()
        sys.exit(1)
    sanity_check()
    try:
        confluentcore.load_plugins()
    except:
        # Clean up the pid file and debug socket, then let the error surface
        doexit()
        raise
    try:
        log.log({'info': 'Confluent management service starting'}, flush=True)
    except (OSError, IOError) as e:
        print(repr(e))
        sys.exit(1)
    if '-f' not in args:
        _daemonize()
    if '-o' not in args:
        _redirectoutput()
    if havefcntl:
        # Written again here, after the optional daemonizing step above
        _updatepidfile()
    signal.signal(signal.SIGINT, terminate)
    signal.signal(signal.SIGTERM, terminate)
    atexit.register(doexit)
    # Generate and store a uuid for this instance if none is recorded yet
    confluentuuid = configmanager.get_global('confluent_uuid')
    if not confluentuuid:
        confluentuuid = str(uuid.uuid4())
        configmanager.set_global('confluent_uuid', confluentuuid)
    # Initialize a master key when none has been set up by this point
    if not configmanager._masterkey:
        configmanager.init_masterkey()
    if dbgif:
        # Create the debug socket under a 0o077 umask, restored afterwards
        oumask = os.umask(0o077)
        try:
            os.remove('/var/run/confluent/dbg.sock')
        except OSError:
            pass  # We are not expecting the file to exist
        try:
            dbgsock = eventlet.listen("/var/run/confluent/dbg.sock",
                                      family=socket.AF_UNIX)
            eventlet.spawn_n(backdoor.backdoor_server, dbgsock)
        except AttributeError:
            pass  # Windows...
        os.umask(oumask)
    collective.startup()
    consoleserver.initialize()
    http_bind_host, http_bind_port = _get_connector_config('http')
    sock_bind_host, sock_bind_port = _get_connector_config('socket')
    try:
        sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port)
        sockservice.start()
    except NameError:
        # sockapi is unbound when its guarded import at the top of the
        # file failed; the service then runs without the socket API
        pass
    webservice = httpapi.HttpApi(http_bind_host, http_bind_port)
    webservice.start()
    while len(list(configmanager.list_collective())) >= 2:
        # If in a collective, stall automatic startup activity
        # until we establish quorum
        try:
            configmanager.check_quorum()
            break
        except Exception:
            eventlet.sleep(0.5)
    disco.start_detection()
    pxe.start_proxydhcp()
    eventlet.sleep(1)
    consoleserver.start_console_sessions()
    # Keep this greenlet alive indefinitely
    while 1:
        eventlet.sleep(100)
|
|
|
|
def _get_connector_config(session):
    """Return the (bindhost, bindport) pair configured for ``session``.

    ``session`` names the configuration section to read; the host comes
    from conf.get_option() and the port from conf.get_int_option().
    """
    return (conf.get_option(session, 'bindhost'),
            conf.get_int_option(session, 'bindport'))
|