mirror of
https://opendev.org/x/pyghmi
synced 2025-08-19 09:30:20 +00:00
Rework wait behavior
Rework the thread behavior and raw_command to avoid a ton of needless wakeups for unrelated concurrent execution. It is saving about 20% of CPU time on ~50 node run. The savings should scale proportionate to concurrency. Change-Id: Iff3eaacd6c92d5734e1d1eb19e7be53e269e42cc
This commit is contained in:
@@ -27,7 +27,6 @@ import select
|
||||
import socket
|
||||
import struct
|
||||
import threading
|
||||
import traceback
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
|
||||
@@ -69,6 +68,7 @@ def define_worker():
|
||||
global iothreadready
|
||||
global selectdeadline
|
||||
iowaiters = []
|
||||
directediowaiters = {}
|
||||
timeout = 300
|
||||
iothreadready = True
|
||||
while iothreadwaiters:
|
||||
@@ -84,45 +84,51 @@ def define_worker():
|
||||
# this avoids other threads making a bad assumption
|
||||
# about not having to break into the select
|
||||
selectdeadline = _monotonic_time() + 300
|
||||
_io_graball(iosockets)
|
||||
for w in iowaiters:
|
||||
w[3].set()
|
||||
iowaiters = []
|
||||
timeout = 300
|
||||
sockaddrs = _io_graball(iosockets, directediowaiters)
|
||||
for w in iowaiters:
|
||||
w[1].set()
|
||||
iowaiters = []
|
||||
for d in directediowaiters:
|
||||
# these are the existing waiters that didn't get
|
||||
# satisfied last graball, allow them to set a new
|
||||
# deadline if they still have time waiting, or
|
||||
# if they have expired, wake them now to let them
|
||||
# process their timeout
|
||||
for w in directediowaiters[d]:
|
||||
ltimeout = w[0] - _monotonic_time()
|
||||
if ltimeout < 0:
|
||||
w[1].set() # time is up, wake the caller
|
||||
elif ltimeout < timeout:
|
||||
timeout = ltimeout
|
||||
while ioqueue:
|
||||
workitem = ioqueue.popleft()
|
||||
# order: function, args, list to append to , event to set
|
||||
if isinstance(workitem[1], tuple): # positional arguments
|
||||
try:
|
||||
workitem[2].append(workitem[0](*workitem[1]))
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
workitem[3].set()
|
||||
elif isinstance(workitem[1], dict):
|
||||
try:
|
||||
workitem[2].append(workitem[0](**workitem[1]))
|
||||
except Exception:
|
||||
traceback.print_exc()
|
||||
workitem[3].set()
|
||||
elif workitem[0] == 'wait':
|
||||
if pktqueue:
|
||||
workitem[3].set()
|
||||
else:
|
||||
ltimeout = workitem[1] - _monotonic_time()
|
||||
if ltimeout < timeout:
|
||||
timeout = ltimeout
|
||||
if workitem[2] is None and sockaddrs:
|
||||
workitem[1].set()
|
||||
elif workitem[2] in sockaddrs:
|
||||
workitem[1].set()
|
||||
else:
|
||||
ltimeout = workitem[0] - _monotonic_time()
|
||||
if ltimeout < timeout:
|
||||
timeout = ltimeout
|
||||
if workitem[2] is None:
|
||||
iowaiters.append(workitem)
|
||||
else:
|
||||
if workitem[2] in directediowaiters:
|
||||
directediowaiters[workitem[2]].append(workitem)
|
||||
else:
|
||||
directediowaiters[workitem[2]] = [workitem]
|
||||
return _IOWorker
|
||||
|
||||
|
||||
pktqueue = collections.deque([])
|
||||
sessionqueue = collections.deque([])
|
||||
|
||||
|
||||
def _io_wait(timeout):
|
||||
def _io_wait(timeout, myaddr=None):
|
||||
evt = threading.Event()
|
||||
result = []
|
||||
deadline = timeout + _monotonic_time()
|
||||
ioqueue.append(('wait', deadline, result, evt))
|
||||
ioqueue.append((deadline, evt, myaddr))
|
||||
# Unfortunately, at least with eventlet patched threading, the wait()
|
||||
# is a somewhat busy wait if given a deadline. Workaround by having
|
||||
# it piggy back on the select() in the io thread, which is a truly
|
||||
@@ -141,20 +147,38 @@ def _io_sendto(mysocket, packet, sockaddr):
|
||||
pass
|
||||
|
||||
|
||||
def _io_graball(mysockets):
|
||||
for mysocket in mysockets:
|
||||
while True:
|
||||
rdata = _io_recvfrom(mysocket, 3000)
|
||||
if rdata is None:
|
||||
break
|
||||
# If the payload is shorter than 4 bytes, it cannot
|
||||
# be a useful packet. Skip it entirely.
|
||||
# This applies to the packet sent to self to break
|
||||
# into the select
|
||||
if len(rdata[0]) < 4:
|
||||
continue
|
||||
rdata = rdata + (mysocket,)
|
||||
pktqueue.append(rdata)
|
||||
def _io_graball(mysockets, iowaiters):
|
||||
sockaddrs = []
|
||||
for mysocket in mysockets:
|
||||
while True:
|
||||
rdata = _io_recvfrom(mysocket, 3000)
|
||||
if rdata is None:
|
||||
break
|
||||
# If the payload is shorter than 4 bytes, it cannot
|
||||
# be a useful packet. Skip it entirely.
|
||||
# This applies to the packet sent to self to break
|
||||
# into the select
|
||||
if len(rdata[0]) < 4:
|
||||
continue
|
||||
rdata = rdata + (mysocket,)
|
||||
relsession = None
|
||||
if rdata[1] in Session.bmc_handlers:
|
||||
# session data
|
||||
rdata = rdata + (True,)
|
||||
relsession = Session.bmc_handlers[rdata[1]]
|
||||
elif rdata[2] in Session.bmc_handlers:
|
||||
# pyghmi is the bmc, and we have sessionless data
|
||||
rdata = rdata + (False,)
|
||||
relsession = Session.bmc_handlers[rdata[2]]
|
||||
if relsession is not None:
|
||||
relsession.pktqueue.append(rdata)
|
||||
sessionqueue.append(relsession)
|
||||
if rdata[1] in iowaiters:
|
||||
for w in iowaiters[rdata[1]]:
|
||||
w[1].set()
|
||||
del iowaiters[rdata[1]]
|
||||
sockaddrs.append(rdata[1])
|
||||
return sockaddrs
|
||||
|
||||
|
||||
def _io_recvfrom(mysocket, size):
|
||||
@@ -193,10 +217,10 @@ def _monotonic_time():
|
||||
|
||||
|
||||
def _poller(timeout=0):
|
||||
if pktqueue:
|
||||
if sessionqueue:
|
||||
return True
|
||||
_io_wait(timeout)
|
||||
return pktqueue
|
||||
return sessionqueue
|
||||
|
||||
|
||||
def _aespad(data):
|
||||
@@ -384,6 +408,10 @@ class Session(object):
|
||||
self._customkeepalives = None
|
||||
self.bmc = bmc
|
||||
self.broken = False
|
||||
# a private queue for packets for which this session handler
|
||||
# is destined to receive
|
||||
self.pktqueue = collections.deque([])
|
||||
|
||||
try:
|
||||
self.userid = userid.encode('utf-8')
|
||||
self.password = password.encode('utf-8')
|
||||
@@ -595,6 +623,23 @@ class Session(object):
|
||||
incrementtime += 1
|
||||
return cumulativetime + 1
|
||||
|
||||
def _cmdwait(self):
|
||||
while self._isincommand():
|
||||
_io_wait(self._isincommand(), self.sockaddr)
|
||||
|
||||
def awaitresponse(self, retry):
|
||||
while retry and self.lastresponse is None and self.logged:
|
||||
timeout = self.expiration - _monotonic_time()
|
||||
_io_wait(timeout, self.sockaddr)
|
||||
while self.iterwaiters:
|
||||
waiter = self.iterwaiters.pop()
|
||||
waiter({'success': True})
|
||||
self.process_pktqueue()
|
||||
if (self in self.waiting_sessions and
|
||||
self.expiration < _monotonic_time()):
|
||||
self.waiting_sessions.pop(self, None)
|
||||
self._timedout()
|
||||
|
||||
def raw_command(self,
|
||||
netfn,
|
||||
command,
|
||||
@@ -605,8 +650,7 @@ class Session(object):
|
||||
timeout=None):
|
||||
if not self.logged:
|
||||
raise exc.IpmiException('Session no longer connected')
|
||||
while self._isincommand():
|
||||
Session.wait_for_rsp(self._isincommand())
|
||||
self._cmdwait()
|
||||
if not self.logged:
|
||||
raise exc.IpmiException('Session no longer connected')
|
||||
self.incommand = _monotonic_time() + self._getmaxtimeout()
|
||||
@@ -627,8 +671,7 @@ class Session(object):
|
||||
#of only the constructor needing a callback. From then on,
|
||||
#synchronous usage of the class acts in a greenthread style governed by
|
||||
#order of data on the network
|
||||
while retry and self.lastresponse is None and self.logged:
|
||||
Session.wait_for_rsp(timeout=timeout)
|
||||
self.awaitresponse(retry)
|
||||
lastresponse = self.lastresponse
|
||||
self.incommand = False
|
||||
if retry and lastresponse is None:
|
||||
@@ -982,9 +1025,9 @@ class Session(object):
|
||||
if timeout is None:
|
||||
return 0
|
||||
if _poller(timeout=timeout):
|
||||
while pktqueue:
|
||||
(data, sockaddr, mysocket) = pktqueue.popleft()
|
||||
cls._route_ipmiresponse(sockaddr, data, mysocket)
|
||||
while sessionqueue:
|
||||
relsession = sessionqueue.popleft()
|
||||
relsession.process_pktqueue()
|
||||
sessionstodel = []
|
||||
sessionstokeepalive = []
|
||||
for session, parms in cls.keepalive_sessions.iteritems():
|
||||
@@ -1067,6 +1110,16 @@ class Session(object):
|
||||
except exc.IpmiException:
|
||||
self._mark_broken()
|
||||
|
||||
def process_pktqueue(self):
|
||||
while self.pktqueue:
|
||||
pkt = self.pktqueue.popleft()
|
||||
if not pkt[0][0] == '\x06' and pkt[0][2:4] == '\xff\x07':
|
||||
continue
|
||||
if pkt[1] in self.bmc_handlers:
|
||||
self._handle_ipmi_packet(pkt[0], sockaddr=pkt[1])
|
||||
elif pkt[2] in self.bmc_handlers:
|
||||
self.sessionless_data(pkt[0], pkt[1])
|
||||
|
||||
@classmethod
|
||||
def _route_ipmiresponse(cls, sockaddr, data, mysocket):
|
||||
if not (data[0] == '\x06' and data[2:4] == '\xff\x07'): # not ipmi
|
||||
@@ -1491,8 +1544,8 @@ class Session(object):
|
||||
if delay_xmit is not None:
|
||||
Session.waiting_sessions[self] = {}
|
||||
Session.waiting_sessions[self]['ipmisession'] = self
|
||||
Session.waiting_sessions[self]['timeout'] = delay_xmit + \
|
||||
_monotonic_time()
|
||||
self.expiration = delay_xmit + _monotonic_time()
|
||||
Session.waiting_sessions[self]['timeout'] = self.expiration
|
||||
return # skip transmit, let retry timer do it's thing
|
||||
if self.sockaddr:
|
||||
_io_sendto(self.socket, self.netpacket, self.sockaddr)
|
||||
@@ -1519,11 +1572,10 @@ class Session(object):
|
||||
Session.waiting_sessions[self] = {}
|
||||
Session.waiting_sessions[self]['ipmisession'] = self
|
||||
if timeout is not None:
|
||||
Session.waiting_sessions[self]['timeout'] = timeout + \
|
||||
_monotonic_time()
|
||||
self.expiration = timeout + _monotonic_time()
|
||||
else:
|
||||
Session.waiting_sessions[self]['timeout'] = self.timeout + \
|
||||
_monotonic_time()
|
||||
self.expiration = self.timeout + _monotonic_time()
|
||||
Session.waiting_sessions[self]['timeout'] = self.expiration
|
||||
|
||||
def logout(self):
|
||||
if not self.logged:
|
||||
|
Reference in New Issue
Block a user