From cf4e8713ff86370cce010a641697a638bff841aa Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Fri, 18 Apr 2014 14:34:16 -0400 Subject: [PATCH] Daemonize and honor pid file lock for exclusive execution --- confluent/httpapi.py | 21 +++++++++---- confluent/main.py | 70 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 5 deletions(-) diff --git a/confluent/httpapi.py b/confluent/httpapi.py index 1faedbd0..4201b239 100644 --- a/confluent/httpapi.py +++ b/confluent/httpapi.py @@ -168,15 +168,15 @@ def _authorize_request(env, operation): cookie['confluentsessionid']['secure'] = 1 cookie['confluentsessionid']['httponly'] = 1 cookie['confluentsessionid']['path'] = '/' - auditmsg = { - 'user': name, - 'operation': operation, - 'target': env['PATH_INFO'], - } skiplog = False if '/console/session' in env['PATH_INFO']: skiplog = True if authdata: + auditmsg = { + 'user': name, + 'operation': operation, + 'target': env['PATH_INFO'], + } authinfo = {'code': 200, 'cookie': cookie, 'cfgmgr': authdata[1], @@ -229,6 +229,17 @@ def _assign_consessionid(consolesession): def resourcehandler(env, start_response): + try: + for rsp in resourcehandler_backend(env, start_response): + yield rsp + except: + tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, + event=log.Events.stacktrace) + start_response('500 - Internal Server Error', []) + yield '500 - Internal Server Error' + return + +def resourcehandler_backend(env, start_response): """Function to handle new wsgi requests """ mimetype = _pick_mimetype(env) diff --git a/confluent/main.py b/confluent/main.py index 74652e8c..81d63f15 100644 --- a/confluent/main.py +++ b/confluent/main.py @@ -24,6 +24,7 @@ # Things like heartbeating and discovery # It also will optionally snoop SLP DA requests +import atexit import confluent.pluginapi as pluginapi import confluent.httpapi as httpapi import confluent.sockapi as sockapi @@ -31,12 +32,80 @@ import eventlet import eventlet.backdoor as backdoor from eventlet.green import socket from eventlet import wsgi +import fcntl import multiprocessing import sys import os +import signal + +def _daemonize(): + thispid = os.fork() + if thispid > 0: + os.waitpid(thispid, 0) + os._exit(0) + os.setsid() + thispid = os.fork() + if thispid > 0: + print 'confluent server starting as pid %d' % thispid + os._exit(0) + os.closerange(0,2) + os.umask(63) + os.open(os.devnull, os.O_RDWR) + os.dup2(0, 1) + os.dup2(0, 2) + + +def _updatepidfile(): + pidfile = open('/var/run/confluent/pid', 'w+') + fcntl.flock(pidfile, fcntl.LOCK_EX) + pidfile.write(str(os.getpid())) + fcntl.flock(pidfile, fcntl.LOCK_UN) + pidfile.close() + + +def _checkpidfile(): + try: + pidfile = open('/var/run/confluent/pid', 'r+') + fcntl.flock(pidfile, fcntl.LOCK_EX) + pid = pidfile.read() + if pid != '': + print '/var/run/confluent/pid exists and indicates %s is still running' % pid + sys.exit(1) + pidfile.write(str(os.getpid())) + fcntl.flock(pidfile, fcntl.LOCK_UN) + pidfile.close() + except IOError: + pidfile = open('/var/run/confluent/pid', 'w') + fcntl.flock(pidfile, fcntl.LOCK_EX) + pidfile.write(str(os.getpid())) + fcntl.flock(pidfile, fcntl.LOCK_UN) + pidfile.close() + pidfile = open('/var/run/confluent/pid', 'r') + fcntl.flock(pidfile, fcntl.LOCK_SH) + pid = pidfile.read() + if pid != str(os.getpid()): + print '/var/run/confluent/pid exists and indicates %s is still running' % pid + sys.exit(1) + fcntl.flock(pidfile, fcntl.LOCK_UN) + pidfile.close() + + +def terminate(signal, frame): + sys.exit(0) + +def exit(): + pidfile = open('/var/run/confluent/pid') + pid = pidfile.read() + if pid == str(os.getpid()): + os.remove('/var/run/confluent/pid') def run(): + _checkpidfile() pluginapi.load_plugins() + _daemonize() + _updatepidfile() + signal.signal(signal.SIGINT, terminate) + signal.signal(signal.SIGTERM, terminate) #TODO(jbjohnso): eventlet has a bug about unix domain sockets, this code #works with bugs fixed #dbgsock = eventlet.listen("/var/run/confluent/dbg.sock", @@ -46,6 +115,7 @@ def run(): webservice.start() sockservice = sockapi.SockApi() sockservice.start() + atexit.register(exit) while (1): eventlet.sleep(100)