2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-20 12:31:37 +00:00

Merge pull request #22 from chenglch/log_rotation

Add log rotation support
This commit is contained in:
Jarrod Johnson 2015-09-24 10:01:50 -04:00
commit 5128a80c79
4 changed files with 505 additions and 50 deletions

View File

@ -0,0 +1,50 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This defines a config variable that stores the global configuration for confluent
import ConfigParser
_config = None
def init_config():
    """Load the confluent service configuration into the module-level
    ``_config`` ConfigParser instance.

    Missing files are silently ignored by ConfigParser.read, leaving an
    empty parser in place.
    """
    global _config
    _config = ConfigParser.ConfigParser()
    _config.read("/etc/confluent/service.cfg")
def get_config():
    """Return the module-level ConfigParser (None before init_config runs)."""
    return _config
def get_int_option(section, option):
    """Fetch *option* from *section* as an integer.

    Returns None when the section or option is missing, or when the
    stored value is not a valid integer.
    """
    try:
        value = _config.getint(section, option)
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError,
            ValueError):
        return None
    return value
def get_boolean_option(section, option):
    """Fetch *option* from *section* as a boolean.

    Returns None when the section or option is missing, or when the
    stored value is not a recognized boolean literal.
    """
    try:
        value = _config.getboolean(section, option)
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError,
            ValueError):
        return None
    return value
def get_option(section, option):
    """Fetch *option* from *section* as a raw string, or None if absent."""
    try:
        value = _config.get(section, option)
    except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
        return None
    return value

View File

@ -56,4 +56,8 @@ class ForbiddenRequest(ConfluentException):
class NotImplementedException(ConfluentException):
# The current configuration/plugin is unable to perform
# the requested task. http code 501
pass
pass
class GlobalConfigError(ConfluentException):
    """The configuration in the global config file is not right."""
    pass

View File

@ -61,10 +61,15 @@
import collections
import confluent.config.configmanager
import confluent.config.conf as conf
import confluent.exceptions as exc
import eventlet
import fcntl
import glob
import json
import os
import re
import stat
import struct
import time
import traceback
@ -80,14 +85,14 @@ import traceback
# if that happens, warn to have user increase ulimit for optimal
# performance
MIDNIGHT = 24 * 60 * 60
_loggers = {}
class Events(object):
(
undefined, clearscreen, clientconnect, clientdisconnect,
consoledisconnect, consoleconnect, stacktrace
) = range(7)
consoledisconnect, consoleconnect, stacktrace, logrollover
) = range(8)
logstr = {
2: 'connection by ',
3: 'disconnection by ',
@ -98,6 +103,366 @@ class DataTypes(object):
text, dictionary, console, event = range(4)
class RollingTypes(object):
    """Enumerates the kinds of log rollover: none, by size, or by time."""
    no_rolling = 0
    size_rolling = 1
    time_rolling = 2
class BaseRotatingHandler(object):
    """Base class for handlers that keep a paired text/binary log.

    Two streams are maintained per log name: a plain-text file at
    ``<filepath><logname>`` and a binary record file at the same path with
    a ``.cbl`` suffix.  Subclasses decide *when* to roll over
    (shouldRollover) and *how* (doRollover).
    """

    def __init__(self, filepath, logname):
        """
        Use the specified filename for streamed logging
        """
        self.filepath = filepath
        self.textpath = self.filepath + logname
        self.binpath = self.filepath + logname + ".cbl"
        self.textfile = None
        self.binfile = None

    def open(self):
        """Open the text/binary files if needed; return (textfile, binfile)."""
        if self.textfile is None:
            self.textfile = open(self.textpath, mode='ab')
        if self.binfile is None:
            self.binfile = open(self.binpath, mode='ab')
        return self.textfile, self.binfile

    def try_emit(self, binrecord, textrecord):
        """
        Emit a record.

        Output the record to the file, catering for rollover as described
        in doRollover().  Returns the rollover result (the rotated file
        names) when a rollover occurred, otherwise None — in which case
        the caller should proceed to emit() the records.
        """
        rolling_type = self.shouldRollover(binrecord, textrecord)
        if rolling_type:
            return self.doRollover(rolling_type)
        return None

    def emit(self, binrecord, textrecord):
        """Append the records to both logs and flush them to disk."""
        # Reuse open() rather than duplicating the lazy-open logic.
        self.open()
        self.textfile.write(textrecord)
        self.binfile.write(binrecord)
        self.textfile.flush()
        self.binfile.flush()

    def get_textfile_offset(self, data_len):
        """Return the text-file offset after data_len further bytes."""
        if self.textfile is None:
            self.textfile = open(self.textpath, mode='ab')
        return self.textfile.tell() + data_len

    def close(self):
        """Close both files and forget the handles.

        Bug fix: the original referenced ``self.textfile.close`` /
        ``self.binfile.close`` without calling them, so the descriptors
        leaked and buffered data was never flushed on close.
        """
        if self.textfile:
            self.textfile.close()
            self.textfile = None
        if self.binfile:
            self.binfile.close()
            self.binfile = None
class TimedAndSizeRotatingFileHandler(BaseRotatingHandler):
    """
    Handler for logging to a file, rotating the log file at certain timed
    intervals.

    If backupCount is > 0, when rollover is done, no more than backupCount
    files are kept - the oldest ones are deleted.

    The rollover policy (when/backup_count/max_bytes/utc) is read from the
    [log] section of the global confluent configuration via the conf module.
    This logic closely mirrors logging.handlers.TimedRotatingFileHandler,
    extended to manage the paired text/binary (.cbl) files and to combine
    time-based rolling with size-based rolling.
    """

    def __init__(self, filepath, logname, interval=1):
        # interval: number of 'when' units between time-based rollovers.
        BaseRotatingHandler.__init__(self, filepath, logname)
        try:
            self.when = conf.get_option('log', 'when').upper()
        except (AttributeError):
            # get_option returned None (option absent) -> .upper() raised.
            self.when = 'D'
        self.backupCount = conf.get_int_option('log', 'backup_count') or 3
        self.maxBytes = conf.get_int_option(
            'log', 'max_bytes') or 4 * 1024 * 1024 * 1024
        if self.maxBytes < 8192:
            raise exc.GlobalConfigError("The minimum value of max_bytes "
                                        "of log rolling size in the log "
                                        "section should larger than 8192.")
        self.utc = conf.get_boolean_option('log', 'utc') or False
        # Calculate the real rollover interval, which is just the number of
        # seconds between rollovers. Also set the filename suffix used when
        # a rollover occurs. Current 'when' events supported:
        # S - Seconds
        # M - Minutes
        # H - Hours
        # D - Days
        # midnight - roll over at midnight
        # W{0-6} - roll over on a certain day; 0 - Monday
        #
        # Case of the 'when' specifier is not important; lower or upper case
        # will work.
        # extMatch recognizes rotated names of both the text file
        # ("<suffix>[.N]") and the binary file ("cbl.<suffix>[.N]").
        if self.when == 'S':
            self.interval = 1  # one second
            self.suffix = "%Y-%m-%d_%H-%M-%S"
            self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\.{0,1}\d*$"
        elif self.when == 'M':
            self.interval = 60  # one minute
            self.suffix = "%Y-%m-%d_%H-%M"
            self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}-\d{2}\.{0,1}\d*$"
        elif self.when == 'H':
            self.interval = 60 * 60  # one hour
            self.suffix = "%Y-%m-%d_%H"
            self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}$"
        elif self.when == 'D' or self.when == 'MIDNIGHT':
            self.interval = 60 * 60 * 24  # one day
            self.suffix = "%Y-%m-%d"
            self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}\.{0,1}\d*$"
        elif self.when.startswith('W'):
            self.interval = 60 * 60 * 24 * 7  # one week
            if len(self.when) != 2:
                raise ValueError("You must specify a day for weekly rollover from 0 to 6 (0 is Monday): %s" % self.when)
            if self.when[1] < '0' or self.when[1] > '6':
                raise ValueError("Invalid day specified for weekly rollover: %s" % self.when)
            self.dayOfWeek = int(self.when[1])
            self.suffix = "%Y-%m-%d"
            self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}\.{0,1}\d*$"
        else:
            raise ValueError("Invalid rollover interval specified: %s" % self.when)
        self.extMatch = re.compile(self.extMatch)
        self.interval = self.interval * interval  # multiply by units requested
        # Note: Get the modify time of text log file to calculate the
        # rollover time
        if os.path.exists(self.textpath):
            t = os.stat(self.textpath)[stat.ST_MTIME]
        else:
            t = int(time.time())
        self.rolloverAt = self.computeRollover(t)
        # sizeRollingCount tracks the highest ".N" backup index currently
        # on disk for the live (un-timestamped) log files.
        self.sizeRollingCount = 0
        self.initSizeRollingCount()

    def computeRollover(self, currentTime):
        """
        Work out the rollover time based on the specified time.
        """
        result = currentTime + self.interval
        # If we are rolling over at midnight or weekly, then the interval is already known.
        # What we need to figure out is WHEN the next interval is. In other words,
        # if you are rolling over at midnight, then your base interval is 1 day,
        # but you want to start that one day clock at midnight, not now. So, we
        # have to fudge the rolloverAt value in order to trigger the first rollover
        # at the right time. After that, the regular interval will take care of
        # the rest. Note that this code doesn't care about leap seconds. :)
        if self.when == 'MIDNIGHT' or self.when.startswith('W'):
            # This could be done with less code, but I wanted it to be clear
            if self.utc:
                t = time.gmtime(currentTime)
            else:
                t = time.localtime(currentTime)
            currentHour = t[3]
            currentMinute = t[4]
            currentSecond = t[5]
            # r is the number of seconds left between now and midnight
            r = MIDNIGHT - ((currentHour * 60 + currentMinute) * 60 +
                            currentSecond)
            result = currentTime + r
            # If we are rolling over on a certain day, add in the number of days until
            # the next rollover, but offset by 1 since we just calculated the time
            # until the next day starts. There are three cases:
            # Case 1) The day to rollover is today; in this case, do nothing
            # Case 2) The day to rollover is further in the interval (i.e., today is
            #         day 2 (Wednesday) and rollover is on day 6 (Sunday). Days to
            #         next rollover is simply 6 - 2 - 1, or 3.
            # Case 3) The day to rollover is behind us in the interval (i.e., today
            #         is day 5 (Saturday) and rollover is on day 3 (Thursday).
            #         Days to rollover is 6 - 5 + 3, or 4. In this case, it's the
            #         number of days left in the current week (1) plus the number
            #         of days in the next week until the rollover day (3).
            # The calculations described in 2) and 3) above need to have a day added.
            # This is because the above time calculation takes us to midnight on this
            # day, i.e. the start of the next day.
            if self.when.startswith('W'):
                day = t[6]  # 0 is Monday
                if day != self.dayOfWeek:
                    if day < self.dayOfWeek:
                        daysToWait = self.dayOfWeek - day
                    else:
                        daysToWait = 6 - day + self.dayOfWeek + 1
                    newRolloverAt = result + (daysToWait * (60 * 60 * 24))
                    if not self.utc:
                        dstNow = t[-1]
                        dstAtRollover = time.localtime(newRolloverAt)[-1]
                        if dstNow != dstAtRollover:
                            if not dstNow:  # DST kicks in before next rollover, so we need to deduct an hour
                                addend = -3600
                            else:  # DST bows out before next rollover, so we need to add an hour
                                addend = 3600
                            newRolloverAt += addend
                    result = newRolloverAt
        return result

    def shouldRollover(self, binrecord, textrecord):
        """
        Determine if rollover should occur.

        Just compare times.  Time-based rolling takes precedence; size
        rolling triggers when appending either record would push its file
        past maxBytes.  Returns a RollingTypes value.
        """
        # time rolling first
        t = int(time.time())
        if t >= self.rolloverAt:
            return RollingTypes.time_rolling
        self.open()
        if self.maxBytes > 0:  # are we rolling over?
            if self.textfile.tell() + len(textrecord) >= self.maxBytes:
                return RollingTypes.size_rolling
            if self.binfile.tell() + len(binrecord) >= self.maxBytes:
                return RollingTypes.size_rolling
        return RollingTypes.no_rolling

    def getFilesToDelete(self):
        """
        Determine the files to delete when rolling over.

        Groups rotated files by their timestamp portion and returns every
        file whose timestamp is not among the newest (backupCount - 1)
        timestamps.
        """
        dirName, baseName = os.path.split(self.textpath)
        files = []
        prefix = baseName + "."
        filePaths = glob.glob(os.path.join(dirName, "%s*" % prefix))
        fileNames = [os.path.split(f)[1] for f in filePaths]
        plen = len(prefix)
        t_set = set()
        for fileName in fileNames:
            suffix = fileName[plen:]
            if self.extMatch.match(suffix):
                s = suffix.split(".")
                # Binary backups look like "cbl.<time>[.N]"; text backups
                # look like "<time>[.N]".
                t = s[1] if suffix.startswith("cbl") else s[0]
                t_set.add(t)
                files.append({'time': t, 'file': os.path.join(dirName,
                                                              fileName)})
        t_list = list(t_set)
        t_list.sort()
        result = [f['file'] for f in files if
                  f['time'] in t_list[:-(self.backupCount - 1)]]
        return result

    def initSizeRollingCount(self):
        """
        Init the max number of log files for current time.

        Scans the log directory for live ".N" backups so numbering resumes
        correctly after a restart.
        """
        dirName, baseName = os.path.split(self.textpath)
        prefix = baseName + "."
        filePaths = glob.glob(os.path.join(dirName, "%s*" % prefix))
        fileNames = [os.path.split(f)[1] for f in filePaths]
        plen = len(prefix)
        for fileName in fileNames:
            suffix = fileName[plen:]
            try:
                self.sizeRollingCount = max(self.sizeRollingCount, int(suffix))
            except ValueError:
                # Suffix was a timestamp (or other non-integer); ignore.
                pass

    def _sizeRoll(self):
        # Size-based rollover: shift numbered backups up (log.N -> log.N+1),
        # then move the live text/binary files to ".1".  Returns the
        # destination (binary, text) file names.
        self.close()
        for i in range(self.sizeRollingCount, 0, -1):
            sbfn = "%s.%d" % (self.binpath, i)
            dbfn = "%s.%d" % (self.binpath, i + 1)
            stfn = "%s.%d" % (self.textpath, i)
            dtfn = "%s.%d" % (self.textpath, i + 1)
            if os.path.exists(sbfn):
                if os.path.exists(dbfn):
                    os.remove(dbfn)
                os.rename(sbfn, dbfn)
            if os.path.exists(stfn):
                if os.path.exists(dtfn):
                    os.remove(dtfn)
                os.rename(stfn, dtfn)
        # size rolling happens, add statistics count
        self.sizeRollingCount += 1
        dbfn = self.binpath + ".1"
        dtfn = self.textpath + ".1"
        if os.path.exists(dbfn):
            os.remove(dbfn)
        if os.path.exists(dtfn):
            os.remove(dtfn)
        if os.path.exists(self.binpath):
            os.rename(self.binpath, dbfn)
        if os.path.exists(self.textpath):
            os.rename(self.textpath, dtfn)
        return dbfn, dtfn

    def _timeRoll(self):
        # Time-based rollover: rename the live files (and any ".N" size
        # backups) with the timestamp of the interval that just ended,
        # prune old backups, and schedule the next rollover.  Returns the
        # destination (binary, text) file names for the live files.
        self.close()
        # get the time that this sequence started at and make it a TimeTuple
        currentTime = int(time.time())
        dstNow = time.localtime(currentTime)[-1]
        t = self.rolloverAt - self.interval
        if self.utc:
            timeTuple = time.gmtime(t)
        else:
            timeTuple = time.localtime(t)
            dstThen = timeTuple[-1]
            # Adjust the suffix timestamp if DST changed mid-interval.
            if dstNow != dstThen:
                if dstNow:
                    addend = 3600
                else:
                    addend = -3600
                timeTuple = time.localtime(t + addend)
        # if size rolling files exist
        for i in range(self.sizeRollingCount, 0, -1):
            sbfn = "%s.%d" % (self.binpath, i)
            dbfn = "%s.%s.%d" % (
                self.binpath, time.strftime(self.suffix, timeTuple), i)
            stfn = "%s.%d" % (self.textpath, i)
            dtfn = "%s.%s.%d" % (
                self.textpath, time.strftime(self.suffix, timeTuple), i)
            if os.path.exists(sbfn):
                if os.path.exists(dbfn):
                    os.remove(dbfn)
                os.rename(sbfn, dbfn)
            if os.path.exists(stfn):
                if os.path.exists(dtfn):
                    os.remove(dtfn)
                os.rename(stfn, dtfn)
        # As time rolling happens, reset statistics count
        self.sizeRollingCount = 0
        dbfn = self.binpath + "." + time.strftime(self.suffix, timeTuple)
        dtfn = self.textpath + "." + time.strftime(self.suffix, timeTuple)
        if os.path.exists(dbfn):
            os.remove(dbfn)
        if os.path.exists(dtfn):
            os.remove(dtfn)
        if os.path.exists(self.binpath):
            os.rename(self.binpath, dbfn)
        if os.path.exists(self.textpath):
            os.rename(self.textpath, dtfn)
        if self.backupCount > 0:
            for s in self.getFilesToDelete():
                os.remove(s)
        newRolloverAt = self.computeRollover(currentTime)
        while newRolloverAt <= currentTime:
            newRolloverAt = newRolloverAt + self.interval
        # If DST changes and midnight or weekly rollover, adjust for this.
        if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc:
            dstAtRollover = time.localtime(newRolloverAt)[-1]
            if dstNow != dstAtRollover:
                if not dstNow:  # DST kicks in before next rollover, so we need to deduct an hour
                    addend = -3600
                else:  # DST bows out before next rollover, so we need to add an hour
                    addend = 3600
                newRolloverAt += addend
        self.rolloverAt = newRolloverAt
        return dbfn, dtfn

    def doRollover(self, rolling_type):
        """
        do a rollover based on the rolling type.

        Returns the (binary, text) destination file names from the roll.
        """
        if rolling_type == RollingTypes.size_rolling:
            return self._sizeRoll()
        if rolling_type == RollingTypes.time_rolling:
            return self._timeRoll()
class Logger(object):
"""
:param console: If true, [] will be used to denote non-text events. If
@ -128,20 +493,23 @@ class Logger(object):
self.filepath += "consoles/"
if not os.path.isdir(self.filepath):
os.makedirs(self.filepath, 448)
self.textpath = self.filepath + logname
self.binpath = self.filepath + logname + ".cbl"
self.writer = None
self.closer = None
self.textfile = None
self.binfile = None
self.handler = TimedAndSizeRotatingFileHandler(self.filepath, logname,
interval=1)
self.lockfile = None
self.logname = logname
self.logentries = collections.deque()
def _lock(self, attribute):
    """Apply an fcntl.flock operation to the per-log lock file.

    *attribute* is an fcntl flock flag (e.g. fcntl.LOCK_EX, fcntl.LOCK_SH,
    fcntl.LOCK_UN).  The lock file ("<logname>-lock" in the log directory)
    is opened lazily and reopened if it was closed.

    Fix: the parameter was misspelled "arrribute"; callers invoke this
    positionally, so the rename is backward-compatible.
    """
    if self.lockfile is None or self.lockfile.closed:
        lockpath = os.path.join(self.filepath, "%s-lock" % self.logname)
        self.lockfile = open(lockpath, 'a')
    fcntl.flock(self.lockfile, attribute)
def writedata(self):
if self.textfile is None:
self.textfile = open(self.textpath, mode='ab')
if self.binfile is None:
self.binfile = open(self.binpath, mode='ab')
while self.logentries:
textfile, binfile = self.handler.open()
entry = self.logentries.popleft()
ltype = entry[0]
tstamp = entry[1]
@ -156,8 +524,8 @@ class Logger(object):
elif not self.isconsole:
textdate = time.strftime(
'%b %d %H:%M:%S ', time.localtime(tstamp))
fcntl.flock(self.textfile, fcntl.LOCK_EX)
offset = self.textfile.tell() + len(textdate)
self._lock(fcntl.LOCK_EX)
offset = textfile.tell() + len(textdate)
datalen = len(data)
eventaux = entry[4]
if eventaux is None:
@ -175,53 +543,95 @@ class Logger(object):
textrecord = textdate + data
if not textrecord.endswith('\n'):
textrecord += '\n'
self.textfile.write(textrecord)
fcntl.flock(self.textfile, fcntl.LOCK_UN)
fcntl.flock(self.binfile, fcntl.LOCK_EX)
self.binfile.write(binrecord)
fcntl.flock(self.binfile, fcntl.LOCK_UN)
self.textfile.flush()
self.binfile.flush()
files = self.handler.try_emit(binrecord, textrecord)
if not files:
self.handler.emit(binrecord, textrecord)
else:
# Log the rolling event at first, then log the last data
# which cause the rolling event.
to_bfile, to_tfile = files
self.logentries.appendleft(entry)
roll_data = "rename:%s>%s" % (self.handler.textpath, to_tfile)
self.logentries.appendleft([DataTypes.event, tstamp, roll_data,
Events.logrollover, None])
self._lock(fcntl.LOCK_UN)
if self.closer is None:
self.closer = eventlet.spawn_after(15, self.closelog)
self.writer = None
def read_recent_text(self, size):
def parse_last_rolling_files(textfile, offset, datalen):
textfile.seek(offset, 0)
textpath = textfile.read(datalen).split('>')[1]
dir_name, base_name = os.path.split(textpath)
temp = base_name.split('.')
temp.insert(1,'cbl')
# find the recent bin file
binpath = os.path.join(dir_name, ".".join(temp))
return textpath, binpath
textpath = self.handler.textpath
binpath = self.handler.binpath
try:
textfile = open(self.textpath, mode='r')
binfile = open(self.binpath, mode='r')
textfile = open(textpath, mode='r')
binfile = open(binpath, mode='r')
except IOError:
return '', 0, 0
fcntl.flock(binfile, fcntl.LOCK_SH)
self._lock(fcntl.LOCK_SH)
binfile.seek(0, 2)
binidx = binfile.tell() - 16
binidx = binfile.tell()
currsize = 0
offsets = []
termstate = None
recenttimestamp = 0
access_last_rename = False
while binidx > 0 and currsize < size:
binfile.seek(binidx, 0)
binidx -= 16
binfile.seek(binidx, 0)
recbytes = binfile.read(16)
(_, ltype, offset, datalen, tstamp, evtdata, eventaux, _) = \
struct.unpack(">BBIHIBBH", recbytes)
if ltype != 2:
# rolling events found.
if ltype == DataTypes.event and evtdata == Events.logrollover:
# Now, we can only find the last renamed file which logging
# the data.
if access_last_rename == False:
access_last_rename = True
else:
break
textpath, binpath = parse_last_rolling_files(textfile, offset,
datalen)
# Rolling event detected, close the current bin file, then open
# the renamed bin file.
binfile.close()
try:
binfile = open(binpath, mode='r')
except IOError:
return '', 0, 0
binfile.seek(0, 2)
binidx = binfile.tell()
elif ltype != 2:
continue
if tstamp > recenttimestamp:
recenttimestamp = tstamp
currsize += datalen
offsets.append((offset, datalen))
offsets.append((offset, datalen, textpath))
if termstate is None:
termstate = eventaux
fcntl.flock(binfile, fcntl.LOCK_UN)
binfile.close()
textdata = ''
fcntl.flock(textfile, fcntl.LOCK_SH)
while offsets:
(offset, length) = offsets.pop()
(offset, length, textpath) = offsets.pop()
if textfile.name != textpath:
textfile.close()
try:
textfile = open(textpath)
except IOError:
return '', 0, 0
textfile.seek(offset, 0)
textdata += textfile.read(length)
fcntl.flock(textfile, fcntl.LOCK_UN)
self._lock(fcntl.LOCK_UN)
textfile.close()
if termstate is None:
termstate = 0
@ -270,8 +680,5 @@ class Logger(object):
self.writer = eventlet.spawn_after(2, self.writedata)
def closelog(self):
    """Close the rotating handler's open log files and clear the
    deferred-close marker.

    Fix: this span carried stale diff residue closing self.textfile /
    self.binfile, attributes the refactored Logger no longer maintains —
    the text/binary files are owned by self.handler (the
    TimedAndSizeRotatingFileHandler), so close them through it.
    """
    self.handler.close()
    self.closer = None

View File

@ -27,6 +27,7 @@
import atexit
import confluent.auth as auth
import confluent.config.conf as conf
import confluent.config.configmanager as configmanager
import confluent.consoleserver as consoleserver
import confluent.core as confluentcore
@ -40,7 +41,6 @@ import fcntl
import sys
import os
import signal
import ConfigParser
def _daemonize():
@ -126,10 +126,9 @@ def _initsecurity(config):
def run():
_checkpidfile()
configfile = "/etc/confluent/service.cfg"
config = ConfigParser.ConfigParser()
config.read(configfile)
conf.init_config()
try:
config = conf.get_config()
_initsecurity(config)
except:
sys.stderr.write("Error unlocking credential store\n")
@ -150,8 +149,8 @@ def run():
#dbgsock = eventlet.listen("/var/run/confluent/dbg.sock",
# family=socket.AF_UNIX)
#eventlet.spawn_n(backdoor.backdoor_server, dbgsock)
http_bind_host, http_bind_port = _get_connector_config(config, 'http')
sock_bind_host, sock_bind_port = _get_connector_config(config, 'socket')
http_bind_host, http_bind_port = _get_connector_config('http')
sock_bind_host, sock_bind_port = _get_connector_config('socket')
consoleserver.start_console_sessions()
webservice = httpapi.HttpApi(http_bind_host, http_bind_port)
webservice.start()
@ -161,12 +160,7 @@ def run():
while 1:
eventlet.sleep(100)
def _get_connector_config(config, session):
try:
host = config.get(session, 'bindhost')
port = config.getint(session, 'bindport')
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError) as e:
host = None
port = None
def _get_connector_config(session):
    """Return the (bindhost, bindport) pair for the given config section.

    Either element is None when the corresponding option is absent from
    the global configuration.
    """
    return (conf.get_option(session, 'bindhost'),
            conf.get_int_option(session, 'bindport'))