]> git.ipfire.org Git - location/libloc.git/blob - src/python/location.in
10269468be95dd99d9a67d3ec6e787dfc421f2dc
[location/libloc.git] / src / python / location.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2017-2021 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import datetime
22 import ipaddress
23 import logging
24 import os
25 import re
26 import shutil
27 import socket
28 import sys
29 import time
30
31 # Load our location module
32 import location
33 import location.downloader
34 import location.export
35
36 from location.i18n import _
37
38 # Setup logging
39 log = logging.getLogger("location")
40
41 # Output formatters
42
class CLI(object):
    """
    Command line interface to the location database.

    parse_cli() builds the argument parser and binds every sub-command to
    one of the handle_*() methods; run() opens the database and dispatches
    to the selected handler. Every handler takes the opened database (or
    None, see run()) and the parsed argparse namespace, and returns the
    process exit code (None or 0 meaning success).
    """

    def parse_cli(self):
        """
        Parse the command line and return the resulting namespace.

        Also configures the log level from --debug/--quiet. If no
        sub-command was given, the usage is printed and the process
        exits with code 2.
        """
        parser = argparse.ArgumentParser(
            description=_("Location Database Command Line Interface"),
        )
        subparsers = parser.add_subparsers()

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))
        parser.add_argument("--quiet", action="store_true",
            help=_("Enable quiet mode"))

        # version
        parser.add_argument("--version", action="version",
            version="%(prog)s @VERSION@")

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # public key
        parser.add_argument("--public-key", "-k",
            default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
        )

        # Show the database version
        version = subparsers.add_parser("version",
            help=_("Show database version"))
        version.set_defaults(func=self.handle_version)

        # lookup an IP address
        lookup = subparsers.add_parser("lookup",
            help=_("Lookup one or multiple IP addresses"),
        )
        lookup.add_argument("address", nargs="+")
        lookup.set_defaults(func=self.handle_lookup)

        # Dump the whole database
        dump = subparsers.add_parser("dump",
            help=_("Dump the entire database"),
        )
        dump.add_argument("output", nargs="?", type=argparse.FileType("w"))
        dump.set_defaults(func=self.handle_dump)

        # Update
        update = subparsers.add_parser("update", help=_("Update database"))
        update.add_argument("--cron",
            help=_("Update the library only once per interval"),
            choices=("daily", "weekly", "monthly"),
        )
        update.set_defaults(func=self.handle_update)

        # Verify
        verify = subparsers.add_parser("verify",
            help=_("Verify the downloaded database"))
        verify.set_defaults(func=self.handle_verify)

        # Get AS
        get_as = subparsers.add_parser("get-as",
            help=_("Get information about one or multiple Autonomous Systems"),
        )
        get_as.add_argument("asn", nargs="+")
        get_as.set_defaults(func=self.handle_get_as)

        # Search for AS
        search_as = subparsers.add_parser("search-as",
            help=_("Search for Autonomous Systems that match the string"),
        )
        search_as.add_argument("query", nargs=1)
        search_as.set_defaults(func=self.handle_search_as)

        # List all networks in an AS
        list_networks_by_as = subparsers.add_parser("list-networks-by-as",
            help=_("Lists all networks in an AS"),
        )
        list_networks_by_as.add_argument("asn", nargs=1, type=int)
        list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_as.add_argument("--format",
            choices=location.export.formats.keys(), default="list")
        list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)

        # List all networks in a country
        list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
            help=_("Lists all networks in a country"),
        )
        list_networks_by_cc.add_argument("country_code", nargs=1)
        list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_cc.add_argument("--format",
            choices=location.export.formats.keys(), default="list")
        list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)

        # List all networks with flags
        list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
            help=_("Lists all networks with flags"),
        )
        list_networks_by_flags.add_argument("--anonymous-proxy",
            action="store_true", help=_("Anonymous Proxies"),
        )
        list_networks_by_flags.add_argument("--satellite-provider",
            action="store_true", help=_("Satellite Providers"),
        )
        list_networks_by_flags.add_argument("--anycast",
            action="store_true", help=_("Anycasts"),
        )
        list_networks_by_flags.add_argument("--drop",
            action="store_true", help=_("Hostile Networks safe to drop"),
        )
        list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_flags.add_argument("--format",
            choices=location.export.formats.keys(), default="list")
        list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)

        # List bogons
        list_bogons = subparsers.add_parser("list-bogons",
            help=_("Lists all bogons"),
        )
        list_bogons.add_argument("--family", choices=("ipv6", "ipv4"))
        list_bogons.add_argument("--format",
            choices=location.export.formats.keys(), default="list")
        list_bogons.set_defaults(func=self.handle_list_bogons)

        # List countries
        list_countries = subparsers.add_parser("list-countries",
            help=_("Lists all countries"),
        )
        list_countries.add_argument("--show-name",
            action="store_true", help=_("Show the name of the country"),
        )
        list_countries.add_argument("--show-continent",
            action="store_true", help=_("Show the continent"),
        )
        list_countries.set_defaults(func=self.handle_list_countries)

        # Export
        export = subparsers.add_parser("export",
            help=_("Exports data in many formats to load it into packet filters"),
        )
        export.add_argument("--format", help=_("Output format"),
            choices=location.export.formats.keys(), default="list")
        export.add_argument("--directory", help=_("Output directory"), required=True)
        export.add_argument("--family",
            help=_("Specify address family"), choices=("ipv6", "ipv4"),
        )
        export.add_argument("objects", nargs="*", help=_("List country codes or ASNs to export"))
        export.set_defaults(func=self.handle_export)

        args = parser.parse_args()

        # Configure logging
        if args.debug:
            location.logger.set_level(logging.DEBUG)
        elif args.quiet:
            location.logger.set_level(logging.WARNING)

        # Print usage if no action was given
        if "func" not in args:
            parser.print_usage()
            sys.exit(2)

        return args

    def run(self):
        """
        Parse the command line, open the database and run the command.

        This never returns: the process always ends through sys.exit()
        with the handler's return value (0 if it returned nothing).
        A ValueError raised by a handler is printed to standard error
        and turned into exit code 2.
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Open database
        try:
            db = location.Database(args.database)
        except FileNotFoundError as e:
            # Only "update" may continue without a database,
            # because it is the command that downloads one
            if args.func != self.handle_update:
                sys.stderr.write("location: Could not open database %s: %s\n" \
                    % (args.database, e))
                sys.exit(1)

            db = None

        # Translate family (if present)
        if "family" in args:
            if args.family == "ipv6":
                args.family = socket.AF_INET6
            elif args.family == "ipv4":
                args.family = socket.AF_INET
            else:
                # No family selected
                args.family = 0

        # Call function
        try:
            ret = args.func(db, args)

        # Catch invalid inputs
        except ValueError as e:
            sys.stderr.write("%s\n" % e)
            ret = 2

        # Return with exit code
        if ret:
            sys.exit(ret)

        # Otherwise just exit
        sys.exit(0)

    def handle_version(self, db, ns):
        """
        Print the version of the database
        """
        t = time.strftime(
            "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
        )

        print(t)

    def handle_lookup(self, db, ns):
        """
        Look up every address in ns.address and print what is known.

        Returns 2 as soon as an address is rejected as invalid, 1 if at
        least one address could not be found, and 0 otherwise.
        """
        ret = 0

        # Renamed from "format" to avoid shadowing the builtin
        fmt = " %-24s: %s"

        for address in ns.address:
            try:
                network = db.lookup(address)
            except ValueError:
                print(_("Invalid IP address: %s") % address, file=sys.stderr)
                return 2

            args = {
                "address" : address,
                "network" : network,
            }

            # Nothing found?
            if not network:
                print(_("Nothing found for %(address)s") % args, file=sys.stderr)
                ret = 1
                continue

            print("%s:" % address)
            print(fmt % (_("Network"), network))

            # Print country
            if network.country_code:
                country = db.get_country(network.country_code)

                # Fall back to the bare code if the country is unknown
                print(fmt % (
                    _("Country"),
                    country.name if country else network.country_code),
                )

            # Print AS information
            if network.asn:
                autonomous_system = db.get_as(network.asn)

                # Fall back to the bare number if the AS is unknown
                print(fmt % (
                    _("Autonomous System"),
                    autonomous_system or "AS%s" % network.asn),
                )

            # Anonymous Proxy
            if network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY):
                print(fmt % (
                    _("Anonymous Proxy"), _("yes"),
                ))

            # Satellite Provider
            if network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER):
                print(fmt % (
                    _("Satellite Provider"), _("yes"),
                ))

            # Anycast
            if network.has_flag(location.NETWORK_FLAG_ANYCAST):
                print(fmt % (
                    _("Anycast"), _("yes"),
                ))

            # Hostile Network
            if network.has_flag(location.NETWORK_FLAG_DROP):
                print(fmt % (
                    _("Hostile Network safe to drop"), _("yes"),
                ))

        return ret

    def handle_dump(self, db, ns):
        """
        Write the whole database as text to ns.output (default: stdout).

        The output starts with a commented metadata header, followed by
        one record per AS and one record per network.
        """
        # Use output file or write to stdout
        f = ns.output or sys.stdout

        # Format everything like this
        fmt = "%-24s %s\n"

        flags = {
            location.NETWORK_FLAG_ANONYMOUS_PROXY    : "is-anonymous-proxy:",
            location.NETWORK_FLAG_SATELLITE_PROVIDER : "is-satellite-provider:",
            location.NETWORK_FLAG_ANYCAST            : "is-anycast:",
            location.NETWORK_FLAG_DROP               : "drop:",
        }

        try:
            # Write metadata
            f.write("#\n# Location Database Export\n#\n")

            f.write("# Generated: %s\n" % time.strftime(
                "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
            ))

            if db.vendor:
                f.write("# Vendor: %s\n" % db.vendor)

            if db.license:
                f.write("# License: %s\n" % db.license)

            f.write("#\n")

            if db.description:
                for line in db.description.splitlines():
                    line = "# %s" % line
                    f.write("%s\n" % line.rstrip())

                f.write("#\n")

            # Iterate over all ASes
            for a in db.ases:
                f.write("\n")
                f.write(fmt % ("aut-num:", "AS%s" % a.number))
                f.write(fmt % ("name:", a.name))

            # Iterate over all networks
            for n in db.networks:
                f.write("\n")
                f.write(fmt % ("net:", n))

                if n.country_code:
                    f.write(fmt % ("country:", n.country_code))

                if n.asn:
                    f.write(fmt % ("aut-num:", n.asn))

                # Print all flags
                for flag, label in flags.items():
                    if n.has_flag(flag):
                        f.write(fmt % (label, "yes"))

        finally:
            # Close a file opened by argparse so that buffered data is
            # flushed and write errors surface here. Never close stdout
            # (which argparse.FileType also returns for "-").
            if f is not sys.stdout:
                f.close()

    def handle_get_as(self, db, ns):
        """
        Gets information about Autonomous Systems

        Returns 1 if any of the given ASNs was invalid or unknown,
        otherwise 0.
        """
        ret = 0

        for asn in ns.asn:
            try:
                asn = int(asn)
            except ValueError:
                print(_("Invalid ASN: %s") % asn, file=sys.stderr)
                ret = 1
                continue

            # Fetch AS from database
            a = db.get_as(asn)

            # Nothing found
            if not a:
                print(_("Could not find AS%s") % asn, file=sys.stderr)
                ret = 1
                continue

            print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })

        return ret

    def handle_search_as(self, db, ns):
        """
        Print every Autonomous System that matches the search query
        """
        for query in ns.query:
            # Print all matching ASes
            for a in db.search_as(query):
                print(a)

    def handle_update(self, db, ns):
        """
        Download a new database if a newer one is available.

        db may be None when no database has been installed, yet.

        Returns 3 if nothing was done because the database has been
        updated recently (--cron) or no new file was received, 1 if the
        download failed, 0 after a new database has been installed, and
        None if the local database is already the latest version.
        """
        if ns.cron and db:
            # Minimum age of the database before we try to update again
            intervals = {
                "daily"   : datetime.timedelta(days=1),
                "weekly"  : datetime.timedelta(days=7),
                "monthly" : datetime.timedelta(days=30),
            }

            delta = intervals[ns.cron].total_seconds()

            # Check if the database has recently been updated
            if db.created_at >= (time.time() - delta):
                log.info(
                    _("The database has been updated recently"),
                )
                return 3

        # Fetch the timestamp we need from DNS
        t = location.discover_latest_version()

        # Check the version of the local database
        if db and t and db.created_at >= t:
            log.info("Already on the latest version")
            return

        # Download the database into the correct directory
        tmpdir = os.path.dirname(ns.database)

        # Create a downloader
        d = location.downloader.Downloader()

        # Try downloading a new database
        try:
            t = d.download(public_key=ns.public_key, timestamp=t, tmpdir=tmpdir)

        # If no file could be downloaded, log a message
        except FileNotFoundError:
            log.error("Could not download a new database")
            return 1

        # If we have not received a new file, there is nothing to do
        if not t:
            return 3

        # Move temporary file to destination
        shutil.move(t.name, ns.database)

        return 0

    def handle_verify(self, db, ns):
        """
        Verify the database against the public key in ns.public_key.

        Returns 0 if the verification succeeded, otherwise 1.
        """
        # Verify the database
        with open(ns.public_key, "r") as f:
            if not db.verify(f):
                log.error("Could not verify database")
                return 1

        # Success
        log.debug("Database successfully verified")
        return 0

    def __get_output_formatter(self, ns):
        """
        Return the output formatter class selected by ns.format,
        falling back to the generic OutputFormatter for unknown names.
        """
        try:
            cls = location.export.formats[ns.format]
        except KeyError:
            cls = location.export.OutputFormatter

        return cls

    def handle_list_countries(self, db, ns):
        """
        Print one line per country: the country code, optionally
        followed by the continent code and the name of the country.
        """
        for country in db.countries:
            line = [
                country.code,
            ]

            if ns.show_continent:
                line.append(country.continent_code)

            if ns.show_name:
                line.append(country.name)

            # Format and print the output
            print(" ".join(line))

    def handle_list_networks_by_as(self, db, ns):
        """
        Print all networks that belong to the given AS
        """
        writer = self.__get_output_formatter(ns)

        for asn in ns.asn:
            f = writer(sys.stdout, prefix="AS%s" % asn)

            # Print all matching networks
            for n in db.search_networks(asns=[asn], family=ns.family):
                f.write(n)

            f.finish()

    def handle_list_networks_by_cc(self, db, ns):
        """
        Print all networks that belong to the given country
        """
        writer = self.__get_output_formatter(ns)

        for country_code in ns.country_code:
            # Open standard output
            f = writer(sys.stdout, prefix=country_code)

            # Print all matching networks
            for n in db.search_networks(country_codes=[country_code], family=ns.family):
                f.write(n)

            f.finish()

    def handle_list_networks_by_flags(self, db, ns):
        """
        Print all networks that match the flags selected on the
        command line.

        Raises ValueError if no flag has been selected.
        """
        flags = 0

        if ns.anonymous_proxy:
            flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY

        if ns.satellite_provider:
            flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER

        if ns.anycast:
            flags |= location.NETWORK_FLAG_ANYCAST

        if ns.drop:
            flags |= location.NETWORK_FLAG_DROP

        if not flags:
            raise ValueError(_("You must at least pass one flag"))

        writer = self.__get_output_formatter(ns)
        f = writer(sys.stdout, prefix="custom")

        for n in db.search_networks(flags=flags, family=ns.family):
            f.write(n)

        f.finish()

    def handle_list_bogons(self, db, ns):
        """
        Print all bogons
        """
        writer = self.__get_output_formatter(ns)
        f = writer(sys.stdout, prefix="bogons")

        for n in db.list_bogons(family=ns.family):
            f.write(n)

        f.finish()

    def handle_export(self, db, ns):
        """
        Export networks by country and/or AS into ns.directory.

        Every entry of ns.objects is either an ASN written as "AS<number>"
        or a country code; anything else is logged and skipped. Without
        any valid object, all countries are exported.
        """
        countries, asns = [], []

        # Translate family
        if ns.family:
            families = [ ns.family ]
        else:
            families = [ socket.AF_INET6, socket.AF_INET ]

        # Country codes that are accepted although they might not
        # pass location.country_code_is_valid()
        special_country_codes = ("A1", "A2", "A3", "XD")

        for obj in ns.objects:
            # Raw string, as "\d" is an invalid escape sequence otherwise
            m = re.match(r"^AS(\d+)$", obj)
            if m:
                asns.append(int(m.group(1)))

            elif location.country_code_is_valid(obj) \
                    or obj in special_country_codes:
                countries.append(obj)

            else:
                log.warning("Invalid argument: %s", obj)
                continue

        # Default to exporting all countries
        if not countries and not asns:
            countries = ["A1", "A2", "A3", "XD"] + [country.code for country in db.countries]

        # Select the output format
        writer = self.__get_output_formatter(ns)

        e = location.export.Exporter(db, writer)
        e.export(ns.directory, countries=countries, asns=asns, families=families)
601
602
def format_timedelta(t):
    """
    Format a time difference as a human-readable, translated string.

    t is expected to provide "days" and "seconds" attributes like
    datetime.timedelta does. Only the non-zero parts (days, hours,
    minutes, seconds) are included; anything below one second is ignored.

    Returns the translation of "Now" if all parts are zero, otherwise
    the parts joined by ", " and wrapped in the translation of "%s ago".
    """
    parts = []

    # NOTE(review): _() is called with (singular, plural, count) here and
    # presumably selects the matching plural form - confirm in location.i18n
    if t.days:
        parts.append(
            _("One Day", "%(days)s Days", t.days) % { "days" : t.days, }
        )

    hours = t.seconds // 3600
    if hours:
        parts.append(
            _("One Hour", "%(hours)s Hours", hours) % { "hours" : hours, }
        )

    minutes = (t.seconds % 3600) // 60
    if minutes:
        parts.append(
            _("One Minute", "%(minutes)s Minutes", minutes) % { "minutes" : minutes, }
        )

    seconds = t.seconds % 60
    # Check the remainder and not t.seconds, which is also non-zero for
    # full hours/minutes and used to append a bogus "0 Seconds"
    if seconds:
        parts.append(
            _("One Second", "%(seconds)s Seconds", seconds) % { "seconds" : seconds, }
        )

    if not parts:
        return _("Now")

    return _("%s ago") % ", ".join(parts)
633
def main():
    """
    Run the command line interface.

    This does not return, as CLI.run() always ends the process
    through sys.exit().
    """
    c = CLI()
    c.run()

# Only run the CLI when executed as a script, so that importing this
# file does not parse the command line and exit the interpreter
if __name__ == "__main__":
    main()