]> git.ipfire.org Git - people/ms/libloc.git/blob - src/python/location.in
location.in: strip trailing whitespaces in database description while dumping
[people/ms/libloc.git] / src / python / location.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import datetime
22 import ipaddress
23 import logging
24 import os
25 import re
26 import shutil
27 import socket
28 import sys
29 import time
30
31 # Load our location module
32 import location
33 import location.downloader
34 import location.export
35
36 from location.i18n import _
37
38 # Setup logging
39 log = logging.getLogger("location")
40
41 # Output formatters
42
43 class CLI(object):
44 def parse_cli(self):
45 parser = argparse.ArgumentParser(
46 description=_("Location Database Command Line Interface"),
47 )
48 subparsers = parser.add_subparsers()
49
50 # Global configuration flags
51 parser.add_argument("--debug", action="store_true",
52 help=_("Enable debug output"))
53 parser.add_argument("--quiet", action="store_true",
54 help=_("Enable quiet mode"))
55
56 # version
57 parser.add_argument("--version", action="version",
58 version="%(prog)s @VERSION@")
59
60 # database
61 parser.add_argument("--database", "-d",
62 default="@databasedir@/database.db", help=_("Path to database"),
63 )
64
65 # public key
66 parser.add_argument("--public-key", "-k",
67 default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
68 )
69
70 # Show the database version
71 version = subparsers.add_parser("version",
72 help=_("Show database version"))
73 version.set_defaults(func=self.handle_version)
74
75 # lookup an IP address
76 lookup = subparsers.add_parser("lookup",
77 help=_("Lookup one or multiple IP addresses"),
78 )
79 lookup.add_argument("address", nargs="+")
80 lookup.set_defaults(func=self.handle_lookup)
81
82 # Dump the whole database
83 dump = subparsers.add_parser("dump",
84 help=_("Dump the entire database"),
85 )
86 dump.add_argument("output", nargs="?", type=argparse.FileType("w"))
87 dump.set_defaults(func=self.handle_dump)
88
89 # Update
90 update = subparsers.add_parser("update", help=_("Update database"))
91 update.set_defaults(func=self.handle_update)
92
93 # Verify
94 verify = subparsers.add_parser("verify",
95 help=_("Verify the downloaded database"))
96 verify.set_defaults(func=self.handle_verify)
97
98 # Get AS
99 get_as = subparsers.add_parser("get-as",
100 help=_("Get information about one or multiple Autonomous Systems"),
101 )
102 get_as.add_argument("asn", nargs="+")
103 get_as.set_defaults(func=self.handle_get_as)
104
105 # Search for AS
106 search_as = subparsers.add_parser("search-as",
107 help=_("Search for Autonomous Systems that match the string"),
108 )
109 search_as.add_argument("query", nargs=1)
110 search_as.set_defaults(func=self.handle_search_as)
111
112 # List all networks in an AS
113 list_networks_by_as = subparsers.add_parser("list-networks-by-as",
114 help=_("Lists all networks in an AS"),
115 )
116 list_networks_by_as.add_argument("asn", nargs=1, type=int)
117 list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
118 list_networks_by_as.add_argument("--format",
119 choices=location.export.formats.keys(), default="list")
120 list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
121
122 # List all networks in a country
123 list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
124 help=_("Lists all networks in a country"),
125 )
126 list_networks_by_cc.add_argument("country_code", nargs=1)
127 list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
128 list_networks_by_cc.add_argument("--format",
129 choices=location.export.formats.keys(), default="list")
130 list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)
131
132 # List all networks with flags
133 list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
134 help=_("Lists all networks with flags"),
135 )
136 list_networks_by_flags.add_argument("--anonymous-proxy",
137 action="store_true", help=_("Anonymous Proxies"),
138 )
139 list_networks_by_flags.add_argument("--satellite-provider",
140 action="store_true", help=_("Satellite Providers"),
141 )
142 list_networks_by_flags.add_argument("--anycast",
143 action="store_true", help=_("Anycasts"),
144 )
145 list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
146 list_networks_by_flags.add_argument("--format",
147 choices=location.export.formats.keys(), default="list")
148 list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)
149
150 # List countries
151 list_countries = subparsers.add_parser("list-countries",
152 help=_("Lists all countries"),
153 )
154 list_countries.add_argument("--show-name",
155 action="store_true", help=_("Show the name of the country"),
156 )
157 list_countries.add_argument("--show-continent",
158 action="store_true", help=_("Show the continent"),
159 )
160 list_countries.set_defaults(func=self.handle_list_countries)
161
162 # Export
163 export = subparsers.add_parser("export",
164 help=_("Exports data in many formats to load it into packet filters"),
165 )
166 export.add_argument("--format", help=_("Output format"),
167 choices=location.export.formats.keys(), default="list")
168 export.add_argument("--directory", help=_("Output directory"), required=True)
169 export.add_argument("--family",
170 help=_("Specify address family"), choices=("ipv6", "ipv4"),
171 )
172 export.add_argument("objects", nargs="*", help=_("List country codes or ASNs to export"))
173 export.set_defaults(func=self.handle_export)
174
175 args = parser.parse_args()
176
177 # Configure logging
178 if args.debug:
179 location.logger.set_level(logging.DEBUG)
180 elif args.quiet:
181 location.logger.set_level(logging.WARNING)
182
183 # Print usage if no action was given
184 if not "func" in args:
185 parser.print_usage()
186 sys.exit(2)
187
188 return args
189
190 def run(self):
191 # Parse command line arguments
192 args = self.parse_cli()
193
194 # Open database
195 try:
196 db = location.Database(args.database)
197 except FileNotFoundError as e:
198 sys.stderr.write("location: Could not open database %s: %s\n" \
199 % (args.database, e))
200 sys.exit(1)
201
202 # Translate family (if present)
203 if "family" in args:
204 if args.family == "ipv6":
205 args.family = socket.AF_INET6
206 elif args.family == "ipv4":
207 args.family = socket.AF_INET
208 else:
209 args.family = 0
210
211 # Call function
212 try:
213 ret = args.func(db, args)
214
215 # Catch invalid inputs
216 except ValueError as e:
217 sys.stderr.write("%s\n" % e)
218 ret = 2
219
220 # Return with exit code
221 if ret:
222 sys.exit(ret)
223
224 # Otherwise just exit
225 sys.exit(0)
226
227 def handle_version(self, db, ns):
228 """
229 Print the version of the database
230 """
231 t = time.strftime(
232 "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
233 )
234
235 print(t)
236
237 def handle_lookup(self, db, ns):
238 ret = 0
239
240 format = " %-24s: %s"
241
242 for address in ns.address:
243 try:
244 network = db.lookup(address)
245 except ValueError:
246 print(_("Invalid IP address: %s") % address, file=sys.stderr)
247
248 args = {
249 "address" : address,
250 "network" : network,
251 }
252
253 # Nothing found?
254 if not network:
255 print(_("Nothing found for %(address)s") % args, file=sys.stderr)
256 ret = 1
257 continue
258
259 print("%s:" % address)
260 print(format % (_("Network"), network))
261
262 # Print country
263 if network.country_code:
264 country = db.get_country(network.country_code)
265
266 print(format % (
267 _("Country"),
268 country.name if country else network.country_code),
269 )
270
271 # Print AS information
272 if network.asn:
273 autonomous_system = db.get_as(network.asn)
274
275 print(format % (
276 _("Autonomous System"),
277 autonomous_system or "AS%s" % network.asn),
278 )
279
280 # Anonymous Proxy
281 if network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY):
282 print(format % (
283 _("Anonymous Proxy"), _("yes"),
284 ))
285
286 # Satellite Provider
287 if network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER):
288 print(format % (
289 _("Satellite Provider"), _("yes"),
290 ))
291
292 # Anycast
293 if network.has_flag(location.NETWORK_FLAG_ANYCAST):
294 print(format % (
295 _("Anycast"), _("yes"),
296 ))
297
298 return ret
299
300 def handle_dump(self, db, ns):
301 # Use output file or write to stdout
302 f = ns.output or sys.stdout
303
304 # Format everything like this
305 format = "%-24s %s\n"
306
307 # Write metadata
308 f.write("#\n# Location Database Export\n#\n")
309
310 f.write("# Generated: %s\n" % time.strftime(
311 "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
312 ))
313
314 if db.vendor:
315 f.write("# Vendor: %s\n" % db.vendor)
316
317 if db.license:
318 f.write("# License: %s\n" % db.license)
319
320 f.write("#\n")
321
322 if db.description:
323 for line in db.description.splitlines():
324 line = "# %s" % line
325 f.write("%s\n" % line.rstrip())
326
327 f.write("#\n")
328
329 # Iterate over all ASes
330 for a in db.ases:
331 f.write("\n")
332 f.write(format % ("aut-num:", "AS%s" % a.number))
333 f.write(format % ("name:", a.name))
334
335 flags = {
336 location.NETWORK_FLAG_ANONYMOUS_PROXY : "is-anonymous-proxy:",
337 location.NETWORK_FLAG_SATELLITE_PROVIDER : "is-satellite-provider:",
338 location.NETWORK_FLAG_ANYCAST : "is-anycast:",
339 }
340
341 # Iterate over all networks
342 for n in db.networks:
343 f.write("\n")
344 f.write(format % ("net:", n))
345
346 if n.country_code:
347 f.write(format % ("country:", n.country_code))
348
349 if n.asn:
350 f.write(format % ("aut-num:", n.asn))
351
352 # Print all flags
353 for flag in flags:
354 if n.has_flag(flag):
355 f.write(format % (flags[flag], "yes"))
356
357 def handle_get_as(self, db, ns):
358 """
359 Gets information about Autonomous Systems
360 """
361 ret = 0
362
363 for asn in ns.asn:
364 try:
365 asn = int(asn)
366 except ValueError:
367 print(_("Invalid ASN: %s") % asn, file=sys.stderr)
368 ret = 1
369 continue
370
371 # Fetch AS from database
372 a = db.get_as(asn)
373
374 # Nothing found
375 if not a:
376 print(_("Could not find AS%s") % asn, file=sys.stderr)
377 ret = 1
378 continue
379
380 print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
381
382 return ret
383
384 def handle_search_as(self, db, ns):
385 for query in ns.query:
386 # Print all matches ASes
387 for a in db.search_as(query):
388 print(a)
389
390 def handle_update(self, db, ns):
391 # Fetch the timestamp we need from DNS
392 t = location.discover_latest_version()
393
394 # Parse timestamp into datetime format
395 timestamp = datetime.datetime.fromtimestamp(t) if t else None
396
397 # Check the version of the local database
398 if db and timestamp and db.created_at >= timestamp.timestamp():
399 log.info("Already on the latest version")
400 return
401
402 # Download the database into the correct directory
403 tmpdir = os.path.dirname(ns.database)
404
405 # Create a downloader
406 d = location.downloader.Downloader()
407
408 # Try downloading a new database
409 try:
410 t = d.download(public_key=ns.public_key, timestamp=timestamp, tmpdir=tmpdir)
411
412 # If no file could be downloaded, log a message
413 except FileNotFoundError as e:
414 log.error("Could not download a new database")
415 return 1
416
417 # If we have not received a new file, there is nothing to do
418 if not t:
419 return 3
420
421 # Move temporary file to destination
422 shutil.move(t.name, ns.database)
423
424 return 0
425
426 def handle_verify(self, ns):
427 try:
428 db = location.Database(ns.database)
429 except FileNotFoundError as e:
430 log.error("%s: %s" % (ns.database, e))
431 return 127
432
433 # Verify the database
434 with open(ns.public_key, "r") as f:
435 if not db.verify(f):
436 log.error("Could not verify database")
437 return 1
438
439 # Success
440 log.debug("Database successfully verified")
441 return 0
442
443 def __get_output_formatter(self, ns):
444 try:
445 cls = location.export.formats[ns.format]
446 except KeyError:
447 cls = location.export.OutputFormatter
448
449 return cls
450
451 def handle_list_countries(self, db, ns):
452 for country in db.countries:
453 line = [
454 country.code,
455 ]
456
457 if ns.show_continent:
458 line.append(country.continent_code)
459
460 if ns.show_name:
461 line.append(country.name)
462
463 # Format the output
464 line = " ".join(line)
465
466 # Print the output
467 print(line)
468
469 def handle_list_networks_by_as(self, db, ns):
470 writer = self.__get_output_formatter(ns)
471
472 for asn in ns.asn:
473 f = writer(sys.stdout, prefix="AS%s" % asn)
474
475 # Print all matching networks
476 for n in db.search_networks(asn=asn, family=ns.family):
477 f.write(n)
478
479 f.finish()
480
481 def handle_list_networks_by_cc(self, db, ns):
482 writer = self.__get_output_formatter(ns)
483
484 for country_code in ns.country_code:
485 # Open standard output
486 f = writer(sys.stdout, prefix=country_code)
487
488 # Print all matching networks
489 for n in db.search_networks(country_code=country_code, family=ns.family):
490 f.write(n)
491
492 f.finish()
493
494 def handle_list_networks_by_flags(self, db, ns):
495 flags = 0
496
497 if ns.anonymous_proxy:
498 flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY
499
500 if ns.satellite_provider:
501 flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER
502
503 if ns.anycast:
504 flags |= location.NETWORK_FLAG_ANYCAST
505
506 if not flags:
507 raise ValueError(_("You must at least pass one flag"))
508
509 writer = self.__get_output_formatter(ns)
510 f = writer(sys.stdout, prefix="custom")
511
512 for n in db.search_networks(flags=flags, family=ns.family):
513 f.write(n)
514
515 f.finish()
516
517 def handle_export(self, db, ns):
518 countries, asns = [], []
519
520 # Translate family
521 if ns.family:
522 families = [ ns.family ]
523 else:
524 families = [ socket.AF_INET6, socket.AF_INET ]
525
526 for object in ns.objects:
527 m = re.match("^AS(\d+)$", object)
528 if m:
529 object = int(m.group(1))
530
531 asns.append(object)
532
533 elif location.country_code_is_valid(object) \
534 or object in ("A1", "A2", "A3"):
535 countries.append(object)
536
537 else:
538 log.warning("Invalid argument: %s" % object)
539 continue
540
541 # Default to exporting all countries
542 if not countries and not asns:
543 countries = ["A1", "A2", "A3"] + [country.code for country in db.countries]
544
545 # Select the output format
546 writer = self.__get_output_formatter(ns)
547
548 e = location.export.Exporter(db, writer)
549 e.export(ns.directory, countries=countries, asns=asns, families=families)
550
551
552 def main():
553 # Run the command line interface
554 c = CLI()
555 c.run()
556
557 main()