diff --git a/confluent_server/confluent/core.py b/confluent_server/confluent/core.py index 6ab6bd59..f21d36b5 100644 --- a/confluent_server/confluent/core.py +++ b/confluent_server/confluent/core.py @@ -44,6 +44,7 @@ import confluent.discovery.core as disco import confluent.interface.console as console import confluent.exceptions as exc import confluent.messages as msg +import confluent.mountmanager as mountmanager import confluent.networking.macmap as macmap import confluent.noderange as noderange import confluent.osimage as osimage @@ -159,7 +160,7 @@ def _merge_dict(original, custom): rootcollections = ['deployment/', 'discovery/', 'events/', 'networking/', - 'noderange/', 'nodes/', 'nodegroups/', 'usergroups/' , + 'noderange/', 'nodes/', 'nodegroups/', 'storage/', 'usergroups/' , 'users/', 'uuid', 'version'] @@ -169,6 +170,13 @@ class PluginRoute(object): +def handle_storage(configmanager, inputdata, pathcomponents, operation): + if len(pathcomponents) == 1: + yield msg.ChildCollection('remote/') + return + if pathcomponents[1] == 'remote': + for rsp in mountmanager.handle_request(configmanager, inputdata, pathcomponents[2:], operation): + yield rsp def handle_deployment(configmanager, inputdata, pathcomponents, operation): if len(pathcomponents) == 1: @@ -1245,6 +1253,9 @@ def handle_path(path, operation, configmanager, inputdata=None, autostrip=True): elif pathcomponents[0] == 'deployment': return handle_deployment(configmanager, inputdata, pathcomponents, operation) + elif pathcomponents[0] == 'storage': + return handle_storage(configmanager, inputdata, pathcomponents, + operation) elif pathcomponents[0] == 'nodegroups': return handle_nodegroup_request(configmanager, inputdata, pathcomponents, diff --git a/confluent_server/confluent/mountmanager.py b/confluent_server/confluent/mountmanager.py new file mode 100644 index 00000000..c73b87a2 --- /dev/null +++ b/confluent_server/confluent/mountmanager.py @@ -0,0 +1,58 @@ + +import confluent.messages as msg +import confluent.exceptions as exc +import struct +import eventlet.green.socket as socket +mountsbyuser = {} + +def handle_request(configmanager, inputdata, pathcomponents, operation): + curruser = configmanager.current_user + if len(pathcomponents) == 0: + mounts = mountsbyuser.get(curruser, []) + if operation == 'retrieve': + for mount in mounts: + yield msg.ChildCollection(mount['index']) + elif operation == 'create': + if 'name' not in inputdata: + raise exc.InvalidArgumentException('Required parameter "name" is missing') + usedidx = set([]) + for mount in mounts: + usedidx.add(mount['index']) + curridx = 1 + while curridx in usedidx: + curridx += 1 + currmount = requestmount(curruser, inputdata['name']) + currmount['index'] = curridx + if curruser not in mountsbyuser: + mountsbyuser[curruser] = [] + mountsbyuser[curruser].append(currmount) + yield msg.KeyValueData({ + 'path': currmount['path'], + 'authtoken': currmount['authtoken'] + }) + +def requestmount(subdir, filename): + a = socket.socket(socket.AF_UNIX) + a.connect('/var/run/confluent/browserfs/control') + subname = subdir.encode() + a.send(struct.pack('!II', 1, len(subname))) + a.send(subname) + fname = filename.encode() + a.send(struct.pack('!I', len(fname))) + a.send(fname) + rsp = a.recv(4) + retcode = struct.unpack('!I', rsp)[0] + if retcode != 0: + raise Exception("Bad return code") + rsp = a.recv(4) + nlen = struct.unpack('!I', rsp)[0] + idstr = a.recv(nlen).decode('utf8') + rsp = a.recv(4) + nlen = struct.unpack('!I', rsp)[0] + authtok = a.recv(nlen).decode('utf8') + thismount = { + 'id': idstr, + 'path': '{}/{}/{}'.format(idstr, subdir, filename), + 'authtoken': authtok + } + return thismount