From decee2ed93dd28d1a85b5973dd079f0f3a55a6d9 Mon Sep 17 00:00:00 2001 From: Jarrod Johnson Date: Tue, 19 Oct 2021 15:39:47 -0400 Subject: [PATCH] Add fallback to apiclient When apiclient fails using deploycfg, add a multicast scan looking for replacements. --- .../common/opt/confluent/bin/apiclient | 121 +++++++++++++++++- 1 file changed, 118 insertions(+), 3 deletions(-) diff --git a/confluent_osdeploy/common/opt/confluent/bin/apiclient b/confluent_osdeploy/common/opt/confluent/bin/apiclient index f4383332..40fe6989 100644 --- a/confluent_osdeploy/common/opt/confluent/bin/apiclient +++ b/confluent_osdeploy/common/opt/confluent/bin/apiclient @@ -5,22 +5,118 @@ except ImportError: import httplib as client import ctypes import ctypes.util +import glob import os +import select import socket import subprocess import ssl import sys +import struct import time class InvalidApiKey(Exception): pass -c_libcrypt = ctypes.CDLL(ctypes.util.find_library('crypt')) +cryptname = ctypes.util.find_library('crypt') +if not cryptname: + if os.path.exists('/usr/lib64/libcrypt.so.1'): + cryptname = 'libcrypt.so.1' + elif os.path.exists('/usr/lib64/libcrypt.so.2'): + cryptname = 'libcrypt.so.2' +c_libcrypt = ctypes.CDLL(cryptname) c_crypt = c_libcrypt.crypt c_crypt.argtypes = (ctypes.c_char_p, ctypes.c_char_p) c_crypt.restype = ctypes.c_char_p +def get_my_addresses(): + nlhdrsz = struct.calcsize('IHHII') + ifaddrsz = struct.calcsize('BBBBI') + # RTM_GETADDR = 22 + # nlmsghdr struct: u32 len, u16 type, u16 flags, u32 seq, u32 pid + nlhdr = struct.pack('IHHII', nlhdrsz + ifaddrsz, 22, 0x301, 0, 0) + # ifaddrmsg struct: u8 family, u8 prefixlen, u8 flags, u8 scope, u32 index + ifaddrmsg = struct.pack('BBBBI', 0, 0, 0, 0, 0) + s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE) + s.bind((0, 0)) + s.sendall(nlhdr + ifaddrmsg) + addrs = [] + while True: + pdata = s.recv(65536) + v = memoryview(pdata) + if struct.unpack('H', v[4:6])[0] == 3: # netlink done message + break + while len(v): + length, typ = struct.unpack('IH', v[:6]) + if typ == 20: + fam, plen, _, scope, ridx = struct.unpack('BBBBI', v[nlhdrsz:nlhdrsz+ifaddrsz]) + if scope in (253, 0): + rta = v[nlhdrsz+ifaddrsz:length] + while len(rta): + rtalen, rtatyp = struct.unpack('HH', rta[:4]) + if rtatyp == 1: + addrs.append((fam, rta[4:rtalen], plen, ridx)) + if not rtalen: + break + rta = rta[rtalen:] + v = v[length:] + return addrs + + +def scan_confluents(): + srvs = {} + s6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) + s4 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s4.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + doneidxs = set([]) + msg = 'M-SEARCH * HTTP/1.1\r\nST: urn:xcat.org:service:confluent:' + with open('/etc/confluent/confluent.deploycfg') as dcfg: + for line in dcfg.read().split('\n'): + if line.startswith('confluent_uuid:'): + confluentuuid = line.split(': ')[1] + msg += '/confluentuuid=' + confluentuuid + break + with open('/sys/devices/virtual/dmi/id/product_uuid') as uuidin: + msg += '/uuid=' + uuidin.read().strip() + for addrf in glob.glob('/sys/class/net/*/address'): + with open(addrf) as addrin: + hwaddr = addrin.read().strip() + msg += '/mac=' + hwaddr + msg = msg.encode('utf8') + for addr in get_my_addresses(): + if addr[0] == socket.AF_INET6: + if addr[-1] in doneidxs: + continue + s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addr[-1]) + s6.sendto(msg, ('ff02::c', 1900)) + doneidxs.add(addr[-1]) + elif addr[0] == socket.AF_INET: + s4.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, addr[1]) + s4.sendto(msg, ('239.255.255.250', 1900)) + r = select.select((s4, s6), (), (), 4) + if r: + r = r[0] + while r: + for s in r: + (rsp, peer) = s.recvfrom(9000) + rsp = rsp.split(b'\r\n') + current = None + for line in rsp: + if line.startswith(b'NODENAME: '): + current = {} + elif line.startswith(b'DEFAULTNET: 1'): + current['isdefault'] = True + elif line.startswith(b'MGTIFACE: '): + current['mgtiface'] = line.replace(b'MGTIFACE: ', b'').strip().decode('utf8') + srvs[peer[0]] = current + r = select.select((s4, s6), (), (), 2) + if r: + r = r[0] + return srvs + + def get_net_apikey(nodename, mgr): alpha = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789./' newpass = ''.join([alpha[x >> 2] for x in bytearray(os.urandom(32))]) @@ -155,7 +251,7 @@ class HTTPSClient(client.HTTPConnection, object): hosts.append(self.v4srv) if self.v6srv: hosts.append(self.v6srv) - for timeo in (0.1, 10): + for timeo in (0.1, 5): for host in hosts: try: addrinf = socket.getaddrinfo(host, self.port)[0] @@ -171,7 +267,26 @@ class HTTPSClient(client.HTTPConnection, object): continue break if not foundsrv: - raise Exception('Unable to reach any hosts') + srvs = scan_confluents() + hosts = [] + for srv in srvs: + if srvs[srv].get('isdefault', False): + hosts = [srv] + hosts + else: + hosts = hosts + [srv] + for host in hosts: + try: + addrinf = socket.getaddrinfo(host, self.port)[0] + psock = socket.socket(addrinf[0]) + psock.settimeout(timeo) + psock.connect(addrinf[4]) + foundsrv = host + psock.close() + break + except OSError: + continue + else: + raise Exception('Unable to reach any hosts') return foundsrv def connect(self):