diff --git a/confluent_client/confluent/client.py b/confluent_client/confluent/client.py index 5a29ea60..7fa78e47 100644 --- a/confluent_client/confluent/client.py +++ b/confluent_client/confluent/client.py @@ -154,6 +154,7 @@ class Command(object): self._prevkeyname = None self.connection = None self._currnoderange = None + self.unixdomain = False if server is None: if 'CONFLUENT_HOST' in os.environ: self.serverloc = os.environ['CONFLUENT_HOST'] @@ -163,6 +164,7 @@ class Command(object): self.serverloc = server if os.path.isabs(self.serverloc) and os.path.exists(self.serverloc): self._connect_unix() + self.unixdomain = True elif self.serverloc == '/var/run/confluent/api.sock': raise Exception('Confluent service is not available') else: @@ -178,6 +180,11 @@ class Command(object): passphrase = os.environ['CONFLUENT_PASSPHRASE'] self.authenticate(username, passphrase) + def add_file(self, name, handle): + if not self.unixdomain: + raise Exception('Can only add a file to a unix domain connection') + tlvdata.send(self.connection, {'filename': name}, handle) + def authenticate(self, username, password): tlvdata.send(self.connection, {'username': username, 'password': password}) diff --git a/confluent_client/confluent/tlv.py b/confluent_client/confluent/tlv.py index 99fd7869..361ccd03 100644 --- a/confluent_client/confluent/tlv.py +++ b/confluent_client/confluent/tlv.py @@ -17,4 +17,4 @@ class Types(object): - text, json = range(2) + text, json, filehandle = range(3) diff --git a/confluent_client/confluent/tlvdata.py b/confluent_client/confluent/tlvdata.py index f7d567fe..07e578c6 100644 --- a/confluent_client/confluent/tlvdata.py +++ b/confluent_client/confluent/tlvdata.py @@ -30,6 +30,23 @@ try: except NameError: pass + +class msghdr(ctypes.Structure): # from bits/socket.h + _fields_ = [('msg_name', ctypes.c_void_p), + ('msg_namelen', ctypes.c_uint), + ('msg_iov', ctypes.POINTER(iovec)), + ('msg_iovlen', ctypes.c_size_t), + ('msg_control', ctypes.c_void_p), + ('msg_controllen', ctypes.c_size_t), + ('msg_flags', ctypes.c_int)] + + +libc = ctypes.CDLL(ctypes.util.find_library('c')) +recvmsg = libc.recvmsg +recvmsg.argtypes = [ctypes.c_int, ctypes.POINNTER(msghdr), ctypes.c_int] +recvmsg.restype = ctypes.c_size_t +sendmsg = libc.sendmsg +sendmsg.argtypes = def decodestr(value): ret = None try: @@ -65,7 +82,7 @@ def _unicode_list(currlist): _unicode_list(currlist[i]) -def send(handle, data): +def send(handle, data, handle=None): if isinstance(data, unicode): try: data = data.encode('utf-8') @@ -93,9 +110,15 @@ def send(handle, data): if tl > 16777215: raise Exception("JSON data exceeds protocol limits") # xor in the type (0b1 << 24) - tl |= 16777216 - handle.sendall(struct.pack("!I", tl)) - handle.sendall(sdata) + if handle is None: + tl |= 16777216 + handle.sendall(struct.pack("!I", tl)) + handle.sendall(sdata) + else: + tl |= (2 << 24) + handle.sendall(struct.pack("!I", tl)) + #sendmsg with scm_rights to pass the handle + def recvall(handle, size): rd = handle.recv(size) diff --git a/confluent_server/bin/osdeploy b/confluent_server/bin/osdeploy index 18a6f9c4..c7d4651c 100644 --- a/confluent_server/bin/osdeploy +++ b/confluent_server/bin/osdeploy @@ -338,6 +338,9 @@ def oslist(): def osimport(imagefile): c = client.Command() imagefile = os.path.abspath(imagefile) + if c.unixdomain: + ofile = open(imagefile, 'rb') + c.add_file(imagefile, ofile.fileno) importing = False shortname = None for rsp in c.create('/deployment/importing/', {'filename': imagefile}): diff --git a/confluent_server/confluent/osimage.py b/confluent_server/confluent/osimage.py index 7a8d4ea0..fe813c32 100644 --- a/confluent_server/confluent/osimage.py +++ b/confluent_server/confluent/osimage.py @@ -626,7 +626,9 @@ class MediaImporter(object): with open(os.devnull, 'w') as devnull: self.worker = subprocess.Popen( [sys.executable, __file__, self.filename, '-b'], - stdin=devnull, stdout=subprocess.PIPE) + stdin=devnull, stdout=subprocess.PIPE) # pass_fds needed for + # passing filehandle, also set environment variable to say + # which filehandle it is wkr = self.worker currline = b'' while wkr.poll() is None: diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index 96df5045..b4f467a9 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -77,6 +77,20 @@ except ImportError: plainsocket = None +class msghdr(ctypes.Structure): # from bits/socket.h + _fields_ = [('msg_name', ctypes.c_void_p), + ('msg_namelen', ctypes.c_uint), + ('msg_iov', ctypes.POINTER(iovec)), + ('msg_iovlen', ctypes.c_size_t), + ('msg_control', ctypes.c_void_p), + ('msg_controllen', ctypes.c_size_t), + ('msg_flags', ctypes.c_int)] + +libc = ctypes.CDLL(ctypes.util.find_library('c')) +recvmsg = libc.recvmsg +recvmsg.argtypes = [ctypes.c_int, ctypes.POINNTER(msghdr), ctypes.c_int] +recvmsg.restype = ctypes.c_size_t + def _should_authlog(path, operation): if (operation == 'retrieve' and ('/sensors/' in path or '/health/' in path or @@ -126,7 +140,8 @@ def sessionhdl(connection, authname, skipauth=False, cert=None): cfm = authdata[1] authenticated = True # version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed - send_data(connection, "Confluent -- v2 --") + # v3 - filehandle allowed + send_data(connection, "Confluent -- v3 --") while not authenticated: # prompt for name and passphrase send_data(connection, {'authpassed': 0}) response = tlvdata.recv(connection) @@ -475,7 +490,6 @@ class SockApi(object): self.unixdomainserver = eventlet.spawn(_unixdomainhandler) def watch_for_cert(self): - libc = ctypes.CDLL(ctypes.util.find_library('c')) watcher = libc.inotify_init1(os.O_NONBLOCK) if libc.inotify_add_watch(watcher, b'/etc/confluent/', 0x100) > -1: while True: