From 00eff4a0029690763678dff77296530eb8c0422d Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 30 May 2024 15:37:06 -0400 Subject: [PATCH] Migrate IPMI SOL to asyncio --- confluent_server/confluent/consoleserver.py | 39 ++++++++++------- confluent_server/confluent/main.py | 8 +++- .../plugins/hardwaremanagement/ipmi.py | 42 +++++++++---------- 3 files changed, 48 insertions(+), 41 deletions(-) diff --git a/confluent_server/confluent/consoleserver.py b/confluent_server/confluent/consoleserver.py index 6e27771e..2442d4fa 100644 --- a/confluent_server/confluent/consoleserver.py +++ b/confluent_server/confluent/consoleserver.py @@ -64,7 +64,7 @@ async def get_buffer_output(nodename): out = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) out.setsockopt(socket.SOL_SOCKET, socket.SO_PASSCRED, 1) out.connect("\x00confluent-vtbuffer") - rdr, writer = await asyncio.open__unix_connection(sock=out) + rdr, writer = await asyncio.open_unix_connection(sock=out) if not isinstance(nodename, bytes): nodename = nodename.encode('utf8') outdata = bytearray() @@ -77,7 +77,7 @@ async def get_buffer_output(nodename): raise Exception("bad read") outdata.extend(chunk) writer.close() - await writer.wait_close() + await writer.wait_closed() return bytes(outdata[:-1]) @@ -167,7 +167,10 @@ class ConsoleHandler(object): if self._genwatchattribs: self._attribwatcher = self.cfgmgr.watch_attributes( (self.node,), self._genwatchattribs, self._attribschanged) - self.check_isondemand() + util.spawn(self.ondemand_init()) + + async def ondemand_init(self): + await self.check_isondemand() if not self._isondemand: self.connectstate = 'connecting' self._connect() @@ -202,7 +205,7 @@ class ConsoleHandler(object): _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) - def check_isondemand(self): + async def check_isondemand(self): self._dologging = True attrvalue = self.cfgmgr.get_node_attributes( (self.node,), ('console.logging', 'collective.manager')) @@ -216,21 +219,21 @@ class ConsoleHandler(object): self._isondemand = True if (attrvalue[self.node]['console.logging']['value']) in ('none', 'memory'): self._dologging = False - self.check_collective(attrvalue) + await self.check_collective(attrvalue) - def check_collective(self, attrvalue): + async def check_collective(self, attrvalue): myc = attrvalue.get(self.node, {}).get('collective.manager', {}).get( 'value', None) if list(configmodule.list_collective()) and not myc: self._is_local = False self._detach() - self._disconnect() + await self._disconnect() if myc and myc != collective.get_myname(): # Do not do console connect for nodes managed by another # confluent collective member self._is_local = False self._detach() - self._disconnect() + await self._disconnect() else: self._is_local = True @@ -243,11 +246,11 @@ class ConsoleHandler(object): return util.monotonic_time() - self.lasttime return False - def _attribschanged(self, nodeattribs, configmanager, **kwargs): + async def _attribschanged(self, nodeattribs, configmanager, **kwargs): if 'collective.manager' in nodeattribs[self.node]: attrval = configmanager.get_node_attributes(self.node, 'collective.manager') - self.check_collective(attrval) + await self.check_collective(attrval) if 'console.logging' in nodeattribs[self.node]: # decide whether logging changes how we react or not self._dologging = True @@ -344,9 +347,12 @@ class ConsoleHandler(object): 'not configured,\r\nset it to a valid value for console ' 'function') try: - self._console = list(await plugin.handle_path( - self._plugin_path.format(self.node), - "create", self.cfgmgr))[0] + consoles = await plugin.handle_path( + self._plugin_path.format(self.node), + "create", self.cfgmgr) + async for cns in consoles: + print(repr(cns)) + self._console = cns except (exc.NotImplementedException, exc.NotFoundException): self._console = None except Exception as e: @@ -563,7 +569,7 @@ class ConsoleHandler(object): except Exception: _tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event, event=log.Events.stacktrace) - self._got_disconnected() + await self._got_disconnected() def disconnect_node(node, configmanager): @@ -608,7 +614,8 @@ async def initialize(): # ['/opt/confluent/bin/vtbufferd', 'confluent-vtbuffer'], bufsize=0, stdin=subprocess.DEVNULL, # stdout=subprocess.DEVNULL) -def start_console_sessions(): + +async def start_console_sessions(): configmodule.hook_new_configmanagers(_start_tenant_sessions) @@ -789,7 +796,7 @@ class ConsoleSession(object): if not skipreplay: for recdata in await self.conshdl.get_recent(): if recdata: - datacallback(recdata) + await datacallback(recdata) await self.conshdl.attachsession(self) diff --git a/confluent_server/confluent/main.py b/confluent_server/confluent/main.py index 7375b584..221ee16f 100644 --- a/confluent_server/confluent/main.py +++ b/confluent_server/confluent/main.py @@ -25,6 +25,9 @@ # Things like heartbeating and discovery # It also will optionally snoop SLP DA requests + +#import logging +#logging.basicConfig(filename='/tmp/asyn.log', level=logging.DEBUG) import atexit import confluent.auth as auth import confluent.config.conf as conf @@ -283,6 +286,7 @@ def migrate_db(): async def run(args): + asyncio.get_event_loop().set_debug(True) setlimits() try: configmanager.ConfigManager(None) @@ -343,7 +347,7 @@ async def run(args): os.umask(oumask) auth.check_for_yaml() collective.startup() - consoleserver.initialize() + await consoleserver.initialize() http_bind_host, http_bind_port = _get_connector_config('http') sock_bind_host, sock_bind_port = _get_connector_config('socket') try: @@ -363,7 +367,7 @@ async def run(args): await asyncio.sleep(0.5) eventlet.spawn_n(disco.start_detection) await asyncio.sleep(1) - consoleserver.start_console_sessions() + await consoleserver.start_console_sessions() while 1: await asyncio.sleep(100) diff --git a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py index 46aebdaf..1a854554 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/ipmi.py @@ -22,12 +22,12 @@ import confluent.messages as msg import confluent.util as util import copy import errno -import eventlet -import eventlet.event -import eventlet.green.threading as threading -import eventlet.greenpool as greenpool -import eventlet.queue as queue -import eventlet.support.greendns +#import eventlet +#import eventlet.event +#import eventlet.green.threading as threading +#import eventlet.greenpool as greenpool +#import eventlet.queue as queue +#import eventlet.support.greendns from fnmatch import fnmatch import os import pwd @@ -53,8 +53,9 @@ except NameError: pci_cache = {} def get_dns_txt(qstring): - return eventlet.support.greendns.resolver.query( - qstring, 'TXT')[0].strings[0].replace('i=', '') + return None + # return eventlet.support.greendns.resolver.query( + # qstring, 'TXT')[0].strings[0].replace('i=', '') def get_pci_text_from_ids(subdevice, subvendor, device, vendor): fqpi = '{0}.{1}.{2}.{3}'.format(subdevice, subvendor, device, vendor) @@ -214,10 +215,10 @@ class IpmiCommandWrapper(ipmicommand.Command): # then do nothing pass - def get_health(self): + async def get_health(self): if self._inhealth: while self._inhealth: - eventlet.sleep(0.1) + await asyncio.sleep(0.1) return self._lasthealth self._inhealth = True try: @@ -329,7 +330,7 @@ class IpmiConsole(conapi.Console): else: self.datacallback(data) - def connect(self, callback): + async def connect(self, callback): self.datacallback = callback # we provide a weak reference to pyghmi as otherwise we'd # have a circular reference and reference counting would never get @@ -341,12 +342,7 @@ class IpmiConsole(conapi.Console): kg=self.kg, force=True, iohandler=self.handle_data) self.solconnection.outputlock = NullLock() - while (self.solconnection and not self.solconnection.connected and - not (self.broken or self.solconnection.broken or - self.solconnection.ipmi_session.broken)): - w = eventlet.event.Event() - _ipmiwaiters.append(w) - w.wait(15) + await self.solconnection.connect() if (self.broken or not self.solconnection or self.solconnection.broken or self.solconnection.ipmi_session.broken): @@ -371,11 +367,11 @@ class IpmiConsole(conapi.Console): self.broken = True self.error = "closed" - def write(self, data): - self.solconnection.send_data(data) + async def write(self, data): + await self.solconnection.send_data(data) - def send_break(self): - self.solconnection.send_break() + async def send_break(self): + await self.solconnection.send_break() async def perform_requests(operator, nodes, element, cfg, inputdata, realop): @@ -1323,10 +1319,10 @@ class IpmiHandler: for sensor in filter(self.match_sensor, sensors): self.output.put(msg.ChildCollection(simplify_name(sensor['name']))) - def health(self): + async def health(self): if 'read' == self.op: try: - response = self.ipmicmd.get_health() + response = await self.ipmicmd.get_health() except pygexc.IpmiException: self.output.put(msg.ConfluentTargetTimeout(self.node)) return