2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-14 19:57:50 +00:00

Attempt to catch filesystem full condition

Provide a more obvious behavior when filesystem fills
to explain confluent behavior in this situation.
This commit is contained in:
Jarrod Johnson 2019-02-21 16:32:37 -05:00
parent 2691722f48
commit ca9e7d1d93
3 changed files with 41 additions and 15 deletions

View File

@ -35,6 +35,7 @@
import confluent
import confluent.alerts as alerts
import confluent.log as log
import confluent.tlvdata as tlvdata
import confluent.config.attributes as attrscheme
import confluent.config.configmanager as cfm
@ -685,6 +686,8 @@ def _forward_rsp(connection, res):
def handle_node_request(configmanager, inputdata, operation,
pathcomponents, autostrip=True):
if log.logfull:
raise exc.TargetResourceUnavailable('Filesystem full, free up space and restart confluent service')
iscollection = False
routespec = None
if pathcomponents[0] == 'noderange':

View File

@ -73,6 +73,8 @@ import struct
import time
import traceback
daemonized = False
logfull = False
try:
from fcntl import flock, LOCK_EX, LOCK_UN, LOCK_SH
except ImportError:
@ -150,21 +152,34 @@ class BaseRotatingHandler(object):
Output the record to the file, catering for rollover as described
in doRollover().
"""
rolling_type = self.shouldRollover(binrecord, textrecord)
if rolling_type:
flock(self.textfile, LOCK_UN)
return self.doRollover(rolling_type)
return None
global logfull
try:
rolling_type = self.shouldRollover(binrecord, textrecord)
if rolling_type:
flock(self.textfile, LOCK_UN)
return self.doRollover(rolling_type)
return None
except (IOError, OSError) as e:
if not daemonized:
raise
logfull = True
def emit(self, binrecord, textrecord):
if self.textfile is None:
self.textfile = open(self.textpath, mode='ab')
if self.binfile is None:
self.binfile = open(self.binpath, mode='ab')
self.textfile.write(textrecord)
self.binfile.write(binrecord)
self.textfile.flush()
self.binfile.flush()
global logfull
try:
if self.textfile is None:
self.textfile = open(self.textpath, mode='ab')
if self.binfile is None:
self.binfile = open(self.binpath, mode='ab')
self.textfile.write(textrecord)
self.binfile.write(binrecord)
self.textfile.flush()
self.binfile.flush()
except (IOError, OSError) as e:
if not daemonized:
raise
logfull = True
def get_textfile_offset(self, data_len):
if self.textfile is None:
@ -753,11 +768,13 @@ globaleventlog = None
tracelog = None
def log(logdata=None, ltype=None, event=0, eventdata=None):
def log(logdata=None, ltype=None, event=0, eventdata=None, flush=False):
global globaleventlog
if globaleventlog is None:
globaleventlog = Logger('events')
globaleventlog.log(logdata, ltype, event, eventdata)
if flush:
globaleventlog.writedata()
def logtrace():
global tracelog

View File

@ -72,7 +72,7 @@ def _daemonize():
os.setsid()
thispid = os.fork()
if thispid > 0:
print 'confluent server starting as pid %d' % thispid
print('confluent server starting as pid {0}'.format(thispid))
os._exit(0)
os.closerange(0, 2)
os.umask(63)
@ -81,6 +81,7 @@ def _daemonize():
os.dup2(0, 2)
sys.stdout = log.Logger('stdout', buffered=False)
sys.stderr = log.Logger('stderr', buffered=False)
log.daemonized = True
def _updatepidfile():
@ -224,6 +225,11 @@ def run():
except:
doexit()
raise
try:
log.log({'info': 'Confluent management service starting'}, flush=True)
except (OSError, IOError) as e:
print(repr(e))
sys.exit(1)
_daemonize()
if havefcntl:
_updatepidfile()