diff --git a/pyghmi/ipmi/private/session.py b/pyghmi/ipmi/private/session.py index f889162b..d91f5304 100644 --- a/pyghmi/ipmi/private/session.py +++ b/pyghmi/ipmi/private/session.py @@ -20,6 +20,7 @@ import collections import fcntl import hashlib import hmac +import operator import os import random import select @@ -50,6 +51,9 @@ selectbreak = None selectdeadline = 0 running = True iosockets = [] # set of iosockets that will be shared amongst Session objects +MAX_BMCS_PER_SOCKET = 64 # no more than this many BMCs will share a socket + # this could be adjusted based on rmem_max + # value, leading to fewer filehandles def _ioworker(): @@ -58,7 +62,6 @@ def _ioworker(): global selectdeadline selectbreak = os.pipe() fcntl.fcntl(selectbreak[0], fcntl.F_SETFL, os.O_NONBLOCK) - iosockets.append(selectbreak[0]) iowaiters = [] timeout = 300 iothreadready = True @@ -69,7 +72,8 @@ def _ioworker(): if timeout < 0: timeout = 0 selectdeadline = _monotonic_time() + timeout - tmplist, _, _ = select.select(iosockets, (), (), timeout) + mysockets = iosockets + [selectbreak[0]] + tmplist, _, _ = select.select(mysockets, (), (), timeout) # pessimistically move out the deadline # doing it this early (before ioqueue is evaluated) # this avoids other threads making a bad assumption @@ -225,11 +229,12 @@ class Session(object): keepalive_sessions = {} peeraddr_to_nodes = {} iterwaiters = [] - pending = 0 - maxpending = 128 - socket = None - # Upon exit of python, make sure we play nice with BMCs by assuring closed - # sessions for all that we tracked + #NOTE(jbjohnso): + #socketpool is a mapping of sockets to usage count + socketpool = {} + #this will be a lock. Delay the assignment so that a calling framework + #can do something like reassign our threading and select modules + socketchecking = None @classmethod def _cleanup(cls): @@ -240,11 +245,15 @@ class Session(object): running = False @classmethod - def _createsocket(cls): + def _initsessions(cls): + atexit.register(cls._cleanup) + + def _assignsocket(self): global iothread global iothreadready global iosockets if iothread is None: + self._initsessions() initevt = threading.Event() iothreadwaiters.append(initevt) iothread = threading.Thread(target=_ioworker) @@ -255,42 +264,21 @@ class Session(object): initevt = threading.Event() iothreadwaiters.append(initevt) initevt.wait() - atexit.register(cls._cleanup) - cls.socket = _io_apply(socket.socket, - (socket.AF_INET6, socket.SOCK_DGRAM)) # INET6 + # seek for the least used socket. As sessions close, they may free + # up slots in seemingly 'full' sockets. This scheme allows those + # slots to be recycled + sorted_candidates = sorted(self.socketpool.iteritems(), + key=operator.itemgetter(1)) + if sorted_candidates and sorted_candidates[0][1] < MAX_BMCS_PER_SOCKET: + self.socketpool[sorted_candidates[0][0]] += 1 + return sorted_candidates[0][0] + # we need a new socket + tmpsocket = _io_apply(socket.socket, + (socket.AF_INET6, socket.SOCK_DGRAM)) # INET6 # can do IPv4 if you are nice to it - try: # we will try to fixup our receive buffer size if we are smaller - # than allowed. - maxmf = open("/proc/sys/net/core/rmem_max") - rmemmax = int(maxmf.read()) - rmemmax /= 2 - curmax = cls.socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) - curmax /= 2 - if rmemmax > curmax: - _io_apply(cls.socket.setsockopt, (socket.SOL_SOCKET, - socket.SO_RCVBUF, - rmemmax)) - except Exception: - # FIXME: be more selective in catching exceptions - pass - - curmax = _io_apply(cls.socket.getsockopt, - (socket.SOL_SOCKET, socket.SO_RCVBUF)) - iosockets.append(cls.socket) - curmax = curmax / 2 - # we throttle such that we never have no more outstanding packets than - # our receive buffer should be able to handle - cls.pending = 0 - cls.maxpending = curmax / 1000 - # pessimistically assume 1 kilobyte messages, - # which is way larger than almost all ipmi datagrams. - # For faster performance, sysadmins may want to examine and tune - # /proc/sys/net/core/rmem_max up. This allows the module to request - # more, but does not increase buffers for applications that do less - # creative things - # TODO(jbjohnso): perhaps spread sessions across a socket pool when - # rmem_max is small, still get ~65/socket, but avoid long queues that - # might happen with low rmem_max and putting thousands of nodes in line + self.socketpool[tmpsocket] = 1 + iosockets.append(tmpsocket) + return tmpsocket def _sync_login(self, response): """Handle synchronous callers in liue of @@ -348,6 +336,7 @@ class Session(object): self.cleaningup = False self.lastpayload = None self.bmc = bmc + self.broken = False try: self.userid = userid.encode('utf-8') self.password = password.encode('utf-8') @@ -373,8 +362,10 @@ class Session(object): else: self.async = True self.logonwaiters = [onlogon] - if not Session.socket: - self._createsocket() + if self.__class__.socketchecking is None: + self.__class__.socketchecking = threading.Lock() + with self.socketchecking: + self.socket = self._assignsocket() self.login() if not self.async: while not self.logged: @@ -385,6 +376,9 @@ class Session(object): # deregister our keepalive facility Session.keepalive_sessions.pop(self, None) self.logged = 0 # mark session as busted + if not self.broken: + self.socketpool[self.socket] -= 1 + self.broken = True # since this session is broken, remove it from the handler list. # This allows constructor to create a new, functional object to # replace this one @@ -740,10 +734,9 @@ class Session(object): self.sessionid = struct.unpack(" 0: - while _poller((cls.socket,)): # if the somewhat lengthy + while _poller(iosockets): # if the somewhat lengthy # queue # processing takes long enough for packets to # come in, be eager + mysockets = _poller(iosockets) pktqueue = collections.deque([]) - cls.pulltoqueue(cls.socket, pktqueue) + cls.pulltoqueue(mysockets, pktqueue) while len(pktqueue): (data, sockaddr) = pktqueue.popleft() cls._route_ipmiresponse(sockaddr, data) - cls.pulltoqueue(cls.socket, pktqueue) + cls.pulltoqueue(mysockets, pktqueue) sessionstodel = [] sessionstokeepalive = [] for session, parms in cls.keepalive_sessions.iteritems(): @@ -926,7 +925,6 @@ class Session(object): session) # defer deletion until after loop # to avoid confusing the for loop for session in sessionstodel: - cls.pending -= 1 cls.waiting_sessions.pop(session, None) session._timedout() return len(cls.waiting_sessions) @@ -945,7 +943,6 @@ class Session(object): try: cls.bmc_handlers[sockaddr]._handle_ipmi_packet(data, sockaddr=sockaddr) - cls.pending -= 1 except KeyError: pass @@ -1213,10 +1210,7 @@ class Session(object): self.lastpayload = None self._req_priv_level() - ''' - Internal function to parse IPMI nugget once extracted from its framing - ''' - + # Internal function to parse IPMI nugget once extracted from its framing def _parse_ipmi_payload(self, payload): # For now, skip the checksums since we are in LAN only, # TODO(jbjohnso): if implementing other channels, add checksum checks @@ -1326,8 +1320,6 @@ class Session(object): Session.wait_for_rsp(timeout=0, callout=False) # take opportunity # to drain the socket queue if # applicable - while Session.pending > Session.maxpending: - Session.wait_for_rsp() if self.sequencenumber: # seq number of zero will be left alone, it is # special, otherwise increment self.sequencenumber += 1 @@ -1336,14 +1328,13 @@ class Session(object): Session.waiting_sessions[self]['ipmisession'] = self Session.waiting_sessions[self]['timeout'] = self.timeout + \ _monotonic_time() - Session.pending += 1 if delay_xmit is not None: Session.waiting_sessions[self]['timeout'] = delay_xmit + \ _monotonic_time() return # skip transmit, let retry timer do it's thing if self.sockaddr: _io_apply(_io_sendto, - (Session.socket, self.netpacket, self.sockaddr)) + (self.socket, self.netpacket, self.sockaddr)) else: # he have not yet picked a working sockaddr for this connection, # try all the candidates that getaddrinfo provides self.allsockaddrs = [] @@ -1359,7 +1350,7 @@ class Session(object): sockaddr = (newhost, sockaddr[1], 0, 0) self.allsockaddrs.append(sockaddr) Session.bmc_handlers[sockaddr] = self - _io_apply(_io_sendto, (Session.socket, + _io_apply(_io_sendto, (self.socket, self.netpacket, sockaddr)) except socket.gaierror: raise exc.IpmiException( @@ -1377,6 +1368,7 @@ class Session(object): retry=False) self.logged = 0 self.nowait = False + self.socketpool[self.socket] -= 1 return {'success': True}