--- /dev/null
+#!/usr/bin/python3
+###############################################################################
+# #
+# libloc - A library to determine the location of someone on the Internet #
+# #
+# Copyright (C) 2019 IPFire Development Team <info@ipfire.org> #
+# #
+# This library is free software; you can redistribute it and/or #
+# modify it under the terms of the GNU Lesser General Public #
+# License as published by the Free Software Foundation; either #
+# version 2.1 of the License, or (at your option) any later version. #
+# #
+# This library is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
+# Lesser General Public License for more details. #
+# #
+###############################################################################
+
+import argparse
+import gettext
+import io
+import ipaddress
+import logging
+import logging.handlers
+import os.path
+import socket
+import sys
+
+# Load our location module
+import location
+
+def setup_logging(level=logging.INFO):
+ l = logging.getLogger("location-downloader")
+ l.setLevel(level)
+
+ # Log to console
+ h = logging.StreamHandler()
+ h.setLevel(logging.DEBUG)
+ l.addHandler(h)
+
+ # Log to syslog
+ h = logging.handlers.SysLogHandler(address="/dev/log",
+ facility=logging.handlers.SysLogHandler.LOG_DAEMON)
+ h.setLevel(logging.INFO)
+ l.addHandler(h)
+
+ # Format syslog messages
+ formatter = logging.Formatter("location-exporter[%(process)d]: %(message)s")
+ h.setFormatter(formatter)
+
+ return l
+
+# Initialise logging
+log = setup_logging()
+
+# i18n
+def _(singular, plural=None, n=None):
+ if plural:
+ return gettext.dngettext("libloc", singular, plural, n)
+
+ return gettext.dgettext("libloc", singular)
+
+class OutputWriter(object):
+ suffix = "networks"
+
+ def __init__(self, family, country_code=None, asn=None):
+ self.family, self.country_code, self.asn = family, country_code, asn
+
+ self.f = io.BytesIO()
+
+ def write_out(self, directory):
+ # Make the output filename
+ filename = os.path.join(
+ directory, self._make_filename(),
+ )
+
+ with open(filename, "wb") as f:
+ self._write_header(f)
+
+ # Copy all data into the file
+ f.write(self.f.getbuffer())
+
+ self._write_footer(f)
+
+ def _make_filename(self):
+ return "%s.%s%s" % (
+ self.country_code or "AS%s" % self.asn,
+ self.suffix,
+ "6" if self.family == socket.AF_INET6 else "4"
+ )
+
+ @property
+ def name(self):
+ if self.country_code:
+ return "CC_%s" % self.country_code
+
+ if self.asn:
+ return "AS%s" % self.asn
+
+ def _write_header(self, f):
+ """
+ The header of the file
+ """
+ pass
+
+ def _write_footer(self, f):
+ """
+ The footer of the file
+ """
+ pass
+
+ def write(self, network):
+ s = "%s\n" % network
+
+ self.f.write(s.encode("ascii"))
+
+
+class IpsetOutputWriter(OutputWriter):
+ """
+ For ipset
+ """
+ suffix = "ipset"
+
+ def _write_header(self, f):
+ h = "create %s hash:net family inet hashsize 1024 maxelem 65536" % self.name
+
+ f.write(h.encode("ascii"))
+
+ def write(self, network):
+ s = "add %s %s\n" % (self.name, network)
+
+ self.f.write(s.encode("ascii"))
+
+
+class NftablesOutputWriter(OutputWriter):
+ """
+ For nftables
+ """
+ suffix = "set"
+
+ def _write_header(self, f):
+ h = "define %s = {" % self.name
+
+ f.write(h.encode("ascii"))
+
+ def _write_footer(self):
+ f.write(b"}")
+
+ def write(self, network):
+ s = " %s,\n" % network
+
+ self.f.write(s.encode("ascii"))
+
+
+class XTGeoIPOutputWriter(OutputWriter):
+ """
+ Formats the output in that way, that it can be loaded by
+ the xt_geoip kernel module from xtables-addons.
+ """
+ suffix = "iv"
+
+ def write(self, network):
+ n = ipaddress.ip_network("%s" % network)
+
+ for address in (n.network_address, n.broadcast_address):
+ bytes = socket.inet_pton(
+ socket.AF_INET6 if address.version == 6 else socket.AF_INET,
+ "%s" % address,
+ )
+
+ self.f.write(bytes)
+
+
+class Exporter(object):
+ def __init__(self, db, writer):
+ self.db = db
+ self.writer = writer
+
+ def export(self, directory, countries, asns):
+ for family in (socket.AF_INET6, socket.AF_INET):
+ log.debug("Exporting family %s" % family)
+
+ writers = {}
+
+ # Create writers for countries
+ for country_code in countries:
+ writers[country_code] = self.writer(family, country_code=country_code)
+
+ # Create writers for ASNs
+ for asn in asns:
+ writers[asn] = self.writer(family, asn=asn)
+
+ # Get all networks that match the family
+ networks = self.db.search_networks(family=family)
+
+ # Walk through all networks
+ for network in networks:
+ # Write matching countries
+ if network.country_code in countries:
+ writers[network.country_code].write(network)
+
+ # Write matching ASNs
+ if network.asn in asns:
+ writers[network.asn].write(network)
+
+ # Write everything to the filesystem
+ for writer in writers.values():
+ writer.write_out(directory)
+
+
+class CLI(object):
+ output_formats = {
+ "ipset" : IpsetOutputWriter,
+ "list" : OutputWriter,
+ "nftables" : NftablesOutputWriter,
+ "xt_geoip" : XTGeoIPOutputWriter,
+ }
+
+ def parse_cli(self):
+ parser = argparse.ArgumentParser(
+ description=_("Location Exporter Command Line Interface"),
+ )
+
+ # Global configuration flags
+ parser.add_argument("--debug", action="store_true",
+ help=_("Enable debug output"))
+
+ # version
+ parser.add_argument("--version", action="version",
+ version="%%(prog)s %s" % location.__version__)
+
+ # database
+ parser.add_argument("--database", "-d",
+ default="@databasedir@/database.db", help=_("Path to database"),
+ )
+
+ # format
+ parser.add_argument("--format", help=_("Output format"),
+ )#default="list", choices=self.output_formats.keys())
+
+ # directory
+ parser.add_argument("--directory", help=_("Output directory"), required=True)
+
+ # Countries and Autonomous Systems
+ parser.add_argument("objects", nargs="+")
+
+ args = parser.parse_args()
+
+ # Enable debug logging
+ if args.debug:
+ log.setLevel(logging.DEBUG)
+
+ return args
+
+ def run(self):
+ # Parse command line arguments
+ args = self.parse_cli()
+
+ # Call function
+ ret = self.handle_export(args)
+
+ # Return with exit code
+ if ret:
+ sys.exit(ret)
+
+ # Otherwise just exit
+ sys.exit(0)
+
+ def handle_export(self, ns):
+ countries, asns = [], []
+
+ for object in ns.objects:
+ if object.startswith("AS"):
+ try:
+ object = int(object[2:])
+ except ValueError:
+ log.error("Invalid argument: %s" % object)
+ return 2
+
+ asns.append(object)
+
+ elif not len(object) == 2:
+ log.error("Invalid argument: %s" % object)
+ return 2
+
+ else:
+ countries.append(object)
+
+ # Open the database
+ try:
+ db = location.Database(ns.database)
+ except FileNotFoundError as e:
+ log.error("Count not open database: %s" % ns.database)
+ return 1
+
+ # Select the output format
+ writer = self.output_formats.get(ns.format)
+ assert writer
+
+ e = Exporter(db, writer)
+ e.export(ns.directory, countries=countries, asns=asns)
+
+
+def main():
+ # Run the command line interface
+ c = CLI()
+ c.run()
+
+main()