diff --git a/pyghmi/ipmi/private/session.py b/pyghmi/ipmi/private/session.py index e9d3e5ad..721fdacc 100644 --- a/pyghmi/ipmi/private/session.py +++ b/pyghmi/ipmi/private/session.py @@ -31,8 +31,14 @@ from Crypto.Cipher import AES import pyghmi.exceptions as exc from pyghmi.ipmi.private import constants +from pyghmi.ipmi.private import util from pyghmi.ipmi.private.util import get_ipmi_error, _monotonic_time + +KEEPALIVE_SESSIONS = threading.RLock() +WAITING_SESSIONS = threading.RLock() + + try: dict.iteritems @@ -381,11 +387,12 @@ class Session(object): @classmethod def _is_session_valid(cls, session): - sess = cls.keepalive_sessions.get(session, None) - if sess is not None and 'timeout' in sess: - if sess['timeout'] < _monotonic_time(): - # session would have timed out by now, don't use it - return False + with util.protect(KEEPALIVE_SESSIONS): + sess = cls.keepalive_sessions.get(session, None) + if sess is not None and 'timeout' in sess: + if sess['timeout'] < _monotonic_time(): + # session would have timed out by now, don't use it + return False return True def __new__(cls, @@ -490,8 +497,10 @@ class Session(object): def _mark_broken(self, error=None): # since our connection has failed retries # deregister our keepalive facility - Session.keepalive_sessions.pop(self, None) - Session.waiting_sessions.pop(self, None) + with util.protect(KEEPALIVE_SESSIONS): + Session.keepalive_sessions.pop(self, None) + with util.protect(WAITING_SESSIONS): + Session.waiting_sessions.pop(self, None) self.logging = False self.errormsg = error if self.logged: @@ -690,10 +699,11 @@ class Session(object): waiter = self.iterwaiters.pop() waiter({'success': True}) self.process_pktqueue() - if (self in self.waiting_sessions and - self.expiration < _monotonic_time()): - self.waiting_sessions.pop(self, None) - self._timedout() + with util.protect(WAITING_SESSIONS): + if (self in self.waiting_sessions and + self.expiration < _monotonic_time()): + self.waiting_sessions.pop(self, None) + self._timedout() def raw_command(self, netfn, @@ -856,11 +866,12 @@ class Session(object): self.netpacket = struct.pack("!%dB" % len(message), *message) # advance idle timer since we don't need keepalive while sending # packets out naturally - if (self in Session.keepalive_sessions and not needskeepalive and - not self._customkeepalives): - Session.keepalive_sessions[self]['timeout'] = _monotonic_time() + \ - MAX_IDLE - (random.random() * 4.9) - self._xmit_packet(retry, delay_xmit=delay_xmit, timeout=timeout) + with util.protect(KEEPALIVE_SESSIONS): + if (self in Session.keepalive_sessions and not needskeepalive and + not self._customkeepalives): + Session.keepalive_sessions[self]['timeout'] = \ + _monotonic_time() + MAX_IDLE - (random.random() * 4.9) + self._xmit_packet(retry, delay_xmit=delay_xmit, timeout=timeout) def _ipmi15authcode(self, payload, checkremotecode=False): # checkremotecode is used to verify remote code, @@ -970,10 +981,11 @@ class Session(object): self.onlogon({'error': errstr}) return self.logging = False - Session.keepalive_sessions[self] = {} - Session.keepalive_sessions[self]['ipmisession'] = self - Session.keepalive_sessions[self]['timeout'] = _monotonic_time() + \ - MAX_IDLE - (random.random() * 4.9) + with util.protect(KEEPALIVE_SESSIONS): + Session.keepalive_sessions[self] = {} + Session.keepalive_sessions[self]['ipmisession'] = self + Session.keepalive_sessions[self]['timeout'] = _monotonic_time() + \ + MAX_IDLE - (random.random() * 4.9) self.onlogon({'success': True}) def _get_session_challenge(self): @@ -1061,22 +1073,26 @@ class Session(object): # no more time than that, so that whatever part(ies) need to service in # a deadline, will be honored if timeout != 0: - for session, parms in dictitems(cls.waiting_sessions): - if parms['timeout'] <= curtime: - timeout = 0 # exit after one guaranteed pass - break - if (timeout is not None and - timeout < parms['timeout'] - curtime): - continue # timeout smaller than the current session needs - timeout = parms['timeout'] - curtime # set new timeout value - for session, parms in dictitems(cls.keepalive_sessions): - if parms['timeout'] <= curtime: - timeout = 0 - break - if (timeout is not None and - timeout < parms['timeout'] - curtime): - continue - timeout = parms['timeout'] - curtime + with util.protect(WAITING_SESSIONS): + for session, parms in dictitems(cls.waiting_sessions): + if parms['timeout'] <= curtime: + timeout = 0 # exit after one guaranteed pass + break + if (timeout is not None and + timeout < parms['timeout'] - curtime): + continue # timeout smaller than the current session + # needs + timeout = parms['timeout'] - curtime # set new timeout + # value + with util.protect(KEEPALIVE_SESSIONS): + for session, parms in dictitems(cls.keepalive_sessions): + if parms['timeout'] <= curtime: + timeout = 0 + break + if (timeout is not None and + timeout < parms['timeout'] - curtime): + continue + timeout = parms['timeout'] - curtime # If the loop above found no sessions wanting *and* the caller had no # timeout, exit function. In this case there is no way a session # could be waiting so we can always return 0 @@ -1096,29 +1112,31 @@ class Session(object): relsession.process_pktqueue() sessionstodel = [] sessionstokeepalive = [] - for session, parms in dictitems(cls.keepalive_sessions): - # if the session is busy inside a command, defer invoking keepalive - # until incommand is no longer the case - if parms['timeout'] < curtime and not session._isincommand(): - cls.keepalive_sessions[session]['timeout'] = \ - _monotonic_time() + MAX_IDLE - (random.random() * 4.9) - sessionstokeepalive.append(session) + with util.protect(KEEPALIVE_SESSIONS): + for session, parms in dictitems(cls.keepalive_sessions): + # if the session is busy inside a command, defer invoking + # keepalive until incommand is no longer the case + if parms['timeout'] < curtime and not session._isincommand(): + cls.keepalive_sessions[session]['timeout'] = \ + _monotonic_time() + MAX_IDLE - (random.random() * 4.9) + sessionstokeepalive.append(session) for session in sessionstokeepalive: session._keepalive() - for session, parms in dictitems(cls.waiting_sessions): - if parms['timeout'] < curtime: # timeout has expired, time to - # give up on it and trigger timeout - # response in the respective session - # defer deletion until after loop - sessionstodel.append(session) - # to avoid confusing the for loop - for session in sessionstodel: - cls.waiting_sessions.pop(session, None) - # one loop iteration to make sure recursion doesn't induce redundant - # timeouts - for session in sessionstodel: - session._timedout() - return len(cls.waiting_sessions) + with util.protect(WAITING_SESSIONS): + for session, parms in dictitems(cls.waiting_sessions): + if parms['timeout'] < curtime: # timeout has expired, time to + # give up on it and trigger timeout + # response in the respective session + # defer deletion until after loop + sessionstodel.append(session) + # to avoid confusing the for loop + for session in sessionstodel: + cls.waiting_sessions.pop(session, None) + # one loop iteration to make sure recursion doesn't induce + # redundant timeouts + for session in sessionstodel: + session._timedout() + return len(cls.waiting_sessions) def register_keepalive(self, cmd, callback): """Register custom keepalive IPMI command @@ -1331,7 +1349,8 @@ class Session(object): # stop the generic retry behavior here self.lastpayload = None self.last_payload_type = None - Session.waiting_sessions.pop(self, None) + with util.protect(WAITING_SESSIONS): + Session.waiting_sessions.pop(self, None) if len(self.pendingpayloads) > 0: (nextpayload, nextpayloadtype, retry) = \ self.pendingpayloads.popleft() @@ -1552,7 +1571,8 @@ class Session(object): if not self.servermode: self.seqlun += 4 # prepare seqlun for next transmit self.seqlun &= 0xff # when overflowing, wrap around - Session.waiting_sessions.pop(self, None) + with util.protect(WAITING_SESSIONS): + Session.waiting_sessions.pop(self, None) # render retry mechanism utterly incapable of # doing anything, though it shouldn't matter self.lastpayload = None @@ -1625,10 +1645,11 @@ class Session(object): # special, otherwise increment self.sequencenumber += 1 if delay_xmit is not None: - Session.waiting_sessions[self] = {} - Session.waiting_sessions[self]['ipmisession'] = self - self.expiration = delay_xmit + _monotonic_time() - Session.waiting_sessions[self]['timeout'] = self.expiration + with util.protect(WAITING_SESSIONS): + Session.waiting_sessions[self] = {} + Session.waiting_sessions[self]['ipmisession'] = self + self.expiration = delay_xmit + _monotonic_time() + Session.waiting_sessions[self]['timeout'] = self.expiration return # skip transmit, let retry timer do it's thing if self.sockaddr: _io_sendto(self.socket, self.netpacket, self.sockaddr) @@ -1653,13 +1674,14 @@ class Session(object): raise exc.IpmiException( "Unable to transmit to specified address") if retry: - Session.waiting_sessions[self] = {} - Session.waiting_sessions[self]['ipmisession'] = self - if timeout is not None: - self.expiration = timeout + _monotonic_time() - else: - self.expiration = self.timeout + _monotonic_time() - Session.waiting_sessions[self]['timeout'] = self.expiration + with util.protect(WAITING_SESSIONS): + Session.waiting_sessions[self] = {} + Session.waiting_sessions[self]['ipmisession'] = self + if timeout is not None: + self.expiration = timeout + _monotonic_time() + else: + self.expiration = self.timeout + _monotonic_time() + Session.waiting_sessions[self]['timeout'] = self.expiration def logout(self): if not self.logged: @@ -1672,7 +1694,8 @@ class Session(object): struct.pack("I", self.sessionid)), retry=False) # stop trying for a keepalive, - Session.keepalive_sessions.pop(self, None) + with util.protect(KEEPALIVE_SESSIONS): + Session.keepalive_sessions.pop(self, None) self.logged = 0 self.logging = False self._customkeepalives = None