diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 7e0ac7bb..8ab71308 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -625,12 +625,12 @@ def start_console_sessions(): configmodule.hook_new_configmanagers(_start_tenant_sessions) -def connect_node(node, configmanager, username=None): +def connect_node(node, configmanager, username=None, direct=True): attrval = configmanager.get_node_attributes(node, 'collective.manager') myc = attrval.get(node, {}).get('collective.manager', {}).get( 'value', None) myname = collective.get_myname() - if myc and myc != collective.get_myname(): + if myc and myc != collective.get_myname() and direct: minfo = configmodule.get_collective_member(myc) return ProxyConsole(node, minfo, myname, configmanager, username) consk = (node, configmanager.tenant) @@ -758,10 +758,9 @@ class ConsoleSession(object): 'get_next_output' non-functional :param skipreplay: If true, will skip the attempt to redraw the screen """ - connector = connect_node def __init__(self, node, configmanager, username, datacallback=None, - skipreplay=False): + skipreplay=False, direct=True): self.registered = False self.tenant = configmanager.tenant if not configmanager.is_node(node): @@ -769,6 +768,8 @@ class ConsoleSession(object): self.username = username self.node = node self.configmanager = configmanager + self.direct = direct # true if client is directly connected versus + # relay self.connect_session() self.registered = True self._evt = None @@ -797,7 +798,7 @@ class ConsoleSession(object): between console and shell. """ self.conshdl = connect_node(self.node, self.configmanager, - self.username) + self.username, self.direct) def send_break(self): """Send break to remote system """ diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index cc469661..cdbfa359 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -585,6 +585,15 @@ class BadPlugin(object): self.node, self.plugin + ' is not a supported plugin') +class BadCollective(object): + def __init__(self, node): + self.node = node + + def error(self, *args, **kwargs): + yield msg.ConfluentNodeError( + self.node, 'collective mode is active, but collective.manager ' + 'is not set for this node') + def abbreviate_noderange(configmanager, inputdata, operation): if operation != 'create': raise exc.InvalidArgumentException('Must be a create with nodes in list') @@ -777,6 +786,7 @@ def handle_node_request(configmanager, inputdata, operation, continue elif list(cfm.list_collective()): badcollnodes.append(node) + continue if plugpath is not None: try: hfunc = getattr(pluginmap[plugpath], operation) @@ -787,11 +797,8 @@ def handle_node_request(configmanager, inputdata, operation, nodesbyhandler[hfunc].append(node) else: nodesbyhandler[hfunc] = [node] - if badcollnodes: - raise exc.ConfluentException( - 'collective management active, ' - 'collective.manager must be set for {0}'.format( - ','.join(badcollnodes))) + for bn in badcollnodes: + nodesbyhandler[BadCollective(bn).error] = [bn] workers = greenpool.GreenPool() numworkers = 0 for hfunc in nodesbyhandler: diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index a79fe9df..d2e8b96c 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -1203,7 +1203,7 @@ def _map_unique_ids(nodes=None): uuid_by_nodes = {} fprint_by_nodes = {} for uuid in nodes_by_uuid: - if not uuid_is_valid(): + if not uuid_is_valid(uuid): continue node = nodes_by_uuid[uuid] if node in bigmap: diff --git a/confluent_server/confluent/plugins/shell/ssh.py b/confluent_server/confluent/plugins/shell/ssh.py index 2e44e9f8..d0b5e90b 100644 --- a/confluent_server/confluent/plugins/shell/ssh.py +++ b/confluent_server/confluent/plugins/shell/ssh.py @@ -22,11 +22,22 @@ import confluent.exceptions as cexc import confluent.interface.console as conapi import confluent.log as log +import cryptography + import eventlet import hashlib import sys sys.modules['gssapi'] = None paramiko = eventlet.import_patched('paramiko') +warnhostkey = False +if cryptography.__version__.split('.') < ['1', '5']: + # older cryptography with paramiko breaks most key support except + # ed25519 + warnhostkey = True + paramiko.transport.Transport._preferred_keys = filter( + lambda x: 'ed25519' in x, + paramiko.transport.Transport._preferred_keys) + class HostKeyHandler(paramiko.client.MissingHostKeyPolicy): @@ -121,6 +132,17 @@ class SshShell(conapi.Console): self.datacallback('\r\nNew fingerprint: ' + pi.fingerprint) self.inputmode = -1 self.datacallback('\r\nEnter "disconnect" or "accept": ') + return + except paramiko.SSHException as pi: + self.inputmode = -2 + warn = str(pi) + if warnhostkey: + warn += ' (Older cryptography package on this host only ' \ + 'works with ed25519, check ssh startup on target ' \ + 'and permissions on /etc/ssh/*key)\r\n' \ + 'Press Enter to close...' + self.datacallback('\r\n' + warn) + return self.inputmode = 2 self.connected = True @@ -128,7 +150,10 @@ class SshShell(conapi.Console): self.rxthread = eventlet.spawn(self.recvdata) def write(self, data): - if self.inputmode == -1: + if self.inputmode == -2: + self.datacallback(conapi.ConsoleEvent.Disconnect) + return + elif self.inputmode == -1: while len(data) and data[0] == b'\x7f' and len(self.keyaction): self.datacallback('\b \b') # erase previously echoed value self.keyaction = self.keyaction[:-1] diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index f5f16b99..75da4101 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -238,7 +238,8 @@ def start_proxy_term(connection, cert, request): ccons = ClientConsole(connection) consession = consoleserver.ConsoleSession( node=request['node'], configmanager=cfm, username=request['user'], - datacallback=ccons.sendall, skipreplay=request['skipreplay']) + datacallback=ccons.sendall, skipreplay=request['skipreplay'], + direct=False) term_interact(None, None, ccons, None, connection, consession, None) def start_term(authname, cfm, connection, params, path, authdata, skipauth):