mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-22 01:22:00 +00:00
Convert states from a number to a string
Remote clients are presented the data as-is rather than trying to coordinate the meaning of abstract numbers.
This commit is contained in:
parent
fcf8a36659
commit
15fefe6d1e
@ -7,6 +7,7 @@
|
||||
|
||||
#we track nodes that are actively being logged, watched, or have attached
|
||||
#there should be no more than one handler per node
|
||||
import collections
|
||||
import confluent.exceptions as exc
|
||||
import confluent.interface.console as conapi
|
||||
import confluent.log as log
|
||||
@ -23,7 +24,7 @@ class _ConsoleHandler(object):
|
||||
self.rcpts = {}
|
||||
self.cfgmgr = configmanager
|
||||
self.node = node
|
||||
self.connectstate = 0 # disconnected
|
||||
self.connectstate = 'unconnected'
|
||||
self.logger = log.Logger(node, tenant=configmanager.tenant)
|
||||
self.buffer = bytearray()
|
||||
(text, termstate) = self.logger.read_recent_text(8192)
|
||||
@ -36,12 +37,12 @@ class _ConsoleHandler(object):
|
||||
if termstate & 2:
|
||||
self.shiftin = '0'
|
||||
self.users = {}
|
||||
self.connectstate = 1 # connecting....
|
||||
self.connectstate = 'connecting'
|
||||
eventlet.spawn(self._connect)
|
||||
|
||||
|
||||
def _connect(self):
|
||||
self.connectstate = 1 # connecting....
|
||||
self.connectstate = 'connecting'
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
if self.reconnect:
|
||||
self.reconnect.cancel()
|
||||
@ -52,7 +53,7 @@ class _ConsoleHandler(object):
|
||||
try:
|
||||
self._console.connect(self.get_console_output)
|
||||
except exc.TargetEndpointUnreachable:
|
||||
self.connectstate = 0
|
||||
self.connectstate = 'unconnected'
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
retrytime = 30 + (30 * random.random())
|
||||
print "Console was unreachable, waiting %d seconds..." % retrytime
|
||||
@ -61,11 +62,11 @@ class _ConsoleHandler(object):
|
||||
self._got_connected()
|
||||
|
||||
def _got_connected(self):
|
||||
self.connectstate = 2 # connected
|
||||
self.connectstate = 'connected'
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
|
||||
def _got_disconnected(self):
|
||||
self.connecstate = 0 # disconnected
|
||||
self.connecstate = 'unconnected'
|
||||
eventlet.spawn(self._send_disconnect_events)
|
||||
self._send_rcpts({'connectstate': self.connectstate})
|
||||
self._connect()
|
||||
@ -79,14 +80,14 @@ class _ConsoleHandler(object):
|
||||
while hdl in self.rcpts:
|
||||
hdl = random.random()
|
||||
self.rcpts[hdl] = callback
|
||||
if self.connectstate == 0:
|
||||
if self.connectstate == 'unconnected':
|
||||
# if console is not connected, take time to try to assert
|
||||
# connectivity now.
|
||||
if self.reconnect:
|
||||
# cancel an automated retry if one is pending
|
||||
self.reconnect.cancel()
|
||||
self.reconnect = None
|
||||
self.connectstate = 1 # connecting....
|
||||
self.connectstate = 'connecting'
|
||||
eventlet.spawn(self._connect)
|
||||
return hdl
|
||||
|
||||
@ -237,7 +238,8 @@ class ConsoleSession(object):
|
||||
self.write = _handled_consoles[consk].write
|
||||
if datacallback is None:
|
||||
self.reaper = eventlet.spawn_after(15, self.destroy)
|
||||
self.databuffer = _handled_consoles[consk].get_recent()
|
||||
self.databuffer = collections.deque([])
|
||||
self.databuffer.append(_handled_consoles[consk].get_recent())
|
||||
self.reghdl = _handled_consoles[consk].register_rcpt(self.got_data)
|
||||
else:
|
||||
self.reghdl = _handled_consoles[consk].register_rcpt(datacallback)
|
||||
@ -258,7 +260,7 @@ class ConsoleSession(object):
|
||||
If the caller does not provide a callback and instead will be polling
|
||||
for data, we must maintain data in a buffer until retrieved
|
||||
"""
|
||||
self.databuffer += data
|
||||
self.databuffer.append(data)
|
||||
self._evt.set()
|
||||
|
||||
def get_next_output(self, timeout=45):
|
||||
@ -268,12 +270,22 @@ class ConsoleSession(object):
|
||||
at least one case where we don't have that luxury
|
||||
"""
|
||||
self.reaper.cancel()
|
||||
if len(self.databuffer) == 0:
|
||||
if not self.databuffer:
|
||||
self._evt.wait(timeout)
|
||||
retval = self.databuffer
|
||||
self.databuffer = ""
|
||||
if self._evt is not None:
|
||||
self._evt.clear()
|
||||
if not self.databuffer:
|
||||
self.reaper = eventlet.spawn_after(15, self.destroy)
|
||||
return ""
|
||||
currdata = self.databuffer.popleft()
|
||||
if isinstance(currdata, dict):
|
||||
self.reaper = eventlet.spawn_after(15, self.destroy)
|
||||
return currdata
|
||||
retval = currdata
|
||||
while self.databuffer and not isinstance(self.databuffer[0], dict):
|
||||
retval += self.databuffer.popleft()
|
||||
# the client has 15 seconds to make a new request for data before
|
||||
# they are given up on
|
||||
self.reaper = eventlet.spawn_after(15, self.destroy)
|
||||
return retval
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user