diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index ff0f9247..12a382f0 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -64,6 +64,7 @@ import confluent.util as util import eventlet import eventlet.green.ssl as ssl import eventlet.semaphore as semaphore +import inspect import itertools import msgpack import os @@ -82,6 +83,24 @@ try: except NameError: unicode = str + +async def iterate_responses(responses): + # normalize plugin behaviors + # some might have an async Generator + # some might be a traditional generator + # others might return an awaitable generator + if inspect.isasyncgen(responses): + async for rsp in responses: + yield rsp + elif inspect.isgenerator(responses): + for rsp in responses: + yield rsp + elif inspect.isawaitable(responses): + responses = await responses + for rsp in responses: + yield rsp + + def seek_element(currplace, currkey, depth): try: return currplace[currkey] diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index ef00737c..b6409a6d 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -730,7 +730,7 @@ def _recheck_single_unknown_info(configmanager, info): # if cancel did not result in dead, then we are in progress if rechecker is None or rechecker.dead: rechecktime = util.monotonic_time() + 300 - rechecker = eventlet.spawn_after(300, _periodic_recheck, + rechecker = util.spawn_after(300, _periodic_recheck, configmanager) return nodename, info['maccount'] = get_nodename(configmanager, handler, info) @@ -860,7 +860,7 @@ def detected(info): rechecker.cancel() if rechecker is None or rechecker.dead: rechecktime = util.monotonic_time() + 300 - rechecker = eventlet.spawn_after(300, _periodic_recheck, cfg) + rechecker = util.spawn_after(300, _periodic_recheck, cfg) unknown_info[info['hwaddr']] = info info['discostatus'] = 'unidentfied' #TODO, eventlet spawn after to recheck sooner, or somehow else @@ -1581,12 +1581,12 @@ rechecker = None rechecktime = None rechecklock = eventlet.semaphore.Semaphore() -def _periodic_recheck(configmanager): +async def _periodic_recheck(configmanager): global rechecker global rechecktime rechecker = None try: - _recheck_nodes((), configmanager) + await _recheck_nodes((), configmanager) except Exception: traceback.print_exc() log.log({'error': 'Unexpected error during discovery, check debug ' @@ -1595,7 +1595,7 @@ def _periodic_recheck(configmanager): # for rechecker was requested in the course of recheck_nodes if rechecker is None: rechecktime = util.monotonic_time() + 900 - rechecker = eventlet.spawn_after(900, _periodic_recheck, + rechecker = util.spawn_after(900, _periodic_recheck, configmanager) @@ -1643,7 +1643,7 @@ def start_detection(): start_autosense() if rechecker is None: rechecktime = util.monotonic_time() + 900 - rechecker = eventlet.spawn_after(900, _periodic_recheck, cfg) + rechecker = util.spawn_after(900, _periodic_recheck, cfg) eventlet.spawn_n(ssdp.snoop, safe_detected, None, ssdp, get_node_by_uuid_or_mac) def stop_autosense(): diff --git a/confluent_server/confluent/plugins/configuration/attributes.py b/confluent_server/confluent/plugins/configuration/attributes.py index 2c9a6ac9..a1244d4c 100644 --- a/confluent_server/confluent/plugins/configuration/attributes.py +++ b/confluent_server/confluent/plugins/configuration/attributes.py @@ -157,9 +157,9 @@ def retrieve_nodes(nodes, element, configmanager, inputdata, clearwarnbynode): raise Exception("BUGGY ATTRIBUTE FOR NODE") -def update(nodes, element, configmanager, inputdata): +async def update(nodes, element, configmanager, inputdata): if nodes is not None: - return update_nodes(nodes, element, configmanager, inputdata) + return await update_nodes(nodes, element, configmanager, inputdata) elif element[0] == 'nodegroups': return update_nodegroup( element[1], element[3], configmanager, inputdata) @@ -243,7 +243,7 @@ def yield_rename_resources(namemap, isnode): else: yield msg.RenamedResource(node, namemap[node]) -def update_nodes(nodes, element, configmanager, inputdata): +async def update_nodes(nodes, element, configmanager, inputdata): updatedict = {} if not nodes: raise exc.InvalidArgumentException( @@ -306,7 +306,7 @@ def update_nodes(nodes, element, configmanager, inputdata): configmanager.clear_node_attributes([node], clearattribs, warnings=clearwarnbynode[node]) updatedict[node] = updatenode try: - configmanager.set_node_attributes(updatedict) + await configmanager.set_node_attributes(updatedict) except ValueError as e: raise exc.InvalidArgumentException(str(e)) return retrieve(nodes, element, configmanager, inputdata, clearwarnbynode) diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index accd5373..c82d69b8 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -203,7 +203,7 @@ async def send_response(responses, connection): responses = await responses if responses is None: return - async for rsp in responses: + async for rsp in pluginapi.iterate_responses(responses): await send_data(connection, rsp.raw()) await send_data(connection, {'_requestdone': 1})