2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-22 17:43:14 +00:00

Add fallback to apiclient

When apiclient fails using deploycfg, add
a multicast scan looking for replacements.
This commit is contained in:
Jarrod Johnson 2021-10-19 15:39:47 -04:00
parent 0725717359
commit decee2ed93

View File

@ -5,22 +5,118 @@ except ImportError:
import httplib as client
import ctypes
import ctypes.util
import glob
import os
import select
import socket
import subprocess
import ssl
import sys
import struct
import time
class InvalidApiKey(Exception):
pass
c_libcrypt = ctypes.CDLL(ctypes.util.find_library('crypt'))
cryptname = ctypes.util.find_library('crypt')
if not cryptname:
if os.path.exists('/usr/lib64/libcrypt.so.1'):
cryptname = 'libcrypt.so.1'
elif os.path.exists('/usr/lib64/libcrypt.so.2'):
cryptname = 'libcrypt.so.2'
c_libcrypt = ctypes.CDLL(cryptname)
c_crypt = c_libcrypt.crypt
c_crypt.argtypes = (ctypes.c_char_p, ctypes.c_char_p)
c_crypt.restype = ctypes.c_char_p
def get_my_addresses():
nlhdrsz = struct.calcsize('IHHII')
ifaddrsz = struct.calcsize('BBBBI')
# RTM_GETADDR = 22
# nlmsghdr struct: u32 len, u16 type, u16 flags, u32 seq, u32 pid
nlhdr = struct.pack('IHHII', nlhdrsz + ifaddrsz, 22, 0x301, 0, 0)
# ifaddrmsg struct: u8 family, u8 prefixlen, u8 flags, u8 scope, u32 index
ifaddrmsg = struct.pack('BBBBI', 0, 0, 0, 0, 0)
s = socket.socket(socket.AF_NETLINK, socket.SOCK_RAW, socket.NETLINK_ROUTE)
s.bind((0, 0))
s.sendall(nlhdr + ifaddrmsg)
addrs = []
while True:
pdata = s.recv(65536)
v = memoryview(pdata)
if struct.unpack('H', v[4:6])[0] == 3: # netlink done message
break
while len(v):
length, typ = struct.unpack('IH', v[:6])
if typ == 20:
fam, plen, _, scope, ridx = struct.unpack('BBBBI', v[nlhdrsz:nlhdrsz+ifaddrsz])
if scope in (253, 0):
rta = v[nlhdrsz+ifaddrsz:length]
while len(rta):
rtalen, rtatyp = struct.unpack('HH', rta[:4])
if rtatyp == 1:
addrs.append((fam, rta[4:rtalen], plen, ridx))
if not rtalen:
break
rta = rta[rtalen:]
v = v[length:]
return addrs
def scan_confluents():
srvs = {}
s6 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
s4 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s4.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
doneidxs = set([])
msg = 'M-SEARCH * HTTP/1.1\r\nST: urn:xcat.org:service:confluent:'
with open('/etc/confluent/confluent.deploycfg') as dcfg:
for line in dcfg.read().split('\n'):
if line.startswith('confluent_uuid:'):
confluentuuid = line.split(': ')[1]
msg += '/confluentuuid=' + confluentuuid
break
with open('/sys/devices/virtual/dmi/id/product_uuid') as uuidin:
msg += '/uuid=' + uuidin.read().strip()
for addrf in glob.glob('/sys/class/net/*/address'):
with open(addrf) as addrin:
hwaddr = addrin.read().strip()
msg += '/mac=' + hwaddr
msg = msg.encode('utf8')
for addr in get_my_addresses():
if addr[0] == socket.AF_INET6:
if addr[-1] in doneidxs:
continue
s6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addr[-1])
s6.sendto(msg, ('ff02::c', 1900))
doneidxs.add(addr[-1])
elif addr[0] == socket.AF_INET:
s4.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, addr[1])
s4.sendto(msg, ('239.255.255.250', 1900))
r = select.select((s4, s6), (), (), 4)
if r:
r = r[0]
while r:
for s in r:
(rsp, peer) = s.recvfrom(9000)
rsp = rsp.split(b'\r\n')
current = None
for line in rsp:
if line.startswith(b'NODENAME: '):
current = {}
elif line.startswith(b'DEFAULTNET: 1'):
current['isdefault'] = True
elif line.startswith(b'MGTIFACE: '):
current['mgtiface'] = line.replace(b'MGTIFACE: ', b'').strip().decode('utf8')
srvs[peer[0]] = current
r = select.select((s4, s6), (), (), 2)
if r:
r = r[0]
return srvs
def get_net_apikey(nodename, mgr):
alpha = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789./'
newpass = ''.join([alpha[x >> 2] for x in bytearray(os.urandom(32))])
@ -155,7 +251,7 @@ class HTTPSClient(client.HTTPConnection, object):
hosts.append(self.v4srv)
if self.v6srv:
hosts.append(self.v6srv)
for timeo in (0.1, 10):
for timeo in (0.1, 5):
for host in hosts:
try:
addrinf = socket.getaddrinfo(host, self.port)[0]
@ -171,7 +267,26 @@ class HTTPSClient(client.HTTPConnection, object):
continue
break
if not foundsrv:
raise Exception('Unable to reach any hosts')
srvs = scan_confluents()
hosts = []
for srv in srvs:
if srvs[srv].get('isdefault', False):
hosts = [srv] + hosts
else:
hosts = hosts + [srv]
for host in hosts:
try:
addrinf = socket.getaddrinfo(host, self.port)[0]
psock = socket.socket(addrinf[0])
psock.settimeout(timeo)
psock.connect(addrinf[4])
foundsrv = host
psock.close()
break
except OSError:
continue
else:
raise Exception('Unable to reach any hosts')
return foundsrv
def connect(self):