2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-05 21:42:24 +00:00
Jarrod Johnson 7341164f36 Have pure proxyDHCP trigger discovery and logs
For users that fully delegate core DHCP, provide discovery and
logging for PXE as it comes in.
2022-11-22 09:09:21 -05:00

338 lines
10 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
# Copyright 2015-2017 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This is the main application.
# It should check for existing UDP socket to negotiate socket listen takeover
# It will have three paths into it:
# -Unix domain socket
# -TLS socket
# -WSGI
# Additionally, it will be able to receive particular UDP packets to facilitate
# Things like heartbeating and discovery
# It also will optionally snoop SLP DA requests
import atexit
import confluent.auth as auth
import confluent.config.conf as conf
import confluent.config.configmanager as configmanager
try:
import anydbm as dbm
except ModuleNotFoundError:
import dbm
import confluent.consoleserver as consoleserver
import confluent.core as confluentcore
import confluent.httpapi as httpapi
import confluent.log as log
import confluent.collective.manager as collective
import confluent.discovery.protocols.pxe as pxe
try:
import confluent.sockapi as sockapi
except ImportError:
#On platforms without pwd, give up on the sockapi in general and be http
#only for now
pass
import confluent.discovery.core as disco
import eventlet
dbgif = False
try:
import eventlet.backdoor as backdoor
dbgif = True
except Exception:
pass
havefcntl = True
try:
import fcntl
except ImportError:
havefcntl = False
#import multiprocessing
import gc
from greenlet import greenlet
import sys
import os
import glob
import signal
import socket
import subprocess
import time
import traceback
import tempfile
import uuid
def _daemonize():
if not 'fork' in os.__dict__:
return
thispid = os.fork()
if thispid > 0:
os.waitpid(thispid, 0)
os._exit(0)
os.setsid()
thispid = os.fork()
if thispid > 0:
print('confluent server starting as pid {0}'.format(thispid))
os._exit(0)
os.closerange(0, 2)
os.open(os.devnull, os.O_RDWR)
os.dup2(0, 1)
os.dup2(0, 2)
log.daemonized = True
def _redirectoutput():
os.umask(63)
sys.stdout = log.Logger('stdout', buffered=False)
sys.stderr = log.Logger('stderr', buffered=False)
def _updatepidfile():
pidfile = open('/var/run/confluent/pid', 'w+')
fcntl.flock(pidfile, fcntl.LOCK_EX)
pidfile.write(str(os.getpid()))
fcntl.flock(pidfile, fcntl.LOCK_UN)
pidfile.close()
def is_running():
# Utility function for utilities to check if confluent is running
try:
pidfile = open('/var/run/confluent/pid', 'r+')
fcntl.flock(pidfile, fcntl.LOCK_SH)
pid = pidfile.read()
if pid != '':
try:
os.kill(int(pid), 0)
return pid
except OSError:
# There is no process running by that pid, must be stale
pass
fcntl.flock(pidfile, fcntl.LOCK_UN)
pidfile.close()
except IOError:
pass
return None
def _checkpidfile():
try:
pidfile = open('/var/run/confluent/pid', 'r+')
fcntl.flock(pidfile, fcntl.LOCK_EX)
pid = pidfile.read()
if pid != '':
try:
os.kill(int(pid), 0)
print ('/var/run/confluent/pid exists and indicates %s is still '
'running' % pid)
sys.exit(1)
except OSError:
# There is no process running by that pid, must be stale
pass
pidfile.seek(0)
pidfile.write(str(os.getpid()))
fcntl.flock(pidfile, fcntl.LOCK_UN)
pidfile.close()
except IOError:
try:
pidfile = open('/var/run/confluent/pid', 'w')
except IOError as e:
if e.errno != 2:
raise
os.makedirs('/var/run/confluent')
pidfile = open('/var/run/confluent/pid', 'w')
fcntl.flock(pidfile, fcntl.LOCK_EX)
pidfile.write(str(os.getpid()))
fcntl.flock(pidfile, fcntl.LOCK_UN)
pidfile.close()
pidfile = open('/var/run/confluent/pid', 'r')
fcntl.flock(pidfile, fcntl.LOCK_SH)
pid = pidfile.read()
if pid != str(os.getpid()):
print ('/var/run/confluent/pid exists and indicates %s is still '
'running' % pid)
sys.exit(1)
fcntl.flock(pidfile, fcntl.LOCK_UN)
pidfile.close()
def terminate(signalname, frame):
sys.exit(0)
def dumptrace(signalname, frame):
ht = open('/var/log/confluent/hangtraces', 'a')
ht.write('Dumping active trace on ' + time.strftime('%X %x\n'))
ht.write(''.join(traceback.format_stack(frame)))
for o in gc.get_objects():
if not isinstance(o, greenlet):
continue
if not o:
continue
ht.write('Thread trace: ({0})\n'.format(id(o)))
ht.write(''.join(traceback.format_stack(o.gr_frame)))
ht.close()
def doexit():
if not havefcntl:
return
try:
os.remove('/var/run/confluent/dbg.sock')
except OSError:
pass
pidfile = open('/var/run/confluent/pid')
pid = pidfile.read()
if pid == str(os.getpid()):
os.remove('/var/run/confluent/pid')
def _initsecurity(config):
if config.has_option('security', 'externalcfgkey'):
keyfile = config.get('security', 'externalcfgkey')
with open(keyfile, 'r') as keyhandle:
key = keyhandle.read()
configmanager.init_masterkey(key)
# We don't want to os._exit() until sync finishes from
# init above
configmanager.ConfigManager.wait_for_sync()
def setlimits():
try:
import resource
currlimit = resource.getrlimit(resource.RLIMIT_NOFILE)
if currlimit[0] < currlimit[1]:
resource.setrlimit(
resource.RLIMIT_NOFILE, (currlimit[1], currlimit[1]))
except Exception:
pass
def assure_ownership(path):
try:
if os.getuid() != os.stat(path).st_uid:
sys.stderr.write('{} is not owned by confluent user, change ownership\n'.format(path))
sys.exit(1)
except OSError as e:
if e.errno == 13:
sys.stderr.write('{} is not owned by confluent user, change ownership\n'.format(path))
sys.exit(1)
def sanity_check():
if os.getuid() == 0:
return True
assure_ownership('/etc/confluent')
assure_ownership('/etc/confluent/cfg')
for filename in glob.glob('/etc/confluent/cfg/*'):
assure_ownership(filename)
assure_ownership('/etc/confluent/privkey.pem')
assure_ownership('/etc/confluent/srvcert.pem')
def migrate_db():
tdir = tempfile.mkdtemp()
subprocess.check_call(['python3', '-c', 'pass'])
subprocess.check_call(['python2', '/opt/confluent/bin/confluentdbutil', 'dump', '-u', tdir])
subprocess.check_call(['python3', '/opt/confluent/bin/confluentdbutil', 'restore', '-u', tdir])
subprocess.check_call(['rm', '-rf', tdir])
configmanager.init()
def run(args):
setlimits()
try:
configmanager.ConfigManager(None)
except dbm.error:
migrate_db()
try:
signal.signal(signal.SIGUSR1, dumptrace)
except AttributeError:
pass # silly windows
if havefcntl:
_checkpidfile()
conf.init_config()
try:
config = conf.get_config()
_initsecurity(config)
except:
sys.stderr.write("Error unlocking credential store\n")
doexit()
sys.exit(1)
sanity_check()
try:
confluentcore.load_plugins()
except:
doexit()
raise
try:
log.log({'info': 'Confluent management service starting'}, flush=True)
except (OSError, IOError) as e:
print(repr(e))
sys.exit(1)
if '-f' not in args:
_daemonize()
if '-o' not in args:
_redirectoutput()
if havefcntl:
_updatepidfile()
signal.signal(signal.SIGINT, terminate)
signal.signal(signal.SIGTERM, terminate)
atexit.register(doexit)
confluentuuid = configmanager.get_global('confluent_uuid')
if not confluentuuid:
confluentuuid = str(uuid.uuid4())
configmanager.set_global('confluent_uuid', confluentuuid)
if not configmanager._masterkey:
configmanager.init_masterkey()
if dbgif:
oumask = os.umask(0o077)
try:
os.remove('/var/run/confluent/dbg.sock')
except OSError:
pass # We are not expecting the file to exist
try:
dbgsock = eventlet.listen("/var/run/confluent/dbg.sock",
family=socket.AF_UNIX)
eventlet.spawn_n(backdoor.backdoor_server, dbgsock)
except AttributeError:
pass # Windows...
os.umask(oumask)
collective.startup()
consoleserver.initialize()
http_bind_host, http_bind_port = _get_connector_config('http')
sock_bind_host, sock_bind_port = _get_connector_config('socket')
try:
sockservice = sockapi.SockApi(sock_bind_host, sock_bind_port)
sockservice.start()
except NameError:
pass
webservice = httpapi.HttpApi(http_bind_host, http_bind_port)
webservice.start()
while len(list(configmanager.list_collective())) >= 2:
# If in a collective, stall automatic startup activity
# until we establish quorum
try:
configmanager.check_quorum()
break
except Exception:
eventlet.sleep(0.5)
disco.start_detection()
eventlet.sleep(1)
consoleserver.start_console_sessions()
while 1:
eventlet.sleep(100)
def _get_connector_config(session):
host = conf.get_option(session, 'bindhost')
port = conf.get_int_option(session, 'bindport')
return (host, port)