From 7561b68b57fad55857893e0fa9853dd2eab8a16c Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Thu, 3 Nov 2022 09:21:15 -0400 Subject: [PATCH] Rework subscriptions API It's clear that the service will need to explicitly track subscriptions to enable rescan, for example, and thus might as well restructure the API around this information. --- confluent_server/confluent/discovery/core.py | 43 ++++++++++++++----- .../plugins/hardwaremanagement/affluent.py | 1 + 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/confluent_server/confluent/discovery/core.py b/confluent_server/confluent/discovery/core.py index c502a05c..89db2ac5 100644 --- a/confluent_server/confluent/discovery/core.py +++ b/confluent_server/confluent/discovery/core.py @@ -423,11 +423,28 @@ def handle_autosense_config(operation, inputdata): else: stop_autosense() +def get_subscriptions(): + try: + with open('/etc/confluent/discovery_subscriptions.json', 'r') as ds: + dst = ds.read() + if dst: + return json.loads(dst) + except Exception: + return {} + +def save_subscriptions(subs): + with open('/etc/confluent/discovery_subscriptions.json', 'w') as dso: + dso.write(json.dumps(subs)) def handle_api_request(configmanager, inputdata, operation, pathcomponents, affluent=None): if pathcomponents == ['discovery', 'autosense']: return handle_autosense_config(operation, inputdata) - if operation == 'retrieve': + if operation == 'retrieve' and pathcomponents[:2] == ['discovery', 'subscriptions']: + if len(pathcomponents) > 2: + raise Exception('TODO') + currsubs = get_subscriptions() + return [msg.ChildCollection(x) for x in currsubs] + elif operation == 'retrieve': return handle_read_api_request(pathcomponents) elif (operation in ('update', 'create') and pathcomponents == ['discovery', 'rescan']): @@ -435,15 +452,21 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents, affl raise exc.InvalidArgumentException() rescan() return (msg.KeyValueData({'rescan': 'started'}),) - elif operation in ('update', 'create') and pathcomponents == ['discovery', 'subscription']: - if 'subscribe' in inputdata: - target = inputdata['subscribe'] - affluent.subscribe_discovery(target, configmanager, collective.get_myname()) - return (msg.KeyValueData({'status': 'subscribed'}),) - if 'unsubscribe' in inputdata: - target = inputdata['unsubscribe'] - affluent.unsubscribe_discovery(target, configmanager, collective.get_myname()) - return (msg.KeyValueData({'status': 'unsubscribed'}),) + elif operation in ('update', 'create') and pathcomponents[:2] == ['discovery', 'subscriptions']: + target = pathcomponents[2] + affluent.subscribe_discovery(target, configmanager, collective.get_myname()) + currsubs = get_subscriptions() + currsubs[target] = {} + save_subscriptions(currsubs) + return (msg.KeyValueData({'status': 'subscribed'}),) + elif operation == 'delete' and pathcomponents[:2] == ['discovery', 'subscriptions']: + target = pathcomponents[2] + affluent.unsubscribe_discovery(target, configmanager, collective.get_myname()) + currsubs = get_subscriptions() + if target in currsubs: + del currsubs[target] + save_subscriptions(currsubs) + return (msg.KeyValueData({'status': 'unsubscribed'}),) elif operation in ('update', 'create'): if pathcomponents == ['discovery', 'register']: return diff --git a/confluent_server/confluent/plugins/hardwaremanagement/affluent.py b/confluent_server/confluent/plugins/hardwaremanagement/affluent.py index 522c1ac0..b0501339 100644 --- a/confluent_server/confluent/plugins/hardwaremanagement/affluent.py +++ b/confluent_server/confluent/plugins/hardwaremanagement/affluent.py @@ -71,6 +71,7 @@ def subscribe_discovery(node, configmanager, myname): if status == 200: agentkey = res['cryptkey'] configmanager.set_node_attributes({node: {'crypted.selfapikey': {'hashvalue': agentkey}}}) + res, status = wc.wc.grab_json_response_with_status('/affluent/systems/renotify', {'subscriber': myname}) def unsubscribe_discovery(node, configmanager, myname): creds = configmanager.get_node_attributes(