2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-18 05:33:17 +00:00

Retrofit consoleserver and make shellserver

Provide a common 'shellserver' capability cloned off of 'consoleserver'.
This will enable the concept of per-user shells with option for multiple
shells per.  Each user will have their own set of shell sessions rather
than shared across users.  Can revisit in future if sharing between
users is desired.
This commit is contained in:
Jarrod Johnson 2016-01-05 15:28:49 -05:00
parent dfc1e32546
commit 30ed563810
3 changed files with 165 additions and 36 deletions

View File

@ -18,8 +18,8 @@
# whatever filehandle is conversing with the client and starts
# relaying data. It uses Ctrl-] like telnet for escape back to prompt
#we track nodes that are actively being logged, watched, or have attached
#there should be no more than one handler per node
# we track nodes that are actively being logged, watched, or have attached
# there should be no more than one handler per node
import collections
import confluent.config.configmanager as configmodule
import confluent.exceptions as exc
@ -40,7 +40,9 @@ _genwatchattribs = frozenset(('console.method', 'console.logging'))
_tracelog = None
class _ConsoleHandler(object):
class ConsoleHandler(object):
_plugin_path = '/nodes/{0}/_console/session'
def __init__(self, node, configmanager):
self._dologging = True
self._isondemand = False
@ -186,7 +188,7 @@ class _ConsoleHandler(object):
self.reconnect = None
try:
self._console = plugin.handle_path(
"/nodes/%s/_console/session" % self.node,
self._plugin_path.format(self.node),
"create", self.cfgmgr)
except:
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
@ -332,8 +334,8 @@ class _ConsoleHandler(object):
edata = 2
if edata < 0:
_tracelog.log('client count negative' + traceback.format_exc(),
ltype=log.DataTypes.event,
event=log.Events.stacktrace)
ltype=log.DataTypes.event,
event=log.Events.stacktrace)
edata = 0
self.log(
logdata=username, ltype=log.DataTypes.event,
@ -369,7 +371,7 @@ class _ConsoleHandler(object):
self.buffer += data
else:
self.buffer += data.encode('utf-8')
#TODO: analyze buffer for registered events, examples:
# TODO: analyze buffer for registered events, examples:
# panics
# certificate signing request
if len(self.buffer) > 16384:
@ -388,28 +390,28 @@ class _ConsoleHandler(object):
Replay data in the intent to perhaps reproduce the display.
"""
#For now, just try to seek back in buffer to find a clear screen
#If that fails, just return buffer
#a scheme always tracking the last clear screen would be too costly
# For now, just try to seek back in buffer to find a clear screen
# If that fails, just return buffer
# a scheme always tracking the last clear screen would be too costly
connstate = {
'connectstate': self.connectstate,
'clientcount': self.clientcount,
}
retdata = ''
if self.shiftin is not None: # detected that terminal requested a
#shiftin character set, relay that to the terminal that cannected
# shiftin character set, relay that to the terminal that cannected
retdata += '\x1b)' + self.shiftin
if self.appmodedetected:
retdata += '\x1b[?1h'
else:
retdata += '\x1b[?1l'
#an alternative would be to emulate a VT100 to know what the
#whole screen would look like
#this is one scheme to clear screen, move cursor then clear
# an alternative would be to emulate a VT100 to know what the
# whole screen would look like
# this is one scheme to clear screen, move cursor then clear
bufidx = self.buffer.rfind('\x1b[H\x1b[J')
if bufidx >= 0:
return retdata + str(self.buffer[bufidx:]), connstate
#another scheme is the 2J scheme
# another scheme is the 2J scheme
bufidx = self.buffer.rfind('\x1b[2J')
if bufidx >= 0:
# there was some sort of clear screen event
@ -417,8 +419,8 @@ class _ConsoleHandler(object):
# in hopes that it reproduces the screen
return retdata + str(self.buffer[bufidx:]), connstate
else:
#we have no indication of last erase, play back last kibibyte
#to give some sense of context anyway
# we have no indication of last erase, play back last kibibyte
# to give some sense of context anyway
return retdata + str(self.buffer[-1024:]), connstate
def write(self, data):
@ -456,13 +458,15 @@ def start_console_sessions():
configmodule.hook_new_configmanagers(_start_tenant_sessions)
def connect_node(node, configmanager):
def connect_node(node, configmanager, username=None):
consk = (node, configmanager.tenant)
if consk not in _handled_consoles:
_handled_consoles[consk] = _ConsoleHandler(node, configmanager)
#this represents some api view of a console handler. This handles things like
#holding the caller specific queue data, for example, when http api should be
#sending data, but there is no outstanding POST request to hold it,
_handled_consoles[consk] = ConsoleHandler(node, configmanager)
return _handled_consoles[consk]
# this represents some api view of a console handler. This handles things like
# holding the caller specific queue data, for example, when http api should be
# sending data, but there is no outstanding POST request to hold it,
# this object has the job of holding the data
@ -474,7 +478,14 @@ class ConsoleSession(object):
event watching will all be handled seamlessly
:param node: Name of the node for which this session will be created
:param configmanager: A configuration manager object for current context
:param username: Username for which this session object will operate
:param datacallback: An asynchronous data handler, to be called when data
is available. Note that if passed, it makes
'get_next_output' non-functional
:param skipreplay: If true, will skip the attempt to redraw the screen
"""
connector = connect_node
def __init__(self, node, configmanager, username, datacallback=None,
skipreplay=False):
@ -482,30 +493,40 @@ class ConsoleSession(object):
self.tenant = configmanager.tenant
if not configmanager.is_node(node):
raise exc.NotFoundException("Invalid node")
consk = (node, self.tenant)
self.ckey = consk
self.username = username
connect_node(node, configmanager)
self.node = node
self.configmanager = configmanager
self.connect_session()
self.registered = True
_handled_consoles[consk].attachuser(username)
self.conshdl.attachuser(self.username)
self._evt = None
self.node = node
self.conshdl = _handled_consoles[consk]
self.write = _handled_consoles[consk].write
self.write = self.conshdl.write
if datacallback is None:
self.reaper = eventlet.spawn_after(15, self.destroy)
self.databuffer = collections.deque([])
self.reghdl = _handled_consoles[consk].register_rcpt(self.got_data)
self.reghdl = self.conshdl.register_rcpt(self.got_data)
if not skipreplay:
self.databuffer.extend(_handled_consoles[consk].get_recent())
self.databuffer.extend(self.conshdl.get_recent())
else:
self.reghdl = _handled_consoles[consk].register_rcpt(datacallback)
self.reghdl = self.conshdl.register_rcpt(datacallback)
if not skipreplay:
for recdata in _handled_consoles[consk].get_recent():
for recdata in self.conshdl.get_recent():
if recdata:
datacallback(recdata)
def connect_session(self):
"""Connect to the appropriate backend handler
This is not intended to be called by your usual consumer,
it is a hook for confluent to abstract the concept of a terminal
between console and shell.
"""
self.conshdl = connect_node(self.node, self.configmanager,
self.username)
def send_break(self):
"""Send break to remote system
"""
self.conshdl.send_break()
def get_buffer_age(self):
@ -515,12 +536,20 @@ class ConsoleSession(object):
return self.conshdl.get_buffer_age()
def reopen(self):
"""Reopen the session
This can be useful if there is suspicion that the remote console is
dead. Note that developers should consider need for this a bug unless
there really is some fundamental, unavoidable limitation regarding
automatically detecting an unusable console in the underlying
technology that cannot be unambiguously autodetected.
"""
self.conshdl.reopen()
def destroy(self):
if self.registered:
_handled_consoles[self.ckey].detachuser(self.username)
_handled_consoles[self.ckey].unregister_rcpt(self.reghdl)
self.conshdl.detachuser(self.username)
self.conshdl.unregister_rcpt(self.reghdl)
self.databuffer = None
self._evt = None
self.reghdl = None
@ -529,7 +558,9 @@ class ConsoleSession(object):
"""Receive data from console and buffer
If the caller does not provide a callback and instead will be polling
for data, we must maintain data in a buffer until retrieved
for data, we must maintain data in a buffer until retrieved. This is
an internal function used as a means to convert the async behavior to
polling for consumers that cannot do the async behavior.
"""
self.databuffer.append(data)
if self._evt:
@ -539,7 +570,9 @@ class ConsoleSession(object):
"""Poll for next available output on this console.
Ideally purely event driven scheme is perfect. AJAX over HTTP is
at least one case where we don't have that luxury
at least one case where we don't have that luxury. This function
will not work if the session was initialized with a data callback
instead of polling mode.
"""
self.reaper.cancel()
if self._evt:

View File

@ -173,6 +173,10 @@ noderesources = {
'default': 'ssh',
})
},
'shell': {
# another special case similar to console
'sessions': [],
},
'console': {
# this is a dummy value, http or socket must handle special
'session': PluginRoute({}),

View File

@ -0,0 +1,92 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2016 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This module tracks each node, tenants currently active shell sessions
# 'ConsoleSession' objects from consoleserver are used, but with the additional
# capacity for having a multiple of sessions per node active at a given time
import confluent.consoleserver as consoleserver
import uuid
activesessions = {}
class _ShellHandler(consoleserver.ConsoleHandler):
_plugin_path = '/nodes/{0}/_shell/session'
def get_sessions(tenant, node, user):
"""Get sessionids active for node
Given a tenant, nodename, and user; provide an iterable of sessionids.
Each permutation of tenant, nodename and user have a distinct set of shell
sessions.
:param tenant: The tenant identifier for the current scope
:param node: The nodename of the current scope.
:param user: The confluent user that will 'own' the session.
"""
return activesessions.get((tenant, node, user), {})
def get_session(tenant, node, user, sessionid):
return activesessions.get((tenant, node, user), {}).get(sessionid, None)
class ShellSession(consoleserver.ConsoleSession):
"""Create a new socket to converse with a node shell session
This object provides a filehandle that can be read/written
too in a normal fashion and the concurrency, logging, and
event watching will all be handled seamlessly. It represents a remote
CLI shell session.
:param node: Name of the node for which this session will be created
:param configmanager: A configuration manager object for current context
:param username: Username for which this session object will operate
:param datacallback: An asynchronous data handler, to be called when data
is available. Note that if passed, it makes
'get_next_output' non-functional
:param skipreplay: If true, will skip the attempt to redraw the screen
:param sessionid: An optional identifier to match a running session or
customize the name of a new session.
"""
def __init__(self, node, configmanager, username, datacallback=None,
skipreplay=False, sessionid=None):
self.sessionid = sessionid
self.configmanager = configmanager
self.node = node
super(ShellSession, self).__init__(node, configmanager, username,
datacallback, skipreplay)
def connect_session(self):
global activesessions
tenant = self.configmanager.tenant
if self.sessionid is None:
self.sessionid = str(uuid.uuid4())
if (self.configmanager.tenant, self.node) not in activesessions:
activesessions[(tenant, self.node)] = {}
if self.sessionid not in activesessions[(tenant, self.node)]:
activesessions[(tenant, self.node)][self.sessionid] = _ShellHandler(self.node, self.configmanager)
self.conshdl = activesessions[(self.configmanager.tenant, self.node)][self.sessionid]