#!/usr/bin/python3
###############################################################################
#                                                                             #
# libloc - A library to determine the location of someone on the Internet    #
#                                                                             #
# Copyright (C) 2020-2024 IPFire Development Team <info@ipfire.org>           #
#                                                                             #
# This library is free software; you can redistribute it and/or              #
# modify it under the terms of the GNU Lesser General Public                 #
# License as published by the Free Software Foundation; either               #
# version 2.1 of the License, or (at your option) any later version.         #
#                                                                             #
# This library is distributed in the hope that it will be useful,            #
# but WITHOUT ANY WARRANTY; without even the implied warranty of             #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU           #
# Lesser General Public License for more details.                            #
#                                                                             #
###############################################################################

import argparse
import csv
import functools
import http.client
import ipaddress
import json
import logging
import math
import re
import socket
import sys
import urllib.error
import urllib.parse
import urllib.request

# Load our location module
import location
import location.database
from location.downloader import Downloader
from location.i18n import _

# Initialise logging
log = logging.getLogger("location.importer")
log.propagate = 1

# Define constants
VALID_ASN_RANGES = (
	(1, 23455),
	(23457, 64495),
	(131072, 4199999999),
)

TRANSLATED_COUNTRIES = {
	# When people say UK, they mean GB
	"UK" : "GB",
}

IGNORED_COUNTRIES = set((
	# Formerly Yugoslavia
	"YU",

	# Some people use ZZ to say "no country" or to hide the country
	"ZZ",
))

# Configure the CSV parser for ARIN
csv.register_dialect("arin", delimiter=",", quoting=csv.QUOTE_ALL, quotechar="\"")
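
# The ARIN AS names file (fetched in handle_update_whois() below) is a fully
# quoted CSV whose rows carry at least "Org Name", "Start AS Number",
# "End AS Number" and "Status" columns, which is what
# _import_arin_as_names() reads.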

class CLI(object):
	def parse_cli(self):
		parser = argparse.ArgumentParser(
			description=_("Location Importer Command Line Interface"),
		)
		subparsers = parser.add_subparsers()

		# Global configuration flags
		parser.add_argument("--debug", action="store_true",
			help=_("Enable debug output"))
		parser.add_argument("--quiet", action="store_true",
			help=_("Enable quiet mode"))

		# version
		parser.add_argument("--version", action="version",
			version="%(prog)s @VERSION@")

		# Database
		parser.add_argument("--database-host", required=True,
			help=_("Database Hostname"), metavar=_("HOST"))
		parser.add_argument("--database-name", required=True,
			help=_("Database Name"), metavar=_("NAME"))
		parser.add_argument("--database-username", required=True,
			help=_("Database Username"), metavar=_("USERNAME"))
		parser.add_argument("--database-password", required=True,
			help=_("Database Password"), metavar=_("PASSWORD"))

		# Write Database
		write = subparsers.add_parser("write", help=_("Write database to file"))
		write.set_defaults(func=self.handle_write)
		write.add_argument("file", nargs=1, help=_("Database File"))
		write.add_argument("--signing-key", nargs="?", type=open, help=_("Signing Key"))
		write.add_argument("--backup-signing-key", nargs="?", type=open, help=_("Backup Signing Key"))
		write.add_argument("--vendor", nargs="?", help=_("Sets the vendor"))
		write.add_argument("--description", nargs="?", help=_("Sets a description"))
		write.add_argument("--license", nargs="?", help=_("Sets the license"))
		write.add_argument("--version", type=int, help=_("Database Format Version"))

		# Update WHOIS
		update_whois = subparsers.add_parser("update-whois", help=_("Update WHOIS Information"))
		update_whois.add_argument("sources", nargs="*",
			help=_("Only update these sources"))
		update_whois.set_defaults(func=self.handle_update_whois)

		# Update announcements
		update_announcements = subparsers.add_parser("update-announcements",
			help=_("Update BGP Announcements"))
		update_announcements.set_defaults(func=self.handle_update_announcements)
		update_announcements.add_argument("server", nargs=1,
			help=_("Route Server to connect to"), metavar=_("SERVER"))

		# Update geofeeds
		update_geofeeds = subparsers.add_parser("update-geofeeds",
			help=_("Update Geofeeds"))
		update_geofeeds.set_defaults(func=self.handle_update_geofeeds)

		# Update feeds
		update_feeds = subparsers.add_parser("update-feeds",
			help=_("Update Feeds"))
		update_feeds.add_argument("feeds", nargs="*",
			help=_("Only update these feeds"))
		update_feeds.set_defaults(func=self.handle_update_feeds)

		# Update overrides
		update_overrides = subparsers.add_parser("update-overrides",
			help=_("Update overrides"),
		)
		update_overrides.add_argument(
			"files", nargs="+", help=_("Files to import"),
		)
		update_overrides.set_defaults(func=self.handle_update_overrides)

		# Import countries
		import_countries = subparsers.add_parser("import-countries",
			help=_("Import countries"),
		)
		import_countries.add_argument("file", nargs=1, type=argparse.FileType("r"),
			help=_("File to import"))
		import_countries.set_defaults(func=self.handle_import_countries)

		args = parser.parse_args()

		# Configure logging
		if args.debug:
			location.logger.set_level(logging.DEBUG)
		elif args.quiet:
			location.logger.set_level(logging.WARNING)

		# Print usage if no action was given
		if not "func" in args:
			parser.print_usage()
			sys.exit(2)

		return args

	def run(self):
		# Parse command line arguments
		args = self.parse_cli()

		# Initialize the downloader
		self.downloader = Downloader()

		# Initialise database
		self.db = self._setup_database(args)

		# Call function
		ret = args.func(args)

		# Return with exit code
		if ret:
			sys.exit(ret)

		# Otherwise just exit
		sys.exit(0)

	def _setup_database(self, ns):
		"""
			Initialise the database
		"""
		# Connect to database
		db = location.database.Connection(
			host=ns.database_host, database=ns.database_name,
			user=ns.database_username, password=ns.database_password,
		)

		with db.transaction():
			db.execute("""
				-- announcements
				CREATE TABLE IF NOT EXISTS announcements(network inet, autnum bigint,
					first_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP,
					last_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP);
				CREATE UNIQUE INDEX IF NOT EXISTS announcements_networks ON announcements(network);
				CREATE INDEX IF NOT EXISTS announcements_family ON announcements(family(network));
				CREATE INDEX IF NOT EXISTS announcements_search ON announcements USING GIST(network inet_ops);

				-- autnums
				CREATE TABLE IF NOT EXISTS autnums(number bigint, name text NOT NULL);
				ALTER TABLE autnums ADD COLUMN IF NOT EXISTS source text;
				CREATE UNIQUE INDEX IF NOT EXISTS autnums_number ON autnums(number);

				-- countries
				CREATE TABLE IF NOT EXISTS countries(
					country_code text NOT NULL, name text NOT NULL, continent_code text NOT NULL);
				CREATE UNIQUE INDEX IF NOT EXISTS countries_country_code ON countries(country_code);

				-- networks
				CREATE TABLE IF NOT EXISTS networks(network inet, country text);
				ALTER TABLE networks ADD COLUMN IF NOT EXISTS original_countries text[];
				ALTER TABLE networks ADD COLUMN IF NOT EXISTS source text;
				CREATE UNIQUE INDEX IF NOT EXISTS networks_network ON networks(network);
				CREATE INDEX IF NOT EXISTS networks_family ON networks USING BTREE(family(network));
				CREATE INDEX IF NOT EXISTS networks_search ON networks USING GIST(network inet_ops);

				-- geofeeds
				CREATE TABLE IF NOT EXISTS geofeeds(
					id serial primary key,
					url text,
					status integer default null,
					updated_at timestamp without time zone default null
				);
				ALTER TABLE geofeeds ADD COLUMN IF NOT EXISTS error text;
				CREATE UNIQUE INDEX IF NOT EXISTS geofeeds_unique
					ON geofeeds(url);
				CREATE TABLE IF NOT EXISTS geofeed_networks(
					geofeed_id integer references geofeeds(id) on delete cascade,
					network inet,
					country text,
					region text,
					city text
				);
				CREATE INDEX IF NOT EXISTS geofeed_networks_geofeed_id
					ON geofeed_networks(geofeed_id);
				CREATE INDEX IF NOT EXISTS geofeed_networks_search
					ON geofeed_networks USING GIST(network inet_ops);
				CREATE TABLE IF NOT EXISTS network_geofeeds(network inet, url text);
				ALTER TABLE network_geofeeds ADD COLUMN IF NOT EXISTS source text NOT NULL;
				CREATE UNIQUE INDEX IF NOT EXISTS network_geofeeds_unique
					ON network_geofeeds(network);
				CREATE INDEX IF NOT EXISTS network_geofeeds_search
					ON network_geofeeds USING GIST(network inet_ops);
				CREATE INDEX IF NOT EXISTS network_geofeeds_url
					ON network_geofeeds(url);

				-- feeds
				CREATE TABLE IF NOT EXISTS autnum_feeds(
					number bigint NOT NULL,
					source text NOT NULL,
					name text,
					country text,
					is_anonymous_proxy boolean,
					is_satellite_provider boolean,
					is_anycast boolean,
					is_drop boolean
				);
				CREATE UNIQUE INDEX IF NOT EXISTS autnum_feeds_unique
					ON autnum_feeds(number, source);

				CREATE TABLE IF NOT EXISTS network_feeds(
					network inet NOT NULL,
					source text NOT NULL,
					country text,
					is_anonymous_proxy boolean,
					is_satellite_provider boolean,
					is_anycast boolean,
					is_drop boolean
				);
				CREATE UNIQUE INDEX IF NOT EXISTS network_feeds_unique
					ON network_feeds(network, source);
				CREATE INDEX IF NOT EXISTS network_feeds_search
					ON network_feeds USING GIST(network inet_ops);

				-- overrides
				CREATE TABLE IF NOT EXISTS autnum_overrides(
					number bigint NOT NULL,
					name text,
					country text,
					is_anonymous_proxy boolean,
					is_satellite_provider boolean,
					is_anycast boolean
				);
				CREATE UNIQUE INDEX IF NOT EXISTS autnum_overrides_number
					ON autnum_overrides(number);
				ALTER TABLE autnum_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
				ALTER TABLE autnum_overrides DROP COLUMN IF EXISTS source;

				CREATE TABLE IF NOT EXISTS network_overrides(
					network inet NOT NULL,
					country text,
					is_anonymous_proxy boolean,
					is_satellite_provider boolean,
					is_anycast boolean
				);
				CREATE UNIQUE INDEX IF NOT EXISTS network_overrides_network
					ON network_overrides(network);
				CREATE INDEX IF NOT EXISTS network_overrides_search
					ON network_overrides USING GIST(network inet_ops);
				ALTER TABLE network_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
				ALTER TABLE network_overrides DROP COLUMN IF EXISTS source;
			""")

		return db

	def fetch_countries(self):
		"""
			Returns a set of all country codes known to the database
		"""
		# Fetch all valid country codes to check parsed networks against...
		countries = self.db.query("SELECT country_code FROM countries ORDER BY country_code")

		return set((country.country_code for country in countries))

	def handle_write(self, ns):
		"""
			Compiles a database in libloc format out of what is in the database
		"""
		# Allocate a writer
		writer = location.Writer(ns.signing_key, ns.backup_signing_key)

		# Set all metadata
		if ns.vendor:
			writer.vendor = ns.vendor

		if ns.description:
			writer.description = ns.description

		if ns.license:
			writer.license = ns.license

		# Add all Autonomous Systems
		log.info("Writing Autonomous Systems...")

		# Select all ASes with a name
		rows = self.db.query("""
			SELECT
				autnums.number AS number,
				COALESCE(
					overrides.name,
					autnums.name
				) AS name
			FROM
				autnums
			LEFT JOIN
				autnum_overrides overrides ON autnums.number = overrides.number
			ORDER BY
				autnums.number
			""")

		for row in rows:
			# Skip AS without names
			if not row.name:
				continue

			a = writer.add_as(row.number)
			a.name = row.name

		# Add all networks
		log.info("Writing networks...")

		# Select all known networks
		rows = self.db.query("""
			WITH known_networks AS (
				SELECT network FROM announcements
				UNION
				SELECT network FROM networks
				UNION
				SELECT network FROM network_feeds
				UNION
				SELECT network FROM network_overrides
				UNION
				SELECT network FROM geofeed_networks
			),

			ordered_networks AS (
				SELECT
					known_networks.network AS network,
					announcements.autnum AS autnum,
					networks.country AS country,

					-- Must be part of returned values for ORDER BY clause
					masklen(announcements.network) AS sort_a,
					masklen(networks.network) AS sort_b
				FROM
					known_networks
				LEFT JOIN
					announcements ON known_networks.network <<= announcements.network
				LEFT JOIN
					networks ON known_networks.network <<= networks.network
				ORDER BY
					known_networks.network,
					sort_a DESC,
					sort_b DESC
			)

			-- Return a list of those networks enriched with all
			-- other information that we store in the database
			SELECT
				DISTINCT ON (network)
				network,
				autnum,

				-- Country
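				-- (Precedence: network overrides, AS overrides, network feeds,
				-- AS feeds, geofeeds and finally the country from the RIR data)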
				COALESCE(
					(
						SELECT
							country
						FROM
							network_overrides overrides
						WHERE
							networks.network <<= overrides.network
						ORDER BY
							masklen(overrides.network) DESC
						LIMIT 1
					),
					(
						SELECT
							country
						FROM
							autnum_overrides overrides
						WHERE
							networks.autnum = overrides.number
					),
					(
						SELECT
							country
						FROM
							network_feeds feeds
						WHERE
							networks.network <<= feeds.network
						ORDER BY
							masklen(feeds.network) DESC
						LIMIT 1
					),
					(
						SELECT
							country
						FROM
							autnum_feeds feeds
						WHERE
							networks.autnum = feeds.number
						ORDER BY
							source
						LIMIT 1
					),
					(
						SELECT
							geofeed_networks.country AS country
						FROM
							network_geofeeds

						-- Join the data from the geofeeds
						LEFT JOIN
							geofeeds ON network_geofeeds.url = geofeeds.url
						LEFT JOIN
							geofeed_networks ON geofeeds.id = geofeed_networks.geofeed_id

						-- Check whether we have a geofeed for this network
						WHERE
							networks.network <<= network_geofeeds.network
						AND
							networks.network <<= geofeed_networks.network

						-- Filter for the best result
						ORDER BY
							masklen(geofeed_networks.network) DESC
						LIMIT 1
					),
					networks.country
				) AS country,

				-- Flags
				COALESCE(
					(
						SELECT
							is_anonymous_proxy
						FROM
							network_overrides overrides
						WHERE
							networks.network <<= overrides.network
						ORDER BY
							masklen(overrides.network) DESC
						LIMIT 1
					),
					(
						SELECT
							is_anonymous_proxy
						FROM
							network_feeds feeds
						WHERE
							networks.network <<= feeds.network
						ORDER BY
							masklen(feeds.network) DESC
						LIMIT 1
					),
					(
						SELECT
							is_anonymous_proxy
						FROM
							autnum_feeds feeds
						WHERE
							networks.autnum = feeds.number
						ORDER BY
							source
						LIMIT 1
					),
					(
						SELECT
							is_anonymous_proxy
						FROM
							autnum_overrides overrides
						WHERE
							networks.autnum = overrides.number
					),
					FALSE
				) AS is_anonymous_proxy,
				COALESCE(
					(
						SELECT
							is_satellite_provider
						FROM
							network_overrides overrides
						WHERE
							networks.network <<= overrides.network
						ORDER BY
							masklen(overrides.network) DESC
						LIMIT 1
					),
					(
						SELECT
							is_satellite_provider
						FROM
							network_feeds feeds
						WHERE
							networks.network <<= feeds.network
						ORDER BY
							masklen(feeds.network) DESC
						LIMIT 1
					),
					(
						SELECT
							is_satellite_provider
						FROM
							autnum_feeds feeds
						WHERE
							networks.autnum = feeds.number
						ORDER BY
							source
						LIMIT 1
					),
					(
						SELECT
							is_satellite_provider
						FROM
							autnum_overrides overrides
						WHERE
							networks.autnum = overrides.number
					),
					FALSE
				) AS is_satellite_provider,
				COALESCE(
					(
						SELECT
							is_anycast
						FROM
							network_overrides overrides
						WHERE
							networks.network <<= overrides.network
						ORDER BY
							masklen(overrides.network) DESC
						LIMIT 1
					),
					(
						SELECT
							is_anycast
						FROM
							network_feeds feeds
						WHERE
							networks.network <<= feeds.network
						ORDER BY
							masklen(feeds.network) DESC
						LIMIT 1
					),
					(
						SELECT
							is_anycast
						FROM
							autnum_feeds feeds
						WHERE
							networks.autnum = feeds.number
						ORDER BY
							source
						LIMIT 1
					),
					(
						SELECT
							is_anycast
						FROM
							autnum_overrides overrides
						WHERE
							networks.autnum = overrides.number
					),
					FALSE
				) AS is_anycast,
				COALESCE(
					(
						SELECT
							is_drop
						FROM
							network_overrides overrides
						WHERE
							networks.network <<= overrides.network
						ORDER BY
							masklen(overrides.network) DESC
						LIMIT 1
					),
					(
						SELECT
							is_drop
						FROM
							network_feeds feeds
						WHERE
							networks.network <<= feeds.network
						ORDER BY
							masklen(feeds.network) DESC
						LIMIT 1
					),
					(
						SELECT
							is_drop
						FROM
							autnum_feeds feeds
						WHERE
							networks.autnum = feeds.number
						ORDER BY
							source
						LIMIT 1
					),
					(
						SELECT
							is_drop
						FROM
							autnum_overrides overrides
						WHERE
							networks.autnum = overrides.number
					),
					FALSE
				) AS is_drop
			FROM
				ordered_networks networks
			""")

		for row in rows:
			network = writer.add_network(row.network)

			# Save country
			if row.country:
				network.country_code = row.country

			# Save ASN
			if row.autnum:
				network.asn = row.autnum

			# Set flags
			if row.is_anonymous_proxy:
				network.set_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)

			if row.is_satellite_provider:
				network.set_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)

			if row.is_anycast:
				network.set_flag(location.NETWORK_FLAG_ANYCAST)

			if row.is_drop:
				network.set_flag(location.NETWORK_FLAG_DROP)

		# Add all countries
		log.info("Writing countries...")
		rows = self.db.query("SELECT * FROM countries ORDER BY country_code")

		for row in rows:
			c = writer.add_country(row.country_code)
			c.continent_code = row.continent_code
			c.name = row.name

		# Write everything to file
		log.info("Writing database to file...")
		for file in ns.file:
			writer.write(file)

	def handle_update_whois(self, ns):
		# Did we run successfully?
		success = True

		sources = (
			# African Network Information Centre
			("AFRINIC", (
				(self._import_standard_format, "https://ftp.afrinic.net/pub/pub/dbase/afrinic.db.gz"),
			)),

			# Asia Pacific Network Information Centre
			("APNIC", (
				(self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inet6num.gz"),
				(self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inetnum.gz"),
				(self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.aut-num.gz"),
				(self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.organisation.gz"),
			)),

			# American Registry for Internet Numbers
			("ARIN", (
				(self._import_extended_format, "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"),
				(self._import_arin_as_names, "https://ftp.arin.net/pub/resource_registry_service/asns.csv"),
			)),

			# Japan Network Information Center
			("JPNIC", (
				(self._import_standard_format, "https://ftp.nic.ad.jp/jpirr/jpirr.db.gz"),
			)),

			# Latin America and Caribbean Network Information Centre
			("LACNIC", (
				(self._import_standard_format, "https://ftp.lacnic.net/lacnic/dbase/lacnic.db.gz"),
				(self._import_extended_format, "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"),
			)),

			# Réseaux IP Européens
			("RIPE", (
				(self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz"),
				(self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz"),
				(self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.aut-num.gz"),
				(self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.organisation.gz"),
			)),
		)

		# Fetch all valid country codes to check parsed networks against
		countries = self.fetch_countries()

		# Check if we have countries
		if not countries:
			log.error("Please import countries before importing any WHOIS data")
			return 1

		# Iterate over all potential sources
		for name, feeds in sources:
			# Skip anything that should not be updated
			if ns.sources and not name in ns.sources:
				continue

			try:
				self._process_source(name, feeds, countries)

			# Log an error but continue if an exception occurs
			except Exception as e:
				log.error("Error processing source %s" % name, exc_info=True)
				success = False

		# Return a non-zero exit code for errors
		return 0 if success else 1

	def _process_source(self, source, feeds, countries):
		"""
			This function processes one source
		"""
		# Wrap everything into one large transaction
		with self.db.transaction():
			# Remove all previously imported content
			self.db.execute("DELETE FROM autnums WHERE source = %s", source)
			self.db.execute("DELETE FROM networks WHERE source = %s", source)
			self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", source)

			# Create some temporary tables to store parsed data
			self.db.execute("""
				CREATE TEMPORARY TABLE _autnums(number integer NOT NULL,
					organization text NOT NULL, source text NOT NULL) ON COMMIT DROP;
				CREATE UNIQUE INDEX _autnums_number ON _autnums(number);

				CREATE TEMPORARY TABLE _organizations(handle text NOT NULL,
					name text NOT NULL, source text NOT NULL) ON COMMIT DROP;
				CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle);

				CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text,
					original_countries text[] NOT NULL, source text NOT NULL)
					ON COMMIT DROP;
				CREATE INDEX _rirdata_search ON _rirdata
					USING BTREE(family(network), masklen(network));
				CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network);
			""")

			# Parse all feeds
			for callback, url, *args in feeds:
				# Retrieve the feed
				f = self.downloader.retrieve(url)

				# Call the callback
				callback(source, countries, f, *args)

			# Process all parsed networks from every RIR we happen to have access to,
			# insert the largest network chunks into the networks table immediately...
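			# (Illustrative example: if a /16 was imported with country DE, a
			# /20 inside it is only inserted as well if it carries different
			# information, e.g. country AT; a /20 repeating the country of its
			# parent adds nothing and is skipped by the "filtered" CTE below.)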
			families = self.db.query("""
				SELECT DISTINCT
					family(network) AS family
				FROM
					_rirdata
				ORDER BY
					family(network)
				""",
			)

			for family in (row.family for row in families):
				# Fetch the smallest mask length in our data set
				smallest = self.db.get("""
					SELECT
						MIN(
							masklen(network)
						) AS prefix
					FROM
						_rirdata
					WHERE
						family(network) = %s
					""", family,
				)

				# Copy all networks
				self.db.execute("""
					INSERT INTO
						networks
					(
						network,
						country,
						original_countries,
						source
					)
					SELECT
						network,
						country,
						original_countries,
						source
					FROM
						_rirdata
					WHERE
						masklen(network) = %s
					AND
						family(network) = %s
					ON CONFLICT DO
						NOTHING""",
					smallest.prefix,
					family,
				)

				# ... determine any other prefixes for this network family, ...
				prefixes = self.db.query("""
					SELECT
						DISTINCT masklen(network) AS prefix
					FROM
						_rirdata
					WHERE
						family(network) = %s
					ORDER BY
						masklen(network) ASC
					OFFSET 1
					""", family,
				)

				# ... and insert networks with this prefix in case they provide additional
				# information (i.e. subnet of a larger chunk with a different country)
				for prefix in (row.prefix for row in prefixes):
					self.db.execute("""
						WITH candidates AS (
							SELECT
								_rirdata.network,
								_rirdata.country,
								_rirdata.original_countries,
								_rirdata.source
							FROM
								_rirdata
							WHERE
								family(_rirdata.network) = %s
							AND
								masklen(_rirdata.network) = %s
						),
						filtered AS (
							SELECT
								DISTINCT ON (c.network)
								c.network,
								c.country,
								c.original_countries,
								c.source,
								masklen(networks.network),
								networks.country AS parent_country
							FROM
								candidates c
							LEFT JOIN
								networks
							ON
								c.network << networks.network
							ORDER BY
								c.network,
								masklen(networks.network) DESC NULLS LAST
						)
						INSERT INTO
							networks(network, country, original_countries, source)
						SELECT
							network,
							country,
							original_countries,
							source
						FROM
							filtered
						WHERE
							parent_country IS NULL
						OR
							country <> parent_country
						ON CONFLICT DO NOTHING
						""", family, prefix,
					)

			self.db.execute("""
				INSERT INTO
					autnums
				(
					number,
					name,
					source
				)
				SELECT
					_autnums.number,
					_organizations.name,
					_organizations.source
				FROM
					_autnums
				JOIN
					_organizations ON _autnums.organization = _organizations.handle
				ON CONFLICT
				(
					number
				)
				DO UPDATE
					SET name = excluded.name
				""",
			)

	def _import_standard_format(self, source, countries, f, *args):
		"""
			Imports a single source feed in the standard WHOIS format
		"""
		# Iterate over all blocks
		for block in iterate_over_blocks(f):
			self._parse_block(block, source, countries)

	def _import_extended_format(self, source, countries, f, *args):
		# Iterate over all lines
		for line in iterate_over_lines(f):
			self._parse_line(line, source, countries)

	def _import_arin_as_names(self, source, countries, f, *args):
		# Walk through the file
		for line in csv.DictReader(f, dialect="arin"):
			log.debug("Processing object: %s" % line)

			# Fetch status
			status = line.get("Status")

			# We are only interested in anything managed by ARIN
			if not status == "Full Registry Services":
				continue

			# Fetch organization name
			name = line.get("Org Name")

			# Extract ASNs
			first_asn = line.get("Start AS Number")
			last_asn = line.get("End AS Number")

			# Cast to a number
			try:
				first_asn = int(first_asn)
			except (TypeError, ValueError):
				log.warning("Could not parse ASN '%s'" % first_asn)
				continue

			try:
				last_asn = int(last_asn)
			except (TypeError, ValueError):
				log.warning("Could not parse ASN '%s'" % last_asn)
				continue

			# Check if the range is valid
			if last_asn < first_asn:
				log.warning("Invalid ASN range %s-%s" % (first_asn, last_asn))
				continue

			# Insert everything into the database
			for asn in range(first_asn, last_asn + 1):
				if not self._check_parsed_asn(asn):
					log.warning("Skipping invalid ASN %s" % asn)
					continue

				self.db.execute("""
					INSERT INTO
						autnums
					(
						number,
						name,
						source
					)
					VALUES
					(
						%s, %s, %s
					)
					ON CONFLICT
					(
						number
					)
					DO NOTHING
					""", asn, name, "ARIN",
				)

	def _check_parsed_network(self, network):
		"""
			Assistive function to detect and subsequently sort out parsed
			networks from RIR data (both WHOIS and the so-called "extended
			sources"), which are or have...

			(a) not globally routable (RFC 1918 space, et al.)
			(b) covering a too large chunk of the IP address space (prefix length
				is < 7 for IPv4 networks, and < 10 for IPv6)
			(c) "0.0.0.0" or "::" as a network address

			This unfortunately is necessary due to brain-dead clutter across
			various RIR databases, causing mismatches and eventually disruptions.

			We will return False in case a network is not suitable for adding
			it to our database, and True otherwise.
		"""
		# Check input
		if isinstance(network, ipaddress.IPv6Network):
			pass
		elif isinstance(network, ipaddress.IPv4Network):
			pass
		else:
			raise ValueError("Invalid network: %s (type %s)" % (network, type(network)))

		# Ignore anything that isn't globally routable
		if not network.is_global:
			log.debug("Skipping non-globally routable network: %s" % network)
			return False

		# Ignore anything that is an unspecified IP range (see RFC 5735 for IPv4 and RFC 2373 for IPv6)
		elif network.is_unspecified:
			log.debug("Skipping unspecified network: %s" % network)
			return False

		# IPv6
		if network.version == 6:
			if network.prefixlen < 10:
				log.debug("Skipping too big IP chunk: %s" % network)
				return False

		# IPv4
		elif network.version == 4:
			if network.prefixlen < 7:
				log.debug("Skipping too big IP chunk: %s" % network)
				return False

		# In case we have made it here, the network is considered to
		# be suitable for libloc consumption...
		return True

	def _check_parsed_asn(self, asn):
		"""
			Assistive function to filter Autonomous System Numbers not being suitable
			for adding to our database. Returns False in such cases, and True otherwise.
		"""
		for start, end in VALID_ASN_RANGES:
			if start <= asn <= end:
				return True

		log.info("Supplied ASN %s out of publicly routable ASN ranges" % asn)
		return False
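
	# For example, AS64496 falls into the 64496-64511 documentation range
	# (RFC 5398) and is outside of VALID_ASN_RANGES, so _check_parsed_asn()
	# rejects it, while an ordinary 32-bit ASN such as AS204867 passes.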

	def _parse_block(self, block, source_key, countries):
		# Get the first line to find out what type of block this is
		line = block[0]

		# aut-num
		if line.startswith("aut-num:"):
			return self._parse_autnum_block(block, source_key)

		# inetnum
		if line.startswith("inet6num:") or line.startswith("inetnum:"):
			return self._parse_inetnum_block(block, source_key, countries)

		# organisation
		elif line.startswith("organisation:"):
			return self._parse_org_block(block, source_key)

	def _parse_autnum_block(self, block, source_key):
		autnum = {}
		for line in block:
			# Split line
			key, val = split_line(line)

			if key == "aut-num":
				m = re.match(r"^(AS|as)(\d+)", val)
				if m:
					autnum["asn"] = m.group(2)

			elif key == "org":
				autnum[key] = val.upper()

			elif key == "descr":
				# Save the first description line as well...
				if not key in autnum:
					autnum[key] = val

		# Skip empty objects
		if not autnum or not "asn" in autnum:
			return

		# Insert a dummy organisation handle into our temporary organisations
		# table in case the AS does not have an organisation handle set, but
		# has a description (a quirk often observed in the APNIC area), so we
		# can later display at least some string for this AS.
		if not "org" in autnum:
			if "descr" in autnum:
				autnum["org"] = "LIBLOC-%s-ORGHANDLE" % autnum.get("asn")

				self.db.execute("INSERT INTO _organizations(handle, name, source) \
					VALUES(%s, %s, %s) ON CONFLICT (handle) DO NOTHING",
					autnum.get("org"), autnum.get("descr"), source_key,
				)
			else:
				log.warning("ASN %s neither has an organisation handle nor a description line set, omitting" % \
					autnum.get("asn"))
				return

		# Insert into database
		self.db.execute("INSERT INTO _autnums(number, organization, source) \
			VALUES(%s, %s, %s) ON CONFLICT (number) DO UPDATE SET \
			organization = excluded.organization",
			autnum.get("asn"), autnum.get("org"), source_key,
		)

	def _parse_inetnum_block(self, block, source_key, countries):
		inetnum = {}
		for line in block:
			# Split line
			key, val = split_line(line)

			# Filter any inetnum records which are only referring to IP space
			# not managed by that specific RIR...
			if key == "netname":
				if re.match(r"^(ERX-NETBLOCK|(AFRINIC|ARIN|LACNIC|RIPE)-CIDR-BLOCK|IANA-NETBLOCK-\d{1,3}|NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK|STUB-[\d-]{3,}SLASH\d{1,2})", val.strip()):
					log.debug("Skipping record indicating historic/orphaned data: %s" % val.strip())
					return

			if key == "inetnum":
				start_address, delim, end_address = val.partition("-")

				# Strip any excess space
				start_address, end_address = start_address.rstrip(), end_address.strip()

				# Handle "inetnum" formatting in the LACNIC DB (e.g. "24.152.8/22" instead of "24.152.8.0/22")
				if start_address and not (delim or end_address):
					try:
						start_address = ipaddress.ip_network(start_address, strict=False)
					except ValueError:
						start_address = start_address.split("/")
						ldigits = start_address[0].count(".")

						# How many octets do we need to add?
						# (LACNIC does not seem to have a /8 or greater assigned, so the following should suffice.)
						if ldigits == 1:
							start_address = start_address[0] + ".0.0/" + start_address[1]
						elif ldigits == 2:
							start_address = start_address[0] + ".0/" + start_address[1]
						else:
							log.warning("Could not recover IPv4 address from line in LACNIC DB format: %s" % line)
							return

						try:
							start_address = ipaddress.ip_network(start_address, strict=False)
						except ValueError:
							log.warning("Could not parse line in LACNIC DB format: %s" % line)
							return

					# Enumerate first and last IP address of this network
					end_address = start_address[-1]
					start_address = start_address[0]

				else:
					# Convert to IP address
					try:
						start_address = ipaddress.ip_address(start_address)
						end_address = ipaddress.ip_address(end_address)
					except ValueError:
						log.warning("Could not parse line: %s" % line)
						return

				inetnum["inetnum"] = list(ipaddress.summarize_address_range(start_address, end_address))

			elif key == "inet6num":
				inetnum[key] = [ipaddress.ip_network(val, strict=False)]

			elif key == "country":
				cc = val.upper()

				# Ignore certain country codes
				if cc in IGNORED_COUNTRIES:
					log.debug("Ignoring country code '%s'" % cc)
					continue

				# Translate country codes
				try:
					cc = TRANSLATED_COUNTRIES[cc]
				except KeyError:
					pass

				# Do we know this country?
				if not cc in countries:
					log.warning("Skipping invalid country code '%s'" % cc)
					continue

				try:
					inetnum[key].append(cc)
				except KeyError:
					inetnum[key] = [cc]

			# Parse the geofeed attribute
			elif key == "geofeed":
				inetnum["geofeed"] = val

			# Parse geofeed when used as a remark
			elif key == "remarks":
				m = re.match(r"^(?:Geofeed)\s+(https://.*)", val)
				if m:
					inetnum["geofeed"] = m.group(1)

		# Skip empty objects
		if not inetnum:
			return

		# Iterate through all networks enumerated from above, check them for plausibility and insert
		# them into the database, if _check_parsed_network() succeeded
		for single_network in inetnum.get("inet6num") or inetnum.get("inetnum"):
			if not self._check_parsed_network(single_network):
				continue

			# Fetch the country codes or use a list with an empty country
			ccs = inetnum.get("country", [None])

			# Insert the network into the database but only use the first country code
			for cc in ccs:
				self.db.execute("""
					INSERT INTO
						_rirdata
					(
						network,
						country,
						original_countries,
						source
					)
					VALUES
					(
						%s, %s, %s, %s
					)
					ON CONFLICT (network)
						DO UPDATE SET country = excluded.country
					""", "%s" % single_network, cc, [cc for cc in ccs if cc], source_key,
				)

				# If there is more than one country, we will only use the first one
				break

			# Update any geofeed information
			geofeed = inetnum.get("geofeed", None)
			if geofeed:
				self._parse_geofeed(source_key, geofeed, single_network)

	def _parse_geofeed(self, source, url, single_network):
		# Parse the URL
		url = urllib.parse.urlparse(url)

		# Make sure that this is an HTTPS URL
		if not url.scheme == "https":
			log.debug("Geofeed URL is not using HTTPS: %s" % url.geturl())
			return

		# Put the URL back together normalized
		url = url.geturl()

		# Store/update any geofeeds
		self.db.execute("""
			INSERT INTO
				network_geofeeds
			(
				network,
				url,
				source
			)
			VALUES
			(
				%s, %s, %s
			)
			ON CONFLICT (network) DO
				UPDATE SET url = excluded.url""",
			"%s" % single_network, url, source,
		)

	def _parse_org_block(self, block, source_key):
		org = {}
		for line in block:
			# Split line
			key, val = split_line(line)

			if key == "organisation":
				org[key] = val.upper()
			elif key == "org-name":
				org[key] = val

		# Skip empty objects
		if not org:
			return

		self.db.execute("INSERT INTO _organizations(handle, name, source) \
			VALUES(%s, %s, %s) ON CONFLICT (handle) DO \
			UPDATE SET name = excluded.name",
			org.get("organisation"), org.get("org-name"), source_key,
		)

	def _parse_line(self, line, source_key, validcountries=None):
		# Skip version line
		if line.startswith("2"):
			return

		# Skip comments
		if line.startswith("#"):
			return

		try:
			registry, country_code, type, line = line.split("|", 3)
		except:
			log.warning("Could not parse line: %s" % line)
			return

		# Skip any unknown protocols
		if not type in ("ipv6", "ipv4"):
			log.warning("Unknown IP protocol '%s'" % type)
			return

		# Skip any lines that are for stats only or do not have a country
		# code at all (avoids log spam below)
		if not country_code or country_code == '*':
			return

		# Skip objects with unknown country codes
		if validcountries and country_code not in validcountries:
			log.warning("Skipping line with bogus country '%s': %s" % \
				(country_code, line))
			return

		try:
			address, prefix, date, status, organization = line.split("|")
		except ValueError:
			organization = None

			# Try parsing the line without organization
			try:
				address, prefix, date, status = line.split("|")
			except ValueError:
				log.warning("Unhandled line format: %s" % line)
				return

		# Skip anything that isn't properly assigned
		if not status in ("assigned", "allocated"):
			return

		# Cast prefix into an integer
		try:
			prefix = int(prefix)
		except:
			log.warning("Invalid prefix: %s" % prefix)
			return

		# Fix prefix length for IPv4
		if type == "ipv4":
			prefix = 32 - int(math.log(prefix, 2))
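			# (In the extended delegation format, this field holds the number
			# of addresses for IPv4 rather than a prefix length, so e.g. a
			# value of 256 becomes 32 - log2(256) = a /24 network.)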

		# Try to parse the address
		try:
			network = ipaddress.ip_network("%s/%s" % (address, prefix), strict=False)
		except ValueError:
			log.warning("Invalid IP address: %s" % address)
			return

		if not self._check_parsed_network(network):
			return

		self.db.execute("""
			INSERT INTO
				networks
			(
				network,
				country,
				original_countries,
				source
			)
			VALUES
			(
				%s, %s, %s, %s
			)
			ON CONFLICT (network)
				DO UPDATE SET country = excluded.country
			""", "%s" % network, country_code, [country_code], source_key,
		)

	def handle_update_announcements(self, ns):
		server = ns.server[0]

		with self.db.transaction():
			if server.startswith("/"):
				self._handle_update_announcements_from_bird(server)

			# Purge anything we never want here
			self.db.execute("""
				-- Delete default routes
				DELETE FROM announcements WHERE network = '::/0' OR network = '0.0.0.0/0';

				-- Delete anything that is not global unicast address space
				DELETE FROM announcements WHERE family(network) = 6 AND NOT network <<= '2000::/3';

				-- Delete "current network" address space
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '0.0.0.0/8';

				-- Delete local loopback address space
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '127.0.0.0/8';

				-- Delete RFC 1918 address space
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '10.0.0.0/8';
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '172.16.0.0/12';
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.168.0.0/16';

				-- Delete test, benchmark and documentation address space
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.0.0/24';
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.2.0/24';
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.18.0.0/15';
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.51.100.0/24';
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '203.0.113.0/24';

				-- Delete CGNAT address space (RFC 6598)
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '100.64.0.0/10';

				-- Delete link-local address space
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '169.254.0.0/16';

				-- Delete IPv6-to-IPv4 (6to4) address space (RFC 3068)
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.88.99.0/24';
				DELETE FROM announcements WHERE family(network) = 6 AND network <<= '2002::/16';

				-- Delete multicast and reserved address space
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '224.0.0.0/4';
				DELETE FROM announcements WHERE family(network) = 4 AND network <<= '240.0.0.0/4';

				-- Delete networks that are too small to be in the global routing table
				DELETE FROM announcements WHERE family(network) = 6 AND masklen(network) > 48;
				DELETE FROM announcements WHERE family(network) = 4 AND masklen(network) > 24;

				-- Delete any non-public or reserved ASNs
				DELETE FROM announcements WHERE NOT (
					(autnum >= 1 AND autnum <= 23455)
				OR
					(autnum >= 23457 AND autnum <= 64495)
				OR
					(autnum >= 131072 AND autnum <= 4199999999)
				);

				-- Delete everything that we have not seen for 14 days
				DELETE FROM announcements WHERE last_seen_at <= CURRENT_TIMESTAMP - INTERVAL '14 days';
			""")

	def _handle_update_announcements_from_bird(self, server):
		# Pre-compile the regular expression for faster searching
		route = re.compile(rb"^\s(.+?)\s+.+?\[(?:AS(.*?))?.\]$")

		log.info("Requesting routing table from Bird (%s)" % server)

		aggregated_networks = []

		# Send command to list all routes
		for line in self._bird_cmd(server, "show route"):
			m = route.match(line)
			if not m:
				# Skip empty lines
				if not line:
					pass

				# Ignore any header lines with the name of the routing table
				elif line.startswith(b"Table"):
					pass

				# Log anything else
				else:
					log.debug("Could not parse line: %s" % line.decode())

				continue

			# Fetch the extracted network and ASN
			network, autnum = m.groups()

			# Decode into strings
			if network:
				network = network.decode()
			if autnum:
				autnum = autnum.decode()

			# Collect all aggregated networks
			if not autnum:
				log.debug("%s is an aggregated network" % network)
				aggregated_networks.append(network)
				continue

			# Insert it into the database
			self.db.execute("INSERT INTO announcements(network, autnum) \
				VALUES(%s, %s) ON CONFLICT (network) DO \
				UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
				network, autnum,
			)

		# Process any aggregated networks
		for network in aggregated_networks:
			log.debug("Processing aggregated network %s" % network)

			# Run "show route all" for each network
			for line in self._bird_cmd(server, "show route %s all" % network):
				# Try finding the path
				m = re.match(rb"\s+BGP\.as_path:.* (\d+) {\d+}$", line)
				if m:
					# Select the last AS number in the path
					autnum = m.group(1).decode()

					# Insert it into the database
					self.db.execute("INSERT INTO announcements(network, autnum) \
						VALUES(%s, %s) ON CONFLICT (network) DO \
						UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
						network, autnum,
					)

					# We don't need to process any more
					break

	def _bird_cmd(self, socket_path, command):
		# Connect to the socket
		s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
		s.connect(socket_path)

		# Allocate some buffer
		buffer = b""

		log.debug("Sending Bird command: %s" % command)

		# Send the command
		s.send(b"%s\n" % command.encode())

		while True:
			# Fill up the buffer
			buffer += s.recv(4096)

			while True:
				# Search for the next newline
				pos = buffer.find(b"\n")

				# If we cannot find one, we go back and read more data
				if pos <= 0:
					break

				# Cut after the newline character
				pos += 1

				# Split the line we want and keep the rest in buffer
				line, buffer = buffer[:pos], buffer[pos:]

				# Try parsing any status lines
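				# (The BIRD control protocol prefixes replies with a four-digit
				# status code, followed by a space on final lines or a hyphen on
				# continuation lines, e.g. "0001 BIRD ... ready." as the greeting
				# and "0000" to terminate the output.)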
				if len(line) > 4 and line[:4].isdigit() and line[4] in (32, 45):
					code, delim, line = int(line[:4]), line[4], line[5:]

					log.debug("Received response code %s from bird" % code)

					# End of output
					if code == 0:
						return

					# Ignore hello line
					elif code == 1:
						continue

				# Otherwise return the line
				yield line

	def handle_update_geofeeds(self, ns):
		# Sync geofeeds
		with self.db.transaction():
			# Delete all geofeeds which are no longer linked
			self.db.execute("""
				DELETE FROM
					geofeeds
				WHERE
					NOT EXISTS (
						SELECT
							1
						FROM
							network_geofeeds
						WHERE
							geofeeds.url = network_geofeeds.url
					)""",
			)

			# Copy all geofeeds
			self.db.execute("""
				INSERT INTO
					geofeeds(
						url
					)
				SELECT
					url
				FROM
					network_geofeeds
				ON CONFLICT (url)
					DO NOTHING
				""",
			)

		# Fetch all Geofeeds that require an update
		geofeeds = self.db.query("""
			SELECT
				id,
				url
			FROM
				geofeeds
			WHERE
				updated_at IS NULL
			OR
				updated_at <= CURRENT_TIMESTAMP - INTERVAL '1 week'
			ORDER BY
				id
			""")

		# Update all geofeeds
		for geofeed in geofeeds:
			with self.db.transaction():
				self._fetch_geofeed(geofeed)

		# Delete data from any feeds that did not update in the last two weeks
		with self.db.transaction():
			self.db.execute("""
				DELETE FROM
					geofeed_networks
				WHERE
					geofeed_networks.geofeed_id IN (
						SELECT
							geofeeds.id
						FROM
							geofeeds
						WHERE
							updated_at IS NULL
						OR
							updated_at <= CURRENT_TIMESTAMP - INTERVAL '2 weeks'
					)
				""")

	def _fetch_geofeed(self, geofeed):
		log.debug("Fetching Geofeed %s" % geofeed.url)

		with self.db.transaction():
			# Open the URL
			try:
				# Send the request
				f = self.downloader.retrieve(geofeed.url, headers={
					"User-Agent" : "location/%s" % location.__version__,

					# We expect some plain text file in CSV format
					"Accept" : "text/csv, text/plain",
				})
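
				# A geofeed (RFC 8805) is a CSV file where each line maps a
				# prefix to a location, roughly like this (illustrative line):
				#
				#   192.0.2.0/24,US,US-CA,Los Angeles,
				#
				# Only the first four fields (network, country, region, city)
				# are used below.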

				# Remove any previous data
				self.db.execute("DELETE FROM geofeed_networks \
					WHERE geofeed_id = %s", geofeed.id)

				lineno = 0

				# Read the output line by line
				for line in f:
					lineno += 1

					try:
						line = line.decode()

					# Ignore any lines we cannot decode
					except UnicodeDecodeError:
						log.debug("Could not decode line %s in %s" \
							% (lineno, geofeed.url))
						continue

					# Strip any newline
					line = line.rstrip()

					# Skip empty lines
					if not line:
						continue

					# Try to parse the line
					try:
						fields = line.split(",", 5)
					except ValueError:
						log.debug("Could not parse line: %s" % line)
						continue

					# Check if we have enough fields
					if len(fields) < 4:
						log.debug("Not enough fields in line: %s" % line)
						continue

					# Fetch all fields
					network, country, region, city = fields[:4]

					# Try to parse the network
					try:
						network = ipaddress.ip_network(network, strict=False)
					except ValueError:
						log.debug("Could not parse network: %s" % network)
						continue

					# Strip any excess whitespace from country codes
					country = country.strip()

					# Make the country code uppercase
					country = country.upper()

					# Check the country code
					if not country:
						log.debug("Empty country code in Geofeed %s line %s" \
							% (geofeed.url, lineno))
						continue

					elif not location.country_code_is_valid(country):
						log.debug("Invalid country code in Geofeed %s:%s: %s" \
							% (geofeed.url, lineno, country))
						continue

					# Write this into the database
					self.db.execute("""
						INSERT INTO
							geofeed_networks (
								geofeed_id,
								network,
								country,
								region,
								city
							)
						VALUES (%s, %s, %s, %s, %s)""",
						geofeed.id,
						"%s" % network,
						country,
						region,
						city,
					)

			# Catch any HTTP errors
			except urllib.request.HTTPError as e:
				self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
					WHERE id = %s", e.code, "%s" % e, geofeed.id)

				# Remove any previous data when the feed has been deleted
				if e.code == 404:
					self.db.execute("DELETE FROM geofeed_networks \
						WHERE geofeed_id = %s", geofeed.id)

			# Catch any other errors and connection timeouts
			except (http.client.InvalidURL, urllib.request.URLError, TimeoutError) as e:
				log.debug("Could not fetch URL %s: %s" % (geofeed.url, e))

				self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
					WHERE id = %s", 599, "%s" % e, geofeed.id)

			# Mark the geofeed as updated
			else:
				self.db.execute("""
					UPDATE
						geofeeds
					SET
						updated_at = CURRENT_TIMESTAMP,
						status = NULL,
						error = NULL
					WHERE
						id = %s""",
					geofeed.id,
				)

	def handle_update_overrides(self, ns):
		with self.db.transaction():
			# Drop any previous content
			self.db.execute("TRUNCATE TABLE autnum_overrides")
			self.db.execute("TRUNCATE TABLE network_overrides")

			for file in ns.files:
				log.info("Reading %s..." % file)

				with open(file, "rb") as f:
					for type, block in read_blocks(f):
						if type == "net":
							network = block.get("net")

							# Try to parse and normalise the network
							try:
								network = ipaddress.ip_network(network, strict=False)
							except ValueError as e:
								log.warning("Invalid IP network: %s: %s" % (network, e))
								continue

							# Prevent that we overwrite all networks
							if network.prefixlen == 0:
								log.warning("Skipping %s: You cannot overwrite default" % network)
								continue

							self.db.execute("""
								INSERT INTO
									network_overrides
								(
									network,
									country,
									is_anonymous_proxy,
									is_satellite_provider,
									is_anycast,
									is_drop
								)
								VALUES
								(
									%s, %s, %s, %s, %s, %s
								)
								ON CONFLICT (network) DO NOTHING
								""",
								"%s" % network,
								block.get("country"),
								self._parse_bool(block, "is-anonymous-proxy"),
								self._parse_bool(block, "is-satellite-provider"),
								self._parse_bool(block, "is-anycast"),
								self._parse_bool(block, "drop"),
							)

						elif type == "aut-num":
							autnum = block.get("aut-num")

							# Check if the AS number begins with "AS"
							if not autnum.startswith("AS"):
								log.warning("Invalid AS number: %s" % autnum)
								continue

							# Strip "AS"
							autnum = autnum[2:]

							self.db.execute("""
								INSERT INTO
									autnum_overrides
								(
									number,
									name,
									country,
									is_anonymous_proxy,
									is_satellite_provider,
									is_anycast,
									is_drop
								)
								VALUES
								(
									%s, %s, %s, %s, %s, %s, %s
								)
								ON CONFLICT (number) DO NOTHING
								""",
								autnum,
								block.get("name"),
								block.get("country"),
								self._parse_bool(block, "is-anonymous-proxy"),
								self._parse_bool(block, "is-satellite-provider"),
								self._parse_bool(block, "is-anycast"),
								self._parse_bool(block, "drop"),
							)

						else:
							log.warning("Unsupported type: %s" % type)

	def handle_update_feeds(self, ns):
		"""
			Update any third-party feeds
		"""
		success = True

		feeds = (
			# AWS IP Ranges
			("AWS-IP-RANGES", self._import_aws_ip_ranges, "https://ip-ranges.amazonaws.com/ip-ranges.json"),

			# Spamhaus DROP
			("SPAMHAUS-DROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/drop.txt"),
			("SPAMHAUS-EDROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/edrop.txt"),
			("SPAMHAUS-DROPV6", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/dropv6.txt"),

			# Spamhaus ASNDROP
			("SPAMHAUS-ASNDROP", self._import_spamhaus_asndrop, "https://www.spamhaus.org/drop/asndrop.json"),
		)

		# Drop any data from feeds that we don't support (any more)
		with self.db.transaction():
			# Fetch the names of all feeds we support
			sources = [name for name, *rest in feeds]

			self.db.execute("DELETE FROM autnum_feeds WHERE NOT source = ANY(%s)", sources)
			self.db.execute("DELETE FROM network_feeds WHERE NOT source = ANY(%s)", sources)

		# Walk through all feeds
		for name, callback, url, *args in feeds:
			# Skip any feeds that were not requested on the command line
			if ns.feeds and not name in ns.feeds:
				continue

			try:
				self._process_feed(name, callback, url, *args)

			# Log an error but continue if an exception occurs
			except Exception as e:
				log.error("Error processing feed '%s': %s" % (name, e))
				success = False

		# Return status
		return 0 if success else 1

	def _process_feed(self, name, callback, url, *args):
		"""
			Processes one feed
		"""
		# Open the URL
		f = self.downloader.retrieve(url)

		with self.db.transaction():
			# Drop any previous content
			self.db.execute("DELETE FROM autnum_feeds WHERE source = %s", name)
			self.db.execute("DELETE FROM network_feeds WHERE source = %s", name)

			# Call the callback to process the feed
			return callback(name, f, *args)

	def _import_aws_ip_ranges(self, name, f):
		# Parse the feed
		feed = json.load(f)

		# Set up a dictionary for mapping a region name to a country. Unfortunately,
		# there seems to be no machine-readable version available of this other than
		# https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html
		# (worse, it seems to be incomplete :-/ ); https://www.cloudping.cloud/endpoints
		# was helpful here as well.
		aws_region_country_map = {
			# Africa
			"af-south-1" : "ZA",

			# Asia
			"il-central-1" : "IL", # Tel Aviv

			# Asia/Pacific
			"ap-northeast-1" : "JP",
			"ap-northeast-2" : "KR",
			"ap-northeast-3" : "JP",
			"ap-east-1" : "HK",
			"ap-south-1" : "IN",
			"ap-south-2" : "IN",
			"ap-southeast-1" : "SG",
			"ap-southeast-2" : "AU",
			"ap-southeast-3" : "MY",
			"ap-southeast-4" : "AU",
			"ap-southeast-5" : "NZ", # Auckland, NZ
			"ap-southeast-6" : "AP", # XXX: Precise location not documented anywhere

			# Canada
			"ca-central-1" : "CA",
			"ca-west-1" : "CA",

			# Europe
			"eu-central-1" : "DE",
			"eu-central-2" : "CH",
			"eu-north-1" : "SE",
			"eu-west-1" : "IE",
			"eu-west-2" : "GB",
			"eu-west-3" : "FR",
			"eu-south-1" : "IT",
			"eu-south-2" : "ES",

			# Middle East
			"me-central-1" : "AE",
			"me-south-1" : "BH",

			# South America
			"sa-east-1" : "BR",

			# Undocumented, likely located in Berlin rather than Frankfurt
			"eusc-de-east-1" : "DE",
		}

		# Collect a list of all networks
		prefixes = feed.get("ipv6_prefixes", []) + feed.get("prefixes", [])

		for prefix in prefixes:
			# Fetch network
			network = prefix.get("ipv6_prefix") or prefix.get("ip_prefix")

			# Parse the network
			try:
				network = ipaddress.ip_network(network)
			except ValueError as e:
				log.warning("%s: Unable to parse prefix %s" % (name, network))
				continue

			# Sanitize parsed networks...
			if not self._check_parsed_network(network):
				continue

			# Fetch the region
			region = prefix.get("region")

			# Set some defaults
			cc = None
			is_anycast = False

			# Fetch the CC from the dictionary
			try:
				cc = aws_region_country_map[region]

			# If we couldn't find anything, let's try something else...
			except KeyError as e:
				# Find anycast networks
				if region == "GLOBAL":
					is_anycast = True

				# Everything that starts with us- is probably in the United States
				elif region.startswith("us-"):
					cc = "US"

				# Everything that starts with cn- is probably China
				elif region.startswith("cn-"):
					cc = "CN"

				# Log a warning for anything else
				else:
					log.warning("%s: Could not determine country code for AWS region %s" \
						% (name, region))
					continue

			# Write to database
			self.db.execute("""
				INSERT INTO
					network_feeds
				(
					network,
					source,
					country,
					is_anycast
				)
				VALUES
				(
					%s, %s, %s, %s
				)
				ON CONFLICT (network, source) DO NOTHING
				""", "%s" % network, name, cc, is_anycast,
			)

	def _import_spamhaus_drop(self, name, f):
		"""
			Import Spamhaus DROP IP feeds
		"""
		# Count all lines
		lines = 0

		# Walk through all lines
		for line in f:
			# Decode line
			line = line.decode("utf-8")

			# Strip off any comments
			line, _, comment = line.partition(";")

			# Ignore empty lines
			if not line:
				continue

			# Strip any excess whitespace
			line = line.strip()

			# Increment line counter
			lines += 1

			# Parse the network
			try:
				network = ipaddress.ip_network(line)
			except ValueError as e:
				log.warning("%s: Could not parse network: %s - %s" % (name, line, e))
				continue

			# Check network
			if not self._check_parsed_network(network):
				log.warning("%s: Skipping bogus network: %s" % (name, network))
				continue

			# Insert into the database
			self.db.execute("""
				INSERT INTO
					network_feeds
				(
					network,
					source,
					is_drop
				)
				VALUES
				(
					%s, %s, %s
				)""", "%s" % network, name, True,
			)

		# Raise an exception if we could not import anything
		if not lines:
			raise RuntimeError("Received bogus feed %s with no data" % name)

	def _import_spamhaus_asndrop(self, name, f):
		"""
			Import the Spamhaus ASNDROP feed
		"""
		for line in f:
			# Decode the line
			line = line.decode("utf-8")

			# Parse JSON
			try:
				line = json.loads(line)
			except json.JSONDecodeError as e:
				log.warning("%s: Unable to parse JSON object %s: %s" % (name, line, e))
				continue

			# Fetch type
			type = line.get("type")

			# Skip any metadata
			if type == "metadata":
				continue

			# Fetch ASN
			asn = line.get("asn")

			# Skip any lines without an ASN
			if not asn:
				continue

			# Filter invalid ASNs
			if not self._check_parsed_asn(asn):
				log.warning("%s: Skipping bogus ASN %s" % (name, asn))
				continue

			# Write to database
			self.db.execute("""
				INSERT INTO
					autnum_feeds
				(
					number,
					source,
					is_drop
				)
				VALUES
				(
					%s, %s, %s
				)""", "%s" % asn, name, True,
			)

	@staticmethod
	def _parse_bool(block, key):
		val = block.get(key)

		# There is no point to proceed when we got None
		if val is None:
			return

		# Convert to lowercase
		val = val.lower()

		# True
		if val in ("yes", "1"):
			return True

		# False
		if val in ("no", "0"):
			return False

		# Default to None
		return None
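
	# For example, a "drop: yes" line in an override block comes through
	# read_blocks() below as {"drop": "yes"}, and _parse_bool(block, "drop")
	# then maps "yes"/"1" to True, "no"/"0" to False, and anything else
	# (including a missing key) to None so the column stays NULL.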

	def handle_import_countries(self, ns):
		with self.db.transaction():
			# Drop all data that we have
			self.db.execute("TRUNCATE TABLE countries")

			for file in ns.file:
				for line in file:
					line = line.rstrip()

					# Ignore any comments
					if line.startswith("#"):
						continue

					try:
						country_code, continent_code, name = line.split(maxsplit=2)
					except:
						log.warning("Could not parse line: %s" % line)
						continue

					self.db.execute("INSERT INTO countries(country_code, name, continent_code) \
						VALUES(%s, %s, %s) ON CONFLICT DO NOTHING", country_code, name, continent_code)


def split_line(line):
	key, colon, val = line.partition(":")

	# Strip any excess space
	key = key.strip()
	val = val.strip()

	return key, val
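
# For instance, split_line("inetnum:        192.0.2.0 - 192.0.2.255")
# returns ("inetnum", "192.0.2.0 - 192.0.2.255").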

def read_blocks(f):
	for block in iterate_over_blocks(f):
		type = None
		data = {}

		for i, line in enumerate(block):
			key, value = line.split(":", 1)

			# The key of the first line defines the type
			if i == 0:
				type = key

			# Store value
			data[key] = value.strip()

		yield type, data
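
# An override file (as consumed by handle_update_overrides() above) is made
# up of such blocks; an illustrative one could look like this:
#
#   net:          192.0.2.0/24
#   country:      DE
#   drop:         yes
#
# which read_blocks() would yield as
# ("net", {"net": "192.0.2.0/24", "country": "DE", "drop": "yes"}).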

def iterate_over_blocks(f, charsets=("utf-8", "latin1")):
	block = []

	for line in f:
		# Skip commented lines
		if line.startswith(b"#") or line.startswith(b"%"):
			continue

		# Convert to string
		for charset in charsets:
			try:
				line = line.decode(charset)
			except UnicodeDecodeError:
				continue
			else:
				break

		# Remove any comments at the end of line
		line, hash, comment = line.partition("#")

		# Strip any whitespace at the end of the line
		line = line.rstrip()

		# If we cut off some comment and the line is empty, we can skip it
		if comment and not line:
			continue

		# If the line has some content, keep collecting it
		if line:
			block.append(line)
			continue

		# End the block on an empty line
		if block:
			yield block

		# Reset the block
		block = []

	# Return the last block
	if block:
		yield block
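
# In other words, a raw WHOIS dump such as (illustrative addresses)
#
#   inetnum:        192.0.2.0 - 192.0.2.255
#   country:        DE
#
#   inetnum:        198.51.100.0 - 198.51.100.255
#
# is yielded as two blocks: ["inetnum:        192.0.2.0 - 192.0.2.255",
# "country:        DE"] and ["inetnum:        198.51.100.0 - 198.51.100.255"].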

def iterate_over_lines(f):
	for line in f:
		# Decode the line
		line = line.decode()

		# Strip the line ending
		yield line.rstrip()


def main():
	# Run the command line interface
	c = CLI()
	c.run()

main()