From c525a08c1702e53615fffc17cd879a5dc2c58f4b Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Wed, 17 Feb 2021 14:31:14 -0500 Subject: [PATCH] Correct a number of mistakes in the draft commit --- confluent_client/confluent/tlvdata.py | 15 +++++++++++---- confluent_server/confluent/collective/manager.py | 4 ++-- confluent_server/confluent/sockapi.py | 6 +++++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/confluent_client/confluent/tlvdata.py b/confluent_client/confluent/tlvdata.py index 07e578c6..3cfbbefe 100644 --- a/confluent_client/confluent/tlvdata.py +++ b/confluent_client/confluent/tlvdata.py @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import ctypes +import ctypes.util import confluent.tlv as tlv from datetime import datetime import json @@ -30,6 +32,9 @@ try: except NameError: pass +class iovec(ctypes.Structure): # from uio.h + _fields_ = [('iov_base', ctypes.c_void_p), + ('iov_len', ctypes.c_size_t)] class msghdr(ctypes.Structure): # from bits/socket.h _fields_ = [('msg_name', ctypes.c_void_p), @@ -43,10 +48,12 @@ class msghdr(ctypes.Structure): # from bits/socket.h libc = ctypes.CDLL(ctypes.util.find_library('c')) recvmsg = libc.recvmsg -recvmsg.argtypes = [ctypes.c_int, ctypes.POINNTER(msghdr), ctypes.c_int] +recvmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int] recvmsg.restype = ctypes.c_size_t sendmsg = libc.sendmsg -sendmsg.argtypes = +sendmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int] +sendmsg.restype = ctypes.c_size_t + def decodestr(value): ret = None try: @@ -82,7 +89,7 @@ def _unicode_list(currlist): _unicode_list(currlist[i]) -def send(handle, data, handle=None): +def send(handle, data, filehandle=None): if isinstance(data, unicode): try: data = data.encode('utf-8') @@ -110,7 +117,7 @@ def send(handle, data, handle=None): if tl > 16777215: raise Exception("JSON data exceeds protocol limits") # xor in the type (0b1 << 24) - if handle is None: + if filehandle is None: tl |= 16777216 handle.sendall(struct.pack("!I", tl)) handle.sendall(sdata) diff --git a/confluent_server/confluent/collective/manager.py b/confluent_server/confluent/collective/manager.py index 38a6e0bf..f8a46de9 100644 --- a/confluent_server/confluent/collective/manager.py +++ b/confluent_server/confluent/collective/manager.py @@ -76,8 +76,8 @@ def connect_to_leader(cert=None, name=None, leader=None): with cfm._initlock: banner = tlvdata.recv(remote) # the banner vers = banner.split()[2] - if vers != b'v2': - raise Exception('This instance only supports protocol 2, synchronize versions between collective members') + if vers not in (b'v2', b'v3'): + raise Exception('This instance only supports protocol 2 or 3, synchronize versions between collective members') tlvdata.recv(remote) # authpassed... 0.. if name is None: name = get_myname() diff --git a/confluent_server/confluent/sockapi.py b/confluent_server/confluent/sockapi.py index b4f467a9..48f816f5 100644 --- a/confluent_server/confluent/sockapi.py +++ b/confluent_server/confluent/sockapi.py @@ -77,6 +77,10 @@ except ImportError: plainsocket = None +class iovec(ctypes.Structure): # from uio.h + _fields_ = [('iov_base', ctypes.c_void_p), + ('iov_len', ctypes.c_size_t)] + class msghdr(ctypes.Structure): # from bits/socket.h _fields_ = [('msg_name', ctypes.c_void_p), ('msg_namelen', ctypes.c_uint), @@ -88,7 +92,7 @@ class msghdr(ctypes.Structure): # from bits/socket.h libc = ctypes.CDLL(ctypes.util.find_library('c')) recvmsg = libc.recvmsg -recvmsg.argtypes = [ctypes.c_int, ctypes.POINNTER(msghdr), ctypes.c_int] +recvmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int] recvmsg.restype = ctypes.c_size_t def _should_authlog(path, operation):