]> git.ipfire.org Git - location/libloc.git/blobdiff - src/python/location-query.in
Implement listing all IP addresses in an AS
[location/libloc.git] / src / python / location-query.in
index e750ea4bb15cacfe36b3aa9498c6ea3825aefcbe..e8bc19e2915cc0dfbc4374bca071b8e918a60ce0 100644 (file)
@@ -18,6 +18,7 @@
 ###############################################################################
 
 import argparse
+import gettext
 import sys
 import syslog
 
@@ -25,13 +26,13 @@ import syslog
 import location
 
 # i18n
-_ = lambda x: x
+def _(singular, plural=None, n=None):
+       if plural:
+               return gettext.dngettext("libloc", singular, plural, n)
 
-class CLI(object):
-       def __init__(self):
-               # Open database
-               self.db = location.Database("@databasedir@/database.db")
+       return gettext.dgettext("libloc", singular)
 
+class CLI(object):
        def parse_cli(self):
                parser = argparse.ArgumentParser(
                        description=_("Location Database Command Line Interface"),
@@ -46,6 +47,11 @@ class CLI(object):
                parser.add_argument("--version", action="version",
                        version="%%(prog)s %s" % location.__version__)
 
+               # database
+               parser.add_argument("--database", "-d",
+                       default="@databasedir@/database.db", help=_("Path to database"),
+               )
+
                # lookup an IP address
                lookup = subparsers.add_parser("lookup",
                        help=_("Lookup one or multiple IP addresses"),
@@ -53,17 +59,57 @@ class CLI(object):
                lookup.add_argument("address", nargs="+")
                lookup.set_defaults(func=self.handle_lookup)
 
-               return parser.parse_args()
+               # Get AS
+               get_as = subparsers.add_parser("get-as",
+                       help=_("Get information about one or multiple Autonomous Systems"),
+               )
+               get_as.add_argument("asn", nargs="+")
+               get_as.set_defaults(func=self.handle_get_as)
+
+               # Search for AS
+               search_as = subparsers.add_parser("search-as",
+                       help=_("Search for Autonomous Systems that match the string"),
+               )
+               search_as.add_argument("query", nargs=1)
+               search_as.set_defaults(func=self.handle_search_as)
+
+               # List all networks in an AS
+               list_networks_by_as = subparsers.add_parser("list-networks-by-as",
+                       help=_("Lists all networks in an AS"),
+               )
+               list_networks_by_as.add_argument("asn", nargs=1, type=int)
+               list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
+
+               # List all networks in a country
+               search_as = subparsers.add_parser("list-networks-by-cc",
+                       help=_("Lists all networks in a country"),
+               )
+               search_as.add_argument("country_code", nargs=1)
+               search_as.set_defaults(func=self.handle_list_networks_by_cc)
+
+               args = parser.parse_args()
+
+               # Print usage if no action was given
+               if not "func" in args:
+                       parser.print_usage()
+                       sys.exit(2)
+
+               return args
 
        def run(self):
                # Parse command line arguments
                args = self.parse_cli()
 
-               # Callback function must be defined
-               assert args.func, "Callback function not defined"
+               # Open database
+               try:
+                       db = location.Database(args.database)
+               except FileNotFoundError as e:
+                       sys.stderr.write("location-query: Could not open database %s: %s\n" \
+                               % (args.database, e))
+                       sys.exit(1)
 
                # Call function
-               ret = args.func(args)
+               ret = args.func(db, args)
 
                # Return with exit code
                if ret:
@@ -72,14 +118,14 @@ class CLI(object):
                # Otherwise just exit
                sys.exit(0)
 
-       def handle_lookup(self, ns):
+       def handle_lookup(self, db, ns):
                ret = 0
 
                for address in ns.address:
                        try:
-                               n = self.db.lookup(address)
+                               n = db.lookup(address)
                        except ValueError:
-                               sys.stderr.write(_("Invalid IP address: %s") % address)
+                               print(_("Invalid IP address: %s") % address, file=sys.stderr)
 
                        args = {
                                "address" : address,
@@ -88,13 +134,13 @@ class CLI(object):
 
                        # Nothing found?
                        if not n:
-                               print(_("Nothing found for %(address)s") % args)
+                               print(_("Nothing found for %(address)s") % args, file=sys.stderr)
                                ret = 1
                                continue
 
                        # Try to retrieve the AS if we have an AS number
                        if n.asn:
-                               a = self.db.get_as(n.asn)
+                               a = db.get_as(n.asn)
 
                                # If we have found an AS we will print it in the message
                                if a:
@@ -109,6 +155,50 @@ class CLI(object):
 
                return ret
 
+       def handle_get_as(self, db, ns):
+               """
+                       Gets information about Autonomous Systems
+               """
+               ret = 0
+
+               for asn in ns.asn:
+                       try:
+                               asn = int(asn)
+                       except ValueError:
+                               print(_("Invalid ASN: %s") % asn, file=sys.stderr)
+                               ret = 1
+                               continue
+
+                       # Fetch AS from database
+                       a = db.get_as(asn)
+
+                       # Nothing found
+                       if not a:
+                               print(_("Could not find AS%s") % asn, file=sys.stderr)
+                               ret = 1
+                               continue
+
+                       print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
+
+               return ret
+
+       def handle_search_as(self, db, ns):
+               for query in ns.query:
+                       # Print all matches ASes
+                       for a in db.search_as(query):
+                               print(a)
+
+       def handle_list_networks_by_as(self, db, ns):
+               for asn in ns.asn:
+                       # Print all matching networks
+                       for n in db.search_networks(asn=asn):
+                               print(n)
+
+       def handle_list_networks_by_cc(self, db, ns):
+               for country_code in ns.country_code:
+                       # Print all matching networks
+                       for n in db.search_networks(country_code=country_code):
+                               print(n)
 
 def main():
        # Run the command line interface