1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2020-2024 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import asyncio
22 import csv
23 import functools
24 import http.client
25 import io
26 import ipaddress
27 import json
28 import logging
29 import math
30 import re
31 import socket
32 import sys
33 import urllib.error
import urllib.parse
import urllib.request
34
35 # Load our location module
36 import location
37 import location.database
38 from location.downloader import Downloader
39 from location.i18n import _
40
41 # Initialise logging
42 log = logging.getLogger("location.importer")
43 log.propagate = 1
44
45 # Define constants
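# These ranges correspond to the publicly assignable 16-bit and 32-bit ASNs;
# AS 23456 (AS_TRANS) and the reserved, documentation and private-use ranges
# at the top of each space are left out.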
46 VALID_ASN_RANGES = (
47 (1, 23455),
48 (23457, 64495),
49 (131072, 4199999999),
50 )
51
52 TRANSLATED_COUNTRIES = {
53 # When people say UK, they mean GB
54 "UK" : "GB",
55 }
56
57 IGNORED_COUNTRIES = set((
58 # Formerly Yugoslavia
59 "YU",
60
61 # Some people use ZZ to say "no country" or to hide the country
62 "ZZ",
63 ))
64
65 # Configure the CSV parser for ARIN
66 csv.register_dialect("arin", delimiter=",", quoting=csv.QUOTE_ALL, quotechar="\"")
67
68 class CLI(object):
69 def parse_cli(self):
70 parser = argparse.ArgumentParser(
71 description=_("Location Importer Command Line Interface"),
72 )
73 subparsers = parser.add_subparsers()
74
75 # Global configuration flags
76 parser.add_argument("--debug", action="store_true",
77 help=_("Enable debug output"))
78 parser.add_argument("--quiet", action="store_true",
79 help=_("Enable quiet mode"))
80
81 # version
82 parser.add_argument("--version", action="version",
83 version="%(prog)s @VERSION@")
84
85 # Database
86 parser.add_argument("--database-host", required=True,
87 help=_("Database Hostname"), metavar=_("HOST"))
88 parser.add_argument("--database-name", required=True,
89 help=_("Database Name"), metavar=_("NAME"))
90 parser.add_argument("--database-username", required=True,
91 help=_("Database Username"), metavar=_("USERNAME"))
92 parser.add_argument("--database-password", required=True,
93 help=_("Database Password"), metavar=_("PASSWORD"))
94
95 # Write Database
96 write = subparsers.add_parser("write", help=_("Write database to file"))
97 write.set_defaults(func=self.handle_write)
98 write.add_argument("file", nargs=1, help=_("Database File"))
99 write.add_argument("--signing-key", nargs="?", type=open, help=_("Signing Key"))
100 write.add_argument("--backup-signing-key", nargs="?", type=open, help=_("Backup Signing Key"))
101 write.add_argument("--vendor", nargs="?", help=_("Sets the vendor"))
102 write.add_argument("--description", nargs="?", help=_("Sets a description"))
103 write.add_argument("--license", nargs="?", help=_("Sets the license"))
104 write.add_argument("--version", type=int, help=_("Database Format Version"))
105
106 # Update WHOIS
107 update_whois = subparsers.add_parser("update-whois", help=_("Update WHOIS Information"))
108 update_whois.add_argument("sources", nargs="*",
109 help=_("Only update these sources"))
110 update_whois.set_defaults(func=self.handle_update_whois)
111
112 # Update announcements
113 update_announcements = subparsers.add_parser("update-announcements",
114 help=_("Update BGP Annoucements"))
115 update_announcements.set_defaults(func=self.handle_update_announcements)
116 update_announcements.add_argument("server", nargs=1,
117 help=_("Route Server to connect to"), metavar=_("SERVER"))
118
119 # Update geofeeds
120 update_geofeeds = subparsers.add_parser("update-geofeeds",
121 help=_("Update Geofeeds"))
122 update_geofeeds.set_defaults(func=self.handle_update_geofeeds)
123
124 # Update feeds
125 update_feeds = subparsers.add_parser("update-feeds",
126 help=_("Update Feeds"))
127 update_feeds.add_argument("feeds", nargs="*",
128 help=_("Only update these feeds"))
129 update_feeds.set_defaults(func=self.handle_update_feeds)
130
131 # Update overrides
132 update_overrides = subparsers.add_parser("update-overrides",
133 help=_("Update overrides"),
134 )
135 update_overrides.add_argument(
136 "files", nargs="+", help=_("Files to import"),
137 )
138 update_overrides.set_defaults(func=self.handle_update_overrides)
139
140 # Import countries
141 import_countries = subparsers.add_parser("import-countries",
142 help=_("Import countries"),
143 )
144 import_countries.add_argument("file", nargs=1, type=argparse.FileType("r"),
145 help=_("File to import"))
146 import_countries.set_defaults(func=self.handle_import_countries)
147
148 args = parser.parse_args()
149
150 # Configure logging
151 if args.debug:
152 location.logger.set_level(logging.DEBUG)
153 elif args.quiet:
154 location.logger.set_level(logging.WARNING)
155
156 # Print usage if no action was given
157 if not "func" in args:
158 parser.print_usage()
159 sys.exit(2)
160
161 return args
162
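# A typical invocation (with hypothetical connection details) could look like:
#
#   location-importer \
#       --database-host localhost --database-name location \
#       --database-username location --database-password secret \
#       update-whois RIPE ARIN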
163 async def run(self):
164 # Parse command line arguments
165 args = self.parse_cli()
166
167 # Initialize the downloader
168 self.downloader = Downloader()
169
170 # Initialise database
171 self.db = self._setup_database(args)
172
173 # Call function
174 ret = await args.func(args)
175
176 # Return with exit code
177 if ret:
178 sys.exit(ret)
179
180 # Otherwise just exit
181 sys.exit(0)
182
183 def _setup_database(self, ns):
184 """
185 Initialise the database
186 """
187 # Connect to database
188 db = location.database.Connection(
189 host=ns.database_host, database=ns.database_name,
190 user=ns.database_username, password=ns.database_password,
191 )
192
193 with db.transaction():
194 db.execute("""
195 -- announcements
196 CREATE TABLE IF NOT EXISTS announcements(network inet, autnum bigint,
197 first_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP,
198 last_seen_at timestamp without time zone DEFAULT CURRENT_TIMESTAMP);
199 CREATE UNIQUE INDEX IF NOT EXISTS announcements_networks ON announcements(network);
200 CREATE INDEX IF NOT EXISTS announcements_search2 ON announcements
201 USING SPGIST(network inet_ops);
202 ALTER TABLE announcements ALTER COLUMN first_seen_at SET NOT NULL;
203 ALTER TABLE announcements ALTER COLUMN last_seen_at SET NOT NULL;
204
205 -- autnums
206 CREATE TABLE IF NOT EXISTS autnums(number bigint, name text NOT NULL);
207 ALTER TABLE autnums ADD COLUMN IF NOT EXISTS source text;
208 CREATE UNIQUE INDEX IF NOT EXISTS autnums_number ON autnums(number);
209
210 -- countries
211 CREATE TABLE IF NOT EXISTS countries(
212 country_code text NOT NULL, name text NOT NULL, continent_code text NOT NULL);
213 CREATE UNIQUE INDEX IF NOT EXISTS countries_country_code ON countries(country_code);
214
215 -- networks
216 CREATE TABLE IF NOT EXISTS networks(network inet, country text);
217 ALTER TABLE networks ADD COLUMN IF NOT EXISTS original_countries text[];
218 ALTER TABLE networks ADD COLUMN IF NOT EXISTS source text;
219 CREATE UNIQUE INDEX IF NOT EXISTS networks_network ON networks(network);
220 CREATE INDEX IF NOT EXISTS networks_search2 ON networks
221 USING SPGIST(network inet_ops);
222
223 -- geofeeds
224 CREATE TABLE IF NOT EXISTS geofeeds(
225 id serial primary key,
226 url text,
227 status integer default null,
228 updated_at timestamp without time zone default null
229 );
230 ALTER TABLE geofeeds ADD COLUMN IF NOT EXISTS error text;
231 CREATE UNIQUE INDEX IF NOT EXISTS geofeeds_unique
232 ON geofeeds(url);
233 CREATE TABLE IF NOT EXISTS geofeed_networks(
234 geofeed_id integer references geofeeds(id) on delete cascade,
235 network inet,
236 country text,
237 region text,
238 city text
239 );
240 CREATE INDEX IF NOT EXISTS geofeed_networks_geofeed_id
241 ON geofeed_networks(geofeed_id);
242 CREATE TABLE IF NOT EXISTS network_geofeeds(network inet, url text);
243 ALTER TABLE network_geofeeds ADD COLUMN IF NOT EXISTS source text NOT NULL;
244 CREATE UNIQUE INDEX IF NOT EXISTS network_geofeeds_unique2
245 ON network_geofeeds(network, url);
246 CREATE INDEX IF NOT EXISTS network_geofeeds_url
247 ON network_geofeeds(url);
248
249 -- feeds
250 CREATE TABLE IF NOT EXISTS autnum_feeds(
251 number bigint NOT NULL,
252 source text NOT NULL,
253 name text,
254 country text,
255 is_anonymous_proxy boolean,
256 is_satellite_provider boolean,
257 is_anycast boolean,
258 is_drop boolean
259 );
260 CREATE UNIQUE INDEX IF NOT EXISTS autnum_feeds_unique
261 ON autnum_feeds(number, source);
262
263 CREATE TABLE IF NOT EXISTS network_feeds(
264 network inet NOT NULL,
265 source text NOT NULL,
266 country text,
267 is_anonymous_proxy boolean,
268 is_satellite_provider boolean,
269 is_anycast boolean,
270 is_drop boolean
271 );
272 CREATE UNIQUE INDEX IF NOT EXISTS network_feeds_unique
273 ON network_feeds(network, source);
274
275 -- overrides
276 CREATE TABLE IF NOT EXISTS autnum_overrides(
277 number bigint NOT NULL,
278 name text,
279 country text,
280 is_anonymous_proxy boolean,
281 is_satellite_provider boolean,
282 is_anycast boolean
283 );
284 CREATE UNIQUE INDEX IF NOT EXISTS autnum_overrides_number
285 ON autnum_overrides(number);
286 ALTER TABLE autnum_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
287 ALTER TABLE autnum_overrides DROP COLUMN IF EXISTS source;
288
289 CREATE TABLE IF NOT EXISTS network_overrides(
290 network inet NOT NULL,
291 country text,
292 is_anonymous_proxy boolean,
293 is_satellite_provider boolean,
294 is_anycast boolean
295 );
296 CREATE UNIQUE INDEX IF NOT EXISTS network_overrides_network
297 ON network_overrides(network);
298 ALTER TABLE network_overrides ADD COLUMN IF NOT EXISTS is_drop boolean;
299 ALTER TABLE network_overrides DROP COLUMN IF EXISTS source;
300
301 -- Cleanup things we no longer need
302 DROP TABLE IF EXISTS geofeed_overrides;
303 DROP INDEX IF EXISTS announcements_family;
304 DROP INDEX IF EXISTS announcements_search;
305 DROP INDEX IF EXISTS geofeed_networks_search;
306 DROP INDEX IF EXISTS networks_family;
307 DROP INDEX IF EXISTS networks_search;
308 DROP INDEX IF EXISTS network_feeds_search;
309 DROP INDEX IF EXISTS network_geofeeds_unique;
310 DROP INDEX IF EXISTS network_geofeeds_search;
311 DROP INDEX IF EXISTS network_overrides_search;
312 """)
313
314 return db
315
316 def fetch_countries(self):
317 """
318 Returns a set of all known country codes
319 """
320 # Fetch all valid country codes to check parsed networks against...
321 countries = self.db.query("SELECT country_code FROM countries ORDER BY country_code")
322
323 return set((country.country_code for country in countries))
324
325 async def handle_write(self, ns):
326 """
327 Compiles a database in libloc format out of what is in the database
328 """
329 # Allocate a writer
330 writer = location.Writer(ns.signing_key, ns.backup_signing_key)
331
332 # Set all metadata
333 if ns.vendor:
334 writer.vendor = ns.vendor
335
336 if ns.description:
337 writer.description = ns.description
338
339 if ns.license:
340 writer.license = ns.license
341
342 # Run ANALYZE so that the query planner can hopefully make better decisions
343 self.db.execute("ANALYZE")
344
345 # Add all Autonomous Systems
346 log.info("Writing Autonomous Systems...")
347
348 # Select all ASes with a name
349 rows = self.db.query("""
350 SELECT
351 autnums.number AS number,
352 COALESCE(
353 overrides.name,
354 autnums.name
355 ) AS name
356 FROM
357 autnums
358 LEFT JOIN
359 autnum_overrides overrides ON autnums.number = overrides.number
360 ORDER BY
361 autnums.number
362 """)
363
364 for row in rows:
365 # Skip AS without names
366 if not row.name:
367 continue
368
369 a = writer.add_as(row.number)
370 a.name = row.name
371
372 # Add all networks
373 log.info("Writing networks...")
374
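# The export works on a temporary table: first all known prefixes are
# collected, then country and flag information is applied in several passes,
# with later passes (feeds and overrides) taking precedence over earlier ones
# (RIR data and Geofeeds) whenever they provide a value.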
375 # Create a new temporary table where we collect
376 # the networks that we are interested in
377 self.db.execute("""
378 CREATE TEMPORARY TABLE
379 n
380 (
381 network inet NOT NULL,
382 autnum integer,
383 country text,
384 is_anonymous_proxy boolean,
385 is_satellite_provider boolean,
386 is_anycast boolean,
387 is_drop boolean
388 )
389 WITH (FILLFACTOR = 50)
390 """)
391
392 # Add all known networks
393 self.db.execute("""
394 INSERT INTO
395 n
396 (
397 network
398 )
399
400 SELECT
401 network
402 FROM
403 announcements
404
405 UNION
406
407 SELECT
408 network
409 FROM
410 networks
411
412 UNION
413
414 SELECT
415 network
416 FROM
417 network_feeds
418
419 UNION
420
421 SELECT
422 network
423 FROM
424 network_overrides
425
426 UNION
427
428 SELECT
429 network
430 FROM
431 geofeed_networks
432 """)
433
434 # Create an index to search through networks faster
435 self.db.execute("""
436 CREATE INDEX
437 n_search
438 ON
439 n
440 USING
441 SPGIST(network)
442 """)
443
444 # Analyze n
445 self.db.execute("ANALYZE n")
446
447 # Apply the AS number to all networks
448 self.db.execute("""
449 -- Join all networks together with their most specific announcements
450 WITH announcements AS (
451 SELECT
452 n.network,
453 announcements.autnum,
454
455 -- Sort all matches and number them so
456 -- that we can later select the best one
457 ROW_NUMBER()
458 OVER
459 (
460 PARTITION BY
461 n.network
462 ORDER BY
463 masklen(announcements.network) DESC
464 ) AS row
465 FROM
466 n
467 JOIN
468 announcements
469 ON
470 announcements.network >>= n.network
471 )
472
473 -- Store the result
474 UPDATE
475 n
476 SET
477 autnum = announcements.autnum
478 FROM
479 announcements
480 WHERE
481 announcements.network = n.network
482 AND
483 announcements.row = 1
484 """,
485 )
486
487 # Apply country information
488 self.db.execute("""
489 WITH networks AS (
490 SELECT
491 n.network,
492 networks.country,
493
494 ROW_NUMBER()
495 OVER
496 (
497 PARTITION BY
498 n.network
499 ORDER BY
500 masklen(networks.network) DESC
501 ) AS row
502 FROM
503 n
504 JOIN
505 networks
506 ON
507 networks.network >>= n.network
508 )
509
510 UPDATE
511 n
512 SET
513 country = networks.country
514 FROM
515 networks
516 WHERE
517 networks.network = n.network
518 AND
519 networks.row = 1
520 """,
521 )
522
523 # Add all country information from Geofeeds
524 self.db.execute("""
525 WITH geofeeds AS (
526 SELECT
527 DISTINCT ON (geofeed_networks.network)
528 geofeed_networks.network,
529 geofeed_networks.country
530 FROM
531 geofeeds
532 JOIN
533 network_geofeeds networks
534 ON
535 geofeeds.url = networks.url
536 JOIN
537 geofeed_networks
538 ON
539 geofeeds.id = geofeed_networks.geofeed_id
540 AND
541 networks.network >>= geofeed_networks.network
542 ),
543
544 networks AS (
545 SELECT
546 n.network,
547 geofeeds.country,
548
549 ROW_NUMBER()
550 OVER
551 (
552 PARTITION BY
553 n.network
554 ORDER BY
555 masklen(geofeeds.network) DESC
556 ) AS row
557 FROM
558 n
559 JOIN
560 geofeeds
561 ON
562 geofeeds.network >>= n.network
563 )
564
565 UPDATE
566 n
567 SET
568 country = networks.country
569 FROM
570 networks
571 WHERE
572 networks.network = n.network
573 AND
574 networks.row = 1
575 """,
576 )
577
578 # Apply country and flags from feeds
579 self.db.execute("""
580 WITH networks AS (
581 SELECT
582 n.network,
583 network_feeds.country,
584
585 -- Flags
586 network_feeds.is_anonymous_proxy,
587 network_feeds.is_satellite_provider,
588 network_feeds.is_anycast,
589 network_feeds.is_drop,
590
591 ROW_NUMBER()
592 OVER
593 (
594 PARTITION BY
595 n.network
596 ORDER BY
597 masklen(network_feeds.network) DESC
598 ) AS row
599 FROM
600 n
601 JOIN
602 network_feeds
603 ON
604 network_feeds.network >>= n.network
605 )
606
607 UPDATE
608 n
609 SET
610 country =
611 COALESCE(networks.country, n.country),
612
613 is_anonymous_proxy =
614 COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
615
616 is_satellite_provider =
617 COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
618
619 is_anycast =
620 COALESCE(networks.is_anycast, n.is_anycast),
621
622 is_drop =
623 COALESCE(networks.is_drop, n.is_drop)
624 FROM
625 networks
626 WHERE
627 networks.network = n.network
628 AND
629 networks.row = 1
630 """,
631 )
632
633 # Apply country and flags from AS feeds
634 self.db.execute("""
635 WITH networks AS (
636 SELECT
637 n.network,
638 autnum_feeds.country,
639
640 -- Flags
641 autnum_feeds.is_anonymous_proxy,
642 autnum_feeds.is_satellite_provider,
643 autnum_feeds.is_anycast,
644 autnum_feeds.is_drop
645 FROM
646 n
647 JOIN
648 autnum_feeds
649 ON
650 autnum_feeds.number = n.autnum
651 )
652
653 UPDATE
654 n
655 SET
656 country =
657 COALESCE(networks.country, n.country),
658
659 is_anonymous_proxy =
660 COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
661
662 is_satellite_provider =
663 COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
664
665 is_anycast =
666 COALESCE(networks.is_anycast, n.is_anycast),
667
668 is_drop =
669 COALESCE(networks.is_drop, n.is_drop)
670 FROM
671 networks
672 WHERE
673 networks.network = n.network
674 """)
675
676 # Apply network overrides
677 self.db.execute("""
678 WITH networks AS (
679 SELECT
680 n.network,
681 network_overrides.country,
682
683 -- Flags
684 network_overrides.is_anonymous_proxy,
685 network_overrides.is_satellite_provider,
686 network_overrides.is_anycast,
687 network_overrides.is_drop,
688
689 ROW_NUMBER()
690 OVER
691 (
692 PARTITION BY
693 n.network
694 ORDER BY
695 masklen(network_overrides.network) DESC
696 ) AS row
697 FROM
698 n
699 JOIN
700 network_overrides
701 ON
702 network_overrides.network >>= n.network
703 )
704
705 UPDATE
706 n
707 SET
708 country =
709 COALESCE(networks.country, n.country),
710
711 is_anonymous_proxy =
712 COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
713
714 is_satellite_provider =
715 COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
716
717 is_anycast =
718 COALESCE(networks.is_anycast, n.is_anycast),
719
720 is_drop =
721 COALESCE(networks.is_drop, n.is_drop)
722 FROM
723 networks
724 WHERE
725 networks.network = n.network
726 AND
727 networks.row = 1
728 """)
729
730 # Apply AS overrides
731 self.db.execute("""
732 WITH networks AS (
733 SELECT
734 n.network,
735 autnum_overrides.country,
736
737 -- Flags
738 autnum_overrides.is_anonymous_proxy,
739 autnum_overrides.is_satellite_provider,
740 autnum_overrides.is_anycast,
741 autnum_overrides.is_drop
742 FROM
743 n
744 JOIN
745 autnum_overrides
746 ON
747 autnum_overrides.number = n.autnum
748 )
749
750 UPDATE
751 n
752 SET
753 country =
754 COALESCE(networks.country, n.country),
755
756 is_anonymous_proxy =
757 COALESCE(networks.is_anonymous_proxy, n.is_anonymous_proxy),
758
759 is_satellite_provider =
760 COALESCE(networks.is_satellite_provider, n.is_satellite_provider),
761
762 is_anycast =
763 COALESCE(networks.is_anycast, n.is_anycast),
764
765 is_drop =
766 COALESCE(networks.is_drop, n.is_drop)
767 FROM
768 networks
769 WHERE
770 networks.network = n.network
771 """)
772
773 # Here we could remove some networks that we no longer need, but since we
774 # have already implemented our deduplication/merge algorithm, this is not
775 # necessary.
776
777 # Export the entire temporary table
778 rows = self.db.query("""
779 SELECT
780 *
781 FROM
782 n
783 ORDER BY
784 network
785 """)
786
787 for row in rows:
788 network = writer.add_network("%s" % row.network)
789
790 # Save country
791 if row.country:
792 network.country_code = row.country
793
794 # Save ASN
795 if row.autnum:
796 network.asn = row.autnum
797
798 # Set flags
799 if row.is_anonymous_proxy:
800 network.set_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
801
802 if row.is_satellite_provider:
803 network.set_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
804
805 if row.is_anycast:
806 network.set_flag(location.NETWORK_FLAG_ANYCAST)
807
808 if row.is_drop:
809 network.set_flag(location.NETWORK_FLAG_DROP)
810
811 # Add all countries
812 log.info("Writing countries...")
813
814 # Select all countries
815 rows = self.db.query("""
816 SELECT
817 *
818 FROM
819 countries
820 ORDER BY
821 country_code
822 """,
823 )
824
825 for row in rows:
826 c = writer.add_country(row.country_code)
827 c.continent_code = row.continent_code
828 c.name = row.name
829
830 # Write everything to file
831 log.info("Writing database to file...")
832 for file in ns.file:
833 writer.write(file)
834
835 async def handle_update_whois(self, ns):
836 # Did we run successfully?
837 success = True
838
839 sources = (
840 # African Network Information Centre
841 ("AFRINIC", (
842 (self._import_standard_format, "https://ftp.afrinic.net/pub/pub/dbase/afrinic.db.gz"),
843 )),
844
845 # Asia Pacific Network Information Centre
846 ("APNIC", (
847 (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inet6num.gz"),
848 (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.inetnum.gz"),
849 (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.aut-num.gz"),
850 (self._import_standard_format, "https://ftp.apnic.net/apnic/whois/apnic.db.organisation.gz"),
851 )),
852
853 # American Registry for Internet Numbers
854 ("ARIN", (
855 (self._import_extended_format, "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest"),
856 (self._import_arin_as_names, "https://ftp.arin.net/pub/resource_registry_service/asns.csv"),
857 )),
858
859 # Japan Network Information Center
860 ("JPNIC", (
861 (self._import_standard_format, "https://ftp.nic.ad.jp/jpirr/jpirr.db.gz"),
862 )),
863
864 # Latin America and Caribbean Network Information Centre
865 ("LACNIC", (
866 (self._import_standard_format, "https://ftp.lacnic.net/lacnic/dbase/lacnic.db.gz"),
867 (self._import_extended_format, "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest"),
868 )),
869
870 # Réseaux IP Européens
871 ("RIPE", (
872 (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz"),
873 (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz"),
874 (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.aut-num.gz"),
875 (self._import_standard_format, "https://ftp.ripe.net/ripe/dbase/split/ripe.db.organisation.gz"),
876 )),
877 )
878
879 # Fetch all valid country codes to check parsed networks against
880 countries = self.fetch_countries()
881
882 # Check if we have countries
883 if not countries:
884 log.error("Please import countries before importing any WHOIS data")
885 return 1
886
887 # Iterate over all potential sources
888 for name, feeds in sources:
889 # Skip anything that should not be updated
890 if ns.sources and not name in ns.sources:
891 continue
892
893 try:
894 await self._process_source(name, feeds, countries)
895
896 # Log an error but continue if an exception occurs
897 except Exception as e:
898 log.error("Error processing source %s" % name, exc_info=True)
899 success = False
900
901 # Return a non-zero exit code for errors
902 return 0 if success else 1
903
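# Each source is imported into temporary tables (_autnums, _organizations,
# _rirdata) first and only merged into the persistent tables at the end of
# the transaction, so a failing feed never leaves partial data behind.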
904 async def _process_source(self, source, feeds, countries):
905 """
906 This function processes one source
907 """
908 # Wrap everything into one large transaction
909 with self.db.transaction():
910 # Remove all previously imported content
911 self.db.execute("DELETE FROM autnums WHERE source = %s", source)
912 self.db.execute("DELETE FROM networks WHERE source = %s", source)
913 self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", source)
914
915 # Create some temporary tables to store parsed data
916 self.db.execute("""
917 CREATE TEMPORARY TABLE _autnums(number integer NOT NULL,
918 organization text NOT NULL, source text NOT NULL) ON COMMIT DROP;
919 CREATE UNIQUE INDEX _autnums_number ON _autnums(number);
920
921 CREATE TEMPORARY TABLE _organizations(handle text NOT NULL,
922 name text NOT NULL, source text NOT NULL) ON COMMIT DROP;
923 CREATE UNIQUE INDEX _organizations_handle ON _organizations(handle);
924
925 CREATE TEMPORARY TABLE _rirdata(network inet NOT NULL, country text,
926 original_countries text[] NOT NULL, source text NOT NULL)
927 ON COMMIT DROP;
928 CREATE INDEX _rirdata_search ON _rirdata
929 USING BTREE(family(network), masklen(network));
930 CREATE UNIQUE INDEX _rirdata_network ON _rirdata(network);
931 """)
932
933 # Parse all feeds
934 for callback, url, *args in feeds:
935 # Retrieve the feed
936 f = self.downloader.retrieve(url)
937
938 # Call the callback
939 with self.db.pipeline():
940 await callback(source, countries, f, *args)
941
942 # Process all parsed networks from every RIR we happen to have access to,
943 # insert the largest network chunks into the networks table immediately...
944 families = self.db.query("""
945 SELECT DISTINCT
946 family(network) AS family
947 FROM
948 _rirdata
949 ORDER BY
950 family(network)
951 """,
952 )
953
954 for family in (row.family for row in families):
955 # Fetch the smallest mask length in our data set
956 smallest = self.db.get("""
957 SELECT
958 MIN(
959 masklen(network)
960 ) AS prefix
961 FROM
962 _rirdata
963 WHERE
964 family(network) = %s
965 """, family,
966 )
967
968 # Copy all networks
969 self.db.execute("""
970 INSERT INTO
971 networks
972 (
973 network,
974 country,
975 original_countries,
976 source
977 )
978 SELECT
979 network,
980 country,
981 original_countries,
982 source
983 FROM
984 _rirdata
985 WHERE
986 masklen(network) = %s
987 AND
988 family(network) = %s
989 ON CONFLICT DO
990 NOTHING""",
991 smallest.prefix,
992 family,
993 )
994
995 # ... determine any other prefixes for this network family, ...
996 prefixes = self.db.query("""
997 SELECT
998 DISTINCT masklen(network) AS prefix
999 FROM
1000 _rirdata
1001 WHERE
1002 family(network) = %s
1003 ORDER BY
1004 masklen(network) ASC
1005 OFFSET 1
1006 """, family,
1007 )
1008
1009 # ... and insert networks with this prefix in case they provide additional
1010 # information (i.e. a subnet of a larger chunk with a different country)
1011 for prefix in (row.prefix for row in prefixes):
1012 self.db.execute("""
1013 WITH candidates AS (
1014 SELECT
1015 _rirdata.network,
1016 _rirdata.country,
1017 _rirdata.original_countries,
1018 _rirdata.source
1019 FROM
1020 _rirdata
1021 WHERE
1022 family(_rirdata.network) = %s
1023 AND
1024 masklen(_rirdata.network) = %s
1025 ),
1026 filtered AS (
1027 SELECT
1028 DISTINCT ON (c.network)
1029 c.network,
1030 c.country,
1031 c.original_countries,
1032 c.source,
1033 masklen(networks.network),
1034 networks.country AS parent_country
1035 FROM
1036 candidates c
1037 LEFT JOIN
1038 networks
1039 ON
1040 c.network << networks.network
1041 ORDER BY
1042 c.network,
1043 masklen(networks.network) DESC NULLS LAST
1044 )
1045 INSERT INTO
1046 networks(network, country, original_countries, source)
1047 SELECT
1048 network,
1049 country,
1050 original_countries,
1051 source
1052 FROM
1053 filtered
1054 WHERE
1055 parent_country IS NULL
1056 OR
1057 country <> parent_country
1058 ON CONFLICT DO NOTHING
1059 """, family, prefix,
1060 )
1061
1062 self.db.execute("""
1063 INSERT INTO
1064 autnums
1065 (
1066 number,
1067 name,
1068 source
1069 )
1070 SELECT
1071 _autnums.number,
1072 _organizations.name,
1073 _organizations.source
1074 FROM
1075 _autnums
1076 JOIN
1077 _organizations ON _autnums.organization = _organizations.handle
1078 ON CONFLICT
1079 (
1080 number
1081 )
1082 DO UPDATE
1083 SET name = excluded.name
1084 """,
1085 )
1086
1087 async def _import_standard_format(self, source, countries, f, *args):
1088 """
1089 Imports a single standard format source feed
1090 """
1091 # Iterate over all blocks
1092 for block in iterate_over_blocks(f):
1093 self._parse_block(block, source, countries)
1094
1095 async def _import_extended_format(self, source, countries, f, *args):
1096 # Iterate over all lines
1097 for line in iterate_over_lines(f):
1098 self._parse_line(line, source, countries)
1099
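# Note: the ARIN ASN registry (asns.csv) is parsed with csv.DictReader using
# the "arin" dialect registered above, so only the columns referenced below
# matter: "Status", "Org Name", "Start AS Number" and "End AS Number". Rows
# whose status is not "Full Registry Services" are skipped.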
1100 async def _import_arin_as_names(self, source, countries, f, *args):
1101 # Wrap the byte stream into a text stream
1102 f = io.TextIOWrapper(f)
1103
1104 # Walk through the file
1105 for line in csv.DictReader(f, dialect="arin"):
1106 # Fetch status
1107 status = line.get("Status")
1108
1109 # We are only interested in anything managed by ARIN
1110 if not status == "Full Registry Services":
1111 continue
1112
1113 # Fetch organization name
1114 name = line.get("Org Name")
1115
1116 # Extract ASNs
1117 first_asn = line.get("Start AS Number")
1118 last_asn = line.get("End AS Number")
1119
1120 # Cast to a number
1121 try:
1122 first_asn = int(first_asn)
1123 except (TypeError, ValueError):
1124 log.warning("Could not parse ASN '%s'" % first_asn)
1125 continue
1126
1127 try:
1128 last_asn = int(last_asn)
1129 except (TypeError, ValueError):
1130 log.warning("Could not parse ASN '%s'" % last_asn)
1131 continue
1132
1133 # Check if the range is valid
1134 if last_asn < first_asn:
1135 log.warning("Invalid ASN range %s-%s" % (first_asn, last_asn))
1136
1137 # Insert everything into the database
1138 for asn in range(first_asn, last_asn + 1):
1139 if not self._check_parsed_asn(asn):
1140 log.warning("Skipping invalid ASN %s" % asn)
1141 continue
1142
1143 self.db.execute("""
1144 INSERT INTO
1145 autnums
1146 (
1147 number,
1148 name,
1149 source
1150 )
1151 VALUES
1152 (
1153 %s, %s, %s
1154 )
1155 ON CONFLICT
1156 (
1157 number
1158 )
1159 DO NOTHING
1160 """, asn, name, "ARIN",
1161 )
1162
1163 def _check_parsed_network(self, network):
1164 """
1165 Assistive function to detect and subsequently sort out parsed
1166 networks from RIR data (both Whois and so-called "extended sources"),
1167 which are or have...
1168
1169 (a) not globally routable (RFC 1918 space, et al.)
1170 (b) covering too large a chunk of the IP address space (prefix length
1171 < 7 for IPv4 networks, or < 10 for IPv6)
1172 (c) "0.0.0.0" or "::" as a network address
1173
1174 This unfortunately is necessary due to brain-dead clutter across
1175 various RIR databases, causing mismatches and eventually disruptions.
1176
1177 We will return False in case a network is not suitable for adding
1178 it to our database, and True otherwise.
1179 """
1180 # Check input
1181 if isinstance(network, ipaddress.IPv6Network):
1182 pass
1183 elif isinstance(network, ipaddress.IPv4Network):
1184 pass
1185 else:
1186 raise ValueError("Invalid network: %s (type %s)" % (network, type(network)))
1187
1188 # Ignore anything that isn't globally routable
1189 if not network.is_global:
1190 log.debug("Skipping non-globally routable network: %s" % network)
1191 return False
1192
1193 # Ignore any unspecified IP range (see RFC 5735 for IPv4 or RFC 2373 for IPv6)
1194 elif network.is_unspecified:
1195 log.debug("Skipping unspecified network: %s" % network)
1196 return False
1197
1198 # IPv6
1199 if network.version == 6:
1200 if network.prefixlen < 10:
1201 log.debug("Skipping too big IP chunk: %s" % network)
1202 return False
1203
1204 # IPv4
1205 elif network.version == 4:
1206 if network.prefixlen < 7:
1207 log.debug("Skipping too big IP chunk: %s" % network)
1208 return False
1209
1210 # In case we have made it here, the network is considered to
1211 # be suitable for libloc consumption...
1212 return True
1213
1214 def _check_parsed_asn(self, asn):
1215 """
1216 Assistive function to filter Autonomous System Numbers that are not suitable
1217 for adding to our database. Returns False in such cases, and True otherwise.
1218 """
1219
1220 for start, end in VALID_ASN_RANGES:
1221 if start <= asn and end >= asn:
1222 return True
1223
1224 log.info("Supplied ASN %s out of publicly routable ASN ranges" % asn)
1225 return False
1226
1227 def _check_geofeed_url(self, url):
1228 """
1229 This function checks if a Geofeed URL is valid.
1230
1231 If so, it returns the normalized URL which should be stored instead of
1232 the original one.
1233 """
1234 # Parse the URL
1235 try:
1236 url = urllib.parse.urlparse(url)
1237 except ValueError as e:
1238 log.warning("Invalid URL %s: %s" % (url, e))
1239 return
1240
1241 # Make sure that this is an HTTPS URL
1242 if not url.scheme == "https":
1243 log.warning("Skipping Geofeed URL that is not using HTTPS: %s" \
1244 % url.geturl())
1245 return
1246
1247 # Normalize the URL and convert it back
1248 return url.geturl()
1249
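# A block is a list of "key: value" lines taken from a WHOIS database dump;
# an inetnum block might look roughly like this (illustrative data):
#
#   inetnum:   192.0.2.0 - 192.0.2.255
#   netname:   EXAMPLE-NET
#   country:   DE
#   geofeed:   https://example.com/geofeed.csv
#
# The first line decides which of the specialised parsers below handles the
# block (aut-num, inetnum/inet6num or organisation).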
1250 def _parse_block(self, block, source_key, countries):
1251 # Get first line to find out what type of block this is
1252 line = block[0]
1253
1254 # aut-num
1255 if line.startswith("aut-num:"):
1256 return self._parse_autnum_block(block, source_key)
1257
1258 # inetnum
1259 if line.startswith("inet6num:") or line.startswith("inetnum:"):
1260 return self._parse_inetnum_block(block, source_key, countries)
1261
1262 # organisation
1263 elif line.startswith("organisation:"):
1264 return self._parse_org_block(block, source_key)
1265
1266 def _parse_autnum_block(self, block, source_key):
1267 autnum = {}
1268 for line in block:
1269 # Split line
1270 key, val = split_line(line)
1271
1272 if key == "aut-num":
1273 m = re.match(r"^(AS|as)(\d+)", val)
1274 if m:
1275 autnum["asn"] = m.group(2)
1276
1277 elif key == "org":
1278 autnum[key] = val.upper()
1279
1280 elif key == "descr":
1281 # Save the first description line as well...
1282 if not key in autnum:
1283 autnum[key] = val
1284
1285 # Skip empty objects
1286 if not autnum or not "asn" in autnum:
1287 return
1288
1289 # Insert a dummy organisation handle into our temporary organisations
1290 # table in case the AS does not have an organisation handle set, but
1291 # has a description (a quirk often observed in the APNIC area), so we can
1292 # later display at least some string for this AS.
1293 if not "org" in autnum:
1294 if "descr" in autnum:
1295 autnum["org"] = "LIBLOC-%s-ORGHANDLE" % autnum.get("asn")
1296
1297 self.db.execute("INSERT INTO _organizations(handle, name, source) \
1298 VALUES(%s, %s, %s) ON CONFLICT (handle) DO NOTHING",
1299 autnum.get("org"), autnum.get("descr"), source_key,
1300 )
1301 else:
1302 log.warning("ASN %s neither has an organisation handle nor a description line set, omitting" % \
1303 autnum.get("asn"))
1304 return
1305
1306 # Insert into database
1307 self.db.execute("INSERT INTO _autnums(number, organization, source) \
1308 VALUES(%s, %s, %s) ON CONFLICT (number) DO UPDATE SET \
1309 organization = excluded.organization",
1310 autnum.get("asn"), autnum.get("org"), source_key,
1311 )
1312
1313 def _parse_inetnum_block(self, block, source_key, countries):
1314 inetnum = {}
1315 for line in block:
1316 # Split line
1317 key, val = split_line(line)
1318
1319 # Filter any inetnum records which only refer to IP space
1320 # not managed by that specific RIR...
1321 if key == "netname":
1322 if re.match(r"^(ERX-NETBLOCK|(AFRINIC|ARIN|LACNIC|RIPE)-CIDR-BLOCK|IANA-NETBLOCK-\d{1,3}|NON-RIPE-NCC-MANAGED-ADDRESS-BLOCK|STUB-[\d-]{3,}SLASH\d{1,2})", val.strip()):
1323 log.debug("Skipping record indicating historic/orphaned data: %s" % val.strip())
1324 return
1325
1326 if key == "inetnum":
1327 start_address, delim, end_address = val.partition("-")
1328
1329 # Strip any excess space
1330 start_address, end_address = start_address.rstrip(), end_address.strip()
1331
1332 # Handle "inetnum" formatting in LACNIC DB (e.g. "24.152.8/22" instead of "24.152.8.0/22")
1333 if start_address and not (delim or end_address):
1334 try:
1335 start_address = ipaddress.ip_network(start_address, strict=False)
1336 except ValueError:
1337 start_address = start_address.split("/")
1338 ldigits = start_address[0].count(".")
1339
1340 # How many octets do we need to add?
1341 # (LACNIC does not seem to have a /8 or greater assigned, so the following should suffice.)
1342 if ldigits == 1:
1343 start_address = start_address[0] + ".0.0/" + start_address[1]
1344 elif ldigits == 2:
1345 start_address = start_address[0] + ".0/" + start_address[1]
1346 else:
1347 log.warning("Could not recover IPv4 address from line in LACNIC DB format: %s" % line)
1348 return
1349
1350 try:
1351 start_address = ipaddress.ip_network(start_address, strict=False)
1352 except ValueError:
1353 log.warning("Could not parse line in LACNIC DB format: %s" % line)
1354 return
1355
1356 # Enumerate first and last IP address of this network
1357 end_address = start_address[-1]
1358 start_address = start_address[0]
1359
1360 else:
1361 # Convert to IP address
1362 try:
1363 start_address = ipaddress.ip_address(start_address)
1364 end_address = ipaddress.ip_address(end_address)
1365 except ValueError:
1366 log.warning("Could not parse line: %s" % line)
1367 return
1368
1369 inetnum["inetnum"] = list(ipaddress.summarize_address_range(start_address, end_address))
1370
1371 elif key == "inet6num":
1372 inetnum[key] = [ipaddress.ip_network(val, strict=False)]
1373
1374 elif key == "country":
1375 cc = val.upper()
1376
1377 # Ignore certain country codes
1378 if cc in IGNORED_COUNTRIES:
1379 log.debug("Ignoring country code '%s'" % cc)
1380 continue
1381
1382 # Translate country codes
1383 try:
1384 cc = TRANSLATED_COUNTRIES[cc]
1385 except KeyError:
1386 pass
1387
1388 # Do we know this country?
1389 if not cc in countries:
1390 log.warning("Skipping invalid country code '%s'" % cc)
1391 continue
1392
1393 try:
1394 inetnum[key].append(cc)
1395 except KeyError:
1396 inetnum[key] = [cc]
1397
1398 # Parse the geofeed attribute
1399 elif key == "geofeed":
1400 inetnum["geofeed"] = val
1401
1402 # Parse geofeed when used as a remark
1403 elif key == "remarks":
1404 m = re.match(r"^(?:Geofeed)\s+(https://.*)", val)
1405 if m:
1406 inetnum["geofeed"] = m.group(1)
1407
1408 # Skip empty objects
1409 if not inetnum:
1410 return
1411
1412 # Iterate through all networks enumerated from above, check them for plausibility and insert
1413 # them into the database, if _check_parsed_network() succeeded
1414 for single_network in inetnum.get("inet6num") or inetnum.get("inetnum"):
1415 if not self._check_parsed_network(single_network):
1416 continue
1417
1418 # Fetch the countries or use a list with an empty country
1419 countries = inetnum.get("country", [None])
1420
1421 # Insert the network into the database but only use the first country code
1422 for cc in countries:
1423 self.db.execute("""
1424 INSERT INTO
1425 _rirdata
1426 (
1427 network,
1428 country,
1429 original_countries,
1430 source
1431 )
1432 VALUES
1433 (
1434 %s, %s, %s, %s
1435 )
1436 ON CONFLICT (network)
1437 DO UPDATE SET country = excluded.country
1438 """, "%s" % single_network, cc, [cc for cc in countries if cc], source_key,
1439 )
1440
1441 # If there is more than one country, we only use the first one
1442 break
1443
1444 # Update any geofeed information
1445 geofeed = inetnum.get("geofeed", None)
1446 if geofeed:
1447 self._parse_geofeed(source_key, geofeed, single_network)
1448
1449 def _parse_geofeed(self, source, url, single_network):
1450 # Check the URL
1451 url = self._check_geofeed_url(url)
1452 if not url:
1453 return
1454
1455 # Store/update any geofeeds
1456 self.db.execute("""
1457 INSERT INTO
1458 network_geofeeds
1459 (
1460 network,
1461 url,
1462 source
1463 )
1464 VALUES
1465 (
1466 %s, %s, %s
1467 )
1468 ON CONFLICT
1469 (
1470 network, url
1471 )
1472 DO UPDATE SET
1473 source = excluded.source
1474 """, "%s" % single_network, url, source,
1475 )
1476
1477 def _parse_org_block(self, block, source_key):
1478 org = {}
1479 for line in block:
1480 # Split line
1481 key, val = split_line(line)
1482
1483 if key == "organisation":
1484 org[key] = val.upper()
1485 elif key == "org-name":
1486 org[key] = val
1487
1488 # Skip empty objects
1489 if not org:
1490 return
1491
1492 self.db.execute("INSERT INTO _organizations(handle, name, source) \
1493 VALUES(%s, %s, %s) ON CONFLICT (handle) DO \
1494 UPDATE SET name = excluded.name",
1495 org.get("organisation"), org.get("org-name"), source_key,
1496 )
1497
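# _parse_line() handles the RIR "delegated-extended" statistics format where
# each record is a pipe-separated line, roughly (illustrative values):
#
#   arin|US|ipv4|192.0.2.0|256|20100101|allocated|<opaque-id>
#   ripencc|NL|ipv6|2001:db8::|32|20100101|assigned|<opaque-id>
#
# i.e. registry|country|type|address|value|date|status[|organization]. For
# IPv4 the value field is the number of addresses and is converted into a
# prefix length below.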
1498 def _parse_line(self, line, source_key, validcountries=None):
1499 # Skip version line
1500 if line.startswith("2"):
1501 return
1502
1503 # Skip comments
1504 if line.startswith("#"):
1505 return
1506
1507 try:
1508 registry, country_code, type, line = line.split("|", 3)
1509 except ValueError:
1510 log.warning("Could not parse line: %s" % line)
1511 return
1512
1513 # Skip ASN
1514 if type == "asn":
1515 return
1516
1517 # Skip any unknown protocols
1518 elif not type in ("ipv6", "ipv4"):
1519 log.warning("Unknown IP protocol '%s'" % type)
1520 return
1521
1522 # Skip any lines that are for stats only or do not have a country
1523 # code at all (avoids log spam below)
1524 if not country_code or country_code == '*':
1525 return
1526
1527 # Skip objects with unknown country codes
1528 if validcountries and country_code not in validcountries:
1529 log.warning("Skipping line with bogus country '%s': %s" % \
1530 (country_code, line))
1531 return
1532
1533 try:
1534 address, prefix, date, status, organization = line.split("|")
1535 except ValueError:
1536 organization = None
1537
1538 # Try parsing the line without organization
1539 try:
1540 address, prefix, date, status = line.split("|")
1541 except ValueError:
1542 log.warning("Unhandled line format: %s" % line)
1543 return
1544
1545 # Skip anything that isn't properly assigned
1546 if not status in ("assigned", "allocated"):
1547 return
1548
1549 # Cast prefix into an integer
1550 try:
1551 prefix = int(prefix)
1552 except ValueError:
1553 log.warning("Invalid prefix: %s" % prefix)
1554 return
1555
1556 # Fix prefix length for IPv4
1557 if type == "ipv4":
1558 prefix = 32 - int(math.log(prefix, 2))
1559
1560 # Try to parse the address
1561 try:
1562 network = ipaddress.ip_network("%s/%s" % (address, prefix), strict=False)
1563 except ValueError:
1564 log.warning("Invalid IP address: %s" % address)
1565 return
1566
1567 if not self._check_parsed_network(network):
1568 return
1569
1570 self.db.execute("""
1571 INSERT INTO
1572 networks
1573 (
1574 network,
1575 country,
1576 original_countries,
1577 source
1578 )
1579 VALUES
1580 (
1581 %s, %s, %s, %s
1582 )
1583 ON CONFLICT (network)
1584 DO UPDATE SET country = excluded.country
1585 """, "%s" % network, country_code, [country_code], source_key,
1586 )
1587
1588 async def handle_update_announcements(self, ns):
1589 server = ns.server[0]
1590
1591 with self.db.transaction():
1592 if server.startswith("/"):
1593 await self._handle_update_announcements_from_bird(server)
1594
1595 # Purge anything we never want here
1596 self.db.execute("""
1597 -- Delete default routes
1598 DELETE FROM announcements WHERE network = '::/0' OR network = '0.0.0.0/0';
1599
1600 -- Delete anything that is not global unicast address space
1601 DELETE FROM announcements WHERE family(network) = 6 AND NOT network <<= '2000::/3';
1602
1603 -- DELETE "current network" address space
1604 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '0.0.0.0/8';
1605
1606 -- DELETE local loopback address space
1607 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '127.0.0.0/8';
1608
1609 -- DELETE RFC 1918 address space
1610 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '10.0.0.0/8';
1611 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '172.16.0.0/12';
1612 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.168.0.0/16';
1613
1614 -- DELETE test, benchmark and documentation address space
1615 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.0.0/24';
1616 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.0.2.0/24';
1617 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.18.0.0/15';
1618 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '198.51.100.0/24';
1619 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '203.0.113.0/24';
1620
1621 -- DELETE CGNAT address space (RFC 6598)
1622 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '100.64.0.0/10';
1623
1624 -- DELETE link local address space
1625 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '169.254.0.0/16';
1626
1627 -- DELETE IPv6 to IPv4 (6to4) address space (RFC 3068)
1628 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '192.88.99.0/24';
1629 DELETE FROM announcements WHERE family(network) = 6 AND network <<= '2002::/16';
1630
1631 -- DELETE multicast and reserved address space
1632 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '224.0.0.0/4';
1633 DELETE FROM announcements WHERE family(network) = 4 AND network <<= '240.0.0.0/4';
1634
1635 -- Delete networks that are too small to be in the global routing table
1636 DELETE FROM announcements WHERE family(network) = 6 AND masklen(network) > 48;
1637 DELETE FROM announcements WHERE family(network) = 4 AND masklen(network) > 24;
1638
1639 -- Delete any non-public or reserved ASNs
1640 DELETE FROM announcements WHERE NOT (
1641 (autnum >= 1 AND autnum <= 23455)
1642 OR
1643 (autnum >= 23457 AND autnum <= 64495)
1644 OR
1645 (autnum >= 131072 AND autnum <= 4199999999)
1646 );
1647
1648 -- Delete everything that we have not seen for 14 days
1649 DELETE FROM announcements WHERE last_seen_at <= CURRENT_TIMESTAMP - INTERVAL '14 days';
1650 """)
1651
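# The routing table is read from a local Bird control socket. The regular
# expression below extracts the prefix and the originating AS number from
# "show route" output lines that look roughly like this (illustrative):
#
#   192.0.2.0/24   unicast [peer1 2024-01-01] * (100) [AS64496i]
#
# Aggregated routes without an origin AS are resolved separately further
# down using "show route <prefix> all".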
1652 async def _handle_update_announcements_from_bird(self, server):
1653 # Pre-compile the regular expression for faster searching
1654 route = re.compile(rb"^\s(.+?)\s+.+?\[(?:AS(.*?))?.\]$")
1655
1656 log.info("Requesting routing table from Bird (%s)" % server)
1657
1658 aggregated_networks = []
1659
1660 # Send command to list all routes
1661 for line in self._bird_cmd(server, "show route"):
1662 m = route.match(line)
1663 if not m:
1664 # Skip empty lines
1665 if not line:
1666 pass
1667
1668 # Ignore any header lines with the name of the routing table
1669 elif line.startswith(b"Table"):
1670 pass
1671
1672 # Log anything else
1673 else:
1674 log.debug("Could not parse line: %s" % line.decode())
1675
1676 continue
1677
1678 # Fetch the extracted network and ASN
1679 network, autnum = m.groups()
1680
1681 # Skip the line if there is no network
1682 if not network:
1683 continue
1684
1685 # Decode into strings
1686 network = network.decode()
1687
1688 # Parse as network object
1689 network = ipaddress.ip_network(network)
1690
1691 # Skip announcements that are too large
1692 if isinstance(network, ipaddress.IPv6Network):
1693 if network.prefixlen < 10:
1694 log.warning("Skipping unusually large network %s" % network)
1695 continue
1696 elif isinstance(network, ipaddress.IPv4Network):
1697 if network.prefixlen < 4:
1698 log.warning("Skipping unusually large network %s" % network)
1699 continue
1700
1701 # Collect all aggregated networks
1702 if not autnum:
1703 log.debug("%s is an aggregated network" % network)
1704 aggregated_networks.append(network)
1705 continue
1706
1707 # Decode ASN
1708 autnum = autnum.decode()
1709
1710 # Insert it into the database
1711 self.db.execute("INSERT INTO announcements(network, autnum) \
1712 VALUES(%s, %s) ON CONFLICT (network) DO \
1713 UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
1714 "%s" % network, autnum,
1715 )
1716
1717 # Process any aggregated networks
1718 for network in aggregated_networks:
1719 log.debug("Processing aggregated network %s" % network)
1720
1721 # Run "show route all" for each network
1722 for line in self._bird_cmd(server, "show route %s all" % network):
1723 # Try finding the path
1724 m = re.match(rb"\s+BGP\.as_path:.* (\d+) {\d+}$", line)
1725 if m:
1726 # Select the last AS number in the path
1727 autnum = m.group(1).decode()
1728
1729 # Insert it into the database
1730 self.db.execute("INSERT INTO announcements(network, autnum) \
1731 VALUES(%s, %s) ON CONFLICT (network) DO \
1732 UPDATE SET autnum = excluded.autnum, last_seen_at = CURRENT_TIMESTAMP",
1733 network, autnum,
1734 )
1735
1736 # We don't need to process any more
1737 break
1738
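# Minimal client for the Bird control socket. Bird prefixes status lines
# with a four-digit reply code followed by a space or a hyphen, e.g. an
# (illustrative) exchange:
#
#   0001 BIRD 2.x ready.
#   ...routing table output...
#   0000
#
# Code 0000 terminates the output and code 0001 is the greeting; both are
# consumed below, everything else is yielded to the caller.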
1739 def _bird_cmd(self, socket_path, command):
1740 # Connect to the socket
1741 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1742 s.connect(socket_path)
1743
1744 # Allocate some buffer
1745 buffer = b""
1746
1747 log.debug("Sending Bird command: %s" % command)
1748
1749 # Send the command
1750 s.send(b"%s\n" % command.encode())
1751
1752 while True:
1753 # Fill up the buffer
1754 buffer += s.recv(4096)
1755
1756 while True:
1757 # Search for the next newline
1758 pos = buffer.find(b"\n")
1759
1760 # If we cannot find one, we go back and read more data
1761 if pos <= 0:
1762 break
1763
1764 # Cut after the newline character
1765 pos += 1
1766
1767 # Split the line we want and keep the rest in buffer
1768 line, buffer = buffer[:pos], buffer[pos:]
1769
1770 # Try parsing any status lines
1771 if len(line) > 4 and line[:4].isdigit() and line[4] in (32, 45):
1772 code, delim, line = int(line[:4]), line[4], line[5:]
1773
1774 log.debug("Received response code %s from bird" % code)
1775
1776 # End of output
1777 if code == 0:
1778 return
1779
1780 # Ignore hello line
1781 elif code == 1:
1782 continue
1783
1784 # Otherwise return the line
1785 yield line
1786
1787 async def handle_update_geofeeds(self, ns):
1788 # Sync geofeeds
1789 with self.db.transaction():
1790 # Delete all geofeeds which are no longer linked
1791 self.db.execute("""
1792 DELETE FROM
1793 geofeeds
1794 WHERE
1795 geofeeds.url NOT IN (
1796 SELECT
1797 network_geofeeds.url
1798 FROM
1799 network_geofeeds
1800 )
1801 """,
1802 )
1803
1804 # Copy all geofeeds
1805 self.db.execute("""
1806 WITH all_geofeeds AS (
1807 SELECT
1808 network_geofeeds.url
1809 FROM
1810 network_geofeeds
1811 )
1812 INSERT INTO
1813 geofeeds
1814 (
1815 url
1816 )
1817 SELECT
1818 url
1819 FROM
1820 all_geofeeds
1821 ON CONFLICT (url)
1822 DO NOTHING
1823 """,
1824 )
1825
1826 # Fetch all Geofeeds that require an update
1827 geofeeds = self.db.query("""
1828 SELECT
1829 id,
1830 url
1831 FROM
1832 geofeeds
1833 WHERE
1834 updated_at IS NULL
1835 OR
1836 updated_at <= CURRENT_TIMESTAMP - INTERVAL '1 week'
1837 ORDER BY
1838 id
1839 """)
1840
1841 ratelimiter = asyncio.Semaphore(32)
1842
1843 # Update all geofeeds
1844 async with asyncio.TaskGroup() as tasks:
1845 for geofeed in geofeeds:
1846 task = tasks.create_task(
1847 self._fetch_geofeed(ratelimiter, geofeed),
1848 )
1849
1850 # Delete data from any feeds that did not update in the last two weeks
1851 with self.db.transaction():
1852 self.db.execute("""
1853 DELETE FROM
1854 geofeed_networks
1855 WHERE
1856 geofeed_networks.geofeed_id IN (
1857 SELECT
1858 geofeeds.id
1859 FROM
1860 geofeeds
1861 WHERE
1862 updated_at IS NULL
1863 OR
1864 updated_at <= CURRENT_TIMESTAMP - INTERVAL '2 weeks'
1865 )
1866 """)
1867
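# Geofeeds use the CSV format from RFC 8805; each line maps a prefix to a
# location, for example (illustrative):
#
#   192.0.2.0/24,US,US-CA,San Francisco,
#
# i.e. network,country,region,city[,postal code]. Only the first four fields
# are stored below; comments and empty lines are skipped.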
1868 async def _fetch_geofeed(self, ratelimiter, geofeed):
1869 async with ratelimiter:
1870 log.debug("Fetching Geofeed %s" % geofeed.url)
1871
1872 with self.db.transaction():
1873 # Open the URL
1874 try:
1875 # Send the request
1876 f = await asyncio.to_thread(
1877 self.downloader.retrieve,
1878
1879 # Fetch the feed by its URL
1880 geofeed.url,
1881
1882 # Send some extra headers
1883 headers={
1884 "User-Agent" : "location/%s" % location.__version__,
1885
1886 # We expect some plain text file in CSV format
1887 "Accept" : "text/csv, text/plain",
1888 },
1889
1890 # Don't wait longer than 10 seconds for a response
1891 timeout=10,
1892 )
1893
1894 # Remove any previous data
1895 self.db.execute("DELETE FROM geofeed_networks \
1896 WHERE geofeed_id = %s", geofeed.id)
1897
1898 lineno = 0
1899
1900 # Read the output line by line
1901 with self.db.pipeline():
1902 for line in f:
1903 lineno += 1
1904
1905 try:
1906 line = line.decode()
1907
1908 # Ignore any lines we cannot decode
1909 except UnicodeDecodeError:
1910 log.debug("Could not decode line %s in %s" \
1911 % (lineno, geofeed.url))
1912 continue
1913
1914 # Strip any newline
1915 line = line.rstrip()
1916
1917 # Skip empty lines
1918 if not line:
1919 continue
1920
1921 # Skip comments
1922 elif line.startswith("#"):
1923 continue
1924
1925 # Try to parse the line
1926 try:
1927 fields = line.split(",", 5)
1928 except ValueError:
1929 log.debug("Could not parse line: %s" % line)
1930 continue
1931
1932 # Check if we have enough fields
1933 if len(fields) < 4:
1934 log.debug("Not enough fields in line: %s" % line)
1935 continue
1936
1937 # Fetch all fields
1938 network, country, region, city, = fields[:4]
1939
1940 # Try to parse the network
1941 try:
1942 network = ipaddress.ip_network(network, strict=False)
1943 except ValueError:
1944 log.debug("Could not parse network: %s" % network)
1945 continue
1946
1947 # Strip any excess whitespace from country codes
1948 country = country.strip()
1949
1950 # Make the country code uppercase
1951 country = country.upper()
1952
1953 # Check the country code
1954 if not country:
1955 log.debug("Empty country code in Geofeed %s line %s" \
1956 % (geofeed.url, lineno))
1957 continue
1958
1959 elif not location.country_code_is_valid(country):
1960 log.debug("Invalid country code in Geofeed %s:%s: %s" \
1961 % (geofeed.url, lineno, country))
1962 continue
1963
1964 # Write this into the database
1965 self.db.execute("""
1966 INSERT INTO
1967 geofeed_networks (
1968 geofeed_id,
1969 network,
1970 country,
1971 region,
1972 city
1973 )
1974 VALUES (%s, %s, %s, %s, %s)""",
1975 geofeed.id,
1976 "%s" % network,
1977 country,
1978 region,
1979 city,
1980 )
1981
1982 # Catch any HTTP errors
1983 except urllib.request.HTTPError as e:
1984 self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
1985 WHERE id = %s", e.code, "%s" % e, geofeed.id)
1986
1987 # Remove any previous data when the feed has been deleted
1988 if e.code == 404:
1989 self.db.execute("DELETE FROM geofeed_networks \
1990 WHERE geofeed_id = %s", geofeed.id)
1991
1992 # Catch any other errors and connection timeouts
1993 except (http.client.InvalidURL, http.client.RemoteDisconnected, urllib.request.URLError, TimeoutError) as e:
1994 log.debug("Could not fetch URL %s: %s" % (geofeed.url, e))
1995
1996 self.db.execute("UPDATE geofeeds SET status = %s, error = %s \
1997 WHERE id = %s", 599, "%s" % e, geofeed.id)
1998
1999 # Mark the geofeed as updated
2000 else:
2001 self.db.execute("""
2002 UPDATE
2003 geofeeds
2004 SET
2005 updated_at = CURRENT_TIMESTAMP,
2006 status = NULL,
2007 error = NULL
2008 WHERE
2009 id = %s""",
2010 geofeed.id,
2011 )
2012
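# Override files are parsed with read_blocks() and use the same "key: value"
# block syntax as the WHOIS data. Based on the keys handled below, blocks
# might look roughly like this (illustrative data):
#
#   net:                 192.0.2.0/24
#   country:             DE
#   is-anonymous-proxy:  yes
#
#   aut-num:             AS64496
#   name:                Example Network
#   drop:                yes
#
# Boolean values are interpreted by self._parse_bool().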
2013 async def handle_update_overrides(self, ns):
2014 with self.db.transaction():
2015 # Drop any previous content
2016 self.db.execute("TRUNCATE TABLE autnum_overrides")
2017 self.db.execute("TRUNCATE TABLE network_overrides")
2018
2019 # Remove all Geofeeds
2020 self.db.execute("DELETE FROM network_geofeeds WHERE source = %s", "overrides")
2021
2022 for file in ns.files:
2023 log.info("Reading %s..." % file)
2024
2025 with open(file, "rb") as f:
2026 for type, block in read_blocks(f):
2027 if type == "net":
2028 network = block.get("net")
2029 # Try to parse and normalise the network
2030 try:
2031 network = ipaddress.ip_network(network, strict=False)
2032 except ValueError as e:
2033 log.warning("Invalid IP network: %s: %s" % (network, e))
2034 continue
2035
2036 # Prevent overwriting all networks
2037 if network.prefixlen == 0:
2038 log.warning("Skipping %s: You cannot overwrite default" % network)
2039 continue
2040
2041 self.db.execute("""
2042 INSERT INTO
2043 network_overrides
2044 (
2045 network,
2046 country,
2047 is_anonymous_proxy,
2048 is_satellite_provider,
2049 is_anycast,
2050 is_drop
2051 )
2052 VALUES
2053 (
2054 %s, %s, %s, %s, %s, %s
2055 )
2056 ON CONFLICT (network) DO NOTHING
2057 """,
2058 "%s" % network,
2059 block.get("country"),
2060 self._parse_bool(block, "is-anonymous-proxy"),
2061 self._parse_bool(block, "is-satellite-provider"),
2062 self._parse_bool(block, "is-anycast"),
2063 self._parse_bool(block, "drop"),
2064 )
2065
2066 elif type == "aut-num":
2067 autnum = block.get("aut-num")
2068
2069 # Check if AS number begins with "AS"
2070 if not autnum.startswith("AS"):
2071 log.warning("Invalid AS number: %s" % autnum)
2072 continue
2073
2074 # Strip "AS"
2075 autnum = autnum[2:]
2076
2077 self.db.execute("""
2078 INSERT INTO
2079 autnum_overrides
2080 (
2081 number,
2082 name,
2083 country,
2084 is_anonymous_proxy,
2085 is_satellite_provider,
2086 is_anycast,
2087 is_drop
2088 )
2089 VALUES
2090 (
2091 %s, %s, %s, %s, %s, %s, %s
2092 )
2093 ON CONFLICT (number) DO NOTHING
2094 """,
2095 autnum,
2096 block.get("name"),
2097 block.get("country"),
2098 self._parse_bool(block, "is-anonymous-proxy"),
2099 self._parse_bool(block, "is-satellite-provider"),
2100 self._parse_bool(block, "is-anycast"),
2101 self._parse_bool(block, "drop"),
2102 )
2103
2104 # Geofeeds
2105 elif type == "geofeed":
2106 networks = []
2107
2108 # Fetch the URL
2109 url = block.get("geofeed")
2110
2111 # Fetch permitted networks
2112 for n in block.get("network", []):
2113 try:
2114 n = ipaddress.ip_network(n)
2115 except ValueError as e:
2116 log.warning("Ignoring invalid network %s: %s" % (n, e))
2117 continue
2118
2119 networks.append(n)
2120
2121 # If no networks have been specified, permit everything
2122 if not networks:
2123 networks = [
2124 ipaddress.ip_network("::/0"),
2125 ipaddress.ip_network("0.0.0.0/0"),
2126 ]
2127
2128 # Check the URL
2129 url = self._check_geofeed_url(url)
2130 if not url:
2131 continue
2132
2133 # Store the Geofeed URL
2134 self.db.execute("""
2135 INSERT INTO
2136 geofeeds
2137 (
2138 url
2139 )
2140 VALUES
2141 (
2142 %s
2143 )
2144 ON CONFLICT (url) DO NOTHING
2145 """, url,
2146 )
2147
2148 # Store all permitted networks
2149 self.db.executemany("""
2150 INSERT INTO
2151 network_geofeeds
2152 (
2153 network,
2154 url,
2155 source
2156 )
2157 VALUES
2158 (
2159 %s, %s, %s
2160 )
2161 ON CONFLICT
2162 (
2163 network, url
2164 )
2165 DO UPDATE SET
2166 source = excluded.source
2167 """, (("%s" % n, url, "overrides") for n in networks),
2168 )
2169
2170 else:
2171 log.warning("Unsupported type: %s" % type)
2172
2173 async def handle_update_feeds(self, ns):
2174 """
2175 Update any third-party feeds
2176 """
2177 success = True
2178
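# Each entry is a tuple of (name, callback, url, *args); any extra arguments
# are passed through to the callback in _process_feed()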
2179 feeds = (
2180 # AWS IP Ranges
2181 ("AWS-IP-RANGES", self._import_aws_ip_ranges, "https://ip-ranges.amazonaws.com/ip-ranges.json"),
2182
2183 # Spamhaus DROP
2184 ("SPAMHAUS-DROP", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/drop.txt"),
2185 ("SPAMHAUS-DROPV6", self._import_spamhaus_drop, "https://www.spamhaus.org/drop/dropv6.txt"),
2186
2187 # Spamhaus ASNDROP
2188 ("SPAMHAUS-ASNDROP", self._import_spamhaus_asndrop, "https://www.spamhaus.org/drop/asndrop.json"),
2189 )
2190
2191 # Drop any data from feeds that we don't support (any more)
2192 with self.db.transaction():
2193 # Fetch the names of all feeds we support
2194 sources = [name for name, *rest in feeds]
2195
2196 self.db.execute("DELETE FROM autnum_feeds WHERE NOT source = ANY(%s)", sources)
2197 self.db.execute("DELETE FROM network_feeds WHERE NOT source = ANY(%s)", sources)
2198
2199 # Walk through all feeds
2200 for name, callback, url, *args in feeds:
2201 # Skip any feeds that were not requested on the command line
2202 if ns.feeds and name not in ns.feeds:
2203 continue
2204
2205 try:
2206 await self._process_feed(name, callback, url, *args)
2207
2208 # Log an error but continue if an exception occurs
2209 except Exception as e:
2210 log.error("Error processing feed '%s': %s" % (name, e))
2211 success = False
2212
2213 # Return status
2214 return 0 if success else 1
2215
2216 async def _process_feed(self, name, callback, url, *args):
2217 """
2218 Processes one feed
2219 """
2220 # Open the URL
2221 f = self.downloader.retrieve(url)
2222
2223 with self.db.transaction():
2224 # Drop any previous content
2225 self.db.execute("DELETE FROM autnum_feeds WHERE source = %s", name)
2226 self.db.execute("DELETE FROM network_feeds WHERE source = %s", name)
2227
2228 # Call the callback to process the feed
2229 with self.db.pipeline():
2230 return await callback(name, f, *args)
2231
2232 async def _import_aws_ip_ranges(self, name, f):
2233 # Parse the feed
2234 feed = json.load(f)
2235
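# For reference, the feed is a JSON document that roughly has the following
# shape (abridged, illustrative values only):
#
#   {
#       "prefixes"      : [{"ip_prefix": "192.0.2.0/24", "region": "eu-central-1", ...}],
#       "ipv6_prefixes" : [{"ipv6_prefix": "2001:db8::/32", "region": "GLOBAL", ...}]
#   }
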
2236 # Set up a dictionary for mapping a region name to a country. Unfortunately,
2237 # there seems to be no machine-readable version of this available other than
2238 # https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html
2239 # (which, worse, appears to be incomplete); https://www.cloudping.cloud/endpoints
2240 # was helpful here as well.
2241 aws_region_country_map = {
2242 # Africa
2243 "af-south-1" : "ZA",
2244
2245 # Asia
2246 "il-central-1" : "IL", # Tel Aviv
2247
2248 # Asia/Pacific
2249 "ap-northeast-1" : "JP",
2250 "ap-northeast-2" : "KR",
2251 "ap-northeast-3" : "JP",
2252 "ap-east-1" : "HK",
2253 "ap-south-1" : "IN",
2254 "ap-south-2" : "IN",
2255 "ap-southeast-1" : "SG",
2256 "ap-southeast-2" : "AU",
2257 "ap-southeast-3" : "MY",
2258 "ap-southeast-4" : "AU", # Melbourne
2259 "ap-southeast-5" : "MY", # Malaysia
2260 "ap-southeast-6" : "AP", # XXX: Precise location not documented anywhere
2261 "ap-southeast-7" : "AP", # XXX: Precise location unknown
2262
2263 # Canada
2264 "ca-central-1" : "CA",
2265 "ca-west-1" : "CA",
2266
2267 # Europe
2268 "eu-central-1" : "DE",
2269 "eu-central-2" : "CH",
2270 "eu-north-1" : "SE",
2271 "eu-west-1" : "IE",
2272 "eu-west-2" : "GB",
2273 "eu-west-3" : "FR",
2274 "eu-south-1" : "IT",
2275 "eu-south-2" : "ES",
2276
2277 # Middle East
2278 "me-central-1" : "AE",
2279 "me-south-1" : "BH",
2280
2281 # Mexico
2282 "mx-central-1" : "MX",
2283
2284 # South America
2285 "sa-east-1" : "BR",
2286
2287 # Undocumented, likely located in Berlin rather than Frankfurt
2288 "eusc-de-east-1" : "DE",
2289 }
2290
2291 # Collect a list of all networks
2292 prefixes = feed.get("ipv6_prefixes", []) + feed.get("prefixes", [])
2293
2294 for prefix in prefixes:
2295 # Fetch network
2296 network = prefix.get("ipv6_prefix") or prefix.get("ip_prefix")
2297
2298 # Parse the network
2299 try:
2300 network = ipaddress.ip_network(network)
2301 except ValueError as e:
2302 log.warning("%s: Unable to parse prefix %s: %s" % (name, network, e))
2303 continue
2304
2305 # Sanitize parsed networks...
2306 if not self._check_parsed_network(network):
2307 continue
2308
2309 # Fetch the region
2310 region = prefix.get("region")
2311
2312 # Set some defaults
2313 cc = None
2314 is_anycast = False
2315
2316 # Fetch the CC from the dictionary
2317 try:
2318 cc = aws_region_country_map[region]
2319
2320 # If we couldn't find anything, let's try something else...
2321 except KeyError as e:
2322 # Find anycast networks
2323 if region == "GLOBAL":
2324 is_anycast = True
2325
2326 # Everything that starts with us- is probably in the United States
2327 elif region.startswith("us-"):
2328 cc = "US"
2329
2330 # Everything that starts with cn- is probably China
2331 elif region.startswith("cn-"):
2332 cc = "CN"
2333
2334 # Log a warning for anything else
2335 else:
2336 log.warning("%s: Could not determine country code for AWS region %s" \
2337 % (name, region))
2338 continue
2339
2340 # Write to database
2341 self.db.execute("""
2342 INSERT INTO
2343 network_feeds
2344 (
2345 network,
2346 source,
2347 country,
2348 is_anycast
2349 )
2350 VALUES
2351 (
2352 %s, %s, %s, %s
2353 )
2354 ON CONFLICT (network, source) DO NOTHING
2355 """, "%s" % network, name, cc, is_anycast,
2356 )
2357
2358 async def _import_spamhaus_drop(self, name, f):
2359 """
2360 Import Spamhaus DROP IP feeds
2361 """
2362 # Count all lines
2363 lines = 0
2364
2365 # Walk through all lines
2366 for line in f:
2367 # Decode line
2368 line = line.decode("utf-8")
2369
2370 # Strip off any comments
2371 line, _, comment = line.partition(";")
2372
2373 # Strip any excess whitespace
2374 line = line.strip()
2375 
2376 # Ignore empty lines so they do not count towards the line counter
2377 if not line:
2378 continue
2379 
2380 # Increment line counter
2381 lines += 1
2382
2383 # Parse the network
2384 try:
2385 network = ipaddress.ip_network(line)
2386 except ValueError as e:
2387 log.warning("%s: Could not parse network: %s - %s" % (name, line, e))
2388 continue
2389
2390 # Check network
2391 if not self._check_parsed_network(network):
2392 log.warning("%s: Skipping bogus network: %s" % (name, network))
2393 continue
2394
2395 # Insert into the database
2396 self.db.execute("""
2397 INSERT INTO
2398 network_feeds
2399 (
2400 network,
2401 source,
2402 is_drop
2403 )
2404 VALUES
2405 (
2406 %s, %s, %s
2407 )""", "%s" % network, name, True,
2408 )
2409
2410 # Raise an exception if we could not import anything
2411 if not lines:
2412 raise RuntimeError("Received bogus feed %s with no data" % name)
2413
2414 async def _import_spamhaus_asndrop(self, name, f):
2415 """
2416 Import Spamhaus ASNDROP feed
2417 """
2418 for line in f:
2419 # Decode the line
2420 line = line.decode("utf-8")
2421
2422 # Parse JSON
2423 try:
2424 line = json.loads(line)
2425 except json.JSONDecodeError as e:
2426 log.warning("%s: Unable to parse JSON object %s: %s" % (name, line, e))
2427 continue
2428
2429 # Fetch type
2430 type = line.get("type")
2431
2432 # Skip any metadata
2433 if type == "metadata":
2434 continue
2435
2436 # Fetch ASN
2437 asn = line.get("asn")
2438
2439 # Skip any lines without an ASN
2440 if not asn:
2441 continue
2442
2443 # Filter invalid ASNs
2444 if not self._check_parsed_asn(asn):
2445 log.warning("%s: Skipping bogus ASN %s" % (name, asn))
2446 continue
2447
2448 # Write to database
2449 self.db.execute("""
2450 INSERT INTO
2451 autnum_feeds
2452 (
2453 number,
2454 source,
2455 is_drop
2456 )
2457 VALUES
2458 (
2459 %s, %s, %s
2460 )""", "%s" % asn, name, True,
2461 )
2462
2463 @staticmethod
2464 def _parse_bool(block, key):
2465 val = block.get(key)
2466
2467 # There is no point in proceeding when we got None
2468 if val is None:
2469 return
2470
2471 # Convert to lowercase
2472 val = val.lower()
2473
2474 # True
2475 if val in ("yes", "1"):
2476 return True
2477
2478 # False
2479 if val in ("no", "0"):
2480 return False
2481
2482 # Default to None
2483 return None
2484
2485 async def handle_import_countries(self, ns):
2486 with self.db.transaction():
2487 # Drop all data that we have
2488 self.db.execute("TRUNCATE TABLE countries")
2489
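# Each line is expected to contain the country code, the continent code and
# the country name, separated by whitespace, for example (illustrative):
#
#   DE EU Germany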
2490 for file in ns.file:
2491 for line in file:
2492 line = line.rstrip()
2493
2494 # Ignore any comments
2495 if line.startswith("#"):
2496 continue
2497
2498 try:
2499 country_code, continent_code, name = line.split(maxsplit=2)
2500 except ValueError:
2501 log.warning("Could not parse line: %s" % line)
2502 continue
2503
2504 self.db.execute("INSERT INTO countries(country_code, name, continent_code) \
2505 VALUES(%s, %s, %s) ON CONFLICT DO NOTHING", country_code, name, continent_code)
2506
2507
2508 def split_line(line):
2509 key, colon, val = line.partition(":")
2510
2511 # Strip any excess space
2512 key = key.strip()
2513 val = val.strip()
2514
2515 return key, val
2516
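# read_blocks() consumes the blocks produced by iterate_over_blocks() and
# yields (type, data) tuples. The key of the first line of a block determines
# its type; for "geofeed" blocks, the "network" key may occur multiple times
# and is therefore collected into a list.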
2517 def read_blocks(f):
2518 for block in iterate_over_blocks(f):
2519 type = None
2520 data = {}
2521
2522 for i, line in enumerate(block):
2523 key, value = line.split(":", 1)
2524
2525 # The key of the first line defines the type
2526 if i == 0:
2527 type = key
2528
2529 # Strip any excess whitespace
2530 value = value.strip()
2531
2532 # Store some values as a list
2533 if type == "geofeed" and key == "network":
2534 try:
2535 data[key].append(value)
2536 except KeyError:
2537 data[key] = [value]
2538
2539 # Otherwise store the value as string
2540 else:
2541 data[key] = value
2542
2543 yield type, data
2544
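# iterate_over_blocks() splits the input into blocks of consecutive non-empty
# lines, skipping commented lines (starting with "#" or "%"), decoding each
# line with the first matching charset and stripping trailing comments.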
2545 def iterate_over_blocks(f, charsets=("utf-8", "latin1")):
2546 block = []
2547
2548 for line in f:
2549 # Skip commented lines
2550 if line.startswith(b"#") or line.startswith(b"%"):
2551 continue
2552
2553 # Convert to string
2554 for charset in charsets:
2555 try:
2556 line = line.decode(charset)
2557 except UnicodeDecodeError:
2558 continue
2559 else:
2560 break
2561
2562 # Remove any comments at the end of line
2563 line, hash, comment = line.partition("#")
2564
2565 # Strip any whitespace at the end of the line
2566 line = line.rstrip()
2567
2568 # If we cut off some comment and the line is empty, we can skip it
2569 if comment and not line:
2570 continue
2571
2572 # If the line has some content, keep collecting it
2573 if line:
2574 block.append(line)
2575 continue
2576
2577 # End the block on an empty line
2578 if block:
2579 yield block
2580
2581 # Reset the block
2582 block = []
2583
2584 # Return the last block
2585 if block:
2586 yield block
2587
2588 def iterate_over_lines(f):
2589 for line in f:
2590 # Decode the line
2591 line = line.decode()
2592
2593 # Strip the ending
2594 yield line.rstrip()
2595
2596 async def main():
2597 # Run the command line interface
2598 c = CLI()
2599
2600 await c.run()
2601
2602 asyncio.run(main())