2
0
mirror of https://opendev.org/x/pyghmi synced 2025-01-28 11:57:34 +00:00

Avoid looping select() on sockets repeatedly

When select() would identify a a socket, it would
potentially call select() on the same socket
before a recvfrom() would happen.  In python 2.7,
this caused the IO thread to block other threads
waiting on something the other threads needed to
do.  Resolve by explicitly ignoring a socket
where recvfrom() will be pending until recvfrom()
is next called.  This reduces one test case from
42,000-47,000 select() calls to just 86.

Change-Id: Ic8ebecfc61d048e537b5d76a6a3f0665fd340a3d
This commit is contained in:
Jarrod Johnson 2014-05-09 12:51:24 -04:00
parent 0dd388686e
commit de8e63883d
2 changed files with 34 additions and 21 deletions

View File

@ -36,8 +36,9 @@ newtcattr[-1][termios.VINTR] = 0
newtcattr[-1][termios.VSUSP] = 0
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, newtcattr)
tty.setcbreak(sys.stdin.fileno())
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
tty.setraw(sys.stdin.fileno())
currfl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, currfl | os.O_NONBLOCK)
passwd = os.environ['IPMIPASSWORD']
@ -55,14 +56,24 @@ def _doinput():
def _print(data):
bailout = False
if type(data) not in (str, unicode):
bailout = True
data = repr(data)
sys.stdout.write(data)
sys.stdout.flush()
if bailout:
raise Exception(data)
try:
sol = console.Console(bmc=sys.argv[1], userid=sys.argv[2], password=passwd,
iohandler=_print, force=True)
inputthread = threading.Thread(target=_doinput)
inputthread.daemon = True
inputthread.start()
sol.main_loop()
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, tcattr)
except:
currfl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, currfl ^ os.O_NONBLOCK)
termios.tcsetattr(sys.stdin, termios.TCSANOW, tcattr)
sys.exit(0)

View File

@ -46,6 +46,8 @@ iothread = None # the thread in which all IO will be performed
# the nature of things.
iothreadready = False # whether io thread is yet ready to work
iothreadwaiters = [] # threads waiting for iothreadready
ignoresockets = set() # between 'select' firing and 'recvfrom', a socket
# should be ignored
ioqueue = collections.deque([])
selectbreak = None
selectdeadline = 0
@ -72,7 +74,13 @@ def _ioworker():
if timeout < 0:
timeout = 0
selectdeadline = _monotonic_time() + timeout
mysockets = iosockets + [selectbreak[0]]
if ignoresockets:
mysockets = [selectbreak[0]]
for pendingsocket in iosockets:
if pendingsocket not in ignoresockets:
mysockets.append(pendingsocket)
else:
mysockets = iosockets + [selectbreak[0]]
tmplist, _, _ = select.select(mysockets, (), (), timeout)
# pessimistically move out the deadline
# doing it this early (before ioqueue is evaluated)
@ -90,6 +98,7 @@ def _ioworker():
# was the endgame
pass
else:
ignoresockets.add(handle)
rdylist.append(handle)
for w in iowaiters:
w[2].append(tuple(rdylist))
@ -135,6 +144,7 @@ def _io_sendto(mysocket, packet, sockaddr):
def _io_recvfrom(mysocket, size):
mysocket.setblocking(0)
ignoresockets.discard(mysocket)
try:
return mysocket.recvfrom(size)
except socket.error:
@ -153,7 +163,7 @@ def _monotonic_time():
return os.times()[4]
def _poller(readhandles, timeout=0):
def _poller(timeout=0):
rdylist = _io_apply('wait', timeout + _monotonic_time())
return rdylist
@ -895,22 +905,14 @@ class Session(object):
timeout = 0
if timeout is None:
return 0
if selectbreak is None:
mysockets = iosockets
else:
mysockets = [iosockets + [selectbreak[0]]]
rdylist = _poller(mysockets, timeout=timeout)
rdylist = _poller(timeout=timeout)
if len(rdylist) > 0:
while _poller(iosockets): # if the somewhat lengthy
# queue # processing takes long enough for packets to
# come in, be eager
mysockets = _poller(iosockets)
pktqueue = collections.deque([])
cls.pulltoqueue(mysockets, pktqueue)
while len(pktqueue):
(data, sockaddr) = pktqueue.popleft()
cls._route_ipmiresponse(sockaddr, data)
cls.pulltoqueue(mysockets, pktqueue)
pktqueue = collections.deque([])
cls.pulltoqueue(iosockets, pktqueue)
while len(pktqueue):
(data, sockaddr) = pktqueue.popleft()
cls._route_ipmiresponse(sockaddr, data)
cls.pulltoqueue(iosockets, pktqueue)
sessionstodel = []
sessionstokeepalive = []
for session, parms in cls.keepalive_sessions.iteritems():