diff --git a/pyghmi/ipmi/private/session.py b/pyghmi/ipmi/private/session.py index af47d6a6..5fd24ca4 100644 --- a/pyghmi/ipmi/private/session.py +++ b/pyghmi/ipmi/private/session.py @@ -27,7 +27,6 @@ import select import socket import struct import threading -import traceback from Crypto.Cipher import AES @@ -69,6 +68,7 @@ def define_worker(): global iothreadready global selectdeadline iowaiters = [] + directediowaiters = {} timeout = 300 iothreadready = True while iothreadwaiters: @@ -84,45 +84,51 @@ def define_worker(): # this avoids other threads making a bad assumption # about not having to break into the select selectdeadline = _monotonic_time() + 300 - _io_graball(iosockets) - for w in iowaiters: - w[3].set() - iowaiters = [] timeout = 300 + sockaddrs = _io_graball(iosockets, directediowaiters) + for w in iowaiters: + w[1].set() + iowaiters = [] + for d in directediowaiters: + # these are the existing waiters that didn't get + # satisfied last graball, allow them to set a new + # deadline if they still have time waiting, or + # if they have expired, wake them now to let them + # process their timeout + for w in directediowaiters[d]: + ltimeout = w[0] - _monotonic_time() + if ltimeout < 0: + w[1].set() # time is up, wake the caller + elif ltimeout < timeout: + timeout = ltimeout while ioqueue: workitem = ioqueue.popleft() # order: function, args, list to append to , event to set - if isinstance(workitem[1], tuple): # positional arguments - try: - workitem[2].append(workitem[0](*workitem[1])) - except Exception: - traceback.print_exc() - workitem[3].set() - elif isinstance(workitem[1], dict): - try: - workitem[2].append(workitem[0](**workitem[1])) - except Exception: - traceback.print_exc() - workitem[3].set() - elif workitem[0] == 'wait': - if pktqueue: - workitem[3].set() - else: - ltimeout = workitem[1] - _monotonic_time() - if ltimeout < timeout: - timeout = ltimeout + if workitem[2] is None and sockaddrs: + workitem[1].set() + elif workitem[2] in sockaddrs: + workitem[1].set() + else: + ltimeout = workitem[0] - _monotonic_time() + if ltimeout < timeout: + timeout = ltimeout + if workitem[2] is None: iowaiters.append(workitem) + else: + if workitem[2] in directediowaiters: + directediowaiters[workitem[2]].append(workitem) + else: + directediowaiters[workitem[2]] = [workitem] return _IOWorker -pktqueue = collections.deque([]) +sessionqueue = collections.deque([]) -def _io_wait(timeout): +def _io_wait(timeout, myaddr=None): evt = threading.Event() - result = [] deadline = timeout + _monotonic_time() - ioqueue.append(('wait', deadline, result, evt)) + ioqueue.append((deadline, evt, myaddr)) # Unfortunately, at least with eventlet patched threading, the wait() # is a somewhat busy wait if given a deadline. Workaround by having # it piggy back on the select() in the io thread, which is a truly @@ -141,20 +147,38 @@ def _io_sendto(mysocket, packet, sockaddr): pass -def _io_graball(mysockets): - for mysocket in mysockets: - while True: - rdata = _io_recvfrom(mysocket, 3000) - if rdata is None: - break - # If the payload is shorter than 4 bytes, it cannot - # be a useful packet. Skip it entirely. - # This applies to the packet sent to self to break - # into the select - if len(rdata[0]) < 4: - continue - rdata = rdata + (mysocket,) - pktqueue.append(rdata) +def _io_graball(mysockets, iowaiters): + sockaddrs = [] + for mysocket in mysockets: + while True: + rdata = _io_recvfrom(mysocket, 3000) + if rdata is None: + break + # If the payload is shorter than 4 bytes, it cannot + # be a useful packet. Skip it entirely. + # This applies to the packet sent to self to break + # into the select + if len(rdata[0]) < 4: + continue + rdata = rdata + (mysocket,) + relsession = None + if rdata[1] in Session.bmc_handlers: + # session data + rdata = rdata + (True,) + relsession = Session.bmc_handlers[rdata[1]] + elif rdata[2] in Session.bmc_handlers: + # pyghmi is the bmc, and we have sessionless data + rdata = rdata + (False,) + relsession = Session.bmc_handlers[rdata[2]] + if relsession is not None: + relsession.pktqueue.append(rdata) + sessionqueue.append(relsession) + if rdata[1] in iowaiters: + for w in iowaiters[rdata[1]]: + w[1].set() + del iowaiters[rdata[1]] + sockaddrs.append(rdata[1]) + return sockaddrs def _io_recvfrom(mysocket, size): @@ -193,10 +217,10 @@ def _monotonic_time(): def _poller(timeout=0): - if pktqueue: + if sessionqueue: return True _io_wait(timeout) - return pktqueue + return sessionqueue def _aespad(data): @@ -384,6 +408,10 @@ class Session(object): self._customkeepalives = None self.bmc = bmc self.broken = False + # a private queue for packets for which this session handler + # is destined to receive + self.pktqueue = collections.deque([]) + try: self.userid = userid.encode('utf-8') self.password = password.encode('utf-8') @@ -595,6 +623,23 @@ class Session(object): incrementtime += 1 return cumulativetime + 1 + def _cmdwait(self): + while self._isincommand(): + _io_wait(self._isincommand(), self.sockaddr) + + def awaitresponse(self, retry): + while retry and self.lastresponse is None and self.logged: + timeout = self.expiration - _monotonic_time() + _io_wait(timeout, self.sockaddr) + while self.iterwaiters: + waiter = self.iterwaiters.pop() + waiter({'success': True}) + self.process_pktqueue() + if (self in self.waiting_sessions and + self.expiration < _monotonic_time()): + self.waiting_sessions.pop(self, None) + self._timedout() + def raw_command(self, netfn, command, @@ -605,8 +650,7 @@ class Session(object): timeout=None): if not self.logged: raise exc.IpmiException('Session no longer connected') - while self._isincommand(): - Session.wait_for_rsp(self._isincommand()) + self._cmdwait() if not self.logged: raise exc.IpmiException('Session no longer connected') self.incommand = _monotonic_time() + self._getmaxtimeout() @@ -627,8 +671,7 @@ class Session(object): #of only the constructor needing a callback. From then on, #synchronous usage of the class acts in a greenthread style governed by #order of data on the network - while retry and self.lastresponse is None and self.logged: - Session.wait_for_rsp(timeout=timeout) + self.awaitresponse(retry) lastresponse = self.lastresponse self.incommand = False if retry and lastresponse is None: @@ -982,9 +1025,9 @@ class Session(object): if timeout is None: return 0 if _poller(timeout=timeout): - while pktqueue: - (data, sockaddr, mysocket) = pktqueue.popleft() - cls._route_ipmiresponse(sockaddr, data, mysocket) + while sessionqueue: + relsession = sessionqueue.popleft() + relsession.process_pktqueue() sessionstodel = [] sessionstokeepalive = [] for session, parms in cls.keepalive_sessions.iteritems(): @@ -1067,6 +1110,16 @@ class Session(object): except exc.IpmiException: self._mark_broken() + def process_pktqueue(self): + while self.pktqueue: + pkt = self.pktqueue.popleft() + if not pkt[0][0] == '\x06' and pkt[0][2:4] == '\xff\x07': + continue + if pkt[1] in self.bmc_handlers: + self._handle_ipmi_packet(pkt[0], sockaddr=pkt[1]) + elif pkt[2] in self.bmc_handlers: + self.sessionless_data(pkt[0], pkt[1]) + @classmethod def _route_ipmiresponse(cls, sockaddr, data, mysocket): if not (data[0] == '\x06' and data[2:4] == '\xff\x07'): # not ipmi @@ -1491,8 +1544,8 @@ class Session(object): if delay_xmit is not None: Session.waiting_sessions[self] = {} Session.waiting_sessions[self]['ipmisession'] = self - Session.waiting_sessions[self]['timeout'] = delay_xmit + \ - _monotonic_time() + self.expiration = delay_xmit + _monotonic_time() + Session.waiting_sessions[self]['timeout'] = self.expiration return # skip transmit, let retry timer do it's thing if self.sockaddr: _io_sendto(self.socket, self.netpacket, self.sockaddr) @@ -1519,11 +1572,10 @@ class Session(object): Session.waiting_sessions[self] = {} Session.waiting_sessions[self]['ipmisession'] = self if timeout is not None: - Session.waiting_sessions[self]['timeout'] = timeout + \ - _monotonic_time() + self.expiration = timeout + _monotonic_time() else: - Session.waiting_sessions[self]['timeout'] = self.timeout + \ - _monotonic_time() + self.expiration = self.timeout + _monotonic_time() + Session.waiting_sessions[self]['timeout'] = self.expiration def logout(self): if not self.logged: