]> git.ipfire.org Git - location/libloc.git/blobdiff - src/python/location.in
Make sources around that we can run tests without location installed
[location/libloc.git] / src / python / location.in
diff --git a/src/python/location.in b/src/python/location.in
deleted file mode 100644 (file)
index 233cea0..0000000
+++ /dev/null
@@ -1,644 +0,0 @@
-#!/usr/bin/python3
-###############################################################################
-#                                                                             #
-# libloc - A library to determine the location of someone on the Internet     #
-#                                                                             #
-# Copyright (C) 2017-2021 IPFire Development Team <info@ipfire.org>           #
-#                                                                             #
-# This library is free software; you can redistribute it and/or               #
-# modify it under the terms of the GNU Lesser General Public                  #
-# License as published by the Free Software Foundation; either                #
-# version 2.1 of the License, or (at your option) any later version.          #
-#                                                                             #
-# This library is distributed in the hope that it will be useful,             #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU           #
-# Lesser General Public License for more details.                             #
-#                                                                             #
-###############################################################################
-
-import argparse
-import datetime
-import ipaddress
-import logging
-import os
-import re
-import shutil
-import socket
-import sys
-import time
-
-# Load our location module
-import location
-import location.downloader
-import location.export
-
-from location.i18n import _
-
-# Setup logging
-log = logging.getLogger("location")
-
-# Output formatters
-
-class CLI(object):
-       def parse_cli(self):
-               parser = argparse.ArgumentParser(
-                       description=_("Location Database Command Line Interface"),
-               )
-               subparsers = parser.add_subparsers()
-
-               # Global configuration flags
-               parser.add_argument("--debug", action="store_true",
-                       help=_("Enable debug output"))
-               parser.add_argument("--quiet", action="store_true",
-                       help=_("Enable quiet mode"))
-
-               # version
-               parser.add_argument("--version", action="version",
-                       version="%(prog)s @VERSION@")
-
-               # database
-               parser.add_argument("--database", "-d",
-                       default="@databasedir@/database.db", help=_("Path to database"),
-               )
-
-               # public key
-               parser.add_argument("--public-key", "-k",
-                       default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
-               )
-
-               # Show the database version
-               version = subparsers.add_parser("version",
-                       help=_("Show database version"))
-               version.set_defaults(func=self.handle_version)
-
-               # lookup an IP address
-               lookup = subparsers.add_parser("lookup",
-                       help=_("Lookup one or multiple IP addresses"),
-               )
-               lookup.add_argument("address", nargs="+")
-               lookup.set_defaults(func=self.handle_lookup)
-
-               # Dump the whole database
-               dump = subparsers.add_parser("dump",
-                       help=_("Dump the entire database"),
-               )
-               dump.add_argument("output", nargs="?", type=argparse.FileType("w"))
-               dump.set_defaults(func=self.handle_dump)
-
-               # Update
-               update = subparsers.add_parser("update", help=_("Update database"))
-               update.add_argument("--cron",
-                       help=_("Update the library only once per interval"),
-                       choices=("daily", "weekly", "monthly"),
-               )
-               update.set_defaults(func=self.handle_update)
-
-               # Verify
-               verify = subparsers.add_parser("verify",
-                       help=_("Verify the downloaded database"))
-               verify.set_defaults(func=self.handle_verify)
-
-               # Get AS
-               get_as = subparsers.add_parser("get-as",
-                       help=_("Get information about one or multiple Autonomous Systems"),
-               )
-               get_as.add_argument("asn", nargs="+")
-               get_as.set_defaults(func=self.handle_get_as)
-
-               # Search for AS
-               search_as = subparsers.add_parser("search-as",
-                       help=_("Search for Autonomous Systems that match the string"),
-               )
-               search_as.add_argument("query", nargs=1)
-               search_as.set_defaults(func=self.handle_search_as)
-
-               # List all networks in an AS
-               list_networks_by_as = subparsers.add_parser("list-networks-by-as",
-                       help=_("Lists all networks in an AS"),
-               )
-               list_networks_by_as.add_argument("asn", nargs=1, type=int)
-               list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
-               list_networks_by_as.add_argument("--format",
-                       choices=location.export.formats.keys(), default="list")
-               list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
-
-               # List all networks in a country
-               list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
-                       help=_("Lists all networks in a country"),
-               )
-               list_networks_by_cc.add_argument("country_code", nargs=1)
-               list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
-               list_networks_by_cc.add_argument("--format",
-                       choices=location.export.formats.keys(), default="list")
-               list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)
-
-               # List all networks with flags
-               list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
-                       help=_("Lists all networks with flags"),
-               )
-               list_networks_by_flags.add_argument("--anonymous-proxy",
-                       action="store_true", help=_("Anonymous Proxies"),
-               )
-               list_networks_by_flags.add_argument("--satellite-provider",
-                       action="store_true", help=_("Satellite Providers"),
-               )
-               list_networks_by_flags.add_argument("--anycast",
-                       action="store_true", help=_("Anycasts"),
-               )
-               list_networks_by_flags.add_argument("--drop",
-                       action="store_true", help=_("Hostile Networks safe to drop"),
-               )
-               list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
-               list_networks_by_flags.add_argument("--format",
-                       choices=location.export.formats.keys(), default="list")
-               list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)
-
-               # List bogons
-               list_bogons = subparsers.add_parser("list-bogons",
-                       help=_("Lists all bogons"),
-               )
-               list_bogons.add_argument("--family", choices=("ipv6", "ipv4"))
-               list_bogons.add_argument("--format",
-                       choices=location.export.formats.keys(), default="list")
-               list_bogons.set_defaults(func=self.handle_list_bogons)
-
-               # List countries
-               list_countries = subparsers.add_parser("list-countries",
-                       help=_("Lists all countries"),
-               )
-               list_countries.add_argument("--show-name",
-                       action="store_true", help=_("Show the name of the country"),
-               )
-               list_countries.add_argument("--show-continent",
-                       action="store_true", help=_("Show the continent"),
-               )
-               list_countries.set_defaults(func=self.handle_list_countries)
-
-               # Export
-               export = subparsers.add_parser("export",
-                       help=_("Exports data in many formats to load it into packet filters"),
-               )
-               export.add_argument("--format", help=_("Output format"),
-                       choices=location.export.formats.keys(), default="list")
-               export.add_argument("--directory", help=_("Output directory"))
-               export.add_argument("--family",
-                       help=_("Specify address family"), choices=("ipv6", "ipv4"),
-               )
-               export.add_argument("objects", nargs="*", help=_("List country codes or ASNs to export"))
-               export.set_defaults(func=self.handle_export)
-
-               args = parser.parse_args()
-
-               # Configure logging
-               if args.debug:
-                       location.logger.set_level(logging.DEBUG)
-               elif args.quiet:
-                       location.logger.set_level(logging.WARNING)
-
-               # Print usage if no action was given
-               if not "func" in args:
-                       parser.print_usage()
-                       sys.exit(2)
-
-               return args
-
-       def run(self):
-               # Parse command line arguments
-               args = self.parse_cli()
-
-               # Open database
-               try:
-                       db = location.Database(args.database)
-               except FileNotFoundError as e:
-                       # Allow continuing without a database
-                       if args.func == self.handle_update:
-                               db = None
-
-                       else:
-                               sys.stderr.write("location: Could not open database %s: %s\n" \
-                                       % (args.database, e))
-                               sys.exit(1)
-
-               # Translate family (if present)
-               if "family" in args:
-                       if args.family == "ipv6":
-                               args.family = socket.AF_INET6
-                       elif args.family == "ipv4":
-                               args.family = socket.AF_INET
-                       else:
-                               args.family = 0
-
-               # Call function
-               try:
-                       ret = args.func(db, args)
-
-               # Catch invalid inputs
-               except ValueError as e:
-                       sys.stderr.write("%s\n" % e)
-                       ret = 2
-
-               # Catch any other exceptions
-               except Exception as e:
-                       sys.stderr.write("%s\n" % e)
-                       ret = 1
-
-               # Return with exit code
-               if ret:
-                       sys.exit(ret)
-
-               # Otherwise just exit
-               sys.exit(0)
-
-       def handle_version(self, db, ns):
-               """
-                       Print the version of the database
-               """
-               t = time.strftime(
-                       "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
-               )
-
-               print(t)
-
-       def handle_lookup(self, db, ns):
-               ret = 0
-
-               format = "  %-24s: %s"
-
-               for address in ns.address:
-                       try:
-                               network = db.lookup(address)
-                       except ValueError:
-                               print(_("Invalid IP address: %s") % address, file=sys.stderr)
-                               return 2
-
-                       args = {
-                               "address" : address,
-                               "network" : network,
-                       }
-
-                       # Nothing found?
-                       if not network:
-                               print(_("Nothing found for %(address)s") % args, file=sys.stderr)
-                               ret = 1
-                               continue
-
-                       print("%s:" % address)
-                       print(format % (_("Network"), network))
-
-                       # Print country
-                       if network.country_code:
-                               country = db.get_country(network.country_code)
-
-                               print(format % (
-                                       _("Country"),
-                                       country.name if country else network.country_code),
-                               )
-
-                       # Print AS information
-                       if network.asn:
-                               autonomous_system = db.get_as(network.asn)
-
-                               print(format % (
-                                       _("Autonomous System"),
-                                       autonomous_system or "AS%s" % network.asn),
-                               )
-
-                       # Anonymous Proxy
-                       if network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY):
-                               print(format % (
-                                       _("Anonymous Proxy"), _("yes"),
-                               ))
-
-                       # Satellite Provider
-                       if network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER):
-                               print(format % (
-                                       _("Satellite Provider"), _("yes"),
-                               ))
-
-                       # Anycast
-                       if network.has_flag(location.NETWORK_FLAG_ANYCAST):
-                               print(format % (
-                                       _("Anycast"), _("yes"),
-                               ))
-
-                       # Hostile Network
-                       if network.has_flag(location.NETWORK_FLAG_DROP):
-                               print(format % (
-                                       _("Hostile Network safe to drop"), _("yes"),
-                               ))
-
-               return ret
-
-       def handle_dump(self, db, ns):
-               # Use output file or write to stdout
-               f = ns.output or sys.stdout
-
-               # Format everything like this
-               format = "%-24s %s\n"
-
-               # Write metadata
-               f.write("#\n# Location Database Export\n#\n")
-
-               f.write("# Generated: %s\n" % time.strftime(
-                       "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
-               ))
-
-               if db.vendor:
-                       f.write("# Vendor:    %s\n" % db.vendor)
-
-               if db.license:
-                       f.write("# License:   %s\n" % db.license)
-
-               f.write("#\n")
-
-               if db.description:
-                       for line in db.description.splitlines():
-                               line = "# %s" % line
-                               f.write("%s\n" % line.rstrip())
-
-                       f.write("#\n")
-
-               # Iterate over all ASes
-               for a in db.ases:
-                       f.write("\n")
-                       f.write(format % ("aut-num:", "AS%s" % a.number))
-                       f.write(format % ("name:", a.name))
-
-               flags = {
-                       location.NETWORK_FLAG_ANONYMOUS_PROXY    : "is-anonymous-proxy:",
-                       location.NETWORK_FLAG_SATELLITE_PROVIDER : "is-satellite-provider:",
-                       location.NETWORK_FLAG_ANYCAST            : "is-anycast:",
-                       location.NETWORK_FLAG_DROP               : "drop:",
-               }
-
-               # Iterate over all networks
-               for n in db.networks:
-                       f.write("\n")
-                       f.write(format % ("net:", n))
-
-                       if n.country_code:
-                               f.write(format % ("country:", n.country_code))
-
-                       if n.asn:
-                               f.write(format % ("aut-num:", n.asn))
-
-                       # Print all flags
-                       for flag in flags:
-                               if n.has_flag(flag):
-                                       f.write(format % (flags[flag], "yes"))
-
-       def handle_get_as(self, db, ns):
-               """
-                       Gets information about Autonomous Systems
-               """
-               ret = 0
-
-               for asn in ns.asn:
-                       try:
-                               asn = int(asn)
-                       except ValueError:
-                               print(_("Invalid ASN: %s") % asn, file=sys.stderr)
-                               ret = 1
-                               continue
-
-                       # Fetch AS from database
-                       a = db.get_as(asn)
-
-                       # Nothing found
-                       if not a:
-                               print(_("Could not find AS%s") % asn, file=sys.stderr)
-                               ret = 1
-                               continue
-
-                       print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
-
-               return ret
-
-       def handle_search_as(self, db, ns):
-               for query in ns.query:
-                       # Print all matches ASes
-                       for a in db.search_as(query):
-                               print(a)
-
-       def handle_update(self, db, ns):
-               if ns.cron and db:
-                       now = time.time()
-
-                       if ns.cron == "daily":
-                               delta = datetime.timedelta(days=1)
-                       elif ns.cron == "weekly":
-                               delta = datetime.timedelta(days=7)
-                       elif ns.cron == "monthly":
-                               delta = datetime.timedelta(days=30)
-
-                       delta = delta.total_seconds()
-
-                       # Check if the database has recently been updated
-                       if db.created_at >= (now - delta):
-                               log.info(
-                                       _("The database has been updated recently"),
-                               )
-                               return 3
-
-               # Fetch the timestamp we need from DNS
-               t = location.discover_latest_version()
-
-               # Check the version of the local database
-               if db and t and db.created_at >= t:
-                       log.info("Already on the latest version")
-                       return
-
-               # Download the database into the correct directory
-               tmpdir = os.path.dirname(ns.database)
-
-               # Create a downloader
-               d = location.downloader.Downloader()
-
-               # Try downloading a new database
-               try:
-                       t = d.download(public_key=ns.public_key, timestamp=t, tmpdir=tmpdir)
-
-               # If no file could be downloaded, log a message
-               except FileNotFoundError as e:
-                       log.error("Could not download a new database")
-                       return 1
-
-               # If we have not received a new file, there is nothing to do
-               if not t:
-                       return 3
-
-               # Move temporary file to destination
-               shutil.move(t.name, ns.database)
-
-               return 0
-
-       def handle_verify(self, db, ns):
-               # Verify the database
-               with open(ns.public_key, "r") as f:
-                       if not db.verify(f):
-                               log.error("Could not verify database")
-                               return 1
-
-               # Success
-               log.debug("Database successfully verified")
-               return 0
-
-       def __get_output_formatter(self, ns):
-               try:
-                       cls = location.export.formats[ns.format]
-               except KeyError:
-                       cls = location.export.OutputFormatter
-
-               return cls
-
-       def handle_list_countries(self, db, ns):
-               for country in db.countries:
-                       line = [
-                               country.code,
-                       ]
-
-                       if ns.show_continent:
-                               line.append(country.continent_code)
-
-                       if ns.show_name:
-                               line.append(country.name)
-
-                       # Format the output
-                       line = " ".join(line)
-
-                       # Print the output
-                       print(line)
-
-       def handle_list_networks_by_as(self, db, ns):
-               writer = self.__get_output_formatter(ns)
-
-               for asn in ns.asn:
-                       f = writer("AS%s" % asn, f=sys.stdout)
-
-                       # Print all matching networks
-                       for n in db.search_networks(asns=[asn], family=ns.family):
-                               f.write(n)
-
-                       f.finish()
-
-       def handle_list_networks_by_cc(self, db, ns):
-               writer = self.__get_output_formatter(ns)
-
-               for country_code in ns.country_code:
-                       # Open standard output
-                       f = writer(country_code, f=sys.stdout)
-
-                       # Print all matching networks
-                       for n in db.search_networks(country_codes=[country_code], family=ns.family):
-                               f.write(n)
-
-                       f.finish()
-
-       def handle_list_networks_by_flags(self, db, ns):
-               flags = 0
-
-               if ns.anonymous_proxy:
-                       flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY
-
-               if ns.satellite_provider:
-                       flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER
-
-               if ns.anycast:
-                       flags |= location.NETWORK_FLAG_ANYCAST
-
-               if ns.drop:
-                       flags |= location.NETWORK_FLAG_DROP
-
-               if not flags:
-                       raise ValueError(_("You must at least pass one flag"))
-
-               writer = self.__get_output_formatter(ns)
-               f = writer("custom", f=sys.stdout)
-
-               for n in db.search_networks(flags=flags, family=ns.family):
-                       f.write(n)
-
-               f.finish()
-
-       def handle_list_bogons(self, db, ns):
-               writer = self.__get_output_formatter(ns)
-               f = writer("bogons", f=sys.stdout)
-
-               for n in db.list_bogons(family=ns.family):
-                       f.write(n)
-
-               f.finish()
-
-       def handle_export(self, db, ns):
-               countries, asns = [], []
-
-               # Translate family
-               if ns.family:
-                       families = [ ns.family ]
-               else:
-                       families = [ socket.AF_INET6, socket.AF_INET ]
-
-               for object in ns.objects:
-                       m = re.match("^AS(\d+)$", object)
-                       if m:
-                               object = int(m.group(1))
-
-                               asns.append(object)
-
-                       elif location.country_code_is_valid(object) \
-                                       or object in ("A1", "A2", "A3", "XD"):
-                               countries.append(object)
-
-                       else:
-                               log.warning("Invalid argument: %s" % object)
-                               continue
-
-               # Default to exporting all countries
-               if not countries and not asns:
-                       countries = ["A1", "A2", "A3", "XD"] + [country.code for country in db.countries]
-
-               # Select the output format
-               writer = self.__get_output_formatter(ns)
-
-               e = location.export.Exporter(db, writer)
-               e.export(ns.directory, countries=countries, asns=asns, families=families)
-
-
-def format_timedelta(t):
-       s = []
-
-       if t.days:
-               s.append(
-                       _("One Day", "%(days)s Days", t.days) % { "days" : t.days, }
-               )
-
-       hours = t.seconds // 3600
-       if hours:
-               s.append(
-                       _("One Hour", "%(hours)s Hours", hours) % { "hours" : hours, }
-               )
-
-       minutes = (t.seconds % 3600) // 60
-       if minutes:
-               s.append(
-                       _("One Minute", "%(minutes)s Minutes", minutes) % { "minutes" : minutes, }
-               )
-
-       seconds = t.seconds % 60
-       if t.seconds:
-               s.append(
-                       _("One Second", "%(seconds)s Seconds", seconds) % { "seconds" : seconds, }
-               )
-
-       if not s:
-               return _("Now")
-
-       return _("%s ago") % ", ".join(s)
-
-def main():
-       # Run the command line interface
-       c = CLI()
-       c.run()
-
-main()