mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-26 03:19:48 +00:00
Implement timestamp reporting
Have confetty include the vintage of incoming data so that user may know how likely the data is relevant to now.
This commit is contained in:
parent
afb6f4296c
commit
4443d67da6
@ -41,6 +41,7 @@
|
||||
# ~ I will not use for now...
|
||||
|
||||
import fcntl
|
||||
import math
|
||||
import getpass
|
||||
import optparse
|
||||
import os
|
||||
@ -49,6 +50,7 @@ import select
|
||||
import shlex
|
||||
import sys
|
||||
import termios
|
||||
import time
|
||||
import tty
|
||||
|
||||
exitcode = 0
|
||||
@ -67,18 +69,31 @@ conserversequence = '\x05c' # ctrl-e, c
|
||||
|
||||
oldtcattr = termios.tcgetattr(sys.stdin.fileno())
|
||||
netserver = None
|
||||
laststate = {}
|
||||
|
||||
|
||||
def updatestatus(stateinfo):
|
||||
def updatestatus(stateinfo={}):
|
||||
status = consolename
|
||||
info = []
|
||||
if ('connectstate' in stateinfo and
|
||||
stateinfo['connectstate'] != 'connected'):
|
||||
for statekey in stateinfo.iterkeys():
|
||||
laststate[statekey] = stateinfo[statekey]
|
||||
if ('connectstate' in laststate and
|
||||
laststate['connectstate'] != 'connected'):
|
||||
info.append(stateinfo['connectstate'])
|
||||
if 'error' in stateinfo:
|
||||
info.append(stateinfo['error'])
|
||||
if 'clientcount' in stateinfo and stateinfo['clientcount'] != 1:
|
||||
info.append('clients: %d' % stateinfo['clientcount'])
|
||||
if 'error' in laststate:
|
||||
info.append(laststate['error'])
|
||||
if 'clientcount' in laststate and laststate['clientcount'] != 1:
|
||||
info.append('clients: %d' % laststate['clientcount'])
|
||||
if 'bufferage' in stateinfo:
|
||||
laststate['showtime'] = time.time() - stateinfo['bufferage']
|
||||
if 'showtime' in laststate:
|
||||
showtime = laststate['showtime']
|
||||
age = time.time() - laststate['showtime']
|
||||
if age > 86400: # older than one day
|
||||
# disambiguate by putting date in and time
|
||||
info.append(time.strftime('%m-%dT%H:%M', time.localtime(showtime)))
|
||||
else:
|
||||
info.append(time.strftime('%H:%M', time.localtime(showtime)))
|
||||
if info:
|
||||
status += ' [' + ','.join(info) + ']'
|
||||
sys.stdout.write('\x1b]0;console: %s\x07' % status)
|
||||
@ -554,6 +569,12 @@ while not doexit:
|
||||
continue
|
||||
if data is not None:
|
||||
sys.stdout.write(data)
|
||||
now = time.time()
|
||||
if ('showtime' not in laststate or
|
||||
(now // 60) != laststate['showtime'] // 60):
|
||||
# don't bother churning if minute does not change
|
||||
laststate['showtime'] = now
|
||||
updatestatus()
|
||||
sys.stdout.flush()
|
||||
else:
|
||||
doexit = True
|
||||
|
@ -26,6 +26,7 @@ import confluent.exceptions as exc
|
||||
import confluent.interface.console as conapi
|
||||
import confluent.log as log
|
||||
import confluent.core as plugin
|
||||
import confluent.util as util
|
||||
import eventlet
|
||||
import eventlet.event
|
||||
import random
|
||||
@ -52,7 +53,18 @@ class _ConsoleHandler(object):
|
||||
self.logger = log.Logger(node, console=True,
|
||||
tenant=configmanager.tenant)
|
||||
self.buffer = bytearray()
|
||||
(text, termstate) = self.logger.read_recent_text(8192)
|
||||
(text, termstate, timestamp) = self.logger.read_recent_text(8192)
|
||||
# when reading from log file, we will use wall clock
|
||||
# it should usually match walltime.
|
||||
self.lasttime = 0
|
||||
if timestamp:
|
||||
timediff = time.time() - timestamp
|
||||
if timediff > 0:
|
||||
self.lasttime = util.monotonic_time() - timediff
|
||||
else:
|
||||
# wall clock has gone backwards, use current time as best
|
||||
# guess
|
||||
self.lasttime = util.monotonic_time()
|
||||
self.buffer += text
|
||||
self.appmodedetected = False
|
||||
self.shiftin = None
|
||||
@ -86,6 +98,15 @@ class _ConsoleHandler(object):
|
||||
elif (attrvalue[self.node]['console.logging']['value']) == 'none':
|
||||
self._dologging = False
|
||||
|
||||
def get_buffer_age(self):
|
||||
"""Return age of buffered data
|
||||
|
||||
Returns age in seconds of the buffered data or
|
||||
False in the event of calling before buffered data"""
|
||||
if self.lasttime:
|
||||
return util.monotonic_time() - self.lasttime
|
||||
return False
|
||||
|
||||
def _attribschanged(self, nodeattribs, configmanager, **kwargs):
|
||||
if 'console.logging' in nodeattribs[self.node]:
|
||||
# decide whether logging changes how we react or not
|
||||
@ -318,6 +339,7 @@ class _ConsoleHandler(object):
|
||||
if self.shiftin is not None:
|
||||
eventdata |= 2
|
||||
self.log(data, eventdata=eventdata)
|
||||
self.lasttime = util.monotonic_time()
|
||||
self.buffer += data
|
||||
#TODO: analyze buffer for registered events, examples:
|
||||
# panics
|
||||
@ -453,6 +475,12 @@ class ConsoleSession(object):
|
||||
def send_break(self):
|
||||
self.conshdl.send_break()
|
||||
|
||||
def get_buffer_age(self):
|
||||
"""Get the age in seconds of the buffered data
|
||||
|
||||
Returns False if no data buffered yet"""
|
||||
self.conshdl.get_buffer_age()
|
||||
|
||||
def reopen(self):
|
||||
self.conshdl.reopen()
|
||||
|
||||
|
@ -334,12 +334,18 @@ def resourcehandler_backend(env, start_response):
|
||||
consolesessions[sessid]['expiry'] = time.time() + 90
|
||||
outdata = consolesessions[sessid]['session'].get_next_output(
|
||||
timeout=45)
|
||||
bufferage = False
|
||||
if 'stampsent' not in consolesessions[sessid]:
|
||||
consoleserver[sessid]['stampsent'] = True
|
||||
bufferage = consolesessions[sessid]['session'].get_buffer_age()
|
||||
if isinstance(outdata, dict):
|
||||
rspdata = outdata
|
||||
rspdata['session'] = querydict['session']
|
||||
else:
|
||||
rspdata = {'session': querydict['session'],
|
||||
'data': outdata}
|
||||
if bufferage is not False:
|
||||
rspdata['bufferage'] = bufferage
|
||||
try:
|
||||
rsp = json.dumps(rspdata)
|
||||
except UnicodeDecodeError:
|
||||
|
@ -196,6 +196,7 @@ class Logger(object):
|
||||
currsize = 0
|
||||
offsets = []
|
||||
termstate = None
|
||||
recenttimestamp = 0
|
||||
while binidx > 0 and currsize < size:
|
||||
binfile.seek(binidx, 0)
|
||||
binidx -= 16
|
||||
@ -204,6 +205,8 @@ class Logger(object):
|
||||
struct.unpack(">BBIHIBBH", recbytes)
|
||||
if ltype != 2:
|
||||
continue
|
||||
if tstamp > recenttimestamp:
|
||||
recenttimestamp = tstamp
|
||||
currsize += datalen
|
||||
offsets.append((offset, datalen))
|
||||
if termstate is None:
|
||||
@ -220,7 +223,7 @@ class Logger(object):
|
||||
textfile.close()
|
||||
if termstate is None:
|
||||
termstate = 0
|
||||
return textdata, termstate
|
||||
return textdata, termstate, recenttimestamp
|
||||
|
||||
def write(self, data):
|
||||
"""Write plain text to log
|
||||
|
@ -163,6 +163,9 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
|
||||
raise Exception("TODO")
|
||||
tlvdata.send(connection, {'started': 1})
|
||||
ccons.startsending()
|
||||
bufferage = consession.get_buffer_age()
|
||||
if bufferage is not False:
|
||||
tlvdata.send(connection, {'bufferage': bufferage})
|
||||
while consession is not None:
|
||||
data = tlvdata.recv(connection)
|
||||
if type(data) == dict:
|
||||
|
Loading…
Reference in New Issue
Block a user