mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-16 20:57:53 +00:00
659c4e4217
If a node is deleted, act similar to if it were defined with no console.method, to avoid superfluous trace output. In the future, it may make sense to filter out nodes with no console.method earlier, since a fair amount of startup work is done that is ultimately ignored for situations where console is not enabled.
711 lines
26 KiB
Python
711 lines
26 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2014 IBM Corporation
|
|
# Copyright 2017 Lenovo
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# This is the common console support for confluent. It takes over
|
|
# whatever filehandle is conversing with the client and starts
|
|
# relaying data. It uses Ctrl-] like telnet for escape back to prompt
|
|
|
|
# we track nodes that are actively being logged, watched, or have attached
|
|
# there should be no more than one handler per node
|
|
import codecs
|
|
import collections
|
|
import confluent.config.configmanager as configmodule
|
|
import confluent.exceptions as exc
|
|
import confluent.interface.console as conapi
|
|
import confluent.log as log
|
|
import confluent.core as plugin
|
|
import confluent.util as util
|
|
import eventlet
|
|
import eventlet.event
|
|
import pyte
|
|
import random
|
|
import time
|
|
import traceback
|
|
|
|
# Active console handlers; connect_node/disconnect_node key this by
# (node, configmanager.tenant)
_handled_consoles = {}

# Logger used for stack traces; assigned by start_console_sessions()
_tracelog = None

try:
    range = xrange
except NameError:
    pass

# pyte color names mapped to the ANSI color digit (added to 30 for the
# foreground or 40 for the background when composing SGR sequences)
pytecolors2ansi = dict(
    (colorname, digit) for digit, colorname in enumerate(
        ('black', 'red', 'green', 'brown',
         'blue', 'magenta', 'cyan', 'white')))
pytecolors2ansi['default'] = 9

# might be able to use IBMPC map from pyte charsets,
# in that case, would have to mask out certain things (like ESC)
# in the same way that Screen's draw method would do
# for now at least get some of the arrows in there (note ESC is one
# of those arrows... so skip it...)
ansichars = {
    0x18: u'\u2191',
    0x19: u'\u2193',
}
|
|
|
|
|
|
def _utf8_normalize(data, shiftin, decoder):
    """Convert a chunk of raw console output to UTF-8 encoded bytes.

    :param data: Bytes most recently received from the console
    :param shiftin: None when no alternate character set was requested;
                    only then are the cp437 arrow glyphs substituted
    :param decoder: A stateful incremental UTF-8 decoder that is shared
                    across calls so multibyte sequences split between
                    chunks can be completed
    :returns: UTF-8 encoded bytes; may be empty if ``data`` ended in the
              middle of a multibyte sequence
    """
    # first we give the stateful decoder a crack at the byte stream,
    # we may come up empty in the event of a partial multibyte
    try:
        data = decoder.decode(data)
    except UnicodeDecodeError:
        # The decoder may still be holding bytes from a previous call that
        # it was waiting to complete.  Those were never emitted, so pull
        # them back out rather than silently dropping them.
        pending = decoder.getstate()[0]
        # reset the state of the decoder to a clean one, so we can switch
        # back to utf-8 when things change, for example going from an F1
        # setup menu stuck in the old days to a modern platform using utf-8
        decoder.reset()
        # Ok, so we have something that is not valid UTF-8,
        # our next stop is to try CP437.  We don't try incremental
        # decode, since cp437 is single byte
        # replace is silly here, since there does not exist invalid c437,
        # but just in case
        data = (pending + data).decode('cp437', 'replace')
    # Finally, the low part of ascii is valid utf-8, but we are going to be
    # more interested in the cp437 versions (since this is console *output*
    # not input)
    if shiftin is None:
        data = data.translate(ansichars)
    return data.encode('utf-8')
|
|
|
|
|
|
def pytechars2line(chars, maxlen=None):
    """Render one row of pyte character cells as ANSI-decorated bytes.

    Each cell is expected to expose ``data``, ``fg``, ``bg``, ``bold``,
    ``italics``, ``underscore``, ``strikethrough`` and ``reverse``.  An SGR
    escape sequence is emitted only when an attribute differs from the
    previous cell.

    :param chars: Row of character cells, indexable by column
    :param maxlen: Number of leading columns to render.  When None, every
                   cell reported by ``len(chars)`` is rendered (assumes
                   ``chars`` is a dense sequence).
    :returns: Tuple of (rendered bytes, whether any non-whitespace data
              was present in the rendered columns)
    """
    if maxlen is None:
        maxlen = len(chars)
    # Pieces are collected as bytes throughout so the result is valid on
    # both Python 2 and Python 3
    pieces = [b'\x1b[m']  # start at default params
    lb = False  # last bold
    li = False  # last italic
    lu = False  # last underline
    ls = False  # last strikethrough
    lr = False  # last reverse
    lfg = 'default'  # last fg color
    lbg = 'default'  # last bg color
    hasdata = False
    for charidx in range(maxlen):
        char = chars[charidx]
        csi = []
        if char.fg != lfg:
            csi.append(30 + pytecolors2ansi[char.fg])
            lfg = char.fg
        if char.bg != lbg:
            csi.append(40 + pytecolors2ansi[char.bg])
            lbg = char.bg
        if char.bold != lb:
            lb = char.bold
            csi.append(1 if lb else 22)
        if char.italics != li:
            li = char.italics
            csi.append(3 if li else 23)
        if char.underscore != lu:
            lu = char.underscore
            csi.append(4 if lu else 24)
        if char.strikethrough != ls:
            ls = char.strikethrough
            csi.append(9 if ls else 29)
        if char.reverse != lr:
            lr = char.reverse
            csi.append(7 if lr else 27)
        if csi:
            params = ';'.join(str(code) for code in csi)
            pieces.append(('\x1b[' + params + 'm').encode('utf-8'))
        encoded = char.data.encode('utf-8')
        if not hasdata and encoded.rstrip():
            hasdata = True
        pieces.append(encoded)
    return b''.join(pieces), hasdata
|
|
|
|
|
|
class ConsoleHandler(object):
    """Manage the backend console connection for a single node.

    A handler holds the console object created through the plugin layer,
    feeds received output into a pyte screen used for replay, logs it, and
    relays it to every attached session.  It also reacts to changes of the
    watched node attributes by reconnecting or switching between always-on
    and on-demand operation.
    """
    # Path handed to the plugin layer to create the console object
    _plugin_path = '/nodes/{0}/_console/session'
    # When true, a per-node console logger is created in __init__
    _logtobuffer = True
    # Attributes always watched for changes, regardless of console plugin
    _genwatchattribs = frozenset(('console.method', 'console.logging'))

    def __init__(self, node, configmanager):
        """Set up state for ``node`` and connect unless on-demand.

        :param node: Name of the node whose console is handled
        :param configmanager: Configuration manager for the node's tenant
        """
        self.clearpending = False
        self._dologging = True
        self._isondemand = False
        self.error = None
        self.cfgmgr = configmanager
        self.node = node
        self.connectstate = 'unconnected'
        self._isalive = True
        self.buffer = pyte.Screen(100, 31)
        self.termstream = pyte.ByteStream()
        self.termstream.attach(self.buffer)
        self.livesessions = set([])
        self.utf8decoder = codecs.getincrementaldecoder('utf-8')()
        if self._logtobuffer:
            self.logger = log.Logger(node, console=True,
                                     tenant=configmanager.tenant)
        # Nothing is replayed from a log at startup, so there is no
        # buffered data yet and the terminal mode flags start cleared
        self.lasttime = 0
        self.clearbuffer()
        self.appmodedetected = False
        self.shiftin = None
        self.reconnect = None
        self.users = {}
        self._attribwatcher = None
        self._console = None
        self.connectionthread = None
        self.send_break = None
        if self._genwatchattribs:
            self._attribwatcher = self.cfgmgr.watch_attributes(
                (self.node,), self._genwatchattribs, self._attribschanged)
        self.check_isondemand()
        if not self._isondemand:
            self.connectstate = 'connecting'
            eventlet.spawn(self._connect)

    def feedbuffer(self, data):
        """Feed bytes into the terminal emulator backing the replay buffer.

        Failures are never propagated: a StopIteration replaces the parser
        stream, anything else is recorded in the trace log.
        """
        try:
            self.termstream.feed(data)
        except StopIteration:  # corrupt parser state, start over
            self.termstream = pyte.ByteStream()
            self.termstream.attach(self.buffer)
        except Exception:
            _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
                          event=log.Events.stacktrace)

    def check_isondemand(self):
        """Derive on-demand and logging flags from console.logging.

        A missing attribute, or one of 'full', '' or 'buffer', means the
        console is always connected; any other value makes it on-demand.
        'none' and 'memory' additionally turn logging off.
        """
        self._dologging = True
        attrvalue = self.cfgmgr.get_node_attributes(
            (self.node,), ('console.logging',))
        if (self.node not in attrvalue or
                'console.logging' not in attrvalue[self.node]):
            self._isondemand = False
            return
        logvalue = attrvalue[self.node]['console.logging']['value']
        # Assign unconditionally so a stale on-demand flag cannot survive
        # a re-evaluation
        self._isondemand = logvalue not in ('full', '', 'buffer')
        if logvalue in ('none', 'memory'):
            self._dologging = False

    def get_buffer_age(self):
        """Return age of buffered data

        Returns age in seconds of the buffered data or
        False in the event of calling before buffered data"""
        if self.lasttime:
            return util.monotonic_time() - self.lasttime
        return False

    def _attribschanged(self, nodeattribs, configmanager, **kwargs):
        """React to a change of one or more watched node attributes."""
        if 'console.logging' in nodeattribs[self.node]:
            # decide whether logging changes how we react or not
            self._dologging = True
            logvalue = 'full'
            attributevalue = configmanager.get_node_attributes(
                (self.node,), ('console.logging',))
            try:
                logvalue = \
                    attributevalue[self.node]['console.logging']['value']
            except KeyError:
                pass
            # NOTE(review): check_isondemand also treats 'buffer' as
            # always-on, while here it falls through to on-demand; confirm
            # which of the two is intended
            if logvalue in ('full', ''):
                # if the *only* thing to change is the log,
                # then let always on handle reconnect if needed,
                # since we want to avoid a senseless disconnect
                # if already connected
                # if other things change, then unconditionally reconnect
                onlylogging = len(nodeattribs[self.node]) == 1
                self._alwayson(doconnect=onlylogging)
                if onlylogging:
                    return
            else:
                self._ondemand()
                if logvalue in ('none', 'memory'):
                    self._dologging = False
        if not self._isondemand or self.livesessions:
            eventlet.spawn(self._connect)

    def log(self, *args, **kwargs):
        """Forward to the console logger unless logging is disabled."""
        if not self._dologging:
            return
        self.logger.log(*args, **kwargs)

    def _alwayson(self, doconnect=True):
        """Switch to always-connected mode.

        :param doconnect: When true, connect if nothing is connected or
                          connecting, otherwise ping the existing console
        """
        self._isondemand = False
        if not doconnect:
            return
        if not self._console and not self.connectionthread:
            self._connect()
        elif self._console:
            # A connection attempt may be in flight with no console object
            # yet; there is nothing to ping in that case
            self._console.ping()

    def clearbuffer(self):
        """Replace the replay buffer with a notice and flag a pending clear."""
        self.feedbuffer(
            b'\x1bc[no replay buffer due to console.logging attribute set to '
            b'none or interactive,\r\nconnection loss, or service restart]')
        self.clearpending = True

    def _disconnect(self):
        """Abort any connection attempt and close the current console."""
        if self.connectionthread:
            self.connectionthread.kill()
            self.connectionthread = None
        # clear the terminal buffer when disconnected
        self.clearbuffer()
        if self._console:
            self.log(
                logdata='console disconnected', ltype=log.DataTypes.event,
                event=log.Events.consoledisconnect)
            self._console.close()
            self._console = None
            self.connectstate = 'unconnected'
            self._send_rcpts({'connectstate': self.connectstate})

    def _ondemand(self):
        """Switch to on-demand mode, disconnecting if nobody is attached."""
        self._isondemand = True
        if not self.livesessions and self._console:
            self._disconnect()

    def _connect(self):
        """Start a fresh connection attempt, replacing one in progress."""
        if self.connectionthread:
            self.connectionthread.kill()
            self.connectionthread = None
        self.connectionthread = eventlet.spawn(self._connect_backend)

    def _connect_failed(self, error, retry=True):
        """Record a failed connection attempt and notify sessions.

        :param error: Short error keyword reported to attached sessions
        :param retry: When true and no retry is pending, schedule another
                      attempt after a randomized 120 to 240 second delay
        """
        self.clearbuffer()
        self.error = error
        self.connectstate = 'unconnected'
        self._send_rcpts({'connectstate': self.connectstate,
                          'error': self.error})
        if retry and not self.reconnect:
            retrytime = 120 + (120 * random.random())
            self.reconnect = eventlet.spawn_after(retrytime, self._connect)

    def _connect_backend(self):
        """Create the console object through the plugin layer and connect."""
        if self._console:
            self._console.close()
            self._console = None
        self.connectstate = 'connecting'
        self._send_rcpts({'connectstate': self.connectstate})
        if self.reconnect:
            self.reconnect.cancel()
            self.reconnect = None
        try:
            self._console = plugin.handle_path(
                self._plugin_path.format(self.node),
                "create", self.cfgmgr)
        except (exc.NotImplementedException, exc.NotFoundException):
            self._console = None
        except Exception:
            # Not a bare except: being killed by _connect/_disconnect must
            # end this greenthread rather than be logged and ignored
            _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
                          event=log.Events.stacktrace)
        if not isinstance(self._console, conapi.Console):
            self._connect_failed('misconfigured', retry=False)
            return
        self.send_break = self._console.send_break
        if self._attribwatcher:
            self.cfgmgr.remove_watcher(self._attribwatcher)
            self._attribwatcher = None
        if hasattr(self._console, "configattributes"):
            attribstowatch = (self._console.configattributes |
                              self._genwatchattribs)
        else:
            attribstowatch = self._genwatchattribs
        if self._genwatchattribs:
            self._attribwatcher = self.cfgmgr.watch_attributes(
                (self.node,), attribstowatch, self._attribschanged)
        try:
            self._console.connect(self.get_console_output)
        except exc.TargetEndpointBadCredentials:
            self._connect_failed('badcredentials')
            return
        except exc.TargetEndpointUnreachable:
            self._connect_failed('unreachable')
            return
        except Exception:
            _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
                          event=log.Events.stacktrace)
            self._connect_failed('unknown')
            return
        self._got_connected()

    def _got_connected(self):
        """Mark the console connected, log it and notify sessions."""
        self.connectstate = 'connected'
        self.log(
            logdata='console connected', ltype=log.DataTypes.event,
            event=log.Events.consoleconnect)
        self._send_rcpts({'connectstate': self.connectstate})

    def _got_disconnected(self):
        """Handle loss of the console and reconnect while still alive."""
        self.clearbuffer()
        if self.connectstate != 'unconnected':
            self.connectstate = 'unconnected'
            self.log(
                logdata='console disconnected', ltype=log.DataTypes.event,
                event=log.Events.consoledisconnect)
            self._send_rcpts({'connectstate': self.connectstate})
        if self._isalive:
            self._connect()

    def close(self):
        """Permanently shut this handler down."""
        self._isalive = False
        self._send_rcpts({'deleting': True})
        self._disconnect()
        if self._console:
            self._console.close()
            self._console = None
        if self.connectionthread:
            self.connectionthread.kill()
            self.connectionthread = None
        if self.reconnect:
            # Do not let a scheduled retry reconnect a closed handler
            self.reconnect.cancel()
            self.reconnect = None

    def get_console_output(self, data):
        """Callback given to the console object for output and events."""
        # Spawn as a greenthread, return control as soon as possible
        # to the console object
        eventlet.spawn(self._handle_console_output, data)

    def attachsession(self, session):
        """Register a session, connecting the console if needed."""
        edata = 1
        for currsession in self.livesessions:
            if currsession.username == session.username:
                # indicate that user has multiple connections
                edata = 2
        self.livesessions.add(session)
        self.log(
            logdata=session.username, ltype=log.DataTypes.event,
            event=log.Events.clientconnect, eventdata=edata)
        self._send_rcpts({'clientcount': len(self.livesessions)})
        if self.connectstate == 'unconnected':
            # if console is not connected, take time to try to assert
            # connectivity now.
            if self.reconnect:
                # cancel an automated retry if one is pending
                self.reconnect.cancel()
                self.reconnect = None
            self.connectstate = 'connecting'
            eventlet.spawn(self._connect)

    def detachsession(self, session):
        """Unregister a session; disconnect if on-demand and now unused."""
        edata = 0
        self.livesessions.discard(session)
        for currsession in self.livesessions:
            if currsession.username == session.username:
                edata += 1
            if edata > 1:  # don't bother counting beyond 2 in the log
                break
        self.log(
            logdata=session.username, ltype=log.DataTypes.event,
            event=log.Events.clientdisconnect, eventdata=edata)
        self._send_rcpts({'clientcount': len(self.livesessions)})
        if self._isondemand and not self.livesessions:
            self._disconnect()

    def reopen(self):
        """Force the console connection to be re-established."""
        self._got_disconnected()

    def _handle_console_output(self, data):
        """Process one chunk of output, or one event, from the console.

        :param data: Either an integer console event or a chunk of bytes
        """
        if isinstance(data, int):
            if data == conapi.ConsoleEvent.Disconnect:
                self._got_disconnected()
            return
        if not data:
            # ignore empty strings from a cconsole provider
            return
        if b'\x1b[?1l' in data:  # request for ansi mode cursor keys
            self.appmodedetected = False
        if b'\x1b[?1h' in data:  # remember the session wants the client to
            # use 'application mode'  Thus far only observed on esxi
            self.appmodedetected = True
        if b'\x1b)0' in data:
            # console indicates it wants access to special drawing characters
            self.shiftin = b'0'
        eventdata = 0
        if self.appmodedetected:
            eventdata |= 1
        if self.shiftin is not None:
            eventdata |= 2
        self.log(data, eventdata=eventdata)
        self.lasttime = util.monotonic_time()
        if self.clearpending:
            # Reset before feeding the new data, so the replay buffer ends
            # up with the same content live sessions are sent (reset, then
            # data) instead of wiping the chunk that was just received
            self.clearpending = False
            self.feedbuffer(b'\x1bc')
            self._send_rcpts(b'\x1bc')
        self.feedbuffer(data)
        # TODO: analyze buffer for registered events, examples:
        #   panics
        #   certificate signing request
        self._send_rcpts(_utf8_normalize(data, self.shiftin, self.utf8decoder))

    def _send_rcpts(self, data):
        """Deliver ``data`` to the data handler of every attached session."""
        for rcpt in list(self.livesessions):
            try:
                rcpt.data_handler(data)
            except Exception:
                # Whatever went wrong for this recipient, record it and
                # advance to the next one
                _tracelog.log(traceback.format_exc(),
                              ltype=log.DataTypes.event,
                              event=log.Events.stacktrace)

    def get_recent(self):
        """Retrieve 'recent' data

        Replay data in the intent to perhaps reproduce the display.

        :returns: Tuple of (bytes that redraw the buffered screen, dict
                  with 'connectstate' and 'clientcount')
        """
        connstate = {
            'connectstate': self.connectstate,
            'clientcount': len(self.livesessions),
        }
        # Only render as many columns as the widest non-blank line needs
        maxlen = 0
        for text in self.buffer.display:
            maxlen = max(maxlen, len(text.rstrip()))
        rendered = []  # lines known to belong in the replay
        pendingbl = []  # blank lines, kept only if content follows them
        for row in range(self.buffer.lines):
            nline, notblank = pytechars2line(self.buffer.buffer[row], maxlen)
            if notblank:
                rendered.extend(pendingbl)
                pendingbl = []
                rendered.append(nline)
            else:
                pendingbl.append(nline)
        # clear screen, then the lines with no trailing line break
        retdata = b'\x1b[H\x1b[J' + b'\r\n'.join(rendered)
        retdata += '\x1b[{0};{1}H'.format(
            self.buffer.cursor.y + 1,
            self.buffer.cursor.x + 1).encode('utf-8')
        if self.shiftin is not None:  # detected that terminal requested a
            # shiftin character set, relay that to the terminal that connected
            retdata += b'\x1b)' + self.shiftin
        if self.appmodedetected:
            retdata += b'\x1b[?1h'
        else:
            retdata += b'\x1b[?1l'
        return retdata, connstate

    def write(self, data):
        """Send input to the console; a failed write triggers a reconnect."""
        if self.connectstate == 'connected':
            try:
                self._console.write(data)
            except Exception:
                _tracelog.log(traceback.format_exc(),
                              ltype=log.DataTypes.event,
                              event=log.Events.stacktrace)
                self._got_disconnected()
|
|
|
|
|
|
def disconnect_node(node, configmanager):
    """Close and forget the console handler for a node, if one exists.

    :param node: Name of the node
    :param configmanager: Configuration manager whose tenant scopes the node
    """
    key = (node, configmanager.tenant)
    if key not in _handled_consoles:
        return
    _handled_consoles[key].close()
    del _handled_consoles[key]
|
|
|
|
|
|
def _nodechange(added, deleting, configmanager):
    """Node collection watcher: track added nodes, drop deleted ones.

    :param added: Iterable of node names that were added
    :param deleting: Iterable of node names that are being deleted
    :param configmanager: Configuration manager the change applies to
    """
    for newnode in added:
        connect_node(newnode, configmanager)
    for oldnode in deleting:
        disconnect_node(oldnode, configmanager)
|
|
|
|
|
|
def _start_tenant_sessions(cfm):
    """Create a console handler for every node of a configuration manager.

    A failure for one node is written to the trace log and does not stop
    the remaining nodes from being processed.  Afterwards the node
    collection is watched so later additions and deletions are followed.

    :param cfm: Configuration manager to start sessions for
    """
    for node in cfm.list_nodes():
        try:
            connect_node(node, cfm)
        except Exception:
            # Deliberately broad so one bad node cannot block the rest, but
            # not a bare except: interpreter exit and greenthread kill
            # requests must still propagate
            _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
                          event=log.Events.stacktrace)
    cfm.watch_nodecollection(_nodechange)
|
|
|
|
|
|
def start_console_sessions():
    """Create the trace logger and hook console startup.

    _start_tenant_sessions is registered to be invoked for configuration
    managers via configmodule.hook_new_configmanagers.
    """
    global _tracelog
    tracelogger = log.Logger('trace')
    _tracelog = tracelogger
    configmodule.hook_new_configmanagers(_start_tenant_sessions)
|
|
|
|
|
|
def connect_node(node, configmanager, username=None):
    """Return the console handler for a node, creating it on first use.

    :param node: Name of the node
    :param configmanager: Configuration manager whose tenant scopes the node
    :param username: Accepted for interface compatibility; not used here
    :returns: The ConsoleHandler registered for (node, tenant)
    """
    key = (node, configmanager.tenant)
    if key in _handled_consoles:
        return _handled_consoles[key]
    _handled_consoles[key] = ConsoleHandler(node, configmanager)
    return _handled_consoles[key]
|
|
|
|
# this represents some api view of a console handler. This handles things like
|
|
# holding the caller specific queue data, for example, when http api should be
|
|
# sending data, but there is no outstanding POST request to hold it,
|
|
# this object has the job of holding the data
|
|
|
|
|
|
class ConsoleSession(object):
    """Create a new socket to converse with node console

    This object provides a filehandle that can be read/written
    too in a normal fashion and the concurrency, logging, and
    event watching will all be handled seamlessly

    :param node: Name of the node for which this session will be created
    :param configmanager: A configuration manager object for current context
    :param username: Username for which this session object will operate
    :param datacallback: An asynchronous data handler, to be called when data
                         is available.  Note that if passed, it makes
                         'get_next_output' non-functional
    :param skipreplay: If true, will skip the attempt to redraw the screen
    :raises NotFoundException: if ``node`` is not a known node
    """
    connector = connect_node

    def __init__(self, node, configmanager, username, datacallback=None,
                 skipreplay=False):
        self.registered = False
        self.tenant = configmanager.tenant
        if not configmanager.is_node(node):
            raise exc.NotFoundException("Invalid node")
        self.username = username
        self.node = node
        self.configmanager = configmanager
        self.connect_session()
        self.registered = True
        self._evt = None
        self.write = self.conshdl.write
        if datacallback is None:
            # Polling mode: data is queued until get_next_output fetches
            # it, and the session is destroyed if nobody polls in time
            self.reaper = eventlet.spawn_after(15, self.destroy)
            self.databuffer = collections.deque([])
            self.data_handler = self.got_data
            if not skipreplay:
                self.databuffer.extend(self.conshdl.get_recent())
        else:
            self.data_handler = datacallback
            if not skipreplay:
                for recdata in self.conshdl.get_recent():
                    if recdata:
                        datacallback(recdata)
        self.conshdl.attachsession(self)

    def connect_session(self):
        """Connect to the appropriate backend handler

        This is not intended to be called by your usual consumer,
        it is a hook for confluent to abstract the concept of a terminal
        between console and shell.
        """
        self.conshdl = connect_node(self.node, self.configmanager,
                                    self.username)

    def send_break(self):
        """Send break to remote system
        """
        self.conshdl.send_break()

    def get_buffer_age(self):
        """Get the age in seconds of the buffered data

        Returns False if no data buffered yet"""
        return self.conshdl.get_buffer_age()

    def reopen(self):
        """Reopen the session

        This can be useful if there is suspicion that the remote console is
        dead.  Note that developers should consider need for this a bug unless
        there really is some fundamental, unavoidable limitation regarding
        automatically detecting an unusable console in the underlying
        technology that cannot be unambiguously autodetected.
        """
        self.conshdl.reopen()

    def destroy(self):
        """Detach from the handler and wake any waiting poller.

        Safe to call more than once: the handler is only told about the
        detach the first time, so a reaper timer firing after an explicit
        destroy does not produce a second disconnect event.
        """
        if self.registered:
            self.registered = False
            self.conshdl.detachsession(self)
        if self._evt:
            self._evt.send()
            self._evt = None
        self.reghdl = None

    def got_data(self, data):
        """Receive data from console and buffer

        If the caller does not provide a callback and instead will be polling
        for data, we must maintain data in a buffer until retrieved.  This is
        an internal function used as a means to convert the async behavior to
        polling for consumers that cannot do the async behavior.
        """
        self.databuffer.append(data)
        if self._evt:
            self._evt.send()
            self._evt = None

    def get_next_output(self, timeout=45):
        """Poll for next available output on this console.

        Ideally purely event driven scheme is perfect.  AJAX over HTTP is
        at least one case where we don't have that luxury.  This function
        will not work if the session was initialized with a data callback
        instead of polling mode.

        :param timeout: Seconds to wait for data before giving up
        :returns: A dict when the next queued item is a dict, otherwise
                  the concatenation of queued data up to the next dict;
                  "" if nothing arrived within the timeout
        """
        self.reaper.cancel()
        # postpone death to be 15 seconds after this would timeout
        self.reaper = eventlet.spawn_after(timeout + 15, self.destroy)
        if self._evt:
            raise Exception('get_next_output is not re-entrant')
        if not self.databuffer:
            self._evt = eventlet.event.Event()
            with eventlet.Timeout(timeout, False):
                self._evt.wait()
            self._evt = None
        if not self.databuffer:
            return ""
        currdata = self.databuffer.popleft()
        if isinstance(currdata, dict):
            return currdata
        retval = currdata
        while self.databuffer and not isinstance(self.databuffer[0], dict):
            retval += self.databuffer.popleft()
        return retval
|