]> git.ipfire.org Git - location/libloc.git/blobdiff - src/python/location-query.in
importer: Drop EDROP as it has been merged into DROP
[location/libloc.git] / src / python / location-query.in
diff --git a/src/python/location-query.in b/src/python/location-query.in
deleted file mode 100644 (file)
index 2604255..0000000
+++ /dev/null
@@ -1,358 +0,0 @@
-#!/usr/bin/python3
-###############################################################################
-#                                                                             #
-# libloc - A library to determine the location of someone on the Internet     #
-#                                                                             #
-# Copyright (C) 2017 IPFire Development Team <info@ipfire.org>                #
-#                                                                             #
-# This library is free software; you can redistribute it and/or               #
-# modify it under the terms of the GNU Lesser General Public                  #
-# License as published by the Free Software Foundation; either                #
-# version 2.1 of the License, or (at your option) any later version.          #
-#                                                                             #
-# This library is distributed in the hope that it will be useful,             #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU           #
-# Lesser General Public License for more details.                             #
-#                                                                             #
-###############################################################################
-
-import argparse
-import gettext
-import ipaddress
-import os
-import socket
-import sys
-
-# Load our location module
-import location
-
-# i18n
-def _(singular, plural=None, n=None):
-       if plural:
-               return gettext.dngettext("libloc", singular, plural, n)
-
-       return gettext.dgettext("libloc", singular)
-
-# Output formatters
-
-class OutputFormatter(object):
-       def __init__(self, ns):
-               self.ns = ns
-
-       def __enter__(self):
-               # Open the output
-               self.open()
-
-               return self
-
-       def __exit__(self, type, value, tb):
-               if tb is None:
-                       self.close()
-
-       @property
-       def name(self):
-               if "country_code" in self.ns:
-                       return "networks_country_%s" % self.ns.country_code[0]
-
-               elif "asn" in self.ns:
-                       return "networks_AS%s" % self.ns.asn[0]
-
-       def open(self):
-               pass
-
-       def close(self):
-               pass
-
-       def network(self, network):
-               print(network)
-
-
-class IpsetOutputFormatter(OutputFormatter):
-       """
-               For nftables
-       """
-       def open(self):
-               print("create %s hash:net family inet hashsize 1024 maxelem 65536" % self.name)
-
-       def network(self, network):
-               print("add %s %s" % (self.name, network))
-
-
-class NftablesOutputFormatter(OutputFormatter):
-       """
-               For nftables
-       """
-       def open(self):
-               print("define %s = {" % self.name)
-
-       def close(self):
-               print("}")
-
-       def network(self, network):
-               print(" %s," % network)
-
-
-class XTGeoIPOutputFormatter(OutputFormatter):
-       """
-               Formats the output in that way, that it can be loaded by
-               the xt_geoip kernel module from xtables-addons.
-       """
-       def network(self, network):
-               n = ipaddress.ip_network("%s" % network)
-
-               for address in (n.network_address, n.broadcast_address):
-                       bytes = socket.inet_pton(
-                               socket.AF_INET6 if address.version == 6 else socket.AF_INET,
-                               "%s" % address,
-                       )
-
-                       os.write(1, bytes)
-
-
-class CLI(object):
-       output_formats = {
-               "ipset"    : IpsetOutputFormatter,
-               "list"     : OutputFormatter,
-               "nftables" : NftablesOutputFormatter,
-               "xt_geoip" : XTGeoIPOutputFormatter,
-       }
-
-       def parse_cli(self):
-               parser = argparse.ArgumentParser(
-                       description=_("Location Database Command Line Interface"),
-               )
-               subparsers = parser.add_subparsers()
-
-               # Global configuration flags
-               parser.add_argument("--debug", action="store_true",
-                       help=_("Enable debug output"))
-
-               # version
-               parser.add_argument("--version", action="version",
-                       version="%(prog)s @VERSION@")
-
-               # database
-               parser.add_argument("--database", "-d",
-                       default="@databasedir@/database.db", help=_("Path to database"),
-               )
-
-               # public key
-               parser.add_argument("--public-key", "-k",
-                       default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
-               )
-
-               # lookup an IP address
-               lookup = subparsers.add_parser("lookup",
-                       help=_("Lookup one or multiple IP addresses"),
-               )
-               lookup.add_argument("address", nargs="+")
-               lookup.set_defaults(func=self.handle_lookup)
-
-               # Get AS
-               get_as = subparsers.add_parser("get-as",
-                       help=_("Get information about one or multiple Autonomous Systems"),
-               )
-               get_as.add_argument("asn", nargs="+")
-               get_as.set_defaults(func=self.handle_get_as)
-
-               # Search for AS
-               search_as = subparsers.add_parser("search-as",
-                       help=_("Search for Autonomous Systems that match the string"),
-               )
-               search_as.add_argument("query", nargs=1)
-               search_as.set_defaults(func=self.handle_search_as)
-
-               # List all networks in an AS
-               list_networks_by_as = subparsers.add_parser("list-networks-by-as",
-                       help=_("Lists all networks in an AS"),
-               )
-               list_networks_by_as.add_argument("asn", nargs=1, type=int)
-               list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
-               list_networks_by_as.add_argument("--output-format",
-                       choices=self.output_formats.keys(), default="list")
-               list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
-
-               # List all networks in a country
-               list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
-                       help=_("Lists all networks in a country"),
-               )
-               list_networks_by_cc.add_argument("country_code", nargs=1)
-               list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
-               list_networks_by_cc.add_argument("--output-format",
-                       choices=self.output_formats.keys(), default="list")
-               list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)
-
-               # List all networks with flags
-               list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
-                       help=_("Lists all networks with flags"),
-               )
-               list_networks_by_flags.add_argument("--anonymous-proxy",
-                       action="store_true", help=_("Anonymous Proxies"),
-               )
-               list_networks_by_flags.add_argument("--satellite-provider",
-                       action="store_true", help=_("Satellite Providers"),
-               )
-               list_networks_by_flags.add_argument("--anycast",
-                       action="store_true", help=_("Anycasts"),
-               )
-               list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
-               list_networks_by_flags.add_argument("--output-format",
-                       choices=self.output_formats.keys(), default="list")
-               list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)
-
-               args = parser.parse_args()
-
-               # Print usage if no action was given
-               if not "func" in args:
-                       parser.print_usage()
-                       sys.exit(2)
-
-               return args
-
-       def run(self):
-               # Parse command line arguments
-               args = self.parse_cli()
-
-               # Open database
-               try:
-                       db = location.Database(args.database)
-               except FileNotFoundError as e:
-                       sys.stderr.write("location-query: Could not open database %s: %s\n" \
-                               % (args.database, e))
-                       sys.exit(1)
-
-               # Translate family (if present)
-               if "family" in args:
-                       if args.family == "ipv6":
-                               args.family = socket.AF_INET6
-                       elif args.family == "ipv4":
-                               args.family = socket.AF_INET
-                       else:
-                               args.family = 0
-
-               # Call function
-               ret = args.func(db, args)
-
-               # Return with exit code
-               if ret:
-                       sys.exit(ret)
-
-               # Otherwise just exit
-               sys.exit(0)
-
-       def handle_lookup(self, db, ns):
-               ret = 0
-
-               for address in ns.address:
-                       try:
-                               n = db.lookup(address)
-                       except ValueError:
-                               print(_("Invalid IP address: %s") % address, file=sys.stderr)
-
-                       args = {
-                               "address" : address,
-                               "network" : n,
-                       }
-
-                       # Nothing found?
-                       if not n:
-                               print(_("Nothing found for %(address)s") % args, file=sys.stderr)
-                               ret = 1
-                               continue
-
-                       # Try to retrieve the AS if we have an AS number
-                       if n.asn:
-                               a = db.get_as(n.asn)
-
-                               # If we have found an AS we will print it in the message
-                               if a:
-                                       args.update({
-                                               "as" : a,
-                                       })
-
-                                       print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args)
-                                       continue
-
-                       print(_("%(address)s belongs to %(network)s") % args)
-
-               return ret
-
-       def handle_get_as(self, db, ns):
-               """
-                       Gets information about Autonomous Systems
-               """
-               ret = 0
-
-               for asn in ns.asn:
-                       try:
-                               asn = int(asn)
-                       except ValueError:
-                               print(_("Invalid ASN: %s") % asn, file=sys.stderr)
-                               ret = 1
-                               continue
-
-                       # Fetch AS from database
-                       a = db.get_as(asn)
-
-                       # Nothing found
-                       if not a:
-                               print(_("Could not find AS%s") % asn, file=sys.stderr)
-                               ret = 1
-                               continue
-
-                       print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
-
-               return ret
-
-       def handle_search_as(self, db, ns):
-               for query in ns.query:
-                       # Print all matches ASes
-                       for a in db.search_as(query):
-                               print(a)
-
-       def __get_output_formatter(self, ns):
-               try:
-                       cls = self.output_formats[ns.output_format]
-               except KeyError:
-                       cls = OutputFormatter
-
-               return cls(ns)
-
-       def handle_list_networks_by_as(self, db, ns):
-               with self.__get_output_formatter(ns) as f:
-                       for asn in ns.asn:
-                               # Print all matching networks
-                               for n in db.search_networks(asn=asn, family=ns.family):
-                                       f.network(n)
-
-       def handle_list_networks_by_cc(self, db, ns):
-               with self.__get_output_formatter(ns) as f:
-                       for country_code in ns.country_code:
-                               # Print all matching networks
-                               for n in db.search_networks(country_code=country_code, family=ns.family):
-                                       f.network(n)
-
-       def handle_list_networks_by_flags(self, db, ns):
-               flags = 0
-
-               if ns.anonymous_proxy:
-                       flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY
-
-               if ns.satellite_provider:
-                       flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER
-
-               if ns.anycast:
-                       flags |= location.NETWORK_FLAG_ANYCAST
-
-               with self.__get_output_formatter(ns) as f:
-                       for n in db.search_networks(flags=flags, family=ns.family):
-                               f.network(n)
-
-
-def main():
-       # Run the command line interface
-       c = CLI()
-       c.run()
-
-main()