git.ipfire.org Git - location/libloc.git/blob - src/python/location-query.in
python: Move common i18n code into python module
[location/libloc.git] / src / python / location-query.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import ipaddress
22 import os
23 import socket
24 import sys
25
26 # Load our location module
27 import location
28 from location.i18n import _
29
30 # Output formatters
31
class OutputFormatter:
    """
    Base output formatter which prints one network per line.

    Instances are context managers: open() is called on entry and
    close() is called when the block is left without an exception.
    Subclasses override open(), close() and network() to emit
    other formats.
    """
    def __init__(self, ns):
        # Parsed command line arguments
        self.ns = ns

    def __enter__(self):
        # Open the output
        self.open()

        return self

    def __exit__(self, type, value, tb):
        # Only finish the output if no exception is propagating
        if tb is None:
            self.close()

    @property
    def name(self):
        """
        Name of the generated set, derived from the search arguments
        """
        if "country_code" in self.ns:
            return "networks_country_%s" % self.ns.country_code[0]

        elif "asn" in self.ns:
            return "networks_AS%s" % self.ns.asn[0]

        # Searches without a country or AS (e.g. by flags) would otherwise
        # end up with a set called "None"
        return "networks"

    def open(self):
        """
        Called once before the first network is written
        """
        pass

    def close(self):
        """
        Called once after the last network has been written
        """
        pass

    def network(self, network):
        """
        Writes a single network
        """
        print(network)
62
63
class IpsetOutputFormatter(OutputFormatter):
    """
    For ipset

    Emits "create" and "add" commands for a hash:net set.
    """
    def open(self):
        # A hash:net set holds one address family only, so an IPv6 listing
        # must create the set as inet6. Everything else keeps using inet.
        if getattr(self.ns, "family", None) == socket.AF_INET6:
            family = "inet6"
        else:
            family = "inet"

        print("create %s hash:net family %s hashsize 1024 maxelem 65536" \
            % (self.name, family))

    def network(self, network):
        print("add %s %s" % (self.name, network))
73
74
class NftablesOutputFormatter(OutputFormatter):
    """
    For nftables

    Emits the networks as the elements of a "define" statement.
    """
    def open(self):
        # Start the definition
        header = "define %s = {" % self.name
        print(header)

    def network(self, network):
        # One element per line, each followed by a comma
        element = " %s," % network
        print(element)

    def close(self):
        # Terminate the definition
        print("}")
87
88
class XTGeoIPOutputFormatter(OutputFormatter):
    """
    Formats the output so that it can be loaded by the xt_geoip
    kernel module from xtables-addons.

    Every network is written as its first and last address in
    packed binary form.
    """
    def network(self, network):
        net = ipaddress.ip_network("%s" % network)

        for boundary in (net.network_address, net.broadcast_address):
            if boundary.version == 6:
                family = socket.AF_INET6
            else:
                family = socket.AF_INET

            packed = socket.inet_pton(family, "%s" % boundary)

            # Write the raw bytes straight to file descriptor 1
            os.write(1, packed)
104
105
class CLI(object):
    """
    Command line interface to query the location database
    """
    # Maps the values of --output-format to their formatter classes
    output_formats = {
        "ipset"    : IpsetOutputFormatter,
        "list"     : OutputFormatter,
        "nftables" : NftablesOutputFormatter,
        "xt_geoip" : XTGeoIPOutputFormatter,
    }

    def parse_cli(self):
        """
        Builds the argument parser and parses the command line.

        Prints the usage and exits with code 2 if no command was given.
        Returns the parsed arguments, which carry the handler of the
        selected command as "func".
        """
        parser = argparse.ArgumentParser(
            description=_("Location Database Command Line Interface"),
        )
        subparsers = parser.add_subparsers()

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))

        # version
        parser.add_argument("--version", action="version",
            version="%(prog)s @VERSION@")

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # public key
        parser.add_argument("--public-key", "-k",
            default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
        )

        # lookup an IP address
        lookup = subparsers.add_parser("lookup",
            help=_("Lookup one or multiple IP addresses"),
        )
        lookup.add_argument("address", nargs="+")
        lookup.set_defaults(func=self.handle_lookup)

        # Get AS
        get_as = subparsers.add_parser("get-as",
            help=_("Get information about one or multiple Autonomous Systems"),
        )
        get_as.add_argument("asn", nargs="+")
        get_as.set_defaults(func=self.handle_get_as)

        # Search for AS
        search_as = subparsers.add_parser("search-as",
            help=_("Search for Autonomous Systems that match the string"),
        )
        search_as.add_argument("query", nargs=1)
        search_as.set_defaults(func=self.handle_search_as)

        # List all networks in an AS
        list_networks_by_as = subparsers.add_parser("list-networks-by-as",
            help=_("Lists all networks in an AS"),
        )
        list_networks_by_as.add_argument("asn", nargs=1, type=int)
        list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_as.add_argument("--output-format",
            choices=self.output_formats.keys(), default="list")
        list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)

        # List all networks in a country
        list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
            help=_("Lists all networks in a country"),
        )
        list_networks_by_cc.add_argument("country_code", nargs=1)
        list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_cc.add_argument("--output-format",
            choices=self.output_formats.keys(), default="list")
        list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)

        # List all networks with flags
        list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
            help=_("Lists all networks with flags"),
        )
        list_networks_by_flags.add_argument("--anonymous-proxy",
            action="store_true", help=_("Anonymous Proxies"),
        )
        list_networks_by_flags.add_argument("--satellite-provider",
            action="store_true", help=_("Satellite Providers"),
        )
        list_networks_by_flags.add_argument("--anycast",
            action="store_true", help=_("Anycasts"),
        )
        list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_flags.add_argument("--output-format",
            choices=self.output_formats.keys(), default="list")
        list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)

        args = parser.parse_args()

        # Print usage if no action was given
        if "func" not in args:
            parser.print_usage()
            sys.exit(2)

        return args

    def run(self):
        """
        Parses the command line, opens the database and runs the
        selected command.

        Always terminates the process: with 1 if the database file
        could not be found, otherwise with the return value of the
        command handler (0 if it returned nothing).
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Open database
        try:
            db = location.Database(args.database)
        except FileNotFoundError as e:
            sys.stderr.write("location-query: Could not open database %s: %s\n" \
                % (args.database, e))
            sys.exit(1)

        # Translate family (if present)
        if "family" in args:
            if args.family == "ipv6":
                args.family = socket.AF_INET6
            elif args.family == "ipv4":
                args.family = socket.AF_INET
            else:
                # No family was selected
                args.family = 0

        # Call function
        ret = args.func(db, args)

        # Return with exit code
        if ret:
            sys.exit(ret)

        # Otherwise just exit
        sys.exit(0)

    def handle_lookup(self, db, ns):
        """
        Looks up every given IP address and prints the network (and the
        AS if one could be found) it belongs to.

        Returns 1 if any address was invalid or could not be found,
        otherwise 0.
        """
        ret = 0

        for address in ns.address:
            try:
                n = db.lookup(address)
            except ValueError:
                print(_("Invalid IP address: %s") % address, file=sys.stderr)

                # There is no lookup result for this address, so skip it
                # instead of reporting an undefined or stale network
                ret = 1
                continue

            args = {
                "address" : address,
                "network" : n,
            }

            # Nothing found?
            if not n:
                print(_("Nothing found for %(address)s") % args, file=sys.stderr)
                ret = 1
                continue

            # Try to retrieve the AS if we have an AS number
            if n.asn:
                a = db.get_as(n.asn)

                # If we have found an AS we will print it in the message
                if a:
                    args.update({
                        "as" : a,
                    })

                    print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args)
                    continue

            print(_("%(address)s belongs to %(network)s") % args)

        return ret

    def handle_get_as(self, db, ns):
        """
        Gets information about Autonomous Systems

        Returns 1 if any AS number was invalid or could not be found,
        otherwise 0.
        """
        ret = 0

        for asn in ns.asn:
            try:
                asn = int(asn)
            except ValueError:
                print(_("Invalid ASN: %s") % asn, file=sys.stderr)
                ret = 1
                continue

            # Fetch AS from database
            a = db.get_as(asn)

            # Nothing found
            if not a:
                print(_("Could not find AS%s") % asn, file=sys.stderr)
                ret = 1
                continue

            print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })

        return ret

    def handle_search_as(self, db, ns):
        """
        Prints all Autonomous Systems that match the query
        """
        for query in ns.query:
            # Print all matches ASes
            for a in db.search_as(query):
                print(a)

    def __get_output_formatter(self, ns):
        """
        Returns an instance of the formatter selected by --output-format,
        falling back to the plain list formatter for unknown formats
        """
        try:
            cls = self.output_formats[ns.output_format]
        except KeyError:
            cls = OutputFormatter

        return cls(ns)

    def handle_list_networks_by_as(self, db, ns):
        """
        Writes all networks that belong to the given AS
        """
        with self.__get_output_formatter(ns) as f:
            for asn in ns.asn:
                # Print all matching networks
                for n in db.search_networks(asn=asn, family=ns.family):
                    f.network(n)

    def handle_list_networks_by_cc(self, db, ns):
        """
        Writes all networks that belong to the given country
        """
        with self.__get_output_formatter(ns) as f:
            for country_code in ns.country_code:
                # Print all matching networks
                for n in db.search_networks(country_code=country_code, family=ns.family):
                    f.network(n)

    def handle_list_networks_by_flags(self, db, ns):
        """
        Writes all networks that match the selected flags
        """
        # Combine all selected flags into one bitmask
        flags = 0

        if ns.anonymous_proxy:
            flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY

        if ns.satellite_provider:
            flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER

        if ns.anycast:
            flags |= location.NETWORK_FLAG_ANYCAST

        with self.__get_output_formatter(ns) as f:
            for n in db.search_networks(flags=flags, family=ns.family):
                f.network(n)
344
345
def main():
    """
    Runs the command line interface
    """
    c = CLI()
    c.run()


# Only run the CLI when executed as a script, not when imported
if __name__ == "__main__":
    main()