mirror of
https://github.com/xcat2/confluent.git
synced 2025-08-29 22:38:22 +00:00
Switch to asyncio usage of pysnmnp
This requires pysnm 6, the edition that should become the official one, maintained by lextudio
This commit is contained in:
@@ -45,7 +45,6 @@ if __name__ == '__main__':
|
||||
import asyncio
|
||||
from confluent.networking.lldp import _handle_neighbor_query, get_fingerprint
|
||||
from confluent.networking.netutil import get_switchcreds, list_switches, get_portnamemap
|
||||
import eventlet.green.select as select
|
||||
|
||||
import socket
|
||||
|
||||
@@ -55,10 +54,7 @@ import confluent.log as log
|
||||
import confluent.messages as msg
|
||||
import confluent.noderange as noderange
|
||||
import confluent.util as util
|
||||
import eventlet.green.subprocess as subprocess
|
||||
import fcntl
|
||||
import eventlet
|
||||
import eventlet.semaphore
|
||||
import msgpack
|
||||
import random
|
||||
import re
|
||||
@@ -201,9 +197,7 @@ async def _offload_map_switch(switch, password, user):
|
||||
use_bin_type=True))
|
||||
#_offloader.stdin.flush()
|
||||
await _offloader.stdin.drain()
|
||||
print("dispatched..")
|
||||
result = await _offloadevts[evtid]
|
||||
print("returned..")
|
||||
del _offloadevts[evtid]
|
||||
if len(result) == 2:
|
||||
if result[0] == 1:
|
||||
@@ -233,9 +227,7 @@ async def _recv_offload():
|
||||
upacker = msgpack.Unpacker(raw=False, strict_map_key=False)
|
||||
#instream = _offloader.stdout.fileno()
|
||||
while True:
|
||||
#select.select([_offloader.stdout], [], [])
|
||||
datum = await _offloader.stdout.read(512)
|
||||
print(repr(datum))
|
||||
upacker.feed(datum)
|
||||
for result in upacker:
|
||||
if result[0] not in _offloadevts:
|
||||
@@ -349,9 +341,9 @@ async def _map_switch_backend(args):
|
||||
_nodesbymac[mac] = (nodename, maccounts[ifname])
|
||||
_macsbyswitch[switch] = newmacs
|
||||
|
||||
def _snmp_map_switch_relay(rqid, switch, password, user):
|
||||
async def _snmp_map_switch_relay(rqid, switch, password, user):
|
||||
try:
|
||||
res = _snmp_map_switch(switch, password, user)
|
||||
res = await _snmp_map_switch(switch, password, user)
|
||||
payload = msgpack.packb((rqid,) + res, use_bin_type=True)
|
||||
try:
|
||||
sys.stdout.buffer.write(payload)
|
||||
@@ -365,6 +357,7 @@ def _snmp_map_switch_relay(rqid, switch, password, user):
|
||||
except AttributeError:
|
||||
sys.stdout.write(payload)
|
||||
except Exception as e:
|
||||
import traceback
|
||||
payload = msgpack.packb((rqid, 2, str(e)), use_bin_type=True)
|
||||
try:
|
||||
sys.stdout.buffer.write(payload)
|
||||
@@ -373,12 +366,12 @@ def _snmp_map_switch_relay(rqid, switch, password, user):
|
||||
finally:
|
||||
sys.stdout.flush()
|
||||
|
||||
def _snmp_map_switch(switch, password, user):
|
||||
async def _snmp_map_switch(switch, password, user):
|
||||
haveqbridge = False
|
||||
mactobridge = {}
|
||||
conn = snmp.Session(switch, password, user)
|
||||
ifnamemap = get_portnamemap(conn)
|
||||
for vb in conn.walk('1.3.6.1.2.1.17.7.1.2.2.1.2'):
|
||||
ifnamemap = await get_portnamemap(conn)
|
||||
async for vb in conn.walk('1.3.6.1.2.1.17.7.1.2.2.1.2'):
|
||||
haveqbridge = True
|
||||
oid, bridgeport = vb
|
||||
if not bridgeport:
|
||||
@@ -390,7 +383,7 @@ def _snmp_map_switch(switch, password, user):
|
||||
)
|
||||
mactobridge[macaddr] = int(bridgeport)
|
||||
if not haveqbridge:
|
||||
for vb in conn.walk('1.3.6.1.2.1.17.4.3.1.2'):
|
||||
async for vb in conn.walk('1.3.6.1.2.1.17.4.3.1.2'):
|
||||
oid, bridgeport = vb
|
||||
if not bridgeport:
|
||||
continue
|
||||
@@ -402,15 +395,15 @@ def _snmp_map_switch(switch, password, user):
|
||||
vlanstocheck = set([])
|
||||
try:
|
||||
#ciscoiftovlanmap = {}
|
||||
for vb in conn.walk('.1.3.6.1.4.1.9.9.68.1.2.2.1.2'):
|
||||
async for vb in conn.walk('.1.3.6.1.4.1.9.9.68.1.2.2.1.2'):
|
||||
vlanstocheck.add(vb[1])
|
||||
#ciscotrunktovlanmap = {}
|
||||
for vb in conn.walk('.1.3.6.1.4.1.9.9.46.1.6.1.1.5'):
|
||||
async for vb in conn.walk('.1.3.6.1.4.1.9.9.46.1.6.1.1.5'):
|
||||
vlanstocheck.add(vb[1])
|
||||
except Exception:
|
||||
# We might have crashed snmp on a non-cisco switch
|
||||
# in such a case, delay 8 seconds to allow recovery to complete
|
||||
eventlet.sleep(8)
|
||||
await asyncio.sleep(8)
|
||||
if not vlanstocheck:
|
||||
vlanstocheck.add(None)
|
||||
bridgetoifmap = {}
|
||||
@@ -422,7 +415,7 @@ def _snmp_map_switch(switch, password, user):
|
||||
if not isinstance(password, str):
|
||||
password = password.decode('utf8')
|
||||
conn = snmp.Session(switch, '{}@{}'.format(password, vlan))
|
||||
for vb in conn.walk('1.3.6.1.2.1.17.1.4.1.2'):
|
||||
async for vb in conn.walk('1.3.6.1.2.1.17.1.4.1.2'):
|
||||
bridgeport, ifidx = vb
|
||||
bridgeport = int(str(bridgeport).rsplit('.', 1)[1])
|
||||
try:
|
||||
@@ -478,12 +471,12 @@ async def update_macmap(configmanager, impatient=False):
|
||||
except GeneratorExit:
|
||||
# the calling function has stopped caring, but we want to finish
|
||||
# the sweep, background it
|
||||
eventlet.spawn_n(_finish_update, completions)
|
||||
util.spawn(_finish_update(completions))
|
||||
raise
|
||||
|
||||
|
||||
def _finish_update(completions):
|
||||
for _ in completions:
|
||||
async def _finish_update(completions):
|
||||
async for _ in completions:
|
||||
pass
|
||||
|
||||
|
||||
@@ -711,27 +704,30 @@ async def rescan(cfg):
|
||||
async for _ in update_macmap(cfg):
|
||||
pass
|
||||
|
||||
async def get_stdin_reader(cloop):
|
||||
reader = asyncio.StreamReader()
|
||||
protocol = asyncio.StreamReaderProtocol(reader)
|
||||
await cloop.connect_read_pipe(lambda: protocol, sys.stdin)
|
||||
return reader
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) > 1 and sys.argv[1] == '-o':
|
||||
try:
|
||||
upacker = msgpack.Unpacker(encoding='utf8')
|
||||
except TypeError:
|
||||
upacker = msgpack.Unpacker(raw=False, strict_map_key=False)
|
||||
currfl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
|
||||
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, currfl | os.O_NONBLOCK)
|
||||
async def offloader_main(cloop):
|
||||
try:
|
||||
upacker = msgpack.Unpacker(encoding='utf8')
|
||||
except TypeError:
|
||||
upacker = msgpack.Unpacker(raw=False, strict_map_key=False)
|
||||
#currfl = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
|
||||
#fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, currfl | os.O_NONBLOCK)
|
||||
sreader = await get_stdin_reader(cloop)
|
||||
while True:
|
||||
data = await sreader.read(512)
|
||||
upacker.feed(data)
|
||||
for cmd in upacker:
|
||||
util.spawn(_snmp_map_switch_relay(*cmd))
|
||||
sys.exit(0)
|
||||
|
||||
while True:
|
||||
r = select.select([sys.stdin], [], [])
|
||||
try:
|
||||
upacker.feed(sys.stdin.buffer.read())
|
||||
except AttributeError:
|
||||
upacker.feed(sys.stdin.read())
|
||||
for cmd in upacker:
|
||||
eventlet.spawn_n(_snmp_map_switch_relay, *cmd)
|
||||
sys.exit(0)
|
||||
async def test_main(cloop):
|
||||
cg = cfm.ConfigManager(None)
|
||||
for res in update_macmap(cg):
|
||||
async for res in update_macmap(cg):
|
||||
print("map has updated")
|
||||
if len(sys.argv) > 1:
|
||||
print(repr(_macmap[sys.argv[1]]))
|
||||
@@ -743,3 +739,10 @@ if __name__ == '__main__':
|
||||
print(repr(_macmap))
|
||||
print("switch to fdb lookup table: -------------------")
|
||||
print(repr(_macsbyswitch))
|
||||
|
||||
if __name__ == '__main__':
|
||||
cloop = asyncio.get_event_loop()
|
||||
if len(sys.argv) > 1 and sys.argv[1] == '-o':
|
||||
cloop.run_until_complete(offloader_main(cloop))
|
||||
sys.exit(0)
|
||||
cloop.run_until_complete(test_main(cloop))
|
||||
|
@@ -66,10 +66,10 @@ def list_switches(configmanager):
|
||||
return util.natural_sort(switches)
|
||||
|
||||
|
||||
def get_portnamemap(conn):
|
||||
async def get_portnamemap(conn):
|
||||
ifnamemap = {}
|
||||
havenames = False
|
||||
for vb in conn.walk('1.3.6.1.2.1.31.1.1.1.1'):
|
||||
async for vb in conn.walk('1.3.6.1.2.1.31.1.1.1.1'):
|
||||
ifidx, ifname = vb
|
||||
if not ifname:
|
||||
continue
|
||||
@@ -77,8 +77,8 @@ def get_portnamemap(conn):
|
||||
ifidx = int(str(ifidx).rsplit('.', 1)[1])
|
||||
ifnamemap[ifidx] = str(ifname)
|
||||
if not havenames:
|
||||
for vb in conn.walk('1.3.6.1.2.1.2.2.1.2'):
|
||||
async for vb in conn.walk('1.3.6.1.2.1.2.2.1.2'):
|
||||
ifidx, ifname = vb
|
||||
ifidx = int(str(ifidx).rsplit('.', 1)[1])
|
||||
ifnamemap[ifidx] = str(ifname)
|
||||
return ifnamemap
|
||||
return ifnamemap
|
||||
|
@@ -21,17 +21,14 @@
|
||||
# patch pysnmp to have it be eventlet friendly has caused it's selection
|
||||
# This module simplifies the complex hlapi pysnmp interface
|
||||
|
||||
import asyncio
|
||||
import confluent.exceptions as exc
|
||||
import eventlet
|
||||
from eventlet.support.greendns import getaddrinfo
|
||||
import pysnmp.smi.error as snmperr
|
||||
import socket
|
||||
snmp = eventlet.import_patched('pysnmp.hlapi')
|
||||
import pysnmp.hlapi.asyncio as snmp
|
||||
|
||||
|
||||
def _get_transport(name):
|
||||
async def _get_transport(name):
|
||||
# Annoyingly, pysnmp does not automatically determine ipv6 v ipv4
|
||||
res = getaddrinfo(name, 161, 0, socket.SOCK_DGRAM)
|
||||
res = await asyncio.get_event_loop().getaddrinfo(name, 161, type=socket.SOCK_DGRAM)
|
||||
if res[0][0] == socket.AF_INET6:
|
||||
return snmp.Udp6TransportTarget(res[0][4], 2)
|
||||
else:
|
||||
@@ -64,7 +61,7 @@ class Session(object):
|
||||
authProtocol=snmp.usmHMACSHAAuthProtocol)
|
||||
self.eng = snmp.SnmpEngine()
|
||||
|
||||
def walk(self, oid):
|
||||
async def walk(self, oid):
|
||||
"""Walk over children of a given OID
|
||||
|
||||
This is roughly equivalent to snmpwalk. It will automatically try to
|
||||
@@ -77,7 +74,7 @@ class Session(object):
|
||||
# there may come a time where we add more parameters to override the
|
||||
# automatic behavior (e.g. DES is weak, so it's likely to be
|
||||
# overriden, but some devices only support DES)
|
||||
tp = _get_transport(self.server)
|
||||
tp = await _get_transport(self.server)
|
||||
ctx = snmp.ContextData(self.context)
|
||||
resolvemib = False
|
||||
if '::' in oid:
|
||||
@@ -86,33 +83,31 @@ class Session(object):
|
||||
obj = snmp.ObjectType(snmp.ObjectIdentity(mib, field))
|
||||
else:
|
||||
obj = snmp.ObjectType(snmp.ObjectIdentity(oid))
|
||||
|
||||
walking = snmp.bulkCmd(self.eng, self.authdata, tp, ctx, 0, 10, obj,
|
||||
rsps = snmp.bulkWalkCmd(self.eng, self.authdata, tp, ctx, 0, 10, obj,
|
||||
lexicographicMode=False, lookupMib=resolvemib)
|
||||
try:
|
||||
for rsp in walking:
|
||||
errstr, errnum, erridx, answers = rsp
|
||||
if errstr:
|
||||
errstr = str(errstr)
|
||||
finerr = errstr + ' while trying to connect to ' \
|
||||
'{0}'.format(self.server)
|
||||
if errstr in ('Unknown USM user', 'unknownUserName',
|
||||
'wrongDigest', 'Wrong SNMP PDU digest'):
|
||||
raise exc.TargetEndpointBadCredentials(finerr)
|
||||
# need to do bad credential versus timeout
|
||||
raise exc.TargetEndpointUnreachable(finerr)
|
||||
elif errnum:
|
||||
raise exc.ConfluentException(errnum.prettyPrint() +
|
||||
' while trying to connect to '
|
||||
'{0}'.format(self.server))
|
||||
for ans in answers:
|
||||
if not obj[0].isPrefixOf(ans[0]):
|
||||
# PySNMP returns leftovers in a bulk command
|
||||
# filter out such leftovers
|
||||
break
|
||||
yield ans
|
||||
except snmperr.WrongValueError:
|
||||
raise exc.TargetEndpointBadCredentials('Invalid SNMPv3 password')
|
||||
async for rsp in rsps:
|
||||
errstr, errnum, erridx, answers = rsp
|
||||
if errstr:
|
||||
errstr = str(errstr)
|
||||
finerr = errstr + ' while trying to connect to ' \
|
||||
'{0}'.format(self.server)
|
||||
if errstr in ('Unknown USM user', 'unknownUserName',
|
||||
'wrongDigest', 'Wrong SNMP PDU digest'):
|
||||
raise exc.TargetEndpointBadCredentials(finerr)
|
||||
# need to do bad credential versus timeout
|
||||
raise exc.TargetEndpointUnreachable(finerr)
|
||||
elif errnum:
|
||||
raise exc.ConfluentException(errnum.prettyPrint() +
|
||||
' while trying to connect to '
|
||||
'{0}'.format(self.server))
|
||||
for ans in answers:
|
||||
if not obj[0].isPrefixOf(ans[0]):
|
||||
# PySNMP returns leftovers in a bulk command
|
||||
# filter out such leftovers
|
||||
break
|
||||
yield ans
|
||||
#except snmperr.WrongValueError:
|
||||
# raise exc.TargetEndpointBadCredentials('Invalid SNMPv3 password')
|
||||
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user