mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-25 19:10:10 +00:00
Daemonize and honor pid file lock for exclusive execution
This commit is contained in:
parent
f509bc5db9
commit
cf4e8713ff
@ -168,15 +168,15 @@ def _authorize_request(env, operation):
|
||||
cookie['confluentsessionid']['secure'] = 1
|
||||
cookie['confluentsessionid']['httponly'] = 1
|
||||
cookie['confluentsessionid']['path'] = '/'
|
||||
auditmsg = {
|
||||
'user': name,
|
||||
'operation': operation,
|
||||
'target': env['PATH_INFO'],
|
||||
}
|
||||
skiplog = False
|
||||
if '/console/session' in env['PATH_INFO']:
|
||||
skiplog = True
|
||||
if authdata:
|
||||
auditmsg = {
|
||||
'user': name,
|
||||
'operation': operation,
|
||||
'target': env['PATH_INFO'],
|
||||
}
|
||||
authinfo = {'code': 200,
|
||||
'cookie': cookie,
|
||||
'cfgmgr': authdata[1],
|
||||
@ -229,6 +229,17 @@ def _assign_consessionid(consolesession):
|
||||
|
||||
|
||||
def resourcehandler(env, start_response):
|
||||
try:
|
||||
for rsp in resourcehandler_backend(env, start_response):
|
||||
yield rsp
|
||||
except:
|
||||
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
|
||||
event=log.Events.stacktrace)
|
||||
start_response('500 - Internal Server Error', [])
|
||||
yield '500 - Internal Server Error'
|
||||
return
|
||||
|
||||
def resourcehandler_backend(env, start_response):
|
||||
"""Function to handle new wsgi requests
|
||||
"""
|
||||
mimetype = _pick_mimetype(env)
|
||||
|
@ -24,6 +24,7 @@
|
||||
# Things like heartbeating and discovery
|
||||
# It also will optionally snoop SLP DA requests
|
||||
|
||||
import atexit
|
||||
import confluent.pluginapi as pluginapi
|
||||
import confluent.httpapi as httpapi
|
||||
import confluent.sockapi as sockapi
|
||||
@ -31,12 +32,80 @@ import eventlet
|
||||
import eventlet.backdoor as backdoor
|
||||
from eventlet.green import socket
|
||||
from eventlet import wsgi
|
||||
import fcntl
|
||||
import multiprocessing
|
||||
import sys
|
||||
import os
|
||||
import signal
|
||||
|
||||
def _daemonize():
|
||||
thispid = os.fork()
|
||||
if thispid > 0:
|
||||
os.waitpid(thispid, 0)
|
||||
os._exit(0)
|
||||
os.setsid()
|
||||
thispid = os.fork()
|
||||
if thispid > 0:
|
||||
print 'confluent server starting as pid %d' % thispid
|
||||
os._exit(0)
|
||||
os.closerange(0,2)
|
||||
os.umask(63)
|
||||
os.open(os.devnull, os.O_RDWR)
|
||||
os.dup2(0, 1)
|
||||
os.dup2(0, 2)
|
||||
|
||||
|
||||
def _updatepidfile():
|
||||
pidfile = open('/var/run/confluent/pid', 'w+')
|
||||
fcntl.flock(pidfile, fcntl.LOCK_EX)
|
||||
pidfile.write(str(os.getpid()))
|
||||
fcntl.flock(pidfile, fcntl.LOCK_UN)
|
||||
pidfile.close()
|
||||
|
||||
|
||||
def _checkpidfile():
|
||||
try:
|
||||
pidfile = open('/var/run/confluent/pid', 'r+')
|
||||
fcntl.flock(pidfile, fcntl.LOCK_EX)
|
||||
pid = pidfile.read()
|
||||
if pid != '':
|
||||
print '/var/run/confluent/pid exists and indicates %s is still running' % pid
|
||||
sys.exit(1)
|
||||
pidfile.write(str(os.getpid()))
|
||||
fcntl.flock(pidfile, fcntl.LOCK_UN)
|
||||
pidfile.close()
|
||||
except IOError:
|
||||
pidfile = open('/var/run/confluent/pid', 'w')
|
||||
fcntl.flock(pidfile, fcntl.LOCK_EX)
|
||||
pidfile.write(str(os.getpid()))
|
||||
fcntl.flock(pidfile, fcntl.LOCK_UN)
|
||||
pidfile.close()
|
||||
pidfile = open('/var/run/confluent/pid', 'r')
|
||||
fcntl.flock(pidfile, fcntl.LOCK_SH)
|
||||
pid = pidfile.read()
|
||||
if pid != str(os.getpid()):
|
||||
print '/var/run/confluent/pid exists and indicates %s is still running' % pid
|
||||
sys.exit(1)
|
||||
fcntl.flock(pidfile, fcntl.LOCK_UN)
|
||||
pidfile.close()
|
||||
|
||||
|
||||
def terminate(signal, frame):
|
||||
sys.exit(0)
|
||||
|
||||
def exit():
|
||||
pidfile = open('/var/run/confluent/pid')
|
||||
pid = pidfile.read()
|
||||
if pid == str(os.getpid()):
|
||||
os.remove('/var/run/confluent/pid')
|
||||
|
||||
def run():
|
||||
_checkpidfile()
|
||||
pluginapi.load_plugins()
|
||||
_daemonize()
|
||||
_updatepidfile()
|
||||
signal.signal(signal.SIGINT, terminate)
|
||||
signal.signal(signal.SIGTERM, terminate)
|
||||
#TODO(jbjohnso): eventlet has a bug about unix domain sockets, this code
|
||||
#works with bugs fixed
|
||||
#dbgsock = eventlet.listen("/var/run/confluent/dbg.sock",
|
||||
@ -46,6 +115,7 @@ def run():
|
||||
webservice.start()
|
||||
sockservice = sockapi.SockApi()
|
||||
sockservice.start()
|
||||
atexit.register(exit)
|
||||
while (1):
|
||||
eventlet.sleep(100)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user