mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-18 05:33:17 +00:00
Refactor socket api terminal handler
The 'terminal' handler code is broken out to get ready for using 'shell' as well as 'console'
This commit is contained in:
parent
30ed563810
commit
0e821a7bfe
@ -39,6 +39,7 @@ import confluent.config.configmanager as configmanager
|
||||
import confluent.exceptions as exc
|
||||
import confluent.log as log
|
||||
import confluent.core as pluginapi
|
||||
import confluent.shellserver as shellserver
|
||||
|
||||
|
||||
tracelog = None
|
||||
@ -169,7 +170,7 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
|
||||
raise ValueError
|
||||
operation = request['operation']
|
||||
path = request['path']
|
||||
params = request.get('parameters', None)
|
||||
params = request.get('parameters', {})
|
||||
hdlr = None
|
||||
if not skipauth:
|
||||
authdata = auth.authorize(authdata[2], path, authdata[3], operation)
|
||||
@ -188,42 +189,7 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
|
||||
auditlog.log(auditmsg)
|
||||
try:
|
||||
if operation == 'start':
|
||||
elems = path.split('/')
|
||||
if elems[3] != "console":
|
||||
raise exc.InvalidArgumentException()
|
||||
node = elems[2]
|
||||
ccons = ClientConsole(connection)
|
||||
skipreplay = False
|
||||
if params and 'skipreplay' in params and params['skipreplay']:
|
||||
skipreplay = True
|
||||
consession = consoleserver.ConsoleSession(
|
||||
node=node, configmanager=cfm, username=authname,
|
||||
datacallback=ccons.sendall, skipreplay=skipreplay)
|
||||
if consession is None:
|
||||
raise Exception("TODO")
|
||||
send_data(connection, {'started': 1})
|
||||
ccons.startsending()
|
||||
bufferage = consession.get_buffer_age()
|
||||
if bufferage is not False:
|
||||
send_data(connection, {'bufferage': bufferage})
|
||||
while consession is not None:
|
||||
data = tlvdata.recv(connection)
|
||||
if type(data) == dict:
|
||||
if data['operation'] == 'stop':
|
||||
consession.destroy()
|
||||
return
|
||||
elif data['operation'] == 'break':
|
||||
consession.send_break()
|
||||
continue
|
||||
elif data['operation'] == 'reopen':
|
||||
consession.reopen()
|
||||
continue
|
||||
else:
|
||||
raise Exception("TODO")
|
||||
if not data:
|
||||
consession.destroy()
|
||||
return
|
||||
consession.write(data)
|
||||
return start_term(authname, cfm, connection, params, path)
|
||||
elif operation == 'shutdown':
|
||||
configmanager.ConfigManager.shutdown()
|
||||
else:
|
||||
@ -240,6 +206,59 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
|
||||
return
|
||||
|
||||
|
||||
def start_term(authname, cfm, connection, params, path):
|
||||
elems = path.split('/')
|
||||
if len(elems) < 4 or elems[1] != 'nodes':
|
||||
raise exc.InvalidArgumentException('Invalid path {0}'.format(path))
|
||||
node = elems[2]
|
||||
ccons = ClientConsole(connection)
|
||||
skipreplay = False
|
||||
if params and 'skipreplay' in params and params['skipreplay']:
|
||||
skipreplay = True
|
||||
if elems[3] == "console":
|
||||
consession = consoleserver.ConsoleSession(
|
||||
node=node, configmanager=cfm, username=authname,
|
||||
datacallback=ccons.sendall, skipreplay=skipreplay)
|
||||
elif len(elems) >= 6 and elems[3:5] == ['shell', 'sessions']:
|
||||
if len(elems) == 7:
|
||||
sessionid = elems[5]
|
||||
else:
|
||||
sessionid = None
|
||||
consession = shellserver.ShellSession(
|
||||
node=node, configmanager=cfm, username=authname,
|
||||
datacallback=ccons.sendall, skipreplay=skipreplay,
|
||||
sessionid=sessionid)
|
||||
|
||||
else:
|
||||
raise exc.InvalidArgumentException('Invalid path {0}'.format(path))
|
||||
|
||||
if consession is None:
|
||||
raise Exception("TODO")
|
||||
send_data(connection, {'started': 1})
|
||||
ccons.startsending()
|
||||
bufferage = consession.get_buffer_age()
|
||||
if bufferage is not False:
|
||||
send_data(connection, {'bufferage': bufferage})
|
||||
while consession is not None:
|
||||
data = tlvdata.recv(connection)
|
||||
if type(data) == dict:
|
||||
if data['operation'] == 'stop':
|
||||
consession.destroy()
|
||||
return
|
||||
elif data['operation'] == 'break':
|
||||
consession.send_break()
|
||||
continue
|
||||
elif data['operation'] == 'reopen':
|
||||
consession.reopen()
|
||||
continue
|
||||
else:
|
||||
raise Exception("TODO")
|
||||
if not data:
|
||||
consession.destroy()
|
||||
return
|
||||
consession.write(data)
|
||||
|
||||
|
||||
def _tlshandler(bind_host, bind_port):
|
||||
plainsocket = socket.socket(socket.AF_INET6)
|
||||
plainsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
Loading…
x
Reference in New Issue
Block a user