2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-23 01:53:28 +00:00

Support walking back through multiple logs

The rollback support and replaydid not follow more than one log back.  Do the work to recurse
into older and older files, until big enough buffer or run out of files.
This commit is contained in:
Jarrod Johnson 2016-03-13 19:50:02 -04:00
parent f75f2cae51
commit 9d40c67974

View File

@ -1,7 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 IBM Corporation
# Copyright 2015 Lenovo
# Copyright 2015-2016 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -604,7 +604,6 @@ class Logger(object):
offsets = []
termstate = None
recenttimestamp = 0
access_last_rename = False
flock(textfile, LOCK_SH)
while binidx > 0 and currsize < size:
binidx -= 16
@ -614,23 +613,21 @@ class Logger(object):
struct.unpack(">BBIHIBBH", recbytes)
# rolling events found.
if ltype == DataTypes.event and evtdata == Events.logrollover:
# Now, we can only find the last renamed file which logging
# the data.
if access_last_rename == False:
access_last_rename = True
else:
break
textpath, binpath = parse_last_rolling_files(textfile, offset,
datalen)
# Rolling event detected, close the current bin file, then open
# the renamed bin file.
flock(binfile, LOCK_UN)
flock(textfile, LOCK_UN)
binfile.close()
textfile.close()
try:
binfile = open(binpath, mode='r')
textfile = open(textpath, mode='r')
except IOError:
return '', 0, 0
break
flock(binfile, LOCK_SH)
flock(textfile, LOCK_SH)
binfile.seek(0, 2)
binidx = binfile.tell()
# things have been set up for next iteration to dig to
@ -644,22 +641,32 @@ class Logger(object):
offsets.append((offset, datalen, textpath))
if termstate is None:
termstate = eventaux
flock(binfile, LOCK_UN)
binfile.close()
try:
flock(binfile, LOCK_UN)
binfile.close()
except:
pass
textdata = ''
while offsets:
(offset, length, textpath) = offsets.pop()
if textfile.name != textpath:
flock(textfile, LOCK_UN)
textfile.close()
try:
textfile = open(textpath)
except IOError:
return '', 0, 0
textfile.seek(offset, 0)
flock(textfile, LOCK_UN)
textfile.close()
textfile = open(textpath, mode='r')
except (ValueError, IOError) as e:
break
try:
textfile.seek(offset, 0)
except ValueError:
# file was closed, settle with what we have and continue
break
textdata += textfile.read(length)
flock(textfile, LOCK_UN)
textfile.close()
try:
flock(textfile, LOCK_UN)
textfile.close()
except:
pass
if termstate is None:
termstate = 0
return textdata, termstate, recenttimestamp