diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index d9996b7e..09b4c0a1 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -47,13 +47,12 @@ class ConsoleHandler(object): self._dologging = True self._isondemand = False self.error = None - self.rcpts = {} self.cfgmgr = configmanager self.node = node self.connectstate = 'unconnected' - self.clientcount = 0 self._isalive = True self.buffer = bytearray() + self.livesessions = set([]) if self._logtobuffer: self.logger = log.Logger(node, console=True, tenant=configmanager.tenant) @@ -141,7 +140,7 @@ class ConsoleHandler(object): self._ondemand() if logvalue == 'none': self._dologging = False - if not self._isondemand or self.clientcount > 0: + if not self._isondemand or self.livesessions: eventlet.spawn(self._connect) def log(self, *args, **kwargs): @@ -173,7 +172,7 @@ class ConsoleHandler(object): def _ondemand(self): self._isondemand = True - if self.clientcount < 1 and self._console: + if not self.livesessions and self._console: self._disconnect() def _connect(self): @@ -276,32 +275,6 @@ class ConsoleHandler(object): self.connectionthread.kill() self.connectionthread = None - def unregister_rcpt(self, handle): - self.clientcount -= 1 - if handle in self.rcpts: - del self.rcpts[handle] - self._send_rcpts({'clientcount': self.clientcount}) - if self._isondemand and self.clientcount < 1: - self._disconnect() - - def register_rcpt(self, callback): - self.clientcount += 1 - self._send_rcpts({'clientcount': self.clientcount}) - hdl = random.random() - while hdl in self.rcpts: - hdl = random.random() - self.rcpts[hdl] = callback - if self.connectstate == 'unconnected': - # if console is not connected, take time to try to assert - # connectivity now. - if self.reconnect: - # cancel an automated retry if one is pending - self.reconnect.cancel() - self.reconnect = None - self.connectstate = 'connecting' - eventlet.spawn(self._connect) - return hdl - def flushbuffer(self): # Logging is handled in a different stream # this buffer is now just for having screen redraw on @@ -313,39 +286,44 @@ class ConsoleHandler(object): # to the console object eventlet.spawn(self._handle_console_output, data) - def attachuser(self, username): - if username in self.users: - self.users[username] += 1 - else: - self.users[username] = 1 - edata = self.users[username] - if edata > 2: # for log purposes, only need to - # clearly indicate redundant connections - # not connection count - edata = 2 - if edata < 0: - _tracelog.log('client count negative' + traceback.format_exc(), - ltype=log.DataTypes.event, - event=log.Events.stacktrace) - edata = 0 + def attachsession(self, session): + edata = 1 + for currsession in self.livesessions: + if currsession.username == session.username: + # indicate that user has multiple connections + edata = 2 + self.livesessions.add(session) self.log( - logdata=username, ltype=log.DataTypes.event, + logdata=session.username, ltype=log.DataTypes.event, event=log.Events.clientconnect, eventdata=edata) + self._send_rcpts({'clientcount': len(self.livesessions)}) + if self.connectstate == 'unconnected': + # if console is not connected, take time to try to assert + # connectivity now. + if self.reconnect: + # cancel an automated retry if one is pending + self.reconnect.cancel() + self.reconnect = None + self.connectstate = 'connecting' + eventlet.spawn(self._connect) - def detachuser(self, username): - self.users[username] -= 1 - if self.users[username] < 2: - edata = self.users[username] - else: - edata = 2 - if edata < 0: - _tracelog.log('client count negative' + traceback.format_exc(), - ltype=log.DataTypes.event, - event=log.Events.stacktrace) - edata = 0 + + + def detachsession(self, session): + edata = 0 + self.livesessions.discard(session) + for currsession in self.livesessions: + if currsession.username == session.username: + edata += 1 + if edata > 1: # don't bother counting beyond 2 in the log + break self.log( - logdata=username, ltype=log.DataTypes.event, + logdata=session.username, ltype=log.DataTypes.event, event=log.Events.clientdisconnect, eventdata=edata) + self._send_rcpts({'clientcount': len(self.livesessions)}) + if self._isondemand and not self.livesessions: + self._disconnect() + def reopen(self): self._got_disconnected() @@ -385,11 +363,12 @@ class ConsoleHandler(object): self._send_rcpts(data) def _send_rcpts(self, data): - for rcpt in self.rcpts.itervalues(): + for rcpt in self.livesessions: try: - rcpt(data) + rcpt.data_handler(data) except: # No matter the reason, advance to next recipient - pass + _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, + event=log.Events.stacktrace) def get_recent(self): """Retrieve 'recent' data @@ -401,7 +380,7 @@ class ConsoleHandler(object): # a scheme always tracking the last clear screen would be too costly connstate = { 'connectstate': self.connectstate, - 'clientcount': self.clientcount, + 'clientcount': len(self.livesessions), } retdata = '' if self.shiftin is not None: # detected that terminal requested a @@ -504,22 +483,23 @@ class ConsoleSession(object): self.configmanager = configmanager self.connect_session() self.registered = True - self.conshdl.attachuser(self.username) self._evt = None self.node = node self.write = self.conshdl.write if datacallback is None: self.reaper = eventlet.spawn_after(15, self.destroy) self.databuffer = collections.deque([]) - self.reghdl = self.conshdl.register_rcpt(self.got_data) + self.data_handler = self.got_data if not skipreplay: self.databuffer.extend(self.conshdl.get_recent()) else: - self.reghdl = self.conshdl.register_rcpt(datacallback) + self.data_handler = datacallback if not skipreplay: for recdata in self.conshdl.get_recent(): if recdata: datacallback(recdata) + self.conshdl.attachsession(self) + def connect_session(self): """Connect to the appropriate backend handler @@ -554,8 +534,7 @@ class ConsoleSession(object): def destroy(self): if self.registered: - self.conshdl.detachuser(self.username) - self.conshdl.unregister_rcpt(self.reghdl) + self.conshdl.detachsession(self) self.databuffer = None self._evt = None self.reghdl = None