2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-26 03:19:48 +00:00

Rework subscriptions API

It's clear that the service will need
to explicitly track subscriptions
to enable rescan, for example,
and thus might as well restructure
the API around this information.
This commit is contained in:
Jarrod Johnson 2022-11-03 09:21:15 -04:00
parent a6090877ff
commit 7561b68b57
2 changed files with 34 additions and 10 deletions

View File

@ -423,11 +423,28 @@ def handle_autosense_config(operation, inputdata):
else:
stop_autosense()
def get_subscriptions():
try:
with open('/etc/confluent/discovery_subscriptions.json', 'r') as ds:
dst = ds.read()
if dst:
return json.loads(dst)
except Exception:
return {}
def save_subscriptions(subs):
with open('/etc/confluent/discovery_subscriptions.json', 'w') as dso:
dso.write(json.dumps(subs))
def handle_api_request(configmanager, inputdata, operation, pathcomponents, affluent=None):
if pathcomponents == ['discovery', 'autosense']:
return handle_autosense_config(operation, inputdata)
if operation == 'retrieve':
if operation == 'retrieve' and pathcomponents[:2] == ['discovery', 'subscriptions']:
if len(pathcomponents) > 2:
raise Exception('TODO')
currsubs = get_subscriptions()
return [msg.ChildCollection(x) for x in currsubs]
elif operation == 'retrieve':
return handle_read_api_request(pathcomponents)
elif (operation in ('update', 'create') and
pathcomponents == ['discovery', 'rescan']):
@ -435,15 +452,21 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents, affl
raise exc.InvalidArgumentException()
rescan()
return (msg.KeyValueData({'rescan': 'started'}),)
elif operation in ('update', 'create') and pathcomponents == ['discovery', 'subscription']:
if 'subscribe' in inputdata:
target = inputdata['subscribe']
affluent.subscribe_discovery(target, configmanager, collective.get_myname())
return (msg.KeyValueData({'status': 'subscribed'}),)
if 'unsubscribe' in inputdata:
target = inputdata['unsubscribe']
affluent.unsubscribe_discovery(target, configmanager, collective.get_myname())
return (msg.KeyValueData({'status': 'unsubscribed'}),)
elif operation in ('update', 'create') and pathcomponents[:2] == ['discovery', 'subscriptions']:
target = pathcomponents[2]
affluent.subscribe_discovery(target, configmanager, collective.get_myname())
currsubs = get_subscriptions()
currsubs[target] = {}
save_subscriptions(currsubs)
return (msg.KeyValueData({'status': 'subscribed'}),)
elif operation == 'delete' and pathcomponents[:2] == ['discovery', 'subscriptions']:
target = pathcomponents[2]
affluent.unsubscribe_discovery(target, configmanager, collective.get_myname())
currsubs = get_subscriptions()
if target in currsubs:
del currsubs[target]
save_subscriptions(currsubs)
return (msg.KeyValueData({'status': 'unsubscribed'}),)
elif operation in ('update', 'create'):
if pathcomponents == ['discovery', 'register']:
return

View File

@ -71,6 +71,7 @@ def subscribe_discovery(node, configmanager, myname):
if status == 200:
agentkey = res['cryptkey']
configmanager.set_node_attributes({node: {'crypted.selfapikey': {'hashvalue': agentkey}}})
res, status = wc.wc.grab_json_response_with_status('/affluent/systems/renotify', {'subscriber': myname})
def unsubscribe_discovery(node, configmanager, myname):
creds = configmanager.get_node_attributes(