2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-28 11:57:37 +00:00

Attempt to mitigate memory consumption

This commit is contained in:
Jarrod Johnson 2013-09-16 16:56:59 -04:00
parent 807b5366ee
commit e746d0dd3b
2 changed files with 26 additions and 16 deletions

View File

@ -9,6 +9,7 @@
#there should be no more than one handler per node
import confluent.pluginapi as plugin
import confluent.util as util
from cStringIO import StringIO
_handled_consoles = {}
@ -34,7 +35,6 @@ class _ConsoleHandler(object):
#also, timestamp data...
def get_console_output(self, data):
self.buffer += data
#TODO: analyze buffer for registered events, examples:
# panics
# certificate signing request
@ -42,24 +42,29 @@ class _ConsoleHandler(object):
#call to function to get generic data to log if applicable
#and shrink buffer
self.flushbuffer()
bufidx = len(self.buffer)
bufsz = len(data)
self.buffer.extend(data)
view = buffer(self.buffer, bufidx, bufsz)
for rcpt in self.rcpts:
rcpt(data)
rcpt(view)
def get_recent(self):
#this is one scheme to clear screen, move cursor then clear
if len(self.buffer) > 512:
bgn = -512
else:
bflen = len(self.buffer)
bgn = bflen - 512
if bgn < 0:
bgn = 0
#this covers the case of move cursor to begin, clear to end
bufidx = self.buffer[bgn:].rfind('\x1b[H\x1b[J')
if bufidx >= 0:
return str(self.buffer[bufidx:])
#another scheme is the 2J scheme
return buffer(self.buffer,bufidx, bflen - bufidx)
#this handles the 'clear all' control code, often used by firmware
bufidx = self.buffer[bgn:].rfind('\x1b[2J')
if bufidx >= 0:
return str(self.buffer[bufidx:])
return buffer(self.buffer,bufidx, bflen - bufidx)
else:
return str(self.buffer[bgn:])
return buffer(self.buffer,bgn, bflen - bufidx)
def write(self, data):
#TODO.... take note of data coming in from audit/log perspective?
@ -87,7 +92,7 @@ class ConsoleSession(object):
self.conshdl = _handled_consoles[node]
self.write = _handled_consoles[node].write
if datacallback is None:
self.databuffer = _handled_consoles[node].get_recent()
self.databuffers = [ _handled_consoles[node].get_recent() ]
_handled_consoles[node].register_rcpt(self.got_data)
else:
_handled_consoles[node].register_rcpt(datacallback)
@ -99,7 +104,7 @@ class ConsoleSession(object):
If the caller does not provide a callback and instead will be polling
for data, we must maintain data in a buffer until retrieved
"""
self.databuffer += data
self.databuffers.append(data)
def get_next_output(self, timeout=45):
"""Poll for next available output on this console.
@ -109,13 +114,15 @@ class ConsoleSession(object):
"""
currtime = util.monotonic_time()
deadline = currtime + 45
while len(self.databuffer) == 0 and currtime < deadline:
while len(self.databuffers) == 0 and currtime < deadline:
timeo = deadline - currtime
self.conshdl._console.wait_for_data(timeout=timeo)
currtime = util.monotonic_time()
retval = self.databuffer
self.databuffer = ""
return retval
retval = StringIO()
for buf in self.databuffers:
retval.write(buf)
self.databuffers = []
return retval.getvalue()
def handle_request(request=None, connection=None, releaseconnection=False):
@ -123,7 +130,7 @@ def handle_request(request=None, connection=None, releaseconnection=False):
Process a request from confluent.
:param request: For 'datagram' style console, this represents a wait for
data or input.
data or input.
:param connection: For socket style console, this is a read/write socket
that the caller has released from it's control and
console plugin will do all IO

View File

@ -15,6 +15,7 @@ import confluent.pluginapi as pluginapi
import confluent.httpapi as httpapi
import confluent.sockapi as sockapi
import eventlet
import eventlet.backdoor as backdoor
from eventlet.green import socket
from eventlet import wsgi
import multiprocessing
@ -27,6 +28,8 @@ def run():
webservice.start()
sockservice = sockapi.SockApi()
sockservice.start()
#TODO: Unix domain socket instead
eventlet.spawn(backdoor.backdoor_server, eventlet.listen(('localhost', 2121)))
while (1):
eventlet.sleep(100)