diff --git a/confluent_server/confluent/auth.py b/confluent_server/confluent/auth.py index 709ac3ca..452cdc40 100644 --- a/confluent_server/confluent/auth.py +++ b/confluent_server/confluent/auth.py @@ -116,7 +116,7 @@ def authorize(name, element, tenant=False, operation='create', user, tenant = _get_usertenant(name, tenant) if tenant is not None and not configmanager.is_tenant(tenant): return None - manager = configmanager.ConfigManager(tenant) + manager = configmanager.ConfigManager(tenant, username=user) if skipuserobj: return None, manager, user, tenant, skipuserobj userobj = manager.get_user(user) @@ -178,7 +178,7 @@ def check_user_passphrase(name, passphrase, element=None, tenant=False): # invalidate cache and force the slower check del _passcache[(user, tenant)] return None - cfm = configmanager.ConfigManager(tenant) + cfm = configmanager.ConfigManager(tenant, username=user) ucfg = cfm.get_user(user) if ucfg is None or 'cryptpass' not in ucfg: eventlet.sleep(0.05) # stall even on test for existence of a username diff --git a/confluent_server/confluent/config/configmanager.py b/confluent_server/confluent/config/configmanager.py index 71462573..d241df9d 100644 --- a/confluent_server/confluent/config/configmanager.py +++ b/confluent_server/confluent/config/configmanager.py @@ -429,9 +429,10 @@ class ConfigManager(object): _nodecollwatchers = {} _notifierids = {} - def __init__(self, tenant, decrypt=False): + def __init__(self, tenant, decrypt=False, username=None): global _cfgstore self.decrypt = decrypt + self.current_user = username if tenant is None: self.tenant = None if 'main' not in _cfgstore: diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index c71cfe7d..09b4c0a1 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -18,8 +18,8 @@ # whatever filehandle is conversing with the client and starts # relaying data. It uses Ctrl-] like telnet for escape back to prompt -#we track nodes that are actively being logged, watched, or have attached -#there should be no more than one handler per node +# we track nodes that are actively being logged, watched, or have attached +# there should be no more than one handler per node import collections import confluent.config.configmanager as configmodule import confluent.exceptions as exc @@ -35,26 +35,30 @@ import traceback _handled_consoles = {} -_genwatchattribs = frozenset(('console.method', 'console.logging')) - _tracelog = None -class _ConsoleHandler(object): +class ConsoleHandler(object): + _plugin_path = '/nodes/{0}/_console/session' + _logtobuffer = True + _genwatchattribs = frozenset(('console.method', 'console.logging')) + def __init__(self, node, configmanager): self._dologging = True self._isondemand = False self.error = None - self.rcpts = {} self.cfgmgr = configmanager self.node = node self.connectstate = 'unconnected' - self.clientcount = 0 self._isalive = True - self.logger = log.Logger(node, console=True, - tenant=configmanager.tenant) self.buffer = bytearray() - (text, termstate, timestamp) = self.logger.read_recent_text(8192) + self.livesessions = set([]) + if self._logtobuffer: + self.logger = log.Logger(node, console=True, + tenant=configmanager.tenant) + (text, termstate, timestamp) = self.logger.read_recent_text(8192) + else: + (text, termstate, timestamp) = ('', 0, False) # when reading from log file, we will use wall clock # it should usually match walltime. self.lasttime = 0 @@ -79,10 +83,12 @@ class _ConsoleHandler(object): self._console = None self.connectionthread = None self.send_break = None - self._attribwatcher = self.cfgmgr.watch_attributes( - (self.node,), _genwatchattribs, self._attribschanged) + if self._genwatchattribs: + self._attribwatcher = self.cfgmgr.watch_attributes( + (self.node,), self._genwatchattribs, self._attribschanged) self.check_isondemand() if not self._isondemand: + self.connectstate = 'connecting' eventlet.spawn(self._connect) def check_isondemand(self): @@ -134,7 +140,7 @@ class _ConsoleHandler(object): self._ondemand() if logvalue == 'none': self._dologging = False - if not self._isondemand or self.clientcount > 0: + if not self._isondemand or self.livesessions: eventlet.spawn(self._connect) def log(self, *args, **kwargs): @@ -166,7 +172,7 @@ class _ConsoleHandler(object): def _ondemand(self): self._isondemand = True - if self.clientcount < 1 and self._console: + if not self.livesessions and self._console: self._disconnect() def _connect(self): @@ -186,7 +192,7 @@ class _ConsoleHandler(object): self.reconnect = None try: self._console = plugin.handle_path( - "/nodes/%s/_console/session" % self.node, + self._plugin_path.format(self.node), "create", self.cfgmgr) except: _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, @@ -202,11 +208,12 @@ class _ConsoleHandler(object): self.cfgmgr.remove_watcher(self._attribwatcher) self._attribwatcher = None if hasattr(self._console, "configattributes"): - attribstowatch = self._console.configattributes | _genwatchattribs + attribstowatch = self._console.configattributes | self._genwatchattribs else: - attribstowatch = _genwatchattribs - self._attribwatcher = self.cfgmgr.watch_attributes( - (self.node,), attribstowatch, self._attribschanged) + attribstowatch = self._genwatchattribs + if self._genwatchattribs: + self._attribwatcher = self.cfgmgr.watch_attributes( + (self.node,), attribstowatch, self._attribschanged) try: self._console.connect(self.get_console_output) except exc.TargetEndpointBadCredentials: @@ -268,32 +275,6 @@ class _ConsoleHandler(object): self.connectionthread.kill() self.connectionthread = None - def unregister_rcpt(self, handle): - self.clientcount -= 1 - if handle in self.rcpts: - del self.rcpts[handle] - self._send_rcpts({'clientcount': self.clientcount}) - if self._isondemand and self.clientcount < 1: - self._disconnect() - - def register_rcpt(self, callback): - self.clientcount += 1 - self._send_rcpts({'clientcount': self.clientcount}) - hdl = random.random() - while hdl in self.rcpts: - hdl = random.random() - self.rcpts[hdl] = callback - if self.connectstate == 'unconnected': - # if console is not connected, take time to try to assert - # connectivity now. - if self.reconnect: - # cancel an automated retry if one is pending - self.reconnect.cancel() - self.reconnect = None - self.connectstate = 'connecting' - eventlet.spawn(self._connect) - return hdl - def flushbuffer(self): # Logging is handled in a different stream # this buffer is now just for having screen redraw on @@ -305,39 +286,44 @@ class _ConsoleHandler(object): # to the console object eventlet.spawn(self._handle_console_output, data) - def attachuser(self, username): - if username in self.users: - self.users[username] += 1 - else: - self.users[username] = 1 - edata = self.users[username] - if edata > 2: # for log purposes, only need to - # clearly indicate redundant connections - # not connection count - edata = 2 - if edata < 0: - _tracelog.log('client count negative' + traceback.format_exc(), - ltype=log.DataTypes.event, - event=log.Events.stacktrace) - edata = 0 + def attachsession(self, session): + edata = 1 + for currsession in self.livesessions: + if currsession.username == session.username: + # indicate that user has multiple connections + edata = 2 + self.livesessions.add(session) self.log( - logdata=username, ltype=log.DataTypes.event, + logdata=session.username, ltype=log.DataTypes.event, event=log.Events.clientconnect, eventdata=edata) + self._send_rcpts({'clientcount': len(self.livesessions)}) + if self.connectstate == 'unconnected': + # if console is not connected, take time to try to assert + # connectivity now. + if self.reconnect: + # cancel an automated retry if one is pending + self.reconnect.cancel() + self.reconnect = None + self.connectstate = 'connecting' + eventlet.spawn(self._connect) - def detachuser(self, username): - self.users[username] -= 1 - if self.users[username] < 2: - edata = self.users[username] - else: - edata = 2 - if edata < 0: - _tracelog.log('client count negative' + traceback.format_exc(), - ltype=log.DataTypes.event, - event=log.Events.stacktrace) - edata = 0 + + + def detachsession(self, session): + edata = 0 + self.livesessions.discard(session) + for currsession in self.livesessions: + if currsession.username == session.username: + edata += 1 + if edata > 1: # don't bother counting beyond 2 in the log + break self.log( - logdata=username, ltype=log.DataTypes.event, + logdata=session.username, ltype=log.DataTypes.event, event=log.Events.clientdisconnect, eventdata=edata) + self._send_rcpts({'clientcount': len(self.livesessions)}) + if self._isondemand and not self.livesessions: + self._disconnect() + def reopen(self): self._got_disconnected() @@ -369,7 +355,7 @@ class _ConsoleHandler(object): self.buffer += data else: self.buffer += data.encode('utf-8') - #TODO: analyze buffer for registered events, examples: + # TODO: analyze buffer for registered events, examples: # panics # certificate signing request if len(self.buffer) > 16384: @@ -377,39 +363,40 @@ class _ConsoleHandler(object): self._send_rcpts(data) def _send_rcpts(self, data): - for rcpt in self.rcpts.itervalues(): + for rcpt in self.livesessions: try: - rcpt(data) + rcpt.data_handler(data) except: # No matter the reason, advance to next recipient - pass + _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, + event=log.Events.stacktrace) def get_recent(self): """Retrieve 'recent' data Replay data in the intent to perhaps reproduce the display. """ - #For now, just try to seek back in buffer to find a clear screen - #If that fails, just return buffer - #a scheme always tracking the last clear screen would be too costly + # For now, just try to seek back in buffer to find a clear screen + # If that fails, just return buffer + # a scheme always tracking the last clear screen would be too costly connstate = { 'connectstate': self.connectstate, - 'clientcount': self.clientcount, + 'clientcount': len(self.livesessions), } retdata = '' if self.shiftin is not None: # detected that terminal requested a - #shiftin character set, relay that to the terminal that cannected + # shiftin character set, relay that to the terminal that cannected retdata += '\x1b)' + self.shiftin if self.appmodedetected: retdata += '\x1b[?1h' else: retdata += '\x1b[?1l' - #an alternative would be to emulate a VT100 to know what the - #whole screen would look like - #this is one scheme to clear screen, move cursor then clear + # an alternative would be to emulate a VT100 to know what the + # whole screen would look like + # this is one scheme to clear screen, move cursor then clear bufidx = self.buffer.rfind('\x1b[H\x1b[J') if bufidx >= 0: return retdata + str(self.buffer[bufidx:]), connstate - #another scheme is the 2J scheme + # another scheme is the 2J scheme bufidx = self.buffer.rfind('\x1b[2J') if bufidx >= 0: # there was some sort of clear screen event @@ -417,8 +404,8 @@ class _ConsoleHandler(object): # in hopes that it reproduces the screen return retdata + str(self.buffer[bufidx:]), connstate else: - #we have no indication of last erase, play back last kibibyte - #to give some sense of context anyway + # we have no indication of last erase, play back last kibibyte + # to give some sense of context anyway return retdata + str(self.buffer[-1024:]), connstate def write(self, data): @@ -456,13 +443,15 @@ def start_console_sessions(): configmodule.hook_new_configmanagers(_start_tenant_sessions) -def connect_node(node, configmanager): +def connect_node(node, configmanager, username=None): consk = (node, configmanager.tenant) if consk not in _handled_consoles: - _handled_consoles[consk] = _ConsoleHandler(node, configmanager) -#this represents some api view of a console handler. This handles things like -#holding the caller specific queue data, for example, when http api should be -#sending data, but there is no outstanding POST request to hold it, + _handled_consoles[consk] = ConsoleHandler(node, configmanager) + return _handled_consoles[consk] + +# this represents some api view of a console handler. This handles things like +# holding the caller specific queue data, for example, when http api should be +# sending data, but there is no outstanding POST request to hold it, # this object has the job of holding the data @@ -474,7 +463,14 @@ class ConsoleSession(object): event watching will all be handled seamlessly :param node: Name of the node for which this session will be created + :param configmanager: A configuration manager object for current context + :param username: Username for which this session object will operate + :param datacallback: An asynchronous data handler, to be called when data + is available. Note that if passed, it makes + 'get_next_output' non-functional + :param skipreplay: If true, will skip the attempt to redraw the screen """ + connector = connect_node def __init__(self, node, configmanager, username, datacallback=None, skipreplay=False): @@ -482,30 +478,41 @@ class ConsoleSession(object): self.tenant = configmanager.tenant if not configmanager.is_node(node): raise exc.NotFoundException("Invalid node") - consk = (node, self.tenant) - self.ckey = consk self.username = username - connect_node(node, configmanager) + self.node = node + self.configmanager = configmanager + self.connect_session() self.registered = True - _handled_consoles[consk].attachuser(username) self._evt = None self.node = node - self.conshdl = _handled_consoles[consk] - self.write = _handled_consoles[consk].write + self.write = self.conshdl.write if datacallback is None: self.reaper = eventlet.spawn_after(15, self.destroy) self.databuffer = collections.deque([]) - self.reghdl = _handled_consoles[consk].register_rcpt(self.got_data) + self.data_handler = self.got_data if not skipreplay: - self.databuffer.extend(_handled_consoles[consk].get_recent()) + self.databuffer.extend(self.conshdl.get_recent()) else: - self.reghdl = _handled_consoles[consk].register_rcpt(datacallback) + self.data_handler = datacallback if not skipreplay: - for recdata in _handled_consoles[consk].get_recent(): + for recdata in self.conshdl.get_recent(): if recdata: datacallback(recdata) + self.conshdl.attachsession(self) + + def connect_session(self): + """Connect to the appropriate backend handler + + This is not intended to be called by your usual consumer, + it is a hook for confluent to abstract the concept of a terminal + between console and shell. + """ + self.conshdl = connect_node(self.node, self.configmanager, + self.username) def send_break(self): + """Send break to remote system + """ self.conshdl.send_break() def get_buffer_age(self): @@ -515,12 +522,19 @@ class ConsoleSession(object): return self.conshdl.get_buffer_age() def reopen(self): + """Reopen the session + + This can be useful if there is suspicion that the remote console is + dead. Note that developers should consider need for this a bug unless + there really is some fundamental, unavoidable limitation regarding + automatically detecting an unusable console in the underlying + technology that cannot be unambiguously autodetected. + """ self.conshdl.reopen() def destroy(self): if self.registered: - _handled_consoles[self.ckey].detachuser(self.username) - _handled_consoles[self.ckey].unregister_rcpt(self.reghdl) + self.conshdl.detachsession(self) self.databuffer = None self._evt = None self.reghdl = None @@ -529,7 +543,9 @@ class ConsoleSession(object): """Receive data from console and buffer If the caller does not provide a callback and instead will be polling - for data, we must maintain data in a buffer until retrieved + for data, we must maintain data in a buffer until retrieved. This is + an internal function used as a means to convert the async behavior to + polling for consumers that cannot do the async behavior. """ self.databuffer.append(data) if self._evt: @@ -539,7 +555,9 @@ class ConsoleSession(object): """Poll for next available output on this console. Ideally purely event driven scheme is perfect. AJAX over HTTP is - at least one case where we don't have that luxury + at least one case where we don't have that luxury. This function + will not work if the session was initialized with a data callback + instead of polling mode. """ self.reaper.cancel() if self._evt: diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index d39b8e99..5bd29bbd 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -70,6 +70,7 @@ def nested_lookup(nestdict, key): def load_plugins(): # To know our plugins directory, we get the parent path of 'bin' + _init_core() path = os.path.dirname(os.path.realpath(__file__)) plugintop = os.path.realpath(os.path.join(path, 'plugins')) plugins = set() @@ -109,147 +110,163 @@ class PluginCollection(object): def __init__(self, routedict): self.routeinfo = routedict -# _ prefix indicates internal use (e.g. special console scheme) and should not -# be enumerated in any collection -noderesources = { - 'attributes': { - 'all': PluginRoute({'handler': 'attributes'}), - 'current': PluginRoute({'handler': 'attributes'}), - }, - 'boot': { - 'nextdevice': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - }, - 'configuration': { - 'management_controller': { - 'alerts': { - 'destinations': PluginCollection({ +def _init_core(): + global noderesources + global nodegroupresources + import confluent.shellserver as shellserver + # _ prefix indicates internal use (e.g. special console scheme) and should not + # be enumerated in any collection + noderesources = { + 'attributes': { + 'all': PluginRoute({'handler': 'attributes'}), + 'current': PluginRoute({'handler': 'attributes'}), + }, + 'boot': { + 'nextdevice': PluginRoute({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + }, + 'configuration': { + 'management_controller': { + 'alerts': { + 'destinations': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + }, + 'users': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'net_interfaces': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'reset': PluginRoute({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'identifier': PluginRoute({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'domain_name': PluginRoute({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'ntp': { + 'enabled': PluginRoute({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'servers': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + }, + } + }, + '_console': { + 'session': PluginRoute({ + 'pluginattrs': ['console.method'], + }), + }, + '_shell': { + 'session': PluginRoute({ + # For now, not configurable, wait until there's demand + 'handler': 'ssh', + }), + }, + 'shell': { + # another special case similar to console + 'sessions': PluginCollection({ + 'handler': shellserver, + }), + }, + 'console': { + # this is a dummy value, http or socket must handle special + 'session': None, + 'license': PluginRoute({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + }, + 'events': { + 'hardware': { + 'log': PluginRoute({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'decode': PluginRoute({ 'pluginattrs': ['hardwaremanagement.method'], 'default': 'ipmi', }), }, - 'users': PluginCollection({ + }, + 'health': { + 'hardware': PluginRoute({ 'pluginattrs': ['hardwaremanagement.method'], 'default': 'ipmi', }), - 'net_interfaces': PluginCollection({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'reset': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'identifier': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'domain_name': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'ntp': { - 'enabled': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'servers': PluginCollection({ + }, + 'identify': PluginRoute({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'inventory': { + 'hardware': { + 'all': PluginCollection({ 'pluginattrs': ['hardwaremanagement.method'], 'default': 'ipmi', }), }, - } - }, - '_console': { - 'session': PluginRoute({ - 'pluginattrs': ['console.method'], - }), - }, - 'console': { - # this is a dummy value, http or socket must handle special - 'session': PluginRoute({}), - 'license': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - }, - 'events': { - 'hardware': { - 'log': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'decode': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - }, - }, - 'health': { - 'hardware': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - }, - 'identify': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'inventory': { - 'hardware': { - 'all': PluginCollection({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - }, - 'firmware': { - 'all': PluginCollection({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - }, - }, - 'power': { - 'state': PluginRoute({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - }, - 'sensors': { - 'hardware': { - 'all': PluginCollection({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'temperature': PluginCollection({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'power': PluginCollection({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'fans': PluginCollection({ - 'pluginattrs': ['hardwaremanagement.method'], - 'default': 'ipmi', - }), - 'leds': PluginCollection({ + 'firmware': { + 'all': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + }, + }, + 'power': { + 'state': PluginRoute({ 'pluginattrs': ['hardwaremanagement.method'], 'default': 'ipmi', }), }, + 'sensors': { + 'hardware': { + 'all': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'temperature': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'power': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'fans': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + 'leds': PluginCollection({ + 'pluginattrs': ['hardwaremanagement.method'], + 'default': 'ipmi', + }), + }, - }, -} + }, + } -nodegroupresources = { - 'attributes': { - 'all': PluginRoute({'handler': 'attributes'}), - 'current': PluginRoute({'handler': 'attributes'}), - }, -} + nodegroupresources = { + 'attributes': { + 'all': PluginRoute({'handler': 'attributes'}), + 'current': PluginRoute({'handler': 'attributes'}), + }, + } def create_user(inputdata, configmanager): @@ -500,6 +517,8 @@ def handle_node_request(configmanager, inputdata, operation, iscollection = True elif isinstance(routespec, PluginCollection): iscollection = False # it is a collection, but plugin defined + elif routespec is None: + raise exc.InvalidArgumentException('Custom interface required for resource') if iscollection: if operation == "delete": return delete_node_collection(pathcomponents, configmanager) @@ -513,13 +532,18 @@ def handle_node_request(configmanager, inputdata, operation, inputdata = msg.get_input_message( pathcomponents, operation, inputdata, nodes, isnoderange) if 'handler' in plugroute: # fixed handler definition, easy enough - hfunc = getattr(pluginmap[plugroute['handler']], operation) + if isinstance(plugroute['handler'], str): + hfunc = getattr(pluginmap[plugroute['handler']], operation) + else: + hfunc = getattr(plugroute['handler'], operation) passvalue = hfunc( nodes=nodes, element=pathcomponents, configmanager=configmanager, inputdata=inputdata) if isnoderange: return passvalue + elif isinstance(passvalue, console.Console): + return passvalue else: return stripnode(passvalue, nodes[0]) elif 'pluginattrs' in plugroute: diff --git a/confluent_server/confluent/httpapi.py b/confluent_server/confluent/httpapi.py index f294c7c8..c66f8f32 100644 --- a/confluent_server/confluent/httpapi.py +++ b/confluent_server/confluent/httpapi.py @@ -25,6 +25,7 @@ import confluent.exceptions as exc import confluent.log as log import confluent.messages import confluent.core as pluginapi +import confluent.shellserver as shellserver import confluent.tlvdata import confluent.util as util import copy @@ -326,9 +327,15 @@ def resourcehandler_backend(env, start_response): ("Set-Cookie", m.OutputString()) for m in authorized['cookie'].values()) cfgmgr = authorized['cfgmgr'] - if '/console/session' in env['PATH_INFO']: + if (operation == 'create' and ('/console/session' in env['PATH_INFO'] or + '/shell/sessions/' in env['PATH_INFO'])): #hard bake JSON into this path, do not support other incarnations - prefix, _, _ = env['PATH_INFO'].partition('/console/session') + if '/console/session' in env['PATH_INFO']: + prefix, _, _ = env['PATH_INFO'].partition('/console/session') + shellsession = False + elif '/shell/sessions/' in env['PATH_INFO']: + prefix, _, _ = env['PATH_INFO'].partition('/shell/sessions') + shellsession = True _, _, nodename = prefix.rpartition('/') if 'session' not in querydict.keys() or not querydict['session']: auditmsg = { @@ -344,9 +351,14 @@ def resourcehandler_backend(env, start_response): if 'skipreplay' in querydict and querydict['skipreplay']: skipreplay = True try: - consession = consoleserver.ConsoleSession( - node=nodename, configmanager=cfgmgr, - username=authorized['username'], skipreplay=skipreplay) + if shellsession: + consession = shellserver.ShellSession( + node=nodename, configmanager=cfgmgr, + username=authorized['username'], skipreplay=skipreplay) + else: + consession = consoleserver.ConsoleSession( + node=nodename, configmanager=cfgmgr, + username=authorized['username'], skipreplay=skipreplay) except exc.NotFoundException: start_response("404 Not found", headers) yield "404 - Request Path not recognized" diff --git a/confluent_server/confluent/plugins/shell/ssh.py b/confluent_server/confluent/plugins/shell/ssh.py new file mode 100644 index 00000000..03047ab1 --- /dev/null +++ b/confluent_server/confluent/plugins/shell/ssh.py @@ -0,0 +1,105 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2015 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__author__ = 'jjohnson2' + +# This plugin provides an ssh implementation comforming to the 'console' +# specification. consoleserver or shellserver would be equally likely +# to use this. + +import confluent.interface.console as conapi +import eventlet +paramiko = eventlet.import_patched('paramiko') + + +class SshShell(conapi.Console): + + def __init__(self, node, config, username='', password=''): + self.node = node + self.ssh = None + self.nodeconfig = config + self.username = username + self.password = password + self.inputmode = 0 # 0 = username, 1 = password... + + def recvdata(self): + while self.connected: + pendingdata = self.shell.recv(8192) + if pendingdata == '': + self.datacallback(conapi.ConsoleEvent.Disconnect) + return + self.datacallback(pendingdata) + + def connect(self, callback): + # for now, we just use the nodename as the presumptive ssh destination + #TODO(jjohnson2): use a 'nodeipget' utility function for architectures + # that would rather not use the nodename as anything but an opaque + # identifier + self.datacallback = callback + if self.username is not '': + self.logon() + else: + self.inputmode = 0 + callback('\r\nlogin as: ') + return + + def logon(self): + self.ssh = paramiko.SSHClient() + self.ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy()) + try: + self.ssh.connect(self.node, username=self.username, + password=self.password, allow_agent=False, + look_for_keys=False) + except paramiko.AuthenticationException: + self.inputmode = 0 + self.username = '' + self.password = '' + self.datacallback('\r\nlogin as: ') + return + self.inputmode = 2 + self.connected = True + self.shell = self.ssh.invoke_shell() + self.rxthread = eventlet.spawn(self.recvdata) + + def write(self, data): + if self.inputmode == 0: + self.username += data + if '\r' in self.username: + self.username, self.password = self.username.split('\r') + lastdata = data.split('\r')[0] + if lastdata != '': + self.datacallback(lastdata) + self.datacallback('\r\nEnter password: ') + self.inputmode = 1 + else: + # echo back typed data + self.datacallback(data) + elif self.inputmode == 1: + self.password += data + if '\r' in self.password: + self.password = self.password.split('\r')[0] + self.datacallback('\r\n') + self.logon() + else: + self.shell.sendall(data) + + def close(self): + if self.ssh is not None: + self.ssh.close() + +def create(nodes, element, configmanager, inputdata): + if len(nodes) == 1: + return SshShell(nodes[0], configmanager) \ No newline at end of file diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py new file mode 100644 index 00000000..6dea5345 --- /dev/null +++ b/confluent_server/confluent/shellserver.py @@ -0,0 +1,122 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2016 Lenovo +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This module tracks each node, tenants currently active shell sessions +# 'ConsoleSession' objects from consoleserver are used, but with the additional +# capacity for having a multiple of sessions per node active at a given time + + +import confluent.consoleserver as consoleserver +import confluent.exceptions as exc +import confluent.messages as msg +activesessions = {} + + +class _ShellHandler(consoleserver.ConsoleHandler): + _plugin_path = '/nodes/{0}/_shell/session' + _genwatchattribs = False + _logtobuffer = False + + def log(self, *args, **kwargs): + # suppress logging through proving a stub 'log' function + return + + def _got_disconnected(self): + self.connectstate = 'closed' + self._send_rcpts({'connectstate': self.connectstate}) + for session in self.livesessions: + session.destroy() + + + + +def get_sessions(tenant, node, user): + """Get sessionids active for node + + Given a tenant, nodename, and user; provide an iterable of sessionids. + Each permutation of tenant, nodename and user have a distinct set of shell + sessions. + + :param tenant: The tenant identifier for the current scope + :param node: The nodename of the current scope. + :param user: The confluent user that will 'own' the session. + """ + return activesessions.get((tenant, node, user), {}) + + +def get_session(tenant, node, user, sessionid): + return activesessions.get((tenant, node, user), {}).get(sessionid, None) + + + + + +class ShellSession(consoleserver.ConsoleSession): + """Create a new socket to converse with a node shell session + + This object provides a filehandle that can be read/written + too in a normal fashion and the concurrency, logging, and + event watching will all be handled seamlessly. It represents a remote + CLI shell session. + + :param node: Name of the node for which this session will be created + :param configmanager: A configuration manager object for current context + :param username: Username for which this session object will operate + :param datacallback: An asynchronous data handler, to be called when data + is available. Note that if passed, it makes + 'get_next_output' non-functional + :param skipreplay: If true, will skip the attempt to redraw the screen + :param sessionid: An optional identifier to match a running session or + customize the name of a new session. + """ + + def __init__(self, node, configmanager, username, datacallback=None, + skipreplay=False, sessionid=None): + self.sessionid = sessionid + self.configmanager = configmanager + self.node = node + super(ShellSession, self).__init__(node, configmanager, username, + datacallback, skipreplay) + + def connect_session(self): + global activesessions + tenant = self.configmanager.tenant + if (self.configmanager.tenant, self.node) not in activesessions: + activesessions[(tenant, self.node)] = {} + if self.sessionid is None: + self.sessionid = 1 + while str(self.sessionid) in activesessions[(tenant, self.node)]: + self.sessionid += 1 + self.sessionid = str(self.sessionid) + if self.sessionid not in activesessions[(tenant, self.node)]: + activesessions[(tenant, self.node)][self.sessionid] = _ShellHandler(self.node, self.configmanager) + self.conshdl = activesessions[(self.configmanager.tenant, self.node)][self.sessionid] + + def destroy(self): + del activesessions[(self.configmanager.tenant, self.node)][self.sessionid] + super(ShellSession, self).destroy() + +def create(nodes, element, configmanager, inputdata): + # For creating a resource, it really has to be handled + # in httpapi/sockapi specially, like a console. + raise exc.InvalidArgumentException('Special client code required') + +def retrieve(nodes, element, configmanager, inputdata): + tenant = configmanager.tenant + user = configmanager.current_user + if (tenant, nodes[0]) in activesessions: + for sessionid in activesessions[(tenant, nodes[0])]: + yield msg.ChildCollection(sessionid) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 27caab99..38bf589c 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -40,6 +40,7 @@ import confluent.config.configmanager as configmanager import confluent.exceptions as exc import confluent.log as log import confluent.core as pluginapi +import confluent.shellserver as shellserver tracelog = None @@ -88,7 +89,7 @@ def sessionhdl(connection, authname, skipauth=False): cfm = None if skipauth: authenticated = True - cfm = configmanager.ConfigManager(tenant=None) + cfm = configmanager.ConfigManager(tenant=None, username=authname) elif authname: authdata = auth.authorize(authname, element=None) if authdata is not None: @@ -178,7 +179,7 @@ def process_request(connection, request, cfm, authdata, authname, skipauth): raise ValueError operation = request['operation'] path = request['path'] - params = request.get('parameters', None) + params = request.get('parameters', {}) hdlr = None if not skipauth: authdata = auth.authorize(authdata[2], path, authdata[3], operation) @@ -197,42 +198,7 @@ def process_request(connection, request, cfm, authdata, authname, skipauth): auditlog.log(auditmsg) try: if operation == 'start': - elems = path.split('/') - if elems[3] != "console": - raise exc.InvalidArgumentException() - node = elems[2] - ccons = ClientConsole(connection) - skipreplay = False - if params and 'skipreplay' in params and params['skipreplay']: - skipreplay = True - consession = consoleserver.ConsoleSession( - node=node, configmanager=cfm, username=authname, - datacallback=ccons.sendall, skipreplay=skipreplay) - if consession is None: - raise Exception("TODO") - send_data(connection, {'started': 1}) - ccons.startsending() - bufferage = consession.get_buffer_age() - if bufferage is not False: - send_data(connection, {'bufferage': bufferage}) - while consession is not None: - data = tlvdata.recv(connection) - if type(data) == dict: - if data['operation'] == 'stop': - consession.destroy() - return - elif data['operation'] == 'break': - consession.send_break() - continue - elif data['operation'] == 'reopen': - consession.reopen() - continue - else: - raise Exception("TODO") - if not data: - consession.destroy() - return - consession.write(data) + return start_term(authname, cfm, connection, params, path) elif operation == 'shutdown': configmanager.ConfigManager.shutdown() else: @@ -249,6 +215,59 @@ def process_request(connection, request, cfm, authdata, authname, skipauth): return +def start_term(authname, cfm, connection, params, path): + elems = path.split('/') + if len(elems) < 4 or elems[1] != 'nodes': + raise exc.InvalidArgumentException('Invalid path {0}'.format(path)) + node = elems[2] + ccons = ClientConsole(connection) + skipreplay = False + if params and 'skipreplay' in params and params['skipreplay']: + skipreplay = True + if elems[3] == "console": + consession = consoleserver.ConsoleSession( + node=node, configmanager=cfm, username=authname, + datacallback=ccons.sendall, skipreplay=skipreplay) + elif len(elems) >= 6 and elems[3:5] == ['shell', 'sessions']: + if len(elems) == 7: + sessionid = elems[5] + else: + sessionid = None + consession = shellserver.ShellSession( + node=node, configmanager=cfm, username=authname, + datacallback=ccons.sendall, skipreplay=skipreplay, + sessionid=sessionid) + + else: + raise exc.InvalidArgumentException('Invalid path {0}'.format(path)) + + if consession is None: + raise Exception("TODO") + send_data(connection, {'started': 1}) + ccons.startsending() + bufferage = consession.get_buffer_age() + if bufferage is not False: + send_data(connection, {'bufferage': bufferage}) + while consession is not None: + data = tlvdata.recv(connection) + if type(data) == dict: + if data['operation'] == 'stop': + consession.destroy() + return + elif data['operation'] == 'break': + consession.send_break() + continue + elif data['operation'] == 'reopen': + consession.reopen() + continue + else: + raise Exception("TODO") + if not data: + consession.destroy() + return + consession.write(data) + + def _tlshandler(bind_host, bind_port): plainsocket = socket.socket(socket.AF_INET6) plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) diff --git a/confluent_server/setup.py.tmpl b/confluent_server/setup.py.tmpl index 86ffe696..0b8a3301 100644 --- a/confluent_server/setup.py.tmpl +++ b/confluent_server/setup.py.tmpl @@ -10,6 +10,7 @@ setup( description='confluent systems management server', packages=['confluent', 'confluent/config', 'confluent/interface', 'confluent/plugins/hardwaremanagement/', + 'confluent/plugins/shell/', 'confluent/plugins/configuration/'], install_requires=['pycrypto>=2.6', 'confluent_client>=0.1.0', 'eventlet', 'pyghmi>=0.6.5'],