From: Michael Tremer <michael.tremer@ipfire.org>
Date: Fri, 12 Aug 2022 15:39:37 +0000 (+0000)
Subject: importer: Import each source individually
X-Git-Tag: 0.9.14~5
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=3ce28dea019c37425964615110185994ecc367fc;p=location%2Flibloc.git

importer: Import each source individually

When importing data failed for one source, we would not import
anything at all. This patch changes that behaviour so that sources
are imported one after the other, and the import continues if there
was a problem with a single source.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
---

diff --git a/src/python/location/importer.py b/src/python/location/importer.py
index a5776f7..96f2218 100644
--- a/src/python/location/importer.py
+++ b/src/python/location/importer.py
@@ -96,6 +96,9 @@ EXTENDED_SOURCES = {
 #	],
 }
 
+# List all sources
+SOURCES = set(WHOIS_SOURCES|EXTENDED_SOURCES)
+
 class Downloader(object):
 	def __init__(self):
 		self.proxy = None
diff --git a/src/scripts/location-importer.in b/src/scripts/location-importer.in
index b242275..b9e3511 100644
--- a/src/scripts/location-importer.in
+++ b/src/scripts/location-importer.in
@@ -26,6 +26,7 @@ import re
 import socket
 import sys
 import telnetlib
+import urllib.error
 
 # Load our location module
 import location
@@ -403,125 +404,137 @@ class CLI(object):
 	def handle_update_whois(self, ns):
 		downloader = location.importer.Downloader()
 
-		# Download all sources
-		with self.db.transaction():
-			# Create some temporary tables to store parsed data
-			self.db.execute("""
-				CREATE TEMPORARY TABLE _autnums(number integer NOT NULL, organization text NOT NULL, source text NOT NULL)
-					ON COMMIT DROP;
-				CREATE UNIQUE INDEX _autnums_number ON _autnums(number);
-
-				CREATE TEMPORARY TABLE _organizations(handle text NOT NULL, name text NOT NULL, source text NOT NULL)
-					ON COMMIT DROP;
-				CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle);
-
-				CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text NOT NULL, original_countries text[] NOT NULL, source text NOT NULL)
-					ON COMMIT DROP;
-				CREATE INDEX _rirdata_search ON _rirdata USING BTREE(family(network), masklen(network));
-				CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network);
-			""")
-
-			# Remove all previously imported content
-			self.db.execute("""
-				TRUNCATE TABLE networks;
-			""")
-
-			# Fetch all valid country codes to check parsed networks against...
-			rows = self.db.query("SELECT * FROM countries ORDER BY country_code")
-			validcountries = []
-
-			for row in rows:
-				validcountries.append(row.country_code)
+		# Did we run successfully?
+		error = False
 
-			for source_key in location.importer.WHOIS_SOURCES:
-				for single_url in location.importer.WHOIS_SOURCES[source_key]:
-					for block in downloader.request_blocks(single_url):
-						self._parse_block(block, source_key, validcountries)
+		# Fetch all valid country codes to check parsed networks against
+		validcountries = self.countries
 
-			# Process all parsed networks from every RIR we happen to have access to,
-			# insert the largest network chunks into the networks table immediately...
-			families = self.db.query("SELECT DISTINCT family(network) AS family FROM _rirdata ORDER BY family(network)")
-
-			for family in (row.family for row in families):
-				smallest = self.db.get("SELECT MIN(masklen(network)) AS prefix FROM _rirdata WHERE family(network) = %s", family)
+		# Iterate over all potential sources
+		for source in location.importer.SOURCES:
+			with self.db.transaction():
+				# Create some temporary tables to store parsed data
+				self.db.execute("""
+					CREATE TEMPORARY TABLE _autnums(number integer NOT NULL,
+						organization text NOT NULL, source text NOT NULL) ON COMMIT DROP;
+					CREATE UNIQUE INDEX _autnums_number ON _autnums(number);
+
+					CREATE TEMPORARY TABLE _organizations(handle text NOT NULL,
+						name text NOT NULL, source text NOT NULL) ON COMMIT DROP;
+					CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle);
+
+					CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text NOT NULL,
+						original_countries text[] NOT NULL, source text NOT NULL)
+						ON COMMIT DROP;
+					CREATE INDEX _rirdata_search ON _rirdata
+						USING BTREE(family(network), masklen(network));
+					CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network);
+				""")
 
-				self.db.execute("INSERT INTO networks(network, country, original_countries, source) \
-					SELECT network, country, original_countries, source FROM _rirdata WHERE masklen(network) = %s AND family(network) = %s", smallest.prefix, family)
+				# Remove all previously imported content
+				self.db.execute("DELETE FROM networks WHERE source = %s", source)
 
-				# ... determine any other prefixes for this network family, ...
-				prefixes = self.db.query("SELECT DISTINCT masklen(network) AS prefix FROM _rirdata \
-					WHERE family(network) = %s ORDER BY masklen(network) ASC OFFSET 1", family)
+				try:
+					# Fetch WHOIS sources
+					for url in location.importer.WHOIS_SOURCES.get(source, []):
+						for block in downloader.request_blocks(url):
+							self._parse_block(block, source, validcountries)
+
+					# Fetch extended sources
+					for url in location.importer.EXTENDED_SOURCES.get(source, []):
+						for line in downloader.request_lines(url):
+							self._parse_line(line, source, validcountries)
+				except urllib.error.URLError as e:
+					log.error("Could not retrieve data from %s: %s" % (source, e))
+					error = True
+
+					# Continue with the next source
+					continue
 
-				# ... and insert networks with this prefix in case they provide additional
-				# information (i. e. subnet of a larger chunk with a different country)
-				for prefix in (row.prefix for row in prefixes):
-					self.db.execute("""
-						WITH candidates AS (
+				# Process all parsed networks from every RIR we happen to have access to,
+				# insert the largest network chunks into the networks table immediately...
+				families = self.db.query("SELECT DISTINCT family(network) AS family FROM _rirdata \
+					ORDER BY family(network)")
+
+				for family in (row.family for row in families):
+					smallest = self.db.get("SELECT MIN(masklen(network)) AS prefix FROM _rirdata \
+						WHERE family(network) = %s", family)
+
+					self.db.execute("INSERT INTO networks(network, country, original_countries, source) \
+						SELECT network, country, original_countries, source FROM _rirdata \
+						WHERE masklen(network) = %s AND family(network) = %s", smallest.prefix, family)
+
+					# ... determine any other prefixes for this network family, ...
+					prefixes = self.db.query("SELECT DISTINCT masklen(network) AS prefix FROM _rirdata \
+						WHERE family(network) = %s ORDER BY masklen(network) ASC OFFSET 1", family)
+
+					# ... and insert networks with this prefix in case they provide additional
+					# information (i. e. subnet of a larger chunk with a different country)
+					for prefix in (row.prefix for row in prefixes):
+						self.db.execute("""
+							WITH candidates AS (
+								SELECT
+									_rirdata.network,
+									_rirdata.country,
+									_rirdata.original_countries,
+									_rirdata.source
+								FROM
+									_rirdata
+								WHERE
+									family(_rirdata.network) = %s
+								AND
+									masklen(_rirdata.network) = %s
+							),
+							filtered AS (
+								SELECT
+									DISTINCT ON (c.network)
+									c.network,
+									c.country,
+									c.original_countries,
+									c.source,
+									masklen(networks.network),
+									networks.country AS parent_country
+								FROM
+									candidates c
+								LEFT JOIN
+									networks
+								ON
+									c.network << networks.network
+								ORDER BY
+									c.network,
+									masklen(networks.network) DESC NULLS LAST
+							)
+							INSERT INTO
+								networks(network, country, original_countries, source)
 							SELECT
-								_rirdata.network,
-								_rirdata.country,
-								_rirdata.original_countries,
-								_rirdata.source
+								network,
+								country,
+								original_countries,
+								source
 							FROM
-								_rirdata
+								filtered
 							WHERE
-								family(_rirdata.network) = %s
-							AND
-								masklen(_rirdata.network) = %s
-						),
-						filtered AS (
-							SELECT
-								DISTINCT ON (c.network)
-								c.network,
-								c.country,
-								c.original_countries,
-								c.source,
-								masklen(networks.network),
-								networks.country AS parent_country
-							FROM
-								candidates c
-							LEFT JOIN
-								networks
-							ON
-								c.network << networks.network
-							ORDER BY
-								c.network,
-								masklen(networks.network) DESC NULLS LAST
+								parent_country IS NULL
+							OR
+								country <> parent_country
+							ON CONFLICT DO NOTHING""",
+							family, prefix,
 						)
-						INSERT INTO
-							networks(network, country, original_countries, source)
-						SELECT
-							network,
-							country,
-							original_countries,
-							source
-						FROM
-							filtered
-						WHERE
-							parent_country IS NULL
-						OR
-							country <> parent_country
-						ON CONFLICT DO NOTHING""",
-						family, prefix,
-					)
-
-			self.db.execute("""
-				INSERT INTO autnums(number, name, source)
-				SELECT _autnums.number, _organizations.name, _organizations.source FROM _autnums
-					JOIN _organizations ON _autnums.organization = _organizations.handle
-				ON CONFLICT (number) DO UPDATE SET name = excluded.name;
-			""")
-			# Download all extended sources
-			for source_key in location.importer.EXTENDED_SOURCES:
-				for single_url in location.importer.EXTENDED_SOURCES[source_key]:
-					# Download data
-					for line in downloader.request_lines(single_url):
-						self._parse_line(line, source_key, validcountries)
+				self.db.execute("""
+					INSERT INTO autnums(number, name, source)
+					SELECT _autnums.number, _organizations.name, _organizations.source FROM _autnums
+						JOIN _organizations ON _autnums.organization = _organizations.handle
+					ON CONFLICT (number) DO UPDATE SET name = excluded.name;
+				""")
 
-		# Download and import (technical) AS names from ARIN
+		# Download and import (technical) AS names from ARIN
+		with self.db.transaction():
 			self._import_as_names_from_arin()
 
+		# Return a non-zero exit code for errors
+		return 1 if error else 0
+
 	def _check_parsed_network(self, network):
 		"""
 			Assistive function to detect and subsequently sort out parsed
@@ -1501,6 +1514,14 @@ class CLI(object):
 		# Default to None
 		return None
 
+	@property
+	def countries(self):
+		# Fetch all valid country codes to check parsed networks against
+		rows = self.db.query("SELECT * FROM countries ORDER BY country_code")
+
+		# Return all countries
+		return [row.country_code for row in rows]
+
 	def handle_import_countries(self, ns):
 		with self.db.transaction():
 			# Drop all data that we have
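
The error handling introduced above is a skip-and-continue pattern:
each source runs in its own transaction, a failed download marks the
run as unsuccessful without aborting the remaining sources, and the
exit code reflects whether everything went through. A minimal
standalone sketch of that pattern (fetch_source() and SOURCES here are
hypothetical stand-ins, not part of this patch):

	import logging
	import urllib.error

	log = logging.getLogger("location")

	# Stand-in for location.importer.SOURCES
	SOURCES = ("ARIN", "RIPE")

	def fetch_source(source):
		# Stand-in for the real download and parse work, which may
		# raise urllib.error.URLError on any network problem
		raise urllib.error.URLError("unreachable")

	def update_whois():
		# Did we run successfully?
		error = False

		for source in SOURCES:
			try:
				fetch_source(source)
			except urllib.error.URLError as e:
				log.error("Could not retrieve data from %s: %s" % (source, e))
				error = True

				# Continue with the next source
				continue

		# Return a non-zero exit code for errors
		return 1 if error else 0

Because each source is imported in its own transaction, a failure in
one source only rolls back that source's data; everything imported
from the other sources is kept.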