mirror of
https://opendev.org/x/pyghmi
synced 2025-01-16 04:38:19 +00:00
Use multiple sockets instead of one
Given the nature of SOL (Serial-over-LAN), it is impossible to control the flow of incoming traffic. This means that measures to mitigate the risk of exhausting buffer memory on the socket cease to be effective. Modify the strategy to stop throttling and instead allocate new sockets to acquire more network buffer space (up to 64 BMCs share each socket). This means the footprint of a small-scale setup is actually even lower, while a larger setup does use more filehandles, but still only 1/64th of the footprint of the usual strategy. Change-Id: I10698393d31b0c04d0242ff85815239078c076e2
This commit is contained in:
parent
65f2a617d5
commit
784b39d06e
@ -20,6 +20,7 @@ import collections
|
||||
import fcntl
|
||||
import hashlib
|
||||
import hmac
|
||||
import operator
|
||||
import os
|
||||
import random
|
||||
import select
|
||||
@ -50,6 +51,9 @@ selectbreak = None
|
||||
selectdeadline = 0
|
||||
running = True
|
||||
iosockets = [] # set of iosockets that will be shared amongst Session objects
|
||||
MAX_BMCS_PER_SOCKET = 64 # no more than this many BMCs will share a socket
|
||||
# this could be adjusted based on rmem_max
|
||||
# value, leading to fewer filehandles
|
||||
|
||||
|
||||
def _ioworker():
|
||||
@ -58,7 +62,6 @@ def _ioworker():
|
||||
global selectdeadline
|
||||
selectbreak = os.pipe()
|
||||
fcntl.fcntl(selectbreak[0], fcntl.F_SETFL, os.O_NONBLOCK)
|
||||
iosockets.append(selectbreak[0])
|
||||
iowaiters = []
|
||||
timeout = 300
|
||||
iothreadready = True
|
||||
@ -69,7 +72,8 @@ def _ioworker():
|
||||
if timeout < 0:
|
||||
timeout = 0
|
||||
selectdeadline = _monotonic_time() + timeout
|
||||
tmplist, _, _ = select.select(iosockets, (), (), timeout)
|
||||
mysockets = iosockets + [selectbreak[0]]
|
||||
tmplist, _, _ = select.select(mysockets, (), (), timeout)
|
||||
# pessimistically move out the deadline
|
||||
# doing it this early (before ioqueue is evaluated)
|
||||
# this avoids other threads making a bad assumption
|
||||
@ -225,11 +229,12 @@ class Session(object):
|
||||
keepalive_sessions = {}
|
||||
peeraddr_to_nodes = {}
|
||||
iterwaiters = []
|
||||
pending = 0
|
||||
maxpending = 128
|
||||
socket = None
|
||||
# Upon exit of python, make sure we play nice with BMCs by assuring closed
|
||||
# sessions for all that we tracked
|
||||
#NOTE(jbjohnso):
|
||||
#socketpool is a mapping of sockets to usage count
|
||||
socketpool = {}
|
||||
#this will be a lock. Delay the assignment so that a calling framework
|
||||
#can do something like reassign our threading and select modules
|
||||
socketchecking = None
|
||||
|
||||
@classmethod
|
||||
def _cleanup(cls):
|
||||
@ -240,11 +245,15 @@ class Session(object):
|
||||
running = False
|
||||
|
||||
@classmethod
|
||||
def _createsocket(cls):
|
||||
def _initsessions(cls):
|
||||
atexit.register(cls._cleanup)
|
||||
|
||||
def _assignsocket(self):
|
||||
global iothread
|
||||
global iothreadready
|
||||
global iosockets
|
||||
if iothread is None:
|
||||
self._initsessions()
|
||||
initevt = threading.Event()
|
||||
iothreadwaiters.append(initevt)
|
||||
iothread = threading.Thread(target=_ioworker)
|
||||
@ -255,42 +264,21 @@ class Session(object):
|
||||
initevt = threading.Event()
|
||||
iothreadwaiters.append(initevt)
|
||||
initevt.wait()
|
||||
atexit.register(cls._cleanup)
|
||||
cls.socket = _io_apply(socket.socket,
|
||||
(socket.AF_INET6, socket.SOCK_DGRAM)) # INET6
|
||||
# seek for the least used socket. As sessions close, they may free
|
||||
# up slots in seemingly 'full' sockets. This scheme allows those
|
||||
# slots to be recycled
|
||||
sorted_candidates = sorted(self.socketpool.iteritems(),
|
||||
key=operator.itemgetter(1))
|
||||
if sorted_candidates and sorted_candidates[0][1] < MAX_BMCS_PER_SOCKET:
|
||||
self.socketpool[sorted_candidates[0][0]] += 1
|
||||
return sorted_candidates[0][0]
|
||||
# we need a new socket
|
||||
tmpsocket = _io_apply(socket.socket,
|
||||
(socket.AF_INET6, socket.SOCK_DGRAM)) # INET6
|
||||
# can do IPv4 if you are nice to it
|
||||
try: # we will try to fixup our receive buffer size if we are smaller
|
||||
# than allowed.
|
||||
maxmf = open("/proc/sys/net/core/rmem_max")
|
||||
rmemmax = int(maxmf.read())
|
||||
rmemmax /= 2
|
||||
curmax = cls.socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
|
||||
curmax /= 2
|
||||
if rmemmax > curmax:
|
||||
_io_apply(cls.socket.setsockopt, (socket.SOL_SOCKET,
|
||||
socket.SO_RCVBUF,
|
||||
rmemmax))
|
||||
except Exception:
|
||||
# FIXME: be more selective in catching exceptions
|
||||
pass
|
||||
|
||||
curmax = _io_apply(cls.socket.getsockopt,
|
||||
(socket.SOL_SOCKET, socket.SO_RCVBUF))
|
||||
iosockets.append(cls.socket)
|
||||
curmax = curmax / 2
|
||||
# we throttle such that we never have more outstanding packets than
|
||||
# our receive buffer should be able to handle
|
||||
cls.pending = 0
|
||||
cls.maxpending = curmax / 1000
|
||||
# pessimistically assume 1 kilobyte messages,
|
||||
# which is way larger than almost all ipmi datagrams.
|
||||
# For faster performance, sysadmins may want to examine and tune
|
||||
# /proc/sys/net/core/rmem_max up. This allows the module to request
|
||||
# more, but does not increase buffers for applications that do less
|
||||
# creative things
|
||||
# TODO(jbjohnso): perhaps spread sessions across a socket pool when
|
||||
# rmem_max is small, still get ~65/socket, but avoid long queues that
|
||||
# might happen with low rmem_max and putting thousands of nodes in line
|
||||
self.socketpool[tmpsocket] = 1
|
||||
iosockets.append(tmpsocket)
|
||||
return tmpsocket
|
||||
|
||||
def _sync_login(self, response):
|
||||
"""Handle synchronous callers in lieu of
|
||||
@ -348,6 +336,7 @@ class Session(object):
|
||||
self.cleaningup = False
|
||||
self.lastpayload = None
|
||||
self.bmc = bmc
|
||||
self.broken = False
|
||||
try:
|
||||
self.userid = userid.encode('utf-8')
|
||||
self.password = password.encode('utf-8')
|
||||
@ -373,8 +362,10 @@ class Session(object):
|
||||
else:
|
||||
self.async = True
|
||||
self.logonwaiters = [onlogon]
|
||||
if not Session.socket:
|
||||
self._createsocket()
|
||||
if self.__class__.socketchecking is None:
|
||||
self.__class__.socketchecking = threading.Lock()
|
||||
with self.socketchecking:
|
||||
self.socket = self._assignsocket()
|
||||
self.login()
|
||||
if not self.async:
|
||||
while not self.logged:
|
||||
@ -385,6 +376,9 @@ class Session(object):
|
||||
# deregister our keepalive facility
|
||||
Session.keepalive_sessions.pop(self, None)
|
||||
self.logged = 0 # mark session as busted
|
||||
if not self.broken:
|
||||
self.socketpool[self.socket] -= 1
|
||||
self.broken = True
|
||||
# since this session is broken, remove it from the handler list.
|
||||
# This allows constructor to create a new, functional object to
|
||||
# replace this one
|
||||
@ -740,10 +734,9 @@ class Session(object):
|
||||
self.sessionid = struct.unpack("<I", struct.pack("4B", *data[0:4]))[0]
|
||||
self.authtype = 2
|
||||
self._activate_session(data[4:])
|
||||
'''
|
||||
This sends the activate session payload. We pick '1' as the requested
|
||||
sequence number without perturbing our real sequence number
|
||||
'''
|
||||
# NOTE(jbjohnso):
|
||||
# This sends the activate session payload. We pick '1' as the requested
|
||||
# sequence number without perturbing our real sequence number
|
||||
|
||||
def _activate_session(self, data):
|
||||
rqdata = [2, 4] + list(data) + [1, 0, 0, 0]
|
||||
@ -832,12 +825,13 @@ class Session(object):
|
||||
self._get_channel_auth_cap()
|
||||
|
||||
@classmethod
|
||||
def pulltoqueue(cls, mysocket, queue):
|
||||
while True:
|
||||
rdata = _io_apply(_io_recvfrom, (mysocket, 3000))
|
||||
if rdata is None:
|
||||
break
|
||||
queue.append(rdata)
|
||||
def pulltoqueue(cls, mysockets, queue):
|
||||
for mysocket in mysockets:
|
||||
while True:
|
||||
rdata = _io_apply(_io_recvfrom, (mysocket, 3000))
|
||||
if rdata is None:
|
||||
break
|
||||
queue.append(rdata)
|
||||
|
||||
@classmethod
|
||||
def wait_for_rsp(cls, timeout=None, callout=True):
|
||||
@ -897,17 +891,22 @@ class Session(object):
|
||||
timeout = 0
|
||||
if timeout is None:
|
||||
return 0
|
||||
rdylist = _poller(iosockets, timeout=timeout)
|
||||
if selectbreak is None:
|
||||
mysockets = iosockets
|
||||
else:
|
||||
mysockets = [iosockets + [selectbreak[0]]]
|
||||
rdylist = _poller(mysockets, timeout=timeout)
|
||||
if len(rdylist) > 0:
|
||||
while _poller((cls.socket,)): # if the somewhat lengthy
|
||||
while _poller(iosockets): # if the somewhat lengthy
|
||||
# queue processing takes long enough for packets to
|
||||
# come in, be eager
|
||||
mysockets = _poller(iosockets)
|
||||
pktqueue = collections.deque([])
|
||||
cls.pulltoqueue(cls.socket, pktqueue)
|
||||
cls.pulltoqueue(mysockets, pktqueue)
|
||||
while len(pktqueue):
|
||||
(data, sockaddr) = pktqueue.popleft()
|
||||
cls._route_ipmiresponse(sockaddr, data)
|
||||
cls.pulltoqueue(cls.socket, pktqueue)
|
||||
cls.pulltoqueue(mysockets, pktqueue)
|
||||
sessionstodel = []
|
||||
sessionstokeepalive = []
|
||||
for session, parms in cls.keepalive_sessions.iteritems():
|
||||
@ -926,7 +925,6 @@ class Session(object):
|
||||
session) # defer deletion until after loop
|
||||
# to avoid confusing the for loop
|
||||
for session in sessionstodel:
|
||||
cls.pending -= 1
|
||||
cls.waiting_sessions.pop(session, None)
|
||||
session._timedout()
|
||||
return len(cls.waiting_sessions)
|
||||
@ -945,7 +943,6 @@ class Session(object):
|
||||
try:
|
||||
cls.bmc_handlers[sockaddr]._handle_ipmi_packet(data,
|
||||
sockaddr=sockaddr)
|
||||
cls.pending -= 1
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
@ -1213,10 +1210,7 @@ class Session(object):
|
||||
self.lastpayload = None
|
||||
self._req_priv_level()
|
||||
|
||||
'''
|
||||
Internal function to parse IPMI nugget once extracted from its framing
|
||||
'''
|
||||
|
||||
# Internal function to parse IPMI nugget once extracted from its framing
|
||||
def _parse_ipmi_payload(self, payload):
|
||||
# For now, skip the checksums since we are in LAN only,
|
||||
# TODO(jbjohnso): if implementing other channels, add checksum checks
|
||||
@ -1326,8 +1320,6 @@ class Session(object):
|
||||
Session.wait_for_rsp(timeout=0, callout=False) # take opportunity
|
||||
# to drain the socket queue if
|
||||
# applicable
|
||||
while Session.pending > Session.maxpending:
|
||||
Session.wait_for_rsp()
|
||||
if self.sequencenumber: # seq number of zero will be left alone, it is
|
||||
# special, otherwise increment
|
||||
self.sequencenumber += 1
|
||||
@ -1336,14 +1328,13 @@ class Session(object):
|
||||
Session.waiting_sessions[self]['ipmisession'] = self
|
||||
Session.waiting_sessions[self]['timeout'] = self.timeout + \
|
||||
_monotonic_time()
|
||||
Session.pending += 1
|
||||
if delay_xmit is not None:
|
||||
Session.waiting_sessions[self]['timeout'] = delay_xmit + \
|
||||
_monotonic_time()
|
||||
return # skip transmit, let retry timer do its thing
|
||||
if self.sockaddr:
|
||||
_io_apply(_io_sendto,
|
||||
(Session.socket, self.netpacket, self.sockaddr))
|
||||
(self.socket, self.netpacket, self.sockaddr))
|
||||
else: # we have not yet picked a working sockaddr for this connection,
|
||||
# try all the candidates that getaddrinfo provides
|
||||
self.allsockaddrs = []
|
||||
@ -1359,7 +1350,7 @@ class Session(object):
|
||||
sockaddr = (newhost, sockaddr[1], 0, 0)
|
||||
self.allsockaddrs.append(sockaddr)
|
||||
Session.bmc_handlers[sockaddr] = self
|
||||
_io_apply(_io_sendto, (Session.socket,
|
||||
_io_apply(_io_sendto, (self.socket,
|
||||
self.netpacket, sockaddr))
|
||||
except socket.gaierror:
|
||||
raise exc.IpmiException(
|
||||
@ -1377,6 +1368,7 @@ class Session(object):
|
||||
retry=False)
|
||||
self.logged = 0
|
||||
self.nowait = False
|
||||
self.socketpool[self.socket] -= 1
|
||||
return {'success': True}
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user