#!/usr/bin/python3
###############################################################################
#                                                                             #
# libloc - A library to determine the location of someone on the Internet     #
#                                                                             #
# Copyright (C) 2017 IPFire Development Team                                  #
#                                                                             #
# This library is free software; you can redistribute it and/or               #
# modify it under the terms of the GNU Lesser General Public                  #
# License as published by the Free Software Foundation; either                #
# version 2.1 of the License, or (at your option) any later version.          #
#                                                                             #
# This library is distributed in the hope that it will be useful,             #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU            #
# Lesser General Public License for more details.                             #
#                                                                             #
###############################################################################

import argparse
import datetime
import ipaddress
import logging
import os
import re
import shutil
import socket
import sys
import time

# Load our location module
import location
import location.downloader
import location.export
from location.i18n import _

# Setup logging
log = logging.getLogger("location")

# Output formatters

class CLI(object):
    """
        Command line interface to the location database.

        parse_cli() builds the argument parser and binds every sub-command
        to one of the handle_*() methods; run() opens the database and
        dispatches to the selected handler as ``handler(db, args)``.
    """

    def parse_cli(self):
        """
            Parses the command line and returns the resulting namespace.

            Every sub-command stores its handler in the "func" attribute.
            If no sub-command was given, the usage is printed and the
            process exits with code 2.
        """
        parser = argparse.ArgumentParser(
            description=_("Location Database Command Line Interface"),
        )
        subparsers = parser.add_subparsers()

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))
        parser.add_argument("--quiet", action="store_true",
            help=_("Enable quiet mode"))

        # version
        parser.add_argument("--version", action="version",
            version="%(prog)s @VERSION@")

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # public key
        parser.add_argument("--public-key", "-k",
            default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
        )

        # Show the database version
        version = subparsers.add_parser("version",
            help=_("Show database version"))
        version.set_defaults(func=self.handle_version)

        # lookup an IP address
        lookup = subparsers.add_parser("lookup",
            help=_("Lookup one or multiple IP addresses"),
        )
        lookup.add_argument("address", nargs="+")
        lookup.set_defaults(func=self.handle_lookup)

        # Dump the whole database
        dump = subparsers.add_parser("dump",
            help=_("Dump the entire database"),
        )
        dump.add_argument("output", nargs="?", type=argparse.FileType("w"))
        dump.set_defaults(func=self.handle_dump)

        # Update
        update = subparsers.add_parser("update", help=_("Update database"))
        update.set_defaults(func=self.handle_update)

        # Verify
        verify = subparsers.add_parser("verify",
            help=_("Verify the downloaded database"))
        verify.set_defaults(func=self.handle_verify)

        # Get AS
        get_as = subparsers.add_parser("get-as",
            help=_("Get information about one or multiple Autonomous Systems"),
        )
        get_as.add_argument("asn", nargs="+")
        get_as.set_defaults(func=self.handle_get_as)

        # Search for AS
        search_as = subparsers.add_parser("search-as",
            help=_("Search for Autonomous Systems that match the string"),
        )
        search_as.add_argument("query", nargs=1)
        search_as.set_defaults(func=self.handle_search_as)

        # List all networks in an AS
        list_networks_by_as = subparsers.add_parser("list-networks-by-as",
            help=_("Lists all networks in an AS"),
        )
        list_networks_by_as.add_argument("asn", nargs=1, type=int)
        list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_as.add_argument("--format",
            choices=location.export.formats.keys(), default="list")
        list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)

        # List all networks in a country
        list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
            help=_("Lists all networks in a country"),
        )
        list_networks_by_cc.add_argument("country_code", nargs=1)
        list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_cc.add_argument("--format",
            choices=location.export.formats.keys(), default="list")
        list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)

        # List all networks with flags
        list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
            help=_("Lists all networks with flags"),
        )
        list_networks_by_flags.add_argument("--anonymous-proxy",
            action="store_true", help=_("Anonymous Proxies"),
        )
        list_networks_by_flags.add_argument("--satellite-provider",
            action="store_true", help=_("Satellite Providers"),
        )
        list_networks_by_flags.add_argument("--anycast",
            action="store_true", help=_("Anycasts"),
        )
        list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_flags.add_argument("--format",
            choices=location.export.formats.keys(), default="list")
        list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)

        # List countries
        list_countries = subparsers.add_parser("list-countries",
            help=_("Lists all countries"),
        )
        list_countries.add_argument("--show-name",
            action="store_true", help=_("Show the name of the country"),
        )
        list_countries.add_argument("--show-continent",
            action="store_true", help=_("Show the continent"),
        )
        list_countries.set_defaults(func=self.handle_list_countries)

        # Export
        export = subparsers.add_parser("export",
            help=_("Exports data in many formats to load it into packet filters"),
        )
        export.add_argument("--format", help=_("Output format"),
            choices=location.export.formats.keys(), default="list")
        export.add_argument("--directory", help=_("Output directory"), required=True)
        export.add_argument("--family",
            help=_("Specify address family"), choices=("ipv6", "ipv4"),
        )
        export.add_argument("objects", nargs="*",
            help=_("List country codes or ASNs to export"))
        export.set_defaults(func=self.handle_export)

        args = parser.parse_args()

        # Configure logging
        if args.debug:
            location.logger.set_level(logging.DEBUG)
        elif args.quiet:
            location.logger.set_level(logging.WARNING)

        # Print usage if no action was given
        if "func" not in args:
            parser.print_usage()
            sys.exit(2)

        return args

    def run(self):
        """
            Parses the command line, opens the database and runs the
            selected handler.

            Always terminates the process: exit code 1 if the database
            file could not be found, 2 if the handler raised ValueError,
            the handler's return value if it is truthy, and 0 otherwise.
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Open database
        try:
            db = location.Database(args.database)
        except FileNotFoundError as e:
            sys.stderr.write("location: Could not open database %s: %s\n" \
                % (args.database, e))
            sys.exit(1)

        # Translate family (if present)
        if "family" in args:
            if args.family == "ipv6":
                args.family = socket.AF_INET6
            elif args.family == "ipv4":
                args.family = socket.AF_INET
            else:
                args.family = 0

        # Call function
        try:
            ret = args.func(db, args)

        # Catch invalid inputs
        except ValueError as e:
            sys.stderr.write("%s\n" % e)
            ret = 2

        # Return with exit code
        if ret:
            sys.exit(ret)

        # Otherwise just exit
        sys.exit(0)

    def handle_version(self, db, ns):
        """
            Print the version of the database
        """
        t = time.strftime(
            "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
        )

        print(t)

    def handle_lookup(self, db, ns):
        """
            Looks up every address in ns.address and prints what the
            database knows about the network it belongs to.

            Returns 0 if everything was found, 1 if at least one address
            could not be found and 2 if an address was invalid. Processing
            continues with the next address after either kind of failure.
        """
        ret = 0

        fmt = "  %-24s: %s"

        for address in ns.address:
            try:
                network = db.lookup(address)
            except ValueError:
                print(_("Invalid IP address: %s") % address, file=sys.stderr)

                # Without a result there is nothing to print for this
                # address, so remember the error and skip it
                ret = 2
                continue

            # Nothing found?
            if not network:
                print(_("Nothing found for %(address)s") \
                    % { "address" : address }, file=sys.stderr)

                # Do not hide an earlier "invalid input" exit code
                ret = ret or 1
                continue

            print("%s:" % address)
            print(fmt % (_("Network"), network))

            # Print country
            if network.country_code:
                country = db.get_country(network.country_code)

                print(fmt % (
                    _("Country"),
                    country.name if country else network.country_code),
                )

            # Print AS information
            if network.asn:
                autonomous_system = db.get_as(network.asn)

                print(fmt % (
                    _("Autonomous System"),
                    autonomous_system or "AS%s" % network.asn),
                )

            # Anonymous Proxy
            if network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY):
                print(fmt % (
                    _("Anonymous Proxy"), _("yes"),
                ))

            # Satellite Provider
            if network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER):
                print(fmt % (
                    _("Satellite Provider"), _("yes"),
                ))

            # Anycast
            if network.has_flag(location.NETWORK_FLAG_ANYCAST):
                print(fmt % (
                    _("Anycast"), _("yes"),
                ))

        return ret

    def handle_dump(self, db, ns):
        """
            Writes the whole database as text to ns.output, or to standard
            output if no output file was given.

            A file that was opened for us by argparse is closed again
            when the dump has finished or failed.
        """
        # Use output file or write to stdout
        f = ns.output or sys.stdout

        try:
            self._write_dump(db, f)
        finally:
            # argparse hands out sys.stdout for "-", which must stay open
            if f is not sys.stdout:
                f.close()

    def _write_dump(self, db, f):
        """
            Writes the metadata header, all ASes and all networks of
            the database to the file object f.
        """
        # Format everything like this
        fmt = "%-24s %s\n"

        # Write metadata
        f.write("#\n# Location Database Export\n#\n")

        f.write("# Generated: %s\n" % time.strftime(
            "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
        ))

        if db.vendor:
            f.write("# Vendor:    %s\n" % db.vendor)

        if db.license:
            f.write("# License:   %s\n" % db.license)

        f.write("#\n")

        if db.description:
            for line in db.description.splitlines():
                f.write("# %s\n" % line)

            f.write("#\n")

        # Iterate over all ASes
        for a in db.ases:
            f.write("\n")
            f.write(fmt % ("aut-num:", "AS%s" % a.number))
            f.write(fmt % ("name:", a.name))

        flags = {
            location.NETWORK_FLAG_ANONYMOUS_PROXY    : "is-anonymous-proxy:",
            location.NETWORK_FLAG_SATELLITE_PROVIDER : "is-satellite-provider:",
            location.NETWORK_FLAG_ANYCAST            : "is-anycast:",
        }

        # Iterate over all networks
        for n in db.networks:
            f.write("\n")
            f.write(fmt % ("net:", n))

            if n.country_code:
                f.write(fmt % ("country:", n.country_code))

            if n.asn:
                f.write(fmt % ("aut-num:", n.asn))

            # Print all flags
            for flag, label in flags.items():
                if n.has_flag(flag):
                    f.write(fmt % (label, "yes"))

    def handle_get_as(self, db, ns):
        """
            Gets information about Autonomous Systems

            Returns 1 if any of the ASNs in ns.asn was not a number or
            could not be found in the database, otherwise 0.
        """
        ret = 0

        for asn in ns.asn:
            try:
                asn = int(asn)
            except ValueError:
                print(_("Invalid ASN: %s") % asn, file=sys.stderr)
                ret = 1
                continue

            # Fetch AS from database
            a = db.get_as(asn)

            # Nothing found
            if not a:
                print(_("Could not find AS%s") % asn, file=sys.stderr)
                ret = 1
                continue

            print(_("AS%(asn)s belongs to %(name)s") \
                % { "asn" : a.number, "name" : a.name })

        return ret

    def handle_search_as(self, db, ns):
        """
            Prints every AS that db.search_as() returns for the query
        """
        for query in ns.query:
            # Print all matches ASes
            for a in db.search_as(query):
                print(a)

    def handle_update(self, db, ns):
        """
            Downloads a new database and moves it to ns.database.

            Returns None if the local database is already up to date,
            1 if the download failed with FileNotFoundError, 3 if the
            downloader did not hand back a file and 0 on success.
        """
        # Fetch the timestamp we need from DNS
        t = location.discover_latest_version()

        # Parse timestamp into datetime format
        timestamp = datetime.datetime.fromtimestamp(t) if t else None

        # Check the version of the local database
        if db and timestamp and db.created_at >= timestamp.timestamp():
            log.info("Already on the latest version")
            return

        # Download the database into the correct directory
        tmpdir = os.path.dirname(ns.database)

        # Create a downloader
        d = location.downloader.Downloader()

        # Try downloading a new database
        try:
            t = d.download(public_key=ns.public_key, timestamp=timestamp, tmpdir=tmpdir)

        # If no file could be downloaded, log a message
        except FileNotFoundError:
            log.error("Could not download a new database")
            return 1

        # If we have not received a new file, there is nothing to do
        if not t:
            return 3

        # Move temporary file to destination
        shutil.move(t.name, ns.database)

        return 0

    def handle_verify(self, db, ns):
        """
            Verifies the database against the public key in ns.public_key.

            Takes the already opened database like every other handler,
            because run() calls all handlers as ``handler(db, args)``.

            Returns 0 if db.verify() succeeded and 1 otherwise.
        """
        # Verify the database
        with open(ns.public_key, "r") as f:
            if not db.verify(f):
                log.error("Could not verify database")
                return 1

        # Success
        log.debug("Database successfully verified")
        return 0

    def __get_output_formatter(self, ns):
        """
            Returns the output formatter class registered for ns.format
            and falls back to location.export.OutputFormatter
        """
        try:
            cls = location.export.formats[ns.format]
        except KeyError:
            cls = location.export.OutputFormatter

        return cls

    def handle_list_countries(self, db, ns):
        """
            Prints one line per country: the country code, followed by
            the continent code and/or the name if requested
        """
        for country in db.countries:
            line = [
                country.code,
            ]

            if ns.show_continent:
                line.append(country.continent_code)

            if ns.show_name:
                line.append(country.name)

            # Format the output
            line = " ".join(line)

            # Print the output
            print(line)

    def handle_list_networks_by_as(self, db, ns):
        """
            Writes all networks of the given AS to standard output
            using the selected output formatter
        """
        writer = self.__get_output_formatter(ns)

        for asn in ns.asn:
            f = writer(sys.stdout, prefix="AS%s" % asn)

            # Print all matching networks
            for n in db.search_networks(asn=asn, family=ns.family):
                f.write(n)

            f.finish()

    def handle_list_networks_by_cc(self, db, ns):
        """
            Writes all networks of the given country to standard output
            using the selected output formatter
        """
        writer = self.__get_output_formatter(ns)

        for country_code in ns.country_code:
            # Open standard output
            f = writer(sys.stdout, prefix=country_code)

            # Print all matching networks
            for n in db.search_networks(country_code=country_code, family=ns.family):
                f.write(n)

            f.finish()

    def handle_list_networks_by_flags(self, db, ns):
        """
            Writes all networks that match the selected flags to standard
            output using the selected output formatter.

            Raises ValueError if no flag was selected.
        """
        flags = 0

        if ns.anonymous_proxy:
            flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY

        if ns.satellite_provider:
            flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER

        if ns.anycast:
            flags |= location.NETWORK_FLAG_ANYCAST

        if not flags:
            raise ValueError(_("You must at least pass one flag"))

        writer = self.__get_output_formatter(ns)
        f = writer(sys.stdout, prefix="custom")

        for n in db.search_networks(flags=flags, family=ns.family):
            f.write(n)

        f.finish()

    def handle_export(self, db, ns):
        """
            Exports the networks of the requested countries and ASes
            into ns.directory.

            Arguments that are neither "AS<number>" nor a valid country
            code are skipped with a warning. If nothing valid was
            requested, all countries are exported.
        """
        countries, asns = [], []

        # Translate family
        if ns.family:
            families = [ ns.family ]
        else:
            families = [ socket.AF_INET6, socket.AF_INET ]

        for obj in ns.objects:
            m = re.match(r"^AS(\d+)$", obj)
            if m:
                asns.append(int(m.group(1)))

            elif location.country_code_is_valid(obj) \
                    or obj in ("A1", "A2", "A3"):
                countries.append(obj)

            else:
                log.warning("Invalid argument: %s", obj)
                continue

        # Default to exporting all countries
        if not countries and not asns:
            countries = ["A1", "A2", "A3"] + [country.code for country in db.countries]

        # Select the output format
        writer = self.__get_output_formatter(ns)

        e = location.export.Exporter(db, writer)
        e.export(ns.directory, countries=countries, asns=asns, families=families)


def main():
    """
        Entry point: creates the command line interface and runs it
    """
    # Run the command line interface
    c = CLI()
    c.run()

# Only run when executed as a script, not when imported
if __name__ == "__main__":
    main()