From baef8c2c42eb7e8293199060d66738797e7f39c4 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 21 Jun 2018 13:46:51 -0400 Subject: [PATCH] Connect and fix the proxyconsole This works full duplex for sockapi and read for web consoles. --- confluent_server/confluent/consoleserver.py | 63 +++++++++++++-------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index a1ab4413..59011880 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -33,6 +33,7 @@ import confluent.tlvdata as tlvdata import confluent.util as util import eventlet import eventlet.event +import eventlet.green.socket as socket import eventlet.green.ssl as ssl import pyte import random @@ -612,6 +613,13 @@ def start_console_sessions(): def connect_node(node, configmanager, username=None): + attrval = configmanager.get_node_attributes(node, 'collective.manager') + myc = attrval.get(node, {}).get('collective.manager', {}).get( + 'value', None) + myname = collective.get_myname() + if myc and myc != collective.get_myname(): + minfo = configmodule.get_collective_member(myc) + return ProxyConsole(node, minfo, myname, configmanager, username) consk = (node, configmanager.tenant) if consk not in _handled_consoles: _handled_consoles[consk] = ConsoleHandler(node, configmanager) @@ -621,27 +629,14 @@ def connect_node(node, configmanager, username=None): # collective member. It can skip the multi-session sharing as that is handled # remotely class ProxyConsole(object): - def __init__(self, node, managerinfo, myname, configmanager, skipreplay): - termreq = { - 'proxyconsole': { - 'name': myname, - 'tenant': configmanager.tenant, - 'node': node, - 'skiprelay': skipreplay, - }, - } - remote = socket.create_connection((managerinfo['address'], 13001)) - remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, - keyfile='/etc/confluent/privkey.pem', - certfile='/etc/confluent/srvcert.pem') - if not util.cert_matches(managerinfo['fingerprint'], - remote.getpeercert(binary_form=True)): - raise Exception('Invalid peer certificate') - tlvdata.recv(remote) - tlvdata.recv(remote) - tlvdata.send(remote, termreq) - self.remote = remote - eventlet.spawn(self.relay_data) + def __init__(self, node, managerinfo, myname, configmanager, user): + self.skipreplay = True + self.managerinfo = managerinfo + self.myname = myname + self.cfm = configmanager + self.node = node + self.user = user + def relay_data(self): data = tlvdata.recv(self.remote) @@ -656,16 +651,36 @@ class ProxyConsole(object): def get_recent(self): # Again, delegate this to the remote collective member + self.skipreplay = False return b'' def write(self, data): # Relay data to the collective manager - tlvdata.send(remote, data) + tlvdata.send(self.remote, data) def attachsession(self, session): - # a do nothing stub, since this relationship is easier, the real - # complexity is handled remote self.data_handler = session.data_handler + termreq = { + 'proxyconsole': { + 'name': self.myname, + 'user': self.user, + 'tenant': self.cfm.tenant, + 'node': self.node, + 'skipreplay': self.skipreplay, + }, + } + remote = socket.create_connection((self.managerinfo['address'], 13001)) + remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, + keyfile='/etc/confluent/privkey.pem', + certfile='/etc/confluent/srvcert.pem') + if not util.cert_matches(self.managerinfo['fingerprint'], + remote.getpeercert(binary_form=True)): + raise Exception('Invalid peer certificate') + tlvdata.recv(remote) + tlvdata.recv(remote) + tlvdata.send(remote, termreq) + self.remote = remote + eventlet.spawn(self.relay_data) def detachsession(self, session): # we will disappear, so just let that happen...