2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 17:43:14 +00:00

Advanced asyncio port progress

Offer a function in core to normalize plugin return.

A plugin might return an async generator, a traditional generator,
or might even return an awaitable wrapping a traditional generator.

Replace eventlet spawn with util spawn in discover core

Have node attribute update await the set_node_attributes appropriately
This commit is contained in:
Jarrod Johnson 2024-04-30 10:44:43 -04:00
parent 0be60b1ce2
commit 553916340e
4 changed files with 30 additions and 11 deletions

View File

@ -64,6 +64,7 @@ import confluent.util as util
import eventlet
import eventlet.green.ssl as ssl
import eventlet.semaphore as semaphore
import inspect
import itertools
import msgpack
import os
@ -82,6 +83,24 @@ try:
except NameError:
unicode = str
async def iterate_responses(responses):
# normalize plugin behaviors
# some might have an async Generator
# some might be a traditional generator
# others might return an awaitable generator
if inspect.isasyncgen(responses):
async for rsp in responses:
yield rsp
elif inspect.isgenerator(responses):
for rsp in responses:
yield rsp
elif inspect.isawaitable(responses):
responses = await responses
for rsp in responses:
yield rsp
def seek_element(currplace, currkey, depth):
try:
return currplace[currkey]

View File

@ -730,7 +730,7 @@ def _recheck_single_unknown_info(configmanager, info):
# if cancel did not result in dead, then we are in progress
if rechecker is None or rechecker.dead:
rechecktime = util.monotonic_time() + 300
rechecker = eventlet.spawn_after(300, _periodic_recheck,
rechecker = util.spawn_after(300, _periodic_recheck,
configmanager)
return
nodename, info['maccount'] = get_nodename(configmanager, handler, info)
@ -860,7 +860,7 @@ def detected(info):
rechecker.cancel()
if rechecker is None or rechecker.dead:
rechecktime = util.monotonic_time() + 300
rechecker = eventlet.spawn_after(300, _periodic_recheck, cfg)
rechecker = util.spawn_after(300, _periodic_recheck, cfg)
unknown_info[info['hwaddr']] = info
info['discostatus'] = 'unidentfied'
#TODO, eventlet spawn after to recheck sooner, or somehow else
@ -1581,12 +1581,12 @@ rechecker = None
rechecktime = None
rechecklock = eventlet.semaphore.Semaphore()
def _periodic_recheck(configmanager):
async def _periodic_recheck(configmanager):
global rechecker
global rechecktime
rechecker = None
try:
_recheck_nodes((), configmanager)
await _recheck_nodes((), configmanager)
except Exception:
traceback.print_exc()
log.log({'error': 'Unexpected error during discovery, check debug '
@ -1595,7 +1595,7 @@ def _periodic_recheck(configmanager):
# for rechecker was requested in the course of recheck_nodes
if rechecker is None:
rechecktime = util.monotonic_time() + 900
rechecker = eventlet.spawn_after(900, _periodic_recheck,
rechecker = util.spawn_after(900, _periodic_recheck,
configmanager)
@ -1643,7 +1643,7 @@ def start_detection():
start_autosense()
if rechecker is None:
rechecktime = util.monotonic_time() + 900
rechecker = eventlet.spawn_after(900, _periodic_recheck, cfg)
rechecker = util.spawn_after(900, _periodic_recheck, cfg)
eventlet.spawn_n(ssdp.snoop, safe_detected, None, ssdp, get_node_by_uuid_or_mac)
def stop_autosense():

View File

@ -157,9 +157,9 @@ def retrieve_nodes(nodes, element, configmanager, inputdata, clearwarnbynode):
raise Exception("BUGGY ATTRIBUTE FOR NODE")
def update(nodes, element, configmanager, inputdata):
async def update(nodes, element, configmanager, inputdata):
if nodes is not None:
return update_nodes(nodes, element, configmanager, inputdata)
return await update_nodes(nodes, element, configmanager, inputdata)
elif element[0] == 'nodegroups':
return update_nodegroup(
element[1], element[3], configmanager, inputdata)
@ -243,7 +243,7 @@ def yield_rename_resources(namemap, isnode):
else:
yield msg.RenamedResource(node, namemap[node])
def update_nodes(nodes, element, configmanager, inputdata):
async def update_nodes(nodes, element, configmanager, inputdata):
updatedict = {}
if not nodes:
raise exc.InvalidArgumentException(
@ -306,7 +306,7 @@ def update_nodes(nodes, element, configmanager, inputdata):
configmanager.clear_node_attributes([node], clearattribs, warnings=clearwarnbynode[node])
updatedict[node] = updatenode
try:
configmanager.set_node_attributes(updatedict)
await configmanager.set_node_attributes(updatedict)
except ValueError as e:
raise exc.InvalidArgumentException(str(e))
return retrieve(nodes, element, configmanager, inputdata, clearwarnbynode)

View File

@ -203,7 +203,7 @@ async def send_response(responses, connection):
responses = await responses
if responses is None:
return
async for rsp in responses:
async for rsp in pluginapi.iterate_responses(responses):
await send_data(connection, rsp.raw())
await send_data(connection, {'_requestdone': 1})