]> git.ipfire.org Git - location/libloc.git/blob - src/scripts/location-importer.in
importer: Add structure to add Geofeed overrides
[location/libloc.git] / src / scripts / location-importer.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2020-2024 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import csv
22 import functools
23 import http.client
24 import ipaddress
25 import json
26 import logging
27 import math
28 import re
29 import socket
30 import sys
31 import urllib.error
32
33 # Load our location module
34 import location
35 import location.database
36 from location.downloader import Downloader
37 from location.i18n import _
38
39 # Initialise logging
40 log = logging.getLogger("location.importer")
41 log.propagate = 1
42
43 # Define constants
44 VALID_ASN_RANGES = (
45 (1, 23455),
46 (23457, 64495),
47 (131072, 4199999999),
48 )
49
50 TRANSLATED_COUNTRIES = {
51 # When people say UK, they mean GB
52 "UK" : "GB",
53 }
54
55 IGNORED_COUNTRIES = set((
56 # Formerly Yugoslavia
57 "YU",
58
59 # Some people use ZZ to say "no country" or to hide the country
60 "ZZ",
61 ))
62
63 # Configure the CSV parser for ARIN
64 csv.register_dialect("arin", delimiter=",", quoting=csv.QUOTE_ALL, quotechar="\"")
65
66 class CLI(object):
67 def parse_cli(self):
68 parser = argparse.ArgumentParser(
69 description=_("Location Importer Command Line Interface"),
70 )
71 subparsers = parser.add_subparsers()
72
73 # Global configuration flags
74 parser.add_argument("--debug", action="store_true",
75 help=_("Enable debug output"))
76 parser.add_argument("--quiet", action="store_true",
77 help=_("Enable quiet mode"))
78
79 # version
80 parser.add_argument("--version", action="version",
81 version="%(prog)s @VERSION@")
82
83 # Database
84 parser.add_argument("--database-host", required=True,
85 help=_("Database Hostname"), metavar=_("HOST"))
86 parser.add_argument("--database-name", required=True,
87 help=_("Database Name"), metavar=_("NAME"))
88 parser.add_argument("--database-username", required=True,
89 help=_("Database Username"), metavar=_("USERNAME"))
90 parser.add_argument("--database-password", required=True,
91 help=_("Database Password"), metavar=_("PASSWORD"))
92
93 # Write Database
94 write = subparsers.add_parser("write", help=_("Write database to file"))
95 write.set_defaults(func=self.handle_write)
96 write.add_argument("file", nargs=1, help=_("Database File"))
97 write.add_argument("--signing-key", nargs="?", type=open, help=_("Signing Key"))
98 write.add_argument("--backup-signing-key", nargs="?", type=open, help=_("Backup Signing Key"))
99 write.add_argument("--vendor", nargs="?", help=_("Sets the vendor"))
100 write.add_argument("--description", nargs="?", help=_("Sets a description"))
101 write.add_argument("--license", nargs="?", help=_("Sets the license"))
102 write.add_argument("--version", type=int, help=_("Database Format Version"))
103
104 # Update WHOIS
105 update_whois = subparsers.add_parser("update-whois", help=_("Update WHOIS Information"))
106 update_whois.add_argument("sources", nargs="*",
107 help=_("Only update these sources"))
108 update_whois.set_defaults(func=self.handle_update_whois)
109
110 # Update announcements
111 update_announcements = subparsers.add_parser("update-announcements",
112 help=_("Update BGP Annoucements"))
113 update_announcements.set_defaults(func=self.handle_update_announcements)
114 update_announcements.add_argument("server", nargs=1,
115 help=_("Route Server to connect to"), metavar=_("SERVER"))
116
117 # Update geofeeds
118 update_geofeeds = subparsers.add_parser("update-geofeeds",
119 help=_("Update Geofeeds"))
120 update_geofeeds.set_defaults(func=self.handle_update_geofeeds)
121
122 # Update feeds
123 update_feeds = subparsers.add_parser("update-feeds",
124 help=_("Update Feeds"))
125 update_feeds.add_argument("feeds", nargs="*",
126 help=_("Only update these feeds"))
127 update_feeds.set_defaults(func=self.handle_update_feeds)
128
129 # Update overrides
130 update_overrides = subparsers.add_parser("update-overrides",
131 help=_("Update overrides"),
132 )
133 update_overrides.add_argument(
134 "files", nargs="+", help=_("Files to import"),
135 )
136 update_overrides.set_defaults(func=self.handle_update_overrides)
137
138 # Import countries
139 import_countries = subparsers.add_parser("import-countries",
140 help=_("Import countries"),
141 )
142 import_countries.add_argument("file", nargs=1, type=argparse.FileType("r"),
143 help=_("File to import"))
144 import_countries.set_defaults(func=self.handle_import_countries)
145
146 args = parser.parse_args()
147
148 # Configure logging
149 if args.debug:
150 location.logger.set_level(logging.DEBUG)
151 elif args.quiet:
152 location.logger.set_level(logging.WARNING)
153
154 # Print usage if no action was given
155 if not "func" in args:
156 parser.print_usage()
157 sys.exit(2)
158
159 return args
160
161 def run(self):
162 # Parse command line arguments
163 args = self.parse_cli()
164
165 # Initialize the downloader
166 self.downloader = Downloader()
167
168 # Initialise database
169 self.db = self._setup_database(args)
170
171 # Call function
172 ret = args.func(args)
173
174 # Return with exit code
175 if ret:
176 sys.exit(ret)
177
178 # Otherwise just exit
179 sys.exit(0)
180
181 def _setup_database(self, ns):
182 """
183 Initialise the database
184 """
185 # Connect to database
186 db = location.database.Connection(
187 host=ns.database_host, database=ns.database_name,
188 user=ns.database_username, password=ns.database_password,
189 )
190
191 with db.transaction():
192 db.execute("""
193 -- announcements
194 CREATE TABLE IF NOT EXISTS announcements(network inet, autnum bigint,
195 first_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP,
196 last_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP);
197 CREATE UNIQUE INDEX IF NOT EXISTS announcements_networks ON announcements(network);
198 CREATE INDEX IF NOT EXISTS announcements_family ON announcements(family(network));
199 CREATE INDEX IF NOT EXISTS announcements_search ON announcements USING GIST(network inet_ops);
200
201 -- autnums
202 CREATE TABLE IF NOT EXISTS autnums(number bigint, name text NOT NULL);
203 ALTER TABLE autnums ADD COLUMN IF NOT EXISTS source text;
204 CREATE UNIQUE INDEX IF NOT EXISTS autnums_number ON autnums(number);
205
206 -- countries
207 CREATE TABLE IF NOT EXISTS countries(
208 country_code text NOT NULL, name text NOT NULL, continent_code text NOT NULL);
209 CREATE UNIQUE INDEX IF NOT EXISTS countries_country_code ON countries(country_code);
210
211 -- networks
212 CREATE TABLE IF NOT EXISTS networks(network inet, country text);
213 ALTER TABLE networks ADD COLUMN IF NOT EXISTS original_countries text[];
214 ALTER TABLE networks ADD COLUMN IF NOT EXISTS source text;
215 CREATE UNIQUE INDEX IF NOT EXISTS networks_network ON networks(network);
216 CREATE INDEX IF NOT EXISTS networks_family ON networks USING BTREE(family(network));
217 CREATE INDEX IF NOT EXISTS networks_search ON networks USING GIST(network inet_ops);
218
219 -- geofeeds
220 CREATE TABLE IF NOT EXISTS geofeeds(
221 id serial primary key,
222 url text,
223 status integer default null,
224 updated_at timestamp without time zone default null
225 );
226 ALTER TABLE geofeeds ADD COLUMN IF NOT EXISTS error text;
227 CREATE UNIQUE INDEX IF NOT EXISTS geofeeds_unique
228 ON geofeeds(url);
229 CREATE TABLE IF NOT EXISTS geofeed_networks(
230 geofeed_id integer references geofeeds(id) on delete cascade,
231 network inet,
232 country text,
233 region text,
234 city text
235 );
236 CREATE INDEX IF NOT EXISTS geofeed_networks_geofeed_id
237 ON geofeed_networks(geofeed_id);
238 CREATE INDEX IF NOT EXISTS geofeed_networks_search
239 ON geofeed_networks USING GIST(network inet_ops);
240 CREATE TABLE IF NOT EXISTS network_geofeeds(network inet, url text);
241 ALTER TABLE network_geofeeds ADD COLUMN IF NOT EXISTS source text NOT NULL;
242 CREATE UNIQUE INDEX IF NOT EXISTS network_geofeeds_unique
243 ON network_geofeeds(network);
244 CREATE INDEX IF NOT EXISTS network_geofeeds_search
245 ON network_geofeeds USING GIST(network inet_ops);
246 CREATE INDEX IF NOT EXISTS network_geofeeds_url
247 ON network_geofeeds(url);
248
249 -- feeds
250 CREATE TABLE IF NOT EXISTS autnum_feeds(
251 number bigint NOT NULL,
252 source text NOT NULL,
253 name text,
254 country text,
255 is_anonymous_proxy boolean,
256 is_satellite_provider boolean,
257 is_anycast boolean,
258 is_drop boolean
259 );
260 CREATE UNIQUE INDEX IF NOT EXISTS autnum_feeds_unique
261 ON autnum_feeds(number, source);
262
263 CREATE TABLE IF NOT EXISTS network_feeds(
264 network inet NOT NULL,
265 source text NOT NULL,
266 country text,
267 is_anonymous_proxy boolean,
268 is_satellite_provider boolean,
269 is_anycast boolean,
270 is_drop boolean
271 );
272 CREATE UNIQUE INDEX IF NOT EXISTS network_feeds_unique
273 ON network_feeds(network, source);
274 CREATE INDEX IF NOT EXISTS network_feeds_search
275 ON network_feeds USING GIST(network inet_ops);
276
277 -- overrides
278 CREATE TABLE IF NOT EXISTS autnum_overrides(
279 number bigint NOT NULL,
280 name text,
281 country text,
282 is_anonymous_proxy boolean,
283 is_satellite_provider boolean,
284 is_anycast boolean
285 );
286 CREATE UNIQUE INDEX IF NOT EXISTS autnum_overrides_number
287 ON autnum_overrides(number);
288 ALTER TABLE autnum_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
289 ALTER TABLE autnum_overrides DROP COLUMN IF EXISTS source;
290
291 CREATE TABLE IF NOT EXISTS network_overrides(
292 network inet NOT NULL,
293 country text,
294 is_anonymous_proxy boolean,
295 is_satellite_provider boolean,
296 is_anycast boolean
297 );
298 CREATE UNIQUE INDEX IF NOT EXISTS network_overrides_network
299 ON network_overrides(network);
300 CREATE INDEX IF NOT EXISTS network_overrides_search
301 ON network_overrides USING GIST(network inet_ops);
302 ALTER TABLE network_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
303 ALTER TABLE network_overrides DROP COLUMN IF EXISTS source;
304
305 CREATE TABLE IF NOT EXISTS geofeed_overrides(
306 url text NOT NULL
307 );
308 CREATE UNIQUE INDEX IF NOT EXISTS geofeed_overrides_url
309 ON geofeed_overrides(url);
310 """)
311
312 return db
313
314 def fetch_countries(self):
315 """
316 Returns a list of all countries on the list
317 """
318 # Fetch all valid country codes to check parsed networks aganist...
319 countries = self.db.query("SELECT country_code FROM countries ORDER BY country_code")
320
321 return set((country.country_code for country in countries))
322
323 def handle_write(self, ns):
324 """
325 Compiles a database in libloc format out of what is in the database
326 """
327 # Allocate a writer
328 writer = location.Writer(ns.signing_key, ns.backup_signing_key)
329
330 # Set all metadata
331 if ns.vendor:
332 writer.vendor = ns.vendor
333
334 if ns.description:
335 writer.description = ns.description
336
337 if ns.license:
338 writer.license = ns.license
339
340 # Add all Autonomous Systems
341 log.info("Writing Autonomous Systems...")
342
343 # Select all ASes with a name
344 rows = self.db.query("""
345 SELECT
346 autnums.number AS number,
347 COALESCE(
348 overrides.name,
349 autnums.name
350 ) AS name
351 FROM
352 autnums
353 LEFT JOIN
354 autnum_overrides overrides ON autnums.number = overrides.number
355 ORDER BY
356 autnums.number
357 """)
358
359 for row in rows:
360 # Skip AS without names
361 if not row.name:
362 continue
363
364 a = writer.add_as(row.number)
365 a.name = row.name
366
367 # Add all networks
368 log.info("Writing networks...")
369
370 # Select all known networks
371 rows = self.db.query("""
372 WITH known_networks AS (
373 SELECT network FROM announcements
374 UNION
375 SELECT network FROM networks
376 UNION
377 SELECT network FROM network_feeds
378 UNION
379 SELECT network FROM network_overrides
380 UNION
381 SELECT network FROM geofeed_networks
382 ),
383
384 ordered_networks AS (
385 SELECT
386 known_networks.network AS network,
387 announcements.autnum AS autnum,
388 networks.country AS country,
389
390 -- Must be part of returned values for ORDER BY clause
391 masklen(announcements.network) AS sort_a,
392 masklen(networks.network) AS sort_b
393 FROM
394 known_networks
395 LEFT JOIN
396 announcements ON known_networks.network <<= announcements.network
397 LEFT JOIN
398 networks ON known_networks.network <<= networks.network
399 ORDER BY
400 known_networks.network,
401 sort_a DESC,
402 sort_b DESC
403 )
404
405 -- Return a list of those networks enriched with all
406 -- other information that we store in the database
407 SELECT
408 DISTINCT ON (network)
409 network,
410 autnum,
411
412 -- Country
413 COALESCE(
414 (
415 SELECT
416 country
417 FROM
418 network_overrides overrides
419 WHERE
420 networks.network <<= overrides.network
421 ORDER BY
422 masklen(overrides.network) DESC
423 LIMIT 1
424 ),
425 (
426 SELECT
427 country
428 FROM
429 autnum_overrides overrides
430 WHERE
431 networks.autnum = overrides.number
432 ),
433 (
434 SELECT
435 country
436 FROM
437 network_feeds feeds
438 WHERE
439 networks.network <<= feeds.network
440 ORDER BY
441 masklen(feeds.network) DESC
442 LIMIT 1
443 ),
444 (
445 SELECT
446 country
447 FROM
448 autnum_feeds feeds
449 WHERE
450 networks.autnum = feeds.number
451 ORDER BY
452 source
453 LIMIT 1
454 ),
455 (
456 SELECT
457 geofeed_networks.country AS country
458 FROM
459 network_geofeeds
460
461 -- Join the data from the geofeeds
462 LEFT JOIN
463 geofeeds ON network_geofeeds.url = geofeeds.url
464 LEFT JOIN
465 geofeed_networks ON geofeeds.id = geofeed_networks.geofeed_id
466
467 -- Check whether we have a geofeed for this network
468 WHERE
469 networks.network <<= network_geofeeds.network
470 AND
471 networks.network <<= geofeed_networks.network
472
473 -- Filter for the best result
474 ORDER BY
475 masklen(geofeed_networks.network) DESC
476 LIMIT 1
477 ),
478 networks.country
479 ) AS country,
480
481 -- Flags
482 COALESCE(
483 (
484 SELECT
485 is_anonymous_proxy
486 FROM
487 network_overrides overrides
488 WHERE
489 networks.network <<= overrides.network
490 ORDER BY
491 masklen(overrides.network) DESC
492 LIMIT 1
493 ),
494 (
495 SELECT
496 is_anonymous_proxy
497 FROM
498 network_feeds feeds
499 WHERE
500 networks.network <<= feeds.network
501 ORDER BY
502 masklen(feeds.network) DESC
503 LIMIT 1
504 ),
505 (
506 SELECT
507 is_anonymous_proxy
508 FROM
509 autnum_feeds feeds
510 WHERE
511 networks.autnum = feeds.number
512 ORDER BY
513 source
514 LIMIT 1
515 ),
516 (
517 SELECT
518 is_anonymous_proxy
519 FROM
520 autnum_overrides overrides
521 WHERE
522 networks.autnum = overrides.number
523 ),
524 FALSE
525 ) AS is_anonymous_proxy,
526 COALESCE(
527 (
528 SELECT
529 is_satellite_provider
530 FROM
531 network_overrides overrides
532 WHERE
533 networks.network <<= overrides.network
534 ORDER BY
535 masklen(overrides.network) DESC
536 LIMIT 1
537 ),
538 (
539 SELECT
540 is_satellite_provider
541 FROM
542 network_feeds feeds
543 WHERE
544 networks.network <<= feeds.network
545 ORDER BY
546 masklen(feeds.network) DESC
547 LIMIT 1
548 ),
549 (
550 SELECT
551 is_satellite_provider
552 FROM
553 autnum_feeds feeds
554 WHERE
555 networks.autnum = feeds.number
556 ORDER BY
557 source
558 LIMIT 1
559 ),
560 (
561 SELECT
562 is_satellite_provider
563 FROM
564 autnum_overrides overrides
565 WHERE
566 networks.autnum = overrides.number
567 ),
568 FALSE
569 ) AS is_satellite_provider,
570 COALESCE(
571 (
572 SELECT
573 is_anycast
574 FROM
575 network_overrides overrides
576 WHERE
577 networks.network <<= overrides.network
578 ORDER BY
579 masklen(overrides.network) DESC
580 LIMIT 1
581 ),
582 (
583 SELECT
584 is_anycast
585 FROM
586 network_feeds feeds
587 WHERE
588 networks.network <<= feeds.network
589 ORDER BY
590 masklen(feeds.network) DESC
591 LIMIT 1
592 ),
593 (
594 SELECT
595 is_anycast
596 FROM
597 autnum_feeds feeds
598 WHERE
599 networks.autnum = feeds.number
600 ORDER BY
601 source
602 LIMIT 1
603 ),
604 (
605 SELECT
606 is_anycast
607 FROM
608 autnum_overrides overrides
609 WHERE
610 networks.autnum = overrides.number
611 ),
612 FALSE
613 ) AS is_anycast,
614 COALESCE(
615 (
616 SELECT
617 is_drop
618 FROM
619 network_overrides overrides
620 WHERE
621 networks.network <<= overrides.network
622 ORDER BY
623 masklen(overrides.network) DESC
624 LIMIT 1
625 ),
626 (
627 SELECT
628 is_drop
629 FROM
630 network_feeds feeds
631 WHERE
632 networks.network <<= feeds.network
633 ORDER BY
634 masklen(feeds.network) DESC
635 LIMIT 1
636 ),
637 (
638 SELECT
639 is_drop
640 FROM
641 autnum_feeds feeds
642 WHERE
643 networks.autnum = feeds.number
644 ORDER BY
645 source
646 LIMIT 1
647 ),
648 (
649 SELECT
650 is_drop
651 FROM
652 autnum_overrides overrides
653 WHERE
654 networks.autnum = overrides.number
655 ),
656 FALSE
657 ) AS is_drop
658 FROM
659 ordered_networks networks
660 """)
661
662 for row in rows:
663 network = writer.add_network(row.network)
664
665 # Save country
666 if row.country:
667 network.country_code = row.country
668
669 # Save ASN
670 if row.autnum:
671 network.asn = row.autnum
672
673 # Set flags
674 if row.is_anonymous_proxy:
675 network.set_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
676
677 if row.is_satellite_provider:
678 network.set_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
679
680 if row.is_anycast:
681 network.set_flag(location.NETWORK_FLAG_ANYCAST)
682
683 if row.is_drop:
684 network.set_flag(location.NETWORK_FLAG_DROP)
685
686 # Add all countries
687 log.info("Writing countries...")
688 rows = self.db.query("SELECT * FROM countries ORDER BY country_code")
689
690 for row in rows:
691 c = writer.add_country(row.country_code)
692 c.continent_code = row.continent_code
693 c.name = row.name
694
695 # Write everything to file
696 log.info("Writing database to file...")
697 for file in ns.file:
698 writer.write(file)
699
700 def handle_update_whois(self, ns):
701 # Did we run successfully?
702 success = True
703
704 sources = (
705 # African Network Information Centre
706 ("AFRINIC", (
707 (self._import_standard_format, "https://ftp.afrinic.net/pub/pub/dbase/afrinic.db.gz"),
708 )),
709
710 # Asia Pacific Network Information Centre
711 ("APNIC", (
712 (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inet6num.gz"),
713 (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inetnum.gz"),
714 (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.aut-num.gz"),
715 (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.organisation.gz"),
716 )),
717
718 # American Registry for Internet Numbers
719 ("ARIN", (
720 (self._import_extended_format, "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"),
721 (self._import_arin_as_names, "https://ftp.arin.net/pub/resource_registry_service/asns.csv"),
722 )),
723
724 # Japan Network Information Center
725 ("JPNIC", (
726 (self._import_standard_format, "https://ftp.nic.ad.jp/jpirr/jpirr.db.gz"),
727 )),
728
729 # Latin America and Caribbean Network Information Centre
730 ("LACNIC", (
731 (self._import_standard_format, "https://ftp.lacnic.net/lacnic/dbase/lacnic.db.gz"),
732 (self._import_extended_format, "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"),
733 )),
734
735 # Réseaux IP Européens
736 ("RIPE", (
737 (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz"),
738 (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz"),
739 (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.aut-num.gz"),
740 (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.organisation.gz"),
741 )),
742 )
743
744 # Fetch all valid country codes to check parsed networks against
745 countries = self.fetch_countries()
746
747 # Check if we have countries
748 if not countries:
749 log.error("Please import countries before importing any WHOIS data")
750 return 1
751
752 # Iterate over all potential sources
753 for name, feeds in sources:
754 # Skip anything that should not be updated
755 if ns.sources and not name in ns.sources:
756 continue
757
758 try:
759 self._process_source(name, feeds, countries)
760
761 # Log an error but continue if an exception occurs
762 except Exception as e:
763 log.error("Error processing source %s" % name, exc_info=True)
764 success = False
765
766 # Return a non-zero exit code for errors
767 return 0 if success else 1
768
769 def _process_source(self, source, feeds, countries):
770 """
771 This function processes one source
772 """
773 # Wrap everything into one large transaction
774 with self.db.transaction():
775 # Remove all previously imported content
776 self.db.execute("DELETE FROM autnums WHERE source = %s", source)
777 self.db.execute("DELETE FROM networks WHERE source = %s", source)
778 self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", source)
779
780 # Create some temporary tables to store parsed data
781 self.db.execute("""
782 CREATE TEMPORARY TABLE _autnums(number integer NOT NULL,
783 organization text NOT NULL, source text NOT NULL) ON COMMIT DROP;
784 CREATE UNIQUE INDEX _autnums_number ON _autnums(number);
785
786 CREATE TEMPORARY TABLE _organizations(handle text NOT NULL,
787 name text NOT NULL, source text NOT NULL) ON COMMIT DROP;
788 CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle);
789
790 CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text,
791 original_countries text[] NOT NULL, source text NOT NULL)
792 ON COMMIT DROP;
793 CREATE INDEX _rirdata_search ON _rirdata
794 USING BTREE(family(network), masklen(network));
795 CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network);
796 """)
797
798 # Parse all feeds
799 for callback, url, *args in feeds:
800 # Retrieve the feed
801 f = self.downloader.retrieve(url)
802
803 # Call the callback
804 callback(source, countries, f, *args)
805
806 # Process all parsed networks from every RIR we happen to have access to,
807 # insert the largest network chunks into the networks table immediately...
808 families = self.db.query("""
809 SELECT DISTINCT
810 family(network) AS family
811 FROM
812 _rirdata
813 ORDER BY
814 family(network)
815 """,
816 )
817
818 for family in (row.family for row in families):
819 # Fetch the smallest mask length in our data set
820 smallest = self.db.get("""
821 SELECT
822 MIN(
823 masklen(network)
824 ) AS prefix
825 FROM
826 _rirdata
827 WHERE
828 family(network) = %s
829 """, family,
830 )
831
832 # Copy all networks
833 self.db.execute("""
834 INSERT INTO
835 networks
836 (
837 network,
838 country,
839 original_countries,
840 source
841 )
842 SELECT
843 network,
844 country,
845 original_countries,
846 source
847 FROM
848 _rirdata
849 WHERE
850 masklen(network) = %s
851 AND
852 family(network) = %s
853 ON CONFLICT DO
854 NOTHING""",
855 smallest.prefix,
856 family,
857 )
858
859 # ... determine any other prefixes for this network family, ...
860 prefixes = self.db.query("""
861 SELECT
862 DISTINCT masklen(network) AS prefix
863 FROM
864 _rirdata
865 WHERE
866 family(network) = %s
867 ORDER BY
868 masklen(network) ASC
869 OFFSET 1
870 """, family,
871 )
872
873 # ... and insert networks with this prefix in case they provide additional
874 # information (i. e. subnet of a larger chunk with a different country)
875 for prefix in (row.prefix for row in prefixes):
876 self.db.execute("""
877 WITH candidates AS (
878 SELECT
879 _rirdata.network,
880 _rirdata.country,
881 _rirdata.original_countries,
882 _rirdata.source
883 FROM
884 _rirdata
885 WHERE
886 family(_rirdata.network) = %s
887 AND
888 masklen(_rirdata.network) = %s
889 ),
890 filtered AS (
891 SELECT
892 DISTINCT ON (c.network)
893 c.network,
894 c.country,
895 c.original_countries,
896 c.source,
897 masklen(networks.network),
898 networks.country AS parent_country
899 FROM
900 candidates c
901 LEFT JOIN
902 networks
903 ON
904 c.network << networks.network
905 ORDER BY
906 c.network,
907 masklen(networks.network) DESC NULLS LAST
908 )
909 INSERT INTO
910 networks(network, country, original_countries, source)
911 SELECT
912 network,
913 country,
914 original_countries,
915 source
916 FROM
917 filtered
918 WHERE
919 parent_country IS NULL
920 OR
921 country <> parent_country
922 ON CONFLICT DO NOTHING
923 """, family, prefix,
924 )
925
926 self.db.execute("""
927 INSERT INTO
928 autnums
929 (
930 number,
931 name,
932 source
933 )
934 SELECT
935 _autnums.number,
936 _organizations.name,
937 _organizations.source
938 FROM
939 _autnums
940 JOIN
941 _organizations ON _autnums.organization = _organizations.handle
942 ON CONFLICT
943 (
944 number
945 )
946 DO UPDATE
947 SET name = excluded.name
948 """,
949 )
950
951 def _import_standard_format(self, source, countries, f, *args):
952 """
953 Imports a single standard format source feed
954 """
955 # Iterate over all blocks
956 for block in iterate_over_blocks(f):
957 self._parse_block(block, source, countries)
958
959 def _import_extended_format(self, source, countries, f, *args):
960 # Iterate over all lines
961 for line in iterate_over_lines(f):
962 self._parse_line(block, source, countries)
963
964 def _import_arin_as_names(self, source, countries, f, *args):
965 # Walk through the file
966 for line in csv.DictReader(feed, dialect="arin"):
967 log.debug("Processing object: %s" % line)
968
969 # Fetch status
970 status = line.get("Status")
971
972 # We are only interested in anything managed by ARIN
973 if not status == "Full Registry Services":
974 continue
975
976 # Fetch organization name
977 name = line.get("Org Name")
978
979 # Extract ASNs
980 first_asn = line.get("Start AS Number")
981 last_asn = line.get("End AS Number")
982
983 # Cast to a number
984 try:
985 first_asn = int(first_asn)
986 except TypeError as e:
987 log.warning("Could not parse ASN '%s'" % first_asn)
988 continue
989
990 try:
991 last_asn = int(last_asn)
992 except TypeError as e:
993 log.warning("Could not parse ASN '%s'" % last_asn)
994 continue
995
996 # Check if the range is valid
997 if last_asn < first_asn:
998 log.warning("Invalid ASN range %s-%s" % (first_asn, last_asn))
999
1000 # Insert everything into the database
1001 for asn in range(first_asn, last_asn + 1):
1002 if not self._check_parsed_asn(asn):
1003 log.warning("Skipping invalid ASN %s" % asn)
1004 continue
1005
1006 self.db.execute("""
1007 INSERT INTO
1008 autnums
1009 (
1010 number,
1011 name,
1012 source
1013 )
1014 VALUES
1015 (
1016 %s, %s, %s
1017 )
1018 ON CONFLICT
1019 (
1020 number
1021 )
1022 DO NOTHING
1023 """, asn, name, "ARIN",
1024 )
1025
1026 def _check_parsed_network(self, network):
1027 """
1028 Assistive function to detect and subsequently sort out parsed
1029 networks from RIR data (both Whois and so-called "extended sources"),
1030 which are or have...
1031
1032 (a) not globally routable (RFC 1918 space, et al.)
1033 (b) covering a too large chunk of the IP address space (prefix length
1034 is < 7 for IPv4 networks, and < 10 for IPv6)
1035 (c) "0.0.0.0" or "::" as a network address
1036
1037 This unfortunately is necessary due to brain-dead clutter across
1038 various RIR databases, causing mismatches and eventually disruptions.
1039
1040 We will return False in case a network is not suitable for adding
1041 it to our database, and True otherwise.
1042 """
1043 # Check input
1044 if isinstance(network, ipaddress.IPv6Network):
1045 pass
1046 elif isinstance(network, ipaddress.IPv4Network):
1047 pass
1048 else:
1049 raise ValueError("Invalid network: %s (type %s)" % (network, type(network)))
1050
1051 # Ignore anything that isn't globally routable
1052 if not network.is_global:
1053 log.debug("Skipping non-globally routable network: %s" % network)
1054 return False
1055
1056 # Ignore anything that is unspecified IP range (See RFC 5735 for IPv4 or RFC 2373 for IPv6)
1057 elif network.is_unspecified:
1058 log.debug("Skipping unspecified network: %s" % network)
1059 return False
1060
1061 # IPv6
1062 if network.version == 6:
1063 if network.prefixlen < 10:
1064 log.debug("Skipping too big IP chunk: %s" % network)
1065 return False
1066
1067 # IPv4
1068 elif network.version == 4:
1069 if network.prefixlen < 7:
1070 log.debug("Skipping too big IP chunk: %s" % network)
1071 return False
1072
1073 # In case we have made it here, the network is considered to
1074 # be suitable for libloc consumption...
1075 return True
1076
1077 def _check_parsed_asn(self, asn):
1078 """
1079 Assistive function to filter Autonomous System Numbers not being suitable
1080 for adding to our database. Returns False in such cases, and True otherwise.
1081 """
1082
1083 for start, end in VALID_ASN_RANGES:
1084 if start <= asn and end >= asn:
1085 return True
1086
1087 log.info("Supplied ASN %s out of publicly routable ASN ranges" % asn)
1088 return False
1089
1090 def _parse_block(self, block, source_key, countries):
1091 # Get first line to find out what type of block this is
1092 line = block[0]
1093
1094 # aut-num
1095 if line.startswith("aut-num:"):
1096 return self._parse_autnum_block(block, source_key)
1097
1098 # inetnum
1099 if line.startswith("inet6num:") or line.startswith("inetnum:"):
1100 return self._parse_inetnum_block(block, source_key, countries)
1101
1102 # organisation
1103 elif line.startswith("organisation:"):
1104 return self._parse_org_block(block, source_key)
1105
1106 def _parse_autnum_block(self, block, source_key):
1107 autnum = {}
1108 for line in block:
1109 # Split line
1110 key, val = split_line(line)
1111
1112 if key == "aut-num":
1113 m = re.match(r"^(AS|as)(\d+)", val)
1114 if m:
1115 autnum["asn"] = m.group(2)
1116
1117 elif key == "org":
1118 autnum[key] = val.upper()
1119
1120 elif key == "descr":
1121 # Save the first description line as well...
1122 if not key in autnum:
1123 autnum[key] = val
1124
1125 # Skip empty objects
1126 if not autnum or not "asn" in autnum:
1127 return
1128
1129 # Insert a dummy organisation handle into our temporary organisations
1130 # table in case the AS does not have an organisation handle set, but
1131 # has a description (a quirk often observed in APNIC area), so we can
1132 # later display at least some string for this AS.
1133 if not "org" in autnum:
1134 if "descr" in autnum:
1135 autnum["org"] = "LIBLOC-%s-ORGHANDLE" % autnum.get("asn")
1136
1137 self.db.execute("INSERT INTO _organizations(handle, name, source) \
1138 VALUES(%s, %s, %s) ON CONFLICT (handle) DO NOTHING",
1139 autnum.get("org"), autnum.get("descr"), source_key,
1140 )
1141 else:
1142 log.warning("ASN %s neither has an organisation handle nor a description line set, omitting" % \
1143 autnum.get("asn"))
1144 return
1145
1146 # Insert into database
1147 self.db.execute("INSERT INTO _autnums(number, organization, source) \
1148 VALUES(%s, %s, %s) ON CONFLICT (number) DO UPDATE SET \
1149 organization = excluded.organization",
1150 autnum.get("asn"), autnum.get("org"), source_key,
1151 )
1152
1153 def _parse_inetnum_block(self, block, source_key, countries):
1154 inetnum = {}
1155 for line in block:
1156 # Split line
1157 key, val = split_line(line)
1158
1159 # Filter any inetnum records which are only referring to IP space
1160 # not managed by that specific RIR...
1161 if key == "netname":
1162 if re.match(r"^(ERX-NETBLOCK|(AFRINIC|ARIN|LACNIC|RIPE)-CIDR-BLOCK|IANA-NETBLOCK-\d{1,3}|NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK|STUB-[\d-]{3,}SLASH\d{1,2})", val.strip()):
1163 log.debug("Skipping record indicating historic/orphaned data: %s" % val.strip())
1164 return
1165
1166 if key == "inetnum":
1167 start_address, delim, end_address = val.partition("-")
1168
1169 # Strip any excess space
1170 start_address, end_address = start_address.rstrip(), end_address.strip()
1171
1172 # Handle "inetnum" formatting in LACNIC DB (e.g. "24.152.8/22" instead of "24.152.8.0/22")
1173 if start_address and not (delim or end_address):
1174 try:
1175 start_address = ipaddress.ip_network(start_address, strict=False)
1176 except ValueError:
1177 start_address = start_address.split("/")
1178 ldigits = start_address[0].count(".")
1179
1180 # How many octets do we need to add?
1181 # (LACNIC does not seem to have a /8 or greater assigned, so the following should suffice.)
1182 if ldigits == 1:
1183 start_address = start_address[0] + ".0.0/" + start_address[1]
1184 elif ldigits == 2:
1185 start_address = start_address[0] + ".0/" + start_address[1]
1186 else:
1187 log.warning("Could not recover IPv4 address from line in LACNIC DB format: %s" % line)
1188 return
1189
1190 try:
1191 start_address = ipaddress.ip_network(start_address, strict=False)
1192 except ValueError:
1193 log.warning("Could not parse line in LACNIC DB format: %s" % line)
1194 return
1195
1196 # Enumerate first and last IP address of this network
1197 end_address = start_address[-1]
1198 start_address = start_address[0]
1199
1200 else:
1201 # Convert to IP address
1202 try:
1203 start_address = ipaddress.ip_address(start_address)
1204 end_address = ipaddress.ip_address(end_address)
1205 except ValueError:
1206 log.warning("Could not parse line: %s" % line)
1207 return
1208
1209 inetnum["inetnum"] = list(ipaddress.summarize_address_range(start_address, end_address))
1210
1211 elif key == "inet6num":
1212 inetnum[key] = [ipaddress.ip_network(val, strict=False)]
1213
1214 elif key == "country":
1215 cc = val.upper()
1216
1217 # Ignore certain country codes
1218 if cc in IGNORED_COUNTRIES:
1219 log.debug("Ignoring country code '%s'" % cc)
1220 continue
1221
1222 # Translate country codes
1223 try:
1224 cc = TRANSLATED_COUNTRIES[cc]
1225 except KeyError:
1226 pass
1227
1228 # Do we know this country?
1229 if not cc in countries:
1230 log.warning("Skipping invalid country code '%s'" % cc)
1231 continue
1232
1233 try:
1234 inetnum[key].append(cc)
1235 except KeyError:
1236 inetnum[key] = [cc]
1237
1238 # Parse the geofeed attribute
1239 elif key == "geofeed":
1240 inetnum["geofeed"] = val
1241
1242 # Parse geofeed when used as a remark
1243 elif key == "remarks":
1244 m = re.match(r"^(?:Geofeed)\s+(https://.*)", val)
1245 if m:
1246 inetnum["geofeed"] = m.group(1)
1247
1248 # Skip empty objects
1249 if not inetnum:
1250 return
1251
1252 # Iterate through all networks enumerated from above, check them for plausibility and insert
1253 # them into the database, if _check_parsed_network() succeeded
1254 for single_network in inetnum.get("inet6num") or inetnum.get("inetnum"):
1255 if not self._check_parsed_network(single_network):
1256 continue
1257
1258 # Fetch the countries or use a list with an empty country
1259 countries = inetnum.get("country", [None])
1260
1261 # Insert the network into the database but only use the first country code
1262 for cc in countries:
1263 self.db.execute("""
1264 INSERT INTO
1265 _rirdata
1266 (
1267 network,
1268 country,
1269 original_countries,
1270 source
1271 )
1272 VALUES
1273 (
1274 %s, %s, %s, %s
1275 )
1276 ON CONFLICT (network)
1277 DO UPDATE SET country = excluded.country
1278 """, "%s" % single_network, cc, [cc for cc in countries if cc], source_key,
1279 )
1280
1281 # If there are more than one country, we will only use the first one
1282 break
1283
1284 # Update any geofeed information
1285 geofeed = inetnum.get("geofeed", None)
1286 if geofeed:
1287 self._parse_geofeed(source_key, geofeed, single_network)
1288
1289 def _parse_geofeed(self, source, url, single_network):
1290 # Parse the URL
1291 url = urllib.parse.urlparse(url)
1292
1293 # Make sure that this is a HTTPS URL
1294 if not url.scheme == "https":
1295 log.debug("Geofeed URL is not using HTTPS: %s" % geofeed)
1296 return
1297
1298 # Put the URL back together normalized
1299 url = url.geturl()
1300
1301 # Store/update any geofeeds
1302 self.db.execute("""
1303 INSERT INTO
1304 network_geofeeds
1305 (
1306 network,
1307 url,
1308 source
1309 )
1310 VALUES
1311 (
1312 %s, %s, %s
1313 )
1314 ON CONFLICT (network) DO
1315 UPDATE SET url = excluded.url""",
1316 "%s" % single_network, url, source,
1317 )
1318
1319 def _parse_org_block(self, block, source_key):
1320 org = {}
1321 for line in block:
1322 # Split line
1323 key, val = split_line(line)
1324
1325 if key == "organisation":
1326 org[key] = val.upper()
1327 elif key == "org-name":
1328 org[key] = val
1329
1330 # Skip empty objects
1331 if not org:
1332 return
1333
1334 self.db.execute("INSERT INTO _organizations(handle, name, source) \
1335 VALUES(%s, %s, %s) ON CONFLICT (handle) DO \
1336 UPDATE SET name = excluded.name",
1337 org.get("organisation"), org.get("org-name"), source_key,
1338 )
1339
1340 def _parse_line(self, line, source_key, validcountries=None):
1341 # Skip version line
1342 if line.startswith("2"):
1343 return
1344
1345 # Skip comments
1346 if line.startswith("#"):
1347 return
1348
1349 try:
1350 registry, country_code, type, line = line.split("|", 3)
1351 except:
1352 log.warning("Could not parse line: %s" % line)
1353 return
1354
1355 # Skip any unknown protocols
1356 if not type in ("ipv6", "ipv4"):
1357 log.warning("Unknown IP protocol '%s'" % type)
1358 return
1359
1360 # Skip any lines that are for stats only or do not have a country
1361 # code at all (avoids log spam below)
1362 if not country_code or country_code == '*':
1363 return
1364
1365 # Skip objects with unknown country codes
1366 if validcountries and country_code not in validcountries:
1367 log.warning("Skipping line with bogus country '%s': %s" % \
1368 (country_code, line))
1369 return
1370
1371 try:
1372 address, prefix, date, status, organization = line.split("|")
1373 except ValueError:
1374 organization = None
1375
1376 # Try parsing the line without organization
1377 try:
1378 address, prefix, date, status = line.split("|")
1379 except ValueError:
1380 log.warning("Unhandled line format: %s" % line)
1381 return
1382
1383 # Skip anything that isn't properly assigned
1384 if not status in ("assigned", "allocated"):
1385 return
1386
1387 # Cast prefix into an integer
1388 try:
1389 prefix = int(prefix)
1390 except:
1391 log.warning("Invalid prefix: %s" % prefix)
1392 return
1393
1394 # Fix prefix length for IPv4
1395 if type == "ipv4":
1396 prefix = 32 - int(math.log(prefix, 2))
1397
1398 # Try to parse the address
1399 try:
1400 network = ipaddress.ip_network("%s/%s" % (address, prefix), strict=False)
1401 except ValueError:
1402 log.warning("Invalid IP address: %s" % address)
1403 return
1404
1405 if not self._check_parsed_network(network):
1406 return
1407
1408 self.db.execute("""
1409 INSERT INTO
1410 networks
1411 (
1412 network,
1413 country,
1414 original_countries,
1415 source
1416 )
1417 VALUES
1418 (
1419 %s, %s, %s, %s
1420 )
1421 ON CONFLICT (network)
1422 DO UPDATE SET country = excluded.country
1423 """, "%s" % network, country_code, [country], source_key,
1424 )
1425
1426 def handle_update_announcements(self, ns):
1427 server = ns.server[0]
1428
1429 with self.db.transaction():
1430 if server.startswith("/"):
1431 self._handle_update_announcements_from_bird(server)
1432
1433 # Purge anything we never want here
1434 self.db.execute("""
1435 -- Delete default routes
1436 DELETE FROM announcements WHERE network = '::/0' OR network = '0.0.0.0/0';
1437
1438 -- Delete anything that is not global unicast address space
1439 DELETE FROM announcements WHERE family(network) = 6 AND NOT network <<= '2000::/3';
1440
1441 -- DELETE "current network" address space
1442 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '0.0.0.0/8';
1443
1444 -- DELETE local loopback address space
1445 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '127.0.0.0/8';
1446
1447 -- DELETE RFC 1918 address space
1448 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '10.0.0.0/8';
1449 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '172.16.0.0/12';
1450 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.168.0.0/16';
1451
1452 -- DELETE test, benchmark and documentation address space
1453 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.0.0/24';
1454 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.2.0/24';
1455 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.18.0.0/15';
1456 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.51.100.0/24';
1457 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '203.0.113.0/24';
1458
1459 -- DELETE CGNAT address space (RFC 6598)
1460 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '100.64.0.0/10';
1461
1462 -- DELETE link local address space
1463 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '169.254.0.0/16';
1464
1465 -- DELETE IPv6 to IPv4 (6to4) address space (RFC 3068)
1466 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.88.99.0/24';
1467 DELETE FROM announcements WHERE family(network) = 6 AND network <<= '2002::/16';
1468
1469 -- DELETE multicast and reserved address space
1470 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '224.0.0.0/4';
1471 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '240.0.0.0/4';
1472
1473 -- Delete networks that are too small to be in the global routing table
1474 DELETE FROM announcements WHERE family(network) = 6 AND masklen(network) > 48;
1475 DELETE FROM announcements WHERE family(network) = 4 AND masklen(network) > 24;
1476
1477 -- Delete any non-public or reserved ASNs
1478 DELETE FROM announcements WHERE NOT (
1479 (autnum >= 1 AND autnum <= 23455)
1480 OR
1481 (autnum >= 23457 AND autnum <= 64495)
1482 OR
1483 (autnum >= 131072 AND autnum <= 4199999999)
1484 );
1485
1486 -- Delete everything that we have not seen for 14 days
1487 DELETE FROM announcements WHERE last_seen_at <= CURRENT_TIMESTAMP - INTERVAL '14 days';
1488 """)
1489
1490 def _handle_update_announcements_from_bird(self, server):
1491 # Pre-compile the regular expression for faster searching
1492 route = re.compile(b"^\s(.+?)\s+.+?\[(?:AS(.*?))?.\]$")
1493
1494 log.info("Requesting routing table from Bird (%s)" % server)
1495
1496 aggregated_networks = []
1497
1498 # Send command to list all routes
1499 for line in self._bird_cmd(server, "show route"):
1500 m = route.match(line)
1501 if not m:
1502 # Skip empty lines
1503 if not line:
1504 pass
1505
1506 # Ignore any header lines with the name of the routing table
1507 elif line.startswith(b"Table"):
1508 pass
1509
1510 # Log anything else
1511 else:
1512 log.debug("Could not parse line: %s" % line.decode())
1513
1514 continue
1515
1516 # Fetch the extracted network and ASN
1517 network, autnum = m.groups()
1518
1519 # Decode into strings
1520 if network:
1521 network = network.decode()
1522 if autnum:
1523 autnum = autnum.decode()
1524
1525 # Collect all aggregated networks
1526 if not autnum:
1527 log.debug("%s is an aggregated network" % network)
1528 aggregated_networks.append(network)
1529 continue
1530
1531 # Insert it into the database
1532 self.db.execute("INSERT INTO announcements(network, autnum) \
1533 VALUES(%s, %s) ON CONFLICT (network) DO \
1534 UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
1535 network, autnum,
1536 )
1537
1538 # Process any aggregated networks
1539 for network in aggregated_networks:
1540 log.debug("Processing aggregated network %s" % network)
1541
1542 # Run "show route all" for each network
1543 for line in self._bird_cmd(server, "show route %s all" % network):
1544 # Try finding the path
1545 m = re.match(b"\s+BGP\.as_path:.* (\d+) {\d+}$", line)
1546 if m:
1547 # Select the last AS number in the path
1548 autnum = m.group(1).decode()
1549
1550 # Insert it into the database
1551 self.db.execute("INSERT INTO announcements(network, autnum) \
1552 VALUES(%s, %s) ON CONFLICT (network) DO \
1553 UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
1554 network, autnum,
1555 )
1556
1557 # We don't need to process any more
1558 break
1559
1560 def _bird_cmd(self, socket_path, command):
1561 # Connect to the socket
1562 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1563 s.connect(socket_path)
1564
1565 # Allocate some buffer
1566 buffer = b""
1567
1568 log.debug("Sending Bird command: %s" % command)
1569
1570 # Send the command
1571 s.send(b"%s\n" % command.encode())
1572
1573 while True:
1574 # Fill up the buffer
1575 buffer += s.recv(4096)
1576
1577 while True:
1578 # Search for the next newline
1579 pos = buffer.find(b"\n")
1580
1581 # If we cannot find one, we go back and read more data
1582 if pos <= 0:
1583 break
1584
1585 # Cut after the newline character
1586 pos += 1
1587
1588 # Split the line we want and keep the rest in buffer
1589 line, buffer = buffer[:pos], buffer[pos:]
1590
1591 # Try parsing any status lines
1592 if len(line) > 4 and line[:4].isdigit() and line[4] in (32, 45):
1593 code, delim, line = int(line[:4]), line[4], line[5:]
1594
1595 log.debug("Received response code %s from bird" % code)
1596
1597 # End of output
1598 if code == 0:
1599 return
1600
1601 # Ignore hello line
1602 elif code == 1:
1603 continue
1604
1605 # Otherwise return the line
1606 yield line
1607
1608 def handle_update_geofeeds(self, ns):
1609 # Sync geofeeds
1610 with self.db.transaction():
1611 # Delete all geofeeds which are no longer linked
1612 self.db.execute("""
1613 DELETE FROM
1614 geofeeds
1615 WHERE
1616 geofeeds.url NOT IN (
1617 SELECT
1618 network_geofeeds.url
1619 FROM
1620 network_geofeeds
1621
1622 UNION
1623
1624 SELECT
1625 geofeed_overrides.url
1626 FROM
1627 geofeed_overrides
1628 )
1629 """,
1630 )
1631
1632 # Copy all geofeeds
1633 self.db.execute("""
1634 WITH all_geofeeds AS (
1635 SELECT
1636 network_geofeeds.url
1637 FROM
1638 network_geofeeds
1639
1640 UNION
1641
1642 SELECT
1643 geofeed_overrides.url
1644 FROM
1645 geofeed_overrides
1646 )
1647 INSERT INTO
1648 geofeeds
1649 (
1650 url
1651 )
1652 SELECT
1653 url
1654 FROM
1655 all_geofeeds
1656 ON CONFLICT (url)
1657 DO NOTHING
1658 """,
1659 )
1660
1661 # Fetch all Geofeeds that require an update
1662 geofeeds = self.db.query("""
1663 SELECT
1664 id,
1665 url
1666 FROM
1667 geofeeds
1668 WHERE
1669 updated_at IS NULL
1670 OR
1671 updated_at <= CURRENT_TIMESTAMP - INTERVAL '1 week'
1672 ORDER BY
1673 id
1674 """)
1675
1676 # Update all geofeeds
1677 for geofeed in geofeeds:
1678 with self.db.transaction():
1679 self._fetch_geofeed(geofeed)
1680
1681 # Delete data from any feeds that did not update in the last two weeks
1682 with self.db.transaction():
1683 self.db.execute("""
1684 DELETE FROM
1685 geofeed_networks
1686 WHERE
1687 geofeed_networks.geofeed_id IN (
1688 SELECT
1689 geofeeds.id
1690 FROM
1691 geofeeds
1692 WHERE
1693 updated_at IS NULL
1694 OR
1695 updated_at <= CURRENT_TIMESTAMP - INTERVAL '2 weeks'
1696 )
1697 """)
1698
1699 def _fetch_geofeed(self, geofeed):
1700 log.debug("Fetching Geofeed %s" % geofeed.url)
1701
1702 with self.db.transaction():
1703 # Open the URL
1704 try:
1705 # Send the request
1706 f = self.downloader.retrieve(geofeed.url, headers={
1707 "User-Agent" : "location/%s" % location.__version__,
1708
1709 # We expect some plain text file in CSV format
1710 "Accept" : "text/csv, text/plain",
1711 })
1712
1713 # Remove any previous data
1714 self.db.execute("DELETE FROM geofeed_networks \
1715 WHERE geofeed_id = %s", geofeed.id)
1716
1717 lineno = 0
1718
1719 # Read the output line by line
1720 for line in f:
1721 lineno += 1
1722
1723 try:
1724 line = line.decode()
1725
1726 # Ignore any lines we cannot decode
1727 except UnicodeDecodeError:
1728 log.debug("Could not decode line %s in %s" \
1729 % (lineno, geofeed.url))
1730 continue
1731
1732 # Strip any newline
1733 line = line.rstrip()
1734
1735 # Skip empty lines
1736 if not line:
1737 continue
1738
1739 # Try to parse the line
1740 try:
1741 fields = line.split(",", 5)
1742 except ValueError:
1743 log.debug("Could not parse line: %s" % line)
1744 continue
1745
1746 # Check if we have enough fields
1747 if len(fields) < 4:
1748 log.debug("Not enough fields in line: %s" % line)
1749 continue
1750
1751 # Fetch all fields
1752 network, country, region, city, = fields[:4]
1753
1754 # Try to parse the network
1755 try:
1756 network = ipaddress.ip_network(network, strict=False)
1757 except ValueError:
1758 log.debug("Could not parse network: %s" % network)
1759 continue
1760
1761 # Strip any excess whitespace from country codes
1762 country = country.strip()
1763
1764 # Make the country code uppercase
1765 country = country.upper()
1766
1767 # Check the country code
1768 if not country:
1769 log.debug("Empty country code in Geofeed %s line %s" \
1770 % (geofeed.url, lineno))
1771 continue
1772
1773 elif not location.country_code_is_valid(country):
1774 log.debug("Invalid country code in Geofeed %s:%s: %s" \
1775 % (geofeed.url, lineno, country))
1776 continue
1777
1778 # Write this into the database
1779 self.db.execute("""
1780 INSERT INTO
1781 geofeed_networks (
1782 geofeed_id,
1783 network,
1784 country,
1785 region,
1786 city
1787 )
1788 VALUES (%s, %s, %s, %s, %s)""",
1789 geofeed.id,
1790 "%s" % network,
1791 country,
1792 region,
1793 city,
1794 )
1795
1796 # Catch any HTTP errors
1797 except urllib.request.HTTPError as e:
1798 self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
1799 WHERE id = %s", e.code, "%s" % e, geofeed.id)
1800
1801 # Remove any previous data when the feed has been deleted
1802 if e.code == 404:
1803 self.db.execute("DELETE FROM geofeed_networks \
1804 WHERE geofeed_id = %s", geofeed.id)
1805
1806 # Catch any other errors and connection timeouts
1807 except (http.client.InvalidURL, urllib.request.URLError, TimeoutError) as e:
1808 log.debug("Could not fetch URL %s: %s" % (geofeed.url, e))
1809
1810 self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
1811 WHERE id = %s", 599, "%s" % e, geofeed.id)
1812
1813 # Mark the geofeed as updated
1814 else:
1815 self.db.execute("""
1816 UPDATE
1817 geofeeds
1818 SET
1819 updated_at = CURRENT_TIMESTAMP,
1820 status = NULL,
1821 error = NULL
1822 WHERE
1823 id = %s""",
1824 geofeed.id,
1825 )
1826
1827 def handle_update_overrides(self, ns):
1828 with self.db.transaction():
1829 # Drop any previous content
1830 self.db.execute("TRUNCATE TABLE autnum_overrides")
1831 self.db.execute("TRUNCATE TABLE geofeed_overrides")
1832 self.db.execute("TRUNCATE TABLE network_overrides")
1833
1834 for file in ns.files:
1835 log.info("Reading %s..." % file)
1836
1837 with open(file, "rb") as f:
1838 for type, block in read_blocks(f):
1839 if type == "net":
1840 network = block.get("net")
1841 # Try to parse and normalise the network
1842 try:
1843 network = ipaddress.ip_network(network, strict=False)
1844 except ValueError as e:
1845 log.warning("Invalid IP network: %s: %s" % (network, e))
1846 continue
1847
1848 # Prevent that we overwrite all networks
1849 if network.prefixlen == 0:
1850 log.warning("Skipping %s: You cannot overwrite default" % network)
1851 continue
1852
1853 self.db.execute("""
1854 INSERT INTO
1855 network_overrides
1856 (
1857 network,
1858 country,
1859 is_anonymous_proxy,
1860 is_satellite_provider,
1861 is_anycast,
1862 is_drop
1863 )
1864 VALUES
1865 (
1866 %s, %s, %s, %s, %s, %s
1867 )
1868 ON CONFLICT (network) DO NOTHING
1869 """,
1870 "%s" % network,
1871 block.get("country"),
1872 self._parse_bool(block, "is-anonymous-proxy"),
1873 self._parse_bool(block, "is-satellite-provider"),
1874 self._parse_bool(block, "is-anycast"),
1875 self._parse_bool(block, "drop"),
1876 )
1877
1878 elif type == "aut-num":
1879 autnum = block.get("aut-num")
1880
1881 # Check if AS number begins with "AS"
1882 if not autnum.startswith("AS"):
1883 log.warning("Invalid AS number: %s" % autnum)
1884 continue
1885
1886 # Strip "AS"
1887 autnum = autnum[2:]
1888
1889 self.db.execute("""
1890 INSERT INTO
1891 autnum_overrides
1892 (
1893 number,
1894 name,
1895 country,
1896 is_anonymous_proxy,
1897 is_satellite_provider,
1898 is_anycast,
1899 is_drop
1900 )
1901 VALUES
1902 (
1903 %s, %s, %s, %s, %s, %s, %s
1904 )
1905 ON CONFLICT (number) DO NOTHING
1906 """,
1907 autnum,
1908 block.get("name"),
1909 block.get("country"),
1910 self._parse_bool(block, "is-anonymous-proxy"),
1911 self._parse_bool(block, "is-satellite-provider"),
1912 self._parse_bool(block, "is-anycast"),
1913 self._parse_bool(block, "drop"),
1914 )
1915
1916 # Geofeeds
1917 elif type == "geofeed":
1918 url = block.get("geofeed")
1919
1920 # XXX Check the URL
1921
1922 self.db.execute("""
1923 INSERT INTO
1924 geofeed_overrides
1925 (
1926 url
1927 )
1928 VALUES
1929 (
1930 %s
1931 )
1932 ON CONFLICT (url) DO NOTHING
1933 """, url,
1934 )
1935
1936 else:
1937 log.warning("Unsupported type: %s" % type)
1938
1939 def handle_update_feeds(self, ns):
1940 """
1941 Update any third-party feeds
1942 """
1943 success = True
1944
1945 feeds = (
1946 # AWS IP Ranges
1947 ("AWS-IP-RANGES", self._import_aws_ip_ranges, "https://ip-ranges.amazonaws.com/ip-ranges.json"),
1948
1949 # Spamhaus DROP
1950 ("SPAMHAUS-DROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/drop.txt"),
1951 ("SPAMHAUS-EDROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/edrop.txt"),
1952 ("SPAMHAUS-DROPV6", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/dropv6.txt"),
1953
1954 # Spamhaus ASNDROP
1955 ("SPAMHAUS-ASNDROP", self._import_spamhaus_asndrop, "https://www.spamhaus.org/drop/asndrop.json"),
1956 )
1957
1958 # Drop any data from feeds that we don't support (any more)
1959 with self.db.transaction():
1960 # Fetch the names of all feeds we support
1961 sources = [name for name, *rest in feeds]
1962
1963 self.db.execute("DELETE FROM autnum_feeds WHERE NOT source = ANY(%s)", sources)
1964 self.db.execute("DELETE FROM network_feeds WHERE NOT source = ANY(%s)", sources)
1965
1966 # Walk through all feeds
1967 for name, callback, url, *args in feeds:
1968 # Skip any feeds that were not requested on the command line
1969 if ns.feeds and not name in ns.feeds:
1970 continue
1971
1972 try:
1973 self._process_feed(name, callback, url, *args)
1974
1975 # Log an error but continue if an exception occurs
1976 except Exception as e:
1977 log.error("Error processing feed '%s': %s" % (name, e))
1978 success = False
1979
1980 # Return status
1981 return 0 if success else 1
1982
1983 def _process_feed(self, name, callback, url, *args):
1984 """
1985 Processes one feed
1986 """
1987 # Open the URL
1988 f = self.downloader.retrieve(url)
1989
1990 with self.db.transaction():
1991 # Drop any previous content
1992 self.db.execute("DELETE FROM autnum_feeds WHERE source = %s", name)
1993 self.db.execute("DELETE FROM network_feeds WHERE source = %s", name)
1994
1995 # Call the callback to process the feed
1996 return callback(name, f, *args)
1997
1998 def _import_aws_ip_ranges(self, name, f):
1999 # Parse the feed
2000 feed = json.load(f)
2001
2002 # Set up a dictionary for mapping a region name to a country. Unfortunately,
2003 # there seems to be no machine-readable version available of this other than
2004 # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html
2005 # (worse, it seems to be incomplete :-/ ); https://www.cloudping.cloud/endpoints
2006 # was helpful here as well.
2007 aws_region_country_map = {
2008 # Africa
2009 "af-south-1" : "ZA",
2010
2011 # Asia
2012 "il-central-1" : "IL", # Tel Aviv
2013
2014 # Asia/Pacific
2015 "ap-northeast-1" : "JP",
2016 "ap-northeast-2" : "KR",
2017 "ap-northeast-3" : "JP",
2018 "ap-east-1" : "HK",
2019 "ap-south-1" : "IN",
2020 "ap-south-2" : "IN",
2021 "ap-southeast-1" : "SG",
2022 "ap-southeast-2" : "AU",
2023 "ap-southeast-3" : "MY",
2024 "ap-southeast-4" : "AU",
2025 "ap-southeast-5" : "NZ", # Auckland, NZ
2026 "ap-southeast-6" : "AP", # XXX: Precise location not documented anywhere
2027
2028 # Canada
2029 "ca-central-1" : "CA",
2030 "ca-west-1" : "CA",
2031
2032 # Europe
2033 "eu-central-1" : "DE",
2034 "eu-central-2" : "CH",
2035 "eu-north-1" : "SE",
2036 "eu-west-1" : "IE",
2037 "eu-west-2" : "GB",
2038 "eu-west-3" : "FR",
2039 "eu-south-1" : "IT",
2040 "eu-south-2" : "ES",
2041
2042 # Middle East
2043 "me-central-1" : "AE",
2044 "me-south-1" : "BH",
2045
2046 # South America
2047 "sa-east-1" : "BR",
2048
2049 # Undocumented, likely located in Berlin rather than Frankfurt
2050 "eusc-de-east-1" : "DE",
2051 }
2052
2053 # Collect a list of all networks
2054 prefixes = feed.get("ipv6_prefixes", []) + feed.get("prefixes", [])
2055
2056 for prefix in prefixes:
2057 # Fetch network
2058 network = prefix.get("ipv6_prefix") or prefix.get("ip_prefix")
2059
2060 # Parse the network
2061 try:
2062 network = ipaddress.ip_network(network)
2063 except ValuleError as e:
2064 log.warning("%s: Unable to parse prefix %s" % (name, network))
2065 continue
2066
2067 # Sanitize parsed networks...
2068 if not self._check_parsed_network(network):
2069 continue
2070
2071 # Fetch the region
2072 region = prefix.get("region")
2073
2074 # Set some defaults
2075 cc = None
2076 is_anycast = False
2077
2078 # Fetch the CC from the dictionary
2079 try:
2080 cc = aws_region_country_map[region]
2081
2082 # If we couldn't find anything, let's try something else...
2083 except KeyError as e:
2084 # Find anycast networks
2085 if region == "GLOBAL":
2086 is_anycast = True
2087
2088 # Everything that starts with us- is probably in the United States
2089 elif region.startswith("us-"):
2090 cc = "US"
2091
2092 # Everything that starts with cn- is probably China
2093 elif region.startswith("cn-"):
2094 cc = "CN"
2095
2096 # Log a warning for anything else
2097 else:
2098 log.warning("%s: Could not determine country code for AWS region %s" \
2099 % (name, region))
2100 continue
2101
2102 # Write to database
2103 self.db.execute("""
2104 INSERT INTO
2105 network_feeds
2106 (
2107 network,
2108 source,
2109 country,
2110 is_anycast
2111 )
2112 VALUES
2113 (
2114 %s, %s, %s, %s
2115 )
2116 ON CONFLICT (network, source) DO NOTHING
2117 """, "%s" % network, name, cc, is_anycast,
2118 )
2119
2120 def _import_spamhaus_drop(self, name, f):
2121 """
2122 Import Spamhaus DROP IP feeds
2123 """
2124 # Count all lines
2125 lines = 0
2126
2127 # Walk through all lines
2128 for line in f:
2129 # Decode line
2130 line = line.decode("utf-8")
2131
2132 # Strip off any comments
2133 line, _, comment = line.partition(";")
2134
2135 # Ignore empty lines
2136 if not line:
2137 continue
2138
2139 # Strip any excess whitespace
2140 line = line.strip()
2141
2142 # Increment line counter
2143 lines += 1
2144
2145 # Parse the network
2146 try:
2147 network = ipaddress.ip_network(line)
2148 except ValueError as e:
2149 log.warning("%s: Could not parse network: %s - %s" % (name, line, e))
2150 continue
2151
2152 # Check network
2153 if not self._check_parsed_network(network):
2154 log.warning("%s: Skipping bogus network: %s" % (name, network))
2155 continue
2156
2157 # Insert into the database
2158 self.db.execute("""
2159 INSERT INTO
2160 network_feeds
2161 (
2162 network,
2163 source,
2164 is_drop
2165 )
2166 VALUES
2167 (
2168 %s, %s, %s
2169 )""", "%s" % network, name, True,
2170 )
2171
2172 # Raise an exception if we could not import anything
2173 if not lines:
2174 raise RuntimeError("Received bogus feed %s with no data" % name)
2175
2176 def _import_spamhaus_asndrop(self, name, f):
2177 """
2178 Import Spamhaus ASNDROP feed
2179 """
2180 for line in f:
2181 # Decode the line
2182 line = line.decode("utf-8")
2183
2184 # Parse JSON
2185 try:
2186 line = json.loads(line)
2187 except json.JSONDecodeError as e:
2188 log.warning("%s: Unable to parse JSON object %s: %s" % (name, line, e))
2189 continue
2190
2191 # Fetch type
2192 type = line.get("type")
2193
2194 # Skip any metadata
2195 if type == "metadata":
2196 continue
2197
2198 # Fetch ASN
2199 asn = line.get("asn")
2200
2201 # Skip any lines without an ASN
2202 if not asn:
2203 continue
2204
2205 # Filter invalid ASNs
2206 if not self._check_parsed_asn(asn):
2207 log.warning("%s: Skipping bogus ASN %s" % (name, asn))
2208 continue
2209
2210 # Write to database
2211 self.db.execute("""
2212 INSERT INTO
2213 autnum_feeds
2214 (
2215 number,
2216 source,
2217 is_drop
2218 )
2219 VALUES
2220 (
2221 %s, %s, %s
2222 )""", "%s" % asn, name, True,
2223 )
2224
2225 @staticmethod
2226 def _parse_bool(block, key):
2227 val = block.get(key)
2228
2229 # There is no point to proceed when we got None
2230 if val is None:
2231 return
2232
2233 # Convert to lowercase
2234 val = val.lower()
2235
2236 # True
2237 if val in ("yes", "1"):
2238 return True
2239
2240 # False
2241 if val in ("no", "0"):
2242 return False
2243
2244 # Default to None
2245 return None
2246
2247 def handle_import_countries(self, ns):
2248 with self.db.transaction():
2249 # Drop all data that we have
2250 self.db.execute("TRUNCATE TABLE countries")
2251
2252 for file in ns.file:
2253 for line in file:
2254 line = line.rstrip()
2255
2256 # Ignore any comments
2257 if line.startswith("#"):
2258 continue
2259
2260 try:
2261 country_code, continent_code, name = line.split(maxsplit=2)
2262 except:
2263 log.warning("Could not parse line: %s" % line)
2264 continue
2265
2266 self.db.execute("INSERT INTO countries(country_code, name, continent_code) \
2267 VALUES(%s, %s, %s) ON CONFLICT DO NOTHING", country_code, name, continent_code)
2268
2269
2270 def split_line(line):
2271 key, colon, val = line.partition(":")
2272
2273 # Strip any excess space
2274 key = key.strip()
2275 val = val.strip()
2276
2277 return key, val
2278
2279 def read_blocks(f):
2280 for block in iterate_over_blocks(f):
2281 type = None
2282 data = {}
2283
2284 for i, line in enumerate(block):
2285 key, value = line.split(":", 1)
2286
2287 # The key of the first line defines the type
2288 if i == 0:
2289 type = key
2290
2291 # Store value
2292 data[key] = value.strip()
2293
2294 yield type, data
2295
2296 def iterate_over_blocks(f, charsets=("utf-8", "latin1")):
2297 block = []
2298
2299 for line in f:
2300 # Skip commented lines
2301 if line.startswith(b"#") or line.startswith(b"%"):
2302 continue
2303
2304 # Convert to string
2305 for charset in charsets:
2306 try:
2307 line = line.decode(charset)
2308 except UnicodeDecodeError:
2309 continue
2310 else:
2311 break
2312
2313 # Remove any comments at the end of line
2314 line, hash, comment = line.partition("#")
2315
2316 # Strip any whitespace at the end of the line
2317 line = line.rstrip()
2318
2319 # If we cut off some comment and the line is empty, we can skip it
2320 if comment and not line:
2321 continue
2322
2323 # If the line has some content, keep collecting it
2324 if line:
2325 block.append(line)
2326 continue
2327
2328 # End the block on an empty line
2329 if block:
2330 yield block
2331
2332 # Reset the block
2333 block = []
2334
2335 # Return the last block
2336 if block:
2337 yield block
2338
2339 def iterate_over_lines(f):
2340 for line in f:
2341 # Decode the line
2342 line = line.decode()
2343
2344 # Strip the ending
2345 yield line.rstrip()
2346
2347 def main():
2348 # Run the command line interface
2349 c = CLI()
2350 c.run()
2351
2352 main()