From c5405f832c299f34fecf9538c2a4d50cbf52f87c Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 29 May 2024 20:18:07 -0400 Subject: [PATCH] Advance state of async shellserver Can successfully run ssh sessions through confluent with async now --- confluent_server/bin/collective | 4 +- confluent_server/confluent/consoleserver.py | 123 +++++++++--------- .../confluent/plugins/shell/ssh.py | 19 +-- confluent_server/confluent/shellserver.py | 10 +- confluent_server/confluent/sockapi.py | 26 ++-- 5 files changed, 94 insertions(+), 88 deletions(-) diff --git a/confluent_server/bin/collective b/confluent_server/bin/collective index 71a9daa6..78eb83d0 100644 --- a/confluent_server/bin/collective +++ b/confluent_server/bin/collective @@ -15,9 +15,9 @@ if path.startswith('/opt'): # if installed into system path, do not muck with things sys.path.append(path) -import confluent.client as client +import confluent.asynclient as client import confluent.sortutil as sortutil -import confluent.tlvdata as tlvdata +import confluent.asynctlvdata as tlvdata try: input = raw_input diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 070759c6..a2d5d8a5 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -21,6 +21,7 @@ # we track nodes that are actively being logged, watched, or have attached # there should be no more than one handler per node +import asyncio import codecs import collections import confluent.collective.manager as collective @@ -34,8 +35,7 @@ import confluent.util as util import eventlet import eventlet.event import eventlet.green.os as os -import eventlet.green.select as select -import eventlet.green.socket as socket +import socket import eventlet.green.subprocess as subprocess import eventlet.green.ssl as ssl import eventlet.semaphore as semaphore @@ -60,25 +60,24 @@ def chunk_output(output, n): for i in range(0, len(output), n): yield output[i:i + n] -def get_buffer_output(nodename): +async def get_buffer_output(nodename): out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1) out.connect("\x00confluent-vtbuffer") + rdr, writer = await asyncio.open__unix_connection(sock=out) if not isinstance(nodename, bytes): nodename = nodename.encode('utf8') outdata = bytearray() - out.send(struct.pack('I', len(nodename))) - out.send(nodename) - select.select((out,), (), (), 30) + writer.write(struct.pack('I', len(nodename))) + writer.write(nodename) + await writer.drain() while not outdata or outdata[-1]: - try: - chunk = os.read(out.fileno(), 128) - except IOError: - chunk = None - if chunk: - outdata.extend(chunk) - else: - select.select((out,), (), (), 0) + chunk = await rdr.read(128) # os.read(out.fileno(), 128) + if not chunk: + raise Exception("bad read") + outdata.extend(chunk) + writer.close() + await writer.wait_close() return bytes(outdata[:-1]) @@ -304,7 +303,7 @@ class ConsoleHandler(object): for ses in list(self.livesessions): ses.detach() - def _disconnect(self): + async def _disconnect(self): if self.connectionthread: self.connectionthread.cancel() self.connectionthread = None @@ -317,7 +316,7 @@ class ConsoleHandler(object): self._console.close() self._console = None self.connectstate = 'unconnected' - self._send_rcpts({'connectstate': self.connectstate}) + await self._send_rcpts({'connectstate': self.connectstate}) def _ondemand(self): self._isondemand = True @@ -337,7 +336,7 @@ class ConsoleHandler(object): self._console.close() self._console = None self.connectstate = 'connecting' - self._send_rcpts({'connectstate': self.connectstate}) + await self._send_rcpts({'connectstate': self.connectstate}) if self.reconnect: self.reconnect.cancel() self.reconnect = None @@ -361,18 +360,18 @@ class ConsoleHandler(object): self.clearbuffer() self.connectstate = 'unconnected' self.error = 'misconfigured' - self._send_rcpts({'connectstate': self.connectstate, + await self._send_rcpts({'connectstate': self.connectstate, 'error': self.error}) - self.feedbuffer( + await self.feedbuffer( '\x1bc\x1b[2J\x1b[1;1H[{0}]'.format(strerror)) - self._send_rcpts( + await self._send_rcpts( '\x1bc\x1b[2J\x1b[1;1H[{0}]'.format(strerror)) self.clearerror = True return if self.clearerror: self.clearerror = False self.clearbuffer() - self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H') + await self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H') self.send_break = self._console.send_break self.resize = self._console.resize if self._attribwatcher: @@ -387,12 +386,12 @@ class ConsoleHandler(object): (self.node,), attribstowatch, self._attribschanged) try: self.resize(width=self.initsize[0], height=self.initsize[1]) - self._console.connect(self.get_console_output) + await self._console.connect(self.get_console_output) except exc.TargetEndpointBadCredentials: self.clearbuffer() self.error = 'badcredentials' self.connectstate = 'unconnected' - self._send_rcpts({'connectstate': self.connectstate, + await self._send_rcpts({'connectstate': self.connectstate, 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: @@ -402,7 +401,7 @@ class ConsoleHandler(object): self.clearbuffer() self.error = 'unreachable' self.connectstate = 'unconnected' - self._send_rcpts({'connectstate': self.connectstate, + await self._send_rcpts({'connectstate': self.connectstate, 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: @@ -414,39 +413,39 @@ class ConsoleHandler(object): event=log.Events.stacktrace) self.error = 'unknown' self.connectstate = 'unconnected' - self._send_rcpts({'connectstate': self.connectstate, + await self._send_rcpts({'connectstate': self.connectstate, 'error': self.error}) retrytime = self._get_retry_time() if not self.reconnect: self.reconnect = util.spawn_after(retrytime, self._connect) return - self._got_connected() + await self._got_connected() - def _got_connected(self): + async def _got_connected(self): self.connectstate = 'connected' self._retrytime = 0 self.log( logdata='console connected', ltype=log.DataTypes.event, event=log.Events.consoleconnect) - self._send_rcpts({'connectstate': self.connectstate}) + await self._send_rcpts({'connectstate': self.connectstate}) - def _got_disconnected(self): + async def _got_disconnected(self): if self.connectstate != 'unconnected': self._console.close() self.connectstate = 'unconnected' self.log( logdata='console disconnected', ltype=log.DataTypes.event, event=log.Events.consoledisconnect) - self._send_rcpts({'connectstate': self.connectstate}) + await self._send_rcpts({'connectstate': self.connectstate}) if self._isalive: self._connect() else: self.clearbuffer() - def close(self): + async def close(self): self._isalive = False - self._send_rcpts({'deleting': True}) - self._disconnect() + await self._send_rcpts({'deleting': True}) + await self._disconnect() if self._console: self._console.close() @@ -461,9 +460,9 @@ class ConsoleHandler(object): def get_console_output(self, data): # Spawn as a greenthread, return control as soon as possible # to the console object - eventlet.spawn(self._handle_console_output, data) + util.spawn(self._handle_console_output(data)) - def attachsession(self, session): + async def attachsession(self, session): edata = 1 for currsession in self.livesessions: if currsession.username == session.username: @@ -473,7 +472,7 @@ class ConsoleHandler(object): self.log( logdata=session.username, ltype=log.DataTypes.event, event=log.Events.clientconnect, eventdata=edata) - self._send_rcpts({'clientcount': len(self.livesessions)}) + await self._send_rcpts({'clientcount': len(self.livesessions)}) if self.connectstate == 'unconnected': # if console is not connected, take time to try to assert # connectivity now. @@ -486,7 +485,7 @@ class ConsoleHandler(object): - def detachsession(self, session): + async def detachsession(self, session): edata = 0 self.livesessions.discard(session) for currsession in self.livesessions: @@ -497,7 +496,7 @@ class ConsoleHandler(object): self.log( logdata=session.username, ltype=log.DataTypes.event, event=log.Events.clientdisconnect, eventdata=edata) - self._send_rcpts({'clientcount': len(self.livesessions)}) + await self._send_rcpts({'clientcount': len(self.livesessions)}) if self._isondemand and not self.livesessions: self._disconnect() @@ -505,7 +504,7 @@ class ConsoleHandler(object): def reopen(self): self._got_disconnected() - def _handle_console_output(self, data): + async def _handle_console_output(self, data): if type(data) == int: if data == conapi.ConsoleEvent.Disconnect: self._got_disconnected() @@ -523,22 +522,23 @@ class ConsoleHandler(object): self.clearpending = False self.clearerror = False self.feedbuffer(b'\x1bc\x1b[2J\x1b[1;1H') - self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H') - self._send_rcpts(_utf8_normalize(data, self.utf8decoder)) + await self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H') + await self._send_rcpts(_utf8_normalize(data, self.utf8decoder)) self.log(data, eventdata=eventdata) self.lasttime = util.monotonic_time() self.feedbuffer(data) - def _send_rcpts(self, data): + async def _send_rcpts(self, data): for rcpt in list(self.livesessions): try: - rcpt.data_handler(data) - except: # No matter the reason, advance to next recipient + await rcpt.data_handler(data) + except Exception as e: # No matter the reason, advance to next recipient + print(repr(e)) _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) - def get_recent(self): + async def get_recent(self): """Retrieve 'recent' data Replay data in the intent to perhaps reproduce the display. @@ -551,7 +551,7 @@ class ConsoleHandler(object): 'clientcount': len(self.livesessions), } nodeid = self.termprefix + self.node - retdata = get_buffer_output(nodeid) + retdata = await get_buffer_output(nodeid) return retdata, connstate def write(self, data): @@ -598,13 +598,15 @@ def _start_tenant_sessions(cfm): cfm.watch_nodecollection(_nodechange) -def initialize(): +async def initialize(): global _tracelog global _bufferdaemon _tracelog = log.Logger('trace') - _bufferdaemon = subprocess.Popen( - ['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL, - stdout=subprocess.DEVNULL) + _bufferdaemon = await asyncio.subprocess.create_subprocess_exec( + '/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer') + #_bufferdaemon = subprocess.Popen( + # ['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL, + # stdout=subprocess.DEVNULL) def start_console_sessions(): configmodule.hook_new_configmanagers(_start_tenant_sessions) @@ -654,11 +656,11 @@ class ProxyConsole(object): self.clisession.detach() self.clisession = None - def relay_data(self): - data = tlvdata.recv(self.remote) + async def relay_data(self): + data = await tlvdata.recv(self.remote) while data: self.data_handler(data) - data = tlvdata.recv(self.remote) + data = await tlvdata.recv(self.remote) self.remote.close() def get_buffer_age(self): @@ -706,7 +708,7 @@ class ProxyConsole(object): remote.getpeercert(binary_form=True)): raise Exception('Invalid peer certificate') except Exception: - eventlet.sleep(3) + #await asyncio.sleep(3) if self.clisession: self.clisession.detach() self.detachsession(None) @@ -715,7 +717,7 @@ class ProxyConsole(object): tlvdata.recv(remote) tlvdata.send(remote, termreq) self.remote = remote - eventlet.spawn(self.relay_data) + util.spawn(self.relay_data()) def detachsession(self, session): # we will disappear, so just let that happen... @@ -777,6 +779,9 @@ class ConsoleSession(object): self._evt = None self.node = node self.write = self.conshdl.write + util.spawn(self.delayinit(datacallback, skipreplay)) + + async def delayinit(self, datacallback, skipreplay): if datacallback is None: self.reaper = util.spawn_after(15, self.destroy) self.databuffer = collections.deque([]) @@ -786,10 +791,10 @@ class ConsoleSession(object): else: self.data_handler = datacallback if not skipreplay: - for recdata in self.conshdl.get_recent(): + for recdata in await self.conshdl.get_recent(): if recdata: datacallback(recdata) - self.conshdl.attachsession(self) + await self.conshdl.attachsession(self) def connect_session(self): @@ -827,9 +832,9 @@ class ConsoleSession(object): """ self.conshdl.reopen() - def destroy(self): + async def destroy(self): if self.registered: - self.conshdl.detachsession(self) + await self.conshdl.detachsession(self) if self._evt: self._evt.send() self._evt = None diff --git a/confluent_server/confluent/plugins/shell/ssh.py b/confluent_server/confluent/plugins/shell/ssh.py index 08199a12..d7106e50 100644 --- a/confluent_server/confluent/plugins/shell/ssh.py +++ b/confluent_server/confluent/plugins/shell/ssh.py @@ -22,6 +22,7 @@ import confluent.exceptions as cexc import confluent.interface.console as conapi import confluent.log as log +import confluent.util as util import hashlib import sys @@ -93,7 +94,7 @@ class SshShell(conapi.Console): async def recvdata(self): while self.connected: - pendingdata = await self.shell.stdout.read(8192) + pendingdata = await self.shell[1].read(8192) if not pendingdata: self.ssh.close() if self.datacallback: @@ -101,14 +102,14 @@ class SshShell(conapi.Console): return self.datacallback(pendingdata) - def connect(self, callback): + async def connect(self, callback): # for now, we just use the nodename as the presumptive ssh destination # TODO(jjohnson2): use a 'nodeipget' utility function for architectures # that would rather not use the nodename as anything but an opaque # identifier self.datacallback = callback if self.username != b'': - self.logon() + await self.logon() else: self.inputmode = 0 callback('\r\nlogin as: ') @@ -116,7 +117,7 @@ class SshShell(conapi.Console): def logon(self): self.inputmode = -3 - eventlet.spawn_n(self.do_logon) + util.spawn(self.do_logon()) async def do_logon(self): sco = asyncssh.SSHClientConnectionOptions() @@ -126,7 +127,7 @@ class SshShell(conapi.Console): try: self.datacallback('\r\nConnecting to {}...'.format(self.node)) try: - self.ssh = await asyncssh.connect(self,node, username=self.username, password=self.password, known_hosts='/etc/ssh/ssh_known_hosts') + self.ssh = await asyncssh.connect(self.node, username=self.username.decode(), password=self.password.decode(), known_hosts='/etc/ssh/ssh_known_hosts') except ValueError: #TODO: non-cert ssh targets raise @@ -153,9 +154,9 @@ class SshShell(conapi.Console): self.inputmode = 2 self.connected = True self.datacallback('Connected\r\n') - self.shell = self.ssh.invoke_shell(width=self.width, - height=self.height) - self.rxthread = eventlet.spawn(self.recvdata) + self.shell = await self.ssh.open_session(term_type='vt100', term_size=(self.width, self.height)) # self.ssh.invoke_shell(width=self.width, + # height=self.height) + self.rxthread = util.spawn(self.recvdata()) def write(self, data): if self.inputmode == -2: @@ -223,7 +224,7 @@ class SshShell(conapi.Console): self.datacallback(b'\r\n') self.logon() else: - self.shell.sendall(data) + self.shell[0].write(data.decode()) def close(self): if self.ssh is not None: diff --git a/confluent_server/confluent/shellserver.py b/confluent_server/confluent/shellserver.py index 3dbf475a..4293309c 100644 --- a/confluent_server/confluent/shellserver.py +++ b/confluent_server/confluent/shellserver.py @@ -41,9 +41,9 @@ class _ShellHandler(consoleserver.ConsoleHandler): return #return super().feedbuffer(data) - def get_recent(self): - retdata, connstate = super(_ShellHandler, self).get_recent() - return '', connstate + async def get_recent(self): + #retdata, connstate = await super(_ShellHandler, self).get_recent() + return '', {} # connstate def _got_disconnected(self): self.connectstate = 'closed' @@ -117,7 +117,7 @@ class ShellSession(consoleserver.ConsoleSession): activesessions[(tenant, self.node, self.username)][self.sessionid] = _ShellHandler(self.node, self.configmanager, width=self.width, height=self.height) self.conshdl = activesessions[(self.configmanager.tenant, self.node, self.username)][self.sessionid] - def destroy(self): + async def destroy(self): try: activesessions[(self.configmanager.tenant, self.node, self.username)][self.sessionid].close() @@ -125,7 +125,7 @@ class ShellSession(consoleserver.ConsoleSession): self.username)][self.sessionid] except KeyError: pass - super(ShellSession, self).destroy() + return await super(ShellSession, self).destroy() def create(nodes, element, configmanager, inputdata): # For creating a resource, it really has to be handled diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index bfb257a2..6f278e6c 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -39,7 +39,7 @@ import confluent.auth as auth import confluent.credserver as credserver import confluent.config.conf as conf import confluent.asynctlvdata as tlvdata -#import confluent.consoleserver as consoleserver +import confluent.consoleserver as consoleserver import confluent.config.configmanager as configmanager import confluent.exceptions as exc import confluent.log as log @@ -75,16 +75,16 @@ class ClientConsole(object): self.xmit = False self.pendingdata = [] - def sendall(self, data): + async def sendall(self, data): if not self.xmit: self.pendingdata.append(data) return - send_data(self.client, data) + await send_data(self.client, data) - def startsending(self): + async def startsending(self): self.xmit = True for datum in self.pendingdata: - send_data(self.client, datum) + await send_data(self.client, datum) self.pendingdata = [] @@ -255,9 +255,9 @@ async def process_request( "error": "Target not found - " + str(e)}) send_data(connection, {"_requestdone": 1}) except exc.InvalidArgumentException as e: - send_data(connection, {"errorcode": 400, + await send_data(connection, {"errorcode": 400, "error": "Bad Request - " + str(e)}) - send_data(connection, {"_requestdone": 1}) + await send_data(connection, {"_requestdone": 1}) await send_response(hdlr, connection) return @@ -310,8 +310,8 @@ async def start_term(authname, cfm, connection, params, path, authdata, skipauth async def term_interact(authdata, authname, ccons, cfm, connection, consession, skipauth): - send_data(connection, {'started': 1}) - ccons.startsending() + await send_data(connection, {'started': 1}) + await ccons.startsending() bufferage = consession.get_buffer_age() if bufferage is not False: send_data(connection, {'bufferage': bufferage}) @@ -338,17 +338,17 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession, continue else: try: - process_request(connection, data, cfm, authdata, authname, - skipauth) + await process_request(connection, data, cfm, authdata, authname, + skipauth) except Exception as e: tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) - send_data( + await send_data( connection, {'errorcode': 500, 'error': 'Unexpected error - ' + str(e)}) - send_data(connection, {'_requestdone': 1}) + await send_data(connection, {'_requestdone': 1}) continue if not data: consession.destroy()