mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-27 19:37:57 +00:00
Begin tracking disconnected/connecting/connected state in console server
This also has failed connection attempts causing a 30-60 second retry. Said retry is done early if a user session attaches in the meantime.
This commit is contained in:
parent
a2f540d3b9
commit
fcf8a36659
@ -7,6 +7,7 @@
|
||||
|
||||
#we track nodes that are actively being logged, watched, or have attached
|
||||
#there should be no more than one handler per node
|
||||
import confluent.exceptions as exc
|
||||
import confluent.interface.console as conapi
|
||||
import confluent.log as log
|
||||
import confluent.pluginapi as plugin
|
||||
@ -22,25 +23,52 @@ class _ConsoleHandler(object):
|
||||
self.rcpts = {}
|
||||
self.cfgmgr = configmanager
|
||||
self.node = node
|
||||
self.connectstate = 0 # disconnected
|
||||
self.logger = log.Logger(node, tenant=configmanager.tenant)
|
||||
self.buffer = bytearray()
|
||||
(text, termstate) = self.logger.read_recent_text(8192)
|
||||
self.buffer += text
|
||||
self.appmodedetected = False
|
||||
self.shiftin = None
|
||||
self.reconnect = None
|
||||
if termstate & 1:
|
||||
self.appmodedetected = True
|
||||
if termstate & 2:
|
||||
self.shiftin = '0'
|
||||
self._connect()
|
||||
self.users = {}
|
||||
self.connectstate = 1 # connecting....
|
||||
eventlet.spawn(self._connect)
|
||||
|
||||
|
||||
def _connect(self):
|
||||
self.connectstate = 1 # connecting....
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
if self.reconnect:
|
||||
self.reconnect.cancel()
|
||||
self.reconnect = None
|
||||
self._console = plugin.handle_path(
|
||||
"/nodes/%s/_console/session" % self.node,
|
||||
"create", self.cfgmgr)
|
||||
self._console.connect(self.get_console_output)
|
||||
try:
|
||||
self._console.connect(self.get_console_output)
|
||||
except exc.TargetEndpointUnreachable:
|
||||
self.connectstate = 0
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
retrytime = 30 + (30 * random.random())
|
||||
print "Console was unreachable, waiting %d seconds..." % retrytime
|
||||
self.reconnect = eventlet.spawn_after(retrytime, self._connect)
|
||||
return
|
||||
self._got_connected()
|
||||
|
||||
def _got_connected(self):
|
||||
self.connectstate = 2 # connected
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
|
||||
def _got_disconnected(self):
|
||||
self.connecstate = 0 # disconnected
|
||||
eventlet.spawn(self._send_disconnect_events)
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
self._connect()
|
||||
|
||||
def unregister_rcpt(self, handle):
|
||||
if handle in self.rcpts:
|
||||
@ -51,6 +79,15 @@ class _ConsoleHandler(object):
|
||||
while hdl in self.rcpts:
|
||||
hdl = random.random()
|
||||
self.rcpts[hdl] = callback
|
||||
if self.connectstate == 0:
|
||||
# if console is not connected, take time to try to assert
|
||||
# connectivity now.
|
||||
if self.reconnect:
|
||||
# cancel an automated retry if one is pending
|
||||
self.reconnect.cancel()
|
||||
self.reconnect = None
|
||||
self.connectstate = 1 # connecting....
|
||||
eventlet.spawn(self._connect)
|
||||
return hdl
|
||||
|
||||
def flushbuffer(self):
|
||||
@ -91,7 +128,7 @@ class _ConsoleHandler(object):
|
||||
def _handle_console_output(self, data):
|
||||
if type(data) == int:
|
||||
if data == conapi.ConsoleEvent.Disconnect:
|
||||
self._connect()
|
||||
self._got_disconnected()
|
||||
return
|
||||
prefix = ''
|
||||
if '\0' in data: # there is a null in the output
|
||||
@ -119,9 +156,12 @@ class _ConsoleHandler(object):
|
||||
# certificate signing request
|
||||
if len(self.buffer) > 16384:
|
||||
self.flushbuffer()
|
||||
self._send_rcpts(prefix + data)
|
||||
|
||||
def _send_rcpts(self, data):
|
||||
for rcpt in self.rcpts.itervalues():
|
||||
try:
|
||||
rcpt(prefix + data)
|
||||
rcpt(data)
|
||||
except:
|
||||
pass
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user