mirror of
https://github.com/xcat2/confluent.git
synced 2025-04-15 17:49:34 +00:00
Add facilities to subscribe/unsubscribe from discovery agents
This connects the new affluent discovery facility to local discovery view.
This commit is contained in:
parent
3afd6ecb5d
commit
e0feb104ff
@ -1239,7 +1239,7 @@ def handle_path(path, operation, configmanager, inputdata=None, autostrip=True):
|
||||
operation, pathcomponents, autostrip)
|
||||
elif pathcomponents[0] == 'discovery':
|
||||
return disco.handle_api_request(
|
||||
configmanager, inputdata, operation, pathcomponents)
|
||||
configmanager, inputdata, operation, pathcomponents, pluginmap['affluent'])
|
||||
elif pathcomponents[0] == 'networking':
|
||||
return macmap.handle_api_request(
|
||||
configmanager, inputdata, operation, pathcomponents)
|
||||
|
@ -424,7 +424,7 @@ def handle_autosense_config(operation, inputdata):
|
||||
stop_autosense()
|
||||
|
||||
|
||||
def handle_api_request(configmanager, inputdata, operation, pathcomponents):
|
||||
def handle_api_request(configmanager, inputdata, operation, pathcomponents, affluent=None):
|
||||
if pathcomponents == ['discovery', 'autosense']:
|
||||
return handle_autosense_config(operation, inputdata)
|
||||
if operation == 'retrieve':
|
||||
@ -435,7 +435,15 @@ def handle_api_request(configmanager, inputdata, operation, pathcomponents):
|
||||
raise exc.InvalidArgumentException()
|
||||
rescan()
|
||||
return (msg.KeyValueData({'rescan': 'started'}),)
|
||||
|
||||
elif operation in ('update', 'create') and pathcomponents == ['discovery', 'remote']:
|
||||
if 'subscribe' in inputdata:
|
||||
target = inputdata['subscribe']
|
||||
affluent.subscribe_discovery(target, configmanager, collective.get_myname())
|
||||
return (msg.KeyValueData({'status': 'subscribed'}),)
|
||||
if 'unsubscribe' in inputdata:
|
||||
target = inputdata['unsubscribe']
|
||||
affluent.unsubscribe_discovery(target, configmanager, collective.get_myname())
|
||||
return (msg.KeyValueData({'status': 'unsubscribed'}),)
|
||||
elif operation in ('update', 'create'):
|
||||
if pathcomponents == ['discovery', 'register']:
|
||||
return
|
||||
@ -487,6 +495,7 @@ def handle_read_api_request(pathcomponents):
|
||||
dirlist = [msg.ChildCollection(x + '/') for x in sorted(list(subcats))]
|
||||
dirlist.append(msg.ChildCollection('rescan'))
|
||||
dirlist.append(msg.ChildCollection('autosense'))
|
||||
dirlist.append(msg.ChildCollection('remote'))
|
||||
return dirlist
|
||||
if not coll:
|
||||
return show_info(queryparms['by-mac'])
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
import eventlet
|
||||
import eventlet.queue as queue
|
||||
import eventlet.green.socket as socket
|
||||
import confluent.exceptions as exc
|
||||
webclient = eventlet.import_patched('pyghmi.util.webclient')
|
||||
import confluent.messages as msg
|
||||
@ -53,6 +54,32 @@ class WebClient(object):
|
||||
return rsp
|
||||
|
||||
|
||||
def subscribe_discovery(node, configmanager, myname):
|
||||
creds = configmanager.get_node_attributes(
|
||||
node, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
|
||||
tsock = socket.create_connection((node, 443))
|
||||
myip = tsock.getsockname()[0]
|
||||
tsock.close()
|
||||
if ':' in myip:
|
||||
myip = '[{0}]'.format(myip)
|
||||
myurl = 'https://{0}/confluent-api/self/register_discovered'.format(myip)
|
||||
wc = WebClient(node, configmanager, creds)
|
||||
with open('/etc/confluent/tls/cacert.pem') as cain:
|
||||
cacert = cain.read()
|
||||
wc.wc.grab_json_response('/affluent/cert_authorities/{0}'.format(myname), cacert)
|
||||
res, status = wc.wc.grab_json_response_with_status('/affluent/discovery_subscribers/{0}'.format(myname), {'url': myurl, 'authname': node})
|
||||
if status == 200:
|
||||
agentkey = res['cryptkey']
|
||||
configmanager.set_node_attributes({node: {'crypted.selfapikey': {'hashvalue': agentkey}}})
|
||||
|
||||
def unsubscribe_discovery(node, configmanager, myname):
|
||||
creds = configmanager.get_node_attributes(
|
||||
node, ['secret.hardwaremanagementuser', 'secret.hardwaremanagementpassword'], decrypt=True)
|
||||
wc = WebClient(node, configmanager, creds)
|
||||
res, status = wc.wc.grab_json_response_with_status('/affluent/cert_authorities/{0}'.format(myname), method='DELETE')
|
||||
res, status = wc.wc.grab_json_response_with_status('/affluent/discovery_subscribers/{0}'.format(myname), method='DELETE')
|
||||
|
||||
|
||||
def update(nodes, element, configmanager, inputdata):
|
||||
for node in nodes:
|
||||
yield msg.ConfluentNodeError(node, 'Not Implemented')
|
||||
|
Loading…
x
Reference in New Issue
Block a user