]> git.ipfire.org Git - people/ms/libloc.git/blob - src/python/location-query.in
260425546aef5752b098804340f4a6066a6f75b8
[people/ms/libloc.git] / src / python / location-query.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import gettext
22 import ipaddress
23 import os
24 import socket
25 import sys
26
27 # Load our location module
28 import location
29
30 # i18n
def _(singular, plural=None, n=None):
    """
    Translates a message using the "libloc" gettext domain.

    If a plural form is passed, the translation is selected depending
    on the count n. Otherwise the singular message is looked up.
    """
    # Without a plural form this is a plain message lookup
    if not plural:
        return gettext.dgettext("libloc", singular)

    return gettext.dngettext("libloc", singular, plural, n)
36
37 # Output formatters
38
class OutputFormatter(object):
    """
    Base class of all output formats which prints one network per line.

    Instances are used as context managers: open() is called when the
    context is entered and close() when it is left without an exception.
    Subclasses override open(), close() and network() to emit their own
    syntax.
    """
    def __init__(self, ns):
        # The parsed command line arguments
        self.ns = ns

    def __enter__(self):
        # Open the output
        self.open()

        return self

    def __exit__(self, type, value, tb):
        # Only finish the output if no exception has been raised
        if tb is None:
            self.close()

    @property
    def name(self):
        """
        The name that is used for the generated set of networks
        """
        if "country_code" in self.ns:
            return "networks_country_%s" % self.ns.country_code[0]

        elif "asn" in self.ns:
            return "networks_AS%s" % self.ns.asn[0]

        # Neither a country nor an AS has been selected (e.g. when
        # listing by flags), so fall back to a generic name instead
        # of implicitly returning None
        return "networks"

    def open(self):
        """
        Called before the first network is written
        """
        pass

    def close(self):
        """
        Called after the last network has been written
        """
        pass

    def network(self, network):
        """
        Writes a single network
        """
        print(network)
69
70
class IpsetOutputFormatter(OutputFormatter):
    """
    For ipset
    """
    def open(self):
        # A set can only hold addresses of one family, so create an
        # IPv6 set when only IPv6 networks have been requested
        if getattr(self.ns, "family", None) == socket.AF_INET6:
            family = "inet6"
        else:
            family = "inet"

        print("create %s hash:net family %s hashsize 1024 maxelem 65536" \
            % (self.name, family))

    def network(self, network):
        print("add %s %s" % (self.name, network))
80
81
class NftablesOutputFormatter(OutputFormatter):
    """
    Writes all networks as the elements of an nftables "define" statement
    """
    def open(self):
        # Start the definition
        print("define {} = {{".format(self.name))

    def close(self):
        # Terminate the definition
        print("}")

    def network(self, network):
        # One element per line
        print(" {},".format(network))
94
95
class XTGeoIPOutputFormatter(OutputFormatter):
    """
    Formats the output in a way that it can be loaded by
    the xt_geoip kernel module from xtables-addons.

    For every network, the first and the last address are written
    in their packed binary form to file descriptor 1.
    """
    def network(self, network):
        net = ipaddress.ip_network(str(network))

        for boundary in (net.network_address, net.broadcast_address):
            # Select the address family that matches the address
            if boundary.version == 6:
                family = socket.AF_INET6
            else:
                family = socket.AF_INET

            packed = socket.inet_pton(family, str(boundary))

            os.write(1, packed)
111
112
class CLI(object):
    """
    The command line interface of location-query.

    parse_cli() maps every subcommand to one of the handle_*() methods
    and run() opens the database, calls the selected handler and exits
    with the code that the handler has returned.
    """

    # Maps the values of --output-format to their formatter classes
    output_formats = {
        "ipset"    : IpsetOutputFormatter,
        "list"     : OutputFormatter,
        "nftables" : NftablesOutputFormatter,
        "xt_geoip" : XTGeoIPOutputFormatter,
    }

    def parse_cli(self):
        """
        Parses the command line and returns the resulting namespace.

        Prints the usage and exits with code 2 if no subcommand was given.
        """
        parser = argparse.ArgumentParser(
            description=_("Location Database Command Line Interface"),
        )
        subparsers = parser.add_subparsers()

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))

        # version
        parser.add_argument("--version", action="version",
            version="%(prog)s @VERSION@")

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # public key
        parser.add_argument("--public-key", "-k",
            default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
        )

        # lookup an IP address
        lookup = subparsers.add_parser("lookup",
            help=_("Lookup one or multiple IP addresses"),
        )
        lookup.add_argument("address", nargs="+")
        lookup.set_defaults(func=self.handle_lookup)

        # Get AS
        get_as = subparsers.add_parser("get-as",
            help=_("Get information about one or multiple Autonomous Systems"),
        )
        get_as.add_argument("asn", nargs="+")
        get_as.set_defaults(func=self.handle_get_as)

        # Search for AS
        search_as = subparsers.add_parser("search-as",
            help=_("Search for Autonomous Systems that match the string"),
        )
        search_as.add_argument("query", nargs=1)
        search_as.set_defaults(func=self.handle_search_as)

        # List all networks in an AS
        list_networks_by_as = subparsers.add_parser("list-networks-by-as",
            help=_("Lists all networks in an AS"),
        )
        list_networks_by_as.add_argument("asn", nargs=1, type=int)
        list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_as.add_argument("--output-format",
            choices=self.output_formats.keys(), default="list")
        list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)

        # List all networks in a country
        list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
            help=_("Lists all networks in a country"),
        )
        list_networks_by_cc.add_argument("country_code", nargs=1)
        list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_cc.add_argument("--output-format",
            choices=self.output_formats.keys(), default="list")
        list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)

        # List all networks with flags
        list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
            help=_("Lists all networks with flags"),
        )
        list_networks_by_flags.add_argument("--anonymous-proxy",
            action="store_true", help=_("Anonymous Proxies"),
        )
        list_networks_by_flags.add_argument("--satellite-provider",
            action="store_true", help=_("Satellite Providers"),
        )
        list_networks_by_flags.add_argument("--anycast",
            action="store_true", help=_("Anycasts"),
        )
        list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
        list_networks_by_flags.add_argument("--output-format",
            choices=self.output_formats.keys(), default="list")
        list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)

        args = parser.parse_args()

        # Print usage if no action was given
        if "func" not in args:
            parser.print_usage()
            sys.exit(2)

        return args

    def run(self):
        """
        Runs the selected subcommand and terminates the process.

        Exits with code 1 if the database file could not be found, with
        the return value of the handler if that is true-ish, and with
        code 0 otherwise.
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Open database
        try:
            db = location.Database(args.database)
        except FileNotFoundError as e:
            sys.stderr.write("location-query: Could not open database %s: %s\n"
                % (args.database, e))
            sys.exit(1)

        # Translate family (if present)
        if "family" in args:
            if args.family == "ipv6":
                args.family = socket.AF_INET6
            elif args.family == "ipv4":
                args.family = socket.AF_INET
            else:
                args.family = 0

        # Call function
        ret = args.func(db, args)

        # Return with exit code
        if ret:
            sys.exit(ret)

        # Otherwise just exit
        sys.exit(0)

    def handle_lookup(self, db, ns):
        """
        Looks up every address in ns.address and prints the network
        (and the AS if it could be found) that it belongs to.

        Returns 1 if at least one address was invalid or could not
        be found and 0 otherwise.
        """
        ret = 0

        for address in ns.address:
            try:
                n = db.lookup(address)
            except ValueError:
                print(_("Invalid IP address: %s") % address, file=sys.stderr)

                # There is no result for this address, so skip it instead
                # of continuing with an unset or stale network
                ret = 1
                continue

            args = {
                "address" : address,
                "network" : n,
            }

            # Nothing found?
            if not n:
                print(_("Nothing found for %(address)s") % args, file=sys.stderr)
                ret = 1
                continue

            # Try to retrieve the AS if we have an AS number
            if n.asn:
                a = db.get_as(n.asn)

                # If we have found an AS we will print it in the message
                if a:
                    args.update({
                        "as" : a,
                    })

                    print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args)
                    continue

            print(_("%(address)s belongs to %(network)s") % args)

        return ret

    def handle_get_as(self, db, ns):
        """
        Gets information about Autonomous Systems

        Returns 1 if at least one AS number was invalid or could not
        be found and 0 otherwise.
        """
        ret = 0

        for asn in ns.asn:
            try:
                asn = int(asn)
            except ValueError:
                print(_("Invalid ASN: %s") % asn, file=sys.stderr)
                ret = 1
                continue

            # Fetch AS from database
            a = db.get_as(asn)

            # Nothing found
            if not a:
                print(_("Could not find AS%s") % asn, file=sys.stderr)
                ret = 1
                continue

            print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })

        return ret

    def handle_search_as(self, db, ns):
        """
        Prints all Autonomous Systems that match the search query
        """
        for query in ns.query:
            # Print all matches ASes
            for a in db.search_as(query):
                print(a)

    def __get_output_formatter(self, ns):
        """
        Returns an instance of the formatter that has been selected
        with --output-format (falling back to the plain list)
        """
        try:
            cls = self.output_formats[ns.output_format]
        except KeyError:
            cls = OutputFormatter

        return cls(ns)

    def handle_list_networks_by_as(self, db, ns):
        """
        Writes all networks that belong to the given AS
        """
        with self.__get_output_formatter(ns) as f:
            for asn in ns.asn:
                # Print all matching networks
                for n in db.search_networks(asn=asn, family=ns.family):
                    f.network(n)

    def handle_list_networks_by_cc(self, db, ns):
        """
        Writes all networks that belong to the given country
        """
        with self.__get_output_formatter(ns) as f:
            for country_code in ns.country_code:
                # Print all matching networks
                for n in db.search_networks(country_code=country_code, family=ns.family):
                    f.network(n)

    def handle_list_networks_by_flags(self, db, ns):
        """
        Writes all networks that match the selected flags
        """
        flags = 0

        # Combine all selected flags into one bitmask
        if ns.anonymous_proxy:
            flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY

        if ns.satellite_provider:
            flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER

        if ns.anycast:
            flags |= location.NETWORK_FLAG_ANYCAST

        with self.__get_output_formatter(ns) as f:
            for n in db.search_networks(flags=flags, family=ns.family):
                f.network(n)
351
352
def main():
    """
    Entry point of the script.

    Does not return because CLI.run() always terminates the
    process by calling sys.exit().
    """
    # Run the command line interface
    c = CLI()
    c.run()

# Only run the command line interface if this file is executed as a script
if __name__ == "__main__":
    main()