From: Michael Tremer Date: Mon, 4 Mar 2024 11:42:04 +0000 (+0000) Subject: importer: Create a better structure to import RIRs X-Git-Tag: 0.9.18~110 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=8c2dbbc25149a2eda4ebb991ed44b56774144b97;p=people%2Fms%2Flibloc.git importer: Create a better structure to import RIRs All information about all RIRs will now be imported in one large database transaction per RIR which should bring us better integrity and help us to phase out any stale data. Signed-off-by: Michael Tremer --- diff --git a/src/python/location/importer.py b/src/python/location/importer.py index e581180..24bcd11 100644 --- a/src/python/location/importer.py +++ b/src/python/location/importer.py @@ -25,79 +25,6 @@ import urllib.request log = logging.getLogger("location.importer") log.propagate = 1 -WHOIS_SOURCES = { - # African Network Information Centre - "AFRINIC": [ - "https://ftp.afrinic.net/pub/pub/dbase/afrinic.db.gz" - ], - - # Asia Pacific Network Information Centre - "APNIC": [ - "https://ftp.apnic.net/apnic/whois/apnic.db.inet6num.gz", - "https://ftp.apnic.net/apnic/whois/apnic.db.inetnum.gz", - #"https://ftp.apnic.net/apnic/whois/apnic.db.route6.gz", - #"https://ftp.apnic.net/apnic/whois/apnic.db.route.gz", - "https://ftp.apnic.net/apnic/whois/apnic.db.aut-num.gz", - "https://ftp.apnic.net/apnic/whois/apnic.db.organisation.gz" - ], - - # American Registry for Internet Numbers - # XXX there is nothing useful for us in here - # ARIN: [ - # "https://ftp.arin.net/pub/rr/arin.db" - # ], - - # Japan Network Information Center - "JPNIC": [ - "https://ftp.nic.ad.jp/jpirr/jpirr.db.gz" - ], - - # Latin America and Caribbean Network Information Centre - "LACNIC": [ - "https://ftp.lacnic.net/lacnic/dbase/lacnic.db.gz" - ], - - # Réseaux IP Européens - "RIPE": [ - "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz", - "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz", - #"https://ftp.ripe.net/ripe/dbase/split/ripe.db.route6.gz", - #"https://ftp.ripe.net/ripe/dbase/split/ripe.db.route.gz", - "https://ftp.ripe.net/ripe/dbase/split/ripe.db.aut-num.gz", - "https://ftp.ripe.net/ripe/dbase/split/ripe.db.organisation.gz" - ], -} - -EXTENDED_SOURCES = { - # African Network Information Centre - # "ARIN": [ - # "https://ftp.afrinic.net/pub/stats/afrinic/delegated-afrinic-extended-latest" - # ], - - # Asia Pacific Network Information Centre - # "APNIC": [ - # "https://ftp.apnic.net/apnic/stats/apnic/delegated-apnic-extended-latest" - # ], - - # American Registry for Internet Numbers - "ARIN": [ - "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest" - ], - - # Latin America and Caribbean Network Information Centre - "LACNIC": [ - "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest" - ], - - # Réseaux IP Européens - # "RIPE": [ - # "https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest" - # ], -} - -# List all sources -SOURCES = set(WHOIS_SOURCES|EXTENDED_SOURCES) - class Downloader(object): def __init__(self): self.proxy = None diff --git a/src/scripts/location-importer.in b/src/scripts/location-importer.in index 41ff72c..99bc026 100644 --- a/src/scripts/location-importer.in +++ b/src/scripts/location-importer.in @@ -692,7 +692,47 @@ class CLI(object): downloader = location.importer.Downloader() # Did we run successfully? - error = False + success = True + + sources = ( + # African Network Information Centre + ("AFRINIC", ( + (self._import_standard_format, "https://ftp.afrinic.net/pub/pub/dbase/afrinic.db.gz"), + )), + + # Asia Pacific Network Information Centre + ("APNIC", ( + (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inet6num.gz"), + (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inetnum.gz"), + (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.aut-num.gz"), + (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.organisation.gz"), + )), + + # American Registry for Internet Numbers + ("ARIN", ( + (self._import_extended_format, "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"), + (self._import_arin_as_names, "https://ftp.arin.net/pub/resource_registry_service/asns.csv"), + )), + + # Japan Network Information Center + ("JPNIC", ( + (self._import_standard_format, "https://ftp.nic.ad.jp/jpirr/jpirr.db.gz"), + )), + + # Latin America and Caribbean Network Information Centre + ("LACNIC", ( + (self._import_standard_format, "https://ftp.lacnic.net/lacnic/dbase/lacnic.db.gz"), + (self._import_extended_format, "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"), + )), + + # Réseaux IP Européens + ("RIPE", ( + (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz"), + (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz"), + (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.aut-num.gz"), + (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.organisation.gz"), + )), + ) # Fetch all valid country codes to check parsed networks against countries = self.fetch_countries() @@ -703,170 +743,257 @@ class CLI(object): return 1 # Iterate over all potential sources - for source in sorted(location.importer.SOURCES): + for name, feeds in sources: # Skip anything that should not be updated - if ns.sources and not source in ns.sources: + if ns.sources and not name in ns.sources: continue - with self.db.transaction(): - # Create some temporary tables to store parsed data - self.db.execute(""" - CREATE TEMPORARY TABLE _autnums(number integer NOT NULL, - organization text NOT NULL, source text NOT NULL) ON COMMIT DROP; - CREATE UNIQUE INDEX _autnums_number ON _autnums(number); - - CREATE TEMPORARY TABLE _organizations(handle text NOT NULL, - name text NOT NULL, source text NOT NULL) ON COMMIT DROP; - CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle); - - CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text, - original_countries text[] NOT NULL, source text NOT NULL) - ON COMMIT DROP; - CREATE INDEX _rirdata_search ON _rirdata - USING BTREE(family(network), masklen(network)); - CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network); - """) - - # Remove all previously imported content - self.db.execute("DELETE FROM autnums WHERE source = %s", source) - self.db.execute("DELETE FROM networks WHERE source = %s", source) + try: + self._process_source(downloader, name, feeds, countries) - try: - # Fetch WHOIS sources - for url in location.importer.WHOIS_SOURCES.get(source, []): - for block in downloader.request_blocks(url): - self._parse_block(block, source, countries) - - # Fetch extended sources - for url in location.importer.EXTENDED_SOURCES.get(source, []): - for line in downloader.request_lines(url): - self._parse_line(line, source, countries) - except urllib.error.URLError as e: - log.error("Could not retrieve data from %s: %s" % (source, e)) - error = True - - # Continue with the next source - continue + # Log an error but continue if an exception occurs + except Exception as e: + log.error("Error processing source %s" % name, exc_info=True) + success = False + + # Return a non-zero exit code for errors + return 0 if success else 1 + + def _process_source(self, downloader, source, feeds, countries): + """ + This function processes one source + """ + # Wrap everything into one large transaction + with self.db.transaction(): + # Remove all previously imported content + self.db.execute("DELETE FROM autnums WHERE source = %s", source) + self.db.execute("DELETE FROM networks WHERE source = %s", source) + + # Create some temporary tables to store parsed data + self.db.execute(""" + CREATE TEMPORARY TABLE _autnums(number integer NOT NULL, + organization text NOT NULL, source text NOT NULL) ON COMMIT DROP; + CREATE UNIQUE INDEX _autnums_number ON _autnums(number); + + CREATE TEMPORARY TABLE _organizations(handle text NOT NULL, + name text NOT NULL, source text NOT NULL) ON COMMIT DROP; + CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle); + + CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text, + original_countries text[] NOT NULL, source text NOT NULL) + ON COMMIT DROP; + CREATE INDEX _rirdata_search ON _rirdata + USING BTREE(family(network), masklen(network)); + CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network); + """) + + # Parse all feeds + for callback, url, *args in feeds: + # Retrieve the feed + f = downloader.retrieve(url) + + # Call the callback + callback(source, countries, f, *args) + + # Process all parsed networks from every RIR we happen to have access to, + # insert the largest network chunks into the networks table immediately... + families = self.db.query(""" + SELECT DISTINCT + family(network) AS family + FROM + _rirdata + ORDER BY + family(network) + """, + ) - # Process all parsed networks from every RIR we happen to have access to, - # insert the largest network chunks into the networks table immediately... - families = self.db.query(""" - SELECT DISTINCT - family(network) AS family + for family in (row.family for row in families): + # Fetch the smallest mask length in our data set + smallest = self.db.get(""" + SELECT + MIN( + masklen(network) + ) AS prefix FROM _rirdata - ORDER BY - family(network) - """, + WHERE + family(network) = %s + """, family, ) - for family in (row.family for row in families): - # Fetch the smallest mask length in our data set - smallest = self.db.get(""" - SELECT - MIN( - masklen(network) - ) AS prefix - FROM - _rirdata - WHERE - family(network) = %s - """, family, + # Copy all networks + self.db.execute(""" + INSERT INTO + networks + ( + network, + country, + original_countries, + source ) + SELECT + network, + country, + original_countries, + source + FROM + _rirdata + WHERE + masklen(network) = %s + AND + family(network) = %s + ON CONFLICT DO + NOTHING""", + smallest.prefix, + family, + ) - # Copy all networks + # ... determine any other prefixes for this network family, ... + prefixes = self.db.query(""" + SELECT + DISTINCT masklen(network) AS prefix + FROM + _rirdata + WHERE + family(network) = %s + ORDER BY + masklen(network) ASC + OFFSET 1 + """, family, + ) + + # ... and insert networks with this prefix in case they provide additional + # information (i. e. subnet of a larger chunk with a different country) + for prefix in (row.prefix for row in prefixes): self.db.execute(""" - INSERT INTO - networks - ( - network, - country, - original_countries, - source + WITH candidates AS ( + SELECT + _rirdata.network, + _rirdata.country, + _rirdata.original_countries, + _rirdata.source + FROM + _rirdata + WHERE + family(_rirdata.network) = %s + AND + masklen(_rirdata.network) = %s + ), + filtered AS ( + SELECT + DISTINCT ON (c.network) + c.network, + c.country, + c.original_countries, + c.source, + masklen(networks.network), + networks.country AS parent_country + FROM + candidates c + LEFT JOIN + networks + ON + c.network << networks.network + ORDER BY + c.network, + masklen(networks.network) DESC NULLS LAST ) + INSERT INTO + networks(network, country, original_countries, source) SELECT network, country, original_countries, source FROM - _rirdata + filtered WHERE - masklen(network) = %s - AND - family(network) = %s - ON CONFLICT DO - NOTHING""", - smallest.prefix, - family, + parent_country IS NULL + OR + country <> parent_country + ON CONFLICT DO NOTHING + """, family, prefix, ) - # ... determine any other prefixes for this network family, ... - prefixes = self.db.query(""" - SELECT - DISTINCT masklen(network) AS prefix - FROM - _rirdata - WHERE - family(network) = %s - ORDER BY - masklen(network) ASC - OFFSET 1 - """, family, - ) + self.db.execute(""" + INSERT INTO + autnums + ( + number, + name, + source + ) + SELECT + _autnums.number, + _organizations.name, + _organizations.source + FROM + _autnums + JOIN + _organizations ON _autnums.organization = _organizations.handle + ON CONFLICT + ( + number + ) + DO UPDATE + SET name = excluded.name + """, + ) - # ... and insert networks with this prefix in case they provide additional - # information (i. e. subnet of a larger chunk with a different country) - for prefix in (row.prefix for row in prefixes): - self.db.execute(""" - WITH candidates AS ( - SELECT - _rirdata.network, - _rirdata.country, - _rirdata.original_countries, - _rirdata.source - FROM - _rirdata - WHERE - family(_rirdata.network) = %s - AND - masklen(_rirdata.network) = %s - ), - filtered AS ( - SELECT - DISTINCT ON (c.network) - c.network, - c.country, - c.original_countries, - c.source, - masklen(networks.network), - networks.country AS parent_country - FROM - candidates c - LEFT JOIN - networks - ON - c.network << networks.network - ORDER BY - c.network, - masklen(networks.network) DESC NULLS LAST - ) - INSERT INTO - networks(network, country, original_countries, source) - SELECT - network, - country, - original_countries, - source - FROM - filtered - WHERE - parent_country IS NULL - OR - country <> parent_country - ON CONFLICT DO NOTHING - """, family, prefix, - ) + def _import_standard_format(self, source, countries, f, *args): + """ + Imports a single standard format source feed + """ + # Iterate over all blocks + for block in location.importer.iterate_over_blocks(f): + self._parse_block(block, source, countries) + + def _import_extended_format(self, source, countries, f, *args): + # Iterate over all lines + for line in location.importer.iterate_over_lines(f): + self._parse_line(block, source, countries) + + def _import_arin_as_names(self, source, countries, f, *args): + # Walk through the file + for line in csv.DictReader(feed, dialect="arin"): + log.debug("Processing object: %s" % line) + + # Fetch status + status = line.get("Status") + + # We are only interested in anything managed by ARIN + if not status == "Full Registry Services": + continue + + # Fetch organization name + name = line.get("Org Name") + + # Extract ASNs + first_asn = line.get("Start AS Number") + last_asn = line.get("End AS Number") + + # Cast to a number + try: + first_asn = int(first_asn) + except TypeError as e: + log.warning("Could not parse ASN '%s'" % first_asn) + continue + + try: + last_asn = int(last_asn) + except TypeError as e: + log.warning("Could not parse ASN '%s'" % last_asn) + continue + + # Check if the range is valid + if last_asn < first_asn: + log.warning("Invalid ASN range %s-%s" % (first_asn, last_asn)) + + # Insert everything into the database + for asn in range(first_asn, last_asn + 1): + if not self._check_parsed_asn(asn): + log.warning("Skipping invalid ASN %s" % asn) + continue self.db.execute(""" INSERT INTO @@ -876,30 +1003,18 @@ class CLI(object): name, source ) - SELECT - _autnums.number, - _organizations.name, - _organizations.source - FROM - _autnums - JOIN - _organizations ON _autnums.organization = _organizations.handle + VALUES + ( + %s, %s, %s + ) ON CONFLICT ( number ) - DO UPDATE - SET name = excluded.name - """, + DO NOTHING + """, asn, name, "ARIN", ) - # Download and import (technical) AS names from ARIN - with self.db.transaction(): - self._import_as_names_from_arin(downloader) - - # Return a non-zero exit code for errors - return 1 if error else 0 - def _check_parsed_network(self, network): """ Assistive function to detect and subsequently sort out parsed @@ -1028,12 +1143,8 @@ class CLI(object): ) def _parse_inetnum_block(self, block, source_key, countries): - log.debug("Parsing inetnum block:") - inetnum = {} for line in block: - log.debug(line) - # Split line key, val = split_line(line) @@ -1307,74 +1418,6 @@ class CLI(object): """, "%s" % network, country_code, [country], source_key, ) - def _import_as_names_from_arin(self, downloader): - # Delete all previously imported content - self.db.execute("DELETE FROM autnums WHERE source = %s", "ARIN") - - # Try to retrieve the feed from ftp.arin.net - feed = downloader.request_lines("https://ftp.arin.net/pub/resource_registry_service/asns.csv") - - # Walk through the file - for line in csv.DictReader(feed, dialect="arin"): - log.debug("Processing object: %s" % line) - - # Fetch status - status = line.get("Status") - - # We are only interested in anything managed by ARIN - if not status == "Full Registry Services": - continue - - # Fetch organization name - name = line.get("Org Name") - - # Extract ASNs - first_asn = line.get("Start AS Number") - last_asn = line.get("End AS Number") - - # Cast to a number - try: - first_asn = int(first_asn) - except TypeError as e: - log.warning("Could not parse ASN '%s'" % first_asn) - continue - - try: - last_asn = int(last_asn) - except TypeError as e: - log.warning("Could not parse ASN '%s'" % last_asn) - continue - - # Check if the range is valid - if last_asn < first_asn: - log.warning("Invalid ASN range %s-%s" % (first_asn, last_asn)) - - # Insert everything into the database - for asn in range(first_asn, last_asn + 1): - if not self._check_parsed_asn(asn): - log.warning("Skipping invalid ASN %s" % asn) - continue - - self.db.execute(""" - INSERT INTO - autnums - ( - number, - name, - source - ) - VALUES - ( - %s, %s, %s - ) - ON CONFLICT - ( - number - ) - DO NOTHING - """, asn, name, "ARIN", - ) - def handle_update_announcements(self, ns): server = ns.server[0]