diff --git a/confluent/consoleserver.py b/confluent/consoleserver.py index b0f28d58..656cb6a1 100644 --- a/confluent/consoleserver.py +++ b/confluent/consoleserver.py @@ -7,6 +7,7 @@ #we track nodes that are actively being logged, watched, or have attached #there should be no more than one handler per node +import confluent.exceptions as exc import confluent.interface.console as conapi import confluent.log as log import confluent.pluginapi as plugin @@ -22,25 +23,52 @@ class _ConsoleHandler(object): self.rcpts = {} self.cfgmgr = configmanager self.node = node + self.connectstate = 0 # disconnected self.logger = log.Logger(node, tenant=configmanager.tenant) self.buffer = bytearray() (text, termstate) = self.logger.read_recent_text(8192) self.buffer += text self.appmodedetected = False self.shiftin = None + self.reconnect = None if termstate & 1: self.appmodedetected = True if termstate & 2: self.shiftin = '0' - self._connect() self.users = {} + self.connectstate = 1 # connecting.... + eventlet.spawn(self._connect) def _connect(self): + self.connectstate = 1 # connecting.... + self._send_rcpts({'connectstate': self.connectstate}) + if self.reconnect: + self.reconnect.cancel() + self.reconnect = None self._console = plugin.handle_path( "/nodes/%s/_console/session" % self.node, "create", self.cfgmgr) - self._console.connect(self.get_console_output) + try: + self._console.connect(self.get_console_output) + except exc.TargetEndpointUnreachable: + self.connectstate = 0 + self._send_rcpts({'connectstate': self.connectstate}) + retrytime = 30 + (30 * random.random()) + print "Console was unreachable, waiting %d seconds..." % retrytime + self.reconnect = eventlet.spawn_after(retrytime, self._connect) + return + self._got_connected() + + def _got_connected(self): + self.connectstate = 2 # connected + self._send_rcpts({'connectstate': self.connectstate}) + + def _got_disconnected(self): + self.connecstate = 0 # disconnected + eventlet.spawn(self._send_disconnect_events) + self._send_rcpts({'connectstate': self.connectstate}) + self._connect() def unregister_rcpt(self, handle): if handle in self.rcpts: @@ -51,6 +79,15 @@ class _ConsoleHandler(object): while hdl in self.rcpts: hdl = random.random() self.rcpts[hdl] = callback + if self.connectstate == 0: + # if console is not connected, take time to try to assert + # connectivity now. + if self.reconnect: + # cancel an automated retry if one is pending + self.reconnect.cancel() + self.reconnect = None + self.connectstate = 1 # connecting.... + eventlet.spawn(self._connect) return hdl def flushbuffer(self): @@ -91,7 +128,7 @@ class _ConsoleHandler(object): def _handle_console_output(self, data): if type(data) == int: if data == conapi.ConsoleEvent.Disconnect: - self._connect() + self._got_disconnected() return prefix = '' if '\0' in data: # there is a null in the output @@ -119,9 +156,12 @@ class _ConsoleHandler(object): # certificate signing request if len(self.buffer) > 16384: self.flushbuffer() + self._send_rcpts(prefix + data) + + def _send_rcpts(self, data): for rcpt in self.rcpts.itervalues(): try: - rcpt(prefix + data) + rcpt(data) except: pass