]> git.ipfire.org Git - location/libloc.git/blob - src/python/location-query.in
location-query: Include flags in dump output
[location/libloc.git] / src / python / location-query.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import ipaddress
22 import os
23 import socket
24 import sys
25 import time
26
27 # Load our location module
28 import location
29 from location.i18n import _
30
31 # Output formatters
32
33 class OutputFormatter(object):
34 def __init__(self, ns):
35 self.ns = ns
36
37 def __enter__(self):
38 # Open the output
39 self.open()
40
41 return self
42
43 def __exit__(self, type, value, tb):
44 if tb is None:
45 self.close()
46
47 @property
48 def name(self):
49 if "country_code" in self.ns:
50 return "networks_country_%s" % self.ns.country_code[0]
51
52 elif "asn" in self.ns:
53 return "networks_AS%s" % self.ns.asn[0]
54
55 def open(self):
56 pass
57
58 def close(self):
59 pass
60
61 def network(self, network):
62 print(network)
63
64
65 class IpsetOutputFormatter(OutputFormatter):
66 """
67 For nftables
68 """
69 def open(self):
70 print("create %s hash:net family inet hashsize 1024 maxelem 65536" % self.name)
71
72 def network(self, network):
73 print("add %s %s" % (self.name, network))
74
75
76 class NftablesOutputFormatter(OutputFormatter):
77 """
78 For nftables
79 """
80 def open(self):
81 print("define %s = {" % self.name)
82
83 def close(self):
84 print("}")
85
86 def network(self, network):
87 print(" %s," % network)
88
89
90 class XTGeoIPOutputFormatter(OutputFormatter):
91 """
92 Formats the output in that way, that it can be loaded by
93 the xt_geoip kernel module from xtables-addons.
94 """
95 def network(self, network):
96 n = ipaddress.ip_network("%s" % network)
97
98 for address in (n.network_address, n.broadcast_address):
99 bytes = socket.inet_pton(
100 socket.AF_INET6 if address.version == 6 else socket.AF_INET,
101 "%s" % address,
102 )
103
104 os.write(1, bytes)
105
106
107 class CLI(object):
108 output_formats = {
109 "ipset" : IpsetOutputFormatter,
110 "list" : OutputFormatter,
111 "nftables" : NftablesOutputFormatter,
112 "xt_geoip" : XTGeoIPOutputFormatter,
113 }
114
115 def parse_cli(self):
116 parser = argparse.ArgumentParser(
117 description=_("Location Database Command Line Interface"),
118 )
119 subparsers = parser.add_subparsers()
120
121 # Global configuration flags
122 parser.add_argument("--debug", action="store_true",
123 help=_("Enable debug output"))
124 parser.add_argument("--quiet", action="store_true",
125 help=_("Enable quiet mode"))
126
127 # version
128 parser.add_argument("--version", action="version",
129 version="%(prog)s @VERSION@")
130
131 # database
132 parser.add_argument("--database", "-d",
133 default="@databasedir@/database.db", help=_("Path to database"),
134 )
135
136 # public key
137 parser.add_argument("--public-key", "-k",
138 default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
139 )
140
141 # lookup an IP address
142 lookup = subparsers.add_parser("lookup",
143 help=_("Lookup one or multiple IP addresses"),
144 )
145 lookup.add_argument("address", nargs="+")
146 lookup.set_defaults(func=self.handle_lookup)
147
148 # Dump the whole database
149 dump = subparsers.add_parser("dump",
150 help=_("Dump the entire database"),
151 )
152 dump.add_argument("output", nargs="?", type=argparse.FileType("w"))
153 dump.set_defaults(func=self.handle_dump)
154
155 # Get AS
156 get_as = subparsers.add_parser("get-as",
157 help=_("Get information about one or multiple Autonomous Systems"),
158 )
159 get_as.add_argument("asn", nargs="+")
160 get_as.set_defaults(func=self.handle_get_as)
161
162 # Search for AS
163 search_as = subparsers.add_parser("search-as",
164 help=_("Search for Autonomous Systems that match the string"),
165 )
166 search_as.add_argument("query", nargs=1)
167 search_as.set_defaults(func=self.handle_search_as)
168
169 # List all networks in an AS
170 list_networks_by_as = subparsers.add_parser("list-networks-by-as",
171 help=_("Lists all networks in an AS"),
172 )
173 list_networks_by_as.add_argument("asn", nargs=1, type=int)
174 list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
175 list_networks_by_as.add_argument("--output-format",
176 choices=self.output_formats.keys(), default="list")
177 list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
178
179 # List all networks in a country
180 list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
181 help=_("Lists all networks in a country"),
182 )
183 list_networks_by_cc.add_argument("country_code", nargs=1)
184 list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
185 list_networks_by_cc.add_argument("--output-format",
186 choices=self.output_formats.keys(), default="list")
187 list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)
188
189 # List all networks with flags
190 list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
191 help=_("Lists all networks with flags"),
192 )
193 list_networks_by_flags.add_argument("--anonymous-proxy",
194 action="store_true", help=_("Anonymous Proxies"),
195 )
196 list_networks_by_flags.add_argument("--satellite-provider",
197 action="store_true", help=_("Satellite Providers"),
198 )
199 list_networks_by_flags.add_argument("--anycast",
200 action="store_true", help=_("Anycasts"),
201 )
202 list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
203 list_networks_by_flags.add_argument("--output-format",
204 choices=self.output_formats.keys(), default="list")
205 list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)
206
207 args = parser.parse_args()
208
209 # Configure logging
210 if args.debug:
211 location.logger.set_level(logging.DEBUG)
212 elif args.quiet:
213 location.logger.set_level(logging.WARNING)
214
215 # Print usage if no action was given
216 if not "func" in args:
217 parser.print_usage()
218 sys.exit(2)
219
220 return args
221
222 def run(self):
223 # Parse command line arguments
224 args = self.parse_cli()
225
226 # Open database
227 try:
228 db = location.Database(args.database)
229 except FileNotFoundError as e:
230 sys.stderr.write("location-query: Could not open database %s: %s\n" \
231 % (args.database, e))
232 sys.exit(1)
233
234 # Translate family (if present)
235 if "family" in args:
236 if args.family == "ipv6":
237 args.family = socket.AF_INET6
238 elif args.family == "ipv4":
239 args.family = socket.AF_INET
240 else:
241 args.family = 0
242
243 # Call function
244 ret = args.func(db, args)
245
246 # Return with exit code
247 if ret:
248 sys.exit(ret)
249
250 # Otherwise just exit
251 sys.exit(0)
252
253 def handle_lookup(self, db, ns):
254 ret = 0
255
256 format = " %-24s: %s"
257
258 for address in ns.address:
259 try:
260 network = db.lookup(address)
261 except ValueError:
262 print(_("Invalid IP address: %s") % address, file=sys.stderr)
263
264 args = {
265 "address" : address,
266 "network" : network,
267 }
268
269 # Nothing found?
270 if not network:
271 print(_("Nothing found for %(address)s") % args, file=sys.stderr)
272 ret = 1
273 continue
274
275 print("%s:" % address)
276 print(format % (_("Network"), network))
277
278 # Print country
279 if network.country_code:
280 print(format % (_("Country"), network.country_code))
281
282 # Print AS information
283 if network.asn:
284 autonomous_system = db.get_as(network.asn)
285
286 print(format % (
287 _("Autonomous System"),
288 autonomous_system or "AS%s" % network.asn),
289 )
290
291 # Anonymous Proxy
292 if network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY):
293 print(format % (
294 _("Anonymous Proxy"), _("yes"),
295 ))
296
297 # Satellite Provider
298 if network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER):
299 print(format % (
300 _("Satellite Provider"), _("yes"),
301 ))
302
303 # Anycast
304 if network.has_flag(location.NETWORK_FLAG_ANYCAST):
305 print(format % (
306 _("Anycast"), _("yes"),
307 ))
308
309 return ret
310
311 def handle_dump(self, db, ns):
312 # Use output file or write to stdout
313 f = ns.output or sys.stdout
314
315 # Format everything like this
316 format = "%-24s %s\n"
317
318 # Write metadata
319 f.write("#\n# Location Database Export\n#\n")
320
321 f.write("# Generated: %s\n" % time.strftime(
322 "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(db.created_at),
323 ))
324
325 if db.vendor:
326 f.write("# Vendor: %s\n" % db.vendor)
327
328 if db.license:
329 f.write("# License: %s\n" % db.license)
330
331 f.write("#\n")
332
333 if db.description:
334 for line in db.description.splitlines():
335 f.write("# %s\n" % line)
336
337 f.write("#\n")
338
339 # Iterate over all ASes
340 for a in db.ases:
341 f.write("\n")
342 f.write(format % ("aut-num:", "AS%s" % a.number))
343 f.write(format % ("name:", a.name))
344
345 flags = {
346 location.NETWORK_FLAG_ANONYMOUS_PROXY : "is-anonymous-proxy:",
347 location.NETWORK_FLAG_SATELLITE_PROVIDER : "is-satellite-provider:",
348 location.NETWORK_FLAG_ANYCAST : "is-anycast:",
349 }
350
351 # Iterate over all networks
352 for n in db.networks:
353 f.write("\n")
354 f.write(format % ("net:", n))
355
356 if n.country_code:
357 f.write(format % ("country:", n.country_code))
358
359 if n.asn:
360 f.write(format % ("autnum:", n.asn))
361
362 # Print all flags
363 for flag in flags:
364 if n.has_flag(flag):
365 f.write(format % (flags[flag], "yes"))
366
367 def handle_get_as(self, db, ns):
368 """
369 Gets information about Autonomous Systems
370 """
371 ret = 0
372
373 for asn in ns.asn:
374 try:
375 asn = int(asn)
376 except ValueError:
377 print(_("Invalid ASN: %s") % asn, file=sys.stderr)
378 ret = 1
379 continue
380
381 # Fetch AS from database
382 a = db.get_as(asn)
383
384 # Nothing found
385 if not a:
386 print(_("Could not find AS%s") % asn, file=sys.stderr)
387 ret = 1
388 continue
389
390 print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
391
392 return ret
393
394 def handle_search_as(self, db, ns):
395 for query in ns.query:
396 # Print all matches ASes
397 for a in db.search_as(query):
398 print(a)
399
400 def __get_output_formatter(self, ns):
401 try:
402 cls = self.output_formats[ns.output_format]
403 except KeyError:
404 cls = OutputFormatter
405
406 return cls(ns)
407
408 def handle_list_networks_by_as(self, db, ns):
409 with self.__get_output_formatter(ns) as f:
410 for asn in ns.asn:
411 # Print all matching networks
412 for n in db.search_networks(asn=asn, family=ns.family):
413 f.network(n)
414
415 def handle_list_networks_by_cc(self, db, ns):
416 with self.__get_output_formatter(ns) as f:
417 for country_code in ns.country_code:
418 # Print all matching networks
419 for n in db.search_networks(country_code=country_code, family=ns.family):
420 f.network(n)
421
422 def handle_list_networks_by_flags(self, db, ns):
423 flags = 0
424
425 if ns.anonymous_proxy:
426 flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY
427
428 if ns.satellite_provider:
429 flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER
430
431 if ns.anycast:
432 flags |= location.NETWORK_FLAG_ANYCAST
433
434 with self.__get_output_formatter(ns) as f:
435 for n in db.search_networks(flags=flags, family=ns.family):
436 f.network(n)
437
438
439 def main():
440 # Run the command line interface
441 c = CLI()
442 c.run()
443
444 main()