2
0
mirror of https://opendev.org/x/pyghmi synced 2025-01-28 11:57:34 +00:00

Protect Session.*_sessions operations

Multi-thread application like virshbmc can break
Session.keepalive_sessions and Session.waiting_sessions
with their concurrent operations.
This patch adds exclusive locks to avoid the problem.

Change-Id: I191ed066f02fd329da5a210e2383fa72481916ef
This commit is contained in:
Akira Yoshiyama 2017-07-12 11:20:23 +09:00
parent 80db136674
commit a49bb1da9d

View File

@ -31,8 +31,14 @@ from Crypto.Cipher import AES
import pyghmi.exceptions as exc
from pyghmi.ipmi.private import constants
from pyghmi.ipmi.private import util
from pyghmi.ipmi.private.util import get_ipmi_error, _monotonic_time
KEEPALIVE_SESSIONS = threading.RLock()
WAITING_SESSIONS = threading.RLock()
try:
dict.iteritems
@ -381,11 +387,12 @@ class Session(object):
@classmethod
def _is_session_valid(cls, session):
sess = cls.keepalive_sessions.get(session, None)
if sess is not None and 'timeout' in sess:
if sess['timeout'] < _monotonic_time():
# session would have timed out by now, don't use it
return False
with util.protect(KEEPALIVE_SESSIONS):
sess = cls.keepalive_sessions.get(session, None)
if sess is not None and 'timeout' in sess:
if sess['timeout'] < _monotonic_time():
# session would have timed out by now, don't use it
return False
return True
def __new__(cls,
@ -490,8 +497,10 @@ class Session(object):
def _mark_broken(self, error=None):
# since our connection has failed retries
# deregister our keepalive facility
Session.keepalive_sessions.pop(self, None)
Session.waiting_sessions.pop(self, None)
with util.protect(KEEPALIVE_SESSIONS):
Session.keepalive_sessions.pop(self, None)
with util.protect(WAITING_SESSIONS):
Session.waiting_sessions.pop(self, None)
self.logging = False
self.errormsg = error
if self.logged:
@ -690,10 +699,11 @@ class Session(object):
waiter = self.iterwaiters.pop()
waiter({'success': True})
self.process_pktqueue()
if (self in self.waiting_sessions and
self.expiration < _monotonic_time()):
self.waiting_sessions.pop(self, None)
self._timedout()
with util.protect(WAITING_SESSIONS):
if (self in self.waiting_sessions and
self.expiration < _monotonic_time()):
self.waiting_sessions.pop(self, None)
self._timedout()
def raw_command(self,
netfn,
@ -856,11 +866,12 @@ class Session(object):
self.netpacket = struct.pack("!%dB" % len(message), *message)
# advance idle timer since we don't need keepalive while sending
# packets out naturally
if (self in Session.keepalive_sessions and not needskeepalive and
not self._customkeepalives):
Session.keepalive_sessions[self]['timeout'] = _monotonic_time() + \
MAX_IDLE - (random.random() * 4.9)
self._xmit_packet(retry, delay_xmit=delay_xmit, timeout=timeout)
with util.protect(KEEPALIVE_SESSIONS):
if (self in Session.keepalive_sessions and not needskeepalive and
not self._customkeepalives):
Session.keepalive_sessions[self]['timeout'] = \
_monotonic_time() + MAX_IDLE - (random.random() * 4.9)
self._xmit_packet(retry, delay_xmit=delay_xmit, timeout=timeout)
def _ipmi15authcode(self, payload, checkremotecode=False):
# checkremotecode is used to verify remote code,
@ -970,10 +981,11 @@ class Session(object):
self.onlogon({'error': errstr})
return
self.logging = False
Session.keepalive_sessions[self] = {}
Session.keepalive_sessions[self]['ipmisession'] = self
Session.keepalive_sessions[self]['timeout'] = _monotonic_time() + \
MAX_IDLE - (random.random() * 4.9)
with util.protect(KEEPALIVE_SESSIONS):
Session.keepalive_sessions[self] = {}
Session.keepalive_sessions[self]['ipmisession'] = self
Session.keepalive_sessions[self]['timeout'] = _monotonic_time() + \
MAX_IDLE - (random.random() * 4.9)
self.onlogon({'success': True})
def _get_session_challenge(self):
@ -1061,22 +1073,26 @@ class Session(object):
# no more time than that, so that whatever part(ies) need to service in
# a deadline, will be honored
if timeout != 0:
for session, parms in dictitems(cls.waiting_sessions):
if parms['timeout'] <= curtime:
timeout = 0 # exit after one guaranteed pass
break
if (timeout is not None and
timeout < parms['timeout'] - curtime):
continue # timeout smaller than the current session needs
timeout = parms['timeout'] - curtime # set new timeout value
for session, parms in dictitems(cls.keepalive_sessions):
if parms['timeout'] <= curtime:
timeout = 0
break
if (timeout is not None and
timeout < parms['timeout'] - curtime):
continue
timeout = parms['timeout'] - curtime
with util.protect(WAITING_SESSIONS):
for session, parms in dictitems(cls.waiting_sessions):
if parms['timeout'] <= curtime:
timeout = 0 # exit after one guaranteed pass
break
if (timeout is not None and
timeout < parms['timeout'] - curtime):
continue # timeout smaller than the current session
# needs
timeout = parms['timeout'] - curtime # set new timeout
# value
with util.protect(KEEPALIVE_SESSIONS):
for session, parms in dictitems(cls.keepalive_sessions):
if parms['timeout'] <= curtime:
timeout = 0
break
if (timeout is not None and
timeout < parms['timeout'] - curtime):
continue
timeout = parms['timeout'] - curtime
# If the loop above found no sessions wanting *and* the caller had no
# timeout, exit function. In this case there is no way a session
# could be waiting so we can always return 0
@ -1096,29 +1112,31 @@ class Session(object):
relsession.process_pktqueue()
sessionstodel = []
sessionstokeepalive = []
for session, parms in dictitems(cls.keepalive_sessions):
# if the session is busy inside a command, defer invoking keepalive
# until incommand is no longer the case
if parms['timeout'] < curtime and not session._isincommand():
cls.keepalive_sessions[session]['timeout'] = \
_monotonic_time() + MAX_IDLE - (random.random() * 4.9)
sessionstokeepalive.append(session)
with util.protect(KEEPALIVE_SESSIONS):
for session, parms in dictitems(cls.keepalive_sessions):
# if the session is busy inside a command, defer invoking
# keepalive until incommand is no longer the case
if parms['timeout'] < curtime and not session._isincommand():
cls.keepalive_sessions[session]['timeout'] = \
_monotonic_time() + MAX_IDLE - (random.random() * 4.9)
sessionstokeepalive.append(session)
for session in sessionstokeepalive:
session._keepalive()
for session, parms in dictitems(cls.waiting_sessions):
if parms['timeout'] < curtime: # timeout has expired, time to
# give up on it and trigger timeout
# response in the respective session
# defer deletion until after loop
sessionstodel.append(session)
# to avoid confusing the for loop
for session in sessionstodel:
cls.waiting_sessions.pop(session, None)
# one loop iteration to make sure recursion doesn't induce redundant
# timeouts
for session in sessionstodel:
session._timedout()
return len(cls.waiting_sessions)
with util.protect(WAITING_SESSIONS):
for session, parms in dictitems(cls.waiting_sessions):
if parms['timeout'] < curtime: # timeout has expired, time to
# give up on it and trigger timeout
# response in the respective session
# defer deletion until after loop
sessionstodel.append(session)
# to avoid confusing the for loop
for session in sessionstodel:
cls.waiting_sessions.pop(session, None)
# one loop iteration to make sure recursion doesn't induce
# redundant timeouts
for session in sessionstodel:
session._timedout()
return len(cls.waiting_sessions)
def register_keepalive(self, cmd, callback):
"""Register custom keepalive IPMI command
@ -1331,7 +1349,8 @@ class Session(object):
# stop the generic retry behavior here
self.lastpayload = None
self.last_payload_type = None
Session.waiting_sessions.pop(self, None)
with util.protect(WAITING_SESSIONS):
Session.waiting_sessions.pop(self, None)
if len(self.pendingpayloads) > 0:
(nextpayload, nextpayloadtype, retry) = \
self.pendingpayloads.popleft()
@ -1552,7 +1571,8 @@ class Session(object):
if not self.servermode:
self.seqlun += 4 # prepare seqlun for next transmit
self.seqlun &= 0xff # when overflowing, wrap around
Session.waiting_sessions.pop(self, None)
with util.protect(WAITING_SESSIONS):
Session.waiting_sessions.pop(self, None)
# render retry mechanism utterly incapable of
# doing anything, though it shouldn't matter
self.lastpayload = None
@ -1625,10 +1645,11 @@ class Session(object):
# special, otherwise increment
self.sequencenumber += 1
if delay_xmit is not None:
Session.waiting_sessions[self] = {}
Session.waiting_sessions[self]['ipmisession'] = self
self.expiration = delay_xmit + _monotonic_time()
Session.waiting_sessions[self]['timeout'] = self.expiration
with util.protect(WAITING_SESSIONS):
Session.waiting_sessions[self] = {}
Session.waiting_sessions[self]['ipmisession'] = self
self.expiration = delay_xmit + _monotonic_time()
Session.waiting_sessions[self]['timeout'] = self.expiration
return # skip transmit, let retry timer do it's thing
if self.sockaddr:
_io_sendto(self.socket, self.netpacket, self.sockaddr)
@ -1653,13 +1674,14 @@ class Session(object):
raise exc.IpmiException(
"Unable to transmit to specified address")
if retry:
Session.waiting_sessions[self] = {}
Session.waiting_sessions[self]['ipmisession'] = self
if timeout is not None:
self.expiration = timeout + _monotonic_time()
else:
self.expiration = self.timeout + _monotonic_time()
Session.waiting_sessions[self]['timeout'] = self.expiration
with util.protect(WAITING_SESSIONS):
Session.waiting_sessions[self] = {}
Session.waiting_sessions[self]['ipmisession'] = self
if timeout is not None:
self.expiration = timeout + _monotonic_time()
else:
self.expiration = self.timeout + _monotonic_time()
Session.waiting_sessions[self]['timeout'] = self.expiration
def logout(self):
if not self.logged:
@ -1672,7 +1694,8 @@ class Session(object):
struct.pack("I", self.sessionid)),
retry=False)
# stop trying for a keepalive,
Session.keepalive_sessions.pop(self, None)
with util.protect(KEEPALIVE_SESSIONS):
Session.keepalive_sessions.pop(self, None)
self.logged = 0
self.logging = False
self._customkeepalives = None