2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-19 20:16:04 +00:00

Merge pull request #55 from jjohnson42/sshconsole

Implement shell infrastructure with ssh support
This commit is contained in:
Jarrod Johnson 2016-01-08 15:06:26 -05:00
commit 366de1235c
9 changed files with 580 additions and 278 deletions

View File

@ -116,7 +116,7 @@ def authorize(name, element, tenant=False, operation='create',
user, tenant = _get_usertenant(name, tenant)
if tenant is not None and not configmanager.is_tenant(tenant):
return None
manager = configmanager.ConfigManager(tenant)
manager = configmanager.ConfigManager(tenant, username=user)
if skipuserobj:
return None, manager, user, tenant, skipuserobj
userobj = manager.get_user(user)
@ -178,7 +178,7 @@ def check_user_passphrase(name, passphrase, element=None, tenant=False):
# invalidate cache and force the slower check
del _passcache[(user, tenant)]
return None
cfm = configmanager.ConfigManager(tenant)
cfm = configmanager.ConfigManager(tenant, username=user)
ucfg = cfm.get_user(user)
if ucfg is None or 'cryptpass' not in ucfg:
eventlet.sleep(0.05) # stall even on test for existence of a username

View File

@ -429,9 +429,10 @@ class ConfigManager(object):
_nodecollwatchers = {}
_notifierids = {}
def __init__(self, tenant, decrypt=False):
def __init__(self, tenant, decrypt=False, username=None):
global _cfgstore
self.decrypt = decrypt
self.current_user = username
if tenant is None:
self.tenant = None
if 'main' not in _cfgstore:

View File

@ -18,8 +18,8 @@
# whatever filehandle is conversing with the client and starts
# relaying data. It uses Ctrl-] like telnet for escape back to prompt
#we track nodes that are actively being logged, watched, or have attached
#there should be no more than one handler per node
# we track nodes that are actively being logged, watched, or have attached
# there should be no more than one handler per node
import collections
import confluent.config.configmanager as configmodule
import confluent.exceptions as exc
@ -35,26 +35,30 @@ import traceback
_handled_consoles = {}
_genwatchattribs = frozenset(('console.method', 'console.logging'))
_tracelog = None
class _ConsoleHandler(object):
class ConsoleHandler(object):
_plugin_path = '/nodes/{0}/_console/session'
_logtobuffer = True
_genwatchattribs = frozenset(('console.method', 'console.logging'))
def __init__(self, node, configmanager):
self._dologging = True
self._isondemand = False
self.error = None
self.rcpts = {}
self.cfgmgr = configmanager
self.node = node
self.connectstate = 'unconnected'
self.clientcount = 0
self._isalive = True
self.logger = log.Logger(node, console=True,
tenant=configmanager.tenant)
self.buffer = bytearray()
(text, termstate, timestamp) = self.logger.read_recent_text(8192)
self.livesessions = set([])
if self._logtobuffer:
self.logger = log.Logger(node, console=True,
tenant=configmanager.tenant)
(text, termstate, timestamp) = self.logger.read_recent_text(8192)
else:
(text, termstate, timestamp) = ('', 0, False)
# when reading from log file, we will use wall clock
# it should usually match walltime.
self.lasttime = 0
@ -79,10 +83,12 @@ class _ConsoleHandler(object):
self._console = None
self.connectionthread = None
self.send_break = None
self._attribwatcher = self.cfgmgr.watch_attributes(
(self.node,), _genwatchattribs, self._attribschanged)
if self._genwatchattribs:
self._attribwatcher = self.cfgmgr.watch_attributes(
(self.node,), self._genwatchattribs, self._attribschanged)
self.check_isondemand()
if not self._isondemand:
self.connectstate = 'connecting'
eventlet.spawn(self._connect)
def check_isondemand(self):
@ -134,7 +140,7 @@ class _ConsoleHandler(object):
self._ondemand()
if logvalue == 'none':
self._dologging = False
if not self._isondemand or self.clientcount > 0:
if not self._isondemand or self.livesessions:
eventlet.spawn(self._connect)
def log(self, *args, **kwargs):
@ -166,7 +172,7 @@ class _ConsoleHandler(object):
def _ondemand(self):
self._isondemand = True
if self.clientcount < 1 and self._console:
if not self.livesessions and self._console:
self._disconnect()
def _connect(self):
@ -186,7 +192,7 @@ class _ConsoleHandler(object):
self.reconnect = None
try:
self._console = plugin.handle_path(
"/nodes/%s/_console/session" % self.node,
self._plugin_path.format(self.node),
"create", self.cfgmgr)
except:
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
@ -202,11 +208,12 @@ class _ConsoleHandler(object):
self.cfgmgr.remove_watcher(self._attribwatcher)
self._attribwatcher = None
if hasattr(self._console, "configattributes"):
attribstowatch = self._console.configattributes | _genwatchattribs
attribstowatch = self._console.configattributes | self._genwatchattribs
else:
attribstowatch = _genwatchattribs
self._attribwatcher = self.cfgmgr.watch_attributes(
(self.node,), attribstowatch, self._attribschanged)
attribstowatch = self._genwatchattribs
if self._genwatchattribs:
self._attribwatcher = self.cfgmgr.watch_attributes(
(self.node,), attribstowatch, self._attribschanged)
try:
self._console.connect(self.get_console_output)
except exc.TargetEndpointBadCredentials:
@ -268,32 +275,6 @@ class _ConsoleHandler(object):
self.connectionthread.kill()
self.connectionthread = None
def unregister_rcpt(self, handle):
self.clientcount -= 1
if handle in self.rcpts:
del self.rcpts[handle]
self._send_rcpts({'clientcount': self.clientcount})
if self._isondemand and self.clientcount < 1:
self._disconnect()
def register_rcpt(self, callback):
self.clientcount += 1
self._send_rcpts({'clientcount': self.clientcount})
hdl = random.random()
while hdl in self.rcpts:
hdl = random.random()
self.rcpts[hdl] = callback
if self.connectstate == 'unconnected':
# if console is not connected, take time to try to assert
# connectivity now.
if self.reconnect:
# cancel an automated retry if one is pending
self.reconnect.cancel()
self.reconnect = None
self.connectstate = 'connecting'
eventlet.spawn(self._connect)
return hdl
def flushbuffer(self):
# Logging is handled in a different stream
# this buffer is now just for having screen redraw on
@ -305,39 +286,44 @@ class _ConsoleHandler(object):
# to the console object
eventlet.spawn(self._handle_console_output, data)
def attachuser(self, username):
if username in self.users:
self.users[username] += 1
else:
self.users[username] = 1
edata = self.users[username]
if edata > 2: # for log purposes, only need to
# clearly indicate redundant connections
# not connection count
edata = 2
if edata < 0:
_tracelog.log('client count negative' + traceback.format_exc(),
ltype=log.DataTypes.event,
event=log.Events.stacktrace)
edata = 0
def attachsession(self, session):
edata = 1
for currsession in self.livesessions:
if currsession.username == session.username:
# indicate that user has multiple connections
edata = 2
self.livesessions.add(session)
self.log(
logdata=username, ltype=log.DataTypes.event,
logdata=session.username, ltype=log.DataTypes.event,
event=log.Events.clientconnect, eventdata=edata)
self._send_rcpts({'clientcount': len(self.livesessions)})
if self.connectstate == 'unconnected':
# if console is not connected, take time to try to assert
# connectivity now.
if self.reconnect:
# cancel an automated retry if one is pending
self.reconnect.cancel()
self.reconnect = None
self.connectstate = 'connecting'
eventlet.spawn(self._connect)
def detachuser(self, username):
self.users[username] -= 1
if self.users[username] < 2:
edata = self.users[username]
else:
edata = 2
if edata < 0:
_tracelog.log('client count negative' + traceback.format_exc(),
ltype=log.DataTypes.event,
event=log.Events.stacktrace)
edata = 0
def detachsession(self, session):
edata = 0
self.livesessions.discard(session)
for currsession in self.livesessions:
if currsession.username == session.username:
edata += 1
if edata > 1: # don't bother counting beyond 2 in the log
break
self.log(
logdata=username, ltype=log.DataTypes.event,
logdata=session.username, ltype=log.DataTypes.event,
event=log.Events.clientdisconnect, eventdata=edata)
self._send_rcpts({'clientcount': len(self.livesessions)})
if self._isondemand and not self.livesessions:
self._disconnect()
def reopen(self):
self._got_disconnected()
@ -369,7 +355,7 @@ class _ConsoleHandler(object):
self.buffer += data
else:
self.buffer += data.encode('utf-8')
#TODO: analyze buffer for registered events, examples:
# TODO: analyze buffer for registered events, examples:
# panics
# certificate signing request
if len(self.buffer) > 16384:
@ -377,39 +363,40 @@ class _ConsoleHandler(object):
self._send_rcpts(data)
def _send_rcpts(self, data):
for rcpt in self.rcpts.itervalues():
for rcpt in self.livesessions:
try:
rcpt(data)
rcpt.data_handler(data)
except: # No matter the reason, advance to next recipient
pass
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
def get_recent(self):
"""Retrieve 'recent' data
Replay data in the intent to perhaps reproduce the display.
"""
#For now, just try to seek back in buffer to find a clear screen
#If that fails, just return buffer
#a scheme always tracking the last clear screen would be too costly
# For now, just try to seek back in buffer to find a clear screen
# If that fails, just return buffer
# a scheme always tracking the last clear screen would be too costly
connstate = {
'connectstate': self.connectstate,
'clientcount': self.clientcount,
'clientcount': len(self.livesessions),
}
retdata = ''
if self.shiftin is not None: # detected that terminal requested a
#shiftin character set, relay that to the terminal that cannected
# shiftin character set, relay that to the terminal that cannected
retdata += '\x1b)' + self.shiftin
if self.appmodedetected:
retdata += '\x1b[?1h'
else:
retdata += '\x1b[?1l'
#an alternative would be to emulate a VT100 to know what the
#whole screen would look like
#this is one scheme to clear screen, move cursor then clear
# an alternative would be to emulate a VT100 to know what the
# whole screen would look like
# this is one scheme to clear screen, move cursor then clear
bufidx = self.buffer.rfind('\x1b[H\x1b[J')
if bufidx >= 0:
return retdata + str(self.buffer[bufidx:]), connstate
#another scheme is the 2J scheme
# another scheme is the 2J scheme
bufidx = self.buffer.rfind('\x1b[2J')
if bufidx >= 0:
# there was some sort of clear screen event
@ -417,8 +404,8 @@ class _ConsoleHandler(object):
# in hopes that it reproduces the screen
return retdata + str(self.buffer[bufidx:]), connstate
else:
#we have no indication of last erase, play back last kibibyte
#to give some sense of context anyway
# we have no indication of last erase, play back last kibibyte
# to give some sense of context anyway
return retdata + str(self.buffer[-1024:]), connstate
def write(self, data):
@ -456,13 +443,15 @@ def start_console_sessions():
configmodule.hook_new_configmanagers(_start_tenant_sessions)
def connect_node(node, configmanager):
def connect_node(node, configmanager, username=None):
consk = (node, configmanager.tenant)
if consk not in _handled_consoles:
_handled_consoles[consk] = _ConsoleHandler(node, configmanager)
#this represents some api view of a console handler. This handles things like
#holding the caller specific queue data, for example, when http api should be
#sending data, but there is no outstanding POST request to hold it,
_handled_consoles[consk] = ConsoleHandler(node, configmanager)
return _handled_consoles[consk]
# this represents some api view of a console handler. This handles things like
# holding the caller specific queue data, for example, when http api should be
# sending data, but there is no outstanding POST request to hold it,
# this object has the job of holding the data
@ -474,7 +463,14 @@ class ConsoleSession(object):
event watching will all be handled seamlessly
:param node: Name of the node for which this session will be created
:param configmanager: A configuration manager object for current context
:param username: Username for which this session object will operate
:param datacallback: An asynchronous data handler, to be called when data
is available. Note that if passed, it makes
'get_next_output' non-functional
:param skipreplay: If true, will skip the attempt to redraw the screen
"""
connector = connect_node
def __init__(self, node, configmanager, username, datacallback=None,
skipreplay=False):
@ -482,30 +478,41 @@ class ConsoleSession(object):
self.tenant = configmanager.tenant
if not configmanager.is_node(node):
raise exc.NotFoundException("Invalid node")
consk = (node, self.tenant)
self.ckey = consk
self.username = username
connect_node(node, configmanager)
self.node = node
self.configmanager = configmanager
self.connect_session()
self.registered = True
_handled_consoles[consk].attachuser(username)
self._evt = None
self.node = node
self.conshdl = _handled_consoles[consk]
self.write = _handled_consoles[consk].write
self.write = self.conshdl.write
if datacallback is None:
self.reaper = eventlet.spawn_after(15, self.destroy)
self.databuffer = collections.deque([])
self.reghdl = _handled_consoles[consk].register_rcpt(self.got_data)
self.data_handler = self.got_data
if not skipreplay:
self.databuffer.extend(_handled_consoles[consk].get_recent())
self.databuffer.extend(self.conshdl.get_recent())
else:
self.reghdl = _handled_consoles[consk].register_rcpt(datacallback)
self.data_handler = datacallback
if not skipreplay:
for recdata in _handled_consoles[consk].get_recent():
for recdata in self.conshdl.get_recent():
if recdata:
datacallback(recdata)
self.conshdl.attachsession(self)
def connect_session(self):
"""Connect to the appropriate backend handler
This is not intended to be called by your usual consumer,
it is a hook for confluent to abstract the concept of a terminal
between console and shell.
"""
self.conshdl = connect_node(self.node, self.configmanager,
self.username)
def send_break(self):
"""Send break to remote system
"""
self.conshdl.send_break()
def get_buffer_age(self):
@ -515,12 +522,19 @@ class ConsoleSession(object):
return self.conshdl.get_buffer_age()
def reopen(self):
"""Reopen the session
This can be useful if there is suspicion that the remote console is
dead. Note that developers should consider need for this a bug unless
there really is some fundamental, unavoidable limitation regarding
automatically detecting an unusable console in the underlying
technology that cannot be unambiguously autodetected.
"""
self.conshdl.reopen()
def destroy(self):
if self.registered:
_handled_consoles[self.ckey].detachuser(self.username)
_handled_consoles[self.ckey].unregister_rcpt(self.reghdl)
self.conshdl.detachsession(self)
self.databuffer = None
self._evt = None
self.reghdl = None
@ -529,7 +543,9 @@ class ConsoleSession(object):
"""Receive data from console and buffer
If the caller does not provide a callback and instead will be polling
for data, we must maintain data in a buffer until retrieved
for data, we must maintain data in a buffer until retrieved. This is
an internal function used as a means to convert the async behavior to
polling for consumers that cannot do the async behavior.
"""
self.databuffer.append(data)
if self._evt:
@ -539,7 +555,9 @@ class ConsoleSession(object):
"""Poll for next available output on this console.
Ideally purely event driven scheme is perfect. AJAX over HTTP is
at least one case where we don't have that luxury
at least one case where we don't have that luxury. This function
will not work if the session was initialized with a data callback
instead of polling mode.
"""
self.reaper.cancel()
if self._evt:

View File

@ -70,6 +70,7 @@ def nested_lookup(nestdict, key):
def load_plugins():
# To know our plugins directory, we get the parent path of 'bin'
_init_core()
path = os.path.dirname(os.path.realpath(__file__))
plugintop = os.path.realpath(os.path.join(path, 'plugins'))
plugins = set()
@ -109,147 +110,163 @@ class PluginCollection(object):
def __init__(self, routedict):
self.routeinfo = routedict
# _ prefix indicates internal use (e.g. special console scheme) and should not
# be enumerated in any collection
noderesources = {
'attributes': {
'all': PluginRoute({'handler': 'attributes'}),
'current': PluginRoute({'handler': 'attributes'}),
},
'boot': {
'nextdevice': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'configuration': {
'management_controller': {
'alerts': {
'destinations': PluginCollection({
def _init_core():
global noderesources
global nodegroupresources
import confluent.shellserver as shellserver
# _ prefix indicates internal use (e.g. special console scheme) and should not
# be enumerated in any collection
noderesources = {
'attributes': {
'all': PluginRoute({'handler': 'attributes'}),
'current': PluginRoute({'handler': 'attributes'}),
},
'boot': {
'nextdevice': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'configuration': {
'management_controller': {
'alerts': {
'destinations': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'users': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'net_interfaces': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'reset': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'identifier': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'domain_name': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'ntp': {
'enabled': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'servers': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
}
},
'_console': {
'session': PluginRoute({
'pluginattrs': ['console.method'],
}),
},
'_shell': {
'session': PluginRoute({
# For now, not configurable, wait until there's demand
'handler': 'ssh',
}),
},
'shell': {
# another special case similar to console
'sessions': PluginCollection({
'handler': shellserver,
}),
},
'console': {
# this is a dummy value, http or socket must handle special
'session': None,
'license': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'events': {
'hardware': {
'log': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'decode': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'users': PluginCollection({
},
'health': {
'hardware': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'net_interfaces': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'reset': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'identifier': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'domain_name': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'ntp': {
'enabled': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'servers': PluginCollection({
},
'identify': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'inventory': {
'hardware': {
'all': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
}
},
'_console': {
'session': PluginRoute({
'pluginattrs': ['console.method'],
}),
},
'console': {
# this is a dummy value, http or socket must handle special
'session': PluginRoute({}),
'license': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'events': {
'hardware': {
'log': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'decode': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
},
'health': {
'hardware': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'identify': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'inventory': {
'hardware': {
'all': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'firmware': {
'all': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
},
'power': {
'state': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'sensors': {
'hardware': {
'all': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'temperature': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'power': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'fans': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'leds': PluginCollection({
'firmware': {
'all': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
},
'power': {
'state': PluginRoute({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
'sensors': {
'hardware': {
'all': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'temperature': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'power': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'fans': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
'leds': PluginCollection({
'pluginattrs': ['hardwaremanagement.method'],
'default': 'ipmi',
}),
},
},
}
},
}
nodegroupresources = {
'attributes': {
'all': PluginRoute({'handler': 'attributes'}),
'current': PluginRoute({'handler': 'attributes'}),
},
}
nodegroupresources = {
'attributes': {
'all': PluginRoute({'handler': 'attributes'}),
'current': PluginRoute({'handler': 'attributes'}),
},
}
def create_user(inputdata, configmanager):
@ -500,6 +517,8 @@ def handle_node_request(configmanager, inputdata, operation,
iscollection = True
elif isinstance(routespec, PluginCollection):
iscollection = False # it is a collection, but plugin defined
elif routespec is None:
raise exc.InvalidArgumentException('Custom interface required for resource')
if iscollection:
if operation == "delete":
return delete_node_collection(pathcomponents, configmanager)
@ -513,13 +532,18 @@ def handle_node_request(configmanager, inputdata, operation,
inputdata = msg.get_input_message(
pathcomponents, operation, inputdata, nodes, isnoderange)
if 'handler' in plugroute: # fixed handler definition, easy enough
hfunc = getattr(pluginmap[plugroute['handler']], operation)
if isinstance(plugroute['handler'], str):
hfunc = getattr(pluginmap[plugroute['handler']], operation)
else:
hfunc = getattr(plugroute['handler'], operation)
passvalue = hfunc(
nodes=nodes, element=pathcomponents,
configmanager=configmanager,
inputdata=inputdata)
if isnoderange:
return passvalue
elif isinstance(passvalue, console.Console):
return passvalue
else:
return stripnode(passvalue, nodes[0])
elif 'pluginattrs' in plugroute:

View File

@ -25,6 +25,7 @@ import confluent.exceptions as exc
import confluent.log as log
import confluent.messages
import confluent.core as pluginapi
import confluent.shellserver as shellserver
import confluent.tlvdata
import confluent.util as util
import copy
@ -326,9 +327,15 @@ def resourcehandler_backend(env, start_response):
("Set-Cookie", m.OutputString())
for m in authorized['cookie'].values())
cfgmgr = authorized['cfgmgr']
if '/console/session' in env['PATH_INFO']:
if (operation == 'create' and ('/console/session' in env['PATH_INFO'] or
'/shell/sessions/' in env['PATH_INFO'])):
#hard bake JSON into this path, do not support other incarnations
prefix, _, _ = env['PATH_INFO'].partition('/console/session')
if '/console/session' in env['PATH_INFO']:
prefix, _, _ = env['PATH_INFO'].partition('/console/session')
shellsession = False
elif '/shell/sessions/' in env['PATH_INFO']:
prefix, _, _ = env['PATH_INFO'].partition('/shell/sessions')
shellsession = True
_, _, nodename = prefix.rpartition('/')
if 'session' not in querydict.keys() or not querydict['session']:
auditmsg = {
@ -344,9 +351,14 @@ def resourcehandler_backend(env, start_response):
if 'skipreplay' in querydict and querydict['skipreplay']:
skipreplay = True
try:
consession = consoleserver.ConsoleSession(
node=nodename, configmanager=cfgmgr,
username=authorized['username'], skipreplay=skipreplay)
if shellsession:
consession = shellserver.ShellSession(
node=nodename, configmanager=cfgmgr,
username=authorized['username'], skipreplay=skipreplay)
else:
consession = consoleserver.ConsoleSession(
node=nodename, configmanager=cfgmgr,
username=authorized['username'], skipreplay=skipreplay)
except exc.NotFoundException:
start_response("404 Not found", headers)
yield "404 - Request Path not recognized"

View File

@ -0,0 +1,105 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2015 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = 'jjohnson2'
# This plugin provides an ssh implementation comforming to the 'console'
# specification. consoleserver or shellserver would be equally likely
# to use this.
import confluent.interface.console as conapi
import eventlet
paramiko = eventlet.import_patched('paramiko')
class SshShell(conapi.Console):
def __init__(self, node, config, username='', password=''):
self.node = node
self.ssh = None
self.nodeconfig = config
self.username = username
self.password = password
self.inputmode = 0 # 0 = username, 1 = password...
def recvdata(self):
while self.connected:
pendingdata = self.shell.recv(8192)
if pendingdata == '':
self.datacallback(conapi.ConsoleEvent.Disconnect)
return
self.datacallback(pendingdata)
def connect(self, callback):
# for now, we just use the nodename as the presumptive ssh destination
#TODO(jjohnson2): use a 'nodeipget' utility function for architectures
# that would rather not use the nodename as anything but an opaque
# identifier
self.datacallback = callback
if self.username is not '':
self.logon()
else:
self.inputmode = 0
callback('\r\nlogin as: ')
return
def logon(self):
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
try:
self.ssh.connect(self.node, username=self.username,
password=self.password, allow_agent=False,
look_for_keys=False)
except paramiko.AuthenticationException:
self.inputmode = 0
self.username = ''
self.password = ''
self.datacallback('\r\nlogin as: ')
return
self.inputmode = 2
self.connected = True
self.shell = self.ssh.invoke_shell()
self.rxthread = eventlet.spawn(self.recvdata)
def write(self, data):
if self.inputmode == 0:
self.username += data
if '\r' in self.username:
self.username, self.password = self.username.split('\r')
lastdata = data.split('\r')[0]
if lastdata != '':
self.datacallback(lastdata)
self.datacallback('\r\nEnter password: ')
self.inputmode = 1
else:
# echo back typed data
self.datacallback(data)
elif self.inputmode == 1:
self.password += data
if '\r' in self.password:
self.password = self.password.split('\r')[0]
self.datacallback('\r\n')
self.logon()
else:
self.shell.sendall(data)
def close(self):
if self.ssh is not None:
self.ssh.close()
def create(nodes, element, configmanager, inputdata):
if len(nodes) == 1:
return SshShell(nodes[0], configmanager)

View File

@ -0,0 +1,122 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2016 Lenovo
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This module tracks each node, tenants currently active shell sessions
# 'ConsoleSession' objects from consoleserver are used, but with the additional
# capacity for having a multiple of sessions per node active at a given time
import confluent.consoleserver as consoleserver
import confluent.exceptions as exc
import confluent.messages as msg
activesessions = {}
class _ShellHandler(consoleserver.ConsoleHandler):
_plugin_path = '/nodes/{0}/_shell/session'
_genwatchattribs = False
_logtobuffer = False
def log(self, *args, **kwargs):
# suppress logging through proving a stub 'log' function
return
def _got_disconnected(self):
self.connectstate = 'closed'
self._send_rcpts({'connectstate': self.connectstate})
for session in self.livesessions:
session.destroy()
def get_sessions(tenant, node, user):
"""Get sessionids active for node
Given a tenant, nodename, and user; provide an iterable of sessionids.
Each permutation of tenant, nodename and user have a distinct set of shell
sessions.
:param tenant: The tenant identifier for the current scope
:param node: The nodename of the current scope.
:param user: The confluent user that will 'own' the session.
"""
return activesessions.get((tenant, node, user), {})
def get_session(tenant, node, user, sessionid):
return activesessions.get((tenant, node, user), {}).get(sessionid, None)
class ShellSession(consoleserver.ConsoleSession):
"""Create a new socket to converse with a node shell session
This object provides a filehandle that can be read/written
too in a normal fashion and the concurrency, logging, and
event watching will all be handled seamlessly. It represents a remote
CLI shell session.
:param node: Name of the node for which this session will be created
:param configmanager: A configuration manager object for current context
:param username: Username for which this session object will operate
:param datacallback: An asynchronous data handler, to be called when data
is available. Note that if passed, it makes
'get_next_output' non-functional
:param skipreplay: If true, will skip the attempt to redraw the screen
:param sessionid: An optional identifier to match a running session or
customize the name of a new session.
"""
def __init__(self, node, configmanager, username, datacallback=None,
skipreplay=False, sessionid=None):
self.sessionid = sessionid
self.configmanager = configmanager
self.node = node
super(ShellSession, self).__init__(node, configmanager, username,
datacallback, skipreplay)
def connect_session(self):
global activesessions
tenant = self.configmanager.tenant
if (self.configmanager.tenant, self.node) not in activesessions:
activesessions[(tenant, self.node)] = {}
if self.sessionid is None:
self.sessionid = 1
while str(self.sessionid) in activesessions[(tenant, self.node)]:
self.sessionid += 1
self.sessionid = str(self.sessionid)
if self.sessionid not in activesessions[(tenant, self.node)]:
activesessions[(tenant, self.node)][self.sessionid] = _ShellHandler(self.node, self.configmanager)
self.conshdl = activesessions[(self.configmanager.tenant, self.node)][self.sessionid]
def destroy(self):
del activesessions[(self.configmanager.tenant, self.node)][self.sessionid]
super(ShellSession, self).destroy()
def create(nodes, element, configmanager, inputdata):
# For creating a resource, it really has to be handled
# in httpapi/sockapi specially, like a console.
raise exc.InvalidArgumentException('Special client code required')
def retrieve(nodes, element, configmanager, inputdata):
tenant = configmanager.tenant
user = configmanager.current_user
if (tenant, nodes[0]) in activesessions:
for sessionid in activesessions[(tenant, nodes[0])]:
yield msg.ChildCollection(sessionid)

View File

@ -40,6 +40,7 @@ import confluent.config.configmanager as configmanager
import confluent.exceptions as exc
import confluent.log as log
import confluent.core as pluginapi
import confluent.shellserver as shellserver
tracelog = None
@ -88,7 +89,7 @@ def sessionhdl(connection, authname, skipauth=False):
cfm = None
if skipauth:
authenticated = True
cfm = configmanager.ConfigManager(tenant=None)
cfm = configmanager.ConfigManager(tenant=None, username=authname)
elif authname:
authdata = auth.authorize(authname, element=None)
if authdata is not None:
@ -178,7 +179,7 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
raise ValueError
operation = request['operation']
path = request['path']
params = request.get('parameters', None)
params = request.get('parameters', {})
hdlr = None
if not skipauth:
authdata = auth.authorize(authdata[2], path, authdata[3], operation)
@ -197,42 +198,7 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
auditlog.log(auditmsg)
try:
if operation == 'start':
elems = path.split('/')
if elems[3] != "console":
raise exc.InvalidArgumentException()
node = elems[2]
ccons = ClientConsole(connection)
skipreplay = False
if params and 'skipreplay' in params and params['skipreplay']:
skipreplay = True
consession = consoleserver.ConsoleSession(
node=node, configmanager=cfm, username=authname,
datacallback=ccons.sendall, skipreplay=skipreplay)
if consession is None:
raise Exception("TODO")
send_data(connection, {'started': 1})
ccons.startsending()
bufferage = consession.get_buffer_age()
if bufferage is not False:
send_data(connection, {'bufferage': bufferage})
while consession is not None:
data = tlvdata.recv(connection)
if type(data) == dict:
if data['operation'] == 'stop':
consession.destroy()
return
elif data['operation'] == 'break':
consession.send_break()
continue
elif data['operation'] == 'reopen':
consession.reopen()
continue
else:
raise Exception("TODO")
if not data:
consession.destroy()
return
consession.write(data)
return start_term(authname, cfm, connection, params, path)
elif operation == 'shutdown':
configmanager.ConfigManager.shutdown()
else:
@ -249,6 +215,59 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
return
def start_term(authname, cfm, connection, params, path):
elems = path.split('/')
if len(elems) < 4 or elems[1] != 'nodes':
raise exc.InvalidArgumentException('Invalid path {0}'.format(path))
node = elems[2]
ccons = ClientConsole(connection)
skipreplay = False
if params and 'skipreplay' in params and params['skipreplay']:
skipreplay = True
if elems[3] == "console":
consession = consoleserver.ConsoleSession(
node=node, configmanager=cfm, username=authname,
datacallback=ccons.sendall, skipreplay=skipreplay)
elif len(elems) >= 6 and elems[3:5] == ['shell', 'sessions']:
if len(elems) == 7:
sessionid = elems[5]
else:
sessionid = None
consession = shellserver.ShellSession(
node=node, configmanager=cfm, username=authname,
datacallback=ccons.sendall, skipreplay=skipreplay,
sessionid=sessionid)
else:
raise exc.InvalidArgumentException('Invalid path {0}'.format(path))
if consession is None:
raise Exception("TODO")
send_data(connection, {'started': 1})
ccons.startsending()
bufferage = consession.get_buffer_age()
if bufferage is not False:
send_data(connection, {'bufferage': bufferage})
while consession is not None:
data = tlvdata.recv(connection)
if type(data) == dict:
if data['operation'] == 'stop':
consession.destroy()
return
elif data['operation'] == 'break':
consession.send_break()
continue
elif data['operation'] == 'reopen':
consession.reopen()
continue
else:
raise Exception("TODO")
if not data:
consession.destroy()
return
consession.write(data)
def _tlshandler(bind_host, bind_port):
plainsocket = socket.socket(socket.AF_INET6)
plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

View File

@ -10,6 +10,7 @@ setup(
description='confluent systems management server',
packages=['confluent', 'confluent/config', 'confluent/interface',
'confluent/plugins/hardwaremanagement/',
'confluent/plugins/shell/',
'confluent/plugins/configuration/'],
install_requires=['pycrypto>=2.6', 'confluent_client>=0.1.0', 'eventlet',
'pyghmi>=0.6.5'],