2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-27 19:37:57 +00:00

Attempt to read in recent state/data from log

The attempt seems to not work right at the moment, but it seems to be in the
right direction and also does grab terminal state.
This commit is contained in:
Jarrod Johnson 2014-03-12 17:12:01 -04:00
parent 2ba93379b7
commit 28a9c9b900
2 changed files with 56 additions and 4 deletions

View File

@ -24,10 +24,17 @@ class _ConsoleHandler(object):
self.node = node
self.logger = log.Logger(node, tenant=configmanager.tenant)
self.buffer = bytearray()
self._connect()
self.users = {}
(text, termstate) = self.logger.read_recent_text(8192)
self.buffer += text
self.appmodedetected = False
self.shiftin = None
if termstate & 1:
self.appmodedetected = True
if termstate & 2:
self.shiftin = '0'
self._connect()
self.users = {}
def _connect(self):
self._console = plugin.handle_path(
@ -159,6 +166,10 @@ class _ConsoleHandler(object):
self._console.write(data)
def connect_node(node, configmanager):
consk = (node, configmanager.tenant)
if consk not in _handled_consoles:
_handled_consoles[consk] = _ConsoleHandler(node, configmanager)
#this represents some api view of a console handler. This handles things like
#holding the caller specific queue data, for example, when http api should be
#sending data, but there is no outstanding POST request to hold it,
@ -178,8 +189,7 @@ class ConsoleSession(object):
consk = (node, self.tenant)
self.ckey = consk
self.username = username
if consk not in _handled_consoles:
_handled_consoles[consk] = _ConsoleHandler(node, configmanager)
connect_node(node, configmanager)
_handled_consoles[consk].attachuser(username)
self._evt = threading.Event()
self.node = node

View File

@ -48,6 +48,7 @@
import collections
import confluent.config.configmanager as configuration
import eventlet
import fcntl
import os
import struct
import time
@ -116,6 +117,7 @@ class Logger(object):
elif not self.isconsole:
textdate = time.strftime(
'%b %d %H:%M:%S ', time.localtime(tstamp))
fcntl.flock(self.textfile, fcntl.LOCK_EX)
offset = self.textfile.tell() + len(textdate)
datalen = len(data)
eventaux = entry[4]
@ -132,13 +134,53 @@ class Logger(object):
else:
textrecord = textdate + data + '\n'
self.textfile.write(textrecord)
fcntl.flock(self.textfile, fcntl.LOCK_UN)
fcntl.flock(self.binfile, fcntl.LOCK_EX)
self.binfile.write(binrecord)
fcntl.flock(self.binfile, fcntl.LOCK_UN)
self.textfile.flush()
self.binfile.flush()
if self.closer is None:
self.closer = eventlet.spawn_after(15, self.closelog)
self.writer = None
def read_recent_text(self, size):
try:
textfile = open(self.textpath, mode='r')
binfile = open(self.binpath, mode='r')
except IOError:
return ('', 0)
fcntl.flock(binfile, fcntl.LOCK_SH)
binfile.seek(0, 2)
binidx = binfile.tell() - 16
currsize = 0
offsets = collections.deque()
termstate = 0
while binidx > 0 and currsize < size:
binfile.seek(binidx, 0)
binidx -= 16
recbytes = binfile.read(16)
(_, ltype, offset, datalen, tstamp, evtdata, eventaux, _) = \
struct.unpack(">BBIHIBBH", recbytes)
binrecord = struct.pack(">BBIHIBBH",
16, ltype, offset, datalen, tstamp, evtdata, eventaux, 0)
if ltype != 2:
continue
currsize += datalen
offsets.append((offset, datalen))
termstate = termstate | eventaux
fcntl.flock(binfile, fcntl.LOCK_UN)
binfile.close()
textdata = ''
fcntl.flock(textfile, fcntl.LOCK_SH)
while offsets:
(offset, len) = offsets.popleft()
textfile.seek(offset, 0)
textdata += textfile.read(len)
fcntl.flock(textfile, fcntl.LOCK_UN)
textfile.close()
return (textdata, termstate)
def log(self, logdata=None, ltype=None, event=0, eventdata=None):
if type(logdata) not in (str, unicode, dict):
raise Exception("Unsupported logdata")