diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index c71cfe7d..3f394dd4 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -18,8 +18,8 @@ # whatever filehandle is conversing with the client and starts # relaying data. It uses Ctrl-] like telnet for escape back to prompt -#we track nodes that are actively being logged, watched, or have attached -#there should be no more than one handler per node +# we track nodes that are actively being logged, watched, or have attached +# there should be no more than one handler per node import collections import confluent.config.configmanager as configmodule import confluent.exceptions as exc @@ -40,7 +40,9 @@ _genwatchattribs = frozenset(('console.method', 'console.logging')) _tracelog = None -class _ConsoleHandler(object): +class ConsoleHandler(object): + _plugin_path = '/nodes/{0}/_console/session' + def __init__(self, node, configmanager): self._dologging = True self._isondemand = False @@ -186,7 +188,7 @@ class _ConsoleHandler(object): self.reconnect = None try: self._console = plugin.handle_path( - "/nodes/%s/_console/session" % self.node, + self._plugin_path.format(self.node), "create", self.cfgmgr) except: _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, @@ -332,8 +334,8 @@ class _ConsoleHandler(object): edata = 2 if edata < 0: _tracelog.log('client count negative' + traceback.format_exc(), - ltype=log.DataTypes.event, - event=log.Events.stacktrace) + ltype=log.DataTypes.event, + event=log.Events.stacktrace) edata = 0 self.log( logdata=username, ltype=log.DataTypes.event, @@ -369,7 +371,7 @@ class _ConsoleHandler(object): self.buffer += data else: self.buffer += data.encode('utf-8') - #TODO: analyze buffer for registered events, examples: + # TODO: analyze buffer for registered events, examples: # panics # certificate signing request if len(self.buffer) > 16384: @@ -388,28 +390,28 @@ class _ConsoleHandler(object): Replay data in the intent to perhaps reproduce the display. """ - #For now, just try to seek back in buffer to find a clear screen - #If that fails, just return buffer - #a scheme always tracking the last clear screen would be too costly + # For now, just try to seek back in buffer to find a clear screen + # If that fails, just return buffer + # a scheme always tracking the last clear screen would be too costly connstate = { 'connectstate': self.connectstate, 'clientcount': self.clientcount, } retdata = '' if self.shiftin is not None: # detected that terminal requested a - #shiftin character set, relay that to the terminal that cannected + # shiftin character set, relay that to the terminal that cannected retdata += '\x1b)' + self.shiftin if self.appmodedetected: retdata += '\x1b[?1h' else: retdata += '\x1b[?1l' - #an alternative would be to emulate a VT100 to know what the - #whole screen would look like - #this is one scheme to clear screen, move cursor then clear + # an alternative would be to emulate a VT100 to know what the + # whole screen would look like + # this is one scheme to clear screen, move cursor then clear bufidx = self.buffer.rfind('\x1b[H\x1b[J') if bufidx >= 0: return retdata + str(self.buffer[bufidx:]), connstate - #another scheme is the 2J scheme + # another scheme is the 2J scheme bufidx = self.buffer.rfind('\x1b[2J') if bufidx >= 0: # there was some sort of clear screen event @@ -417,8 +419,8 @@ class _ConsoleHandler(object): # in hopes that it reproduces the screen return retdata + str(self.buffer[bufidx:]), connstate else: - #we have no indication of last erase, play back last kibibyte - #to give some sense of context anyway + # we have no indication of last erase, play back last kibibyte + # to give some sense of context anyway return retdata + str(self.buffer[-1024:]), connstate def write(self, data): @@ -456,13 +458,15 @@ def start_console_sessions(): configmodule.hook_new_configmanagers(_start_tenant_sessions) -def connect_node(node, configmanager): +def connect_node(node, configmanager, username=None): consk = (node, configmanager.tenant) if consk not in _handled_consoles: - _handled_consoles[consk] = _ConsoleHandler(node, configmanager) -#this represents some api view of a console handler. This handles things like -#holding the caller specific queue data, for example, when http api should be -#sending data, but there is no outstanding POST request to hold it, + _handled_consoles[consk] = ConsoleHandler(node, configmanager) + return _handled_consoles[consk] + +# this represents some api view of a console handler. This handles things like +# holding the caller specific queue data, for example, when http api should be +# sending data, but there is no outstanding POST request to hold it, # this object has the job of holding the data @@ -474,7 +478,14 @@ class ConsoleSession(object): event watching will all be handled seamlessly :param node: Name of the node for which this session will be created + :param configmanager: A configuration manager object for current context + :param username: Username for which this session object will operate + :param datacallback: An asynchronous data handler, to be called when data + is available. Note that if passed, it makes + 'get_next_output' non-functional + :param skipreplay: If true, will skip the attempt to redraw the screen """ + connector = connect_node def __init__(self, node, configmanager, username, datacallback=None, skipreplay=False): @@ -482,30 +493,40 @@ class ConsoleSession(object): self.tenant = configmanager.tenant if not configmanager.is_node(node): raise exc.NotFoundException("Invalid node") - consk = (node, self.tenant) - self.ckey = consk self.username = username - connect_node(node, configmanager) + self.node = node + self.configmanager = configmanager + self.connect_session() self.registered = True - _handled_consoles[consk].attachuser(username) + self.conshdl.attachuser(self.username) self._evt = None self.node = node - self.conshdl = _handled_consoles[consk] - self.write = _handled_consoles[consk].write + self.write = self.conshdl.write if datacallback is None: self.reaper = eventlet.spawn_after(15, self.destroy) self.databuffer = collections.deque([]) - self.reghdl = _handled_consoles[consk].register_rcpt(self.got_data) + self.reghdl = self.conshdl.register_rcpt(self.got_data) if not skipreplay: - self.databuffer.extend(_handled_consoles[consk].get_recent()) + self.databuffer.extend(self.conshdl.get_recent()) else: - self.reghdl = _handled_consoles[consk].register_rcpt(datacallback) + self.reghdl = self.conshdl.register_rcpt(datacallback) if not skipreplay: - for recdata in _handled_consoles[consk].get_recent(): + for recdata in self.conshdl.get_recent(): if recdata: datacallback(recdata) + def connect_session(self): + """Connect to the appropriate backend handler + + This is not intended to be called by your usual consumer, + it is a hook for confluent to abstract the concept of a terminal + between console and shell. + """ + self.conshdl = connect_node(self.node, self.configmanager, + self.username) def send_break(self): + """Send break to remote system + """ self.conshdl.send_break() def get_buffer_age(self): @@ -515,12 +536,20 @@ class ConsoleSession(object): return self.conshdl.get_buffer_age() def reopen(self): + """Reopen the session + + This can be useful if there is suspicion that the remote console is + dead. Note that developers should consider need for this a bug unless + there really is some fundamental, unavoidable limitation regarding + automatically detecting an unusable console in the underlying + technology that cannot be unambiguously autodetected. + """ self.conshdl.reopen() def destroy(self): if self.registered: - _handled_consoles[self.ckey].detachuser(self.username) - _handled_consoles[self.ckey].unregister_rcpt(self.reghdl) + self.conshdl.detachuser(self.username) + self.conshdl.unregister_rcpt(self.reghdl) self.databuffer = None self._evt = None self.reghdl = None @@ -529,7 +558,9 @@ class ConsoleSession(object): """Receive data from console and buffer If the caller does not provide a callback and instead will be polling - for data, we must maintain data in a buffer until retrieved + for data, we must maintain data in a buffer until retrieved. This is + an internal function used as a means to convert the async behavior to + polling for consumers that cannot do the async behavior. """ self.databuffer.append(data) if self._evt: @@ -539,7 +570,9 @@ class ConsoleSession(object): """Poll for next available output on this console. Ideally purely event driven scheme is perfect. AJAX over HTTP is - at least one case where we don't have that luxury + at least one case where we don't have that luxury. This function + will not work if the session was initialized with a data callback + instead of polling mode. """ self.reaper.cancel() if self._evt: diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 6d309ea8..fcf7b517 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -173,6 +173,10 @@ noderesources = { 'default': 'ssh', }) }, + 'shell': { + # another special case similar to console + 'sessions': [], + }, 'console': { # this is a dummy value, http or socket must handle special 'session': PluginRoute({}), diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py new file mode 100644 index 00000000..6d766d1e --- /dev/null +++ b/confluent_server/confluent/shellserver.py @@ -0,0 +1,92 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2016 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This module tracks each node, tenants currently active shell sessions +# 'ConsoleSession' objects from consoleserver are used, but with the additional +# capacity for having a multiple of sessions per node active at a given time + + +import confluent.consoleserver as consoleserver +import uuid + +activesessions = {} + + +class _ShellHandler(consoleserver.ConsoleHandler): + _plugin_path = '/nodes/{0}/_shell/session' + + + + +def get_sessions(tenant, node, user): + """Get sessionids active for node + + Given a tenant, nodename, and user; provide an iterable of sessionids. + Each permutation of tenant, nodename and user have a distinct set of shell + sessions. + + :param tenant: The tenant identifier for the current scope + :param node: The nodename of the current scope. + :param user: The confluent user that will 'own' the session. + """ + return activesessions.get((tenant, node, user), {}) + + +def get_session(tenant, node, user, sessionid): + return activesessions.get((tenant, node, user), {}).get(sessionid, None) + + + + + +class ShellSession(consoleserver.ConsoleSession): + """Create a new socket to converse with a node shell session + + This object provides a filehandle that can be read/written + too in a normal fashion and the concurrency, logging, and + event watching will all be handled seamlessly. It represents a remote + CLI shell session. + + :param node: Name of the node for which this session will be created + :param configmanager: A configuration manager object for current context + :param username: Username for which this session object will operate + :param datacallback: An asynchronous data handler, to be called when data + is available. Note that if passed, it makes + 'get_next_output' non-functional + :param skipreplay: If true, will skip the attempt to redraw the screen + :param sessionid: An optional identifier to match a running session or + customize the name of a new session. + """ + + def __init__(self, node, configmanager, username, datacallback=None, + skipreplay=False, sessionid=None): + self.sessionid = sessionid + self.configmanager = configmanager + self.node = node + super(ShellSession, self).__init__(node, configmanager, username, + datacallback, skipreplay) + + def connect_session(self): + global activesessions + tenant = self.configmanager.tenant + if self.sessionid is None: + self.sessionid = str(uuid.uuid4()) + if (self.configmanager.tenant, self.node) not in activesessions: + activesessions[(tenant, self.node)] = {} + if self.sessionid not in activesessions[(tenant, self.node)]: + activesessions[(tenant, self.node)][self.sessionid] = _ShellHandler(self.node, self.configmanager) + self.conshdl = activesessions[(self.configmanager.tenant, self.node)][self.sessionid] +