###############################################################################
#                                                                             #
# libloc - A library to determine the location of someone on the Internet    #
#                                                                             #
# Copyright (C) 2020-2024 IPFire Development Team <info@ipfire.org>          #
#                                                                             #
# This library is free software; you can redistribute it and/or              #
# modify it under the terms of the GNU Lesser General Public                 #
# License as published by the Free Software Foundation; either               #
# version 2.1 of the License, or (at your option) any later version.         #
#                                                                             #
# This library is distributed in the hope that it will be useful,            #
# but WITHOUT ANY WARRANTY; without even the implied warranty of             #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU           #
# Lesser General Public License for more details.                            #
#                                                                             #
###############################################################################
# Standard library modules used throughout this script
import argparse
import asyncio
import csv
import http.client
import io
import ipaddress
import json
import logging
import math
import re
import socket
import sys
import urllib.parse
import urllib.request

# Load our location module
import location
import location.database
from location.downloader import Downloader
from location.i18n import _
log = logging.getLogger("location.importer")

# Only ASNs within these ranges are considered publicly routable
# (the same ranges are enforced by the SQL cleanup further down)
VALID_ASN_RANGES = (
    (1, 23455),
    (23457, 64495),
    (131072, 4199999999),
)

TRANSLATED_COUNTRIES = {
    # When people say UK, they mean GB
    "UK" : "GB",
}

IGNORED_COUNTRIES = set((
    # Some people use ZZ to say "no country" or to hide the country
    "ZZ",
))
# Configure the CSV parser for ARIN
csv.register_dialect("arin", delimiter=",", quoting=csv.QUOTE_ALL, quotechar="\"")
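
# The "arin" dialect above is used by _import_arin_as_names() below, which reads
# ARIN's asns.csv with csv.DictReader. A record is expected to carry (at least)
# the columns referenced there: "Start AS Number", "End AS Number", "Org Name"
# and "Status" - for example (hypothetical row, not taken from the real file):
#
#   "1234","1234","Example Org","Full Registry Services"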

class CLI(object):
    def parse_cli(self):
        parser = argparse.ArgumentParser(
            description=_("Location Importer Command Line Interface"),
        )
        subparsers = parser.add_subparsers()
        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))
        parser.add_argument("--quiet", action="store_true",
            help=_("Enable quiet mode"))

        parser.add_argument("--version", action="version",
            version="%(prog)s @VERSION@")
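
        # NOTE: "@VERSION@" is a placeholder that is presumably substituted with the
        # real version string when this script is generated from its *.in template
        # at build time; it is not a literal version number.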
        parser.add_argument("--database-host", required=True,
            help=_("Database Hostname"), metavar=_("HOST"))
        parser.add_argument("--database-name", required=True,
            help=_("Database Name"), metavar=_("NAME"))
        parser.add_argument("--database-username", required=True,
            help=_("Database Username"), metavar=_("USERNAME"))
        parser.add_argument("--database-password", required=True,
            help=_("Database Password"), metavar=_("PASSWORD"))
        write = subparsers.add_parser("write", help=_("Write database to file"))
        write.set_defaults(func=self.handle_write)
        write.add_argument("file", nargs=1, help=_("Database File"))
        write.add_argument("--signing-key", nargs="?", type=open, help=_("Signing Key"))
        write.add_argument("--backup-signing-key", nargs="?", type=open, help=_("Backup Signing Key"))
        write.add_argument("--vendor", nargs="?", help=_("Sets the vendor"))
        write.add_argument("--description", nargs="?", help=_("Sets a description"))
        write.add_argument("--license", nargs="?", help=_("Sets the license"))
        write.add_argument("--version", type=int, help=_("Database Format Version"))
        update_whois = subparsers.add_parser("update-whois", help=_("Update WHOIS Information"))
        update_whois.add_argument("sources", nargs="*",
            help=_("Only update these sources"))
        update_whois.set_defaults(func=self.handle_update_whois)
        # Update announcements
        update_announcements = subparsers.add_parser("update-announcements",
            help=_("Update BGP Announcements"))
        update_announcements.set_defaults(func=self.handle_update_announcements)
        update_announcements.add_argument("server", nargs=1,
            help=_("Route Server to connect to"), metavar=_("SERVER"))
        update_geofeeds = subparsers.add_parser("update-geofeeds",
            help=_("Update Geofeeds"))
        update_geofeeds.set_defaults(func=self.handle_update_geofeeds)
        update_feeds = subparsers.add_parser("update-feeds",
            help=_("Update Feeds"))
        update_feeds.add_argument("feeds", nargs="*",
            help=_("Only update these feeds"))
        update_feeds.set_defaults(func=self.handle_update_feeds)
        update_overrides = subparsers.add_parser("update-overrides",
            help=_("Update overrides"),
        )
        update_overrides.add_argument(
            "files", nargs="+", help=_("Files to import"),
        )
        update_overrides.set_defaults(func=self.handle_update_overrides)
        import_countries = subparsers.add_parser("import-countries",
            help=_("Import countries"),
        )
        import_countries.add_argument("file", nargs=1, type=argparse.FileType("r"),
            help=_("File to import"))
        import_countries.set_defaults(func=self.handle_import_countries)
        args = parser.parse_args()

        # Configure logging
        if args.debug:
            location.logger.set_level(logging.DEBUG)
        elif args.quiet:
            location.logger.set_level(logging.WARNING)

        # Print usage if no action was given
        if not "func" in args:
            parser.print_usage()
            sys.exit(2)

        return args
    async def run(self):
        # Parse command line arguments
        args = self.parse_cli()

        # Initialize the downloader
        self.downloader = Downloader()

        # Initialise database
        self.db = self._setup_database(args)

        # Call the handler for the selected action
        ret = await args.func(args)

        # Return with exit code
        if ret:
            return ret

        # Otherwise just exit
        return 0
    def _setup_database(self, ns):
        """
            Initialise the database
        """
        # Connect to database
        db = location.database.Connection(
            host=ns.database_host, database=ns.database_name,
            user=ns.database_username, password=ns.database_password,
        )
        with db.transaction():
            # Create all tables (idempotent, so an existing schema can be upgraded in place)
            db.execute("""
                CREATE TABLE IF NOT EXISTS announcements(network inet, autnum bigint,
                    first_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP,
                    last_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP);
                CREATE UNIQUE INDEX IF NOT EXISTS announcements_networks ON announcements(network);
                CREATE INDEX IF NOT EXISTS announcements_search2 ON announcements
                    USING SPGIST(network inet_ops);
                ALTER TABLE announcements ALTER COLUMN first_seen_at SET NOT NULL;
                ALTER TABLE announcements ALTER COLUMN last_seen_at SET NOT NULL;

                CREATE TABLE IF NOT EXISTS autnums(number bigint, name text NOT NULL);
                ALTER TABLE autnums ADD COLUMN IF NOT EXISTS source text;
                CREATE UNIQUE INDEX IF NOT EXISTS autnums_number ON autnums(number);

                CREATE TABLE IF NOT EXISTS countries(
                    country_code text NOT NULL, name text NOT NULL, continent_code text NOT NULL);
                CREATE UNIQUE INDEX IF NOT EXISTS countries_country_code ON countries(country_code);

                CREATE TABLE IF NOT EXISTS networks(network inet, country text);
                ALTER TABLE networks ADD COLUMN IF NOT EXISTS original_countries text[];
                ALTER TABLE networks ADD COLUMN IF NOT EXISTS source text;
                CREATE UNIQUE INDEX IF NOT EXISTS networks_network ON networks(network);
                CREATE INDEX IF NOT EXISTS networks_search2 ON networks
                    USING SPGIST(network inet_ops);

                CREATE TABLE IF NOT EXISTS geofeeds(
                    id serial primary key,
                    url text,
                    status integer default null,
                    updated_at timestamp without time zone default null
                );
                ALTER TABLE geofeeds ADD COLUMN IF NOT EXISTS error text;
                CREATE UNIQUE INDEX IF NOT EXISTS geofeeds_unique
                    ON geofeeds(url);
                CREATE TABLE IF NOT EXISTS geofeed_networks(
                    geofeed_id integer references geofeeds(id) on delete cascade,
                    network inet,
                    country text,
                    region text,
                    city text
                );
                CREATE INDEX IF NOT EXISTS geofeed_networks_geofeed_id
                    ON geofeed_networks(geofeed_id);
                CREATE TABLE IF NOT EXISTS network_geofeeds(network inet, url text);
                ALTER TABLE network_geofeeds ADD COLUMN IF NOT EXISTS source text NOT NULL;
                CREATE UNIQUE INDEX IF NOT EXISTS network_geofeeds_unique2
                    ON network_geofeeds(network, url);
                CREATE INDEX IF NOT EXISTS network_geofeeds_url
                    ON network_geofeeds(url);

                CREATE TABLE IF NOT EXISTS autnum_feeds(
                    number bigint NOT NULL,
                    source text NOT NULL,
                    name text,
                    country text,
                    is_anonymous_proxy boolean,
                    is_satellite_provider boolean,
                    is_anycast boolean,
                    is_drop boolean
                );
                CREATE UNIQUE INDEX IF NOT EXISTS autnum_feeds_unique
                    ON autnum_feeds(number, source);

                CREATE TABLE IF NOT EXISTS network_feeds(
                    network inet NOT NULL,
                    source text NOT NULL,
                    country text,
                    is_anonymous_proxy boolean,
                    is_satellite_provider boolean,
                    is_anycast boolean,
                    is_drop boolean
                );
                CREATE UNIQUE INDEX IF NOT EXISTS network_feeds_unique
                    ON network_feeds(network, source);

                CREATE TABLE IF NOT EXISTS autnum_overrides(
                    number bigint NOT NULL,
                    name text,
                    country text,
                    is_anonymous_proxy boolean,
                    is_satellite_provider boolean,
                    is_anycast boolean
                );
                CREATE UNIQUE INDEX IF NOT EXISTS autnum_overrides_number
                    ON autnum_overrides(number);
                ALTER TABLE autnum_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
                ALTER TABLE autnum_overrides DROP COLUMN IF EXISTS source;

                CREATE TABLE IF NOT EXISTS network_overrides(
                    network inet NOT NULL,
                    country text,
                    is_anonymous_proxy boolean,
                    is_satellite_provider boolean,
                    is_anycast boolean
                );
                CREATE UNIQUE INDEX IF NOT EXISTS network_overrides_network
                    ON network_overrides(network);
                ALTER TABLE network_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
                ALTER TABLE network_overrides DROP COLUMN IF EXISTS source;

                -- Cleanup things we no longer need
                DROP TABLE IF EXISTS geofeed_overrides;
                DROP INDEX IF EXISTS announcements_family;
                DROP INDEX IF EXISTS announcements_search;
                DROP INDEX IF EXISTS geofeed_networks_search;
                DROP INDEX IF EXISTS networks_family;
                DROP INDEX IF EXISTS networks_search;
                DROP INDEX IF EXISTS network_feeds_search;
                DROP INDEX IF EXISTS network_geofeeds_unique;
                DROP INDEX IF EXISTS network_geofeeds_search;
                DROP INDEX IF EXISTS network_overrides_search;
            """)

        return db
    def fetch_countries(self):
        """
            Returns a list of all countries on the list
        """
        # Fetch all valid country codes to check parsed networks against...
        countries = self.db.query("SELECT country_code FROM countries ORDER BY country_code")

        return set((country.country_code for country in countries))
    async def handle_write(self, ns):
        """
            Compiles a database in libloc format out of what is in the database
        """
        # Allocate a writer
        writer = location.Writer(ns.signing_key, ns.backup_signing_key)

        # Set all metadata
        if ns.vendor:
            writer.vendor = ns.vendor

        if ns.description:
            writer.description = ns.description

        if ns.license:
            writer.license = ns.license

        # Analyze everything for the query planner hopefully making better decisions
        self.db.execute("ANALYZE")
        # Add all Autonomous Systems
        log.info("Writing Autonomous Systems...")

        # Select all ASes with a name
        rows = self.db.query("""
            SELECT
                autnums.number AS number,
                COALESCE(overrides.name, autnums.name) AS name
            FROM
                autnums
            LEFT JOIN
                autnum_overrides overrides ON autnums.number = overrides.number
            ORDER BY
                autnums.number
            """)

        for row in rows:
            # Skip AS without names
            if not row.name:
                continue

            a = writer.add_as(row.number)
            a.name = row.name

        log.info("Writing networks...")
        # Create a new temporary table where we collect
        # the networks that we are interested in
        self.db.execute("""
            CREATE TEMPORARY TABLE n
            (
                network inet NOT NULL,
                autnum bigint,
                country text,
                is_anonymous_proxy boolean,
                is_satellite_provider boolean,
                is_anycast boolean,
                is_drop boolean
            )
            WITH (FILLFACTOR = 50)
            """)

        # Add all known networks

        # Create an index to search through networks faster

        self.db.execute("ANALYZE n")
        # Apply the AS number to all networks
        self.db.execute("""
            -- Join all networks together with their most specific announcements
            WITH announcements AS (
                SELECT
                    announcements.network,
                    announcements.autnum,
                    -- Sort all merges and number them so
                    -- that we can later select the best one
                    ROW_NUMBER() OVER (PARTITION BY n.network
                        ORDER BY masklen(announcements.network) DESC) AS row
                FROM announcements, n
                WHERE announcements.network >>= n.network
            )
            UPDATE n SET
                autnum = announcements.autnum
            FROM announcements
            WHERE announcements.network = n.network
                AND announcements.row = 1
            """)

        # Apply country information
        self.db.execute("""
            WITH networks AS (
                SELECT
                    n.network AS n_network,
                    networks.country,
                    ROW_NUMBER() OVER (PARTITION BY n.network
                        ORDER BY masklen(networks.network) DESC) AS row
                FROM networks, n
                WHERE networks.network >>= n.network
            )
            UPDATE n SET
                country = networks.country
            FROM networks
            WHERE networks.n_network = n.network
                AND networks.row = 1
            """)

        # Add all country information from Geofeeds
        self.db.execute("""
            WITH geofeeds AS (
                SELECT
                    DISTINCT ON (geofeed_networks.network)
                    geofeed_networks.network,
                    geofeed_networks.country
                FROM geofeed_networks
                JOIN network_geofeeds networks
                    ON geofeeds.url = networks.url
                JOIN geofeeds
                    ON geofeeds.id = geofeed_networks.geofeed_id
                WHERE networks.network >>= geofeed_networks.network
            ),
            networks AS (
                SELECT
                    n.network AS n_network,
                    geofeeds.country,
                    ROW_NUMBER() OVER (PARTITION BY n.network
                        ORDER BY masklen(geofeeds.network) DESC) AS row
                FROM geofeeds, n
                WHERE geofeeds.network >>= n.network
            )
            UPDATE n SET
                country = networks.country
            FROM networks
            WHERE networks.n_network = n.network
                AND networks.row = 1
            """)

        # Apply country and flags from feeds
        self.db.execute("""
            WITH networks AS (
                SELECT
                    n.network AS n_network,
                    network_feeds.country,
                    network_feeds.is_anonymous_proxy,
                    network_feeds.is_satellite_provider,
                    network_feeds.is_anycast,
                    network_feeds.is_drop,
                    ROW_NUMBER() OVER (PARTITION BY n.network
                        ORDER BY masklen(network_feeds.network) DESC) AS row
                FROM network_feeds, n
                WHERE network_feeds.network >>= n.network
            )
            UPDATE n SET
                country               = COALESCE(networks.country, n.country),
                is_anonymous_proxy    = COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
                is_satellite_provider = COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
                is_anycast            = COALESCE(networks.is_anycast, n.is_anycast),
                is_drop               = COALESCE(networks.is_drop, n.is_drop)
            FROM networks
            WHERE networks.n_network = n.network
                AND networks.row = 1
            """)

        # Apply country and flags from AS feeds
        self.db.execute("""
            WITH networks AS (
                SELECT
                    n.network AS n_network,
                    autnum_feeds.country,
                    autnum_feeds.is_anonymous_proxy,
                    autnum_feeds.is_satellite_provider,
                    autnum_feeds.is_anycast,
                    autnum_feeds.is_drop
                FROM autnum_feeds, n
                WHERE autnum_feeds.number = n.autnum
            )
            UPDATE n SET
                country               = COALESCE(networks.country, n.country),
                is_anonymous_proxy    = COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
                is_satellite_provider = COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
                is_anycast            = COALESCE(networks.is_anycast, n.is_anycast),
                is_drop               = COALESCE(networks.is_drop, n.is_drop)
            FROM networks
            WHERE networks.n_network = n.network
            """)

        # Apply network overrides
        self.db.execute("""
            WITH networks AS (
                SELECT
                    n.network AS n_network,
                    network_overrides.country,
                    network_overrides.is_anonymous_proxy,
                    network_overrides.is_satellite_provider,
                    network_overrides.is_anycast,
                    network_overrides.is_drop,
                    ROW_NUMBER() OVER (PARTITION BY n.network
                        ORDER BY masklen(network_overrides.network) DESC) AS row
                FROM network_overrides, n
                WHERE network_overrides.network >>= n.network
            )
            UPDATE n SET
                country               = COALESCE(networks.country, n.country),
                is_anonymous_proxy    = COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
                is_satellite_provider = COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
                is_anycast            = COALESCE(networks.is_anycast, n.is_anycast),
                is_drop               = COALESCE(networks.is_drop, n.is_drop)
            FROM networks
            WHERE networks.n_network = n.network
                AND networks.row = 1
            """)

        # Apply AS overrides
        self.db.execute("""
            WITH networks AS (
                SELECT
                    n.network AS n_network,
                    autnum_overrides.country,
                    autnum_overrides.is_anonymous_proxy,
                    autnum_overrides.is_satellite_provider,
                    autnum_overrides.is_anycast,
                    autnum_overrides.is_drop
                FROM autnum_overrides, n
                WHERE autnum_overrides.number = n.autnum
            )
            UPDATE n SET
                country               = COALESCE(networks.country, n.country),
                is_anonymous_proxy    = COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
                is_satellite_provider = COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
                is_anycast            = COALESCE(networks.is_anycast, n.is_anycast),
                is_drop               = COALESCE(networks.is_drop, n.is_drop)
            FROM networks
            WHERE networks.n_network = n.network
            """)
        # Here we could remove some networks that we no longer need, but since we
        # already have implemented our deduplication/merge algorithm this would not
        # be necessary.

        # Export the entire temporary table
        rows = self.db.query("""
            SELECT
                *
            FROM
                n
            ORDER BY
                network
            """)

        for row in rows:
            network = writer.add_network("%s" % row.network)

            # Save the country
            if row.country:
                network.country_code = row.country

            # Save the ASN
            if row.autnum:
                network.asn = row.autnum

            # Set all flags
            if row.is_anonymous_proxy:
                network.set_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)

            if row.is_satellite_provider:
                network.set_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)

            if row.is_anycast:
                network.set_flag(location.NETWORK_FLAG_ANYCAST)

            if row.is_drop:
                network.set_flag(location.NETWORK_FLAG_DROP)

        log.info("Writing countries...")

        # Select all countries
        rows = self.db.query("""
            SELECT
                *
            FROM
                countries
            ORDER BY
                country_code
            """)

        for row in rows:
            c = writer.add_country(row.country_code)
            c.continent_code = row.continent_code

        # Write everything to file
        log.info("Writing database to file...")
        for file in ns.file:
            writer.write(file)
    async def handle_update_whois(self, ns):
        # Did we run successfully?
        success = True

        sources = (
            # African Network Information Centre
            ("AFRINIC", [
                (self._import_standard_format, "https://ftp.afrinic.net/pub/pub/dbase/afrinic.db.gz"),
            ]),

            # Asia Pacific Network Information Centre
            ("APNIC", [
                (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inet6num.gz"),
                (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inetnum.gz"),
                (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.aut-num.gz"),
                (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.organisation.gz"),
            ]),

            # American Registry for Internet Numbers
            ("ARIN", [
                (self._import_extended_format, "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"),
                (self._import_arin_as_names, "https://ftp.arin.net/pub/resource_registry_service/asns.csv"),
            ]),

            # Japan Network Information Center
            ("JPNIC", [
                (self._import_standard_format, "https://ftp.nic.ad.jp/jpirr/jpirr.db.gz"),
            ]),

            # Latin America and Caribbean Network Information Centre
            ("LACNIC", [
                (self._import_standard_format, "https://ftp.lacnic.net/lacnic/dbase/lacnic.db.gz"),
                (self._import_extended_format, "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"),
            ]),

            # Réseaux IP Européens
            ("RIPE", [
                (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz"),
                (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz"),
                (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.aut-num.gz"),
                (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.organisation.gz"),
            ]),
        )
        # Fetch all valid country codes to check parsed networks against
        countries = self.fetch_countries()

        # Check if we have countries
        if not countries:
            log.error("Please import countries before importing any WHOIS data")
            return 1

        # Iterate over all potential sources
        for name, feeds in sources:
            # Skip anything that should not be updated
            if ns.sources and not name in ns.sources:
                continue

            try:
                await self._process_source(name, feeds, countries)

            # Log an error but continue if an exception occurs
            except Exception as e:
                log.error("Error processing source %s" % name, exc_info=True)
                success = False

        # Return a non-zero exit code for errors
        return 0 if success else 1
    async def _process_source(self, source, feeds, countries):
        """
            This function processes one source
        """
        # Wrap everything into one large transaction
        with self.db.transaction():
            # Remove all previously imported content
            self.db.execute("DELETE FROM autnums WHERE source = %s", source)
            self.db.execute("DELETE FROM networks WHERE source = %s", source)
            self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", source)

            # Create some temporary tables to store parsed data
            self.db.execute("""
                CREATE TEMPORARY TABLE _autnums(number integer NOT NULL,
                    organization text NOT NULL, source text NOT NULL) ON COMMIT DROP;
                CREATE UNIQUE INDEX _autnums_number ON _autnums(number);

                CREATE TEMPORARY TABLE _organizations(handle text NOT NULL,
                    name text NOT NULL, source text NOT NULL) ON COMMIT DROP;
                CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle);

                CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text,
                    original_countries text[] NOT NULL, source text NOT NULL)
                    ON COMMIT DROP;
                CREATE INDEX _rirdata_search ON _rirdata
                    USING BTREE(family(network), masklen(network));
                CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network);
            """)

            # Parse all feeds
            for callback, url, *args in feeds:
                # Retrieve the feed
                f = self.downloader.retrieve(url)

                # Call the callback
                with self.db.pipeline():
                    await callback(source, countries, f, *args)
            # Process all parsed networks from every RIR we happen to have access to,
            # insert the largest network chunks into the networks table immediately...
            families = self.db.query("""
                SELECT DISTINCT
                    family(network) AS family
                FROM
                    _rirdata
                ORDER BY
                    family(network)
                """)

            for family in (row.family for row in families):
                # Fetch the smallest mask length in our data set
                smallest = self.db.get("""
                    SELECT
                        MIN(masklen(network)) AS prefix
                    FROM
                        _rirdata
                    WHERE
                        family(network) = %s
                    """, family,
                )

                # ... copy all networks of that size into the networks table, ...
                self.db.execute("""
                    INSERT INTO networks(network, country, original_countries, source)
                    SELECT network, country, original_countries, source
                    FROM _rirdata
                    WHERE family(network) = %s AND masklen(network) = %s
                    ON CONFLICT DO NOTHING
                    """, family, smallest.prefix,
                )

                # ... determine any other prefixes for this network family, ...
                prefixes = self.db.query("""
                    SELECT
                        DISTINCT masklen(network) AS prefix
                    FROM
                        _rirdata
                    WHERE
                        family(network) = %s
                    ORDER BY
                        masklen(network) ASC
                    """, family,
                )

                # ... and insert networks with this prefix in case they provide additional
                # information (i. e. subnet of a larger chunk with a different country)
                for prefix in (row.prefix for row in prefixes):
                    self.db.execute("""
                        WITH candidates AS (
                            SELECT
                                _rirdata.network,
                                _rirdata.country,
                                _rirdata.original_countries,
                                _rirdata.source
                            FROM
                                _rirdata
                            WHERE
                                family(_rirdata.network) = %s
                            AND
                                masklen(_rirdata.network) = %s
                        ),
                        filtered AS (
                            SELECT
                                DISTINCT ON (c.network)
                                c.network,
                                c.country,
                                c.original_countries,
                                c.source,
                                masklen(networks.network),
                                networks.country AS parent_country
                            FROM
                                candidates c
                            LEFT JOIN
                                networks ON c.network << networks.network
                            ORDER BY
                                c.network,
                                masklen(networks.network) DESC NULLS LAST
                        )
                        INSERT INTO
                            networks(network, country, original_countries, source)
                        SELECT
                            network, country, original_countries, source
                        FROM
                            filtered
                        WHERE
                            parent_country IS NULL
                        OR
                            country <> parent_country
                        ON CONFLICT DO NOTHING
                        """, family, prefix,
                    )

            # Copy all AS names together with their organisations
            self.db.execute("""
                INSERT INTO
                    autnums(number, name, source)
                SELECT
                    _autnums.number,
                    _organizations.name,
                    _organizations.source
                FROM
                    _autnums
                JOIN
                    _organizations ON _autnums.organization = _organizations.handle
                ON CONFLICT (number) DO UPDATE
                    SET name = excluded.name
                """)
    async def _import_standard_format(self, source, countries, f, *args):
        """
            Imports a single standard format source feed
        """
        # Iterate over all blocks
        for block in iterate_over_blocks(f):
            self._parse_block(block, source, countries)
    async def _import_extended_format(self, source, countries, f, *args):
        # Iterate over all lines
        for line in iterate_over_lines(f):
            self._parse_line(line, source, countries)
    async def _import_arin_as_names(self, source, countries, f, *args):
        # Wrap the data to text
        f = io.TextIOWrapper(f)

        # Walk through the file
        for line in csv.DictReader(f, dialect="arin"):
            # Fetch status
            status = line.get("Status")

            # We are only interested in anything managed by ARIN
            if not status == "Full Registry Services":
                continue

            # Fetch organization name
            name = line.get("Org Name")

            # Extract the ASN range
            first_asn = line.get("Start AS Number")
            last_asn = line.get("End AS Number")

            # Cast to integers
            try:
                first_asn = int(first_asn)
            except TypeError as e:
                log.warning("Could not parse ASN '%s'" % first_asn)
                continue

            try:
                last_asn = int(last_asn)
            except TypeError as e:
                log.warning("Could not parse ASN '%s'" % last_asn)
                continue

            # Check if the range is valid
            if last_asn < first_asn:
                log.warning("Invalid ASN range %s-%s" % (first_asn, last_asn))
                continue

            # Insert everything into the database
            for asn in range(first_asn, last_asn + 1):
                if not self._check_parsed_asn(asn):
                    log.warning("Skipping invalid ASN %s" % asn)
                    continue

                self.db.execute("""
                    INSERT INTO
                        autnums(number, name, source)
                    VALUES
                        (%s, %s, %s)
                    ON CONFLICT (number) DO NOTHING
                    """, asn, name, "ARIN",
                )
    def _check_parsed_network(self, network):
        """
            Assistive function to detect and subsequently sort out parsed
            networks from RIR data (both Whois and so-called "extended sources"),
            which are or have...

            (a) not globally routable (RFC 1918 space, et al.)
            (b) covering a too large chunk of the IP address space (prefix length
                is < 7 for IPv4 networks, and < 10 for IPv6)
            (c) "0.0.0.0" or "::" as a network address

            This unfortunately is necessary due to brain-dead clutter across
            various RIR databases, causing mismatches and eventually disruptions.

            We will return False in case a network is not suitable for adding
            it to our database, and True otherwise.
        """
        if isinstance(network, ipaddress.IPv6Network):
            pass
        elif isinstance(network, ipaddress.IPv4Network):
            pass
        else:
            raise ValueError("Invalid network: %s (type %s)" % (network, type(network)))

        # Ignore anything that isn't globally routable
        if not network.is_global:
            log.debug("Skipping non-globally routable network: %s" % network)
            return False

        # Ignore anything that is unspecified IP range (See RFC 5735 for IPv4 or RFC 2373 for IPv6)
        elif network.is_unspecified:
            log.debug("Skipping unspecified network: %s" % network)
            return False

        # Ignore networks that cover too much address space
        if network.version == 6:
            if network.prefixlen < 10:
                log.debug("Skipping too big IP chunk: %s" % network)
                return False

        elif network.version == 4:
            if network.prefixlen < 7:
                log.debug("Skipping too big IP chunk: %s" % network)
                return False

        # In case we have made it here, the network is considered to
        # be suitable for libloc consumption...
        return True
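
    # Illustrative examples for _check_parsed_network() (not taken from RIR data):
    #   10.0.0.0/8     -> rejected, not globally routable (RFC 1918)
    #   0.0.0.0/0      -> rejected, unspecified and covering too much address space
    #   2001:db8::/32  -> rejected, not globally routable (documentation prefix)
    #   1.1.1.0/24     -> accepted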
    def _check_parsed_asn(self, asn):
        """
            Assistive function to filter Autonomous System Numbers not being suitable
            for adding to our database. Returns False in such cases, and True otherwise.
        """
        for start, end in VALID_ASN_RANGES:
            if start <= asn and end >= asn:
                return True

        log.info("Supplied ASN %s out of publicly routable ASN ranges" % asn)
        return False
    def _check_geofeed_url(self, url):
        """
            This function checks if a Geofeed URL is valid.

            If so, it returns the normalized URL which should be stored instead of
            the original one.
        """
        # Parse the URL
        try:
            url = urllib.parse.urlparse(url)
        except ValueError as e:
            log.warning("Invalid URL %s: %s" % (url, e))
            return None

        # Make sure that this is a HTTPS URL
        if not url.scheme == "https":
            log.warning("Skipping Geofeed URL that is not using HTTPS: %s" \
                % url.geturl())
            return None

        # Normalize the URL and convert it back
        return url.geturl()
    def _parse_block(self, block, source_key, countries):
        # Get first line to find out what type of block this is
        line = block[0]

        if line.startswith("aut-num:"):
            return self._parse_autnum_block(block, source_key)

        if line.startswith("inet6num:") or line.startswith("inetnum:"):
            return self._parse_inetnum_block(block, source_key, countries)

        elif line.startswith("organisation:"):
            return self._parse_org_block(block, source_key)
    def _parse_autnum_block(self, block, source_key):
        autnum = {}

        for line in block:
            # Split line
            key, val = split_line(line)

            if key == "aut-num":
                m = re.match(r"^(AS|as)(\d+)", val)
                if m:
                    autnum["asn"] = m.group(2)

            elif key == "org":
                autnum[key] = val.upper()

            elif key == "descr":
                # Save the first description line as well...
                if not key in autnum:
                    autnum[key] = val

        # Skip empty objects
        if not autnum or not "asn" in autnum:
            return

        # Insert a dummy organisation handle into our temporary organisations
        # table in case the AS does not have an organisation handle set, but
        # has a description (a quirk often observed in APNIC area), so we can
        # later display at least some string for this AS.
        if not "org" in autnum:
            if "descr" in autnum:
                autnum["org"] = "LIBLOC-%s-ORGHANDLE" % autnum.get("asn")

                self.db.execute("INSERT INTO _organizations(handle, name, source) \
                    VALUES(%s, %s, %s) ON CONFLICT (handle) DO NOTHING",
                    autnum.get("org"), autnum.get("descr"), source_key,
                )
            else:
                log.warning("ASN %s neither has an organisation handle nor a description line set, omitting" % \
                    autnum.get("asn"))
                return

        # Insert into database
        self.db.execute("INSERT INTO _autnums(number, organization, source) \
            VALUES(%s, %s, %s) ON CONFLICT (number) DO UPDATE SET \
            organization = excluded.organization",
            autnum.get("asn"), autnum.get("org"), source_key,
        )
    def _parse_inetnum_block(self, block, source_key, countries):
        inetnum = {}

        for line in block:
            # Split line
            key, val = split_line(line)

            # Filter any inetnum records which are only referring to IP space
            # not managed by that specific RIR...
            if key == "netname":
                if re.match(r"^(ERX-NETBLOCK|(AFRINIC|ARIN|LACNIC|RIPE)-CIDR-BLOCK|IANA-NETBLOCK-\d{1,3}|NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK|STUB-[\d-]{3,}SLASH\d{1,2})", val.strip()):
                    log.debug("Skipping record indicating historic/orphaned data: %s" % val.strip())
                    return

            if key == "inetnum":
                start_address, delim, end_address = val.partition("-")

                # Strip any excess space
                start_address, end_address = start_address.rstrip(), end_address.strip()

                # Handle "inetnum" formatting in LACNIC DB (e.g. "24.152.8/22" instead of "24.152.8.0/22")
                if start_address and not (delim or end_address):
                    try:
                        start_address = ipaddress.ip_network(start_address, strict=False)
                    except ValueError:
                        start_address = start_address.split("/")
                        ldigits = start_address[0].count(".")

                        # How many octets do we need to add?
                        # (LACNIC does not seem to have a /8 or greater assigned, so the following should suffice.)
                        if ldigits == 1:
                            start_address = start_address[0] + ".0.0/" + start_address[1]
                        elif ldigits == 2:
                            start_address = start_address[0] + ".0/" + start_address[1]
                        else:
                            log.warning("Could not recover IPv4 address from line in LACNIC DB format: %s" % line)
                            return

                        try:
                            start_address = ipaddress.ip_network(start_address, strict=False)
                        except ValueError:
                            log.warning("Could not parse line in LACNIC DB format: %s" % line)
                            return

                    # Enumerate first and last IP address of this network
                    end_address = start_address[-1]
                    start_address = start_address[0]

                # Convert to IP address
                try:
                    start_address = ipaddress.ip_address(start_address)
                    end_address = ipaddress.ip_address(end_address)
                except ValueError:
                    log.warning("Could not parse line: %s" % line)
                    return

                inetnum["inetnum"] = list(ipaddress.summarize_address_range(start_address, end_address))

            elif key == "inet6num":
                inetnum[key] = [ipaddress.ip_network(val, strict=False)]

            elif key == "country":
                cc = val.upper()

                # Ignore certain country codes
                if cc in IGNORED_COUNTRIES:
                    log.debug("Ignoring country code '%s'" % cc)
                    continue

                # Translate country codes
                if cc in TRANSLATED_COUNTRIES:
                    cc = TRANSLATED_COUNTRIES[cc]

                # Do we know this country?
                if not cc in countries:
                    log.warning("Skipping invalid country code '%s'" % cc)
                    continue

                # Collect all country codes
                try:
                    inetnum[key].append(cc)
                except KeyError:
                    inetnum[key] = [cc]

            # Parse the geofeed attribute
            elif key == "geofeed":
                inetnum["geofeed"] = val

            # Parse geofeed when used as a remark
            elif key == "remarks":
                m = re.match(r"^(?:Geofeed)\s+(https://.*)", val)
                if m:
                    inetnum["geofeed"] = m.group(1)

        # Skip empty objects
        if not inetnum:
            return

        # Iterate through all networks enumerated from above, check them for plausibility and insert
        # them into the database, if _check_parsed_network() succeeded
        for single_network in inetnum.get("inet6num") or inetnum.get("inetnum"):
            if not self._check_parsed_network(single_network):
                continue

            # Fetch the countries or use a list with an empty country
            countries = inetnum.get("country", [None])

            # Insert the network into the database but only use the first country code
            for cc in countries:
                self.db.execute("""
                    INSERT INTO
                        networks(network, country, original_countries, source)
                    VALUES
                        (%s, %s, %s, %s)
                    ON CONFLICT (network)
                        DO UPDATE SET country = excluded.country
                    """, "%s" % single_network, cc, [cc for cc in countries if cc], source_key,
                )

                # If there are more than one country, we will only use the first one
                break

            # Update any geofeed information
            geofeed = inetnum.get("geofeed", None)
            if geofeed:
                self._parse_geofeed(source_key, geofeed, single_network)
    def _parse_geofeed(self, source, url, single_network):
        # Check the URL
        url = self._check_geofeed_url(url)
        if not url:
            return

        # Store/update any geofeeds
        self.db.execute("""
            INSERT INTO
                network_geofeeds(network, url, source)
            VALUES
                (%s, %s, %s)
            ON CONFLICT (network, url) DO UPDATE SET
                source = excluded.source
            """, "%s" % single_network, url, source,
        )
    def _parse_org_block(self, block, source_key):
        org = {}

        for line in block:
            # Split line
            key, val = split_line(line)

            if key == "organisation":
                org[key] = val.upper()
            elif key == "org-name":
                org[key] = val

        # Skip empty objects
        if not org:
            return

        self.db.execute("INSERT INTO _organizations(handle, name, source) \
            VALUES(%s, %s, %s) ON CONFLICT (handle) DO \
            UPDATE SET name = excluded.name",
            org.get("organisation"), org.get("org-name"), source_key,
        )
    def _parse_line(self, line, source_key, validcountries=None):
        # Skip version lines
        if line.startswith("2"):
            return

        # Skip comments
        if line.startswith("#"):
            return

        try:
            registry, country_code, type, line = line.split("|", 3)
        except ValueError:
            log.warning("Could not parse line: %s" % line)
            return

        # Skip any unknown protocols
        if not type in ("ipv6", "ipv4"):
            log.warning("Unknown IP protocol '%s'" % type)
            return

        # Skip any lines that are for stats only or do not have a country
        # code at all (avoids log spam below)
        if not country_code or country_code == '*':
            return

        # Skip objects with unknown country codes
        if validcountries and country_code not in validcountries:
            log.warning("Skipping line with bogus country '%s': %s" % \
                (country_code, line))
            return

        try:
            address, prefix, date, status, organization = line.split("|")
        except ValueError:
            # Try parsing the line without organization
            try:
                address, prefix, date, status = line.split("|")
            except ValueError:
                log.warning("Unhandled line format: %s" % line)
                return

        # Skip anything that isn't properly assigned
        if not status in ("assigned", "allocated"):
            return

        # Cast prefix into an integer
        try:
            prefix = int(prefix)
        except ValueError:
            log.warning("Invalid prefix: %s" % prefix)
            return

        # Fix prefix length for IPv4
        if type == "ipv4":
            prefix = 32 - int(math.log(prefix, 2))

        # Try to parse the address
        try:
            network = ipaddress.ip_network("%s/%s" % (address, prefix), strict=False)
        except ValueError:
            log.warning("Invalid IP address: %s" % address)
            return

        if not self._check_parsed_network(network):
            return

        self.db.execute("""
            INSERT INTO
                networks(network, country, original_countries, source)
            VALUES
                (%s, %s, %s, %s)
            ON CONFLICT (network)
                DO UPDATE SET country = excluded.country
            """, "%s" % network, country_code, [country_code], source_key,
        )
    async def handle_update_announcements(self, ns):
        server = ns.server[0]

        with self.db.transaction():
            if server.startswith("/"):
                await self._handle_update_announcements_from_bird(server)

            # Purge anything we never want here
            self.db.execute("""
                -- Delete default routes
                DELETE FROM announcements WHERE network = '::/0' OR network = '0.0.0.0/0';

                -- Delete anything that is not global unicast address space
                DELETE FROM announcements WHERE family(network) = 6 AND NOT network <<= '2000::/3';

                -- DELETE "current network" address space
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '0.0.0.0/8';

                -- DELETE local loopback address space
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '127.0.0.0/8';

                -- DELETE RFC 1918 address space
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '10.0.0.0/8';
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '172.16.0.0/12';
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.168.0.0/16';

                -- DELETE test, benchmark and documentation address space
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.0.0/24';
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.2.0/24';
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.18.0.0/15';
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.51.100.0/24';
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '203.0.113.0/24';

                -- DELETE CGNAT address space (RFC 6598)
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '100.64.0.0/10';

                -- DELETE link local address space
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '169.254.0.0/16';

                -- DELETE IPv6 to IPv4 (6to4) address space (RFC 3068)
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.88.99.0/24';
                DELETE FROM announcements WHERE family(network) = 6 AND network <<= '2002::/16';

                -- DELETE multicast and reserved address space
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '224.0.0.0/4';
                DELETE FROM announcements WHERE family(network) = 4 AND network <<= '240.0.0.0/4';

                -- Delete networks that are too small to be in the global routing table
                DELETE FROM announcements WHERE family(network) = 6 AND masklen(network) > 48;
                DELETE FROM announcements WHERE family(network) = 4 AND masklen(network) > 24;

                -- Delete any non-public or reserved ASNs
                DELETE FROM announcements WHERE NOT (
                    (autnum >= 1 AND autnum <= 23455)
                    OR
                    (autnum >= 23457 AND autnum <= 64495)
                    OR
                    (autnum >= 131072 AND autnum <= 4199999999)
                );

                -- Delete everything that we have not seen for 14 days
                DELETE FROM announcements WHERE last_seen_at <= CURRENT_TIMESTAMP - INTERVAL '14 days';
            """)
    async def _handle_update_announcements_from_bird(self, server):
        # Pre-compile the regular expression for faster searching
        route = re.compile(rb"^\s(.+?)\s+.+?\[(?:AS(.*?))?.\]$")

        log.info("Requesting routing table from Bird (%s)" % server)

        aggregated_networks = []

        # Send command to list all routes
        for line in self._bird_cmd(server, "show route"):
            m = route.match(line)
            if not m:
                # Skip empty lines
                if not line:
                    pass

                # Ignore any header lines with the name of the routing table
                elif line.startswith(b"Table"):
                    pass

                # Log anything else
                else:
                    log.debug("Could not parse line: %s" % line.decode())

                continue

            # Fetch the extracted network and ASN
            network, autnum = m.groups()

            # Skip the line if there is no network
            if not network:
                continue

            # Decode into strings
            network = network.decode()

            # Parse as network object
            network = ipaddress.ip_network(network)

            # Skip announcements that are too large
            if isinstance(network, ipaddress.IPv6Network):
                if network.prefixlen < 10:
                    log.warning("Skipping unusually large network %s" % network)
                    continue
            elif isinstance(network, ipaddress.IPv4Network):
                if network.prefixlen < 4:
                    log.warning("Skipping unusually large network %s" % network)
                    continue

            # Collect all aggregated networks
            if not autnum:
                log.debug("%s is an aggregated network" % network)
                aggregated_networks.append(network)
                continue

            # Decode the ASN
            autnum = autnum.decode()

            # Insert it into the database
            self.db.execute("INSERT INTO announcements(network, autnum) \
                VALUES(%s, %s) ON CONFLICT (network) DO \
                UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
                "%s" % network, autnum,
            )

        # Process any aggregated networks
        for network in aggregated_networks:
            log.debug("Processing aggregated network %s" % network)

            # Run "show route all" for each network
            for line in self._bird_cmd(server, "show route %s all" % network):
                # Try finding the path
                m = re.match(rb"\s+BGP\.as_path:.* (\d+) {\d+}$", line)
                if m:
                    # Select the last AS number in the path
                    autnum = m.group(1).decode()

                    # Insert it into the database
                    self.db.execute("INSERT INTO announcements(network, autnum) \
                        VALUES(%s, %s) ON CONFLICT (network) DO \
                        UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
                        "%s" % network, autnum,
                    )

                    # We don't need to process any more
                    break
    def _bird_cmd(self, socket_path, command):
        # Connect to the socket
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        s.connect(socket_path)

        # Allocate some buffer
        buffer = b""

        log.debug("Sending Bird command: %s" % command)

        # Send the command
        s.send(b"%s\n" % command.encode())

        while True:
            # Fill up the buffer
            buffer += s.recv(4096)

            while True:
                # Search for the next newline
                pos = buffer.find(b"\n")

                # If we cannot find one, we go back and read more data
                if pos <= 0:
                    break

                # Cut after the newline character
                pos += 1

                # Split the line we want and keep the rest in buffer
                line, buffer = buffer[:pos], buffer[pos:]

                # Try parsing any status lines
                # (a reply line starts with a four-digit code followed by
                #  a space (32) or a dash (45))
                if len(line) > 4 and line[:4].isdigit() and line[4] in (32, 45):
                    code, delim, line = int(line[:4]), line[4], line[5:]

                    log.debug("Received response code %s from bird" % code)

                    # End of output
                    if code == 0:
                        return

                # Otherwise return the line
                yield line
    async def handle_update_geofeeds(self, ns):
        with self.db.transaction():
            # Delete all geofeeds which are no longer linked
            self.db.execute("""
                DELETE FROM
                    geofeeds
                WHERE
                    geofeeds.url NOT IN (
                        SELECT
                            network_geofeeds.url
                        FROM
                            network_geofeeds
                    )
                """)

            # Copy all linked Geofeed URLs
            self.db.execute("""
                WITH all_geofeeds AS (
                    SELECT
                        network_geofeeds.url
                    FROM
                        network_geofeeds
                )
                INSERT INTO
                    geofeeds(url)
                SELECT
                    url
                FROM
                    all_geofeeds
                ON CONFLICT (url) DO NOTHING
                """)

        # Fetch all Geofeeds that require an update
        geofeeds = self.db.query("""
            SELECT
                *
            FROM
                geofeeds
            WHERE
                updated_at IS NULL
            OR
                updated_at <= CURRENT_TIMESTAMP - INTERVAL '1 week'
            ORDER BY
                id
            """)

        ratelimiter = asyncio.Semaphore(32)
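
        # The semaphore above is acquired in _fetch_geofeed(), so no more than
        # 32 Geofeeds are fetched concurrently even though a task is created
        # for every single one of them below.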
        # Update all geofeeds
        async with asyncio.TaskGroup() as tasks:
            for geofeed in geofeeds:
                task = tasks.create_task(
                    self._fetch_geofeed(ratelimiter, geofeed),
                )

        # Delete data from any feeds that did not update in the last two weeks
        with self.db.transaction():
            self.db.execute("""
                DELETE FROM
                    geofeed_networks
                WHERE
                    geofeed_networks.geofeed_id IN (
                        SELECT
                            geofeeds.id
                        FROM
                            geofeeds
                        WHERE
                            updated_at IS NULL
                        OR
                            updated_at <= CURRENT_TIMESTAMP - INTERVAL '2 weeks'
                    )
                """)
    async def _fetch_geofeed(self, ratelimiter, geofeed):
        async with ratelimiter:
            log.debug("Fetching Geofeed %s" % geofeed.url)

            with self.db.transaction():
                # Download the feed
                try:
                    f = await asyncio.to_thread(
                        self.downloader.retrieve,

                        # Fetch the feed by its URL
                        geofeed.url,

                        # Send some extra headers
                        headers={
                            "User-Agent" : "location/%s" % location.__version__,

                            # We expect some plain text file in CSV format
                            "Accept" : "text/csv, text/plain",
                        },

                        # Don't wait longer than 10 seconds for a response
                        timeout=10,
                    )

                    # Remove any previous data
                    self.db.execute("DELETE FROM geofeed_networks \
                        WHERE geofeed_id = %s", geofeed.id)

                    lineno = 0

                    # Read the output line by line
                    with self.db.pipeline():
                        for line in f:
                            lineno += 1

                            try:
                                line = line.decode()

                            # Ignore any lines we cannot decode
                            except UnicodeDecodeError:
                                log.debug("Could not decode line %s in %s" \
                                    % (lineno, geofeed.url))
                                continue

                            # Strip any trailing whitespace
                            line = line.rstrip()

                            # Skip empty lines
                            if not line:
                                continue

                            # Skip comments
                            elif line.startswith("#"):
                                continue

                            # Try to parse the line
                            try:
                                fields = line.split(",", 5)
                            except ValueError:
                                log.debug("Could not parse line: %s" % line)
                                continue

                            # Check if we have enough fields
                            if len(fields) < 4:
                                log.debug("Not enough fields in line: %s" % line)
                                continue

                            # Fetch all fields
                            network, country, region, city, = fields[:4]

                            # Try to parse the network
                            try:
                                network = ipaddress.ip_network(network, strict=False)
                            except ValueError:
                                log.debug("Could not parse network: %s" % network)
                                continue

                            # Strip any excess whitespace from country codes
                            country = country.strip()

                            # Make the country code uppercase
                            country = country.upper()

                            # Check the country code
                            if not country:
                                log.debug("Empty country code in Geofeed %s line %s" \
                                    % (geofeed.url, lineno))
                                continue

                            elif not location.country_code_is_valid(country):
                                log.debug("Invalid country code in Geofeed %s:%s: %s" \
                                    % (geofeed.url, lineno, country))
                                continue

                            # Write this into the database
                            self.db.execute("""
                                INSERT INTO
                                    geofeed_networks(geofeed_id, network, country, region, city)
                                VALUES (%s, %s, %s, %s, %s)""",
                                geofeed.id,
                                "%s" % network,
                                country,
                                region,
                                city,
                            )

                # Catch any HTTP errors
                except urllib.request.HTTPError as e:
                    self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
                        WHERE id = %s", e.code, "%s" % e, geofeed.id)

                    # Remove any previous data when the feed has been deleted
                    if e.code == 404:
                        self.db.execute("DELETE FROM geofeed_networks \
                            WHERE geofeed_id = %s", geofeed.id)

                # Catch any other errors and connection timeouts
                except (http.client.InvalidURL, http.client.RemoteDisconnected, urllib.request.URLError, TimeoutError) as e:
                    log.debug("Could not fetch URL %s: %s" % (geofeed.url, e))

                    self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
                        WHERE id = %s", 599, "%s" % e, geofeed.id)

                # Mark the geofeed as updated
                else:
                    self.db.execute("""
                        UPDATE
                            geofeeds
                        SET
                            status = %s,
                            updated_at = CURRENT_TIMESTAMP,
                            error = NULL
                        WHERE
                            id = %s""",
                        200, geofeed.id,
                    )
    async def handle_update_overrides(self, ns):
        with self.db.transaction():
            # Drop any previous content
            self.db.execute("TRUNCATE TABLE autnum_overrides")
            self.db.execute("TRUNCATE TABLE network_overrides")

            # Remove all Geofeeds
            self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", "overrides")

            for file in ns.files:
                log.info("Reading %s..." % file)

                with open(file, "rb") as f:
                    for type, block in read_blocks(f):
                        if type == "net":
                            network = block.get("net")

                            # Try to parse and normalise the network
                            try:
                                network = ipaddress.ip_network(network, strict=False)
                            except ValueError as e:
                                log.warning("Invalid IP network: %s: %s" % (network, e))
                                continue

                            # Prevent that we overwrite all networks
                            if network.prefixlen == 0:
                                log.warning("Skipping %s: You cannot overwrite default" % network)
                                continue

                            self.db.execute("""
                                INSERT INTO network_overrides(
                                    network,
                                    country,
                                    is_anonymous_proxy,
                                    is_satellite_provider,
                                    is_anycast,
                                    is_drop
                                ) VALUES (
                                    %s, %s, %s, %s, %s, %s
                                )
                                ON CONFLICT (network) DO NOTHING
                                """,
                                "%s" % network,
                                block.get("country"),
                                self._parse_bool(block, "is-anonymous-proxy"),
                                self._parse_bool(block, "is-satellite-provider"),
                                self._parse_bool(block, "is-anycast"),
                                self._parse_bool(block, "drop"),
                            )

                        elif type == "aut-num":
                            autnum = block.get("aut-num")

                            # Check if AS number begins with "AS"
                            if not autnum.startswith("AS"):
                                log.warning("Invalid AS number: %s" % autnum)
                                continue

                            # Strip "AS"
                            autnum = autnum[2:]

                            self.db.execute("""
                                INSERT INTO autnum_overrides(
                                    number,
                                    name,
                                    country,
                                    is_anonymous_proxy,
                                    is_satellite_provider,
                                    is_anycast,
                                    is_drop
                                ) VALUES (
                                    %s, %s, %s, %s, %s, %s, %s
                                )
                                ON CONFLICT (number) DO NOTHING
                                """,
                                autnum,
                                block.get("name"),
                                block.get("country"),
                                self._parse_bool(block, "is-anonymous-proxy"),
                                self._parse_bool(block, "is-satellite-provider"),
                                self._parse_bool(block, "is-anycast"),
                                self._parse_bool(block, "drop"),
                            )

                        elif type == "geofeed":
                            networks = []

                            # Fetch the URL
                            url = block.get("geofeed")

                            # Fetch permitted networks
                            for n in block.get("network", []):
                                try:
                                    n = ipaddress.ip_network(n)
                                except ValueError as e:
                                    log.warning("Ignoring invalid network %s: %s" % (n, e))
                                    continue

                                networks.append(n)

                            # If no networks have been specified, permit for everything
                            if not networks:
                                networks = [
                                    ipaddress.ip_network("::/0"),
                                    ipaddress.ip_network("0.0.0.0/0"),
                                ]

                            # Check the URL
                            url = self._check_geofeed_url(url)
                            if not url:
                                continue

                            # Store the Geofeed URL
                            self.db.execute("""
                                INSERT INTO
                                    geofeeds(url)
                                VALUES
                                    (%s)
                                ON CONFLICT (url) DO NOTHING
                                """, url,
                            )

                            # Store all permitted networks
                            self.db.executemany("""
                                INSERT INTO
                                    network_geofeeds(network, url, source)
                                VALUES
                                    (%s, %s, %s)
                                ON CONFLICT (network, url) DO UPDATE SET
                                    source = excluded.source
                                """, (("%s" % n, url, "overrides") for n in networks),
                            )

                        else:
                            log.warning("Unsupported type: %s" % type)
    async def handle_update_feeds(self, ns):
        """
            Update any third-party feeds
        """
        success = True

        feeds = (
            ("AWS-IP-RANGES", self._import_aws_ip_ranges, "https://ip-ranges.amazonaws.com/ip-ranges.json"),

            ("SPAMHAUS-DROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/drop.txt"),
            ("SPAMHAUS-DROPV6", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/dropv6.txt"),

            ("SPAMHAUS-ASNDROP", self._import_spamhaus_asndrop, "https://www.spamhaus.org/drop/asndrop.json"),
        )

        # Drop any data from feeds that we don't support (any more)
        with self.db.transaction():
            # Fetch the names of all feeds we support
            sources = [name for name, *rest in feeds]

            self.db.execute("DELETE FROM autnum_feeds WHERE NOT source = ANY(%s)", sources)
            self.db.execute("DELETE FROM network_feeds WHERE NOT source = ANY(%s)", sources)

        # Walk through all feeds
        for name, callback, url, *args in feeds:
            # Skip any feeds that were not requested on the command line
            if ns.feeds and not name in ns.feeds:
                continue

            try:
                await self._process_feed(name, callback, url, *args)

            # Log an error but continue if an exception occurs
            except Exception as e:
                log.error("Error processing feed '%s': %s" % (name, e))
                success = False

        return 0 if success else 1
    async def _process_feed(self, name, callback, url, *args):
        """
            Processes one feed
        """
        # Open the URL
        f = self.downloader.retrieve(url)

        with self.db.transaction():
            # Drop any previous content
            self.db.execute("DELETE FROM autnum_feeds WHERE source = %s", name)
            self.db.execute("DELETE FROM network_feeds WHERE source = %s", name)

            # Call the callback to process the feed
            with self.db.pipeline():
                return await callback(name, f, *args)
    async def _import_aws_ip_ranges(self, name, f):
        # Parse the feed
        feed = json.load(f)

        # Set up a dictionary for mapping a region name to a country. Unfortunately,
        # there seems to be no machine-readable version available of this other than
        # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html
        # (worse, it seems to be incomplete :-/ ); https://www.cloudping.cloud/endpoints
        # was helpful here as well.
        aws_region_country_map = {
            "af-south-1" : "ZA",

            "il-central-1" : "IL", # Tel Aviv

            "ap-northeast-1" : "JP",
            "ap-northeast-2" : "KR",
            "ap-northeast-3" : "JP",
            "ap-south-1" : "IN",
            "ap-south-2" : "IN",
            "ap-southeast-1" : "SG",
            "ap-southeast-2" : "AU",
            "ap-southeast-3" : "MY",
            "ap-southeast-4" : "AU", # Melbourne
            "ap-southeast-5" : "MY", # Malaysia
            "ap-southeast-6" : "AP", # XXX: Precise location not documented anywhere
            "ap-southeast-7" : "AP", # XXX: Precise location unknown

            "ca-central-1" : "CA",

            "eu-central-1" : "DE",
            "eu-central-2" : "CH",
            "eu-north-1" : "SE",
            "eu-south-1" : "IT",
            "eu-south-2" : "ES",

            "me-central-1" : "AE",
            "me-south-1" : "BH",

            "mx-central-1" : "MX",

            # Undocumented, likely located in Berlin rather than Frankfurt
            "eusc-de-east-1" : "DE",
        }

        # Collect a list of all networks
        prefixes = feed.get("ipv6_prefixes", []) + feed.get("prefixes", [])

        for prefix in prefixes:
            # Fetch the network
            network = prefix.get("ipv6_prefix") or prefix.get("ip_prefix")

            # Parse the network
            try:
                network = ipaddress.ip_network(network)
            except ValueError as e:
                log.warning("%s: Unable to parse prefix %s" % (name, network))
                continue

            # Sanitize parsed networks...
            if not self._check_parsed_network(network):
                continue

            # Fetch the region
            region = prefix.get("region")

            # Set some defaults
            cc = None
            is_anycast = False

            # Fetch the CC from the dictionary
            try:
                cc = aws_region_country_map[region]

            # If we couldn't find anything, let's try something else...
            except KeyError as e:
                # Find anycast networks
                if region == "GLOBAL":
                    is_anycast = True

                # Everything that starts with us- is probably in the United States
                elif region.startswith("us-"):
                    cc = "US"

                # Everything that starts with cn- is probably China
                elif region.startswith("cn-"):
                    cc = "CN"

                # Log a warning for anything else
                else:
                    log.warning("%s: Could not determine country code for AWS region %s" \
                        % (name, region))
                    continue

            # Write to the database
            self.db.execute("""
                INSERT INTO
                    network_feeds(network, source, country, is_anycast)
                VALUES
                    (%s, %s, %s, %s)
                ON CONFLICT (network, source) DO NOTHING
                """, "%s" % network, name, cc, is_anycast,
            )
    async def _import_spamhaus_drop(self, name, f):
        """
            Import Spamhaus DROP IP feeds
        """
        # Count all lines
        lines = 0

        # Walk through all lines
        for line in f:
            # Decode the line
            line = line.decode("utf-8")

            # Strip off any comments
            line, _, comment = line.partition(";")

            # Ignore empty lines
            if not line:
                continue

            # Strip any excess whitespace
            line = line.strip()

            # Increment line counter
            lines += 1

            # Parse the network
            try:
                network = ipaddress.ip_network(line)
            except ValueError as e:
                log.warning("%s: Could not parse network: %s - %s" % (name, line, e))
                continue

            # Check network
            if not self._check_parsed_network(network):
                log.warning("%s: Skipping bogus network: %s" % (name, network))
                continue

            # Insert into the database
            self.db.execute("""
                INSERT INTO
                    network_feeds(network, source, is_drop)
                VALUES
                    (%s, %s, %s
                )""", "%s" % network, name, True,
            )

        # Raise an exception if we could not import anything
        if not lines:
            raise RuntimeError("Received bogus feed %s with no data" % name)
    async def _import_spamhaus_asndrop(self, name, f):
        """
            Import Spamhaus ASNDROP feed
        """
        for line in f:
            # Decode the line
            line = line.decode("utf-8")

            # Parse JSON
            try:
                line = json.loads(line)
            except json.JSONDecodeError as e:
                log.warning("%s: Unable to parse JSON object %s: %s" % (name, line, e))
                continue

            # Fetch type
            type = line.get("type")

            # Skip any metadata
            if type == "metadata":
                continue

            # Fetch the ASN
            asn = line.get("asn")

            # Skip any lines without an ASN
            if not asn:
                continue

            # Filter invalid ASNs
            if not self._check_parsed_asn(asn):
                log.warning("%s: Skipping bogus ASN %s" % (name, asn))
                continue

            # Write to the database
            self.db.execute("""
                INSERT INTO
                    autnum_feeds(number, source, is_drop)
                VALUES
                    (%s, %s, %s
                )""", "%s" % asn, name, True,
            )
    @staticmethod
    def _parse_bool(block, key):
        val = block.get(key)

        # There is no point to proceed when we got None
        if val is None:
            return

        # Convert to lowercase
        val = val.lower()

        if val in ("yes", "1"):
            return True

        if val in ("no", "0"):
            return False
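
    # _parse_bool() maps override values as follows (anything else yields None):
    #   "yes" or "1" -> True
    #   "no"  or "0" -> False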
    async def handle_import_countries(self, ns):
        with self.db.transaction():
            # Drop all data that we have
            self.db.execute("TRUNCATE TABLE countries")

            for file in ns.file:
                for line in file:
                    line = line.rstrip()

                    # Ignore any comments
                    if line.startswith("#"):
                        continue

                    try:
                        country_code, continent_code, name = line.split(maxsplit=2)
                    except ValueError:
                        log.warning("Could not parse line: %s" % line)
                        continue

                    self.db.execute("INSERT INTO countries(country_code, name, continent_code) \
                        VALUES(%s, %s, %s) ON CONFLICT DO NOTHING", country_code, name, continent_code)

def split_line(line):
    key, colon, val = line.partition(":")

    # Strip any excess space
    key = key.strip()
    val = val.strip()

    return key, val
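
# Example: split_line("country:        DE") returns ("country", "DE") -
# the key is everything before the first colon, the value is stripped of
# surrounding whitespace.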

def read_blocks(f):
    for block in iterate_over_blocks(f):
        type = None
        data = {}

        for i, line in enumerate(block):
            key, value = line.split(":", 1)

            # The key of the first line defines the type
            if i == 0:
                type = key

            # Strip any excess whitespace
            value = value.strip()

            # Store some values as a list
            if type == "geofeed" and key == "network":
                try:
                    data[key].append(value)
                except KeyError:
                    data[key] = [value]

            # Otherwise store the value as string
            else:
                data[key] = value

        yield type, data

def iterate_over_blocks(f, charsets=("utf-8", "latin1")):
    block = []

    for line in f:
        # Skip commented lines
        if line.startswith(b"#") or line.startswith(b"%"):
            continue

        # Decode the line with the first charset that works
        for charset in charsets:
            try:
                line = line.decode(charset)
            except UnicodeDecodeError:
                continue
            else:
                break

        # Remove any comments at the end of line
        line, hash, comment = line.partition("#")

        # Strip any whitespace at the end of the line
        line = line.rstrip()

        # If we cut off some comment and the line is empty, we can skip it
        if comment and not line:
            continue

        # If the line has some content, keep collecting it
        if line:
            block.append(line)
            continue

        # End the block on an empty line
        if block:
            yield block

        # Reset the block
        block = []

    # Return the last block
    if block:
        yield block

def iterate_over_lines(f):
    for line in f:
        # Decode the line
        line = line.decode()

        yield line


# Run the command line interface
c = CLI()
sys.exit(asyncio.run(c.run()))