mirror of
https://github.com/xcat2/confluent.git
synced 2025-02-20 12:31:37 +00:00
Add log rotation support
Add TimedAndSizeRotatingFileHandler which mixes together the RotatingFileHandler and TimedRotatingFileHandler from python logging module to process the log data. Add logrollover event to track the renamed information, so that console session can read the log data from current log file and last renamed file. Global configuration is used by the log handler. The format of the log section in '/etc/confluent/service.cfg' is like: [log] when = m backup_count = 3 max_bytes = 8192 utc = False
This commit is contained in:
parent
a1a61dfdbd
commit
dc436fda74
50
confluent_server/confluent/config/conf.py
Normal file
50
confluent_server/confluent/config/conf.py
Normal file
@ -0,0 +1,50 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2014 IBM Corporation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
#This defines config variable to store the global configuration for confluent
|
||||
import ConfigParser
|
||||
|
||||
_config = None
|
||||
|
||||
def init_config():
|
||||
global _config
|
||||
configfile = "/etc/confluent/service.cfg"
|
||||
_config = ConfigParser.ConfigParser()
|
||||
_config.read(configfile)
|
||||
|
||||
def get_config():
|
||||
return _config
|
||||
|
||||
def get_int_option(section, option):
|
||||
try:
|
||||
return _config.getint(section, option)
|
||||
except (
|
||||
ConfigParser.NoSectionError, ConfigParser.NoOptionError, ValueError):
|
||||
return None
|
||||
|
||||
def get_boolean_option(section, option):
|
||||
try:
|
||||
return _config.getboolean(section, option)
|
||||
except (
|
||||
ConfigParser.NoSectionError, ConfigParser.NoOptionError, ValueError):
|
||||
return None
|
||||
|
||||
def get_option(section, option):
|
||||
try:
|
||||
return _config.get(section, option)
|
||||
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
|
||||
return None
|
@ -56,4 +56,8 @@ class ForbiddenRequest(ConfluentException):
|
||||
class NotImplementedException(ConfluentException):
|
||||
# The current configuration/plugin is unable to perform
|
||||
# the requested task. http code 501
|
||||
pass
|
||||
pass
|
||||
|
||||
class GlobalConfigError(ConfluentException):
|
||||
# The configuration in the global config file is not right
|
||||
pass
|
||||
|
@ -61,10 +61,15 @@
|
||||
|
||||
import collections
|
||||
import confluent.config.configmanager
|
||||
import confluent.config.conf as conf
|
||||
import confluent.exceptions as exc
|
||||
import eventlet
|
||||
import fcntl
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import stat
|
||||
import struct
|
||||
import time
|
||||
import traceback
|
||||
@ -80,14 +85,14 @@ import traceback
|
||||
# if that happens, warn to have user increase ulimit for optimal
|
||||
# performance
|
||||
|
||||
MIDNIGHT = 24 * 60 * 60
|
||||
_loggers = {}
|
||||
|
||||
|
||||
class Events(object):
|
||||
(
|
||||
undefined, clearscreen, clientconnect, clientdisconnect,
|
||||
consoledisconnect, consoleconnect, stacktrace
|
||||
) = range(7)
|
||||
consoledisconnect, consoleconnect, stacktrace, logrollover
|
||||
) = range(8)
|
||||
logstr = {
|
||||
2: 'connection by ',
|
||||
3: 'disconnection by ',
|
||||
@ -98,6 +103,366 @@ class DataTypes(object):
|
||||
text, dictionary, console, event = range(4)
|
||||
|
||||
|
||||
class RollingTypes(object):
|
||||
no_rolling, size_rolling, time_rolling = range(3)
|
||||
|
||||
|
||||
class BaseRotatingHandler(object):
|
||||
|
||||
def __init__(self, filepath, logname):
|
||||
"""
|
||||
Use the specified filename for streamed logging
|
||||
"""
|
||||
self.filepath = filepath
|
||||
self.textpath = self.filepath +logname
|
||||
self.binpath = self.filepath + logname + ".cbl"
|
||||
self.textfile = None
|
||||
self.binfile = None
|
||||
|
||||
def open(self):
|
||||
if self.textfile is None:
|
||||
self.textfile = open(self.textpath, mode='ab')
|
||||
if self.binfile is None:
|
||||
self.binfile = open(self.binpath, mode='ab')
|
||||
return self.textfile, self.binfile
|
||||
|
||||
def try_emit(self, binrecord, textrecord):
|
||||
"""
|
||||
Emit a record.
|
||||
|
||||
Output the record to the file, catering for rollover as described
|
||||
in doRollover().
|
||||
"""
|
||||
rolling_type = self.shouldRollover(binrecord, textrecord)
|
||||
if rolling_type:
|
||||
return self.doRollover(rolling_type)
|
||||
return None
|
||||
|
||||
def emit(self, binrecord, textrecord):
|
||||
if self.textfile is None:
|
||||
self.textfile = open(self.textpath, mode='ab')
|
||||
if self.binfile is None:
|
||||
self.binfile = open(self.binpath, mode='ab')
|
||||
self.textfile.write(textrecord)
|
||||
self.binfile.write(binrecord)
|
||||
self.textfile.flush()
|
||||
self.binfile.flush()
|
||||
|
||||
def get_textfile_offset(self, data_len):
|
||||
if self.textfile is None:
|
||||
self.textfile = open(self.textpath, mode='ab')
|
||||
return self.textfile.tell() + data_len
|
||||
|
||||
def close(self):
|
||||
if self.textfile:
|
||||
self.textfile.close
|
||||
self.textfile = None
|
||||
if self.binfile:
|
||||
self.binfile.close
|
||||
self.binfile = None
|
||||
|
||||
|
||||
class TimedAndSizeRotatingFileHandler(BaseRotatingHandler):
|
||||
"""
|
||||
Handler for logging to a file, rotating the log file at certain timed
|
||||
intervals.
|
||||
|
||||
If backupCount is > 0, when rollover is done, no more than backupCount
|
||||
files are kept - the oldest ones are deleted.
|
||||
"""
|
||||
|
||||
def __init__(self, filepath, logname, interval=1):
|
||||
BaseRotatingHandler.__init__(self, filepath, logname)
|
||||
try:
|
||||
self.when = conf.get_option('log', 'when').upper()
|
||||
except (AttributeError):
|
||||
self.when = 'D'
|
||||
self.backupCount = conf.get_int_option('log', 'backup_count') or 3
|
||||
self.maxBytes = conf.get_int_option(
|
||||
'log','max_bytes') or 4 * 1024 * 1024 * 1024
|
||||
if self.maxBytes < 8192:
|
||||
raise exc.GlobalConfigError("The minimum value of max_bytes "
|
||||
"of log rolling size in the log "
|
||||
"section should larger than 8192.")
|
||||
self.utc = conf.get_boolean_option('log', 'utc') or False
|
||||
|
||||
# Calculate the real rollover interval, which is just the number of
|
||||
# seconds between rollovers. Also set the filename suffix used when
|
||||
# a rollover occurs. Current 'when' events supported:
|
||||
# S - Seconds
|
||||
# M - Minutes
|
||||
# H - Hours
|
||||
# D - Days
|
||||
# midnight - roll over at midnight
|
||||
# W{0-6} - roll over on a certain day; 0 - Monday
|
||||
#
|
||||
# Case of the 'when' specifier is not important; lower or upper case
|
||||
# will work.
|
||||
if self.when == 'S':
|
||||
self.interval = 1 # one second
|
||||
self.suffix = "%Y-%m-%d_%H-%M-%S"
|
||||
self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}\.{0,1}\d*$"
|
||||
elif self.when == 'M':
|
||||
self.interval = 60 # one minute
|
||||
self.suffix = "%Y-%m-%d_%H-%M"
|
||||
self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}-\d{2}\.{0,1}\d*$"
|
||||
elif self.when == 'H':
|
||||
self.interval = 60 * 60 # one hour
|
||||
self.suffix = "%Y-%m-%d_%H"
|
||||
self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}_\d{2}$"
|
||||
elif self.when == 'D' or self.when == 'MIDNIGHT':
|
||||
self.interval = 60 * 60 * 24 # one day
|
||||
self.suffix = "%Y-%m-%d"
|
||||
self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}\.{0,1}\d*$"
|
||||
elif self.when.startswith('W'):
|
||||
self.interval = 60 * 60 * 24 * 7 # one week
|
||||
if len(self.when) != 2:
|
||||
raise ValueError("You must specify a day for weekly rollover from 0 to 6 (0 is Monday): %s" % self.when)
|
||||
if self.when[1] < '0' or self.when[1] > '6':
|
||||
raise ValueError("Invalid day specified for weekly rollover: %s" % self.when)
|
||||
self.dayOfWeek = int(self.when[1])
|
||||
self.suffix = "%Y-%m-%d"
|
||||
self.extMatch = r"^(cbl\.){0,1}\d{4}-\d{2}-\d{2}\.{0,1}\d*$"
|
||||
else:
|
||||
raise ValueError("Invalid rollover interval specified: %s" % self.when)
|
||||
|
||||
self.extMatch = re.compile(self.extMatch)
|
||||
self.interval = self.interval * interval # multiply by units requested
|
||||
# Note: Get the modify time of text log file to calculate the
|
||||
# rollover time
|
||||
if os.path.exists(self.textpath):
|
||||
t = os.stat(self.textpath)[stat.ST_MTIME]
|
||||
else:
|
||||
t = int(time.time())
|
||||
self.rolloverAt = self.computeRollover(t)
|
||||
self.sizeRollingCount = 0
|
||||
self.initSizeRollingCount()
|
||||
|
||||
def computeRollover(self, currentTime):
|
||||
"""
|
||||
Work out the rollover time based on the specified time.
|
||||
"""
|
||||
result = currentTime + self.interval
|
||||
# If we are rolling over at midnight or weekly, then the interval is already known.
|
||||
# What we need to figure out is WHEN the next interval is. In other words,
|
||||
# if you are rolling over at midnight, then your base interval is 1 day,
|
||||
# but you want to start that one day clock at midnight, not now. So, we
|
||||
# have to fudge the rolloverAt value in order to trigger the first rollover
|
||||
# at the right time. After that, the regular interval will take care of
|
||||
# the rest. Note that this code doesn't care about leap seconds. :)
|
||||
if self.when == 'MIDNIGHT' or self.when.startswith('W'):
|
||||
# This could be done with less code, but I wanted it to be clear
|
||||
if self.utc:
|
||||
t = time.gmtime(currentTime)
|
||||
else:
|
||||
t = time.localtime(currentTime)
|
||||
currentHour = t[3]
|
||||
currentMinute = t[4]
|
||||
currentSecond = t[5]
|
||||
# r is the number of seconds left between now and midnight
|
||||
r = MIDNIGHT - ((currentHour * 60 + currentMinute) * 60 +
|
||||
currentSecond)
|
||||
result = currentTime + r
|
||||
# If we are rolling over on a certain day, add in the number of days until
|
||||
# the next rollover, but offset by 1 since we just calculated the time
|
||||
# until the next day starts. There are three cases:
|
||||
# Case 1) The day to rollover is today; in this case, do nothing
|
||||
# Case 2) The day to rollover is further in the interval (i.e., today is
|
||||
# day 2 (Wednesday) and rollover is on day 6 (Sunday). Days to
|
||||
# next rollover is simply 6 - 2 - 1, or 3.
|
||||
# Case 3) The day to rollover is behind us in the interval (i.e., today
|
||||
# is day 5 (Saturday) and rollover is on day 3 (Thursday).
|
||||
# Days to rollover is 6 - 5 + 3, or 4. In this case, it's the
|
||||
# number of days left in the current week (1) plus the number
|
||||
# of days in the next week until the rollover day (3).
|
||||
# The calculations described in 2) and 3) above need to have a day added.
|
||||
# This is because the above time calculation takes us to midnight on this
|
||||
# day, i.e. the start of the next day.
|
||||
if self.when.startswith('W'):
|
||||
day = t[6] # 0 is Monday
|
||||
if day != self.dayOfWeek:
|
||||
if day < self.dayOfWeek:
|
||||
daysToWait = self.dayOfWeek - day
|
||||
else:
|
||||
daysToWait = 6 - day + self.dayOfWeek + 1
|
||||
newRolloverAt = result + (daysToWait * (60 * 60 * 24))
|
||||
if not self.utc:
|
||||
dstNow = t[-1]
|
||||
dstAtRollover = time.localtime(newRolloverAt)[-1]
|
||||
if dstNow != dstAtRollover:
|
||||
if not dstNow: # DST kicks in before next rollover, so we need to deduct an hour
|
||||
addend = -3600
|
||||
else: # DST bows out before next rollover, so we need to add an hour
|
||||
addend = 3600
|
||||
newRolloverAt += addend
|
||||
result = newRolloverAt
|
||||
return result
|
||||
|
||||
def shouldRollover(self, binrecord, textrecord):
|
||||
"""
|
||||
Determine if rollover should occur.
|
||||
Just compare times.
|
||||
"""
|
||||
# time rolling first
|
||||
t = int(time.time())
|
||||
if t >= self.rolloverAt:
|
||||
return RollingTypes.time_rolling
|
||||
self.open()
|
||||
if self.maxBytes > 0: # are we rolling over?
|
||||
if self.textfile.tell() + len(textrecord) >= self.maxBytes:
|
||||
return RollingTypes.size_rolling
|
||||
if self.binfile.tell() + len(binrecord) >= self.maxBytes:
|
||||
return RollingTypes.size_rolling
|
||||
return RollingTypes.no_rolling
|
||||
|
||||
def getFilesToDelete(self):
|
||||
"""
|
||||
Determine the files to delete when rolling over.
|
||||
"""
|
||||
dirName, baseName = os.path.split(self.textpath)
|
||||
files = []
|
||||
prefix = baseName + "."
|
||||
filePaths = glob.glob(os.path.join(dirName, "%s*" % prefix))
|
||||
fileNames = [os.path.split(f)[1] for f in filePaths]
|
||||
plen = len(prefix)
|
||||
t_set = set()
|
||||
for fileName in fileNames:
|
||||
suffix = fileName[plen:]
|
||||
if self.extMatch.match(suffix):
|
||||
s = suffix.split(".")
|
||||
t = s[1] if suffix.startswith("cbl") else s[0]
|
||||
t_set.add(t)
|
||||
files.append({'time': t, 'file': os.path.join(dirName,
|
||||
fileName)})
|
||||
|
||||
t_list = list(t_set)
|
||||
t_list.sort()
|
||||
result = [f['file'] for f in files if
|
||||
f['time'] in t_list[:-(self.backupCount - 1)]]
|
||||
return result
|
||||
|
||||
def initSizeRollingCount(self):
|
||||
"""
|
||||
Init the max number of log files for current time.
|
||||
"""
|
||||
dirName, baseName = os.path.split(self.textpath)
|
||||
prefix = baseName + "."
|
||||
filePaths = glob.glob(os.path.join(dirName, "%s*" % prefix))
|
||||
fileNames = [os.path.split(f)[1] for f in filePaths]
|
||||
plen = len(prefix)
|
||||
for fileName in fileNames:
|
||||
suffix = fileName[plen:]
|
||||
try:
|
||||
self.sizeRollingCount = max(self.sizeRollingCount, int(suffix))
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def _sizeRoll(self):
|
||||
self.close()
|
||||
for i in range(self.sizeRollingCount, 0, -1):
|
||||
sbfn = "%s.%d" % (self.binpath, i)
|
||||
dbfn = "%s.%d" % (self.binpath, i + 1)
|
||||
stfn = "%s.%d" % (self.textpath, i)
|
||||
dtfn = "%s.%d" % (self.textpath, i + 1)
|
||||
if os.path.exists(sbfn):
|
||||
if os.path.exists(dbfn):
|
||||
os.remove(dbfn)
|
||||
os.rename(sbfn, dbfn)
|
||||
if os.path.exists(stfn):
|
||||
if os.path.exists(dtfn):
|
||||
os.remove(dtfn)
|
||||
os.rename(stfn, dtfn)
|
||||
# size rolling happens, add statistics count
|
||||
self.sizeRollingCount += 1
|
||||
dbfn = self.binpath + ".1"
|
||||
dtfn = self.textpath + ".1"
|
||||
if os.path.exists(dbfn):
|
||||
os.remove(dbfn)
|
||||
if os.path.exists(dtfn):
|
||||
os.remove(dtfn)
|
||||
if os.path.exists(self.binpath):
|
||||
os.rename(self.binpath, dbfn)
|
||||
if os.path.exists(self.textpath):
|
||||
os.rename(self.textpath, dtfn)
|
||||
return dbfn, dtfn
|
||||
|
||||
def _timeRoll(self):
|
||||
self.close()
|
||||
# get the time that this sequence started at and make it a TimeTuple
|
||||
currentTime = int(time.time())
|
||||
dstNow = time.localtime(currentTime)[-1]
|
||||
t = self.rolloverAt - self.interval
|
||||
if self.utc:
|
||||
timeTuple = time.gmtime(t)
|
||||
else:
|
||||
timeTuple = time.localtime(t)
|
||||
dstThen = timeTuple[-1]
|
||||
if dstNow != dstThen:
|
||||
if dstNow:
|
||||
addend = 3600
|
||||
else:
|
||||
addend = -3600
|
||||
timeTuple = time.localtime(t + addend)
|
||||
|
||||
# if size rolling files exist
|
||||
for i in range(self.sizeRollingCount, 0, -1):
|
||||
sbfn = "%s.%d" % ( self.binpath, i)
|
||||
dbfn = "%s.%s.%d" % (
|
||||
self.binpath, time.strftime(self.suffix, timeTuple),i)
|
||||
stfn = "%s.%d" % (self.textpath, i)
|
||||
dtfn = "%s.%s.%d" % (
|
||||
self.textpath, time.strftime(self.suffix, timeTuple), i)
|
||||
if os.path.exists(sbfn):
|
||||
if os.path.exists(dbfn):
|
||||
os.remove(dbfn)
|
||||
os.rename(sbfn, dbfn)
|
||||
if os.path.exists(stfn):
|
||||
if os.path.exists(dtfn):
|
||||
os.remove(dtfn)
|
||||
os.rename(stfn, dtfn)
|
||||
|
||||
# As time rolling happens, reset statistics count
|
||||
self.sizeRollingCount = 0
|
||||
dbfn = self.binpath + "." + time.strftime(self.suffix, timeTuple)
|
||||
dtfn = self.textpath + "." + time.strftime(self.suffix, timeTuple)
|
||||
|
||||
if os.path.exists(dbfn):
|
||||
os.remove(dbfn)
|
||||
if os.path.exists(dtfn):
|
||||
os.remove(dtfn)
|
||||
if os.path.exists(self.binpath):
|
||||
os.rename(self.binpath, dbfn)
|
||||
if os.path.exists(self.textpath):
|
||||
os.rename(self.textpath, dtfn)
|
||||
if self.backupCount > 0:
|
||||
for s in self.getFilesToDelete():
|
||||
os.remove(s)
|
||||
|
||||
newRolloverAt = self.computeRollover(currentTime)
|
||||
while newRolloverAt <= currentTime:
|
||||
newRolloverAt = newRolloverAt + self.interval
|
||||
#If DST changes and midnight or weekly rollover, adjust for this.
|
||||
if (self.when == 'MIDNIGHT' or self.when.startswith('W')) and not self.utc:
|
||||
dstAtRollover = time.localtime(newRolloverAt)[-1]
|
||||
if dstNow != dstAtRollover:
|
||||
if not dstNow: # DST kicks in before next rollover, so we need to deduct an hour
|
||||
addend = -3600
|
||||
else: # DST bows out before next rollover, so we need to add an hour
|
||||
addend = 3600
|
||||
newRolloverAt += addend
|
||||
self.rolloverAt = newRolloverAt
|
||||
return dbfn, dtfn
|
||||
|
||||
def doRollover(self, rolling_type):
|
||||
"""
|
||||
do a rollover based on the rolling type.
|
||||
"""
|
||||
if rolling_type == RollingTypes.size_rolling:
|
||||
return self._sizeRoll()
|
||||
if rolling_type == RollingTypes.time_rolling:
|
||||
return self._timeRoll()
|
||||
|
||||
|
||||
class Logger(object):
|
||||
"""
|
||||
:param console: If true, [] will be used to denote non-text events. If
|
||||
@ -128,20 +493,23 @@ class Logger(object):
|
||||
self.filepath += "consoles/"
|
||||
if not os.path.isdir(self.filepath):
|
||||
os.makedirs(self.filepath, 448)
|
||||
self.textpath = self.filepath + logname
|
||||
self.binpath = self.filepath + logname + ".cbl"
|
||||
self.writer = None
|
||||
self.closer = None
|
||||
self.textfile = None
|
||||
self.binfile = None
|
||||
self.handler = TimedAndSizeRotatingFileHandler(self.filepath, logname,
|
||||
interval=1)
|
||||
self.lockfile = None
|
||||
self.logname = logname
|
||||
self.logentries = collections.deque()
|
||||
|
||||
def _lock(self, arrribute):
|
||||
if self.lockfile is None or self.lockfile.closed:
|
||||
lockpath = os.path.join(self.filepath, "%s-lock" % self.logname)
|
||||
self.lockfile = open(lockpath, 'a')
|
||||
fcntl.flock(self.lockfile, arrribute)
|
||||
|
||||
def writedata(self):
|
||||
if self.textfile is None:
|
||||
self.textfile = open(self.textpath, mode='ab')
|
||||
if self.binfile is None:
|
||||
self.binfile = open(self.binpath, mode='ab')
|
||||
while self.logentries:
|
||||
textfile, binfile = self.handler.open()
|
||||
entry = self.logentries.popleft()
|
||||
ltype = entry[0]
|
||||
tstamp = entry[1]
|
||||
@ -156,8 +524,8 @@ class Logger(object):
|
||||
elif not self.isconsole:
|
||||
textdate = time.strftime(
|
||||
'%b %d %H:%M:%S ', time.localtime(tstamp))
|
||||
fcntl.flock(self.textfile, fcntl.LOCK_EX)
|
||||
offset = self.textfile.tell() + len(textdate)
|
||||
self._lock(fcntl.LOCK_EX)
|
||||
offset = textfile.tell() + len(textdate)
|
||||
datalen = len(data)
|
||||
eventaux = entry[4]
|
||||
if eventaux is None:
|
||||
@ -175,53 +543,95 @@ class Logger(object):
|
||||
textrecord = textdate + data
|
||||
if not textrecord.endswith('\n'):
|
||||
textrecord += '\n'
|
||||
self.textfile.write(textrecord)
|
||||
fcntl.flock(self.textfile, fcntl.LOCK_UN)
|
||||
fcntl.flock(self.binfile, fcntl.LOCK_EX)
|
||||
self.binfile.write(binrecord)
|
||||
fcntl.flock(self.binfile, fcntl.LOCK_UN)
|
||||
self.textfile.flush()
|
||||
self.binfile.flush()
|
||||
files = self.handler.try_emit(binrecord, textrecord)
|
||||
if not files:
|
||||
self.handler.emit(binrecord, textrecord)
|
||||
else:
|
||||
# Log the rolling event at first, then log the last data
|
||||
# which cause the rolling event.
|
||||
to_bfile, to_tfile = files
|
||||
self.logentries.appendleft(entry)
|
||||
roll_data = "rename:%s>%s" % (self.handler.textpath, to_tfile)
|
||||
self.logentries.appendleft([DataTypes.event, tstamp, roll_data,
|
||||
Events.logrollover, None])
|
||||
self._lock(fcntl.LOCK_UN)
|
||||
if self.closer is None:
|
||||
self.closer = eventlet.spawn_after(15, self.closelog)
|
||||
self.writer = None
|
||||
|
||||
def read_recent_text(self, size):
|
||||
|
||||
def parse_last_rolling_files(textfile, offset, datalen):
|
||||
textfile.seek(offset, 0)
|
||||
textpath = textfile.read(datalen).split('>')[1]
|
||||
dir_name, base_name = os.path.split(textpath)
|
||||
temp = base_name.split('.')
|
||||
temp.insert(1,'cbl')
|
||||
# find the recent bin file
|
||||
binpath = os.path.join(dir_name, ".".join(temp))
|
||||
return textpath, binpath
|
||||
|
||||
textpath = self.handler.textpath
|
||||
binpath = self.handler.binpath
|
||||
try:
|
||||
textfile = open(self.textpath, mode='r')
|
||||
binfile = open(self.binpath, mode='r')
|
||||
textfile = open(textpath, mode='r')
|
||||
binfile = open(binpath, mode='r')
|
||||
except IOError:
|
||||
return '', 0, 0
|
||||
fcntl.flock(binfile, fcntl.LOCK_SH)
|
||||
self._lock(fcntl.LOCK_SH)
|
||||
binfile.seek(0, 2)
|
||||
binidx = binfile.tell() - 16
|
||||
binidx = binfile.tell()
|
||||
currsize = 0
|
||||
offsets = []
|
||||
termstate = None
|
||||
recenttimestamp = 0
|
||||
access_last_rename = False
|
||||
while binidx > 0 and currsize < size:
|
||||
binfile.seek(binidx, 0)
|
||||
binidx -= 16
|
||||
binfile.seek(binidx, 0)
|
||||
recbytes = binfile.read(16)
|
||||
(_, ltype, offset, datalen, tstamp, evtdata, eventaux, _) = \
|
||||
struct.unpack(">BBIHIBBH", recbytes)
|
||||
if ltype != 2:
|
||||
# rolling events found.
|
||||
if ltype == DataTypes.event and evtdata == Events.logrollover:
|
||||
# Now, we can only find the last renamed file which logging
|
||||
# the data.
|
||||
if access_last_rename == False:
|
||||
access_last_rename = True
|
||||
else:
|
||||
break
|
||||
textpath, binpath = parse_last_rolling_files(textfile, offset,
|
||||
datalen)
|
||||
# Rolling event detected, close the current bin file, then open
|
||||
# the renamed bin file.
|
||||
binfile.close()
|
||||
try:
|
||||
binfile = open(binpath, mode='r')
|
||||
except IOError:
|
||||
return '', 0, 0
|
||||
binfile.seek(0, 2)
|
||||
binidx = binfile.tell()
|
||||
elif ltype != 2:
|
||||
continue
|
||||
if tstamp > recenttimestamp:
|
||||
recenttimestamp = tstamp
|
||||
currsize += datalen
|
||||
offsets.append((offset, datalen))
|
||||
offsets.append((offset, datalen, textpath))
|
||||
if termstate is None:
|
||||
termstate = eventaux
|
||||
fcntl.flock(binfile, fcntl.LOCK_UN)
|
||||
binfile.close()
|
||||
textdata = ''
|
||||
fcntl.flock(textfile, fcntl.LOCK_SH)
|
||||
while offsets:
|
||||
(offset, length) = offsets.pop()
|
||||
(offset, length, textpath) = offsets.pop()
|
||||
if textfile.name != textpath:
|
||||
textfile.close()
|
||||
try:
|
||||
textfile = open(textpath)
|
||||
except IOError:
|
||||
return '', 0, 0
|
||||
textfile.seek(offset, 0)
|
||||
textdata += textfile.read(length)
|
||||
fcntl.flock(textfile, fcntl.LOCK_UN)
|
||||
self._lock(fcntl.LOCK_UN)
|
||||
textfile.close()
|
||||
if termstate is None:
|
||||
termstate = 0
|
||||
@ -270,8 +680,5 @@ class Logger(object):
|
||||
self.writer = eventlet.spawn_after(2, self.writedata)
|
||||
|
||||
def closelog(self):
|
||||
self.textfile.close()
|
||||
self.binfile.close()
|
||||
self.textfile = None
|
||||
self.binfile = None
|
||||
self.handler.close()
|
||||
self.closer = None
|
||||
|
@ -27,6 +27,7 @@
|
||||
|
||||
import atexit
|
||||
import confluent.auth as auth
|
||||
import confluent.config.conf as conf
|
||||
import confluent.config.configmanager as configmanager
|
||||
import confluent.consoleserver as consoleserver
|
||||
import confluent.core as confluentcore
|
||||
@ -40,7 +41,6 @@ import fcntl
|
||||
import sys
|
||||
import os
|
||||
import signal
|
||||
import ConfigParser
|
||||
|
||||
|
||||
def _daemonize():
|
||||
@ -126,10 +126,9 @@ def _initsecurity(config):
|
||||
|
||||
def run():
|
||||
_checkpidfile()
|
||||
configfile = "/etc/confluent/service.cfg"
|
||||
config = ConfigParser.ConfigParser()
|
||||
config.read(configfile)
|
||||
conf.init_config()
|
||||
try:
|
||||
config = conf.get_config()
|
||||
_initsecurity(config)
|
||||
except:
|
||||
sys.stderr.write("Error unlocking credential store\n")
|
||||
@ -150,8 +149,8 @@ def run():
|
||||
#dbgsock = eventlet.listen("/var/run/confluent/dbg.sock",
|
||||
# family=socket.AF_UNIX)
|
||||
#eventlet.spawn_n(backdoor.backdoor_server, dbgsock)
|
||||
http_bind_host, http_bind_port = _get_connector_config(config, 'http')
|
||||
sock_bind_host, sock_bind_port = _get_connector_config(config, 'socket')
|
||||
http_bind_host, http_bind_port = _get_connector_config('http')
|
||||
sock_bind_host, sock_bind_port = _get_connector_config('socket')
|
||||
consoleserver.start_console_sessions()
|
||||
webservice = httpapi.HttpApi(http_bind_host, http_bind_port)
|
||||
webservice.start()
|
||||
@ -161,12 +160,7 @@ def run():
|
||||
while 1:
|
||||
eventlet.sleep(100)
|
||||
|
||||
|
||||
def _get_connector_config(config, session):
|
||||
try:
|
||||
host = config.get(session, 'bindhost')
|
||||
port = config.getint(session, 'bindport')
|
||||
except (ConfigParser.NoSectionError, ConfigParser.NoOptionError) as e:
|
||||
host = None
|
||||
port = None
|
||||
def _get_connector_config(session):
|
||||
host = conf.get_option(session, 'bindhost')
|
||||
port = conf.get_int_option(session, 'bindport')
|
||||
return (host, port)
|
||||
|
Loading…
x
Reference in New Issue
Block a user