mirror of
https://github.com/xcat2/confluent.git
synced 2025-08-21 10:40:28 +00:00
Rework session to console relationship
Previously, was using counters to track the relation, also had distinct tracking of users versus callbacks. Unify the callback and user into a single 'session' attach and then use the size of the set of sessions and their declared users rather than trying to maintain a counter on the side. This change simplifies the relationship, changes away the logging and clientcount counter for a more robust strategy, and paves the way for the dependent ShellHandler to terminate connected sessions when the shell session dies.
This commit is contained in:
@@ -47,13 +47,12 @@ class ConsoleHandler(object):
|
||||
self._dologging = True
|
||||
self._isondemand = False
|
||||
self.error = None
|
||||
self.rcpts = {}
|
||||
self.cfgmgr = configmanager
|
||||
self.node = node
|
||||
self.connectstate = 'unconnected'
|
||||
self.clientcount = 0
|
||||
self._isalive = True
|
||||
self.buffer = bytearray()
|
||||
self.livesessions = set([])
|
||||
if self._logtobuffer:
|
||||
self.logger = log.Logger(node, console=True,
|
||||
tenant=configmanager.tenant)
|
||||
@@ -141,7 +140,7 @@ class ConsoleHandler(object):
|
||||
self._ondemand()
|
||||
if logvalue == 'none':
|
||||
self._dologging = False
|
||||
if not self._isondemand or self.clientcount > 0:
|
||||
if not self._isondemand or self.livesessions:
|
||||
eventlet.spawn(self._connect)
|
||||
|
||||
def log(self, *args, **kwargs):
|
||||
@@ -173,7 +172,7 @@ class ConsoleHandler(object):
|
||||
|
||||
def _ondemand(self):
|
||||
self._isondemand = True
|
||||
if self.clientcount < 1 and self._console:
|
||||
if not self.livesessions and self._console:
|
||||
self._disconnect()
|
||||
|
||||
def _connect(self):
|
||||
@@ -276,32 +275,6 @@ class ConsoleHandler(object):
|
||||
self.connectionthread.kill()
|
||||
self.connectionthread = None
|
||||
|
||||
def unregister_rcpt(self, handle):
|
||||
self.clientcount -= 1
|
||||
if handle in self.rcpts:
|
||||
del self.rcpts[handle]
|
||||
self._send_rcpts({'clientcount': self.clientcount})
|
||||
if self._isondemand and self.clientcount < 1:
|
||||
self._disconnect()
|
||||
|
||||
def register_rcpt(self, callback):
|
||||
self.clientcount += 1
|
||||
self._send_rcpts({'clientcount': self.clientcount})
|
||||
hdl = random.random()
|
||||
while hdl in self.rcpts:
|
||||
hdl = random.random()
|
||||
self.rcpts[hdl] = callback
|
||||
if self.connectstate == 'unconnected':
|
||||
# if console is not connected, take time to try to assert
|
||||
# connectivity now.
|
||||
if self.reconnect:
|
||||
# cancel an automated retry if one is pending
|
||||
self.reconnect.cancel()
|
||||
self.reconnect = None
|
||||
self.connectstate = 'connecting'
|
||||
eventlet.spawn(self._connect)
|
||||
return hdl
|
||||
|
||||
def flushbuffer(self):
|
||||
# Logging is handled in a different stream
|
||||
# this buffer is now just for having screen redraw on
|
||||
@@ -313,39 +286,44 @@ class ConsoleHandler(object):
|
||||
# to the console object
|
||||
eventlet.spawn(self._handle_console_output, data)
|
||||
|
||||
def attachuser(self, username):
|
||||
if username in self.users:
|
||||
self.users[username] += 1
|
||||
else:
|
||||
self.users[username] = 1
|
||||
edata = self.users[username]
|
||||
if edata > 2: # for log purposes, only need to
|
||||
# clearly indicate redundant connections
|
||||
# not connection count
|
||||
edata = 2
|
||||
if edata < 0:
|
||||
_tracelog.log('client count negative' + traceback.format_exc(),
|
||||
ltype=log.DataTypes.event,
|
||||
event=log.Events.stacktrace)
|
||||
edata = 0
|
||||
def attachsession(self, session):
|
||||
edata = 1
|
||||
for currsession in self.livesessions:
|
||||
if currsession.username == session.username:
|
||||
# indicate that user has multiple connections
|
||||
edata = 2
|
||||
self.livesessions.add(session)
|
||||
self.log(
|
||||
logdata=username, ltype=log.DataTypes.event,
|
||||
logdata=session.username, ltype=log.DataTypes.event,
|
||||
event=log.Events.clientconnect, eventdata=edata)
|
||||
self._send_rcpts({'clientcount': len(self.livesessions)})
|
||||
if self.connectstate == 'unconnected':
|
||||
# if console is not connected, take time to try to assert
|
||||
# connectivity now.
|
||||
if self.reconnect:
|
||||
# cancel an automated retry if one is pending
|
||||
self.reconnect.cancel()
|
||||
self.reconnect = None
|
||||
self.connectstate = 'connecting'
|
||||
eventlet.spawn(self._connect)
|
||||
|
||||
def detachuser(self, username):
|
||||
self.users[username] -= 1
|
||||
if self.users[username] < 2:
|
||||
edata = self.users[username]
|
||||
else:
|
||||
edata = 2
|
||||
if edata < 0:
|
||||
_tracelog.log('client count negative' + traceback.format_exc(),
|
||||
ltype=log.DataTypes.event,
|
||||
event=log.Events.stacktrace)
|
||||
edata = 0
|
||||
|
||||
|
||||
def detachsession(self, session):
|
||||
edata = 0
|
||||
self.livesessions.discard(session)
|
||||
for currsession in self.livesessions:
|
||||
if currsession.username == session.username:
|
||||
edata += 1
|
||||
if edata > 1: # don't bother counting beyond 2 in the log
|
||||
break
|
||||
self.log(
|
||||
logdata=username, ltype=log.DataTypes.event,
|
||||
logdata=session.username, ltype=log.DataTypes.event,
|
||||
event=log.Events.clientdisconnect, eventdata=edata)
|
||||
self._send_rcpts({'clientcount': len(self.livesessions)})
|
||||
if self._isondemand and not self.livesessions:
|
||||
self._disconnect()
|
||||
|
||||
|
||||
def reopen(self):
|
||||
self._got_disconnected()
|
||||
@@ -385,11 +363,12 @@ class ConsoleHandler(object):
|
||||
self._send_rcpts(data)
|
||||
|
||||
def _send_rcpts(self, data):
|
||||
for rcpt in self.rcpts.itervalues():
|
||||
for rcpt in self.livesessions:
|
||||
try:
|
||||
rcpt(data)
|
||||
rcpt.data_handler(data)
|
||||
except: # No matter the reason, advance to next recipient
|
||||
pass
|
||||
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
|
||||
event=log.Events.stacktrace)
|
||||
|
||||
def get_recent(self):
|
||||
"""Retrieve 'recent' data
|
||||
@@ -401,7 +380,7 @@ class ConsoleHandler(object):
|
||||
# a scheme always tracking the last clear screen would be too costly
|
||||
connstate = {
|
||||
'connectstate': self.connectstate,
|
||||
'clientcount': self.clientcount,
|
||||
'clientcount': len(self.livesessions),
|
||||
}
|
||||
retdata = ''
|
||||
if self.shiftin is not None: # detected that terminal requested a
|
||||
@@ -504,22 +483,23 @@ class ConsoleSession(object):
|
||||
self.configmanager = configmanager
|
||||
self.connect_session()
|
||||
self.registered = True
|
||||
self.conshdl.attachuser(self.username)
|
||||
self._evt = None
|
||||
self.node = node
|
||||
self.write = self.conshdl.write
|
||||
if datacallback is None:
|
||||
self.reaper = eventlet.spawn_after(15, self.destroy)
|
||||
self.databuffer = collections.deque([])
|
||||
self.reghdl = self.conshdl.register_rcpt(self.got_data)
|
||||
self.data_handler = self.got_data
|
||||
if not skipreplay:
|
||||
self.databuffer.extend(self.conshdl.get_recent())
|
||||
else:
|
||||
self.reghdl = self.conshdl.register_rcpt(datacallback)
|
||||
self.data_handler = datacallback
|
||||
if not skipreplay:
|
||||
for recdata in self.conshdl.get_recent():
|
||||
if recdata:
|
||||
datacallback(recdata)
|
||||
self.conshdl.attachsession(self)
|
||||
|
||||
|
||||
def connect_session(self):
|
||||
"""Connect to the appropriate backend handler
|
||||
@@ -554,8 +534,7 @@ class ConsoleSession(object):
|
||||
|
||||
def destroy(self):
|
||||
if self.registered:
|
||||
self.conshdl.detachuser(self.username)
|
||||
self.conshdl.unregister_rcpt(self.reghdl)
|
||||
self.conshdl.detachsession(self)
|
||||
self.databuffer = None
|
||||
self._evt = None
|
||||
self.reghdl = None
|
||||
|
Reference in New Issue
Block a user