mirror of
https://github.com/xcat2/confluent.git
synced 2024-11-22 01:22:00 +00:00
Make file descriptor passing python2 friendly
python 2 did not have recvmsg/sendmsg, so have to use ctypes to access them through the c library.
This commit is contained in:
parent
ec2ad9861a
commit
83d92ecfcc
@ -20,6 +20,7 @@ import ctypes
|
||||
import ctypes.util
|
||||
import confluent.tlv as tlv
|
||||
import eventlet.green.socket as socket
|
||||
import eventlet.green.select as select
|
||||
from datetime import datetime
|
||||
import json
|
||||
import os
|
||||
@ -40,11 +41,32 @@ class iovec(ctypes.Structure): # from uio.h
|
||||
('iov_len', ctypes.c_size_t)]
|
||||
|
||||
|
||||
iovec_ptr = ctypes.POINTER(iovec)
|
||||
|
||||
|
||||
class cmsghdr(ctypes.Structure): # also from bits/socket.h
|
||||
_fields_ = [('cmsg_len', ctypes.c_size_t),
|
||||
('cmsg_level', ctypes.c_int),
|
||||
('cmsg_type', ctypes.c_int)]
|
||||
|
||||
@classmethod
|
||||
def init_data(cls, cmsg_len, cmsg_level, cmsg_type, cmsg_data):
|
||||
Data = ctypes.c_ubyte * ctypes.sizeof(cmsg_data)
|
||||
class _flexhdr(ctypes.Structure):
|
||||
_fields_ = cls._fields_ + [('cmsg_data', Data)]
|
||||
|
||||
datab = Data(*bytearray(cmsg_data))
|
||||
return _flexhdr(cmsg_len=cmsg_len, cmsg_level=cmsg_level,
|
||||
cmsg_type=cmsg_type, cmsg_data=datab)
|
||||
|
||||
|
||||
def CMSG_LEN(length):
|
||||
sizeof_cmshdr = ctypes.sizeof(cmsghdr)
|
||||
return ctypes.c_size_t(CMSG_ALIGN(sizeof_cmshdr).value + length)
|
||||
|
||||
|
||||
SCM_RIGHTS = 1
|
||||
|
||||
|
||||
class msghdr(ctypes.Structure): # from bits/socket.h
|
||||
_fields_ = [('msg_name', ctypes.c_void_p),
|
||||
@ -75,7 +97,7 @@ class ClientFile(object):
|
||||
libc = ctypes.CDLL(ctypes.util.find_library('c'))
|
||||
recvmsg = libc.recvmsg
|
||||
recvmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int]
|
||||
recvmsg.restype = ctypes.c_size_t
|
||||
recvmsg.restype = ctypes.c_int
|
||||
sendmsg = libc.sendmsg
|
||||
sendmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int]
|
||||
sendmsg.restype = ctypes.c_size_t
|
||||
@ -150,7 +172,19 @@ def send(handle, data, filehandle=None):
|
||||
else:
|
||||
tl |= (2 << 24)
|
||||
handle.sendall(struct.pack("!I", tl))
|
||||
handle.sendmsg([sdata], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [filehandle]))])
|
||||
cdtype = ctypes.c_ubyte * len(sdata)
|
||||
cdata = cdtype.from_buffer(bytearray(sdata))
|
||||
ciov = iovec(iov_base=ctypes.addressof(cdata),
|
||||
iov_len=ctypes.c_size_t(ctypes.sizeof(cdata)))
|
||||
fd = ctypes.c_int(filehandle)
|
||||
cmh = cmsghdr.init_data(
|
||||
cmsg_len=CMSG_LEN(
|
||||
ctypes.sizeof(fd)), cmsg_level=socket.SOL_SOCKET,
|
||||
cmsg_type=SCM_RIGHTS, cmsg_data=fd)
|
||||
mh = msghdr(msg_name=None, msg_len=0, msg_iov=iovec_ptr(ciov),
|
||||
msg_iovlen=1, msg_control=ctypes.addressof(cmh),
|
||||
msg_controllen=ctypes.c_size_t(ctypes.sizeof(cmh)))
|
||||
sendmsg(handle.fileno(), mh, 0)
|
||||
|
||||
|
||||
def recvall(handle, size):
|
||||
@ -183,13 +217,32 @@ def recv(handle):
|
||||
return None
|
||||
if datatype == tlv.Types.filehandle:
|
||||
filehandles = array.array('i')
|
||||
data, adata, flags, addr = handle.recvmsg(
|
||||
dlen, socket.CMSG_LEN(filehandles.itemsize))
|
||||
for clev, ctype, cdata in adata:
|
||||
if clev == socket.SOL_SOCKET and ctype == socket.SCM_RIGHTS:
|
||||
filehandles.fromstring(
|
||||
cdata[:len(cdata) - len(cdata) % filehandles.itemsize])
|
||||
data = json.loads(data)
|
||||
rawbuffer = bytearray(2048)
|
||||
pkttype = ctypes.c_ubyte * 2048
|
||||
data = pkttype.from_buffer(rawbuffer)
|
||||
cmsgsize = CMSG_SPACE(ctypes.sizeof(ctypes.c_int)).value
|
||||
cmsgarr = bytearray(cmsgsize)
|
||||
cmtype = ctypes.c_ubyte * cmsgsize
|
||||
cmsg = cmtype.from_buffer(cmsgarr)
|
||||
cmsg.cmsg_level = socket.SOL_SOCKET
|
||||
cmsg.cmsg_type = SCM_RIGHTS
|
||||
cmsg.cmsg_len = CMSG_LEN(ctypes.sizeof(ctypes.c_int))
|
||||
iov = iovec()
|
||||
iov.iov_base = ctypes.addressof(data)
|
||||
iov.iov_len = 2048
|
||||
msg = msghdr()
|
||||
msg.msg_iov = ctypes.pointer(iov)
|
||||
msg.msg_iovlen = 1
|
||||
msg.msg_control = ctypes.addressof(cmsg)
|
||||
msg.msg_controllen = ctypes.sizeof(cmsg)
|
||||
select.select([handle], [], [])
|
||||
i = recvmsg(handle.fileno(), ctypes.pointer(msg), 0)
|
||||
cdata = cmsgarr[CMSG_LEN(0).value:]
|
||||
data = rawbuffer[:i]
|
||||
if cmsg.cmsg_level == socket.SOL_SOCKET and cmsg.cmsg_type == SCM_RIGHTS:
|
||||
filehandles.fromstring(bytes(
|
||||
cdata[:len(cdata) - len(cdata) % filehandles.itemsize]))
|
||||
data = json.loads(bytes(data))
|
||||
return ClientFile(data['filename'], data['mode'], filehandles[0])
|
||||
else:
|
||||
data = handle.recv(dlen)
|
||||
|
@ -21,8 +21,6 @@
|
||||
#
|
||||
|
||||
import atexit
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
import errno
|
||||
import os
|
||||
import pwd
|
||||
@ -77,24 +75,6 @@ except ImportError:
|
||||
|
||||
plainsocket = None
|
||||
|
||||
class iovec(ctypes.Structure): # from uio.h
|
||||
_fields_ = [('iov_base', ctypes.c_void_p),
|
||||
('iov_len', ctypes.c_size_t)]
|
||||
|
||||
class msghdr(ctypes.Structure): # from bits/socket.h
|
||||
_fields_ = [('msg_name', ctypes.c_void_p),
|
||||
('msg_namelen', ctypes.c_uint),
|
||||
('msg_iov', ctypes.POINTER(iovec)),
|
||||
('msg_iovlen', ctypes.c_size_t),
|
||||
('msg_control', ctypes.c_void_p),
|
||||
('msg_controllen', ctypes.c_size_t),
|
||||
('msg_flags', ctypes.c_int)]
|
||||
|
||||
libc = ctypes.CDLL(ctypes.util.find_library('c'))
|
||||
recvmsg = libc.recvmsg
|
||||
recvmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int]
|
||||
recvmsg.restype = ctypes.c_size_t
|
||||
|
||||
def _should_authlog(path, operation):
|
||||
if (operation == 'retrieve' and
|
||||
('/sensors/' in path or '/health/' in path or
|
||||
|
Loading…
Reference in New Issue
Block a user