From 4443d67da6e29b0edb92398e9cacc5096712683e Mon Sep 17 00:00:00 2001 From: Jarrod Johnon Date: Mon, 6 Oct 2014 15:12:16 -0400 Subject: [PATCH] Implement timestamp reporting Have confetty include the vintage of incoming data so that user may know how likely the data is relevant to now. --- confluent_client/bin/confetty | 35 ++++++++++++++++----- confluent_server/confluent/consoleserver.py | 30 +++++++++++++++++- confluent_server/confluent/httpapi.py | 6 ++++ confluent_server/confluent/log.py | 5 ++- confluent_server/confluent/sockapi.py | 3 ++ 5 files changed, 70 insertions(+), 9 deletions(-) diff --git a/confluent_client/bin/confetty b/confluent_client/bin/confetty index a4fb7a57..351c578e 100755 --- a/confluent_client/bin/confetty +++ b/confluent_client/bin/confetty @@ -41,6 +41,7 @@ # ~ I will not use for now... import fcntl +import math import getpass import optparse import os @@ -49,6 +50,7 @@ import select import shlex import sys import termios +import time import tty exitcode = 0 @@ -67,18 +69,31 @@ conserversequence = '\x05c' # ctrl-e, c oldtcattr = termios.tcgetattr(sys.stdin.fileno()) netserver = None +laststate = {} -def updatestatus(stateinfo): +def updatestatus(stateinfo={}): status = consolename info = [] - if ('connectstate' in stateinfo and - stateinfo['connectstate'] != 'connected'): + for statekey in stateinfo.iterkeys(): + laststate[statekey] = stateinfo[statekey] + if ('connectstate' in laststate and + laststate['connectstate'] != 'connected'): info.append(stateinfo['connectstate']) - if 'error' in stateinfo: - info.append(stateinfo['error']) - if 'clientcount' in stateinfo and stateinfo['clientcount'] != 1: - info.append('clients: %d' % stateinfo['clientcount']) + if 'error' in laststate: + info.append(laststate['error']) + if 'clientcount' in laststate and laststate['clientcount'] != 1: + info.append('clients: %d' % laststate['clientcount']) + if 'bufferage' in stateinfo: + laststate['showtime'] = time.time() - stateinfo['bufferage'] + if 'showtime' in laststate: + showtime = laststate['showtime'] + age = time.time() - laststate['showtime'] + if age > 86400: # older than one day + # disambiguate by putting date in and time + info.append(time.strftime('%m-%dT%H:%M', time.localtime(showtime))) + else: + info.append(time.strftime('%H:%M', time.localtime(showtime))) if info: status += ' [' + ','.join(info) + ']' sys.stdout.write('\x1b]0;console: %s\x07' % status) @@ -554,6 +569,12 @@ while not doexit: continue if data is not None: sys.stdout.write(data) + now = time.time() + if ('showtime' not in laststate or + (now // 60) != laststate['showtime'] // 60): + # don't bother churning if minute does not change + laststate['showtime'] = now + updatestatus() sys.stdout.flush() else: doexit = True diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 7b9db555..6a87e6a6 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -26,6 +26,7 @@ import confluent.exceptions as exc import confluent.interface.console as conapi import confluent.log as log import confluent.core as plugin +import confluent.util as util import eventlet import eventlet.event import random @@ -52,7 +53,18 @@ class _ConsoleHandler(object): self.logger = log.Logger(node, console=True, tenant=configmanager.tenant) self.buffer = bytearray() - (text, termstate) = self.logger.read_recent_text(8192) + (text, termstate, timestamp) = self.logger.read_recent_text(8192) + # when reading from log file, we will use wall clock + # it should usually match walltime. + self.lasttime = 0 + if timestamp: + timediff = time.time() - timestamp + if timediff > 0: + self.lasttime = util.monotonic_time() - timediff + else: + # wall clock has gone backwards, use current time as best + # guess + self.lasttime = util.monotonic_time() self.buffer += text self.appmodedetected = False self.shiftin = None @@ -86,6 +98,15 @@ class _ConsoleHandler(object): elif (attrvalue[self.node]['console.logging']['value']) == 'none': self._dologging = False + def get_buffer_age(self): + """Return age of buffered data + + Returns age in seconds of the buffered data or + False in the event of calling before buffered data""" + if self.lasttime: + return util.monotonic_time() - self.lasttime + return False + def _attribschanged(self, nodeattribs, configmanager, **kwargs): if 'console.logging' in nodeattribs[self.node]: # decide whether logging changes how we react or not @@ -318,6 +339,7 @@ class _ConsoleHandler(object): if self.shiftin is not None: eventdata |= 2 self.log(data, eventdata=eventdata) + self.lasttime = util.monotonic_time() self.buffer += data #TODO: analyze buffer for registered events, examples: # panics @@ -453,6 +475,12 @@ class ConsoleSession(object): def send_break(self): self.conshdl.send_break() + def get_buffer_age(self): + """Get the age in seconds of the buffered data + + Returns False if no data buffered yet""" + self.conshdl.get_buffer_age() + def reopen(self): self.conshdl.reopen() diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index d5fe3f78..2231182b 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -334,12 +334,18 @@ def resourcehandler_backend(env, start_response): consolesessions[sessid]['expiry'] = time.time() + 90 outdata = consolesessions[sessid]['session'].get_next_output( timeout=45) + bufferage = False + if 'stampsent' not in consolesessions[sessid]: + consoleserver[sessid]['stampsent'] = True + bufferage = consolesessions[sessid]['session'].get_buffer_age() if isinstance(outdata, dict): rspdata = outdata rspdata['session'] = querydict['session'] else: rspdata = {'session': querydict['session'], 'data': outdata} + if bufferage is not False: + rspdata['bufferage'] = bufferage try: rsp = json.dumps(rspdata) except UnicodeDecodeError: diff --git a/confluent_server/confluent/log.py b/confluent_server/confluent/log.py index 57a48f76..90c11235 100644 --- a/confluent_server/confluent/log.py +++ b/confluent_server/confluent/log.py @@ -196,6 +196,7 @@ class Logger(object): currsize = 0 offsets = [] termstate = None + recenttimestamp = 0 while binidx > 0 and currsize < size: binfile.seek(binidx, 0) binidx -= 16 @@ -204,6 +205,8 @@ class Logger(object): struct.unpack(">BBIHIBBH", recbytes) if ltype != 2: continue + if tstamp > recenttimestamp: + recenttimestamp = tstamp currsize += datalen offsets.append((offset, datalen)) if termstate is None: @@ -220,7 +223,7 @@ class Logger(object): textfile.close() if termstate is None: termstate = 0 - return textdata, termstate + return textdata, termstate, recenttimestamp def write(self, data): """Write plain text to log diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 4e3b9b7f..9a291dd7 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -163,6 +163,9 @@ def process_request(connection, request, cfm, authdata, authname, skipauth): raise Exception("TODO") tlvdata.send(connection, {'started': 1}) ccons.startsending() + bufferage = consession.get_buffer_age() + if bufferage is not False: + tlvdata.send(connection, {'bufferage': bufferage}) while consession is not None: data = tlvdata.recv(connection) if type(data) == dict: