From fd278354246ce1ac81b0d22dd25da3ece0533a08 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Sun, 2 Feb 2014 19:16:59 -0500 Subject: [PATCH] Remove thread aspect of ipmi.py This is a work in progress, trying to make ipmi.py theoretically look better. --- confluent/consoleserver.py | 12 +++ plugins/hardwaremanagement/ipmi.py | 168 +++++------------------------ 2 files changed, 39 insertions(+), 141 deletions(-) diff --git a/confluent/consoleserver.py b/confluent/consoleserver.py index 0f6c9192..c9061545 100644 --- a/confluent/consoleserver.py +++ b/confluent/consoleserver.py @@ -11,6 +11,7 @@ import confluent.interface.console as conapi import confluent.pluginapi as plugin import confluent.util as util import eventlet +import eventlet.green.threading as threading import random _handled_consoles = {} @@ -123,6 +124,7 @@ class ConsoleSession(object): def __init__(self, node, configmanager, datacallback=None): if node not in _handled_consoles: _handled_consoles[node] = _ConsoleHandler(node, configmanager) + self._evt = threading.Event() self.node = node self.conshdl = _handled_consoles[node] self.write = _handled_consoles[node].write @@ -139,6 +141,7 @@ class ConsoleSession(object): def destroy(self): _handled_consoles[self.node].unregister_rcpt(self.reghdl) self.databuffer = None + self._evt = None self.reghdl = None def got_data(self, data): @@ -148,6 +151,7 @@ class ConsoleSession(object): for data, we must maintain data in a buffer until retrieved """ self.databuffer += data + self._evt.set() def get_next_output(self, timeout=45): """Poll for next available output on this console. @@ -161,12 +165,20 @@ class ConsoleSession(object): try: while len(self.databuffer) == 0 and currtime < deadline: timeo = deadline - currtime + # change to a threading event object + # got_data will trigger this function to move + if self._evt is None: + self._evt = threading.Event() + self._evt.wait(timeout) self.conshdl._console.wait_for_data(timeout=timeo) currtime = util.monotonic_time() except TypeError: + import traceback + traceback.print_exc() return "" retval = self.databuffer self.databuffer = "" + self._evt.clear() self.reaper = eventlet.spawn_after(15, self.destroy) return retval diff --git a/plugins/hardwaremanagement/ipmi.py b/plugins/hardwaremanagement/ipmi.py index 27afe019..5515ac26 100644 --- a/plugins/hardwaremanagement/ipmi.py +++ b/plugins/hardwaremanagement/ipmi.py @@ -11,27 +11,13 @@ import pyghmi.exceptions as pygexc import pyghmi.ipmi.console as console import pyghmi.ipmi.command as ipmicommand console.session.select = eventlet.green.select +console.session.threading = eventlet.green.threading -_ipmithread = None -#pullchain is a pipe to tug on to induce the ipmi thread process pending data -pullchain = None -chainpulled = False tmptimeout = None -ipmiq = collections.deque([]) -ipmiwaiters = collections.deque([]) - - -def wait_on_ipmi(): - waitevt = eventlet.event.Event() - ipmiwaiters.append(waitevt) - waitevt.wait() - +_ipmithread = None def _ipmi_evtloop(): global tmptimeout - global pullchain - console.session.Session.register_handle_callback(pullchain[0], - _process_chgs) while (1): try: if tmptimeout is not None: @@ -39,52 +25,10 @@ def _ipmi_evtloop(): tmptimeout = None else: console.session.Session.wait_for_rsp(timeout=600) - while ipmiwaiters: - waiter = ipmiwaiters.popleft() - waiter.send() - except RuntimeError: - raise except: import traceback traceback.print_exc() -def _process_chgs(intline): - #here we receive functions to run in our thread - #the tuples on the deque consist of: - #function, arg tuple, and optionally a callback - #to send the return value back to the requester - global chainpulled - os.read(intline,1) # answer the bell - chainpulled = False - cval = () - try: - while ipmiq: - cval = ipmiq.popleft() - if hasattr(cval[0], '__call__'): - if isinstance(cval[1], tuple): - rv = cval[0](*cval[1]) - elif isinstance(cval[1], dict): - rv = cval[0](**cval[1]) - except pygexc.IpmiException as problem: - if str(problem) == 'timeout': - rv = {'error': 'timeout'} - else: - import traceback - traceback.print_exc() - cval=() - except: # assure the thread does not crash and burn - import traceback - traceback.print_exc() - cval=() - if len(cval) > 2: - cval[2](rv) - # If we are inside a loop within pyghmi, this is our only shot - # so we have to wake up anything that might be interested in - # state changes here as well as the evtloop - while ipmiwaiters: - waiter = ipmiwaiters.popleft() - waiter.send() - def get_conn_params(node, configdata): if 'secret.ipmiuser' in configdata: @@ -148,39 +92,17 @@ class IpmiConsole(conapi.Console): def connect(self,callback): global _ipmithread - global pullchain - global chainpulled self.datacallback = callback + self.solconnection=console.Console(bmc=self.bmc, port=self.port, + userid=self.username, + password=self.password, kg=self.kg, + force=True, + iohandler=self.handle_data) if _ipmithread is None: - pullchain = os.pipe() _ipmithread = eventlet.spawn(_ipmi_evtloop) - self.solconnection = None - ipmiq.append((console.Console,{'bmc': self.bmc, - 'port': self.port, - 'userid': self.username, - 'password': self.password, - 'kg': self.kg, - 'force': True, - 'iohandler': self.handle_data}, self.got_consobject)) - if not chainpulled: - chainpulled = True - os.write(pullchain[1],'1') - while self.solconnection is None: - wait_on_ipmi() - - def got_consobject(self, solconnection): - self.solconnection = solconnection def write(self, data): - global chainpulled - while self.solconnection is None and not self.broken: - wait_on_ipmi() - ipmiq.append((self.solconnection.send_data, (data,))) - if not chainpulled: - chainpulled = True - os.write(pullchain[1],'1') - - #self.solconnection.send_data(data) + self.solconnection.send_data(data) def wait_for_data(self, timeout=600): """Wait for some network event. @@ -194,18 +116,10 @@ class IpmiConsole(conapi.Console): # would be to add a layer through the callback. IMO there isn't enough # value in assuring data coming back to bother with making the stack # taller than it has to be - global tmptimeout - global chainpulled - tmptimeout = timeout - if not chainpulled: - chainpulled=True - os.write(pullchain[1],'1') - eventlet.sleep(0.001) - wait_on_ipmi() #TODO: a channel for the ipmithread to tug back instead of busy wait #while tmptimeout is not None: # eventlet.sleep(0) - #console.session.Session.wait_for_rsp(timeout=timeout) + console.session.Session.wait_for_rsp(timeout=timeout) class IpmiIterator(object): @@ -239,13 +153,7 @@ class IpmiHandler(object): return self def __init__(self, operation, node, element, cfd, inputdata): - global chainpulled - global _ipmithread - global pullchain self.broken = False - if _ipmithread is None: - pullchain = os.pipe() - _ipmithread = eventlet.spawn(_ipmi_evtloop) eventlet.sleep(0) self.cfg = cfd[node] self.loggedin = False @@ -255,54 +163,32 @@ class IpmiHandler(object): connparams = get_conn_params(node, self.cfg) self.ipmicmd = None self.inputdata = inputdata - ipmiq.append((ipmicommand.Command,{'bmc': connparams['bmc'], - 'userid': connparams['username'], - 'password': connparams['passphrase'], - 'kg': connparams['kg'], - 'port': connparams['port'], - 'onlogon': self.logged}, - self.got_ipmicmd)) - if not chainpulled: - chainpulled = True - os.write(pullchain[1],'1') - while self.ipmicmd == None: - wait_on_ipmi() - - def got_ipmicmd(self, ipmicmd): - self.ipmicmd = ipmicmd + self.ipmicmd = ipmicommand.Command(bmc=connparams['bmc'], + userid=connparams['username'], + password=connparams['passphrase'], + kg=connparams['kg'], + port=connparams['port'], + onlogon=self.logged) + print "spin on logon" + while not (self.loggedin or self.broken): + print "on" + console.session.Session.wait_for_rsp(timeout=600) + print "hmph..." def logged(self, response, ipmicmd): + print "huzzah" if 'error' in response: self.broken = True self.error = response['error'] else: self.loggedin = True - def call_ipmicmd(self, function, *args): - global chainpulled - self.lastrsp = None - ipmiq.append((function, args, self.got_rsp)) - if not chainpulled: - chainpulled = True - os.write(pullchain[1],'1') - while self.lastrsp is None: - wait_on_ipmi() - if 'error' in self.lastrsp: - if self.lastrsp['error'] == 'timeout': - raise exc.TargetEndpointTimeout() - else: - raise Exception(self.lastrsp['error']) - return self.lastrsp - - def got_rsp(self, response): - self.lastrsp = response - def handle_request(self): bootdevices = { 'optical': 'cd' } while not (self.loggedin or self.broken): - wait_on_ipmi() + console.session.Session.wait_for_rsp(timeout=600) if self.broken: if self.error == 'timeout': raise exc.TargetEndpointTimeout() @@ -310,26 +196,26 @@ class IpmiHandler(object): raise Exception(self.error) if self.element == [ 'power', 'state' ]: if 'read' == self.op: - power = self.call_ipmicmd(self.ipmicmd.get_power) + power = self.ipmicmd.get_power() return msg.PowerState(node=self.node, state=power['powerstate']) elif 'update' == self.op: powerstate = self.inputdata.powerstate(self.node) #TODO: call with wait argument - self.call_ipmicmd(self.ipmicmd.set_power, powerstate) - power = self.call_ipmicmd(self.ipmicmd.get_power) + self.ipmicmd.set_power(powerstate) + power = self.ipmicmd.get_power() return msg.PowerState(node=self.node, state=power['powerstate']) elif self.element == [ 'boot', 'device' ]: if 'read' == self.op: - bootdev = self.call_ipmicmd(self.ipmicmd.get_bootdev) + bootdev = self.ipmicmd.get_bootdev() if bootdev['bootdev'] in bootdevices: bootdev['bootdev'] = bootdevices[bootdev['bootdev']] return msg.BootDevice(node=self.node, device=bootdev['bootdev']) elif 'update' == self.op: bootdev = self.inputdata.bootdevice(self.node) - bootdev = self.call_ipmicmd(self.ipmicmd.set_bootdev, bootdev) + bootdev = self.ipmicmd.set_bootdev(bootdev) if bootdev['bootdev'] in bootdevices: bootdev['bootdev'] = bootdevices[bootdev['bootdev']] return msg.BootDevice(node=self.node,