2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-04-17 18:49:32 +00:00

Migrate IPMI SOL to asyncio

This commit is contained in:
Jarrod Johnson 2024-05-30 15:37:06 -04:00
parent cbb52739d3
commit 00eff4a002
3 changed files with 48 additions and 41 deletions

View File

@ -64,7 +64,7 @@ async def get_buffer_output(nodename):
out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1)
out.connect("\x00confluent-vtbuffer")
rdr, writer = await asyncio.open__unix_connection(sock=out)
rdr, writer = await asyncio.open_unix_connection(sock=out)
if not isinstance(nodename, bytes):
nodename = nodename.encode('utf8')
outdata = bytearray()
@ -77,7 +77,7 @@ async def get_buffer_output(nodename):
raise Exception("bad read")
outdata.extend(chunk)
writer.close()
await writer.wait_close()
await writer.wait_closed()
return bytes(outdata[:-1])
@ -167,7 +167,10 @@ class ConsoleHandler(object):
if self._genwatchattribs:
self._attribwatcher = self.cfgmgr.watch_attributes(
(self.node,), self._genwatchattribs, self._attribschanged)
self.check_isondemand()
util.spawn(self.ondemand_init())
async def ondemand_init(self):
await self.check_isondemand()
if not self._isondemand:
self.connectstate = 'connecting'
self._connect()
@ -202,7 +205,7 @@ class ConsoleHandler(object):
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
def check_isondemand(self):
async def check_isondemand(self):
self._dologging = True
attrvalue = self.cfgmgr.get_node_attributes(
(self.node,), ('console.logging', 'collective.manager'))
@ -216,21 +219,21 @@ class ConsoleHandler(object):
self._isondemand = True
if (attrvalue[self.node]['console.logging']['value']) in ('none', 'memory'):
self._dologging = False
self.check_collective(attrvalue)
await self.check_collective(attrvalue)
def check_collective(self, attrvalue):
async def check_collective(self, attrvalue):
myc = attrvalue.get(self.node, {}).get('collective.manager', {}).get(
'value', None)
if list(configmodule.list_collective()) and not myc:
self._is_local = False
self._detach()
self._disconnect()
await self._disconnect()
if myc and myc != collective.get_myname():
# Do not do console connect for nodes managed by another
# confluent collective member
self._is_local = False
self._detach()
self._disconnect()
await self._disconnect()
else:
self._is_local = True
@ -243,11 +246,11 @@ class ConsoleHandler(object):
return util.monotonic_time() - self.lasttime
return False
def _attribschanged(self, nodeattribs, configmanager, **kwargs):
async def _attribschanged(self, nodeattribs, configmanager, **kwargs):
if 'collective.manager' in nodeattribs[self.node]:
attrval = configmanager.get_node_attributes(self.node,
'collective.manager')
self.check_collective(attrval)
await self.check_collective(attrval)
if 'console.logging' in nodeattribs[self.node]:
# decide whether logging changes how we react or not
self._dologging = True
@ -344,9 +347,12 @@ class ConsoleHandler(object):
'not configured,\r\nset it to a valid value for console '
'function')
try:
self._console = list(await plugin.handle_path(
self._plugin_path.format(self.node),
"create", self.cfgmgr))[0]
consoles = await plugin.handle_path(
self._plugin_path.format(self.node),
"create", self.cfgmgr)
async for cns in consoles:
print(repr(cns))
self._console = cns
except (exc.NotImplementedException, exc.NotFoundException):
self._console = None
except Exception as e:
@ -563,7 +569,7 @@ class ConsoleHandler(object):
except Exception:
_tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
self._got_disconnected()
await self._got_disconnected()
def disconnect_node(node, configmanager):
@ -608,7 +614,8 @@ async def initialize():
# ['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL,
# stdout=subprocess.DEVNULL)
def start_console_sessions():
async def start_console_sessions():
configmodule.hook_new_configmanagers(_start_tenant_sessions)
@ -789,7 +796,7 @@ class ConsoleSession(object):
if not skipreplay:
for recdata in await self.conshdl.get_recent():
if recdata:
datacallback(recdata)
await datacallback(recdata)
await self.conshdl.attachsession(self)

View File

@ -25,6 +25,9 @@
# Things like heartbeating and discovery
# It also will optionally snoop SLP DA requests
#import logging
#logging.basicConfig(filename='/tmp/asyn.log', level=logging.DEBUG)
import atexit
import confluent.auth as auth
import confluent.config.conf as conf
@ -283,6 +286,7 @@ def migrate_db():
async def run(args):
asyncio.get_event_loop().set_debug(True)
setlimits()
try:
configmanager.ConfigManager(None)
@ -343,7 +347,7 @@ async def run(args):
os.umask(oumask)
auth.check_for_yaml()
collective.startup()
consoleserver.initialize()
await consoleserver.initialize()
http_bind_host, http_bind_port = _get_connector_config('http')
sock_bind_host, sock_bind_port = _get_connector_config('socket')
try:
@ -363,7 +367,7 @@ async def run(args):
await asyncio.sleep(0.5)
eventlet.spawn_n(disco.start_detection)
await asyncio.sleep(1)
consoleserver.start_console_sessions()
await consoleserver.start_console_sessions()
while 1:
await asyncio.sleep(100)

View File

@ -22,12 +22,12 @@ import confluent.messages as msg
import confluent.util as util
import copy
import errno
import eventlet
import eventlet.event
import eventlet.green.threading as threading
import eventlet.greenpool as greenpool
import eventlet.queue as queue
import eventlet.support.greendns
#import eventlet
#import eventlet.event
#import eventlet.green.threading as threading
#import eventlet.greenpool as greenpool
#import eventlet.queue as queue
#import eventlet.support.greendns
from fnmatch import fnmatch
import os
import pwd
@ -53,8 +53,9 @@ except NameError:
pci_cache = {}
def get_dns_txt(qstring):
return eventlet.support.greendns.resolver.query(
qstring, 'TXT')[0].strings[0].replace('i=', '')
return None
# return eventlet.support.greendns.resolver.query(
# qstring, 'TXT')[0].strings[0].replace('i=', '')
def get_pci_text_from_ids(subdevice, subvendor, device, vendor):
fqpi = '{0}.{1}.{2}.{3}'.format(subdevice, subvendor, device, vendor)
@ -214,10 +215,10 @@ class IpmiCommandWrapper(ipmicommand.Command):
# then do nothing
pass
def get_health(self):
async def get_health(self):
if self._inhealth:
while self._inhealth:
eventlet.sleep(0.1)
await asyncio.sleep(0.1)
return self._lasthealth
self._inhealth = True
try:
@ -329,7 +330,7 @@ class IpmiConsole(conapi.Console):
else:
self.datacallback(data)
def connect(self, callback):
async def connect(self, callback):
self.datacallback = callback
# we provide a weak reference to pyghmi as otherwise we'd
# have a circular reference and reference counting would never get
@ -341,12 +342,7 @@ class IpmiConsole(conapi.Console):
kg=self.kg, force=True,
iohandler=self.handle_data)
self.solconnection.outputlock = NullLock()
while (self.solconnection and not self.solconnection.connected and
not (self.broken or self.solconnection.broken or
self.solconnection.ipmi_session.broken)):
w = eventlet.event.Event()
_ipmiwaiters.append(w)
w.wait(15)
await self.solconnection.connect()
if (self.broken or not self.solconnection or
self.solconnection.broken or
self.solconnection.ipmi_session.broken):
@ -371,11 +367,11 @@ class IpmiConsole(conapi.Console):
self.broken = True
self.error = "closed"
def write(self, data):
self.solconnection.send_data(data)
async def write(self, data):
await self.solconnection.send_data(data)
def send_break(self):
self.solconnection.send_break()
async def send_break(self):
await self.solconnection.send_break()
async def perform_requests(operator, nodes, element, cfg, inputdata, realop):
@ -1323,10 +1319,10 @@ class IpmiHandler:
for sensor in filter(self.match_sensor, sensors):
self.output.put(msg.ChildCollection(simplify_name(sensor['name'])))
def health(self):
async def health(self):
if 'read' == self.op:
try:
response = self.ipmicmd.get_health()
response = await self.ipmicmd.get_health()
except pygexc.IpmiException:
self.output.put(msg.ConfluentTargetTimeout(self.node))
return