+#!/usr/bin/python3
+###############################################################################
+#                                                                             #
+# libloc - A library to determine the location of someone on the Internet     #
+#                                                                             #
+# Copyright (C) 2020-2024 IPFire Development Team <info@ipfire.org>           #
+#                                                                             #
+# This library is free software; you can redistribute it and/or               #
+# modify it under the terms of the GNU Lesser General Public                  #
+# License as published by the Free Software Foundation; either                #
+# version 2.1 of the License, or (at your option) any later version.          #
+#                                                                             #
+# This library is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU            #
+# Lesser General Public License for more details.                             #
+#                                                                             #
+###############################################################################
+
+import argparse
+import asyncio
+import csv
+import http.client
+import io
+import ipaddress
+import json
+import logging
+import math
+import re
+import socket
+import sys
+import urllib.error
+import urllib.parse
+
+# Load our location module
+import location
+import location.database
+from location.downloader import Downloader
+from location.i18n import _
+
+# Initialise logging
+log = logging.getLogger("location.importer")
+log.propagate = True
+
+# Define constants
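+# Only ASNs from these ranges are publicly assignable; this deliberately
+# skips AS_TRANS (AS 23456, RFC 6793) as well as the reserved, documentation
+# and private-use ranges (RFC 5398, RFC 6996, RFC 7300)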
+VALID_ASN_RANGES = (
+ (1, 23455),
+ (23457, 64495),
+ (131072, 4199999999),
+)
+
+TRANSLATED_COUNTRIES = {
+ # When people say UK, they mean GB
+ "UK" : "GB",
+}
+
+IGNORED_COUNTRIES = set((
+ # Formerly Yugoslavia
+ "YU",
+
+ # Some people use ZZ to say "no country" or to hide the country
+ "ZZ",
+))
+
+# Configure the CSV parser for ARIN
+csv.register_dialect("arin", delimiter=",", quoting=csv.QUOTE_ALL, quotechar="\"")
+
+class CLI(object):
+ def parse_cli(self):
+ parser = argparse.ArgumentParser(
+ description=_("Location Importer Command Line Interface"),
+ )
+ subparsers = parser.add_subparsers()
+
+ # Global configuration flags
+ parser.add_argument("--debug", action="store_true",
+ help=_("Enable debug output"))
+ parser.add_argument("--quiet", action="store_true",
+ help=_("Enable quiet mode"))
+
+ # version
+ parser.add_argument("--version", action="version",
+ version="%(prog)s @VERSION@")
+
+ # Database
+ parser.add_argument("--database-host", required=True,
+ help=_("Database Hostname"), metavar=_("HOST"))
+ parser.add_argument("--database-name", required=True,
+ help=_("Database Name"), metavar=_("NAME"))
+ parser.add_argument("--database-username", required=True,
+ help=_("Database Username"), metavar=_("USERNAME"))
+ parser.add_argument("--database-password", required=True,
+ help=_("Database Password"), metavar=_("PASSWORD"))
+
+ # Write Database
+ write = subparsers.add_parser("write", help=_("Write database to file"))
+ write.set_defaults(func=self.handle_write)
+ write.add_argument("file", nargs=1, help=_("Database File"))
+ write.add_argument("--signing-key", nargs="?", type=open, help=_("Signing Key"))
+ write.add_argument("--backup-signing-key", nargs="?", type=open, help=_("Backup Signing Key"))
+ write.add_argument("--vendor", nargs="?", help=_("Sets the vendor"))
+ write.add_argument("--description", nargs="?", help=_("Sets a description"))
+ write.add_argument("--license", nargs="?", help=_("Sets the license"))
+ write.add_argument("--version", type=int, help=_("Database Format Version"))
+
+ # Update WHOIS
+ update_whois = subparsers.add_parser("update-whois", help=_("Update WHOIS Information"))
+ update_whois.add_argument("sources", nargs="*",
+ help=_("Only update these sources"))
+ update_whois.set_defaults(func=self.handle_update_whois)
+
+ # Update announcements
+ update_announcements = subparsers.add_parser("update-announcements",
+ help=_("Update BGP Annoucements"))
+ update_announcements.set_defaults(func=self.handle_update_announcements)
+ update_announcements.add_argument("server", nargs=1,
+ help=_("Route Server to connect to"), metavar=_("SERVER"))
+
+ # Update geofeeds
+ update_geofeeds = subparsers.add_parser("update-geofeeds",
+ help=_("Update Geofeeds"))
+ update_geofeeds.set_defaults(func=self.handle_update_geofeeds)
+
+ # Update feeds
+ update_feeds = subparsers.add_parser("update-feeds",
+ help=_("Update Feeds"))
+ update_feeds.add_argument("feeds", nargs="*",
+ help=_("Only update these feeds"))
+ update_feeds.set_defaults(func=self.handle_update_feeds)
+
+ # Update overrides
+ update_overrides = subparsers.add_parser("update-overrides",
+ help=_("Update overrides"),
+ )
+ update_overrides.add_argument(
+ "files", nargs="+", help=_("Files to import"),
+ )
+ update_overrides.set_defaults(func=self.handle_update_overrides)
+
+ # Import countries
+ import_countries = subparsers.add_parser("import-countries",
+ help=_("Import countries"),
+ )
+ import_countries.add_argument("file", nargs=1, type=argparse.FileType("r"),
+ help=_("File to import"))
+ import_countries.set_defaults(func=self.handle_import_countries)
+
+ args = parser.parse_args()
+
+ # Configure logging
+ if args.debug:
+ location.logger.set_level(logging.DEBUG)
+ elif args.quiet:
+ location.logger.set_level(logging.WARNING)
+
+ # Print usage if no action was given
+ if not "func" in args:
+ parser.print_usage()
+ sys.exit(2)
+
+ return args
+
+ async def run(self):
+ # Parse command line arguments
+ args = self.parse_cli()
+
+ # Initialize the downloader
+ self.downloader = Downloader()
+
+ # Initialise database
+ self.db = self._setup_database(args)
+
+ # Call function
+ ret = await args.func(args)
+
+ # Return with exit code
+ if ret:
+ sys.exit(ret)
+
+ # Otherwise just exit
+ sys.exit(0)
+
+ def _setup_database(self, ns):
+ """
+ Initialise the database
+ """
+ # Connect to database
+ db = location.database.Connection(
+ host=ns.database_host, database=ns.database_name,
+ user=ns.database_username, password=ns.database_password,
+ )
+
+ with db.transaction():
+ db.execute("""
+ -- announcements
+ CREATE TABLE IF NOT EXISTS announcements(network inet, autnum bigint,
+ first_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP,
+ last_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP);
+ CREATE UNIQUE INDEX IF NOT EXISTS announcements_networks ON announcements(network);
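+ -- SP-GiST answers the containment operators (<<= and >>=) that are
+ -- used to match networks against each other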
+ CREATE INDEX IF NOT EXISTS announcements_search2 ON announcements
+ USING SPGIST(network inet_ops);
+
+ -- autnums
+ CREATE TABLE IF NOT EXISTS autnums(number bigint, name text NOT NULL);
+ ALTER TABLE autnums ADD COLUMN IF NOT EXISTS source text;
+ CREATE UNIQUE INDEX IF NOT EXISTS autnums_number ON autnums(number);
+
+ -- countries
+ CREATE TABLE IF NOT EXISTS countries(
+ country_code text NOT NULL, name text NOT NULL, continent_code text NOT NULL);
+ CREATE UNIQUE INDEX IF NOT EXISTS countries_country_code ON countries(country_code);
+
+ -- networks
+ CREATE TABLE IF NOT EXISTS networks(network inet, country text);
+ ALTER TABLE networks ADD COLUMN IF NOT EXISTS original_countries text[];
+ ALTER TABLE networks ADD COLUMN IF NOT EXISTS source text;
+ CREATE UNIQUE INDEX IF NOT EXISTS networks_network ON networks(network);
+ CREATE INDEX IF NOT EXISTS networks_search2 ON networks
+ USING SPGIST(network inet_ops);
+
+ -- geofeeds
+ CREATE TABLE IF NOT EXISTS geofeeds(
+ id serial primary key,
+ url text,
+ status integer default null,
+ updated_at timestamp without time zone default null
+ );
+ ALTER TABLE geofeeds ADD COLUMN IF NOT EXISTS error text;
+ CREATE UNIQUE INDEX IF NOT EXISTS geofeeds_unique
+ ON geofeeds(url);
+ CREATE TABLE IF NOT EXISTS geofeed_networks(
+ geofeed_id integer references geofeeds(id) on delete cascade,
+ network inet,
+ country text,
+ region text,
+ city text
+ );
+ CREATE INDEX IF NOT EXISTS geofeed_networks_geofeed_id
+ ON geofeed_networks(geofeed_id);
+ CREATE TABLE IF NOT EXISTS network_geofeeds(network inet, url text);
+ ALTER TABLE network_geofeeds ADD COLUMN IF NOT EXISTS source text NOT NULL;
+ CREATE UNIQUE INDEX IF NOT EXISTS network_geofeeds_unique2
+ ON network_geofeeds(network, url);
+ CREATE INDEX IF NOT EXISTS network_geofeeds_url
+ ON network_geofeeds(url);
+
+ -- feeds
+ CREATE TABLE IF NOT EXISTS autnum_feeds(
+ number bigint NOT NULL,
+ source text NOT NULL,
+ name text,
+ country text,
+ is_anonymous_proxy boolean,
+ is_satellite_provider boolean,
+ is_anycast boolean,
+ is_drop boolean
+ );
+ CREATE UNIQUE INDEX IF NOT EXISTS autnum_feeds_unique
+ ON autnum_feeds(number, source);
+
+ CREATE TABLE IF NOT EXISTS network_feeds(
+ network inet NOT NULL,
+ source text NOT NULL,
+ country text,
+ is_anonymous_proxy boolean,
+ is_satellite_provider boolean,
+ is_anycast boolean,
+ is_drop boolean
+ );
+ CREATE UNIQUE INDEX IF NOT EXISTS network_feeds_unique
+ ON network_feeds(network, source);
+
+ -- overrides
+ CREATE TABLE IF NOT EXISTS autnum_overrides(
+ number bigint NOT NULL,
+ name text,
+ country text,
+ is_anonymous_proxy boolean,
+ is_satellite_provider boolean,
+ is_anycast boolean
+ );
+ CREATE UNIQUE INDEX IF NOT EXISTS autnum_overrides_number
+ ON autnum_overrides(number);
+ ALTER TABLE autnum_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
+ ALTER TABLE autnum_overrides DROP COLUMN IF EXISTS source;
+
+ CREATE TABLE IF NOT EXISTS network_overrides(
+ network inet NOT NULL,
+ country text,
+ is_anonymous_proxy boolean,
+ is_satellite_provider boolean,
+ is_anycast boolean
+ );
+ CREATE UNIQUE INDEX IF NOT EXISTS network_overrides_network
+ ON network_overrides(network);
+ ALTER TABLE network_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
+ ALTER TABLE network_overrides DROP COLUMN IF EXISTS source;
+
+ -- Cleanup things we no longer need
+ DROP TABLE IF EXISTS geofeed_overrides;
+ DROP INDEX IF EXISTS announcements_family;
+ DROP INDEX IF EXISTS announcements_search;
+ DROP INDEX IF EXISTS geofeed_networks_search;
+ DROP INDEX IF EXISTS networks_family;
+ DROP INDEX IF EXISTS networks_search;
+ DROP INDEX IF EXISTS network_feeds_search;
+ DROP INDEX IF EXISTS network_geofeeds_unique;
+ DROP INDEX IF EXISTS network_geofeeds_search;
+ DROP INDEX IF EXISTS network_overrides_search;
+ """)
+
+ return db
+
+ def fetch_countries(self):
+ """
+ Returns a set of all country codes that are in the database
+ """
+ # Fetch all valid country codes to check parsed networks against...
+ countries = self.db.query("SELECT country_code FROM countries ORDER BY country_code")
+
+ return set(country.country_code for country in countries)
+
+ async def handle_write(self, ns):
+ """
+ Compiles a database in libloc format out of what is in the database
+ """
+ # Allocate a writer
+ writer = location.Writer(ns.signing_key, ns.backup_signing_key)
+
+ # Set all metadata
+ if ns.vendor:
+ writer.vendor = ns.vendor
+
+ if ns.description:
+ writer.description = ns.description
+
+ if ns.license:
+ writer.license = ns.license
+
+ # Analyze everything so that the query planner can hopefully make better decisions
+ self.db.execute("ANALYZE")
+
+ # Add all Autonomous Systems
+ log.info("Writing Autonomous Systems...")
+
+ # Select all ASes with a name
+ rows = self.db.query("""
+ SELECT
+ autnums.number AS number,
+ COALESCE(
+ overrides.name,
+ autnums.name
+ ) AS name
+ FROM
+ autnums
+ LEFT JOIN
+ autnum_overrides overrides ON autnums.number = overrides.number
+ ORDER BY
+ autnums.number
+ """)
+
+ for row in rows:
+ # Skip AS without names
+ if not row.name:
+ continue
+
+ a = writer.add_as(row.number)
+ a.name = row.name
+
+ # Add all networks
+ log.info("Writing networks...")
+
+ # Create a new temporary table where we collect
+ # the networks that we are interested in
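+ # A FILLFACTOR of 50 leaves half of every page empty so that the
+ # UPDATE statements below can store new row versions close to the
+ # old ones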
+ self.db.execute("""
+ CREATE TEMPORARY TABLE
+ n
+ (
+ network inet NOT NULL,
+ autnum bigint,
+ country text,
+ is_anonymous_proxy boolean,
+ is_satellite_provider boolean,
+ is_anycast boolean,
+ is_drop boolean
+ )
+ WITH (FILLFACTOR = 50)
+ """)
+
+ # Add all known networks
+ self.db.execute("""
+ INSERT INTO
+ n
+ (
+ network
+ )
+
+ SELECT
+ network
+ FROM
+ announcements
+
+ UNION
+
+ SELECT
+ network
+ FROM
+ networks
+
+ UNION
+
+ SELECT
+ network
+ FROM
+ network_feeds
+
+ UNION
+
+ SELECT
+ network
+ FROM
+ network_overrides
+
+ UNION
+
+ SELECT
+ network
+ FROM
+ geofeed_networks
+ """)
+
+ # Create an index to search through networks faster
+ self.db.execute("""
+ CREATE INDEX
+ n_search
+ ON
+ n
+ USING
+ SPGIST(network)
+ """)
+
+ # Analyze n
+ self.db.execute("ANALYZE n")
+
+ # Apply the AS number to all networks
+ self.db.execute("""
+ -- Join all networks together with their most specific announcements
+ WITH announcements AS (
+ SELECT
+ n.network,
+ announcements.autnum,
+
+ -- Sort all merges and number them so
+ -- that we can later select the best one
+ ROW_NUMBER()
+ OVER
+ (
+ PARTITION BY
+ n.network
+ ORDER BY
+ masklen(announcements.network) DESC
+ ) AS row
+ FROM
+ n
+ JOIN
+ announcements
+ ON
+ announcements.network >>= n.network
+ )
+
+ -- Store the result
+ UPDATE
+ n
+ SET
+ autnum = announcements.autnum
+ FROM
+ announcements
+ WHERE
+ announcements.network = n.network
+ AND
+ announcements.row = 1
+ """,
+ )
+
+ # Apply country information
+ self.db.execute("""
+ WITH networks AS (
+ SELECT
+ n.network,
+ networks.country,
+
+ ROW_NUMBER()
+ OVER
+ (
+ PARTITION BY
+ n.network
+ ORDER BY
+ masklen(networks.network) DESC
+ ) AS row
+ FROM
+ n
+ JOIN
+ networks
+ ON
+ networks.network >>= n.network
+ )
+
+ UPDATE
+ n
+ SET
+ country = networks.country
+ FROM
+ networks
+ WHERE
+ networks.network = n.network
+ AND
+ networks.row = 1
+ """,
+ )
+
+ # Add all country information from Geofeeds
+ self.db.execute("""
+ WITH geofeeds AS (
+ SELECT
+ DISTINCT ON (geofeed_networks.network)
+ geofeed_networks.network,
+ geofeed_networks.country
+ FROM
+ geofeeds
+ JOIN
+ network_geofeeds networks
+ ON
+ geofeeds.url = networks.url
+ JOIN
+ geofeed_networks
+ ON
+ geofeeds.id = geofeed_networks.geofeed_id
+ AND
+ networks.network >>= geofeed_networks.network
+ ),
+
+ networks AS (
+ SELECT
+ n.network,
+ geofeeds.country,
+
+ ROW_NUMBER()
+ OVER
+ (
+ PARTITION BY
+ n.network
+ ORDER BY
+ masklen(geofeeds.network) DESC
+ ) AS row
+ FROM
+ n
+ JOIN
+ geofeeds
+ ON
+ geofeeds.network >>= n.network
+ )
+
+ UPDATE
+ n
+ SET
+ country = networks.country
+ FROM
+ networks
+ WHERE
+ networks.network = n.network
+ AND
+ networks.row = 1
+ """,
+ )
+
+ # Apply country and flags from feeds
+ self.db.execute("""
+ WITH networks AS (
+ SELECT
+ n.network,
+ network_feeds.country,
+
+ -- Flags
+ network_feeds.is_anonymous_proxy,
+ network_feeds.is_satellite_provider,
+ network_feeds.is_anycast,
+ network_feeds.is_drop,
+
+ ROW_NUMBER()
+ OVER
+ (
+ PARTITION BY
+ n.network
+ ORDER BY
+ masklen(network_feeds.network) DESC
+ ) AS row
+ FROM
+ n
+ JOIN
+ network_feeds
+ ON
+ network_feeds.network >>= n.network
+ )
+
+ UPDATE
+ n
+ SET
+ country =
+ COALESCE(networks.country, n.country),
+
+ is_anonymous_proxy =
+ COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
+
+ is_satellite_provider =
+ COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
+
+ is_anycast =
+ COALESCE(networks.is_anycast, n.is_anycast),
+
+ is_drop =
+ COALESCE(networks.is_drop, n.is_drop)
+ FROM
+ networks
+ WHERE
+ networks.network = n.network
+ AND
+ networks.row = 1
+ """,
+ )
+
+ # Apply country and flags from AS feeds
+ self.db.execute("""
+ WITH networks AS (
+ SELECT
+ n.network,
+ autnum_feeds.country,
+
+ -- Flags
+ autnum_feeds.is_anonymous_proxy,
+ autnum_feeds.is_satellite_provider,
+ autnum_feeds.is_anycast,
+ autnum_feeds.is_drop
+ FROM
+ n
+ JOIN
+ autnum_feeds
+ ON
+ autnum_feeds.number = n.autnum
+ )
+
+ UPDATE
+ n
+ SET
+ country =
+ COALESCE(networks.country, n.country),
+
+ is_anonymous_proxy =
+ COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
+
+ is_satellite_provider =
+ COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
+
+ is_anycast =
+ COALESCE(networks.is_anycast, n.is_anycast),
+
+ is_drop =
+ COALESCE(networks.is_drop, n.is_drop)
+ FROM
+ networks
+ WHERE
+ networks.network = n.network
+ """)
+
+ # Apply network overrides
+ self.db.execute("""
+ WITH networks AS (
+ SELECT
+ n.network,
+ network_overrides.country,
+
+ -- Flags
+ network_overrides.is_anonymous_proxy,
+ network_overrides.is_satellite_provider,
+ network_overrides.is_anycast,
+ network_overrides.is_drop,
+
+ ROW_NUMBER()
+ OVER
+ (
+ PARTITION BY
+ n.network
+ ORDER BY
+ masklen(network_overrides.network) DESC
+ ) AS row
+ FROM
+ n
+ JOIN
+ network_overrides
+ ON
+ network_overrides.network >>= n.network
+ )
+
+ UPDATE
+ n
+ SET
+ country =
+ COALESCE(networks.country, n.country),
+
+ is_anonymous_proxy =
+ COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
+
+ is_satellite_provider =
+ COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
+
+ is_anycast =
+ COALESCE(networks.is_anycast, n.is_anycast),
+
+ is_drop =
+ COALESCE(networks.is_drop, n.is_drop)
+ FROM
+ networks
+ WHERE
+ networks.network = n.network
+ AND
+ networks.row = 1
+ """)
+
+ # Apply AS overrides
+ self.db.execute("""
+ WITH networks AS (
+ SELECT
+ n.network,
+ autnum_overrides.country,
+
+ -- Flags
+ autnum_overrides.is_anonymous_proxy,
+ autnum_overrides.is_satellite_provider,
+ autnum_overrides.is_anycast,
+ autnum_overrides.is_drop
+ FROM
+ n
+ JOIN
+ autnum_overrides
+ ON
+ autnum_overrides.number = n.autnum
+ )
+
+ UPDATE
+ n
+ SET
+ country =
+ COALESCE(networks.country, n.country),
+
+ is_anonymous_proxy =
+ COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
+
+ is_satellite_provider =
+ COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
+
+ is_anycast =
+ COALESCE(networks.is_anycast, n.is_anycast),
+
+ is_drop =
+ COALESCE(networks.is_drop, n.is_drop)
+ FROM
+ networks
+ WHERE
+ networks.network = n.network
+ """)
+
+ # Here we could remove some networks that we no longer need, but since we
+ # already have implemented our deduplication/merge algorithm this would not
+ # be necessary.
+
+ # Export the entire temporary table
+ rows = self.db.query("""
+ SELECT
+ *
+ FROM
+ n
+ ORDER BY
+ network
+ """)
+
+ for row in rows:
+ network = writer.add_network("%s" % row.network)
+
+ # Save country
+ if row.country:
+ network.country_code = row.country
+
+ # Save ASN
+ if row.autnum:
+ network.asn = row.autnum
+
+ # Set flags
+ if row.is_anonymous_proxy:
+ network.set_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
+
+ if row.is_satellite_provider:
+ network.set_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
+
+ if row.is_anycast:
+ network.set_flag(location.NETWORK_FLAG_ANYCAST)
+
+ if row.is_drop:
+ network.set_flag(location.NETWORK_FLAG_DROP)
+
+ # Add all countries
+ log.info("Writing countries...")
+
+ # Select all countries
+ rows = self.db.query("""
+ SELECT
+ *
+ FROM
+ countries
+ ORDER BY
+ country_code
+ """,
+ )
+
+ for row in rows:
+ c = writer.add_country(row.country_code)
+ c.continent_code = row.continent_code
+ c.name = row.name
+
+ # Write everything to file
+ log.info("Writing database to file...")
+ for file in ns.file:
+ writer.write(file)
+
+ async def handle_update_whois(self, ns):
+ # Did we run successfully?
+ success = True
+
+ sources = (
+ # African Network Information Centre
+ ("AFRINIC", (
+ (self._import_standard_format, "https://ftp.afrinic.net/pub/pub/dbase/afrinic.db.gz"),
+ )),
+
+ # Asia Pacific Network Information Centre
+ ("APNIC", (
+ (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inet6num.gz"),
+ (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inetnum.gz"),
+ (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.aut-num.gz"),
+ (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.organisation.gz"),
+ )),
+
+ # American Registry for Internet Numbers
+ ("ARIN", (
+ (self._import_extended_format, "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"),
+ (self._import_arin_as_names, "https://ftp.arin.net/pub/resource_registry_service/asns.csv"),
+ )),
+
+ # Japan Network Information Center
+ ("JPNIC", (
+ (self._import_standard_format, "https://ftp.nic.ad.jp/jpirr/jpirr.db.gz"),
+ )),
+
+ # Latin America and Caribbean Network Information Centre
+ ("LACNIC", (
+ (self._import_standard_format, "https://ftp.lacnic.net/lacnic/dbase/lacnic.db.gz"),
+ (self._import_extended_format, "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"),
+ )),
+
+ # Réseaux IP Européens
+ ("RIPE", (
+ (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz"),
+ (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz"),
+ (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.aut-num.gz"),
+ (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.organisation.gz"),
+ )),
+ )
+
+ # Fetch all valid country codes to check parsed networks against
+ countries = self.fetch_countries()
+
+ # Check if we have countries
+ if not countries:
+ log.error("Please import countries before importing any WHOIS data")
+ return 1
+
+ # Iterate over all potential sources
+ for name, feeds in sources:
+ # Skip anything that should not be updated
+ if ns.sources and name not in ns.sources:
+ continue
+
+ try:
+ await self._process_source(name, feeds, countries)
+
+ # Log an error but continue if an exception occurs
+ except Exception as e:
+ log.error("Error processing source %s" % name, exc_info=True)
+ success = False
+
+ # Return a non-zero exit code for errors
+ return 0 if success else 1
+
+ async def _process_source(self, source, feeds, countries):
+ """
+ This function processes one source
+ """
+ # Wrap everything into one large transaction
+ with self.db.transaction():
+ # Remove all previously imported content
+ self.db.execute("DELETE FROM autnums WHERE source = %s", source)
+ self.db.execute("DELETE FROM networks WHERE source = %s", source)
+ self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", source)
+
+ # Create some temporary tables to store parsed data
+ self.db.execute("""
+ CREATE TEMPORARY TABLE _autnums(number bigint NOT NULL,
+ organization text NOT NULL, source text NOT NULL) ON COMMIT DROP;
+ CREATE UNIQUE INDEX _autnums_number ON _autnums(number);
+
+ CREATE TEMPORARY TABLE _organizations(handle text NOT NULL,
+ name text NOT NULL, source text NOT NULL) ON COMMIT DROP;
+ CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle);
+
+ CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text,
+ original_countries text[] NOT NULL, source text NOT NULL)
+ ON COMMIT DROP;
+ CREATE INDEX _rirdata_search ON _rirdata
+ USING BTREE(family(network), masklen(network));
+ CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network);
+ """)
+
+ # Parse all feeds
+ for callback, url, *args in feeds:
+ # Retrieve the feed
+ f = self.downloader.retrieve(url)
+
+ # Call the callback
+ with self.db.pipeline():
+ await callback(source, countries, f, *args)
+
+ # Process all parsed networks from every RIR we happen to have access to,
+ # insert the largest network chunks into the networks table immediately...
+ families = self.db.query("""
+ SELECT DISTINCT
+ family(network) AS family
+ FROM
+ _rirdata
+ ORDER BY
+ family(network)
+ """,
+ )
+
+ for family in (row.family for row in families):
+ # Fetch the smallest mask length in our data set
+ smallest = self.db.get("""
+ SELECT
+ MIN(
+ masklen(network)
+ ) AS prefix
+ FROM
+ _rirdata
+ WHERE
+ family(network) = %s
+ """, family,
+ )
+
+ # Copy all networks
+ self.db.execute("""
+ INSERT INTO
+ networks
+ (
+ network,
+ country,
+ original_countries,
+ source
+ )
+ SELECT
+ network,
+ country,
+ original_countries,
+ source
+ FROM
+ _rirdata
+ WHERE
+ masklen(network) = %s
+ AND
+ family(network) = %s
+ ON CONFLICT DO
+ NOTHING""",
+ smallest.prefix,
+ family,
+ )
+
+ # ... determine any other prefixes for this network family, ...
+ prefixes = self.db.query("""
+ SELECT
+ DISTINCT masklen(network) AS prefix
+ FROM
+ _rirdata
+ WHERE
+ family(network) = %s
+ ORDER BY
+ masklen(network) ASC
+ OFFSET 1
+ """, family,
+ )
+
+ # ... and insert networks with this prefix in case they provide additional
+ # information (i.e. a subnet of a larger chunk with a different country)
+ for prefix in (row.prefix for row in prefixes):
+ self.db.execute("""
+ WITH candidates AS (
+ SELECT
+ _rirdata.network,
+ _rirdata.country,
+ _rirdata.original_countries,
+ _rirdata.source
+ FROM
+ _rirdata
+ WHERE
+ family(_rirdata.network) = %s
+ AND
+ masklen(_rirdata.network) = %s
+ ),
+ filtered AS (
+ SELECT
+ DISTINCT ON (c.network)
+ c.network,
+ c.country,
+ c.original_countries,
+ c.source,
+ masklen(networks.network),
+ networks.country AS parent_country
+ FROM
+ candidates c
+ LEFT JOIN
+ networks
+ ON
+ c.network << networks.network
+ ORDER BY
+ c.network,
+ masklen(networks.network) DESC NULLS LAST
+ )
+ INSERT INTO
+ networks(network, country, original_countries, source)
+ SELECT
+ network,
+ country,
+ original_countries,
+ source
+ FROM
+ filtered
+ WHERE
+ parent_country IS NULL
+ OR
+ country <> parent_country
+ ON CONFLICT DO NOTHING
+ """, family, prefix,
+ )
+
+ self.db.execute("""
+ INSERT INTO
+ autnums
+ (
+ number,
+ name,
+ source
+ )
+ SELECT
+ _autnums.number,
+ _organizations.name,
+ _organizations.source
+ FROM
+ _autnums
+ JOIN
+ _organizations ON _autnums.organization = _organizations.handle
+ ON CONFLICT
+ (
+ number
+ )
+ DO UPDATE
+ SET name = excluded.name
+ """,
+ )
+
+ async def _import_standard_format(self, source, countries, f, *args):
+ """
+ Imports a single standard format source feed
+ """
+ # Iterate over all blocks
+ for block in iterate_over_blocks(f):
+ self._parse_block(block, source, countries)
+
+ async def _import_extended_format(self, source, countries, f, *args):
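+ """
+ Imports a single extended format source feed
+ """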
+ # Iterate over all lines
+ for line in iterate_over_lines(f):
+ self._parse_line(line, source, countries)
+
+ async def _import_arin_as_names(self, source, countries, f, *args):
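+ """
+ Imports AS names from the CSV file published by ARIN
+ """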
+ # Wrap the byte stream so that we can read it as text
+ f = io.TextIOWrapper(f)
+
+ # Walk through the file
+ for line in csv.DictReader(f, dialect="arin"):
+ # Fetch status
+ status = line.get("Status")
+
+ # We are only interested in anything managed by ARIN
+ if status != "Full Registry Services":
+ continue
+
+ # Fetch organization name
+ name = line.get("Org Name")
+
+ # Extract ASNs
+ first_asn = line.get("Start AS Number")
+ last_asn = line.get("End AS Number")
+
+ # Cast to a number
+ try:
+ first_asn = int(first_asn)
+ except (TypeError, ValueError) as e:
+ log.warning("Could not parse ASN '%s'" % first_asn)
+ continue
+
+ try:
+ last_asn = int(last_asn)
+ except (TypeError, ValueError) as e:
+ log.warning("Could not parse ASN '%s'" % last_asn)
+ continue
+
+ # Check if the range is valid
+ if last_asn < first_asn:
+ log.warning("Invalid ASN range %s-%s" % (first_asn, last_asn))
+
+ # Insert everything into the database
+ for asn in range(first_asn, last_asn + 1):
+ if not self._check_parsed_asn(asn):
+ log.warning("Skipping invalid ASN %s" % asn)
+ continue
+
+ self.db.execute("""
+ INSERT INTO
+ autnums
+ (
+ number,
+ name,
+ source
+ )
+ VALUES
+ (
+ %s, %s, %s
+ )
+ ON CONFLICT
+ (
+ number
+ )
+ DO NOTHING
+ """, asn, name, "ARIN",
+ )
+
+ def _check_parsed_network(self, network):
+ """
+ Assistive function to detect and subsequently sort out parsed
+ networks from RIR data (both Whois and so-called "extended sources"),
+ which are or have...
+
+ (a) not globally routable (RFC 1918 space, et al.)
+ (b) covering too large a chunk of the IP address space (prefix length
+ is < 7 for IPv4 networks, and < 10 for IPv6)
+ (c) "0.0.0.0" or "::" as a network address
+
+ This unfortunately is necessary due to brain-dead clutter across
+ various RIR databases, causing mismatches and eventually disruptions.
+
+ Returns False in case a network is not suitable for adding to our
+ database, and True otherwise.
+ """
+ # Check input
+ if not isinstance(network, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
+ raise ValueError("Invalid network: %s (type %s)" % (network, type(network)))
+
+ # Ignore anything that isn't globally routable
+ if not network.is_global:
+ log.debug("Skipping non-globally routable network: %s" % network)
+ return False
+
+ # Ignore anything within the unspecified IP ranges (see RFC 5735 for IPv4 and RFC 2373 for IPv6)
+ elif network.is_unspecified:
+ log.debug("Skipping unspecified network: %s" % network)
+ return False
+
+ # IPv6
+ if network.version == 6:
+ if network.prefixlen < 10:
+ log.debug("Skipping too big IP chunk: %s" % network)
+ return False
+
+ # IPv4
+ elif network.version == 4:
+ if network.prefixlen < 7:
+ log.debug("Skipping too big IP chunk: %s" % network)
+ return False
+
+ # In case we have made it here, the network is considered to
+ # be suitable for libloc consumption...
+ return True
+
+ def _check_parsed_asn(self, asn):
+ """
+ Assistive function to filter Autonomous System Numbers not being suitable
+ for adding to our database. Returns False in such cases, and True otherwise.
+ """
+
+ for start, end in VALID_ASN_RANGES:
+ if start <= asn <= end:
+ return True
+
+ log.info("Supplied ASN %s out of publicly routable ASN ranges" % asn)
+ return False
+
+ def _check_geofeed_url(self, url):
+ """
+ This function checks if a Geofeed URL is valid.
+
+ If so, it returns the normalized URL which should be stored instead of
+ the original one.
+ """
+ # Parse the URL
+ try:
+ url = urllib.parse.urlparse(url)
+ except ValueError as e:
+ log.warning("Invalid URL %s: %s" % (url, e))
+ return
+
+ # Make sure that this is a HTTPS URL
+ if url.scheme != "https":
+ log.warning("Skipping Geofeed URL that is not using HTTPS: %s" \
+ % url.geturl())
+ return
+
+ # Normalize the URL and convert it back
+ return url.geturl()
+
+ def _parse_block(self, block, source_key, countries):
+ # Get first line to find out what type of block this is
+ line = block[0]
+
+ # aut-num
+ if line.startswith("aut-num:"):
+ return self._parse_autnum_block(block, source_key)
+
+ # inetnum
+ if line.startswith("inet6num:") or line.startswith("inetnum:"):
+ return self._parse_inetnum_block(block, source_key, countries)
+
+ # organisation
+ elif line.startswith("organisation:"):
+ return self._parse_org_block(block, source_key)
+
+ def _parse_autnum_block(self, block, source_key):
+ autnum = {}
+ for line in block:
+ # Split line
+ key, val = split_line(line)
+
+ if key == "aut-num":
+ m = re.match(r"^(AS|as)(\d+)", val)
+ if m:
+ autnum["asn"] = m.group(2)
+
+ elif key == "org":
+ autnum[key] = val.upper()
+
+ elif key == "descr":
+ # Save the first description line as well...
+ if not key in autnum:
+ autnum[key] = val
+
+ # Skip empty objects
+ if not autnum or "asn" not in autnum:
+ return
+
+ # Insert a dummy organisation handle into our temporary organisations
+ # table in case the AS does not have an organisation handle set, but
+ # has a description (a quirk often observed in APNIC area), so we can
+ # later display at least some string for this AS.
+ if not "org" in autnum:
+ if "descr" in autnum:
+ autnum["org"] = "LIBLOC-%s-ORGHANDLE" % autnum.get("asn")
+
+ self.db.execute("INSERT INTO _organizations(handle, name, source) \
+ VALUES(%s, %s, %s) ON CONFLICT (handle) DO NOTHING",
+ autnum.get("org"), autnum.get("descr"), source_key,
+ )
+ else:
+ log.warning("ASN %s neither has an organisation handle nor a description line set, omitting" % \
+ autnum.get("asn"))
+ return
+
+ # Insert into database
+ self.db.execute("INSERT INTO _autnums(number, organization, source) \
+ VALUES(%s, %s, %s) ON CONFLICT (number) DO UPDATE SET \
+ organization = excluded.organization",
+ autnum.get("asn"), autnum.get("org"), source_key,
+ )
+
+ def _parse_inetnum_block(self, block, source_key, countries):
+ inetnum = {}
+ for line in block:
+ # Split line
+ key, val = split_line(line)
+
+ # Filter any inetnum records which are only referring to IP space
+ # not managed by that specific RIR...
+ if key == "netname":
+ if re.match(r"^(ERX-NETBLOCK|(AFRINIC|ARIN|LACNIC|RIPE)-CIDR-BLOCK|IANA-NETBLOCK-\d{1,3}|NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK|STUB-[\d-]{3,}SLASH\d{1,2})", val.strip()):
+ log.debug("Skipping record indicating historic/orphaned data: %s" % val.strip())
+ return
+
+ if key == "inetnum":
+ start_address, delim, end_address = val.partition("-")
+
+ # Strip any excess space
+ start_address, end_address = start_address.rstrip(), end_address.strip()
+
+ # Handle "inetnum" formatting in LACNIC DB (e.g. "24.152.8/22" instead of "24.152.8.0/22")
+ if start_address and not (delim or end_address):
+ try:
+ start_address = ipaddress.ip_network(start_address, strict=False)
+ except ValueError:
+ start_address = start_address.split("/")
+ ldigits = start_address[0].count(".")
+
+ # How many octets do we need to add?
+ # (LACNIC does not seem to have a /8 or greater assigned, so the following should suffice.)
+ if ldigits == 1:
+ start_address = start_address[0] + ".0.0/" + start_address[1]
+ elif ldigits == 2:
+ start_address = start_address[0] + ".0/" + start_address[1]
+ else:
+ log.warning("Could not recover IPv4 address from line in LACNIC DB format: %s" % line)
+ return
+
+ try:
+ start_address = ipaddress.ip_network(start_address, strict=False)
+ except ValueError:
+ log.warning("Could not parse line in LACNIC DB format: %s" % line)
+ return
+
+ # Enumerate first and last IP address of this network
+ end_address = start_address[-1]
+ start_address = start_address[0]
+
+ else:
+ # Convert to IP address
+ try:
+ start_address = ipaddress.ip_address(start_address)
+ end_address = ipaddress.ip_address(end_address)
+ except ValueError:
+ log.warning("Could not parse line: %s" % line)
+ return
+
+ inetnum["inetnum"] = list(ipaddress.summarize_address_range(start_address, end_address))
+
+ elif key == "inet6num":
+ inetnum[key] = [ipaddress.ip_network(val, strict=False)]
+
+ elif key == "country":
+ cc = val.upper()
+
+ # Ignore certain country codes
+ if cc in IGNORED_COUNTRIES:
+ log.debug("Ignoring country code '%s'" % cc)
+ continue
+
+ # Translate country codes
+ try:
+ cc = TRANSLATED_COUNTRIES[cc]
+ except KeyError:
+ pass
+
+ # Do we know this country?
+ if cc not in countries:
+ log.warning("Skipping invalid country code '%s'" % cc)
+ continue
+
+ try:
+ inetnum[key].append(cc)
+ except KeyError:
+ inetnum[key] = [cc]
+
+ # Parse the geofeed attribute
+ elif key == "geofeed":
+ inetnum["geofeed"] = val
+
+ # Parse geofeed when used as a remark
+ elif key == "remarks":
+ m = re.match(r"^(?:Geofeed)\s+(https://.*)", val)
+ if m:
+ inetnum["geofeed"] = m.group(1)
+
+ # Skip empty objects
+ if not inetnum:
+ return
+
+ # Iterate through all networks enumerated from above, check them for plausibility and insert
+ # them into the database, if _check_parsed_network() succeeded
+ for single_network in inetnum.get("inet6num") or inetnum.get("inetnum"):
+ if not self._check_parsed_network(single_network):
+ continue
+
+ # Fetch the country codes or use a list with an empty country
+ ccs = inetnum.get("country", [None])
+
+ # Insert the network into the database but only use the first country code
+ for cc in ccs:
+ self.db.execute("""
+ INSERT INTO
+ _rirdata
+ (
+ network,
+ country,
+ original_countries,
+ source
+ )
+ VALUES
+ (
+ %s, %s, %s, %s
+ )
+ ON CONFLICT (network)
+ DO UPDATE SET country = excluded.country
+ """, "%s" % single_network, cc, [cc for cc in countries if cc], source_key,
+ )
+
+ # If there is more than one country, we will only use the first one
+ break
+
+ # Update any geofeed information
+ geofeed = inetnum.get("geofeed", None)
+ if geofeed:
+ self._parse_geofeed(source_key, geofeed, single_network)
+
+ def _parse_geofeed(self, source, url, single_network):
+ # Check the URL
+ url = self._check_geofeed_url(url)
+ if not url:
+ return
+
+ # Store/update any geofeeds
+ self.db.execute("""
+ INSERT INTO
+ network_geofeeds
+ (
+ network,
+ url,
+ source
+ )
+ VALUES
+ (
+ %s, %s, %s
+ )
+ ON CONFLICT
+ (
+ network, url
+ )
+ DO UPDATE SET
+ source = excluded.source
+ """, "%s" % single_network, url, source,
+ )
+
+ def _parse_org_block(self, block, source_key):
+ org = {}
+ for line in block:
+ # Split line
+ key, val = split_line(line)
+
+ if key == "organisation":
+ org[key] = val.upper()
+ elif key == "org-name":
+ org[key] = val
+
+ # Skip empty objects
+ if not org:
+ return
+
+ self.db.execute("INSERT INTO _organizations(handle, name, source) \
+ VALUES(%s, %s, %s) ON CONFLICT (handle) DO \
+ UPDATE SET name = excluded.name",
+ org.get("organisation"), org.get("org-name"), source_key,
+ )
+
+ def _parse_line(self, line, source_key, validcountries=None):
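+ # Each line is in the NRO extended allocation format:
+ #   registry|cc|type|start|value|date|status[|opaque-id]
+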
+ # Skip version line
+ if line.startswith("2"):
+ return
+
+ # Skip comments
+ if line.startswith("#"):
+ return
+
+ try:
+ registry, country_code, type, line = line.split("|", 3)
+ except ValueError:
+ log.warning("Could not parse line: %s" % line)
+ return
+
+ # Skip ASN
+ if type == "asn":
+ return
+
+ # Skip any unknown protocols
+ elif type not in ("ipv6", "ipv4"):
+ log.warning("Unknown IP protocol '%s'" % type)
+ return
+
+ # Skip any lines that are for stats only or do not have a country
+ # code at all (avoids log spam below)
+ if not country_code or country_code == '*':
+ return
+
+ # Skip objects with unknown country codes
+ if validcountries and country_code not in validcountries:
+ log.warning("Skipping line with bogus country '%s': %s" % \
+ (country_code, line))
+ return
+
+ try:
+ address, prefix, date, status, organization = line.split("|")
+ except ValueError:
+ organization = None
+
+ # Try parsing the line without organization
+ try:
+ address, prefix, date, status = line.split("|")
+ except ValueError:
+ log.warning("Unhandled line format: %s" % line)
+ return
+
+ # Skip anything that isn't properly assigned
+ if status not in ("assigned", "allocated"):
+ return
+
+ # Cast prefix into an integer
+ try:
+ prefix = int(prefix)
+ except ValueError:
+ log.warning("Invalid prefix: %s" % prefix)
+ return
+
+ # Fix prefix length for IPv4
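+ # (the value field counts addresses rather than being a prefix length)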
+ if type == "ipv4":
+ prefix = 32 - int(math.log(prefix, 2))
+
+ # Try to parse the address
+ try:
+ network = ipaddress.ip_network("%s/%s" % (address, prefix), strict=False)
+ except ValueError:
+ log.warning("Invalid IP address: %s" % address)
+ return
+
+ if not self._check_parsed_network(network):
+ return
+
+ self.db.execute("""
+ INSERT INTO
+ networks
+ (
+ network,
+ country,
+ original_countries,
+ source
+ )
+ VALUES
+ (
+ %s, %s, %s, %s
+ )
+ ON CONFLICT (network)
+ DO UPDATE SET country = excluded.country
+ """, "%s" % network, country_code, [country_code], source_key,
+ )
+
+ async def handle_update_announcements(self, ns):
+ server = ns.server[0]
+
+ with self.db.transaction():
+ if server.startswith("/"):
+ await self._handle_update_announcements_from_bird(server)
+
+ # Purge anything we never want here
+ self.db.execute("""
+ -- Delete default routes
+ DELETE FROM announcements WHERE network = '::/0' OR network = '0.0.0.0/0';
+
+ -- Delete anything that is not global unicast address space
+ DELETE FROM announcements WHERE family(network) = 6 AND NOT network <<= '2000::/3';
+
+ -- DELETE "current network" address space
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '0.0.0.0/8';
+
+ -- DELETE local loopback address space
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '127.0.0.0/8';
+
+ -- DELETE RFC 1918 address space
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '10.0.0.0/8';
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '172.16.0.0/12';
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.168.0.0/16';
+
+ -- DELETE test, benchmark and documentation address space
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.0.0/24';
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.2.0/24';
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.18.0.0/15';
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.51.100.0/24';
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '203.0.113.0/24';
+
+ -- DELETE CGNAT address space (RFC 6598)
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '100.64.0.0/10';
+
+ -- DELETE link local address space
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '169.254.0.0/16';
+
+ -- DELETE IPv6 to IPv4 (6to4) address space (RFC 3068)
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.88.99.0/24';
+ DELETE FROM announcements WHERE family(network) = 6 AND network <<= '2002::/16';
+
+ -- DELETE multicast and reserved address space
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '224.0.0.0/4';
+ DELETE FROM announcements WHERE family(network) = 4 AND network <<= '240.0.0.0/4';
+
+ -- Delete networks that are too small to be in the global routing table
+ DELETE FROM announcements WHERE family(network) = 6 AND masklen(network) > 48;
+ DELETE FROM announcements WHERE family(network) = 4 AND masklen(network) > 24;
+
+ -- Delete any non-public or reserved ASNs
+ DELETE FROM announcements WHERE NOT (
+ (autnum >= 1 AND autnum <= 23455)
+ OR
+ (autnum >= 23457 AND autnum <= 64495)
+ OR
+ (autnum >= 131072 AND autnum <= 4199999999)
+ );
+
+ -- Delete everything that we have not seen for 14 days
+ DELETE FROM announcements WHERE last_seen_at <= CURRENT_TIMESTAMP - INTERVAL '14 days';
+ """)
+
+ async def _handle_update_announcements_from_bird(self, server):
+ # Pre-compile the regular expression for faster searching
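+ # It captures the announced network as well as the origin ASN of each
+ # route; aggregated routes do not carry any ASN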
+ route = re.compile(rb"^\s(.+?)\s+.+?\[(?:AS(.*?))?.\]$")
+
+ log.info("Requesting routing table from Bird (%s)" % server)
+
+ aggregated_networks = []
+
+ # Send command to list all routes
+ for line in self._bird_cmd(server, "show route"):
+ m = route.match(line)
+ if not m:
+ # Skip empty lines
+ if not line:
+ pass
+
+ # Ignore any header lines with the name of the routing table
+ elif line.startswith(b"Table"):
+ pass
+
+ # Log anything else
+ else:
+ log.debug("Could not parse line: %s" % line.decode())
+
+ continue
+
+ # Fetch the extracted network and ASN
+ network, autnum = m.groups()
+
+ # Decode into strings
+ if network:
+ network = network.decode()
+ if autnum:
+ autnum = autnum.decode()
+
+ # Collect all aggregated networks
+ if not autnum:
+ log.debug("%s is an aggregated network" % network)
+ aggregated_networks.append(network)
+ continue
+
+ # Insert it into the database
+ self.db.execute("INSERT INTO announcements(network, autnum) \
+ VALUES(%s, %s) ON CONFLICT (network) DO \
+ UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
+ network, autnum,
+ )
+
+ # Process any aggregated networks
+ for network in aggregated_networks:
+ log.debug("Processing aggregated network %s" % network)
+
+ # Run "show route all" for each network
+ for line in self._bird_cmd(server, "show route %s all" % network):
+ # Try finding the path
+ m = re.match(b"\s+BGP\.as_path:.* (\d+) {\d+}$", line)
+ if m:
+ # Select the last AS number in the path
+ autnum = m.group(1).decode()
+
+ # Insert it into the database
+ self.db.execute("INSERT INTO announcements(network, autnum) \
+ VALUES(%s, %s) ON CONFLICT (network) DO \
+ UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
+ network, autnum,
+ )
+
+ # We don't need to process any more
+ break
+
+ def _bird_cmd(self, socket_path, command):
+ # Connect to the socket
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(socket_path)
+
+ # Allocate some buffer
+ buffer = b""
+
+ log.debug("Sending Bird command: %s" % command)
+
+ # Send the command
+ s.send(b"%s\n" % command.encode())
+
+ while True:
+ # Fill up the buffer
+ buffer += s.recv(4096)
+
+ while True:
+ # Search for the next newline
+ pos = buffer.find(b"\n")
+
+ # If we cannot find one, we go back and read more data
+ if pos <= 0:
+ break
+
+ # Cut after the newline character
+ pos += 1
+
+ # Split the line we want and keep the rest in buffer
+ line, buffer = buffer[:pos], buffer[pos:]
+
+ # Try parsing any status lines
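+ # Bird prefixes them with a four-digit status code followed by either
+ # a space (the final line) or a hyphen (more lines follow)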
+ if len(line) > 4 and line[:4].isdigit() and line[4] in (32, 45):
+ code, delim, line = int(line[:4]), line[4], line[5:]
+
+ log.debug("Received response code %s from bird" % code)
+
+ # End of output
+ if code == 0:
+ return
+
+ # Ignore hello line
+ elif code == 1:
+ continue
+
+ # Otherwise return the line
+ yield line
+
+ async def handle_update_geofeeds(self, ns):
+ # Sync geofeeds
+ with self.db.transaction():
+ # Delete all geofeeds which are no longer linked
+ self.db.execute("""
+ DELETE FROM
+ geofeeds
+ WHERE
+ geofeeds.url NOT IN (
+ SELECT
+ network_geofeeds.url
+ FROM
+ network_geofeeds
+ )
+ """,
+ )
+
+ # Copy all geofeeds
+ self.db.execute("""
+ WITH all_geofeeds AS (
+ SELECT
+ network_geofeeds.url
+ FROM
+ network_geofeeds
+ )
+ INSERT INTO
+ geofeeds
+ (
+ url
+ )
+ SELECT
+ url
+ FROM
+ all_geofeeds
+ ON CONFLICT (url)
+ DO NOTHING
+ """,
+ )
+
+ # Fetch all Geofeeds that require an update
+ geofeeds = self.db.query("""
+ SELECT
+ id,
+ url
+ FROM
+ geofeeds
+ WHERE
+ updated_at IS NULL
+ OR
+ updated_at <= CURRENT_TIMESTAMP - INTERVAL '1 week'
+ ORDER BY
+ id
+ """)
+
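+ # Permit no more than 32 simultaneous downloads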
+ ratelimiter = asyncio.Semaphore(32)
+
+ # Update all geofeeds
+ async with asyncio.TaskGroup() as tasks:
+ for geofeed in geofeeds:
+ tasks.create_task(
+ self._fetch_geofeed(ratelimiter, geofeed),
+ )
+
+ # Delete data from any feeds that did not update in the last two weeks
+ with self.db.transaction():
+ self.db.execute("""
+ DELETE FROM
+ geofeed_networks
+ WHERE
+ geofeed_networks.geofeed_id IN (
+ SELECT
+ geofeeds.id
+ FROM
+ geofeeds
+ WHERE
+ updated_at IS NULL
+ OR
+ updated_at <= CURRENT_TIMESTAMP - INTERVAL '2 weeks'
+ )
+ """)
+
+ async def _fetch_geofeed(self, ratelimiter, geofeed):
+ async with ratelimiter:
+ log.debug("Fetching Geofeed %s" % geofeed.url)
+
+ with self.db.transaction():
+ # Open the URL
+ try:
+ # Send the request
+ f = await asyncio.to_thread(
+ self.downloader.retrieve,
+
+ # Fetch the feed by its URL
+ geofeed.url,
+
+ # Send some extra headers
+ headers={
+ "User-Agent" : "location/%s" % location.__version__,
+
+ # We expect some plain text file in CSV format
+ "Accept" : "text/csv, text/plain",
+ },
+
+ # Don't wait longer than 10 seconds for a response
+ timeout=10,
+ )
+
+ # Remove any previous data
+ self.db.execute("DELETE FROM geofeed_networks \
+ WHERE geofeed_id = %s", geofeed.id)
+
+ lineno = 0
+
+ # Read the output line by line
+ with self.db.pipeline():
+ for line in f:
+ lineno += 1
+
+ try:
+ line = line.decode()
+
+ # Ignore any lines we cannot decode
+ except UnicodeDecodeError:
+ log.debug("Could not decode line %s in %s" \
+ % (lineno, geofeed.url))
+ continue
+
+ # Strip any newline
+ line = line.rstrip()
+
+ # Skip empty lines
+ if not line:
+ continue
+
+ # Skip comments
+ elif line.startswith("#"):
+ continue
+
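+ # Geofeed entries are CSV lines in the format specified by RFC 8805:
+ #   network,country,region,city,postal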
+ fields = line.split(",", 5)
+
+ # Check if we have enough fields
+ if len(fields) < 4:
+ log.debug("Not enough fields in line: %s" % line)
+ continue
+
+ # Fetch all fields
+ network, country, region, city = fields[:4]
+
+ # Try to parse the network
+ try:
+ network = ipaddress.ip_network(network, strict=False)
+ except ValueError:
+ log.debug("Could not parse network: %s" % network)
+ continue
+
+ # Strip any excess whitespace from country codes
+ country = country.strip()
+
+ # Make the country code uppercase
+ country = country.upper()
+
+ # Check the country code
+ if not country:
+ log.debug("Empty country code in Geofeed %s line %s" \
+ % (geofeed.url, lineno))
+ continue
+
+ elif not location.country_code_is_valid(country):
+ log.debug("Invalid country code in Geofeed %s:%s: %s" \
+ % (geofeed.url, lineno, country))
+ continue
+
+ # Write this into the database
+ self.db.execute("""
+ INSERT INTO
+ geofeed_networks (
+ geofeed_id,
+ network,
+ country,
+ region,
+ city
+ )
+ VALUES (%s, %s, %s, %s, %s)""",
+ geofeed.id,
+ "%s" % network,
+ country,
+ region,
+ city,
+ )
+
+ # Catch any HTTP errors
+ except urllib.error.HTTPError as e:
+ self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
+ WHERE id = %s", e.code, "%s" % e, geofeed.id)
+
+ # Remove any previous data when the feed has been deleted
+ if e.code == 404:
+ self.db.execute("DELETE FROM geofeed_networks \
+ WHERE geofeed_id = %s", geofeed.id)
+
+ # Catch any other errors and connection timeouts
+ except (http.client.InvalidURL, urllib.error.URLError, TimeoutError) as e:
+ log.debug("Could not fetch URL %s: %s" % (geofeed.url, e))
+
+ self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
+ WHERE id = %s", 599, "%s" % e, geofeed.id)
+
+ # Mark the geofeed as updated
+ else:
+ self.db.execute("""
+ UPDATE
+ geofeeds
+ SET
+ updated_at = CURRENT_TIMESTAMP,
+ status = NULL,
+ error = NULL
+ WHERE
+ id = %s""",
+ geofeed.id,
+ )
+
+ async def handle_update_overrides(self, ns):
+ with self.db.transaction():
+ # Drop any previous content
+ self.db.execute("TRUNCATE TABLE autnum_overrides")
+ self.db.execute("TRUNCATE TABLE network_overrides")
+
+ # Remove all Geofeeds
+ self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", "overrides")
+
+ for file in ns.files:
+ log.info("Reading %s..." % file)
+
+ with open(file, "rb") as f:
+ for type, block in read_blocks(f):
+ if type == "net":
+ network = block.get("net")
+ # Try to parse and normalise the network
+ try:
+ network = ipaddress.ip_network(network, strict=False)
+ except ValueError as e:
+ log.warning("Invalid IP network: %s: %s" % (network, e))
+ continue
+
+ # Prevent that we overwrite all networks
+ if network.prefixlen == 0:
+ log.warning("Skipping %s: You cannot overwrite default" % network)
+ continue
+
+ self.db.execute("""
+ INSERT INTO
+ network_overrides
+ (
+ network,
+ country,
+ is_anonymous_proxy,
+ is_satellite_provider,
+ is_anycast,
+ is_drop
+ )
+ VALUES
+ (
+ %s, %s, %s, %s, %s, %s
+ )
+ ON CONFLICT (network) DO NOTHING
+ """,
+ "%s" % network,
+ block.get("country"),
+ self._parse_bool(block, "is-anonymous-proxy"),
+ self._parse_bool(block, "is-satellite-provider"),
+ self._parse_bool(block, "is-anycast"),
+ self._parse_bool(block, "drop"),
+ )
+
+ elif type == "aut-num":
+ autnum = block.get("aut-num")
+
+ # Check if AS number begins with "AS"
+ if not autnum.startswith("AS"):
+ log.warning("Invalid AS number: %s" % autnum)
+ continue
+
+ # Strip "AS"
+ autnum = autnum[2:]
+
+ self.db.execute("""
+ INSERT INTO
+ autnum_overrides
+ (
+ number,
+ name,
+ country,
+ is_anonymous_proxy,
+ is_satellite_provider,
+ is_anycast,
+ is_drop
+ )
+ VALUES
+ (
+ %s, %s, %s, %s, %s, %s, %s
+ )
+ ON CONFLICT (number) DO NOTHING
+ """,
+ autnum,
+ block.get("name"),
+ block.get("country"),
+ self._parse_bool(block, "is-anonymous-proxy"),
+ self._parse_bool(block, "is-satellite-provider"),
+ self._parse_bool(block, "is-anycast"),
+ self._parse_bool(block, "drop"),
+ )
+
+ # Geofeeds
+ elif type == "geofeed":
+ networks = []
+
+ # Fetch the URL
+ url = block.get("geofeed")
+
+ # Fetch permitted networks
+ for n in block.get("network", []):
+ try:
+ n = ipaddress.ip_network(n)
+ except ValueError as e:
+ log.warning("Ignoring invalid network %s: %s" % (n, e))
+ continue
+
+ networks.append(n)
+
+ # If no networks have been specified, permit for everything
+ if not networks:
+ networks = [
+ ipaddress.ip_network("::/0"),
+ ipaddress.ip_network("0.0.0.0/0"),
+ ]
+
+ # Check the URL
+ url = self._check_geofeed_url(url)
+ if not url:
+ continue
+
+ # Store the Geofeed URL
+ self.db.execute("""
+ INSERT INTO
+ geofeeds
+ (
+ url
+ )
+ VALUES
+ (
+ %s
+ )
+ ON CONFLICT (url) DO NOTHING
+ """, url,
+ )
+
+ # Store all permitted networks
+ self.db.executemany("""
+ INSERT INTO
+ network_geofeeds
+ (
+ network,
+ url,
+ source
+ )
+ VALUES
+ (
+ %s, %s, %s
+ )
+ ON CONFLICT
+ (
+ network, url
+ )
+ DO UPDATE SET
+ source = excluded.source
+ """, (("%s" % n, url, "overrides") for n in networks),
+ )
+
+ else:
+ log.warning("Unsupported type: %s" % type)
+
+ async def handle_update_feeds(self, ns):
+ """
+ Update any third-party feeds
+ """
+ success = True
+
+ feeds = (
+ # AWS IP Ranges
+ ("AWS-IP-RANGES", self._import_aws_ip_ranges, "https://ip-ranges.amazonaws.com/ip-ranges.json"),
+
+ # Spamhaus DROP
+ ("SPAMHAUS-DROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/drop.txt"),
+ ("SPAMHAUS-DROPV6", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/dropv6.txt"),
+
+ # Spamhaus ASNDROP
+ ("SPAMHAUS-ASNDROP", self._import_spamhaus_asndrop, "https://www.spamhaus.org/drop/asndrop.json"),
+ )
+
+ # Drop any data from feeds that we don't support (any more)
+ with self.db.transaction():
+ # Fetch the names of all feeds we support
+ sources = [name for name, *rest in feeds]
+
+ self.db.execute("DELETE FROM autnum_feeds WHERE NOT source = ANY(%s)", sources)
+ self.db.execute("DELETE FROM network_feeds WHERE NOT source = ANY(%s)", sources)
+
+ # Walk through all feeds
+ for name, callback, url, *args in feeds:
+ # Skip any feeds that were not requested on the command line
+ if ns.feeds and name not in ns.feeds:
+ continue
+
+ try:
+ await self._process_feed(name, callback, url, *args)
+
+ # Log an error but continue if an exception occurs
+ except Exception as e:
+ log.error("Error processing feed '%s': %s" % (name, e))
+ success = False
+
+ # Return status
+ return 0 if success else 1
+
+ async def _process_feed(self, name, callback, url, *args):
+ """
+ Processes one feed
+ """
+ # Open the URL
+ f = self.downloader.retrieve(url)
+
+ with self.db.transaction():
+ # Drop any previous content
+ self.db.execute("DELETE FROM autnum_feeds WHERE source = %s", name)
+ self.db.execute("DELETE FROM network_feeds WHERE source = %s", name)
+
+ # Call the callback to process the feed
+ with self.db.pipeline():
+ return await callback(name, f, *args)
+
+ async def _import_aws_ip_ranges(self, name, f):
+ # Parse the feed
+ feed = json.load(f)
+
+ # Set up a dictionary for mapping a region name to a country. Unfortunately,
+ # there seems to be no machine-readable version available of this other than
+ # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html
+ # (worse, it seems to be incomplete :-/ ); https://www.cloudping.cloud/endpoints
+ # was helpful here as well.
+ aws_region_country_map = {
+ # Africa
+ "af-south-1" : "ZA",
+
+ # Asia
+ "il-central-1" : "IL", # Tel Aviv
+
+ # Asia/Pacific
+ "ap-northeast-1" : "JP",
+ "ap-northeast-2" : "KR",
+ "ap-northeast-3" : "JP",
+ "ap-east-1" : "HK",
+ "ap-south-1" : "IN",
+ "ap-south-2" : "IN",
+ "ap-southeast-1" : "SG",
+ "ap-southeast-2" : "AU",
+ "ap-southeast-3" : "MY",
+ "ap-southeast-4" : "AU",
+ "ap-southeast-5" : "NZ", # Auckland, NZ
+ "ap-southeast-6" : "AP", # XXX: Precise location not documented anywhere
+
+ # Canada
+ "ca-central-1" : "CA",
+ "ca-west-1" : "CA",
+
+ # Europe
+ "eu-central-1" : "DE",
+ "eu-central-2" : "CH",
+ "eu-north-1" : "SE",
+ "eu-west-1" : "IE",
+ "eu-west-2" : "GB",
+ "eu-west-3" : "FR",
+ "eu-south-1" : "IT",
+ "eu-south-2" : "ES",
+
+ # Middle East
+ "me-central-1" : "AE",
+ "me-south-1" : "BH",
+
+ # South America
+ "sa-east-1" : "BR",
+
+ # Undocumented, likely located in Berlin rather than Frankfurt
+ "eusc-de-east-1" : "DE",
+ }
+
+ # Collect a list of all networks
+ prefixes = feed.get("ipv6_prefixes", []) + feed.get("prefixes", [])
+
+ for prefix in prefixes:
+ # Fetch network
+ network = prefix.get("ipv6_prefix") or prefix.get("ip_prefix")
+
+ # Parse the network
+ try:
+ network = ipaddress.ip_network(network)
+ except ValueError as e:
+ log.warning("%s: Unable to parse prefix %s" % (name, network))
+ continue
+
+ # Sanitize parsed networks...
+ if not self._check_parsed_network(network):
+ continue
+
+ # Fetch the region
+ region = prefix.get("region")
+
+ # Set some defaults
+ cc = None
+ is_anycast = False
+
+ # Fetch the CC from the dictionary
+ try:
+ cc = aws_region_country_map[region]
+
+ # If we couldn't find anything, let's try something else...
+ except KeyError as e:
+ # Find anycast networks
+ if region == "GLOBAL":
+ is_anycast = True
+
+ # Everything that starts with us- is probably in the United States
+ elif region.startswith("us-"):
+ cc = "US"
+
+ # Everything that starts with cn- is probably China
+ elif region.startswith("cn-"):
+ cc = "CN"
+
+ # Log a warning for anything else
+ else:
+ log.warning("%s: Could not determine country code for AWS region %s" \
+ % (name, region))
+ continue
+
+ # Write to database
+ self.db.execute("""
+ INSERT INTO
+ network_feeds
+ (
+ network,
+ source,
+ country,
+ is_anycast
+ )
+ VALUES
+ (
+ %s, %s, %s, %s
+ )
+ ON CONFLICT (network, source) DO NOTHING
+ """, "%s" % network, name, cc, is_anycast,
+ )
+
+ async def _import_spamhaus_drop(self, name, f):
+ """
+ Import Spamhaus DROP IP feeds
+ """
+ # Count all lines
+ lines = 0
+
+ # Walk through all lines
+ for line in f:
+ # Decode line
+ line = line.decode("utf-8")
+
+ # Strip off any comments
+ line, delim, comment = line.partition(";")
+
+ # Ignore empty lines
+ if not line:
+ continue
+
+ # Strip any excess whitespace
+ line = line.strip()
+
+ # Increment line counter
+ lines += 1
+
+ # Parse the network
+ try:
+ network = ipaddress.ip_network(line)
+ except ValueError as e:
+ log.warning("%s: Could not parse network: %s - %s" % (name, line, e))
+ continue
+
+ # Check network
+ if not self._check_parsed_network(network):
+ log.warning("%s: Skipping bogus network: %s" % (name, network))
+ continue
+
+ # Insert into the database
+ self.db.execute("""
+ INSERT INTO
+ network_feeds
+ (
+ network,
+ source,
+ is_drop
+ )
+ VALUES
+ (
+ %s, %s, %s
+ )""", "%s" % network, name, True,
+ )
+
+ # Raise an exception if we could not import anything
+ if not lines:
+ raise RuntimeError("Received bogus feed %s with no data" % name)
+
+ async def _import_spamhaus_asndrop(self, name, f):
+ """
+ Import Spamhaus ASNDROP feed
+ """
+ for line in f:
+ # Decode the line
+ line = line.decode("utf-8")
+
+ # Parse JSON
+ try:
+ line = json.loads(line)
+ except json.JSONDecodeError as e:
+ log.warning("%s: Unable to parse JSON object %s: %s" % (name, line, e))
+ continue
+
+ # Fetch type
+ type = line.get("type")
+
+ # Skip any metadata
+ if type == "metadata":
+ continue
+
+ # Fetch ASN
+ asn = line.get("asn")
+
+ # Skip any lines without an ASN
+ if not asn:
+ continue
+
+ # Filter invalid ASNs
+ if not self._check_parsed_asn(asn):
+ log.warning("%s: Skipping bogus ASN %s" % (name, asn))
+ continue
+
+ # Write to database
+ self.db.execute("""
+ INSERT INTO
+ autnum_feeds
+ (
+ number,
+ source,
+ is_drop
+ )
+ VALUES
+ (
+ %s, %s, %s
+ )""", "%s" % asn, name, True,
+ )
+
+ @staticmethod
+ def _parse_bool(block, key):
+ val = block.get(key)
+
+ # There is no point in proceeding when we got None
+ if val is None:
+ return
+
+ # Convert to lowercase
+ val = val.lower()
+
+ # True
+ if val in ("yes", "1"):
+ return True
+
+ # False
+ if val in ("no", "0"):
+ return False
+
+ # Default to None
+ return None
+
+ async def handle_import_countries(self, ns):
+ with self.db.transaction():
+ # Drop all data that we have
+ self.db.execute("TRUNCATE TABLE countries")
+
+ for file in ns.file:
+ for line in file:
+ line = line.rstrip()
+
+ # Ignore any comments
+ if line.startswith("#"):
+ continue
+
+ try:
+ country_code, continent_code, name = line.split(maxsplit=2)
+ except ValueError:
+ log.warning("Could not parse line: %s" % line)
+ continue
+
+ self.db.execute("INSERT INTO countries(country_code, name, continent_code) \
+ VALUES(%s, %s, %s) ON CONFLICT DO NOTHING", country_code, name, continent_code)
+
+
+def split_line(line):
+ key, colon, val = line.partition(":")
+
+ # Strip any excess space
+ key = key.strip()
+ val = val.strip()
+
+ return key, val
+
+def read_blocks(f):
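+ """
+ Reads all blocks from the given file and yields them as (type, data)
+ tuples, where type is the key of the first line of each block
+ """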
+ for block in iterate_over_blocks(f):
+ type = None
+ data = {}
+
+ for i, line in enumerate(block):
+ key, value = line.split(":", 1)
+
+ # The key of the first line defines the type
+ if i == 0:
+ type = key
+
+ # Strip any excess whitespace
+ value = value.strip()
+
+ # Store some values as a list
+ if type == "geofeed" and key == "network":
+ try:
+ data[key].append(value)
+ except KeyError:
+ data[key] = [value]
+
+ # Otherwise store the value as string
+ else:
+ data[key] = value
+
+ yield type, data
+
+def iterate_over_blocks(f, charsets=("utf-8", "latin1")):
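+ """
+ Yields all blocks (groups of lines separated by empty lines) from the
+ given file, decoding each line with the first matching charset
+ """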
+ block = []
+
+ for line in f:
+ # Skip commented lines
+ if line.startswith(b"#") or line.startswith(b"%"):
+ continue
+
+ # Convert to string
+ for charset in charsets:
+ try:
+ line = line.decode(charset)
+ except UnicodeDecodeError:
+ continue
+ else:
+ break
+
+ # Remove any comments at the end of line
+ line, delim, comment = line.partition("#")
+
+ # Strip any whitespace at the end of the line
+ line = line.rstrip()
+
+ # If we cut off some comment and the line is empty, we can skip it
+ if comment and not line:
+ continue
+
+ # If the line has some content, keep collecting it
+ if line:
+ block.append(line)
+ continue
+
+ # End the block on an empty line
+ if block:
+ yield block
+
+ # Reset the block
+ block = []
+
+ # Return the last block
+ if block:
+ yield block
+
+def iterate_over_lines(f):
+ for line in f:
+ # Decode the line
+ line = line.decode()
+
+ # Strip the ending
+ yield line.rstrip()
+
+async def main():
+ # Run the command line interface
+ c = CLI()
+
+ await c.run()
+
+asyncio.run(main())