###############################################################################
import argparse
+import gettext
import sys
import syslog
import location
# i18n
-_ = lambda x: x
+def _(singular, plural=None, n=None):
+ if plural:
+ return gettext.dngettext("libloc", singular, plural, n)
-class CLI(object):
- def __init__(self):
- # Open database
- self.db = location.Database("@databasedir@/database.db")
+ return gettext.dgettext("libloc", singular)
+class CLI(object):
def parse_cli(self):
parser = argparse.ArgumentParser(
description=_("Location Database Command Line Interface"),
parser.add_argument("--version", action="version",
version="%%(prog)s %s" % location.__version__)
+ # database
+ parser.add_argument("--database", "-d",
+ default="@databasedir@/database.db", help=_("Path to database"),
+ )
+
# lookup an IP address
lookup = subparsers.add_parser("lookup",
help=_("Lookup one or multiple IP addresses"),
lookup.add_argument("address", nargs="+")
lookup.set_defaults(func=self.handle_lookup)
- return parser.parse_args()
+ # Get AS
+ get_as = subparsers.add_parser("get-as",
+ help=_("Get information about one or multiple Autonomous Systems"),
+ )
+ get_as.add_argument("asn", nargs="+")
+ get_as.set_defaults(func=self.handle_get_as)
+
+ # Search for AS
+ search_as = subparsers.add_parser("search-as",
+ help=_("Search for Autonomous Systems that match the string"),
+ )
+ search_as.add_argument("query", nargs=1)
+ search_as.set_defaults(func=self.handle_search_as)
+
+ # List all networks in an AS
+ list_networks_by_as = subparsers.add_parser("list-networks-by-as",
+ help=_("Lists all networks in an AS"),
+ )
+ list_networks_by_as.add_argument("asn", nargs=1, type=int)
+ list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
+
+ # List all networks in a country
+ search_as = subparsers.add_parser("list-networks-by-cc",
+ help=_("Lists all networks in a country"),
+ )
+ search_as.add_argument("country_code", nargs=1)
+ search_as.set_defaults(func=self.handle_list_networks_by_cc)
+
+ args = parser.parse_args()
+
+ # Print usage if no action was given
+ if not "func" in args:
+ parser.print_usage()
+ sys.exit(2)
+
+ return args
def run(self):
# Parse command line arguments
args = self.parse_cli()
- # Callback function must be defined
- assert args.func, "Callback function not defined"
+ # Open database
+ try:
+ db = location.Database(args.database)
+ except FileNotFoundError as e:
+ sys.stderr.write("location-query: Could not open database %s: %s\n" \
+ % (args.database, e))
+ sys.exit(1)
# Call function
- ret = args.func(args)
+ ret = args.func(db, args)
# Return with exit code
if ret:
# Otherwise just exit
sys.exit(0)
- def handle_lookup(self, ns):
+ def handle_lookup(self, db, ns):
ret = 0
for address in ns.address:
try:
- n = self.db.lookup(address)
+ n = db.lookup(address)
except ValueError:
- sys.stderr.write(_("Invalid IP address: %s") % address)
+ print(_("Invalid IP address: %s") % address, file=sys.stderr)
args = {
"address" : address,
# Nothing found?
if not n:
- print(_("Nothing found for %(address)s") % args)
+ print(_("Nothing found for %(address)s") % args, file=sys.stderr)
ret = 1
continue
# Try to retrieve the AS if we have an AS number
if n.asn:
- a = self.db.get_as(n.asn)
+ a = db.get_as(n.asn)
# If we have found an AS we will print it in the message
if a:
return ret
+ def handle_get_as(self, db, ns):
+ """
+ Gets information about Autonomous Systems
+ """
+ ret = 0
+
+ for asn in ns.asn:
+ try:
+ asn = int(asn)
+ except ValueError:
+ print(_("Invalid ASN: %s") % asn, file=sys.stderr)
+ ret = 1
+ continue
+
+ # Fetch AS from database
+ a = db.get_as(asn)
+
+ # Nothing found
+ if not a:
+ print(_("Could not find AS%s") % asn, file=sys.stderr)
+ ret = 1
+ continue
+
+ print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
+
+ return ret
+
+ def handle_search_as(self, db, ns):
+ for query in ns.query:
+ # Print all matches ASes
+ for a in db.search_as(query):
+ print(a)
+
+ def handle_list_networks_by_as(self, db, ns):
+ for asn in ns.asn:
+ # Print all matching networks
+ for n in db.search_networks(asn=asn):
+ print(n)
+
+ def handle_list_networks_by_cc(self, db, ns):
+ for country_code in ns.country_code:
+ # Print all matching networks
+ for n in db.search_networks(country_code=country_code):
+ print(n)
def main():
# Run the command line interface