mirror of
https://github.com/xcat2/confluent.git
synced 2025-04-14 17:19:32 +00:00
Implement client reconnect on server restart
When the server restarts, clients should try to reconnect seamlessly. Doing so allows maintenance without being overly disruptive to users.
This commit is contained in:
parent
00a33b05b7
commit
0cd4ffcd3b
@ -49,6 +49,7 @@ import os
|
||||
import readline
|
||||
import select
|
||||
import shlex
|
||||
import socket
|
||||
import sys
|
||||
import termios
|
||||
import time
|
||||
@ -312,8 +313,13 @@ def do_command(command, server):
|
||||
targpath = fullpath_target(argv[1])
|
||||
nodename = targpath.split('/')[-3]
|
||||
currconsole = targpath
|
||||
startrequest = {'operation': 'start', 'path': targpath,
|
||||
'parameters': {}}
|
||||
for param in argv[2:]:
|
||||
(parmkey, parmval) = param.split("=")
|
||||
startrequest['parameters'][parmkey] = parmval
|
||||
tlvdata.send(
|
||||
session.connection, {'operation': 'start', 'path': targpath})
|
||||
session.connection, startrequest)
|
||||
status = tlvdata.recv(session.connection)
|
||||
if 'error' in status:
|
||||
if 'errorcode' in status:
|
||||
@ -322,7 +328,6 @@ def do_command(command, server):
|
||||
while '_requestdone' not in status:
|
||||
status = tlvdata.recv(session.connection)
|
||||
return
|
||||
print '[console session started]'
|
||||
startconsole(nodename)
|
||||
return
|
||||
elif argv[0] == 'set':
|
||||
@ -534,27 +539,36 @@ parser.add_option("-c", "--control", dest="controlpath",
|
||||
help="Path to offer terminal control",
|
||||
metavar="PATH")
|
||||
opts, shellargs = parser.parse_args()
|
||||
if opts.controlpath:
|
||||
termhandler.TermHandler(opts.controlpath)
|
||||
if opts.netserver: # going over a TLS network
|
||||
session = client.Command(opts.netserver)
|
||||
elif 'CONFLUENT_HOST' in os.environ:
|
||||
session = client.Command(os.environ['CONFLUENT_HOST'])
|
||||
else: # unix domain
|
||||
session = client.Command()
|
||||
|
||||
#Next stop, reading and writing from whichever of stdin and server goes first.
|
||||
#see pyghmi code for solconnect.py
|
||||
if not session.authenticated and 'CONFLUENT_USER' in os.environ:
|
||||
username = os.environ['CONFLUENT_USER']
|
||||
passphrase = os.environ['CONFLUENT_PASSPHRASE']
|
||||
session.authenticate(username, passphrase)
|
||||
while not session.authenticated:
|
||||
username = raw_input("Name: ")
|
||||
readline.clear_history()
|
||||
passphrase = getpass.getpass("Passphrase: ")
|
||||
session.authenticate(username, passphrase)
|
||||
# clear on start can help with readable of TUI, but it
|
||||
username = None
|
||||
passphrase = None
|
||||
def server_connect():
|
||||
global session, username, passphrase
|
||||
if opts.controlpath:
|
||||
termhandler.TermHandler(opts.controlpath)
|
||||
if opts.netserver: # going over a TLS network
|
||||
session = client.Command(opts.netserver)
|
||||
elif 'CONFLUENT_HOST' in os.environ:
|
||||
session = client.Command(os.environ['CONFLUENT_HOST'])
|
||||
else: # unix domain
|
||||
session = client.Command()
|
||||
|
||||
# Next stop, reading and writing from whichever of stdin and server goes first.
|
||||
#see pyghmi code for solconnect.py
|
||||
if not session.authenticated and username is not None:
|
||||
session.authenticate(username, passphrase)
|
||||
if not session.authenticated and 'CONFLUENT_USER' in os.environ:
|
||||
username = os.environ['CONFLUENT_USER']
|
||||
passphrase = os.environ['CONFLUENT_PASSPHRASE']
|
||||
session.authenticate(username, passphrase)
|
||||
while not session.authenticated:
|
||||
username = raw_input("Name: ")
|
||||
readline.clear_history()
|
||||
passphrase = getpass.getpass("Passphrase: ")
|
||||
session.authenticate(username, passphrase)
|
||||
|
||||
|
||||
server_connect()# clear on start can help with readable of TUI, but it
|
||||
# can be annoying, so for now don't do it.
|
||||
# sys.stdout.write('\x1b[H\x1b[J')
|
||||
# sys.stdout.flush()
|
||||
@ -601,9 +615,25 @@ while inconsole or not doexit:
|
||||
updatestatus()
|
||||
sys.stdout.flush()
|
||||
else:
|
||||
doexit = True
|
||||
inconsole = False
|
||||
sys.stdout.write("\r\n[remote disconnected]\r\n")
|
||||
deadline = 5
|
||||
connected = False
|
||||
while not connected and deadline > 0:
|
||||
try:
|
||||
server_connect()
|
||||
connected = True
|
||||
except socket.gaierror:
|
||||
pass
|
||||
if not connected:
|
||||
time.sleep(1)
|
||||
deadline -=1
|
||||
if connected:
|
||||
do_command(
|
||||
"start /nodes/%s/console/session skipreplay=True" % consolename,
|
||||
netserver)
|
||||
else:
|
||||
doexit = True
|
||||
inconsole = False
|
||||
sys.stdout.write("\r\n[remote disconnected]\r\n")
|
||||
break
|
||||
else:
|
||||
myinput = fh.read()
|
||||
|
@ -449,7 +449,8 @@ class ConsoleSession(object):
|
||||
:param node: Name of the node for which this session will be created
|
||||
"""
|
||||
|
||||
def __init__(self, node, configmanager, username, datacallback=None):
|
||||
def __init__(self, node, configmanager, username, datacallback=None,
|
||||
skipreplay=False):
|
||||
self.tenant = configmanager.tenant
|
||||
if not configmanager.is_node(node):
|
||||
raise exc.NotFoundException("Invalid node")
|
||||
@ -466,12 +467,14 @@ class ConsoleSession(object):
|
||||
self.reaper = eventlet.spawn_after(15, self.destroy)
|
||||
self.databuffer = collections.deque([])
|
||||
self.reghdl = _handled_consoles[consk].register_rcpt(self.got_data)
|
||||
self.databuffer.extend(_handled_consoles[consk].get_recent())
|
||||
if not skipreplay:
|
||||
self.databuffer.extend(_handled_consoles[consk].get_recent())
|
||||
else:
|
||||
self.reghdl = _handled_consoles[consk].register_rcpt(datacallback)
|
||||
for recdata in _handled_consoles[consk].get_recent():
|
||||
if recdata:
|
||||
datacallback(recdata)
|
||||
if not skipreplay:
|
||||
for recdata in _handled_consoles[consk].get_recent():
|
||||
if recdata:
|
||||
datacallback(recdata)
|
||||
|
||||
def send_break(self):
|
||||
self.conshdl.send_break()
|
||||
|
@ -311,10 +311,13 @@ def resourcehandler_backend(env, start_response):
|
||||
auditmsg['tenant'] = authorized['tenant']
|
||||
auditlog.log(auditmsg)
|
||||
# Request for new session
|
||||
skipreplay = False
|
||||
if 'skipreplay' in querydict and querydict['skipreplay']:
|
||||
skipreplay = True
|
||||
try:
|
||||
consession = consoleserver.ConsoleSession(
|
||||
node=nodename, configmanager=cfgmgr,
|
||||
username=authorized['username'])
|
||||
username=authorized['username'], skipreplay=skipreplay)
|
||||
except exc.NotFoundException:
|
||||
start_response("404 Not found", headers)
|
||||
yield "404 - Request Path not recognized"
|
||||
|
@ -167,9 +167,12 @@ def process_request(connection, request, cfm, authdata, authname, skipauth):
|
||||
raise exc.InvalidArgumentException()
|
||||
node = elems[2]
|
||||
ccons = ClientConsole(connection)
|
||||
skipreplay = False
|
||||
if params and 'skipreplay' in params and params['skipreplay']:
|
||||
skipreplay = True
|
||||
consession = consoleserver.ConsoleSession(
|
||||
node=node, configmanager=cfm, username=authname,
|
||||
datacallback=ccons.sendall)
|
||||
datacallback=ccons.sendall, skipreplay=skipreplay)
|
||||
if consession is None:
|
||||
raise Exception("TODO")
|
||||
tlvdata.send(connection, {'started': 1})
|
||||
|
Loading…
x
Reference in New Issue
Block a user