2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-16 10:39:23 +00:00

Advance state of async shellserver

Can successfully run ssh sessions through
confluent with async now
This commit is contained in:
Jarrod Johnson 2024-05-29 20:18:07 -04:00
parent 23d0bbd047
commit c5405f832c
5 changed files with 94 additions and 88 deletions

View File

@ -15,9 +15,9 @@ if path.startswith('/opt'):
# if installed into system path, do not muck with things
sys.path.append(path)
import confluent.client as client
import confluent.asynclient as client
import confluent.sortutil as sortutil
import confluent.tlvdata as tlvdata
import confluent.asynctlvdata as tlvdata
try:
input = raw_input

View File

@ -21,6 +21,7 @@
# we track nodes that are actively being logged, watched, or have attached
# there should be no more than one handler per node
import asyncio
import codecs
import collections
import confluent.collective.manager as collective
@ -34,8 +35,7 @@ import confluent.util as util
import eventlet
import eventlet.event
import eventlet.green.os as os
import eventlet.green.select as select
import eventlet.green.socket as socket
import socket
import eventlet.green.subprocess as subprocess
import eventlet.green.ssl as ssl
import eventlet.semaphore as semaphore
@ -60,25 +60,24 @@ def chunk_output(output, n):
for i in range(0, len(output), n):
yield output[i:i + n]
def get_buffer_output(nodename):
async def get_buffer_output(nodename):
out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
out.connect("\x00confluent-vtbuffer")
rdr, writer = await asyncio.open__unix_connection(sock=out)
if not isinstance(nodename, bytes):
nodename = nodename.encode('utf8')
outdata = bytearray()
out.send(struct.pack('I', len(nodename)))
out.send(nodename)
select.select((out,), (), (), 30)
writer.write(struct.pack('I', len(nodename)))
writer.write(nodename)
await writer.drain()
while not outdata or outdata[-1]:
try:
chunk = os.read(out.fileno(), 128)
except IOError:
chunk = None
if chunk:
outdata.extend(chunk)
else:
select.select((out,), (), (), 0)
chunk = await rdr.read(128) # os.read(out.fileno(), 128)
if not chunk:
raise Exception("bad read")
outdata.extend(chunk)
writer.close()
await writer.wait_close()
return bytes(outdata[:-1])
@ -304,7 +303,7 @@ class ConsoleHandler(object):
for ses in list(self.livesessions):
ses.detach()
def _disconnect(self):
async def _disconnect(self):
if self.connectionthread:
self.connectionthread.cancel()
self.connectionthread = None
@ -317,7 +316,7 @@ class ConsoleHandler(object):
self._console.close()
self._console = None
self.connectstate = 'unconnected'
self._send_rcpts({'connectstate': self.connectstate})
await self._send_rcpts({'connectstate': self.connectstate})
def _ondemand(self):
self._isondemand = True
@ -337,7 +336,7 @@ class ConsoleHandler(object):
self._console.close()
self._console = None
self.connectstate = 'connecting'
self._send_rcpts({'connectstate': self.connectstate})
await self._send_rcpts({'connectstate': self.connectstate})
if self.reconnect:
self.reconnect.cancel()
self.reconnect = None
@ -361,18 +360,18 @@ class ConsoleHandler(object):
self.clearbuffer()
self.connectstate = 'unconnected'
self.error = 'misconfigured'
self._send_rcpts({'connectstate': self.connectstate,
await self._send_rcpts({'connectstate': self.connectstate,
'error': self.error})
self.feedbuffer(
await self.feedbuffer(
'\x1bc\x1b[2J\x1b[1;1H[{0}]'.format(strerror))
self._send_rcpts(
await self._send_rcpts(
'\x1bc\x1b[2J\x1b[1;1H[{0}]'.format(strerror))
self.clearerror = True
return
if self.clearerror:
self.clearerror = False
self.clearbuffer()
self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H')
await self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H')
self.send_break = self._console.send_break
self.resize = self._console.resize
if self._attribwatcher:
@ -387,12 +386,12 @@ class ConsoleHandler(object):
(self.node,), attribstowatch, self._attribschanged)
try:
self.resize(width=self.initsize[0], height=self.initsize[1])
self._console.connect(self.get_console_output)
await self._console.connect(self.get_console_output)
except exc.TargetEndpointBadCredentials:
self.clearbuffer()
self.error = 'badcredentials'
self.connectstate = 'unconnected'
self._send_rcpts({'connectstate': self.connectstate,
await self._send_rcpts({'connectstate': self.connectstate,
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
@ -402,7 +401,7 @@ class ConsoleHandler(object):
self.clearbuffer()
self.error = 'unreachable'
self.connectstate = 'unconnected'
self._send_rcpts({'connectstate': self.connectstate,
await self._send_rcpts({'connectstate': self.connectstate,
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
@ -414,39 +413,39 @@ class ConsoleHandler(object):
event=log.Events.stacktrace)
self.error = 'unknown'
self.connectstate = 'unconnected'
self._send_rcpts({'connectstate': self.connectstate,
await self._send_rcpts({'connectstate': self.connectstate,
'error': self.error})
retrytime = self._get_retry_time()
if not self.reconnect:
self.reconnect = util.spawn_after(retrytime, self._connect)
return
self._got_connected()
await self._got_connected()
def _got_connected(self):
async def _got_connected(self):
self.connectstate = 'connected'
self._retrytime = 0
self.log(
logdata='console connected', ltype=log.DataTypes.event,
event=log.Events.consoleconnect)
self._send_rcpts({'connectstate': self.connectstate})
await self._send_rcpts({'connectstate': self.connectstate})
def _got_disconnected(self):
async def _got_disconnected(self):
if self.connectstate != 'unconnected':
self._console.close()
self.connectstate = 'unconnected'
self.log(
logdata='console disconnected', ltype=log.DataTypes.event,
event=log.Events.consoledisconnect)
self._send_rcpts({'connectstate': self.connectstate})
await self._send_rcpts({'connectstate': self.connectstate})
if self._isalive:
self._connect()
else:
self.clearbuffer()
def close(self):
async def close(self):
self._isalive = False
self._send_rcpts({'deleting': True})
self._disconnect()
await self._send_rcpts({'deleting': True})
await self._disconnect()
if self._console:
self._console.close()
@ -461,9 +460,9 @@ class ConsoleHandler(object):
def get_console_output(self, data):
# Spawn as a greenthread, return control as soon as possible
# to the console object
eventlet.spawn(self._handle_console_output, data)
util.spawn(self._handle_console_output(data))
def attachsession(self, session):
async def attachsession(self, session):
edata = 1
for currsession in self.livesessions:
if currsession.username == session.username:
@ -473,7 +472,7 @@ class ConsoleHandler(object):
self.log(
logdata=session.username, ltype=log.DataTypes.event,
event=log.Events.clientconnect, eventdata=edata)
self._send_rcpts({'clientcount': len(self.livesessions)})
await self._send_rcpts({'clientcount': len(self.livesessions)})
if self.connectstate == 'unconnected':
# if console is not connected, take time to try to assert
# connectivity now.
@ -486,7 +485,7 @@ class ConsoleHandler(object):
def detachsession(self, session):
async def detachsession(self, session):
edata = 0
self.livesessions.discard(session)
for currsession in self.livesessions:
@ -497,7 +496,7 @@ class ConsoleHandler(object):
self.log(
logdata=session.username, ltype=log.DataTypes.event,
event=log.Events.clientdisconnect, eventdata=edata)
self._send_rcpts({'clientcount': len(self.livesessions)})
await self._send_rcpts({'clientcount': len(self.livesessions)})
if self._isondemand and not self.livesessions:
self._disconnect()
@ -505,7 +504,7 @@ class ConsoleHandler(object):
def reopen(self):
self._got_disconnected()
def _handle_console_output(self, data):
async def _handle_console_output(self, data):
if type(data) == int:
if data == conapi.ConsoleEvent.Disconnect:
self._got_disconnected()
@ -523,22 +522,23 @@ class ConsoleHandler(object):
self.clearpending = False
self.clearerror = False
self.feedbuffer(b'\x1bc\x1b[2J\x1b[1;1H')
self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H')
self._send_rcpts(_utf8_normalize(data, self.utf8decoder))
await self._send_rcpts(b'\x1bc\x1b[2J\x1b[1;1H')
await self._send_rcpts(_utf8_normalize(data, self.utf8decoder))
self.log(data, eventdata=eventdata)
self.lasttime = util.monotonic_time()
self.feedbuffer(data)
def _send_rcpts(self, data):
async def _send_rcpts(self, data):
for rcpt in list(self.livesessions):
try:
rcpt.data_handler(data)
except: # No matter the reason, advance to next recipient
await rcpt.data_handler(data)
except Exception as e: # No matter the reason, advance to next recipient
print(repr(e))
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
def get_recent(self):
async def get_recent(self):
"""Retrieve 'recent' data
Replay data in the intent to perhaps reproduce the display.
@ -551,7 +551,7 @@ class ConsoleHandler(object):
'clientcount': len(self.livesessions),
}
nodeid = self.termprefix + self.node
retdata = get_buffer_output(nodeid)
retdata = await get_buffer_output(nodeid)
return retdata, connstate
def write(self, data):
@ -598,13 +598,15 @@ def _start_tenant_sessions(cfm):
cfm.watch_nodecollection(_nodechange)
def initialize():
async def initialize():
global _tracelog
global _bufferdaemon
_tracelog = log.Logger('trace')
_bufferdaemon = subprocess.Popen(
['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL)
_bufferdaemon = await asyncio.subprocess.create_subprocess_exec(
'/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer')
#_bufferdaemon = subprocess.Popen(
# ['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL,
# stdout=subprocess.DEVNULL)
def start_console_sessions():
configmodule.hook_new_configmanagers(_start_tenant_sessions)
@ -654,11 +656,11 @@ class ProxyConsole(object):
self.clisession.detach()
self.clisession = None
def relay_data(self):
data = tlvdata.recv(self.remote)
async def relay_data(self):
data = await tlvdata.recv(self.remote)
while data:
self.data_handler(data)
data = tlvdata.recv(self.remote)
data = await tlvdata.recv(self.remote)
self.remote.close()
def get_buffer_age(self):
@ -706,7 +708,7 @@ class ProxyConsole(object):
remote.getpeercert(binary_form=True)):
raise Exception('Invalid peer certificate')
except Exception:
eventlet.sleep(3)
#await asyncio.sleep(3)
if self.clisession:
self.clisession.detach()
self.detachsession(None)
@ -715,7 +717,7 @@ class ProxyConsole(object):
tlvdata.recv(remote)
tlvdata.send(remote, termreq)
self.remote = remote
eventlet.spawn(self.relay_data)
util.spawn(self.relay_data())
def detachsession(self, session):
# we will disappear, so just let that happen...
@ -777,6 +779,9 @@ class ConsoleSession(object):
self._evt = None
self.node = node
self.write = self.conshdl.write
util.spawn(self.delayinit(datacallback, skipreplay))
async def delayinit(self, datacallback, skipreplay):
if datacallback is None:
self.reaper = util.spawn_after(15, self.destroy)
self.databuffer = collections.deque([])
@ -786,10 +791,10 @@ class ConsoleSession(object):
else:
self.data_handler = datacallback
if not skipreplay:
for recdata in self.conshdl.get_recent():
for recdata in await self.conshdl.get_recent():
if recdata:
datacallback(recdata)
self.conshdl.attachsession(self)
await self.conshdl.attachsession(self)
def connect_session(self):
@ -827,9 +832,9 @@ class ConsoleSession(object):
"""
self.conshdl.reopen()
def destroy(self):
async def destroy(self):
if self.registered:
self.conshdl.detachsession(self)
await self.conshdl.detachsession(self)
if self._evt:
self._evt.send()
self._evt = None

View File

@ -22,6 +22,7 @@
import confluent.exceptions as cexc
import confluent.interface.console as conapi
import confluent.log as log
import confluent.util as util
import hashlib
import sys
@ -93,7 +94,7 @@ class SshShell(conapi.Console):
async def recvdata(self):
while self.connected:
pendingdata = await self.shell.stdout.read(8192)
pendingdata = await self.shell[1].read(8192)
if not pendingdata:
self.ssh.close()
if self.datacallback:
@ -101,14 +102,14 @@ class SshShell(conapi.Console):
return
self.datacallback(pendingdata)
def connect(self, callback):
async def connect(self, callback):
# for now, we just use the nodename as the presumptive ssh destination
# TODO(jjohnson2): use a 'nodeipget' utility function for architectures
# that would rather not use the nodename as anything but an opaque
# identifier
self.datacallback = callback
if self.username != b'':
self.logon()
await self.logon()
else:
self.inputmode = 0
callback('\r\nlogin as: ')
@ -116,7 +117,7 @@ class SshShell(conapi.Console):
def logon(self):
self.inputmode = -3
eventlet.spawn_n(self.do_logon)
util.spawn(self.do_logon())
async def do_logon(self):
sco = asyncssh.SSHClientConnectionOptions()
@ -126,7 +127,7 @@ class SshShell(conapi.Console):
try:
self.datacallback('\r\nConnecting to {}...'.format(self.node))
try:
self.ssh = await asyncssh.connect(self,node, username=self.username, password=self.password, known_hosts='/etc/ssh/ssh_known_hosts')
self.ssh = await asyncssh.connect(self.node, username=self.username.decode(), password=self.password.decode(), known_hosts='/etc/ssh/ssh_known_hosts')
except ValueError:
#TODO: non-cert ssh targets
raise
@ -153,9 +154,9 @@ class SshShell(conapi.Console):
self.inputmode = 2
self.connected = True
self.datacallback('Connected\r\n')
self.shell = self.ssh.invoke_shell(width=self.width,
height=self.height)
self.rxthread = eventlet.spawn(self.recvdata)
self.shell = await self.ssh.open_session(term_type='vt100', term_size=(self.width, self.height)) # self.ssh.invoke_shell(width=self.width,
# height=self.height)
self.rxthread = util.spawn(self.recvdata())
def write(self, data):
if self.inputmode == -2:
@ -223,7 +224,7 @@ class SshShell(conapi.Console):
self.datacallback(b'\r\n')
self.logon()
else:
self.shell.sendall(data)
self.shell[0].write(data.decode())
def close(self):
if self.ssh is not None:

View File

@ -41,9 +41,9 @@ class _ShellHandler(consoleserver.ConsoleHandler):
return
#return super().feedbuffer(data)
def get_recent(self):
retdata, connstate = super(_ShellHandler, self).get_recent()
return '', connstate
async def get_recent(self):
#retdata, connstate = await super(_ShellHandler, self).get_recent()
return '', {} # connstate
def _got_disconnected(self):
self.connectstate = 'closed'
@ -117,7 +117,7 @@ class ShellSession(consoleserver.ConsoleSession):
activesessions[(tenant, self.node, self.username)][self.sessionid] = _ShellHandler(self.node, self.configmanager, width=self.width, height=self.height)
self.conshdl = activesessions[(self.configmanager.tenant, self.node, self.username)][self.sessionid]
def destroy(self):
async def destroy(self):
try:
activesessions[(self.configmanager.tenant, self.node,
self.username)][self.sessionid].close()
@ -125,7 +125,7 @@ class ShellSession(consoleserver.ConsoleSession):
self.username)][self.sessionid]
except KeyError:
pass
super(ShellSession, self).destroy()
return await super(ShellSession, self).destroy()
def create(nodes, element, configmanager, inputdata):
# For creating a resource, it really has to be handled

View File

@ -39,7 +39,7 @@ import confluent.auth as auth
import confluent.credserver as credserver
import confluent.config.conf as conf
import confluent.asynctlvdata as tlvdata
#import confluent.consoleserver as consoleserver
import confluent.consoleserver as consoleserver
import confluent.config.configmanager as configmanager
import confluent.exceptions as exc
import confluent.log as log
@ -75,16 +75,16 @@ class ClientConsole(object):
self.xmit = False
self.pendingdata = []
def sendall(self, data):
async def sendall(self, data):
if not self.xmit:
self.pendingdata.append(data)
return
send_data(self.client, data)
await send_data(self.client, data)
def startsending(self):
async def startsending(self):
self.xmit = True
for datum in self.pendingdata:
send_data(self.client, datum)
await send_data(self.client, datum)
self.pendingdata = []
@ -255,9 +255,9 @@ async def process_request(
"error": "Target not found - " + str(e)})
send_data(connection, {"_requestdone": 1})
except exc.InvalidArgumentException as e:
send_data(connection, {"errorcode": 400,
await send_data(connection, {"errorcode": 400,
"error": "Bad Request - " + str(e)})
send_data(connection, {"_requestdone": 1})
await send_data(connection, {"_requestdone": 1})
await send_response(hdlr, connection)
return
@ -310,8 +310,8 @@ async def start_term(authname, cfm, connection, params, path, authdata, skipauth
async def term_interact(authdata, authname, ccons, cfm, connection, consession,
skipauth):
send_data(connection, {'started': 1})
ccons.startsending()
await send_data(connection, {'started': 1})
await ccons.startsending()
bufferage = consession.get_buffer_age()
if bufferage is not False:
send_data(connection, {'bufferage': bufferage})
@ -338,17 +338,17 @@ async def term_interact(authdata, authname, ccons, cfm, connection, consession,
continue
else:
try:
process_request(connection, data, cfm, authdata, authname,
skipauth)
await process_request(connection, data, cfm, authdata, authname,
skipauth)
except Exception as e:
tracelog.log(traceback.format_exc(),
ltype=log.DataTypes.event,
event=log.Events.stacktrace)
send_data(
await send_data(
connection,
{'errorcode': 500,
'error': 'Unexpected error - ' + str(e)})
send_data(connection, {'_requestdone': 1})
await send_data(connection, {'_requestdone': 1})
continue
if not data:
consession.destroy()