+++ /dev/null
-#!/usr/bin/python3
-###############################################################################
-# #
-# libloc - A library to determine the location of someone on the Internet #
-# #
-# Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
-# #
-# This library is free software; you can redistribute it and/or #
-# modify it under the terms of the GNU Lesser General Public #
-# License as published by the Free Software Foundation; either #
-# version 2.1 of the License, or (at your option) any later version. #
-# #
-# This library is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
-# Lesser General Public License for more details. #
-# #
-###############################################################################
-
-import argparse
-import gettext
-import ipaddress
-import os
-import socket
-import sys
-import syslog
-
-# Load our location module
-import location
-
-# i18n
-def _(singular, plural=None, n=None):
- if plural:
- return gettext.dngettext("libloc", singular, plural, n)
-
- return gettext.dgettext("libloc", singular)
-
-# Output formatters
-
-class OutputFormatter(object):
- def __init__(self, ns):
- self.ns = ns
-
- def __enter__(self):
- # Open the output
- self.open()
-
- return self
-
- def __exit__(self, type, value, tb):
- if tb is None:
- self.close()
-
- @property
- def name(self):
- if "country_code" in self.ns:
- return "networks_country_%s" % self.ns.country_code[0]
-
- elif "asn" in self.ns:
- return "networks_AS%s" % self.ns.asn[0]
-
- def open(self):
- pass
-
- def close(self):
- pass
-
- def network(self, network):
- print(network)
-
-
-class IpsetOutputFormatter(OutputFormatter):
- """
- For nftables
- """
- def open(self):
- print("create %s hash:net family inet hashsize 1024 maxelem 65536" % self.name)
-
- def network(self, network):
- print("add %s %s" % (self.name, network))
-
-
-class NftablesOutputFormatter(OutputFormatter):
- """
- For nftables
- """
- def open(self):
- print("define %s = {" % self.name)
-
- def close(self):
- print("}")
-
- def network(self, network):
- print(" %s," % network)
-
-
-class XTGeoIPOutputFormatter(OutputFormatter):
- """
- Formats the output in that way, that it can be loaded by
- the xt_geoip kernel module from xtables-addons.
- """
- def network(self, network):
- n = ipaddress.ip_network("%s" % network)
-
- for address in (n.network_address, n.broadcast_address):
- bytes = socket.inet_pton(
- socket.AF_INET6 if address.version == 6 else socket.AF_INET,
- "%s" % address,
- )
-
- os.write(1, bytes)
-
-
-class CLI(object):
- output_formats = {
- "ipset" : IpsetOutputFormatter,
- "list" : OutputFormatter,
- "nftables" : NftablesOutputFormatter,
- "xt_geoip" : XTGeoIPOutputFormatter,
- }
-
- def parse_cli(self):
- parser = argparse.ArgumentParser(
- description=_("Location Database Command Line Interface"),
- )
- subparsers = parser.add_subparsers()
-
- # Global configuration flags
- parser.add_argument("--debug", action="store_true",
- help=_("Enable debug output"))
-
- # version
- parser.add_argument("--version", action="version",
- version="%%(prog)s %s" % location.__version__)
-
- # database
- parser.add_argument("--database", "-d",
- default="@databasedir@/database.db", help=_("Path to database"),
- )
-
- # public key
- parser.add_argument("--public-key", "-k",
- default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
- )
-
- # lookup an IP address
- lookup = subparsers.add_parser("lookup",
- help=_("Lookup one or multiple IP addresses"),
- )
- lookup.add_argument("address", nargs="+")
- lookup.set_defaults(func=self.handle_lookup)
-
- # Get AS
- get_as = subparsers.add_parser("get-as",
- help=_("Get information about one or multiple Autonomous Systems"),
- )
- get_as.add_argument("asn", nargs="+")
- get_as.set_defaults(func=self.handle_get_as)
-
- # Search for AS
- search_as = subparsers.add_parser("search-as",
- help=_("Search for Autonomous Systems that match the string"),
- )
- search_as.add_argument("query", nargs=1)
- search_as.set_defaults(func=self.handle_search_as)
-
- # List all networks in an AS
- list_networks_by_as = subparsers.add_parser("list-networks-by-as",
- help=_("Lists all networks in an AS"),
- )
- list_networks_by_as.add_argument("asn", nargs=1, type=int)
- list_networks_by_as.add_argument("--output-format",
- choices=self.output_formats.keys(), default="list")
- list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
-
- # List all networks in a country
- list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
- help=_("Lists all networks in a country"),
- )
- list_networks_by_cc.add_argument("country_code", nargs=1)
- list_networks_by_cc.add_argument("--output-format",
- choices=self.output_formats.keys(), default="list")
- list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)
-
- # List all networks with flags
- list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
- help=_("Lists all networks with flags"),
- )
- list_networks_by_flags.add_argument("--anonymous-proxy",
- action="store_true", help=_("Anonymous Proxies"),
- )
- list_networks_by_flags.add_argument("--satellite-provider",
- action="store_true", help=_("Satellite Providers"),
- )
- list_networks_by_flags.add_argument("--anycast",
- action="store_true", help=_("Anycasts"),
- )
- list_networks_by_flags.add_argument("--output-format",
- choices=self.output_formats.keys(), default="list")
- list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)
-
- args = parser.parse_args()
-
- # Print usage if no action was given
- if not "func" in args:
- parser.print_usage()
- sys.exit(2)
-
- return args
-
- def run(self):
- # Parse command line arguments
- args = self.parse_cli()
-
- # Open database
- try:
- db = location.Database(args.database)
- except FileNotFoundError as e:
- sys.stderr.write("location-query: Could not open database %s: %s\n" \
- % (args.database, e))
- sys.exit(1)
-
- # Verify the database
- try:
- with open(args.public_key, "r") as f:
- if not db.verify(f):
- sys.stderr.write("location-query: Could not verify the database\n")
- sys.exit(1)
-
- # Catch any errors when loading the public key
- except (FileNotFoundError, OSError) as e:
- sys.stderr.write("Could not read the public key: %s\n" % e)
- sys.exit(1)
-
- # Call function
- ret = args.func(db, args)
-
- # Return with exit code
- if ret:
- sys.exit(ret)
-
- # Otherwise just exit
- sys.exit(0)
-
- def handle_lookup(self, db, ns):
- ret = 0
-
- for address in ns.address:
- try:
- n = db.lookup(address)
- except ValueError:
- print(_("Invalid IP address: %s") % address, file=sys.stderr)
-
- args = {
- "address" : address,
- "network" : n,
- }
-
- # Nothing found?
- if not n:
- print(_("Nothing found for %(address)s") % args, file=sys.stderr)
- ret = 1
- continue
-
- # Try to retrieve the AS if we have an AS number
- if n.asn:
- a = db.get_as(n.asn)
-
- # If we have found an AS we will print it in the message
- if a:
- args.update({
- "as" : a,
- })
-
- print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args)
- continue
-
- print(_("%(address)s belongs to %(network)s") % args)
-
- return ret
-
- def handle_get_as(self, db, ns):
- """
- Gets information about Autonomous Systems
- """
- ret = 0
-
- for asn in ns.asn:
- try:
- asn = int(asn)
- except ValueError:
- print(_("Invalid ASN: %s") % asn, file=sys.stderr)
- ret = 1
- continue
-
- # Fetch AS from database
- a = db.get_as(asn)
-
- # Nothing found
- if not a:
- print(_("Could not find AS%s") % asn, file=sys.stderr)
- ret = 1
- continue
-
- print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
-
- return ret
-
- def handle_search_as(self, db, ns):
- for query in ns.query:
- # Print all matches ASes
- for a in db.search_as(query):
- print(a)
-
- def __get_output_formatter(self, ns):
- try:
- cls = self.output_formats[ns.output_format]
- except KeyError:
- cls = OutputFormatter
-
- return cls(ns)
-
- def handle_list_networks_by_as(self, db, ns):
- with self.__get_output_formatter(ns) as f:
- for asn in ns.asn:
- # Print all matching networks
- for n in db.search_networks(asn=asn):
- f.network(n)
-
- def handle_list_networks_by_cc(self, db, ns):
- with self.__get_output_formatter(ns) as f:
- for country_code in ns.country_code:
- # Print all matching networks
- for n in db.search_networks(country_code=country_code):
- f.network(n)
-
- def handle_list_networks_by_flags(self, db, ns):
- flags = 0
-
- if ns.anonymous_proxy:
- flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY
-
- if ns.satellite_provider:
- flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER
-
- if ns.anycast:
- flags |= location.NETWORK_FLAG_ANYCAST
-
- with self.__get_output_formatter(ns) as f:
- for n in db.search_networks(flags=flags):
- f.network(n)
-
-
-def main():
- # Run the command line interface
- c = CLI()
- c.run()
-
-main()