2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-02-12 08:40:24 +00:00

Begin work on passing filehandles for local cli

This would enable files to be uploaded/downloaded
using the client filehandles, overcoming awkward
difference in file privilege between client and
server.
This commit is contained in:
Jarrod Johnson 2021-02-01 09:05:15 -05:00
parent f16e84de32
commit edaaafa059
6 changed files with 57 additions and 8 deletions

View File

@ -154,6 +154,7 @@ class Command(object):
self._prevkeyname = None
self.connection = None
self._currnoderange = None
self.unixdomain = False
if server is None:
if 'CONFLUENT_HOST' in os.environ:
self.serverloc = os.environ['CONFLUENT_HOST']
@ -163,6 +164,7 @@ class Command(object):
self.serverloc = server
if os.path.isabs(self.serverloc) and os.path.exists(self.serverloc):
self._connect_unix()
self.unixdomain = True
elif self.serverloc == '/var/run/confluent/api.sock':
raise Exception('Confluent service is not available')
else:
@ -178,6 +180,11 @@ class Command(object):
passphrase = os.environ['CONFLUENT_PASSPHRASE']
self.authenticate(username, passphrase)
def add_file(self, name, handle):
if not self.unixdomain:
raise Exception('Can only add a file to a unix domain connection')
tlvdata.send(self.connection, {'filename': name}, handle)
def authenticate(self, username, password):
tlvdata.send(self.connection,
{'username': username, 'password': password})

View File

@ -17,4 +17,4 @@
class Types(object):
text, json = range(2)
text, json, filehandle = range(3)

View File

@ -30,6 +30,23 @@ try:
except NameError:
pass
class msghdr(ctypes.Structure): # from bits/socket.h
_fields_ = [('msg_name', ctypes.c_void_p),
('msg_namelen', ctypes.c_uint),
('msg_iov', ctypes.POINTER(iovec)),
('msg_iovlen', ctypes.c_size_t),
('msg_control', ctypes.c_void_p),
('msg_controllen', ctypes.c_size_t),
('msg_flags', ctypes.c_int)]
libc = ctypes.CDLL(ctypes.util.find_library('c'))
recvmsg = libc.recvmsg
recvmsg.argtypes = [ctypes.c_int, ctypes.POINNTER(msghdr), ctypes.c_int]
recvmsg.restype = ctypes.c_size_t
sendmsg = libc.sendmsg
sendmsg.argtypes =
def decodestr(value):
ret = None
try:
@ -65,7 +82,7 @@ def _unicode_list(currlist):
_unicode_list(currlist[i])
def send(handle, data):
def send(handle, data, handle=None):
if isinstance(data, unicode):
try:
data = data.encode('utf-8')
@ -93,9 +110,15 @@ def send(handle, data):
if tl > 16777215:
raise Exception("JSON data exceeds protocol limits")
# xor in the type (0b1 << 24)
tl |= 16777216
handle.sendall(struct.pack("!I", tl))
handle.sendall(sdata)
if handle is None:
tl |= 16777216
handle.sendall(struct.pack("!I", tl))
handle.sendall(sdata)
else:
tl |= (2 << 24)
handle.sendall(struct.pack("!I", tl))
#sendmsg with scm_rights to pass the handle
def recvall(handle, size):
rd = handle.recv(size)

View File

@ -338,6 +338,9 @@ def oslist():
def osimport(imagefile):
c = client.Command()
imagefile = os.path.abspath(imagefile)
if c.unixdomain:
ofile = open(imagefile, 'rb')
c.add_file(imagefile, ofile.fileno)
importing = False
shortname = None
for rsp in c.create('/deployment/importing/', {'filename': imagefile}):

View File

@ -626,7 +626,9 @@ class MediaImporter(object):
with open(os.devnull, 'w') as devnull:
self.worker = subprocess.Popen(
[sys.executable, __file__, self.filename, '-b'],
stdin=devnull, stdout=subprocess.PIPE)
stdin=devnull, stdout=subprocess.PIPE) # pass_fds needed for
# passing filehandle, also set environment variable to say
# which filehandle it is
wkr = self.worker
currline = b''
while wkr.poll() is None:

View File

@ -77,6 +77,20 @@ except ImportError:
plainsocket = None
class msghdr(ctypes.Structure): # from bits/socket.h
_fields_ = [('msg_name', ctypes.c_void_p),
('msg_namelen', ctypes.c_uint),
('msg_iov', ctypes.POINTER(iovec)),
('msg_iovlen', ctypes.c_size_t),
('msg_control', ctypes.c_void_p),
('msg_controllen', ctypes.c_size_t),
('msg_flags', ctypes.c_int)]
libc = ctypes.CDLL(ctypes.util.find_library('c'))
recvmsg = libc.recvmsg
recvmsg.argtypes = [ctypes.c_int, ctypes.POINNTER(msghdr), ctypes.c_int]
recvmsg.restype = ctypes.c_size_t
def _should_authlog(path, operation):
if (operation == 'retrieve' and
('/sensors/' in path or '/health/' in path or
@ -126,7 +140,8 @@ def sessionhdl(connection, authname, skipauth=False, cert=None):
cfm = authdata[1]
authenticated = True
# version 0 == original, version 1 == pickle3 allowed, 2 = pickle forbidden, msgpack allowed
send_data(connection, "Confluent -- v2 --")
# v3 - filehandle allowed
send_data(connection, "Confluent -- v3 --")
while not authenticated: # prompt for name and passphrase
send_data(connection, {'authpassed': 0})
response = tlvdata.recv(connection)
@ -475,7 +490,6 @@ class SockApi(object):
self.unixdomainserver = eventlet.spawn(_unixdomainhandler)
def watch_for_cert(self):
libc = ctypes.CDLL(ctypes.util.find_library('c'))
watcher = libc.inotify_init1(os.O_NONBLOCK)
if libc.inotify_add_watch(watcher, b'/etc/confluent/', 0x100) > -1:
while True: