#!/usr/bin/python3
###############################################################################
#                                                                             #
# libloc - A library to determine the location of someone on the Internet     #
#                                                                             #
# Copyright (C) 2019 IPFire Development Team                                  #
#                                                                             #
# This library is free software; you can redistribute it and/or               #
# modify it under the terms of the GNU Lesser General Public                  #
# License as published by the Free Software Foundation; either                #
# version 2.1 of the License, or (at your option) any later version.          #
#                                                                             #
# This library is distributed in the hope that it will be useful,             #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU            #
# Lesser General Public License for more details.                             #
#                                                                             #
###############################################################################

import argparse
import gettext
import io
import ipaddress
import logging
import logging.handlers
import os.path
import socket
import sys

# Load our location module
import location


def setup_logging(level=logging.INFO):
    """
        Creates the "location-exporter" logger which logs to the console
        and to the local syslog socket (/dev/log), and returns it.
    """
    l = logging.getLogger("location-exporter")
    l.setLevel(level)

    # Log to console
    h = logging.StreamHandler()
    h.setLevel(logging.DEBUG)
    l.addHandler(h)

    # Log to syslog
    h = logging.handlers.SysLogHandler(address="/dev/log",
        facility=logging.handlers.SysLogHandler.LOG_DAEMON)
    h.setLevel(logging.INFO)
    l.addHandler(h)

    # Format syslog messages
    formatter = logging.Formatter("location-exporter[%(process)d]: %(message)s")
    h.setFormatter(formatter)

    return l


# Initialise logging
log = setup_logging()


# i18n
def _(singular, plural=None, n=None):
    """
        Translates a message using the "libloc" gettext domain.

        When plural is given, the plural-aware lookup is used with n.
    """
    if plural:
        return gettext.dngettext("libloc", singular, plural, n)

    return gettext.dgettext("libloc", singular)


class OutputWriter(object):
    """
        Collects networks for one country or one AS and one address family
        in memory and writes them out as a plain list, one network per line.

        Subclasses override suffix, write() and the header/footer hooks
        to produce other formats.
    """
    suffix = "networks"

    def __init__(self, family, country_code=None, asn=None):
        self.family, self.country_code, self.asn = family, country_code, asn

        # Networks are buffered here until write_out() is called
        self.f = io.BytesIO()

    def write_out(self, directory):
        """
            Writes header, all buffered data and footer
            into a file in the given directory.
        """
        # Make the output filename
        filename = os.path.join(
            directory, self._make_filename(),
        )

        with open(filename, "wb") as f:
            self._write_header(f)

            # Copy all data into the file
            f.write(self.f.getbuffer())

            self._write_footer(f)

    def _make_filename(self):
        """
            Returns "<country code or AS number>.<suffix><4 or 6>"
        """
        return "%s.%s%s" % (
            self.country_code or "AS%s" % self.asn,
            self.suffix,
            "6" if self.family == socket.AF_INET6 else "4"
        )

    @property
    def name(self):
        """
            The name under which this object is referenced inside the output

            NOTE(review): the name does not encode the address family, so the
            IPv4 and IPv6 exports of the same object share one name. Confirm
            that consumers expect this before changing it.
        """
        if self.country_code:
            return "CC_%s" % self.country_code

        if self.asn:
            return "AS%s" % self.asn

    def _write_header(self, f):
        """
            The header of the file
        """
        pass

    def _write_footer(self, f):
        """
            The footer of the file
        """
        pass

    def write(self, network):
        """
            Appends one network to the buffer
        """
        s = "%s\n" % network

        self.f.write(s.encode("ascii"))


class IpsetOutputWriter(OutputWriter):
    """
        For ipset
    """
    suffix = "ipset"

    def _write_header(self, f):
        # The set must be created with the address family of its members
        # ("inet" for IPv4, "inet6" for IPv6)
        family = "inet6" if self.family == socket.AF_INET6 else "inet"

        h = "create %s hash:net family %s hashsize 1024 maxelem 65536\n" \
            % (self.name, family)

        f.write(h.encode("ascii"))

    def write(self, network):
        s = "add %s %s\n" % (self.name, network)

        self.f.write(s.encode("ascii"))


class NftablesOutputWriter(OutputWriter):
    """
        For nftables
    """
    suffix = "set"

    def _write_header(self, f):
        h = "define %s = {\n" % self.name

        f.write(h.encode("ascii"))

    def _write_footer(self, f):
        f.write(b"}")

    def write(self, network):
        s = "	%s,\n" % network

        self.f.write(s.encode("ascii"))


class XTGeoIPOutputWriter(OutputWriter):
    """
        Formats the output in that way, that it can be loaded by
        the xt_geoip kernel module from xtables-addons.
    """
    suffix = "iv"

    def write(self, network):
        n = ipaddress.ip_network("%s" % network)

        # Each network is stored as its first and last address
        # in packed binary form
        for address in (n.network_address, n.broadcast_address):
            packed = socket.inet_pton(
                socket.AF_INET6 if address.version == 6 else socket.AF_INET,
                "%s" % address,
            )

            self.f.write(packed)


class Exporter(object):
    """
        Walks through the database and passes all networks that match
        the requested countries and ASNs to writers of the given class.
    """
    def __init__(self, db, writer):
        self.db = db
        self.writer = writer

    def export(self, directory, families, countries, asns):
        """
            Exports all networks of the given countries and ASNs into
            directory, using one file per object and address family.
        """
        # Build the lookup sets once instead of scanning lists per network
        wanted_countries = set(countries)
        wanted_asns = set(asns)

        for family in families:
            log.debug("Exporting family %s", family)

            writers = {}

            # Create writers for countries
            for country_code in countries:
                writers[country_code] = self.writer(family, country_code=country_code)

            # Create writers for ASNs
            for asn in asns:
                writers[asn] = self.writer(family, asn=asn)

            # Get all networks that match the family
            networks = self.db.search_networks(family=family)

            # Walk through all networks
            for network in networks:
                # Write matching countries
                if network.country_code in wanted_countries:
                    writers[network.country_code].write(network)

                # Write matching ASNs
                if network.asn in wanted_asns:
                    writers[network.asn].write(network)

            # Write everything to the filesystem
            for writer in writers.values():
                writer.write_out(directory)


class CLI(object):
    """
        The command line interface of the exporter
    """
    output_formats = {
        "ipset"    : IpsetOutputWriter,
        "list"     : OutputWriter,
        "nftables" : NftablesOutputWriter,
        "xt_geoip" : XTGeoIPOutputWriter,
    }

    def parse_cli(self):
        """
            Parses the command line and returns the argparse namespace
        """
        parser = argparse.ArgumentParser(
            description=_("Location Exporter Command Line Interface"),
        )

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))

        # version
        parser.add_argument("--version", action="version",
            version="%(prog)s @VERSION@")

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # format
        parser.add_argument("--format", help=_("Output format"),
            default="list", choices=self.output_formats.keys())

        # directory
        parser.add_argument("--directory", help=_("Output directory"), required=True)

        # family
        parser.add_argument("--family", help=_("Specify address family"),
            choices=("ipv6", "ipv4"))

        # Countries and Autonomous Systems
        parser.add_argument("objects", nargs="+")

        args = parser.parse_args()

        # Enable debug logging
        if args.debug:
            log.setLevel(logging.DEBUG)

        return args

    def run(self):
        """
            Parses the command line, runs the export and terminates
            the process with the resulting exit code.
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Call function
        ret = self.handle_export(args)

        # Return with exit code (or zero if none was returned)
        sys.exit(ret or 0)

    def handle_export(self, ns):
        """
            Runs the export as requested on the command line.

            Returns 2 on invalid arguments, 1 if the database could not
            be opened and None on success.
        """
        countries, asns = [], []

        # Translate family
        if ns.family == "ipv6":
            families = [ socket.AF_INET6 ]
        elif ns.family == "ipv4":
            families = [ socket.AF_INET ]
        else:
            families = [ socket.AF_INET6, socket.AF_INET ]

        # Sort all objects into ASNs and countries
        for obj in ns.objects:
            if obj.startswith("AS"):
                try:
                    asn = int(obj[2:])
                except ValueError:
                    log.error("Invalid argument: %s", obj)
                    return 2

                asns.append(asn)

            elif location.country_code_is_valid(obj) \
                    or obj in ("A1", "A2", "A3"):
                countries.append(obj)

            else:
                log.error("Invalid argument: %s", obj)
                return 2

        # Open the database
        try:
            db = location.Database(ns.database)
        except FileNotFoundError:
            log.error("Could not open database: %s", ns.database)
            return 1

        # Select the output format
        # (checked explicitly because assertions are removed with -O)
        writer = self.output_formats.get(ns.format)
        if writer is None:
            log.error("Unknown output format: %s", ns.format)
            return 2

        e = Exporter(db, writer)
        e.export(ns.directory, countries=countries, asns=asns, families=families)


def main():
    # Run the command line interface
    c = CLI()
    c.run()


# Only run the CLI when executed as a script, not when imported
if __name__ == "__main__":
    main()