#! /usr/bin/env python

#############################################################################
##
## netyah.py -- draw a network map from routing tables or rulesets
##              more infos: http://www.secdev.org/projects/netyah
##
## Copyright (C) 2004  Philippe Biondi
##
## This program is free software; you can redistribute it and/or modify it
## under the terms of the GNU General Public License as published by the
## Free Software Foundation; either version 2, or (at your option) any
## later version.
##
## This program is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
## General Public License for more details.
##
#############################################################################

# Routes -> graph
#
# $Id: netyah.py,v 1.2 2004/07/23 17:31:34 pbi Exp pbi $

"""Turn router configurations / routing tables into a Graphviz graph.

Usage: netyah.py -routertype target1 [target2..] [-routertype2 target3..]

Each ``-routertype`` option names one of the parser classes defined below;
every following argument is handed to that class (a file name, or a host for
the ``*_query`` types).  The resulting undirected graph is written to
standard output, diagnostics go to standard error.

The code only uses constructs that are valid on both Python 2.6+ and
Python 3.
"""

import os
import re
import socket
import struct
import sys
import types


def WARNING(x):
    """Write ``x`` to standard error, prefixed with ``WARNING: ``."""
    sys.stderr.write("WARNING: %s\n" % x)


def atontol(x):
    """Convert a dotted-quad IPv4 string to an unsigned 32-bit integer."""
    return struct.unpack("!I", socket.inet_aton(x))[0]


def ltontoa(x):
    """Convert an unsigned 32-bit integer to a dotted-quad IPv4 string."""
    return socket.inet_ntoa(struct.pack("!I", x))


def _split_net(net):
    """Split ``"a.b.c.d[/len]"`` into (address as int, prefix length as str).

    A missing prefix length defaults to ``"32"``.
    """
    # Appending "/32" and keeping the first two fields supplies the default
    # without a separate branch: "1.2.3.0/24/32" -> ["1.2.3.0", "24"].
    ip, bits = (net + "/32").split("/")[0:2]
    return atontol(ip), bits


def _net_bounds(net):
    """Return the (first, last) addresses of ``net`` as integers."""
    ip, bits = _split_net(net)
    hostmask = (1 << (32 - int(bits))) - 1
    return ip & ~hostmask, ip | hostmask


def incl(x, y):
    """Tell whether network (or host) ``x`` lies inside network ``y``.

    Both arguments are ``"a.b.c.d"`` or ``"a.b.c.d/len"`` strings.

    Returns:
        -1 if some address of ``x`` is outside ``y``,
         0 if ``x`` and ``y`` cover exactly the same range,
         1 if ``x`` is strictly contained in ``y``.
    """
    first_x, last_x = _net_bounds(x)
    first_y, last_y = _net_bounds(y)
    if first_x < first_y or last_x > last_y:
        return -1
    if first_x == first_y and last_x == last_y:
        return 0
    return 1


def normalize(net):
    """Return ``net`` with its host bits cleared, as ``"a.b.c.d/len"``.

    The prefix length is always written out, so a bare address comes back
    as ``"a.b.c.d/32"``.
    """
    # NOTE: the historical code compared the (string) prefix length with the
    # integer 32 before appending it, which was always true.  The resulting
    # "always append" behaviour is kept deliberately: it matches the
    # "addr/32" keys produced by RIP_output and keeps host networks distinct
    # from bare gateway addresses in the generated graph.
    ip, bits = _split_net(net)
    hostmask = (1 << (32 - int(bits))) - 1
    return "%s/%s" % (ltontoa(ip & ~hostmask), bits)


def ipmask(ip, msk):
    """Combine ``ip`` and dotted-quad netmask ``msk`` into ``"ip/len"``.

    ``len`` is the number of bits set in the mask; ``ip`` is copied as is.
    """
    return "%s/%i" % (ip, bin(atontol(msk)).count("1"))


def _popen_lines(command):
    """Run shell ``command``; return (output lines, close() status)."""
    pipe = os.popen(command)
    try:
        lines = pipe.readlines()
    finally:
        status = pipe.close()
    return lines, status


class Router(object):
    """Base class: one router, filled in from a ruleset.

    Attributes:
        name:   label of the router in the graph.
        ifaces: maps ``"ip/len"`` to an interface label.
        routes: maps a normalized destination ``"net/len"`` to a gateway.

    Subclasses implement :meth:`parse_ruleset`.
    """

    def __init__(self, ruleset, name=None):
        """Load ``ruleset`` and choose the router name.

        Name priority: explicit ``name`` argument, then a name set by the
        parser (e.g. a ``hostname`` statement), then the last path
        component of ``ruleset``.
        """
        self.routes = {}
        self.ifaces = {}
        # The default must be in place *before* parsing; assigning it
        # afterwards would overwrite any hostname found by the parser.
        self.name = ruleset.split("/")[-1]
        self.import_ruleset(ruleset)
        if name is not None:
            self.name = name

    def import_ruleset(self, ruleset):
        """Read the file ``ruleset`` and feed its lines to the parser."""
        with open(ruleset) as f:
            self.parse_ruleset(f.readlines())

    def parse_ruleset(self, rules):
        """Fill ``ifaces``/``routes`` from the list of lines ``rules``."""
        raise NotImplementedError("%s does not implement parse_ruleset"
                                  % self.__class__.__name__)


class Cisco(Router):
    """Parser for ``hostname`` / ``interface`` / ``ip address`` / ``ip route`` lines."""

    re_name = re.compile(r"\s*hostname\s+([a-zA-Z0-9.-]*)")
    re_if = re.compile(r"\s*interface\s+([a-zA-Z0-9/]*)")
    re_ifip = re.compile(r"\s*ip\s+address\s+([0-9.]*)\s+([0-9.]*)")
    re_route = re.compile(r"\s*ip\s+route\s+([0-9.]*)\s+([0-9.]*)\s+([0-9.]*)")

    def parse_ruleset(self, rules):
        """Record hostname, interface addresses and static routes."""
        iff = ""  # most recent "interface" statement seen
        for l in rules:
            m = self.re_name.match(l)
            if m:
                self.name = m.groups()[0]
                continue
            m = self.re_if.match(l)
            if m:
                iff = m.groups()[0]
                continue
            m = self.re_ifip.match(l)
            if m:
                self.ifaces[ipmask(*m.groups())] = iff
                continue
            m = self.re_route.match(l)
            if m:
                dst, msk, gw = m.groups()
                if gw:
                    self.routes[normalize(ipmask(dst, msk))] = gw
                else:
                    # The gateway group may match the empty string, e.g.
                    # when the next hop is not a dotted-quad address.
                    WARNING("no gw found in line [%s]" % l.strip())
                continue


class Pix(Cisco):
    """Parser for ``name`` / ``ip address <if>`` / ``route <if>`` lines.

    Gateways may be given by object name; names are resolved through the
    ``name <ip> <label>`` statements of the same ruleset.
    """

    re_objname = re.compile(r"\s*name\s+([0-9.]+)\s+([a-zA-Z0-9.-]+)")
    re_if = re.compile(r"\s*ip\s+address\s+([a-zA-Z0-9.-]+)\s+([0-9.]+)\s+([0-9.]+)")
    re_routeobj = re.compile(r"\s*route\s+[a-zA-Z0-9.-]+\s+([0-9.]+)\s+([0-9.]+)\s+([a-zA-Z0-9.-]+)")
    re_ip = re.compile(r"[0-9.]+")

    def parse_ruleset(self, rules):
        """Record hostname, interface addresses and static routes."""
        # First pass: collect every name -> ip definition so that a route
        # can reference a name regardless of where it is declared.
        obj = {}
        for l in rules:
            m = self.re_objname.match(l)
            if m:
                ip, name = m.groups()
                obj[name] = ip

        for l in rules:
            m = self.re_name.match(l)
            if m:
                self.name = m.groups()[0]
                continue
            m = self.re_if.match(l)
            if m:
                name, ip, msk = m.groups()
                self.ifaces[ipmask(ip, msk)] = name
                continue
            m = self.re_routeobj.match(l)
            if m:
                dst, msk, gw = m.groups()
                if not self.re_ip.match(gw):
                    gw = obj.get(gw, "")
                if gw:
                    self.routes[normalize(ipmask(dst, msk))] = gw
                else:
                    WARNING("no gw found in line [%s]" % l.strip())
                continue


class Netscreen(Router):
    """Parser for ``set hostname`` / ``set interface`` / ``set route`` lines."""

    re_name = re.compile(r"\s*set\s+hostname\s+([a-zA-Z0-9.-]+)")
    re_if = re.compile(r'\s*set\s+interface\s+([a-zA-Z0-9.-]+)\s+ip\s+([0-9./]+)')
    re_route = re.compile(r"\s*set\s+route\s+([0-9./]+)\s+interface\s+[a-zA-Z0-9.-]+\s+gateway\s+([0-9.]+)")

    def parse_ruleset(self, rules):
        """Record hostname, interface addresses and static routes."""
        for l in rules:
            m = self.re_name.match(l)
            if m:
                self.name = m.groups()[0]
                continue
            m = self.re_if.match(l)
            if m:
                name, ip = m.groups()
                # The address is stored exactly as written in the ruleset.
                self.ifaces[ip] = name
                continue
            m = self.re_route.match(l)
            if m:
                dst, gw = m.groups()
                self.routes[normalize(dst)] = gw
                continue


class Solaris_netstat(Router):
    """Parser for lines of the form ``<x> <mask> <ip> <ifname>`` (interfaces)
    and ``<dst|default> <mask> <gw>`` (routes)."""

    re_if = re.compile(r"[0-9.]+\s+([0-9.]+)\s+([0-9.]+)\s+([a-zA-Z][a-zA-Z0-9]+)")
    re_route = re.compile(r"(default|[0-9.]+)\s+([0-9.]+)\s+([0-9.]+)")

    def parse_ruleset(self, rules):
        """Record interface addresses and routes; ``lo*`` interfaces are skipped."""
        for l in rules:
            m = self.re_if.match(l)
            if m:
                msk, ip, ifname = m.groups()
                if ifname.startswith("lo"):
                    continue
                self.ifaces[ipmask(ip, msk)] = ifname
                continue
            m = self.re_route.match(l)
            if m:
                dst, msk, gw = m.groups()
                if dst == "default":
                    dst = "0.0.0.0"
                self.routes[normalize(ipmask(dst, msk))] = gw
                continue


class Linux_netstat(Router):
    """Parser for lines of the form ``<dst> <gw> <mask> <FLAGS> ... <iface>``."""

    re_line = re.compile(r"\s*([0-9.]+)\s+([0-9.]+)\s+([0-9.]+)\s+([A-Z]+)\s.*\s([a-zA-Z0-9]+)$")

    def parse_ruleset(self, rules):
        """Lines flagged ``G`` become routes, all others become interfaces."""
        for l in rules:
            m = self.re_line.match(l)
            if m:
                dst, gw, msk, fl, iff = m.groups()
                if "G" in fl:
                    self.routes[normalize(ipmask(dst, msk))] = gw
                else:
                    self.ifaces[ipmask(dst, msk)] = iff
                continue


class SNMP_output(Router):
    """Parser for textual SNMP dumps of ipAddrTable / ipRouteTable entries."""

    re_ipAdEntNetMask = re.compile(r"ip\.ipAddrTable\.ipAddrEntry\.ipAdEntNetMask\.([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*IpAddress:\s*([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})")
    re_ipRouteNextHop = re.compile(r"ip\.ipRouteTable\.ipRouteEntry\.ipRouteNextHop\.([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*IpAddress:\s*([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})")
    re_ipRouteMask = re.compile(r"ip\.ipRouteTable\.ipRouteEntry\.ipRouteMask\.([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*IpAddress:\s*([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})")

    def parse_ruleset(self, rules):
        """Record interface addresses and routes.

        Next hop and mask of a route arrive on separate lines, so they are
        gathered per destination first and combined at the end.  Routes
        whose next hop is ``0.0.0.0`` are ignored; routes missing either
        their next hop or their mask are reported and skipped.
        """
        pending = {}  # destination -> {"gw": ..., "mask": ...}
        for l in rules:
            m = self.re_ipAdEntNetMask.match(l)
            if m:
                ip, mask = m.groups()
                if ip not in ("127.0.0.1", "0.0.0.0"):
                    self.ifaces[ipmask(ip, mask)] = ip
                continue
            m = self.re_ipRouteNextHop.match(l)
            if m:
                dst, gw = m.groups()
                pending.setdefault(dst, {})["gw"] = gw
                continue
            m = self.re_ipRouteMask.match(l)
            if m:
                dst, mask = m.groups()
                pending.setdefault(dst, {})["mask"] = mask

        for dst in pending:
            gw = pending[dst].get("gw")
            mask = pending[dst].get("mask")
            if gw == "0.0.0.0":
                continue
            if gw is None or mask is None:
                WARNING("incomplete route entry for [%s]" % dst)
                continue
            self.routes[normalize(ipmask(dst, mask))] = gw


class SNMP_query(SNMP_output):
    """Like SNMP_output, but runs ``snmpwalk`` against the target itself."""

    def import_ruleset(self, ruleset):
        """Run ``snmpwalk`` on ``ruleset`` and parse what it prints."""
        # NOTE(security): ``ruleset`` is interpolated into a shell command
        # line.  It comes from this script's own argv; do not feed it
        # untrusted input.  Kept as a shell string so that existing
        # invocations passing extra words keep working.
        lines, ret = _popen_lines("snmpwalk -n -v1 -c public %s" % ruleset)
        self.parse_ruleset(lines)
        if ret:
            WARNING("snmpwalk exited with code %04x" % ret)


class RIP_output(Router):
    """Parser for RIP dumps: ``... from ... <gw>`` headers followed by
    ``<dst>, metric <n>`` lines."""

    re_gw = re.compile(r".*from.*[^0-9]([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})")
    re_route = re.compile(r"\s*([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}),\s*metric\s+([0-9]+)")

    def parse_ruleset(self, rules):
        """Record routes; each one is attributed to the last gateway seen."""
        gw = None
        for l in rules:
            m = self.re_gw.match(l)
            if m:
                gw, = m.groups()
                continue
            if gw is None:
                WARNING("line [%s] reached before any gw declaration" % l[:-1])
                continue
            m = self.re_route.match(l)
            if m:
                dst, metric = m.groups()
                # No mask is available here: guess the prefix length by
                # dropping 8 bits for every trailing all-zero byte.
                ipdst = atontol(dst)
                netmask = 32
                while netmask and ipdst & 0xff == 0:
                    ipdst >>= 8
                    netmask -= 8
                self.routes["%s/%i" % (dst, netmask)] = gw


class RIP_query(RIP_output):
    """Like RIP_output, but runs ``ripquery`` against the target itself."""

    def import_ruleset(self, ruleset):
        """Run ``ripquery`` on ``ruleset`` and parse what it prints."""
        # NOTE(security): same shell-interpolation caveat as SNMP_query.
        lines, ret = _popen_lines("ripquery -l -n -r %s" % ruleset)
        self.parse_ruleset(lines)
        if ret:
            WARNING("ripquery exited with code %04x" % ret)


# TODO: class traceroute(Router) -- should use whois to determine netmask ?


# Old-style classes only exist on Python 2 (types.ClassType).
_CLASS_TYPES = (type, getattr(types, "ClassType", type))


def _router_classes(namespace):
    """Return {name: class} for every concrete Router subclass in ``namespace``.

    ``Router`` itself is left out: it has no parser and cannot be used as a
    router type.
    """
    return dict((name, obj) for name, obj in list(namespace.items())
                if isinstance(obj, _CLASS_TYPES)
                and issubclass(obj, Router)
                and obj is not Router)


_ROUTER_CLASSES = _router_classes(globals())
# Names accepted after "-" on the command line.
router_types = sorted(_ROUTER_CLASSES)


def graph_lines(routers):
    """Return the Graphviz source describing ``routers`` as a list of lines.

    Routers and gateways that are not an interface address of a known
    router are drawn as boxes, networks as ellipses.  Names are emitted in
    sorted order so that the output is reproducible.
    """
    names = set()    # router names
    nets = set()     # normalized networks the routers are attached to
    ipname = {}      # interface address -> owning router name
    for router in routers:
        names.add(router.name)
        for iff in router.ifaces:
            ipname[iff.split("/")[0]] = router.name
            nets.add(normalize(iff))

    gateways = set()  # next hops that do not belong to a known router
    for router in routers:
        for gw in router.routes.values():
            if gw not in ipname:
                gateways.add(gw)

    nets = sorted(nets)
    gateways = sorted(gateways)

    out = ["graph reseau {",
           "\trankdir=LR;",
           "\tnode [shape=box,style=filled,color=lightgrey,fontsize=10];"]
    for n in sorted(names) + gateways:
        out.append('\t\t"%s";' % n)
    out.append("\tnode [shape=ellipse,style=solid];")
    out.append("")

    # router -- network, one edge per interface located in that network
    for router in routers:
        for iff in sorted(router.ifaces):
            for n in nets:
                if incl(iff.split("/")[0], n) >= 0:
                    out.append('\t"%s" -- "%s" [label="%s"];'
                               % (router.name, n, router.ifaces[iff]))
    out.append("")

    # network -- unknown gateway located in that network
    for nd in gateways:
        for nt in nets:
            if incl(nd, nt) >= 0:
                out.append('\t"%s" -- "%s"' % (nt, nd))
    out.append("")

    # unknown gateway -- destination reached through it
    for router in routers:
        for r in sorted(router.routes):
            if router.routes[r] not in ipname:
                out.append('\t"%s" -- "%s"' % (router.routes[r], r))
    out.append("}")
    return out


def main(argv=None):
    """Command-line entry point; ``argv`` defaults to ``sys.argv[1:]``.

    Raises:
        SystemExit: when no router could be built from the arguments.
    """
    if argv is None:
        argv = sys.argv[1:]

    routers = []
    the_router = None
    for p in argv:
        # startswith() rather than p[0]: an empty argument must not crash.
        if p.startswith("-") and p[1:] in _ROUTER_CLASSES:
            the_router = _ROUTER_CLASSES[p[1:]]
        elif the_router:
            routers.append(the_router(p))
            sys.stderr.write("adding %s [%s]\n" % (the_router.__name__, p))
        else:
            sys.stderr.write("Warning: unknwon router type [%s]\n" % p)

    if not routers:
        sys.stderr.write("Error: no routers given...\n")
        sys.stderr.write("Usage: netyah.py -routertype taget1 [target2..] [-routertype2 target3..]\n")
        sys.stderr.write("Supported router types :\n- " + "\n- ".join(router_types) + "\n")
        raise SystemExit

    sys.stdout.write("\n".join(graph_lines(routers)) + "\n")


if __name__ == "__main__":
    main()