#!/usr/bin/python3 ############################################################################### # # # libloc - A library to determine the location of someone on the Internet # # # # Copyright (C) 2020-2024 IPFire Development Team # # # # This library is free software; you can redistribute it and/or # # modify it under the terms of the GNU Lesser General Public # # License as published by the Free Software Foundation; either # # version 2.1 of the License, or (at your option) any later version. # # # # This library is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # # Lesser General Public License for more details. # # # ############################################################################### import argparse import csv import functools import http.client import ipaddress import json import logging import math import re import socket import sys import urllib.error # Load our location module import location import location.database from location.downloader import Downloader from location.i18n import _ # Initialise logging log = logging.getLogger("location.importer") log.propagate = 1 # Define constants VALID_ASN_RANGES = ( (1, 23455), (23457, 64495), (131072, 4199999999), ) TRANSLATED_COUNTRIES = { # When people say UK, they mean GB "UK" : "GB", } IGNORED_COUNTRIES = set(( # Formerly Yugoslavia "YU", # Some people use ZZ to say "no country" or to hide the country "ZZ", )) # Configure the CSV parser for ARIN csv.register_dialect("arin", delimiter=",", quoting=csv.QUOTE_ALL, quotechar="\"") class CLI(object): def parse_cli(self): parser = argparse.ArgumentParser( description=_("Location Importer Command Line Interface"), ) subparsers = parser.add_subparsers() # Global configuration flags parser.add_argument("--debug", action="store_true", help=_("Enable debug output")) parser.add_argument("--quiet", action="store_true", help=_("Enable quiet mode")) # version parser.add_argument("--version", action="version", version="%(prog)s @VERSION@") # Database parser.add_argument("--database-host", required=True, help=_("Database Hostname"), metavar=_("HOST")) parser.add_argument("--database-name", required=True, help=_("Database Name"), metavar=_("NAME")) parser.add_argument("--database-username", required=True, help=_("Database Username"), metavar=_("USERNAME")) parser.add_argument("--database-password", required=True, help=_("Database Password"), metavar=_("PASSWORD")) # Write Database write = subparsers.add_parser("write", help=_("Write database to file")) write.set_defaults(func=self.handle_write) write.add_argument("file", nargs=1, help=_("Database File")) write.add_argument("--signing-key", nargs="?", type=open, help=_("Signing Key")) write.add_argument("--backup-signing-key", nargs="?", type=open, help=_("Backup Signing Key")) write.add_argument("--vendor", nargs="?", help=_("Sets the vendor")) write.add_argument("--description", nargs="?", help=_("Sets a description")) write.add_argument("--license", nargs="?", help=_("Sets the license")) write.add_argument("--version", type=int, help=_("Database Format Version")) # Update WHOIS update_whois = subparsers.add_parser("update-whois", help=_("Update WHOIS Information")) update_whois.add_argument("sources", nargs="*", help=_("Only update these sources")) update_whois.set_defaults(func=self.handle_update_whois) # Update announcements update_announcements = subparsers.add_parser("update-announcements", 
help=_("Update BGP Annoucements")) update_announcements.set_defaults(func=self.handle_update_announcements) update_announcements.add_argument("server", nargs=1, help=_("Route Server to connect to"), metavar=_("SERVER")) # Update geofeeds update_geofeeds = subparsers.add_parser("update-geofeeds", help=_("Update Geofeeds")) update_geofeeds.set_defaults(func=self.handle_update_geofeeds) # Update feeds update_feeds = subparsers.add_parser("update-feeds", help=_("Update Feeds")) update_feeds.add_argument("feeds", nargs="*", help=_("Only update these feeds")) update_feeds.set_defaults(func=self.handle_update_feeds) # Update overrides update_overrides = subparsers.add_parser("update-overrides", help=_("Update overrides"), ) update_overrides.add_argument( "files", nargs="+", help=_("Files to import"), ) update_overrides.set_defaults(func=self.handle_update_overrides) # Import countries import_countries = subparsers.add_parser("import-countries", help=_("Import countries"), ) import_countries.add_argument("file", nargs=1, type=argparse.FileType("r"), help=_("File to import")) import_countries.set_defaults(func=self.handle_import_countries) args = parser.parse_args() # Configure logging if args.debug: location.logger.set_level(logging.DEBUG) elif args.quiet: location.logger.set_level(logging.WARNING) # Print usage if no action was given if not "func" in args: parser.print_usage() sys.exit(2) return args def run(self): # Parse command line arguments args = self.parse_cli() # Initialize the downloader self.downloader = Downloader() # Initialise database self.db = self._setup_database(args) # Call function ret = args.func(args) # Return with exit code if ret: sys.exit(ret) # Otherwise just exit sys.exit(0) def _setup_database(self, ns): """ Initialise the database """ # Connect to database db = location.database.Connection( host=ns.database_host, database=ns.database_name, user=ns.database_username, password=ns.database_password, ) with db.transaction(): db.execute(""" -- announcements CREATE TABLE IF NOT EXISTS announcements(network inet, autnum bigint, first_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP, last_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP); CREATE UNIQUE INDEX IF NOT EXISTS announcements_networks ON announcements(network); CREATE INDEX IF NOT EXISTS announcements_family ON announcements(family(network)); CREATE INDEX IF NOT EXISTS announcements_search ON announcements USING GIST(network inet_ops); -- autnums CREATE TABLE IF NOT EXISTS autnums(number bigint, name text NOT NULL); ALTER TABLE autnums ADD COLUMN IF NOT EXISTS source text; CREATE UNIQUE INDEX IF NOT EXISTS autnums_number ON autnums(number); -- countries CREATE TABLE IF NOT EXISTS countries( country_code text NOT NULL, name text NOT NULL, continent_code text NOT NULL); CREATE UNIQUE INDEX IF NOT EXISTS countries_country_code ON countries(country_code); -- networks CREATE TABLE IF NOT EXISTS networks(network inet, country text); ALTER TABLE networks ADD COLUMN IF NOT EXISTS original_countries text[]; ALTER TABLE networks ADD COLUMN IF NOT EXISTS source text; CREATE UNIQUE INDEX IF NOT EXISTS networks_network ON networks(network); CREATE INDEX IF NOT EXISTS networks_family ON networks USING BTREE(family(network)); CREATE INDEX IF NOT EXISTS networks_search ON networks USING GIST(network inet_ops); -- geofeeds CREATE TABLE IF NOT EXISTS geofeeds( id serial primary key, url text, status integer default null, updated_at timestamp without time zone default null ); ALTER TABLE geofeeds ADD COLUMN IF 
NOT EXISTS error text; CREATE UNIQUE INDEX IF NOT EXISTS geofeeds_unique ON geofeeds(url); CREATE TABLE IF NOT EXISTS geofeed_networks( geofeed_id integer references geofeeds(id) on delete cascade, network inet, country text, region text, city text ); CREATE INDEX IF NOT EXISTS geofeed_networks_geofeed_id ON geofeed_networks(geofeed_id); CREATE INDEX IF NOT EXISTS geofeed_networks_search ON geofeed_networks USING GIST(network inet_ops); CREATE TABLE IF NOT EXISTS network_geofeeds(network inet, url text); ALTER TABLE network_geofeeds ADD COLUMN IF NOT EXISTS source text NOT NULL; CREATE UNIQUE INDEX IF NOT EXISTS network_geofeeds_unique ON network_geofeeds(network); CREATE INDEX IF NOT EXISTS network_geofeeds_search ON network_geofeeds USING GIST(network inet_ops); CREATE INDEX IF NOT EXISTS network_geofeeds_url ON network_geofeeds(url); -- feeds CREATE TABLE IF NOT EXISTS autnum_feeds( number bigint NOT NULL, source text NOT NULL, name text, country text, is_anonymous_proxy boolean, is_satellite_provider boolean, is_anycast boolean, is_drop boolean ); CREATE UNIQUE INDEX IF NOT EXISTS autnum_feeds_unique ON autnum_feeds(number, source); CREATE TABLE IF NOT EXISTS network_feeds( network inet NOT NULL, source text NOT NULL, country text, is_anonymous_proxy boolean, is_satellite_provider boolean, is_anycast boolean, is_drop boolean ); CREATE UNIQUE INDEX IF NOT EXISTS network_feeds_unique ON network_feeds(network, source); CREATE INDEX IF NOT EXISTS network_feeds_search ON network_feeds USING GIST(network inet_ops); -- overrides CREATE TABLE IF NOT EXISTS autnum_overrides( number bigint NOT NULL, name text, country text, is_anonymous_proxy boolean, is_satellite_provider boolean, is_anycast boolean ); CREATE UNIQUE INDEX IF NOT EXISTS autnum_overrides_number ON autnum_overrides(number); ALTER TABLE autnum_overrides ADD COLUMN IF NOT EXISTS is_drop boolean; ALTER TABLE autnum_overrides DROP COLUMN IF EXISTS source; CREATE TABLE IF NOT EXISTS network_overrides( network inet NOT NULL, country text, is_anonymous_proxy boolean, is_satellite_provider boolean, is_anycast boolean ); CREATE UNIQUE INDEX IF NOT EXISTS network_overrides_network ON network_overrides(network); CREATE INDEX IF NOT EXISTS network_overrides_search ON network_overrides USING GIST(network inet_ops); ALTER TABLE network_overrides ADD COLUMN IF NOT EXISTS is_drop boolean; ALTER TABLE network_overrides DROP COLUMN IF EXISTS source; CREATE TABLE IF NOT EXISTS geofeed_overrides( url text NOT NULL ); CREATE UNIQUE INDEX IF NOT EXISTS geofeed_overrides_url ON geofeed_overrides(url); """) return db def fetch_countries(self): """ Returns a list of all countries on the list """ # Fetch all valid country codes to check parsed networks aganist... 
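# The resulting set is handed down to the WHOIS and extended-format parsers so
# that unknown "country:" values can be rejected early; this is also why
# "import-countries" has to be run before any WHOIS data is imported (see
# handle_update_whois() below).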
countries = self.db.query("SELECT country_code FROM countries ORDER BY country_code") return set((country.country_code for country in countries)) def handle_write(self, ns): """ Compiles a database in libloc format out of what is in the database """ # Allocate a writer writer = location.Writer(ns.signing_key, ns.backup_signing_key) # Set all metadata if ns.vendor: writer.vendor = ns.vendor if ns.description: writer.description = ns.description if ns.license: writer.license = ns.license # Add all Autonomous Systems log.info("Writing Autonomous Systems...") # Select all ASes with a name rows = self.db.query(""" SELECT autnums.number AS number, COALESCE( overrides.name, autnums.name ) AS name FROM autnums LEFT JOIN autnum_overrides overrides ON autnums.number = overrides.number ORDER BY autnums.number """) for row in rows: # Skip AS without names if not row.name: continue a = writer.add_as(row.number) a.name = row.name # Add all networks log.info("Writing networks...") # Select all known networks rows = self.db.query(""" WITH known_networks AS ( SELECT network FROM announcements UNION SELECT network FROM networks UNION SELECT network FROM network_feeds UNION SELECT network FROM network_overrides UNION SELECT network FROM geofeed_networks ), ordered_networks AS ( SELECT known_networks.network AS network, announcements.autnum AS autnum, networks.country AS country, -- Must be part of returned values for ORDER BY clause masklen(announcements.network) AS sort_a, masklen(networks.network) AS sort_b FROM known_networks LEFT JOIN announcements ON known_networks.network <<= announcements.network LEFT JOIN networks ON known_networks.network <<= networks.network ORDER BY known_networks.network, sort_a DESC, sort_b DESC ) -- Return a list of those networks enriched with all -- other information that we store in the database SELECT DISTINCT ON (network) network, autnum, -- Country COALESCE( ( SELECT country FROM network_overrides overrides WHERE networks.network <<= overrides.network ORDER BY masklen(overrides.network) DESC LIMIT 1 ), ( SELECT country FROM autnum_overrides overrides WHERE networks.autnum = overrides.number ), ( SELECT country FROM network_feeds feeds WHERE networks.network <<= feeds.network ORDER BY masklen(feeds.network) DESC LIMIT 1 ), ( SELECT country FROM autnum_feeds feeds WHERE networks.autnum = feeds.number ORDER BY source LIMIT 1 ), ( SELECT geofeed_networks.country AS country FROM network_geofeeds -- Join the data from the geofeeds LEFT JOIN geofeeds ON network_geofeeds.url = geofeeds.url LEFT JOIN geofeed_networks ON geofeeds.id = geofeed_networks.geofeed_id -- Check whether we have a geofeed for this network WHERE networks.network <<= network_geofeeds.network AND networks.network <<= geofeed_networks.network -- Filter for the best result ORDER BY masklen(geofeed_networks.network) DESC LIMIT 1 ), networks.country ) AS country, -- Flags COALESCE( ( SELECT is_anonymous_proxy FROM network_overrides overrides WHERE networks.network <<= overrides.network ORDER BY masklen(overrides.network) DESC LIMIT 1 ), ( SELECT is_anonymous_proxy FROM network_feeds feeds WHERE networks.network <<= feeds.network ORDER BY masklen(feeds.network) DESC LIMIT 1 ), ( SELECT is_anonymous_proxy FROM autnum_feeds feeds WHERE networks.autnum = feeds.number ORDER BY source LIMIT 1 ), ( SELECT is_anonymous_proxy FROM autnum_overrides overrides WHERE networks.autnum = overrides.number ), FALSE ) AS is_anonymous_proxy, COALESCE( ( SELECT is_satellite_provider FROM network_overrides overrides WHERE 
networks.network <<= overrides.network ORDER BY masklen(overrides.network) DESC LIMIT 1 ), ( SELECT is_satellite_provider FROM network_feeds feeds WHERE networks.network <<= feeds.network ORDER BY masklen(feeds.network) DESC LIMIT 1 ), ( SELECT is_satellite_provider FROM autnum_feeds feeds WHERE networks.autnum = feeds.number ORDER BY source LIMIT 1 ), ( SELECT is_satellite_provider FROM autnum_overrides overrides WHERE networks.autnum = overrides.number ), FALSE ) AS is_satellite_provider, COALESCE( ( SELECT is_anycast FROM network_overrides overrides WHERE networks.network <<= overrides.network ORDER BY masklen(overrides.network) DESC LIMIT 1 ), ( SELECT is_anycast FROM network_feeds feeds WHERE networks.network <<= feeds.network ORDER BY masklen(feeds.network) DESC LIMIT 1 ), ( SELECT is_anycast FROM autnum_feeds feeds WHERE networks.autnum = feeds.number ORDER BY source LIMIT 1 ), ( SELECT is_anycast FROM autnum_overrides overrides WHERE networks.autnum = overrides.number ), FALSE ) AS is_anycast, COALESCE( ( SELECT is_drop FROM network_overrides overrides WHERE networks.network <<= overrides.network ORDER BY masklen(overrides.network) DESC LIMIT 1 ), ( SELECT is_drop FROM network_feeds feeds WHERE networks.network <<= feeds.network ORDER BY masklen(feeds.network) DESC LIMIT 1 ), ( SELECT is_drop FROM autnum_feeds feeds WHERE networks.autnum = feeds.number ORDER BY source LIMIT 1 ), ( SELECT is_drop FROM autnum_overrides overrides WHERE networks.autnum = overrides.number ), FALSE ) AS is_drop FROM ordered_networks networks """) for row in rows: network = writer.add_network(row.network) # Save country if row.country: network.country_code = row.country # Save ASN if row.autnum: network.asn = row.autnum # Set flags if row.is_anonymous_proxy: network.set_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY) if row.is_satellite_provider: network.set_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER) if row.is_anycast: network.set_flag(location.NETWORK_FLAG_ANYCAST) if row.is_drop: network.set_flag(location.NETWORK_FLAG_DROP) # Add all countries log.info("Writing countries...") rows = self.db.query("SELECT * FROM countries ORDER BY country_code") for row in rows: c = writer.add_country(row.country_code) c.continent_code = row.continent_code c.name = row.name # Write everything to file log.info("Writing database to file...") for file in ns.file: writer.write(file) def handle_update_whois(self, ns): # Did we run successfully? 
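# Note: a failure in one individual source below does not abort the whole run;
# it only flips this flag so that a non-zero exit code is returned at the end.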
success = True sources = ( # African Network Information Centre ("AFRINIC", ( (self._import_standard_format, "https://ftp.afrinic.net/pub/pub/dbase/afrinic.db.gz"), )), # Asia Pacific Network Information Centre ("APNIC", ( (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inet6num.gz"), (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inetnum.gz"), (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.aut-num.gz"), (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.organisation.gz"), )), # American Registry for Internet Numbers ("ARIN", ( (self._import_extended_format, "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"), (self._import_arin_as_names, "https://ftp.arin.net/pub/resource_registry_service/asns.csv"), )), # Japan Network Information Center ("JPNIC", ( (self._import_standard_format, "https://ftp.nic.ad.jp/jpirr/jpirr.db.gz"), )), # Latin America and Caribbean Network Information Centre ("LACNIC", ( (self._import_standard_format, "https://ftp.lacnic.net/lacnic/dbase/lacnic.db.gz"), (self._import_extended_format, "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"), )), # Réseaux IP Européens ("RIPE", ( (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz"), (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz"), (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.aut-num.gz"), (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.organisation.gz"), )), ) # Fetch all valid country codes to check parsed networks against countries = self.fetch_countries() # Check if we have countries if not countries: log.error("Please import countries before importing any WHOIS data") return 1 # Iterate over all potential sources for name, feeds in sources: # Skip anything that should not be updated if ns.sources and not name in ns.sources: continue try: self._process_source(name, feeds, countries) # Log an error but continue if an exception occurs except Exception as e: log.error("Error processing source %s" % name, exc_info=True) success = False # Return a non-zero exit code for errors return 0 if success else 1 def _process_source(self, source, feeds, countries): """ This function processes one source """ # Wrap everything into one large transaction with self.db.transaction(): # Remove all previously imported content self.db.execute("DELETE FROM autnums WHERE source = %s", source) self.db.execute("DELETE FROM networks WHERE source = %s", source) self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", source) # Create some temporary tables to store parsed data self.db.execute(""" CREATE TEMPORARY TABLE _autnums(number integer NOT NULL, organization text NOT NULL, source text NOT NULL) ON COMMIT DROP; CREATE UNIQUE INDEX _autnums_number ON _autnums(number); CREATE TEMPORARY TABLE _organizations(handle text NOT NULL, name text NOT NULL, source text NOT NULL) ON COMMIT DROP; CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle); CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text, original_countries text[] NOT NULL, source text NOT NULL) ON COMMIT DROP; CREATE INDEX _rirdata_search ON _rirdata USING BTREE(family(network), masklen(network)); CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network); """) # Parse all feeds for callback, url, *args in feeds: # Retrieve the feed f = 
self.downloader.retrieve(url) # Call the callback callback(source, countries, f, *args) # Process all parsed networks from every RIR we happen to have access to, # insert the largest network chunks into the networks table immediately... families = self.db.query(""" SELECT DISTINCT family(network) AS family FROM _rirdata ORDER BY family(network) """, ) for family in (row.family for row in families): # Fetch the smallest mask length in our data set smallest = self.db.get(""" SELECT MIN( masklen(network) ) AS prefix FROM _rirdata WHERE family(network) = %s """, family, ) # Copy all networks self.db.execute(""" INSERT INTO networks ( network, country, original_countries, source ) SELECT network, country, original_countries, source FROM _rirdata WHERE masklen(network) = %s AND family(network) = %s ON CONFLICT DO NOTHING""", smallest.prefix, family, ) # ... determine any other prefixes for this network family, ... prefixes = self.db.query(""" SELECT DISTINCT masklen(network) AS prefix FROM _rirdata WHERE family(network) = %s ORDER BY masklen(network) ASC OFFSET 1 """, family, ) # ... and insert networks with this prefix in case they provide additional # information (i. e. subnet of a larger chunk with a different country) for prefix in (row.prefix for row in prefixes): self.db.execute(""" WITH candidates AS ( SELECT _rirdata.network, _rirdata.country, _rirdata.original_countries, _rirdata.source FROM _rirdata WHERE family(_rirdata.network) = %s AND masklen(_rirdata.network) = %s ), filtered AS ( SELECT DISTINCT ON (c.network) c.network, c.country, c.original_countries, c.source, masklen(networks.network), networks.country AS parent_country FROM candidates c LEFT JOIN networks ON c.network << networks.network ORDER BY c.network, masklen(networks.network) DESC NULLS LAST ) INSERT INTO networks(network, country, original_countries, source) SELECT network, country, original_countries, source FROM filtered WHERE parent_country IS NULL OR country <> parent_country ON CONFLICT DO NOTHING """, family, prefix, ) self.db.execute(""" INSERT INTO autnums ( number, name, source ) SELECT _autnums.number, _organizations.name, _organizations.source FROM _autnums JOIN _organizations ON _autnums.organization = _organizations.handle ON CONFLICT ( number ) DO UPDATE SET name = excluded.name """, ) def _import_standard_format(self, source, countries, f, *args): """ Imports a single standard format source feed """ # Iterate over all blocks for block in iterate_over_blocks(f): self._parse_block(block, source, countries) def _import_extended_format(self, source, countries, f, *args): # Iterate over all lines for line in iterate_over_lines(f): self._parse_line(line, source, countries) def _import_arin_as_names(self, source, countries, f, *args): # Walk through the file for line in csv.DictReader(f, dialect="arin"): log.debug("Processing object: %s" % line) # Fetch status status = line.get("Status") # We are only interested in anything managed by ARIN if not status == "Full Registry Services": continue # Fetch organization name name = line.get("Org Name") # Extract ASNs first_asn = line.get("Start AS Number") last_asn = line.get("End AS Number") # Cast to a number try: first_asn = int(first_asn) except (TypeError, ValueError) as e: log.warning("Could not parse ASN '%s'" % first_asn) continue try: last_asn = int(last_asn) except (TypeError, ValueError) as e: log.warning("Could not parse ASN '%s'" % last_asn) continue # Check if the range is valid if last_asn < first_asn: log.warning("Invalid ASN range %s-%s" % (first_asn, last_asn)) # Insert
everything into the database for asn in range(first_asn, last_asn + 1): if not self._check_parsed_asn(asn): log.warning("Skipping invalid ASN %s" % asn) continue self.db.execute(""" INSERT INTO autnums ( number, name, source ) VALUES ( %s, %s, %s ) ON CONFLICT ( number ) DO NOTHING """, asn, name, "ARIN", ) def _check_parsed_network(self, network): """ Assistive function to detect and subsequently sort out parsed networks from RIR data (both Whois and so-called "extended sources"), which are or have... (a) not globally routable (RFC 1918 space, et al.) (b) covering a too large chunk of the IP address space (prefix length is < 7 for IPv4 networks, and < 10 for IPv6) (c) "0.0.0.0" or "::" as a network address This unfortunately is necessary due to brain-dead clutter across various RIR databases, causing mismatches and eventually disruptions. We will return False in case a network is not suitable for adding it to our database, and True otherwise. """ # Check input if isinstance(network, ipaddress.IPv6Network): pass elif isinstance(network, ipaddress.IPv4Network): pass else: raise ValueError("Invalid network: %s (type %s)" % (network, type(network))) # Ignore anything that isn't globally routable if not network.is_global: log.debug("Skipping non-globally routable network: %s" % network) return False # Ignore anything that is unspecified IP range (See RFC 5735 for IPv4 or RFC 2373 for IPv6) elif network.is_unspecified: log.debug("Skipping unspecified network: %s" % network) return False # IPv6 if network.version == 6: if network.prefixlen < 10: log.debug("Skipping too big IP chunk: %s" % network) return False # IPv4 elif network.version == 4: if network.prefixlen < 7: log.debug("Skipping too big IP chunk: %s" % network) return False # In case we have made it here, the network is considered to # be suitable for libloc consumption... return True def _check_parsed_asn(self, asn): """ Assistive function to filter Autonomous System Numbers not being suitable for adding to our database. Returns False in such cases, and True otherwise. """ for start, end in VALID_ASN_RANGES: if start <= asn and end >= asn: return True log.info("Supplied ASN %s out of publicly routable ASN ranges" % asn) return False def _parse_block(self, block, source_key, countries): # Get first line to find out what type of block this is line = block[0] # aut-num if line.startswith("aut-num:"): return self._parse_autnum_block(block, source_key) # inetnum if line.startswith("inet6num:") or line.startswith("inetnum:"): return self._parse_inetnum_block(block, source_key, countries) # organisation elif line.startswith("organisation:"): return self._parse_org_block(block, source_key) def _parse_autnum_block(self, block, source_key): autnum = {} for line in block: # Split line key, val = split_line(line) if key == "aut-num": m = re.match(r"^(AS|as)(\d+)", val) if m: autnum["asn"] = m.group(2) elif key == "org": autnum[key] = val.upper() elif key == "descr": # Save the first description line as well... if not key in autnum: autnum[key] = val # Skip empty objects if not autnum or not "asn" in autnum: return # Insert a dummy organisation handle into our temporary organisations # table in case the AS does not have an organisation handle set, but # has a description (a quirk often observed in APNIC area), so we can # later display at least some string for this AS. 
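# A purely illustrative example (placeholder values, not real registry data):
#
#   aut-num:  AS64496
#   descr:    Example Transit Ltd.
#
# carries no "org:" attribute, so a synthetic handle such as
# "LIBLOC-64496-ORGHANDLE" is inserted below and the description is kept as
# the organisation name.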
if not "org" in autnum: if "descr" in autnum: autnum["org"] = "LIBLOC-%s-ORGHANDLE" % autnum.get("asn") self.db.execute("INSERT INTO _organizations(handle, name, source) \ VALUES(%s, %s, %s) ON CONFLICT (handle) DO NOTHING", autnum.get("org"), autnum.get("descr"), source_key, ) else: log.warning("ASN %s neither has an organisation handle nor a description line set, omitting" % \ autnum.get("asn")) return # Insert into database self.db.execute("INSERT INTO _autnums(number, organization, source) \ VALUES(%s, %s, %s) ON CONFLICT (number) DO UPDATE SET \ organization = excluded.organization", autnum.get("asn"), autnum.get("org"), source_key, ) def _parse_inetnum_block(self, block, source_key, countries): inetnum = {} for line in block: # Split line key, val = split_line(line) # Filter any inetnum records which are only referring to IP space # not managed by that specific RIR... if key == "netname": if re.match(r"^(ERX-NETBLOCK|(AFRINIC|ARIN|LACNIC|RIPE)-CIDR-BLOCK|IANA-NETBLOCK-\d{1,3}|NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK|STUB-[\d-]{3,}SLASH\d{1,2})", val.strip()): log.debug("Skipping record indicating historic/orphaned data: %s" % val.strip()) return if key == "inetnum": start_address, delim, end_address = val.partition("-") # Strip any excess space start_address, end_address = start_address.rstrip(), end_address.strip() # Handle "inetnum" formatting in LACNIC DB (e.g. "24.152.8/22" instead of "24.152.8.0/22") if start_address and not (delim or end_address): try: start_address = ipaddress.ip_network(start_address, strict=False) except ValueError: start_address = start_address.split("/") ldigits = start_address[0].count(".") # How many octets do we need to add? # (LACNIC does not seem to have a /8 or greater assigned, so the following should suffice.) if ldigits == 1: start_address = start_address[0] + ".0.0/" + start_address[1] elif ldigits == 2: start_address = start_address[0] + ".0/" + start_address[1] else: log.warning("Could not recover IPv4 address from line in LACNIC DB format: %s" % line) return try: start_address = ipaddress.ip_network(start_address, strict=False) except ValueError: log.warning("Could not parse line in LACNIC DB format: %s" % line) return # Enumerate first and last IP address of this network end_address = start_address[-1] start_address = start_address[0] else: # Convert to IP address try: start_address = ipaddress.ip_address(start_address) end_address = ipaddress.ip_address(end_address) except ValueError: log.warning("Could not parse line: %s" % line) return inetnum["inetnum"] = list(ipaddress.summarize_address_range(start_address, end_address)) elif key == "inet6num": inetnum[key] = [ipaddress.ip_network(val, strict=False)] elif key == "country": cc = val.upper() # Ignore certain country codes if cc in IGNORED_COUNTRIES: log.debug("Ignoring country code '%s'" % cc) continue # Translate country codes try: cc = TRANSLATED_COUNTRIES[cc] except KeyError: pass # Do we know this country? 
if not cc in countries: log.warning("Skipping invalid country code '%s'" % cc) continue try: inetnum[key].append(cc) except KeyError: inetnum[key] = [cc] # Parse the geofeed attribute elif key == "geofeed": inetnum["geofeed"] = val # Parse geofeed when used as a remark elif key == "remarks": m = re.match(r"^(?:Geofeed)\s+(https://.*)", val) if m: inetnum["geofeed"] = m.group(1) # Skip empty objects if not inetnum: return # Iterate through all networks enumerated from above, check them for plausibility and insert # them into the database, if _check_parsed_network() succeeded for single_network in inetnum.get("inet6num") or inetnum.get("inetnum"): if not self._check_parsed_network(single_network): continue # Fetch the countries or use a list with an empty country countries = inetnum.get("country", [None]) # Insert the network into the database but only use the first country code for cc in countries: self.db.execute(""" INSERT INTO _rirdata ( network, country, original_countries, source ) VALUES ( %s, %s, %s, %s ) ON CONFLICT (network) DO UPDATE SET country = excluded.country """, "%s" % single_network, cc, [cc for cc in countries if cc], source_key, ) # If there is more than one country, we will only use the first one break # Update any geofeed information geofeed = inetnum.get("geofeed", None) if geofeed: self._parse_geofeed(source_key, geofeed, single_network) def _parse_geofeed(self, source, url, single_network): # Parse the URL url = urllib.parse.urlparse(url) # Make sure that this is an HTTPS URL if not url.scheme == "https": log.debug("Geofeed URL is not using HTTPS: %s" % url.geturl()) return # Put the URL back together normalized url = url.geturl() # Store/update any geofeeds self.db.execute(""" INSERT INTO network_geofeeds ( network, url, source ) VALUES ( %s, %s, %s ) ON CONFLICT (network) DO UPDATE SET url = excluded.url""", "%s" % single_network, url, source, ) def _parse_org_block(self, block, source_key): org = {} for line in block: # Split line key, val = split_line(line) if key == "organisation": org[key] = val.upper() elif key == "org-name": org[key] = val # Skip empty objects if not org: return self.db.execute("INSERT INTO _organizations(handle, name, source) \ VALUES(%s, %s, %s) ON CONFLICT (handle) DO \ UPDATE SET name = excluded.name", org.get("organisation"), org.get("org-name"), source_key, ) def _parse_line(self, line, source_key, validcountries=None): # Skip version line if line.startswith("2"): return # Skip comments if line.startswith("#"): return try: registry, country_code, type, line = line.split("|", 3) except: log.warning("Could not parse line: %s" % line) return # Skip any unknown protocols if not type in ("ipv6", "ipv4"): log.warning("Unknown IP protocol '%s'" % type) return # Skip any lines that are for stats only or do not have a country # code at all (avoids log spam below) if not country_code or country_code == '*': return # Skip objects with unknown country codes if validcountries and country_code not in validcountries: log.warning("Skipping line with bogus country '%s': %s" % \ (country_code, line)) return try: address, prefix, date, status, organization = line.split("|") except ValueError: organization = None # Try parsing the line without organization try: address, prefix, date, status = line.split("|") except ValueError: log.warning("Unhandled line format: %s" % line) return # Skip anything that isn't properly assigned if not status in ("assigned", "allocated"): return # Cast prefix into an integer try: prefix = int(prefix) except: log.warning("Invalid prefix: %s" % prefix) return
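# A delegated-extended line looks roughly like this (illustrative placeholder values):
#   ripencc|DE|ipv4|192.0.2.0|256|19930901|allocated|<opaque-id>
# For IPv4 records the fifth field is the number of addresses, which is
# converted into a prefix length below; for IPv6 it already is the prefix length.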
# Fix prefix length for IPv4 if type == "ipv4": prefix = 32 - int(math.log(prefix, 2)) # Try to parse the address try: network = ipaddress.ip_network("%s/%s" % (address, prefix), strict=False) except ValueError: log.warning("Invalid IP address: %s" % address) return if not self._check_parsed_network(network): return self.db.execute(""" INSERT INTO networks ( network, country, original_countries, source ) VALUES ( %s, %s, %s, %s ) ON CONFLICT (network) DO UPDATE SET country = excluded.country """, "%s" % network, country_code, [country_code], source_key, ) def handle_update_announcements(self, ns): server = ns.server[0] with self.db.transaction(): if server.startswith("/"): self._handle_update_announcements_from_bird(server) # Purge anything we never want here self.db.execute(""" -- Delete default routes DELETE FROM announcements WHERE network = '::/0' OR network = '0.0.0.0/0'; -- Delete anything that is not global unicast address space DELETE FROM announcements WHERE family(network) = 6 AND NOT network <<= '2000::/3'; -- DELETE "current network" address space DELETE FROM announcements WHERE family(network) = 4 AND network <<= '0.0.0.0/8'; -- DELETE local loopback address space DELETE FROM announcements WHERE family(network) = 4 AND network <<= '127.0.0.0/8'; -- DELETE RFC 1918 address space DELETE FROM announcements WHERE family(network) = 4 AND network <<= '10.0.0.0/8'; DELETE FROM announcements WHERE family(network) = 4 AND network <<= '172.16.0.0/12'; DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.168.0.0/16'; -- DELETE test, benchmark and documentation address space DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.0.0/24'; DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.2.0/24'; DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.18.0.0/15'; DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.51.100.0/24'; DELETE FROM announcements WHERE family(network) = 4 AND network <<= '203.0.113.0/24'; -- DELETE CGNAT address space (RFC 6598) DELETE FROM announcements WHERE family(network) = 4 AND network <<= '100.64.0.0/10'; -- DELETE link local address space DELETE FROM announcements WHERE family(network) = 4 AND network <<= '169.254.0.0/16'; -- DELETE IPv6 to IPv4 (6to4) address space (RFC 3068) DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.88.99.0/24'; DELETE FROM announcements WHERE family(network) = 6 AND network <<= '2002::/16'; -- DELETE multicast and reserved address space DELETE FROM announcements WHERE family(network) = 4 AND network <<= '224.0.0.0/4'; DELETE FROM announcements WHERE family(network) = 4 AND network <<= '240.0.0.0/4'; -- Delete networks that are too small to be in the global routing table DELETE FROM announcements WHERE family(network) = 6 AND masklen(network) > 48; DELETE FROM announcements WHERE family(network) = 4 AND masklen(network) > 24; -- Delete any non-public or reserved ASNs DELETE FROM announcements WHERE NOT ( (autnum >= 1 AND autnum <= 23455) OR (autnum >= 23457 AND autnum <= 64495) OR (autnum >= 131072 AND autnum <= 4199999999) ); -- Delete everything that we have not seen for 14 days DELETE FROM announcements WHERE last_seen_at <= CURRENT_TIMESTAMP - INTERVAL '14 days'; """) def _handle_update_announcements_from_bird(self, server): # Pre-compile the regular expression for faster searching route = re.compile(rb"^\s(.+?)\s+.+?\[(?:AS(.*?))?.\]$") log.info("Requesting routing table from Bird
(%s)" % server) aggregated_networks = [] # Send command to list all routes for line in self._bird_cmd(server, "show route"): m = route.match(line) if not m: # Skip empty lines if not line: pass # Ignore any header lines with the name of the routing table elif line.startswith(b"Table"): pass # Log anything else else: log.debug("Could not parse line: %s" % line.decode()) continue # Fetch the extracted network and ASN network, autnum = m.groups() # Decode into strings if network: network = network.decode() if autnum: autnum = autnum.decode() # Collect all aggregated networks if not autnum: log.debug("%s is an aggregated network" % network) aggregated_networks.append(network) continue # Insert it into the database self.db.execute("INSERT INTO announcements(network, autnum) \ VALUES(%s, %s) ON CONFLICT (network) DO \ UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP", network, autnum, ) # Process any aggregated networks for network in aggregated_networks: log.debug("Processing aggregated network %s" % network) # Run "show route all" for each network for line in self._bird_cmd(server, "show route %s all" % network): # Try finding the path m = re.match(b"\s+BGP\.as_path:.* (\d+) {\d+}$", line) if m: # Select the last AS number in the path autnum = m.group(1).decode() # Insert it into the database self.db.execute("INSERT INTO announcements(network, autnum) \ VALUES(%s, %s) ON CONFLICT (network) DO \ UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP", network, autnum, ) # We don't need to process any more break def _bird_cmd(self, socket_path, command): # Connect to the socket s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.connect(socket_path) # Allocate some buffer buffer = b"" log.debug("Sending Bird command: %s" % command) # Send the command s.send(b"%s\n" % command.encode()) while True: # Fill up the buffer buffer += s.recv(4096) while True: # Search for the next newline pos = buffer.find(b"\n") # If we cannot find one, we go back and read more data if pos <= 0: break # Cut after the newline character pos += 1 # Split the line we want and keep the rest in buffer line, buffer = buffer[:pos], buffer[pos:] # Try parsing any status lines if len(line) > 4 and line[:4].isdigit() and line[4] in (32, 45): code, delim, line = int(line[:4]), line[4], line[5:] log.debug("Received response code %s from bird" % code) # End of output if code == 0: return # Ignore hello line elif code == 1: continue # Otherwise return the line yield line def handle_update_geofeeds(self, ns): # Sync geofeeds with self.db.transaction(): # Delete all geofeeds which are no longer linked self.db.execute(""" DELETE FROM geofeeds WHERE geofeeds.url NOT IN ( SELECT network_geofeeds.url FROM network_geofeeds UNION SELECT geofeed_overrides.url FROM geofeed_overrides ) """, ) # Copy all geofeeds self.db.execute(""" WITH all_geofeeds AS ( SELECT network_geofeeds.url FROM network_geofeeds UNION SELECT geofeed_overrides.url FROM geofeed_overrides ) INSERT INTO geofeeds ( url ) SELECT url FROM all_geofeeds ON CONFLICT (url) DO NOTHING """, ) # Fetch all Geofeeds that require an update geofeeds = self.db.query(""" SELECT id, url FROM geofeeds WHERE updated_at IS NULL OR updated_at <= CURRENT_TIMESTAMP - INTERVAL '1 week' ORDER BY id """) # Update all geofeeds for geofeed in geofeeds: with self.db.transaction(): self._fetch_geofeed(geofeed) # Delete data from any feeds that did not update in the last two weeks with self.db.transaction(): self.db.execute(""" DELETE FROM geofeed_networks WHERE 
geofeed_networks.geofeed_id IN ( SELECT geofeeds.id FROM geofeeds WHERE updated_at IS NULL OR updated_at <= CURRENT_TIMESTAMP - INTERVAL '2 weeks' ) """) def _fetch_geofeed(self, geofeed): log.debug("Fetching Geofeed %s" % geofeed.url) with self.db.transaction(): # Open the URL try: # Send the request f = self.downloader.retrieve(geofeed.url, headers={ "User-Agent" : "location/%s" % location.__version__, # We expect some plain text file in CSV format "Accept" : "text/csv, text/plain", }) # Remove any previous data self.db.execute("DELETE FROM geofeed_networks \ WHERE geofeed_id = %s", geofeed.id) lineno = 0 # Read the output line by line for line in f: lineno += 1 try: line = line.decode() # Ignore any lines we cannot decode except UnicodeDecodeError: log.debug("Could not decode line %s in %s" \ % (lineno, geofeed.url)) continue # Strip any newline line = line.rstrip() # Skip empty lines if not line: continue # Try to parse the line try: fields = line.split(",", 5) except ValueError: log.debug("Could not parse line: %s" % line) continue # Check if we have enough fields if len(fields) < 4: log.debug("Not enough fields in line: %s" % line) continue # Fetch all fields network, country, region, city, = fields[:4] # Try to parse the network try: network = ipaddress.ip_network(network, strict=False) except ValueError: log.debug("Could not parse network: %s" % network) continue # Strip any excess whitespace from country codes country = country.strip() # Make the country code uppercase country = country.upper() # Check the country code if not country: log.debug("Empty country code in Geofeed %s line %s" \ % (geofeed.url, lineno)) continue elif not location.country_code_is_valid(country): log.debug("Invalid country code in Geofeed %s:%s: %s" \ % (geofeed.url, lineno, country)) continue # Write this into the database self.db.execute(""" INSERT INTO geofeed_networks ( geofeed_id, network, country, region, city ) VALUES (%s, %s, %s, %s, %s)""", geofeed.id, "%s" % network, country, region, city, ) # Catch any HTTP errors except urllib.request.HTTPError as e: self.db.execute("UPDATE geofeeds SET status = %s, error = %s \ WHERE id = %s", e.code, "%s" % e, geofeed.id) # Remove any previous data when the feed has been deleted if e.code == 404: self.db.execute("DELETE FROM geofeed_networks \ WHERE geofeed_id = %s", geofeed.id) # Catch any other errors and connection timeouts except (http.client.InvalidURL, urllib.request.URLError, TimeoutError) as e: log.debug("Could not fetch URL %s: %s" % (geofeed.url, e)) self.db.execute("UPDATE geofeeds SET status = %s, error = %s \ WHERE id = %s", 599, "%s" % e, geofeed.id) # Mark the geofeed as updated else: self.db.execute(""" UPDATE geofeeds SET updated_at = CURRENT_TIMESTAMP, status = NULL, error = NULL WHERE id = %s""", geofeed.id, ) def handle_update_overrides(self, ns): with self.db.transaction(): # Drop any previous content self.db.execute("TRUNCATE TABLE autnum_overrides") self.db.execute("TRUNCATE TABLE geofeed_overrides") self.db.execute("TRUNCATE TABLE network_overrides") for file in ns.files: log.info("Reading %s..." 
% file) with open(file, "rb") as f: for type, block in read_blocks(f): if type == "net": network = block.get("net") # Try to parse and normalise the network try: network = ipaddress.ip_network(network, strict=False) except ValueError as e: log.warning("Invalid IP network: %s: %s" % (network, e)) continue # Prevent that we overwrite all networks if network.prefixlen == 0: log.warning("Skipping %s: You cannot overwrite default" % network) continue self.db.execute(""" INSERT INTO network_overrides ( network, country, is_anonymous_proxy, is_satellite_provider, is_anycast, is_drop ) VALUES ( %s, %s, %s, %s, %s, %s ) ON CONFLICT (network) DO NOTHING """, "%s" % network, block.get("country"), self._parse_bool(block, "is-anonymous-proxy"), self._parse_bool(block, "is-satellite-provider"), self._parse_bool(block, "is-anycast"), self._parse_bool(block, "drop"), ) elif type == "aut-num": autnum = block.get("aut-num") # Check if AS number begins with "AS" if not autnum.startswith("AS"): log.warning("Invalid AS number: %s" % autnum) continue # Strip "AS" autnum = autnum[2:] self.db.execute(""" INSERT INTO autnum_overrides ( number, name, country, is_anonymous_proxy, is_satellite_provider, is_anycast, is_drop ) VALUES ( %s, %s, %s, %s, %s, %s, %s ) ON CONFLICT (number) DO NOTHING """, autnum, block.get("name"), block.get("country"), self._parse_bool(block, "is-anonymous-proxy"), self._parse_bool(block, "is-satellite-provider"), self._parse_bool(block, "is-anycast"), self._parse_bool(block, "drop"), ) # Geofeeds elif type == "geofeed": url = block.get("geofeed") # XXX Check the URL self.db.execute(""" INSERT INTO geofeed_overrides ( url ) VALUES ( %s ) ON CONFLICT (url) DO NOTHING """, url, ) else: log.warning("Unsupported type: %s" % type) def handle_update_feeds(self, ns): """ Update any third-party feeds """ success = True feeds = ( # AWS IP Ranges ("AWS-IP-RANGES", self._import_aws_ip_ranges, "https://ip-ranges.amazonaws.com/ip-ranges.json"), # Spamhaus DROP ("SPAMHAUS-DROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/drop.txt"), ("SPAMHAUS-EDROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/edrop.txt"), ("SPAMHAUS-DROPV6", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/dropv6.txt"), # Spamhaus ASNDROP ("SPAMHAUS-ASNDROP", self._import_spamhaus_asndrop, "https://www.spamhaus.org/drop/asndrop.json"), ) # Drop any data from feeds that we don't support (any more) with self.db.transaction(): # Fetch the names of all feeds we support sources = [name for name, *rest in feeds] self.db.execute("DELETE FROM autnum_feeds WHERE NOT source = ANY(%s)", sources) self.db.execute("DELETE FROM network_feeds WHERE NOT source = ANY(%s)", sources) # Walk through all feeds for name, callback, url, *args in feeds: # Skip any feeds that were not requested on the command line if ns.feeds and not name in ns.feeds: continue try: self._process_feed(name, callback, url, *args) # Log an error but continue if an exception occurs except Exception as e: log.error("Error processing feed '%s': %s" % (name, e)) success = False # Return status return 0 if success else 1 def _process_feed(self, name, callback, url, *args): """ Processes one feed """ # Open the URL f = self.downloader.retrieve(url) with self.db.transaction(): # Drop any previous content self.db.execute("DELETE FROM autnum_feeds WHERE source = %s", name) self.db.execute("DELETE FROM network_feeds WHERE source = %s", name) # Call the callback to process the feed return callback(name, f, *args) def _import_aws_ip_ranges(self, name, 
f): # Parse the feed feed = json.load(f) # Set up a dictionary for mapping a region name to a country. Unfortunately, # there seems to be no machine-readable version available of this other than # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html # (worse, it seems to be incomplete :-/ ); https://www.cloudping.cloud/endpoints # was helpful here as well. aws_region_country_map = { # Africa "af-south-1" : "ZA", # Asia "il-central-1" : "IL", # Tel Aviv # Asia/Pacific "ap-northeast-1" : "JP", "ap-northeast-2" : "KR", "ap-northeast-3" : "JP", "ap-east-1" : "HK", "ap-south-1" : "IN", "ap-south-2" : "IN", "ap-southeast-1" : "SG", "ap-southeast-2" : "AU", "ap-southeast-3" : "MY", "ap-southeast-4" : "AU", "ap-southeast-5" : "NZ", # Auckland, NZ "ap-southeast-6" : "AP", # XXX: Precise location not documented anywhere # Canada "ca-central-1" : "CA", "ca-west-1" : "CA", # Europe "eu-central-1" : "DE", "eu-central-2" : "CH", "eu-north-1" : "SE", "eu-west-1" : "IE", "eu-west-2" : "GB", "eu-west-3" : "FR", "eu-south-1" : "IT", "eu-south-2" : "ES", # Middle East "me-central-1" : "AE", "me-south-1" : "BH", # South America "sa-east-1" : "BR", # Undocumented, likely located in Berlin rather than Frankfurt "eusc-de-east-1" : "DE", } # Collect a list of all networks prefixes = feed.get("ipv6_prefixes", []) + feed.get("prefixes", []) for prefix in prefixes: # Fetch network network = prefix.get("ipv6_prefix") or prefix.get("ip_prefix") # Parse the network try: network = ipaddress.ip_network(network) except ValueError as e: log.warning("%s: Unable to parse prefix %s" % (name, network)) continue # Sanitize parsed networks... if not self._check_parsed_network(network): continue # Fetch the region region = prefix.get("region") # Set some defaults cc = None is_anycast = False # Fetch the CC from the dictionary try: cc = aws_region_country_map[region] # If we couldn't find anything, let's try something else...
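# Regions matching neither the map above nor one of the prefix heuristics
# below are skipped with a warning, so newly launched AWS regions have to be
# added to aws_region_country_map by hand.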
except KeyError as e: # Find anycast networks if region == "GLOBAL": is_anycast = True # Everything that starts with us- is probably in the United States elif region.startswith("us-"): cc = "US" # Everything that starts with cn- is probably China elif region.startswith("cn-"): cc = "CN" # Log a warning for anything else else: log.warning("%s: Could not determine country code for AWS region %s" \ % (name, region)) continue # Write to database self.db.execute(""" INSERT INTO network_feeds ( network, source, country, is_anycast ) VALUES ( %s, %s, %s, %s ) ON CONFLICT (network, source) DO NOTHING """, "%s" % network, name, cc, is_anycast, ) def _import_spamhaus_drop(self, name, f): """ Import Spamhaus DROP IP feeds """ # Count all lines lines = 0 # Walk through all lines for line in f: # Decode line line = line.decode("utf-8") # Strip off any comments line, _, comment = line.partition(";") # Ignore empty lines if not line: continue # Strip any excess whitespace line = line.strip() # Increment line counter lines += 1 # Parse the network try: network = ipaddress.ip_network(line) except ValueError as e: log.warning("%s: Could not parse network: %s - %s" % (name, line, e)) continue # Check network if not self._check_parsed_network(network): log.warning("%s: Skipping bogus network: %s" % (name, network)) continue # Insert into the database self.db.execute(""" INSERT INTO network_feeds ( network, source, is_drop ) VALUES ( %s, %s, %s )""", "%s" % network, name, True, ) # Raise an exception if we could not import anything if not lines: raise RuntimeError("Received bogus feed %s with no data" % name) def _import_spamhaus_asndrop(self, name, f): """ Import Spamhaus ASNDROP feed """ for line in f: # Decode the line line = line.decode("utf-8") # Parse JSON try: line = json.loads(line) except json.JSONDecodeError as e: log.warning("%s: Unable to parse JSON object %s: %s" % (name, line, e)) continue # Fetch type type = line.get("type") # Skip any metadata if type == "metadata": continue # Fetch ASN asn = line.get("asn") # Skip any lines without an ASN if not asn: continue # Filter invalid ASNs if not self._check_parsed_asn(asn): log.warning("%s: Skipping bogus ASN %s" % (name, asn)) continue # Write to database self.db.execute(""" INSERT INTO autnum_feeds ( number, source, is_drop ) VALUES ( %s, %s, %s )""", "%s" % asn, name, True, ) @staticmethod def _parse_bool(block, key): val = block.get(key) # There is no point to proceed when we got None if val is None: return # Convert to lowercase val = val.lower() # True if val in ("yes", "1"): return True # False if val in ("no", "0"): return False # Default to None return None def handle_import_countries(self, ns): with self.db.transaction(): # Drop all data that we have self.db.execute("TRUNCATE TABLE countries") for file in ns.file: for line in file: line = line.rstrip() # Ignore any comments if line.startswith("#"): continue try: country_code, continent_code, name = line.split(maxsplit=2) except: log.warning("Could not parse line: %s" % line) continue self.db.execute("INSERT INTO countries(country_code, name, continent_code) \ VALUES(%s, %s, %s) ON CONFLICT DO NOTHING", country_code, name, continent_code) def split_line(line): key, colon, val = line.partition(":") # Strip any excess space key = key.strip() val = val.strip() return key, val def read_blocks(f): for block in iterate_over_blocks(f): type = None data = {} for i, line in enumerate(block): key, value = line.split(":", 1) # The key of the first line defines the type if i == 0: type = key # Store 
value data[key] = value.strip() yield type, data def iterate_over_blocks(f, charsets=("utf-8", "latin1")): block = [] for line in f: # Skip commented lines if line.startswith(b"#") or line.startswith(b"%"): continue # Convert to string for charset in charsets: try: line = line.decode(charset) except UnicodeDecodeError: continue else: break # Remove any comments at the end of line line, hash, comment = line.partition("#") # Strip any whitespace at the end of the line line = line.rstrip() # If we cut off some comment and the line is empty, we can skip it if comment and not line: continue # If the line has some content, keep collecting it if line: block.append(line) continue # End the block on an empty line if block: yield block # Reset the block block = [] # Return the last block if block: yield block def iterate_over_lines(f): for line in f: # Decode the line line = line.decode() # Strip the ending yield line.rstrip() def main(): # Run the command line interface c = CLI() c.run() main()
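# Illustrative invocations (host names, credentials and file names are
# placeholders; this assumes the script is installed as "location-importer"):
#
#   location-importer --database-host localhost --database-name location \
#       --database-username location --database-password secret \
#       import-countries countries.txt
#
#   location-importer --database-host localhost --database-name location \
#       --database-username location --database-password secret \
#       update-whois RIPE ARIN
#
#   location-importer --database-host localhost --database-name location \
#       --database-username location --database-password secret \
#       write location.db --vendor "Example" --description "Example database"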