]> git.ipfire.org Git - people/ms/libloc.git/blobdiff - src/python/export.py
Make sources around that we can run tests without location installed
[people/ms/libloc.git] / src / python / export.py
diff --git a/src/python/export.py b/src/python/export.py
deleted file mode 100644 (file)
index 13c0540..0000000
+++ /dev/null
@@ -1,292 +0,0 @@
-#!/usr/bin/python3
-###############################################################################
-#                                                                             #
-# libloc - A library to determine the location of someone on the Internet     #
-#                                                                             #
-# Copyright (C) 2020-2021 IPFire Development Team <info@ipfire.org>           #
-#                                                                             #
-# This library is free software; you can redistribute it and/or               #
-# modify it under the terms of the GNU Lesser General Public                  #
-# License as published by the Free Software Foundation; either                #
-# version 2.1 of the License, or (at your option) any later version.          #
-#                                                                             #
-# This library is distributed in the hope that it will be useful,             #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU           #
-# Lesser General Public License for more details.                             #
-#                                                                             #
-###############################################################################
-
-import io
-import ipaddress
-import logging
-import math
-import os
-import socket
-import sys
-
-from .i18n import _
-import _location
-
-# Initialise logging
-log = logging.getLogger("location.export")
-log.propagate = 1
-
-FLAGS = {
-       _location.NETWORK_FLAG_ANONYMOUS_PROXY    : "A1",
-       _location.NETWORK_FLAG_SATELLITE_PROVIDER : "A2",
-       _location.NETWORK_FLAG_ANYCAST            : "A3",
-       _location.NETWORK_FLAG_DROP               : "XD",
-}
-
-class OutputWriter(object):
-       suffix = "networks"
-       mode = "w"
-
-       def __init__(self, name, family=None, directory=None, f=None):
-               self.name = name
-               self.family = family
-               self.directory = directory
-
-               # Open output file
-               if f:
-                       self.f = f
-               elif self.directory:
-                       self.f = open(self.filename, self.mode)
-               elif "b" in self.mode:
-                       self.f = io.BytesIO()
-               else:
-                       self.f = io.StringIO()
-
-               # Tag
-               self.tag = self._make_tag()
-
-               # Call any custom initialization
-               self.init()
-
-               # Immediately write the header
-               self._write_header()
-
-       def init(self):
-               """
-                       To be overwritten by anything that inherits from this
-               """
-               pass
-
-       def __repr__(self):
-               return "<%s %s f=%s>" % (self.__class__.__name__, self, self.f)
-
-       def _make_tag(self):
-               families = {
-                       socket.AF_INET6 : "6",
-                       socket.AF_INET  : "4",
-               }
-
-               return "%sv%s" % (self.name, families.get(self.family, "?"))
-
-       @functools.cached_property
-       def filename(self):
-               if self.directory:
-                       return os.path.join(self.directory, "%s.%s" % (self.tag, self.suffix))
-
-       def _write_header(self):
-               """
-                       The header of the file
-               """
-               pass
-
-       def _write_footer(self):
-               """
-                       The footer of the file
-               """
-               pass
-
-       def write(self, network):
-               self.f.write("%s\n" % network)
-
-       def finish(self):
-               """
-                       Called when all data has been written
-               """
-               self._write_footer()
-
-               # Flush all output
-               self.f.flush()
-
-       def print(self):
-               """
-                       Prints the entire output line by line
-               """
-               if isinstance(self.f, io.BytesIO):
-                       raise TypeError(_("Won't write binary output to stdout"))
-
-               # Go back to the beginning
-               self.f.seek(0)
-
-               # Iterate over everything line by line
-               for line in self.f:
-                       sys.stdout.write(line)
-
-
-class IpsetOutputWriter(OutputWriter):
-       """
-               For ipset
-       """
-       suffix = "ipset"
-
-       # The value is being used if we don't know any better
-       DEFAULT_HASHSIZE = 64
-
-       # We aim for this many networks in a bucket on average. This allows us to choose
-       # how much memory we want to sacrifice to gain better performance. The lower the
-       # factor, the faster a lookup will be, but it will use more memory.
-       # We will aim for only using three quarters of all buckets to avoid any searches
-       # through the linked lists.
-       HASHSIZE_FACTOR = 0.75
-
-       def init(self):
-               # Count all networks
-               self.networks = 0
-
-       @property
-       def hashsize(self):
-               """
-                       Calculates an optimized hashsize
-               """
-               # Return the default value if we don't know the size of the set
-               if not self.networks:
-                       return self.DEFAULT_HASHSIZE
-
-               # Find the nearest power of two that is larger than the number of networks
-               # divided by the hashsize factor.
-               exponent = math.log(self.networks / self.HASHSIZE_FACTOR, 2)
-
-               # Return the size of the hash (the minimum is 64)
-               return max(2 ** math.ceil(exponent), 64)
-
-       def _write_header(self):
-               # This must have a fixed size, because we will write the header again in the end
-               self.f.write("create %s hash:net family inet%s" % (
-                       self.tag,
-                       "6" if self.family == socket.AF_INET6 else ""
-               ))
-               self.f.write(" hashsize %8d maxelem 1048576 -exist\n" % self.hashsize)
-               self.f.write("flush %s\n" % self.tag)
-
-       def write(self, network):
-               self.f.write("add %s %s\n" % (self.tag, network))
-
-               # Increment network counter
-               self.networks += 1
-
-       def _write_footer(self):
-               # Jump back to the beginning of the file
-               self.f.seek(0)
-
-               # Rewrite the header with better configuration
-               self._write_header()
-
-
-class NftablesOutputWriter(OutputWriter):
-       """
-               For nftables
-       """
-       suffix = "set"
-
-       def _write_header(self):
-               self.f.write("define %s = {\n" % self.tag)
-
-       def _write_footer(self):
-               self.f.write("}\n")
-
-       def write(self, network):
-               self.f.write("  %s,\n" % network)
-
-
-class XTGeoIPOutputWriter(OutputWriter):
-       """
-               Formats the output in that way, that it can be loaded by
-               the xt_geoip kernel module from xtables-addons.
-       """
-       mode = "wb"
-
-       @property
-       def tag(self):
-               return self.name
-
-       @property
-       def suffix(self):
-               return "iv%s" % ("6" if self.family == socket.AF_INET6 else "4")
-
-       def write(self, network):
-               self.f.write(network._first_address)
-               self.f.write(network._last_address)
-
-
-formats = {
-       "ipset"    : IpsetOutputWriter,
-       "list"     : OutputWriter,
-       "nftables" : NftablesOutputWriter,
-       "xt_geoip" : XTGeoIPOutputWriter,
-}
-
-class Exporter(object):
-       def __init__(self, db, writer):
-               self.db, self.writer = db, writer
-
-       def export(self, directory, families, countries, asns):
-               for family in families:
-                       log.debug("Exporting family %s" % family)
-
-                       writers = {}
-
-                       # Create writers for countries
-                       for country_code in countries:
-                               writers[country_code] = self.writer(country_code, family=family, directory=directory)
-
-                       # Create writers for ASNs
-                       for asn in asns:
-                               writers[asn] = self.writer("AS%s" % asn, family=family, directory=directory)
-
-                       # Filter countries from special country codes
-                       country_codes = [
-                               country_code for country_code in countries if not country_code in FLAGS.values()
-                       ]
-
-                       # Get all networks that match the family
-                       networks = self.db.search_networks(family=family,
-                               country_codes=country_codes, asns=asns, flatten=True)
-
-                       # Walk through all networks
-                       for network in networks:
-                               # Write matching countries
-                               try:
-                                       writers[network.country_code].write(network)
-                               except KeyError:
-                                       pass
-
-                               # Write matching ASNs
-                               try:
-                                       writers[network.asn].write(network)
-                               except KeyError:
-                                       pass
-
-                               # Handle flags
-                               for flag in FLAGS:
-                                       if network.has_flag(flag):
-                                               # Fetch the "fake" country code
-                                               country = FLAGS[flag]
-
-                                               try:
-                                                       writers[country].write(network)
-                                               except KeyError:
-                                                       pass
-
-                       # Write everything to the filesystem
-                       for writer in writers.values():
-                               writer.finish()
-
-                       # Print to stdout
-                       if not directory:
-                               for writer in writers.values():
-                                       writer.print()