2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-28 11:57:37 +00:00

Remove thread aspect of ipmi.py

This is a work in progress, trying to make ipmi.py theoretically look better.
This commit is contained in:
Jarrod Johnson 2014-02-02 19:16:59 -05:00
parent 131e64ef0e
commit fd27835424
2 changed files with 39 additions and 141 deletions

View File

@ -11,6 +11,7 @@ import confluent.interface.console as conapi
import confluent.pluginapi as plugin
import confluent.util as util
import eventlet
import eventlet.green.threading as threading
import random
_handled_consoles = {}
@ -123,6 +124,7 @@ class ConsoleSession(object):
def __init__(self, node, configmanager, datacallback=None):
if node not in _handled_consoles:
_handled_consoles[node] = _ConsoleHandler(node, configmanager)
self._evt = threading.Event()
self.node = node
self.conshdl = _handled_consoles[node]
self.write = _handled_consoles[node].write
@ -139,6 +141,7 @@ class ConsoleSession(object):
def destroy(self):
_handled_consoles[self.node].unregister_rcpt(self.reghdl)
self.databuffer = None
self._evt = None
self.reghdl = None
def got_data(self, data):
@ -148,6 +151,7 @@ class ConsoleSession(object):
for data, we must maintain data in a buffer until retrieved
"""
self.databuffer += data
self._evt.set()
def get_next_output(self, timeout=45):
"""Poll for next available output on this console.
@ -161,12 +165,20 @@ class ConsoleSession(object):
try:
while len(self.databuffer) == 0 and currtime < deadline:
timeo = deadline - currtime
# change to a threading event object
# got_data will trigger this function to move
if self._evt is None:
self._evt = threading.Event()
self._evt.wait(timeout)
self.conshdl._console.wait_for_data(timeout=timeo)
currtime = util.monotonic_time()
except TypeError:
import traceback
traceback.print_exc()
return ""
retval = self.databuffer
self.databuffer = ""
self._evt.clear()
self.reaper = eventlet.spawn_after(15, self.destroy)
return retval

View File

@ -11,27 +11,13 @@ import pyghmi.exceptions as pygexc
import pyghmi.ipmi.console as console
import pyghmi.ipmi.command as ipmicommand
console.session.select = eventlet.green.select
console.session.threading = eventlet.green.threading
_ipmithread = None
#pullchain is a pipe to tug on to induce the ipmi thread process pending data
pullchain = None
chainpulled = False
tmptimeout = None
ipmiq = collections.deque([])
ipmiwaiters = collections.deque([])
def wait_on_ipmi():
waitevt = eventlet.event.Event()
ipmiwaiters.append(waitevt)
waitevt.wait()
_ipmithread = None
def _ipmi_evtloop():
global tmptimeout
global pullchain
console.session.Session.register_handle_callback(pullchain[0],
_process_chgs)
while (1):
try:
if tmptimeout is not None:
@ -39,52 +25,10 @@ def _ipmi_evtloop():
tmptimeout = None
else:
console.session.Session.wait_for_rsp(timeout=600)
while ipmiwaiters:
waiter = ipmiwaiters.popleft()
waiter.send()
except RuntimeError:
raise
except:
import traceback
traceback.print_exc()
def _process_chgs(intline):
#here we receive functions to run in our thread
#the tuples on the deque consist of:
#function, arg tuple, and optionally a callback
#to send the return value back to the requester
global chainpulled
os.read(intline,1) # answer the bell
chainpulled = False
cval = ()
try:
while ipmiq:
cval = ipmiq.popleft()
if hasattr(cval[0], '__call__'):
if isinstance(cval[1], tuple):
rv = cval[0](*cval[1])
elif isinstance(cval[1], dict):
rv = cval[0](**cval[1])
except pygexc.IpmiException as problem:
if str(problem) == 'timeout':
rv = {'error': 'timeout'}
else:
import traceback
traceback.print_exc()
cval=()
except: # assure the thread does not crash and burn
import traceback
traceback.print_exc()
cval=()
if len(cval) > 2:
cval[2](rv)
# If we are inside a loop within pyghmi, this is our only shot
# so we have to wake up anything that might be interested in
# state changes here as well as the evtloop
while ipmiwaiters:
waiter = ipmiwaiters.popleft()
waiter.send()
def get_conn_params(node, configdata):
if 'secret.ipmiuser' in configdata:
@ -148,39 +92,17 @@ class IpmiConsole(conapi.Console):
def connect(self,callback):
global _ipmithread
global pullchain
global chainpulled
self.datacallback = callback
self.solconnection=console.Console(bmc=self.bmc, port=self.port,
userid=self.username,
password=self.password, kg=self.kg,
force=True,
iohandler=self.handle_data)
if _ipmithread is None:
pullchain = os.pipe()
_ipmithread = eventlet.spawn(_ipmi_evtloop)
self.solconnection = None
ipmiq.append((console.Console,{'bmc': self.bmc,
'port': self.port,
'userid': self.username,
'password': self.password,
'kg': self.kg,
'force': True,
'iohandler': self.handle_data}, self.got_consobject))
if not chainpulled:
chainpulled = True
os.write(pullchain[1],'1')
while self.solconnection is None:
wait_on_ipmi()
def got_consobject(self, solconnection):
self.solconnection = solconnection
def write(self, data):
global chainpulled
while self.solconnection is None and not self.broken:
wait_on_ipmi()
ipmiq.append((self.solconnection.send_data, (data,)))
if not chainpulled:
chainpulled = True
os.write(pullchain[1],'1')
#self.solconnection.send_data(data)
self.solconnection.send_data(data)
def wait_for_data(self, timeout=600):
"""Wait for some network event.
@ -194,18 +116,10 @@ class IpmiConsole(conapi.Console):
# would be to add a layer through the callback. IMO there isn't enough
# value in assuring data coming back to bother with making the stack
# taller than it has to be
global tmptimeout
global chainpulled
tmptimeout = timeout
if not chainpulled:
chainpulled=True
os.write(pullchain[1],'1')
eventlet.sleep(0.001)
wait_on_ipmi()
#TODO: a channel for the ipmithread to tug back instead of busy wait
#while tmptimeout is not None:
# eventlet.sleep(0)
#console.session.Session.wait_for_rsp(timeout=timeout)
console.session.Session.wait_for_rsp(timeout=timeout)
class IpmiIterator(object):
@ -239,13 +153,7 @@ class IpmiHandler(object):
return self
def __init__(self, operation, node, element, cfd, inputdata):
global chainpulled
global _ipmithread
global pullchain
self.broken = False
if _ipmithread is None:
pullchain = os.pipe()
_ipmithread = eventlet.spawn(_ipmi_evtloop)
eventlet.sleep(0)
self.cfg = cfd[node]
self.loggedin = False
@ -255,54 +163,32 @@ class IpmiHandler(object):
connparams = get_conn_params(node, self.cfg)
self.ipmicmd = None
self.inputdata = inputdata
ipmiq.append((ipmicommand.Command,{'bmc': connparams['bmc'],
'userid': connparams['username'],
'password': connparams['passphrase'],
'kg': connparams['kg'],
'port': connparams['port'],
'onlogon': self.logged},
self.got_ipmicmd))
if not chainpulled:
chainpulled = True
os.write(pullchain[1],'1')
while self.ipmicmd == None:
wait_on_ipmi()
def got_ipmicmd(self, ipmicmd):
self.ipmicmd = ipmicmd
self.ipmicmd = ipmicommand.Command(bmc=connparams['bmc'],
userid=connparams['username'],
password=connparams['passphrase'],
kg=connparams['kg'],
port=connparams['port'],
onlogon=self.logged)
print "spin on logon"
while not (self.loggedin or self.broken):
print "on"
console.session.Session.wait_for_rsp(timeout=600)
print "hmph..."
def logged(self, response, ipmicmd):
print "huzzah"
if 'error' in response:
self.broken = True
self.error = response['error']
else:
self.loggedin = True
def call_ipmicmd(self, function, *args):
global chainpulled
self.lastrsp = None
ipmiq.append((function, args, self.got_rsp))
if not chainpulled:
chainpulled = True
os.write(pullchain[1],'1')
while self.lastrsp is None:
wait_on_ipmi()
if 'error' in self.lastrsp:
if self.lastrsp['error'] == 'timeout':
raise exc.TargetEndpointTimeout()
else:
raise Exception(self.lastrsp['error'])
return self.lastrsp
def got_rsp(self, response):
self.lastrsp = response
def handle_request(self):
bootdevices = {
'optical': 'cd'
}
while not (self.loggedin or self.broken):
wait_on_ipmi()
console.session.Session.wait_for_rsp(timeout=600)
if self.broken:
if self.error == 'timeout':
raise exc.TargetEndpointTimeout()
@ -310,26 +196,26 @@ class IpmiHandler(object):
raise Exception(self.error)
if self.element == [ 'power', 'state' ]:
if 'read' == self.op:
power = self.call_ipmicmd(self.ipmicmd.get_power)
power = self.ipmicmd.get_power()
return msg.PowerState(node=self.node,
state=power['powerstate'])
elif 'update' == self.op:
powerstate = self.inputdata.powerstate(self.node)
#TODO: call with wait argument
self.call_ipmicmd(self.ipmicmd.set_power, powerstate)
power = self.call_ipmicmd(self.ipmicmd.get_power)
self.ipmicmd.set_power(powerstate)
power = self.ipmicmd.get_power()
return msg.PowerState(node=self.node,
state=power['powerstate'])
elif self.element == [ 'boot', 'device' ]:
if 'read' == self.op:
bootdev = self.call_ipmicmd(self.ipmicmd.get_bootdev)
bootdev = self.ipmicmd.get_bootdev()
if bootdev['bootdev'] in bootdevices:
bootdev['bootdev'] = bootdevices[bootdev['bootdev']]
return msg.BootDevice(node=self.node,
device=bootdev['bootdev'])
elif 'update' == self.op:
bootdev = self.inputdata.bootdevice(self.node)
bootdev = self.call_ipmicmd(self.ipmicmd.set_bootdev, bootdev)
bootdev = self.ipmicmd.set_bootdev(bootdev)
if bootdev['bootdev'] in bootdevices:
bootdev['bootdev'] = bootdevices[bootdev['bootdev']]
return msg.BootDevice(node=self.node,