]> git.ipfire.org Git - people/ms/libloc.git/blob - src/python/location-query.in
location-query: Allow filtering networks by family
[people/ms/libloc.git] / src / python / location-query.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import gettext
22 import ipaddress
23 import os
24 import socket
25 import sys
26 import syslog
27
28 # Load our location module
29 import location
30
31 # i18n
32 def _(singular, plural=None, n=None):
33 if plural:
34 return gettext.dngettext("libloc", singular, plural, n)
35
36 return gettext.dgettext("libloc", singular)
37
38 # Output formatters
39
40 class OutputFormatter(object):
41 def __init__(self, ns):
42 self.ns = ns
43
44 def __enter__(self):
45 # Open the output
46 self.open()
47
48 return self
49
50 def __exit__(self, type, value, tb):
51 if tb is None:
52 self.close()
53
54 @property
55 def name(self):
56 if "country_code" in self.ns:
57 return "networks_country_%s" % self.ns.country_code[0]
58
59 elif "asn" in self.ns:
60 return "networks_AS%s" % self.ns.asn[0]
61
62 def open(self):
63 pass
64
65 def close(self):
66 pass
67
68 def network(self, network):
69 print(network)
70
71
72 class IpsetOutputFormatter(OutputFormatter):
73 """
74 For nftables
75 """
76 def open(self):
77 print("create %s hash:net family inet hashsize 1024 maxelem 65536" % self.name)
78
79 def network(self, network):
80 print("add %s %s" % (self.name, network))
81
82
83 class NftablesOutputFormatter(OutputFormatter):
84 """
85 For nftables
86 """
87 def open(self):
88 print("define %s = {" % self.name)
89
90 def close(self):
91 print("}")
92
93 def network(self, network):
94 print(" %s," % network)
95
96
97 class XTGeoIPOutputFormatter(OutputFormatter):
98 """
99 Formats the output in that way, that it can be loaded by
100 the xt_geoip kernel module from xtables-addons.
101 """
102 def network(self, network):
103 n = ipaddress.ip_network("%s" % network)
104
105 for address in (n.network_address, n.broadcast_address):
106 bytes = socket.inet_pton(
107 socket.AF_INET6 if address.version == 6 else socket.AF_INET,
108 "%s" % address,
109 )
110
111 os.write(1, bytes)
112
113
114 class CLI(object):
115 output_formats = {
116 "ipset" : IpsetOutputFormatter,
117 "list" : OutputFormatter,
118 "nftables" : NftablesOutputFormatter,
119 "xt_geoip" : XTGeoIPOutputFormatter,
120 }
121
122 def parse_cli(self):
123 parser = argparse.ArgumentParser(
124 description=_("Location Database Command Line Interface"),
125 )
126 subparsers = parser.add_subparsers()
127
128 # Global configuration flags
129 parser.add_argument("--debug", action="store_true",
130 help=_("Enable debug output"))
131
132 # version
133 parser.add_argument("--version", action="version",
134 version="%%(prog)s %s" % location.__version__)
135
136 # database
137 parser.add_argument("--database", "-d",
138 default="@databasedir@/database.db", help=_("Path to database"),
139 )
140
141 # public key
142 parser.add_argument("--public-key", "-k",
143 default="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
144 )
145
146 # lookup an IP address
147 lookup = subparsers.add_parser("lookup",
148 help=_("Lookup one or multiple IP addresses"),
149 )
150 lookup.add_argument("address", nargs="+")
151 lookup.set_defaults(func=self.handle_lookup)
152
153 # Get AS
154 get_as = subparsers.add_parser("get-as",
155 help=_("Get information about one or multiple Autonomous Systems"),
156 )
157 get_as.add_argument("asn", nargs="+")
158 get_as.set_defaults(func=self.handle_get_as)
159
160 # Search for AS
161 search_as = subparsers.add_parser("search-as",
162 help=_("Search for Autonomous Systems that match the string"),
163 )
164 search_as.add_argument("query", nargs=1)
165 search_as.set_defaults(func=self.handle_search_as)
166
167 # List all networks in an AS
168 list_networks_by_as = subparsers.add_parser("list-networks-by-as",
169 help=_("Lists all networks in an AS"),
170 )
171 list_networks_by_as.add_argument("asn", nargs=1, type=int)
172 list_networks_by_as.add_argument("--family", choices=("ipv6", "ipv4"))
173 list_networks_by_as.add_argument("--output-format",
174 choices=self.output_formats.keys(), default="list")
175 list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
176
177 # List all networks in a country
178 list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
179 help=_("Lists all networks in a country"),
180 )
181 list_networks_by_cc.add_argument("country_code", nargs=1)
182 list_networks_by_cc.add_argument("--family", choices=("ipv6", "ipv4"))
183 list_networks_by_cc.add_argument("--output-format",
184 choices=self.output_formats.keys(), default="list")
185 list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)
186
187 # List all networks with flags
188 list_networks_by_flags = subparsers.add_parser("list-networks-by-flags",
189 help=_("Lists all networks with flags"),
190 )
191 list_networks_by_flags.add_argument("--anonymous-proxy",
192 action="store_true", help=_("Anonymous Proxies"),
193 )
194 list_networks_by_flags.add_argument("--satellite-provider",
195 action="store_true", help=_("Satellite Providers"),
196 )
197 list_networks_by_flags.add_argument("--anycast",
198 action="store_true", help=_("Anycasts"),
199 )
200 list_networks_by_flags.add_argument("--family", choices=("ipv6", "ipv4"))
201 list_networks_by_flags.add_argument("--output-format",
202 choices=self.output_formats.keys(), default="list")
203 list_networks_by_flags.set_defaults(func=self.handle_list_networks_by_flags)
204
205 args = parser.parse_args()
206
207 # Print usage if no action was given
208 if not "func" in args:
209 parser.print_usage()
210 sys.exit(2)
211
212 return args
213
214 def run(self):
215 # Parse command line arguments
216 args = self.parse_cli()
217
218 # Open database
219 try:
220 db = location.Database(args.database)
221 except FileNotFoundError as e:
222 sys.stderr.write("location-query: Could not open database %s: %s\n" \
223 % (args.database, e))
224 sys.exit(1)
225
226 # Verify the database
227 try:
228 with open(args.public_key, "r") as f:
229 if not db.verify(f):
230 sys.stderr.write("location-query: Could not verify the database\n")
231 sys.exit(1)
232
233 # Catch any errors when loading the public key
234 except (FileNotFoundError, OSError) as e:
235 sys.stderr.write("Could not read the public key: %s\n" % e)
236 sys.exit(1)
237
238 # Translate family
239 if args.family == "ipv6":
240 args.family = socket.AF_INET6
241 elif args.family == "ipv4":
242 args.family = socket.AF_INET
243 else:
244 args.family = 0
245
246 # Call function
247 ret = args.func(db, args)
248
249 # Return with exit code
250 if ret:
251 sys.exit(ret)
252
253 # Otherwise just exit
254 sys.exit(0)
255
256 def handle_lookup(self, db, ns):
257 ret = 0
258
259 for address in ns.address:
260 try:
261 n = db.lookup(address)
262 except ValueError:
263 print(_("Invalid IP address: %s") % address, file=sys.stderr)
264
265 args = {
266 "address" : address,
267 "network" : n,
268 }
269
270 # Nothing found?
271 if not n:
272 print(_("Nothing found for %(address)s") % args, file=sys.stderr)
273 ret = 1
274 continue
275
276 # Try to retrieve the AS if we have an AS number
277 if n.asn:
278 a = db.get_as(n.asn)
279
280 # If we have found an AS we will print it in the message
281 if a:
282 args.update({
283 "as" : a,
284 })
285
286 print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args)
287 continue
288
289 print(_("%(address)s belongs to %(network)s") % args)
290
291 return ret
292
293 def handle_get_as(self, db, ns):
294 """
295 Gets information about Autonomous Systems
296 """
297 ret = 0
298
299 for asn in ns.asn:
300 try:
301 asn = int(asn)
302 except ValueError:
303 print(_("Invalid ASN: %s") % asn, file=sys.stderr)
304 ret = 1
305 continue
306
307 # Fetch AS from database
308 a = db.get_as(asn)
309
310 # Nothing found
311 if not a:
312 print(_("Could not find AS%s") % asn, file=sys.stderr)
313 ret = 1
314 continue
315
316 print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
317
318 return ret
319
320 def handle_search_as(self, db, ns):
321 for query in ns.query:
322 # Print all matches ASes
323 for a in db.search_as(query):
324 print(a)
325
326 def __get_output_formatter(self, ns):
327 try:
328 cls = self.output_formats[ns.output_format]
329 except KeyError:
330 cls = OutputFormatter
331
332 return cls(ns)
333
334 def handle_list_networks_by_as(self, db, ns):
335 with self.__get_output_formatter(ns) as f:
336 for asn in ns.asn:
337 # Print all matching networks
338 for n in db.search_networks(asn=asn, family=ns.family):
339 f.network(n)
340
341 def handle_list_networks_by_cc(self, db, ns):
342 with self.__get_output_formatter(ns) as f:
343 for country_code in ns.country_code:
344 # Print all matching networks
345 for n in db.search_networks(country_code=country_code, family=ns.family):
346 f.network(n)
347
348 def handle_list_networks_by_flags(self, db, ns):
349 flags = 0
350
351 if ns.anonymous_proxy:
352 flags |= location.NETWORK_FLAG_ANONYMOUS_PROXY
353
354 if ns.satellite_provider:
355 flags |= location.NETWORK_FLAG_SATELLITE_PROVIDER
356
357 if ns.anycast:
358 flags |= location.NETWORK_FLAG_ANYCAST
359
360 with self.__get_output_formatter(ns) as f:
361 for n in db.search_networks(flags=flags, family=ns.family):
362 f.network(n)
363
364
365 def main():
366 # Run the command line interface
367 c = CLI()
368 c.run()
369
370 main()