2
0
mirror of https://github.com/xcat2/confluent.git synced 2024-11-26 03:19:48 +00:00

Improve resience of snoop threads

slp could crash in active_scan,
removing it's ability to passively scan.

pxe snoop could crash and go away persistently.
This commit is contained in:
Jarrod Johnson 2020-07-09 09:53:25 -04:00
parent 68c4652662
commit 994c53191c
2 changed files with 97 additions and 84 deletions

View File

@ -305,88 +305,92 @@ def snoop(handler, protocol=None):
net4.setsockopt(socket.IPPROTO_IP, IP_PKTINFO, 1)
net4.bind(('', 67))
while True:
# Just need some delay, picked a prime number so that overlap with other
# timers might be reduced, though it really is probably nothing
ready = select.select([net4], [], [], None)
if not ready or not ready[0]:
continue
clientaddr = sockaddr_in()
rawbuffer = bytearray(2048)
data = pkttype.from_buffer(rawbuffer)
msg = msghdr()
cmsgarr = bytearray(cmsgsize)
cmsg = cmsgtype.from_buffer(cmsgarr)
iov = iovec()
iov.iov_base = ctypes.addressof(data)
iov.iov_len = 2048
msg.msg_iov = ctypes.pointer(iov)
msg.msg_iovlen = 1
msg.msg_control = ctypes.addressof(cmsg)
msg.msg_controllen = ctypes.sizeof(cmsg)
msg.msg_name = ctypes.addressof(clientaddr)
msg.msg_namelen = ctypes.sizeof(clientaddr)
# We'll leave name and namelen blank for now
i = recvmsg(net4.fileno(), ctypes.pointer(msg), 0)
# if we have a small packet, just skip, it can't possible hold enough
# data and avoids some downstream IndexErrors that would be messy
# with try/except
if i < 64:
continue
#peer = ipfromint(clientaddr.sin_addr.s_addr)
# We don't need peer yet, generally it's 0.0.0.0
_, level, typ = struct.unpack('QII', cmsgarr[:16])
if level == socket.IPPROTO_IP and typ == IP_PKTINFO:
idx, recv, targ = struct.unpack('III', cmsgarr[16:28])
recv = ipfromint(recv)
targ = ipfromint(targ)
# peer is the source ip (in dhcpdiscover, 0.0.0.0)
# recv is the 'ip' that recevied the packet, regardless of target
# targ is the ip in the destination ip of the header.
# idx is the ip link number of the receiving nic
# For example, a DHCPDISCOVER will probably have:
# peer of 0.0.0.0
# targ of 255.255.255.255
# recv of <actual ip address that could reply>
# idx correlated to the nic
rqv = memoryview(rawbuffer)
rq = bytearray(rqv[:i])
if rq[0] == 1: # Boot request
addrlen = rq[2]
if addrlen > 16 or addrlen == 0:
try:
# Just need some delay, picked a prime number so that overlap with other
# timers might be reduced, though it really is probably nothing
ready = select.select([net4], [], [], None)
if not ready or not ready[0]:
continue
rawnetaddr = rq[28:28+addrlen]
netaddr = ':'.join(['{0:02x}'.format(x) for x in rawnetaddr])
optidx = 0
try:
optidx = rq.index(b'\x63\x82\x53\x63') + 4
except ValueError:
clientaddr = sockaddr_in()
rawbuffer = bytearray(2048)
data = pkttype.from_buffer(rawbuffer)
msg = msghdr()
cmsgarr = bytearray(cmsgsize)
cmsg = cmsgtype.from_buffer(cmsgarr)
iov = iovec()
iov.iov_base = ctypes.addressof(data)
iov.iov_len = 2048
msg.msg_iov = ctypes.pointer(iov)
msg.msg_iovlen = 1
msg.msg_control = ctypes.addressof(cmsg)
msg.msg_controllen = ctypes.sizeof(cmsg)
msg.msg_name = ctypes.addressof(clientaddr)
msg.msg_namelen = ctypes.sizeof(clientaddr)
# We'll leave name and namelen blank for now
i = recvmsg(net4.fileno(), ctypes.pointer(msg), 0)
# if we have a small packet, just skip, it can't possible hold enough
# data and avoids some downstream IndexErrors that would be messy
# with try/except
if i < 64:
continue
txid = rq[4:8] # struct.unpack('!I', rq[4:8])[0]
rqinfo, disco = opts_to_dict(rq, optidx)
vivso = disco.get('vivso', None)
if vivso:
# info['modelnumber'] = info['attributes']['enclosure-machinetype-model'][0]
#peer = ipfromint(clientaddr.sin_addr.s_addr)
# We don't need peer yet, generally it's 0.0.0.0
_, level, typ = struct.unpack('QII', cmsgarr[:16])
if level == socket.IPPROTO_IP and typ == IP_PKTINFO:
idx, recv, targ = struct.unpack('III', cmsgarr[16:28])
recv = ipfromint(recv)
targ = ipfromint(targ)
# peer is the source ip (in dhcpdiscover, 0.0.0.0)
# recv is the 'ip' that recevied the packet, regardless of target
# targ is the ip in the destination ip of the header.
# idx is the ip link number of the receiving nic
# For example, a DHCPDISCOVER will probably have:
# peer of 0.0.0.0
# targ of 255.255.255.255
# recv of <actual ip address that could reply>
# idx correlated to the nic
rqv = memoryview(rawbuffer)
rq = bytearray(rqv[:i])
if rq[0] == 1: # Boot request
addrlen = rq[2]
if addrlen > 16 or addrlen == 0:
continue
rawnetaddr = rq[28:28+addrlen]
netaddr = ':'.join(['{0:02x}'.format(x) for x in rawnetaddr])
optidx = 0
try:
optidx = rq.index(b'\x63\x82\x53\x63') + 4
except ValueError:
continue
txid = rq[4:8] # struct.unpack('!I', rq[4:8])[0]
rqinfo, disco = opts_to_dict(rq, optidx)
vivso = disco.get('vivso', None)
if vivso:
# info['modelnumber'] = info['attributes']['enclosure-machinetype-model'][0]
info = {'hwaddr': netaddr, 'uuid': disco['uuid'],
'architecture': vivso.get('arch', ''),
'services': (vivso['service-type'],),
'netinfo': {'ifidx': idx, 'recvip': recv, 'txid': txid},
'attributes': {'enclosure-machinetype-model': [vivso.get('machine', '')]}}
handler(info)
#consider_discover(info, rqinfo, net4, cfg, rqv)
continue
# We will fill out service to have something to byte into,
# but the nature of the beast is that we do not have peers,
# so that will not be present for a pxe snoop
info = {'hwaddr': netaddr, 'uuid': disco['uuid'],
'architecture': vivso.get('arch', ''),
'services': (vivso['service-type'],),
'architecture': disco['arch'],
'netinfo': {'ifidx': idx, 'recvip': recv, 'txid': txid},
'attributes': {'enclosure-machinetype-model': [vivso.get('machine', '')]}}
handler(info)
#consider_discover(info, rqinfo, net4, cfg, rqv)
continue
# We will fill out service to have something to byte into,
# but the nature of the beast is that we do not have peers,
# so that will not be present for a pxe snoop
info = {'hwaddr': netaddr, 'uuid': disco['uuid'],
'architecture': disco['arch'],
'netinfo': {'ifidx': idx, 'recvip': recv, 'txid': txid},
'services': ('pxe-client',)}
if disco['uuid']: #TODO(jjohnson2): need to explictly check for
# discover, so that the parser can go ahead and
# parse the options including uuid to enable
# ACK
handler(info)
consider_discover(info, rqinfo, net4, cfg, rqv)
'services': ('pxe-client',)}
if disco['uuid']: #TODO(jjohnson2): need to explictly check for
# discover, so that the parser can go ahead and
# parse the options including uuid to enable
# ACK
handler(info)
consider_discover(info, rqinfo, net4, cfg, rqv)
except Exception as e:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)

View File

@ -344,14 +344,19 @@ def _add_attributes(parsed):
else:
net = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
net.settimeout(1.0)
net.settimeout(2.0)
net.connect(target)
except socket.error:
return
net.sendall(attrq)
rsp = net.recv(8192)
net.close()
_parse_attrs(rsp, parsed, xid)
try:
net.sendall(attrq)
rsp = net.recv(8192)
net.close()
_parse_attrs(rsp, parsed, xid)
except Exception as e:
# this can be a messy area, just degrade the quality of rsp
# in a bad situation
return
def query_srvtypes(target):
@ -415,7 +420,11 @@ def snoop(handler, protocol=None):
:return:
"""
tracelog = log.Logger('trace')
active_scan(handler, protocol)
try:
active_scan(handler, protocol)
except Exception as e:
tracelog.log(traceback.format_exc(), ltype=log.DataTypes.event,
event=log.Events.stacktrace)
net = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
net.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
slpg = socket.inet_pton(socket.AF_INET6, 'ff01::123')