mirror of
https://github.com/xcat2/confluent.git
synced 2025-09-02 08:18:28 +00:00
Implement preliminary console logging facility
Implement the bits and pieces that are at least required for conserver like logging. This has a plaintext file and a binary metadata file. The plaintext file basically resembles a conserver log, while the binary file facilitates faster seeking to points of interest with the file and much more precise timestamp information.
This commit is contained in:
@@ -8,6 +8,7 @@
|
||||
#we track nodes that are actively being logged, watched, or have attached
|
||||
#there should be no more than one handler per node
|
||||
import confluent.interface.console as conapi
|
||||
import confluent.log as log
|
||||
import confluent.pluginapi as plugin
|
||||
import eventlet
|
||||
import eventlet.green.threading as threading
|
||||
@@ -21,6 +22,7 @@ class _ConsoleHandler(object):
|
||||
self.rcpts = {}
|
||||
self.cfgmgr = configmanager
|
||||
self.node = node
|
||||
self.logger = log.Logger(node, tenant=configmanager.tenant)
|
||||
self.buffer = bytearray()
|
||||
self._connect()
|
||||
|
||||
@@ -55,11 +57,22 @@ class _ConsoleHandler(object):
|
||||
# to the console object
|
||||
eventlet.spawn(self._handle_console_output, data)
|
||||
|
||||
def attachuser(self, username):
|
||||
self.logger.log(
|
||||
logdata=username, ltype=log.DataTypes.event,
|
||||
event=log.Events.clientconnect)
|
||||
|
||||
def detachuser(self, username):
|
||||
self.logger.log(
|
||||
logdata=username, ltype=log.DataTypes.event,
|
||||
event=log.Events.clientdisconnect)
|
||||
|
||||
def _handle_console_output(self, data):
|
||||
if type(data) == int:
|
||||
if data == conapi.ConsoleEvent.Disconnect:
|
||||
self._connect()
|
||||
return
|
||||
self.logger.log(data)
|
||||
self.buffer += data
|
||||
#TODO: analyze buffer for registered events, examples:
|
||||
# panics
|
||||
@@ -121,12 +134,14 @@ class ConsoleSession(object):
|
||||
:param node: Name of the node for which this session will be created
|
||||
"""
|
||||
|
||||
def __init__(self, node, configmanager, datacallback=None):
|
||||
def __init__(self, node, configmanager, username, datacallback=None):
|
||||
self.tenant = configmanager.tenant
|
||||
consk = (node, self.tenant)
|
||||
self.ckey = consk
|
||||
self.username = username
|
||||
if consk not in _handled_consoles:
|
||||
_handled_consoles[consk] = _ConsoleHandler(node, configmanager)
|
||||
_handled_consoles[consk].attachuser(username)
|
||||
self._evt = threading.Event()
|
||||
self.node = node
|
||||
self.conshdl = _handled_consoles[consk]
|
||||
@@ -142,6 +157,7 @@ class ConsoleSession(object):
|
||||
datacallback(recdata)
|
||||
|
||||
def destroy(self):
|
||||
_handled_consoles[self.ckey].detachuser(self.username)
|
||||
_handled_consoles[self.ckey].unregister_rcpt(self.reghdl)
|
||||
self.databuffer = None
|
||||
self._evt = None
|
||||
|
@@ -155,6 +155,7 @@ def _authorize_request(env):
|
||||
return {'code': 200,
|
||||
'cookie': cookie,
|
||||
'cfgmgr': authdata[1],
|
||||
'username': name,
|
||||
'userdata': authdata[0]}
|
||||
else:
|
||||
return {'code': 401}
|
||||
@@ -237,8 +238,9 @@ def resourcehandler(env, start_response):
|
||||
_, _, nodename = prefix.rpartition('/')
|
||||
if 'session' not in querydict.keys() or not querydict['session']:
|
||||
# Request for new session
|
||||
consession = consoleserver.ConsoleSession(node=nodename,
|
||||
configmanager=cfgmgr)
|
||||
consession = consoleserver.ConsoleSession(
|
||||
node=nodename, configmanager=cfgmgr,
|
||||
username=authorized['username'])
|
||||
if not consession:
|
||||
start_response("500 Internal Server Error", headers)
|
||||
return
|
||||
|
109
confluent/log.py
109
confluent/log.py
@@ -36,7 +36,8 @@
|
||||
# The information to store:
|
||||
# - leading bit reserved, 0 for now
|
||||
# - length of metadata record 7 bits
|
||||
# - type of data referenced by this entry (one byte)
|
||||
# - type of data referenced by this entry (one byte), currently:
|
||||
# 0=text event, 1=json, 2=console data
|
||||
# - offset into the text log to begin (4 bytes)
|
||||
# - length of data referenced by this entry (2 bytes)
|
||||
# - UTC timestamp of this entry in seconds since epoch (unsigned 32 bit?)
|
||||
@@ -44,7 +45,11 @@
|
||||
# (a future extended version might include suport for Forward Secure Sealing
|
||||
# or other fields)
|
||||
|
||||
import confluent.config.configmanager as configuration
|
||||
import eventlet
|
||||
import os
|
||||
import struct
|
||||
import time
|
||||
|
||||
# on conserving filehandles:
|
||||
# upon write, if file not open, open it for append
|
||||
@@ -57,7 +62,103 @@ import os
|
||||
# if that happens, warn to have user increase ulimit for optimal
|
||||
# performance
|
||||
|
||||
|
||||
class Events(object):
|
||||
undefined, clearscreen, clientconnect, clientdisconnect = range(4)
|
||||
logstr = {
|
||||
2: 'connection by ',
|
||||
3: 'disconnection by ',
|
||||
}
|
||||
|
||||
|
||||
class DataTypes(object):
|
||||
text, dictionary, console, event = range(4)
|
||||
|
||||
class Logger(object):
|
||||
def __init__(self, location, console=True, configmanager):
|
||||
self.location = location
|
||||
os.path.isdir(location)
|
||||
"""
|
||||
:param console: If true, [] will be used to denote non-text events. If
|
||||
False, events will be formatted like syslog:
|
||||
date: message<CR>
|
||||
"""
|
||||
def __init__(self, logname, console=True, tenant=None):
|
||||
self.filepath = configuration.get_global("logdirectory")
|
||||
if self.filepath is None:
|
||||
self.filepath = "/var/log/confluent/"
|
||||
self.isconsole = console
|
||||
if console:
|
||||
self.filepath += "consoles/"
|
||||
self.textpath = self.filepath + logname
|
||||
self.binpath = self.filepath + logname + ".cbl"
|
||||
self.writer = None
|
||||
self.closer = None
|
||||
self.textfile = None
|
||||
self.binfile = None
|
||||
self.logentries = []
|
||||
|
||||
def writedata(self):
|
||||
if self.textfile is None:
|
||||
self.textfile = open(self.textpath, mode='ab')
|
||||
if self.binfile is None:
|
||||
self.binfile = open(self.binpath, mode='ab')
|
||||
for entry in self.logentries:
|
||||
ltype = entry[0]
|
||||
tstamp = entry[1]
|
||||
data = entry[2]
|
||||
evtdata = entry[3]
|
||||
textdate = ''
|
||||
if self.isconsole and ltype != 2:
|
||||
textdate = time.strftime(
|
||||
'[%m/%d %H:%M:%S ', time.localtime(tstamp))
|
||||
if ltype == DataTypes.event and evtdata in Events.logstr:
|
||||
textdate += Events.logstr[evtdata]
|
||||
elif not self.isconsole:
|
||||
textdate = time.strftime(
|
||||
'%b %d %H:%M:%S ', time.localtime(tstamp))
|
||||
offset = self.textfile.tell() + len(textdate)
|
||||
datalen = len(data)
|
||||
# metadata length is always 16 for this code at the moment
|
||||
binrecord = struct.pack(">BBIHII",
|
||||
16, ltype, offset, datalen, tstamp, evtdata)
|
||||
if self.isconsole:
|
||||
if ltype == 2:
|
||||
textrecord = data
|
||||
else:
|
||||
textrecord = textdate + data + ']'
|
||||
else:
|
||||
textrecord = textdate + data + '\n'
|
||||
self.textfile.write(textrecord)
|
||||
self.binfile.write(binrecord)
|
||||
self.logentries = []
|
||||
if self.closer is None:
|
||||
self.closer = eventlet.spawn_after(15, self.closelog)
|
||||
self.writer = None
|
||||
|
||||
def log(self, logdata=None, ltype=None, event=0):
|
||||
if type(logdata) not in (str, unicode, dict):
|
||||
raise Exception("Unsupported logdata")
|
||||
if ltype is None:
|
||||
if type(logdata) == dict:
|
||||
ltype = 1
|
||||
elif self.isconsole:
|
||||
ltype = 2
|
||||
else:
|
||||
ltype = 0
|
||||
if self.closer is not None:
|
||||
self.closer.cancel()
|
||||
self.closer = None
|
||||
timestamp = int(time.time())
|
||||
if (len(self.logentries) > 0 and ltype == 2 and
|
||||
event == 0 and self.logentries[-1][0] == 2 and
|
||||
self.logentries[-1][1] == timestamp):
|
||||
self.logentries[-1][2] += logdata
|
||||
else:
|
||||
self.logentries.append([ltype, timestamp, logdata, event])
|
||||
if self.writer is None:
|
||||
self.writer = eventlet.spawn_after(2, self.writedata)
|
||||
|
||||
def closelog(self):
|
||||
self.textfile.close()
|
||||
self.binfile.close()
|
||||
self.textfile = None
|
||||
self.binfile = None
|
||||
self.closer = None
|
||||
|
@@ -48,6 +48,7 @@ def sessionhdl(connection, authname):
|
||||
authdata = None
|
||||
if authname and isinstance(authname, bool):
|
||||
authenticated = True
|
||||
authname = "superuser"
|
||||
cfm = configmanager.ConfigManager(tenant=None)
|
||||
elif authname:
|
||||
authdata = auth.authorize(authname, element=None)
|
||||
@@ -58,13 +59,13 @@ def sessionhdl(connection, authname):
|
||||
while not authenticated: # prompt for name and passphrase
|
||||
tlvdata.send(connection, {'authpassed': 0})
|
||||
response = tlvdata.recv(connection)
|
||||
username = response['username']
|
||||
authname = response['username']
|
||||
passphrase = response['passphrase']
|
||||
# note(jbjohnso): here, we need to authenticate, but not
|
||||
# authorize a user. When authorization starts understanding
|
||||
# element path, that authorization will need to be called
|
||||
# per request the user makes
|
||||
authdata = auth.check_user_passphrase(username, passphrase)
|
||||
authdata = auth.check_user_passphrase(authname, passphrase)
|
||||
if authdata is not None:
|
||||
authenticated = True
|
||||
cfm = authdata[1]
|
||||
@@ -72,7 +73,7 @@ def sessionhdl(connection, authname):
|
||||
request = tlvdata.recv(connection)
|
||||
while request is not None:
|
||||
try:
|
||||
process_request(connection, request, cfm, authdata)
|
||||
process_request(connection, request, cfm, authdata, authname)
|
||||
except:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
@@ -90,7 +91,7 @@ def send_response(responses, connection):
|
||||
tlvdata.send(connection, {'_requestdone': 1})
|
||||
|
||||
|
||||
def process_request(connection, request, cfm, authdata):
|
||||
def process_request(connection, request, cfm, authdata, authname):
|
||||
#TODO(jbjohnso): authorize each request
|
||||
if type(request) == dict:
|
||||
operation = request['operation']
|
||||
@@ -105,7 +106,8 @@ def process_request(connection, request, cfm, authdata):
|
||||
node = elems[2]
|
||||
ccons = ClientConsole(connection)
|
||||
consession = consoleserver.ConsoleSession(
|
||||
node=node, configmanager=cfm, datacallback=ccons.sendall)
|
||||
node=node, configmanager=cfm, username=authname,
|
||||
datacallback=ccons.sendall)
|
||||
if consession is None:
|
||||
raise Exception("TODO")
|
||||
tlvdata.send(connection, {'started': 1})
|
||||
|
Reference in New Issue
Block a user