mirror of
https://github.com/xcat2/confluent.git
synced 2025-01-23 16:13:47 +00:00
Make indirect PDU operations concurrent
Similar to the enclosure reseat work, have indirect PDU operations be made concurrent across PDUs, though still serial within a PDU.
This commit is contained in:
parent
59a31d38a2
commit
3a0172cccc
@ -15,7 +15,6 @@ import confluent.core as core
|
||||
import confluent.messages as msg
|
||||
import pyghmi.exceptions as pygexc
|
||||
import confluent.exceptions as exc
|
||||
import eventlet
|
||||
import eventlet.queue as queue
|
||||
import eventlet.greenpool as greenpool
|
||||
|
||||
|
@ -15,10 +15,16 @@ import confluent.core as core
|
||||
import confluent.messages as msg
|
||||
import pyghmi.exceptions as pygexc
|
||||
import confluent.exceptions as exc
|
||||
import eventlet.greenpool as greenpool
|
||||
import eventlet.queue as queue
|
||||
|
||||
class TaskDone:
|
||||
pass
|
||||
|
||||
def retrieve(nodes, element, configmanager, inputdata):
|
||||
emebs = configmanager.get_node_attributes(
|
||||
nodes, (u'power.*pdu', u'power.*outlet'))
|
||||
relpdus = {}
|
||||
if element == ['power', 'inlets']:
|
||||
outletnames = set([])
|
||||
for node in nodes:
|
||||
@ -39,13 +45,36 @@ def retrieve(nodes, element, configmanager, inputdata):
|
||||
for pgroup in outlets[node]:
|
||||
pdu = outlets[node][pgroup]['pdu']
|
||||
outlet = outlets[node][pgroup]['outlet']
|
||||
try:
|
||||
for rsp in core.handle_path(
|
||||
'/nodes/{0}/power/outlets/{1}'.format(pdu, outlet),
|
||||
'retrieve', configmanager):
|
||||
yield msg.KeyValueData({pgroup: rsp.kvpairs['state']['value']}, node)
|
||||
except exc.TargetEndpointBadCredentials:
|
||||
yield msg.ConfluentTargetInvalidCredentials(pdu)
|
||||
if pdu not in relpdus:
|
||||
relpdus[pdu] = {}
|
||||
relpdus[pdu][outlet] = (node, pgroup)
|
||||
rspq = queue.Queue()
|
||||
gp = greenpool.GreenPool(64)
|
||||
for pdu in relpdus:
|
||||
gp.spawn(readpdu, pdu, relpdus[pdu], configmanager, rspq)
|
||||
while gp.running():
|
||||
nrsp = rspq.get()
|
||||
if not isinstance(nrsp, TaskDone):
|
||||
yield nrsp
|
||||
while not rspq.empty():
|
||||
nrsp = rspq.get()
|
||||
if not isinstance(nrsp, TaskDone):
|
||||
yield nrsp
|
||||
|
||||
def readpdu(pdu, outletmap, configmanager, rspq):
|
||||
try:
|
||||
for outlet in outletmap:
|
||||
node, pgroup = outletmap[outlet]
|
||||
try:
|
||||
for rsp in core.handle_path(
|
||||
'/nodes/{0}/power/outlets/{1}'.format(pdu, outlet),
|
||||
'retrieve', configmanager):
|
||||
rspq.put(msg.KeyValueData({pgroup: rsp.kvpairs['state']['value']}, node))
|
||||
except exc.TargetEndpointBadCredentials:
|
||||
rspq.put(msg.ConfluentTargetInvalidCredentials(pdu))
|
||||
finally: # ensure thhat at least one thing triggers the get
|
||||
rspq.put(TaskDone())
|
||||
|
||||
|
||||
def get_outlets(nodes, emebs, inletname):
|
||||
outlets = {}
|
||||
@ -72,11 +101,34 @@ def update(nodes, element, configmanager, inputdata):
|
||||
emebs = configmanager.get_node_attributes(
|
||||
nodes, (u'power.*pdu', u'power.*outlet'))
|
||||
inletname = element[-1]
|
||||
relpdus = {}
|
||||
rspq = queue.Queue()
|
||||
gp = greenpool.GreenPool(64)
|
||||
outlets = get_outlets(nodes, emebs, inletname)
|
||||
for node in outlets:
|
||||
for pgroup in outlets[node]:
|
||||
pdu = outlets[node][pgroup]['pdu']
|
||||
outlet = outlets[node][pgroup]['outlet']
|
||||
if pdu not in relpdus:
|
||||
relpdus[pdu] = {}
|
||||
relpdus[pdu][outlet] = (node, pgroup)
|
||||
for pdu in relpdus:
|
||||
gp.spawn(updatepdu, pdu, relpdus[pdu], configmanager, inputdata, rspq)
|
||||
while gp.running():
|
||||
nrsp = rspq.get()
|
||||
if not isinstance(nrsp, TaskDone):
|
||||
yield nrsp
|
||||
while not rspq.empty():
|
||||
nrsp = rspq.get()
|
||||
if not isinstance(nrsp, TaskDone):
|
||||
yield nrsp
|
||||
|
||||
def updatepdu(pdu, outletmap, configmanager, inputdata, rspq):
|
||||
try:
|
||||
for outlet in outletmap:
|
||||
node, pgroup = outletmap[outlet]
|
||||
for rsp in core.handle_path('/nodes/{0}/power/outlets/{1}'.format(pdu, outlet),
|
||||
'update', configmanager, inputdata={'state': inputdata.powerstate(node)}):
|
||||
yield msg.KeyValueData({pgroup: rsp.kvpairs['state']['value']}, node)
|
||||
rspq.put(msg.KeyValueData({pgroup: rsp.kvpairs['state']['value']}, node))
|
||||
finally:
|
||||
rspq.put(TaskDone())
|
||||
|
Loading…
x
Reference in New Issue
Block a user