]> git.ipfire.org Git - location/libloc.git/blob - src/python/location-query.in
location-query: Support listing networks for xt_geoip
[location/libloc.git] / src / python / location-query.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import gettext
22 import ipaddress
23 import os
24 import socket
25 import sys
26 import syslog
27
28 # Load our location module
29 import location
30
31 # i18n
def _(singular, plural=None, n=None):
    """
        Translates a message using the "libloc" gettext domain.

        If a plural form is passed, the translation matching the
        count n is returned. Otherwise only the singular message
        is translated.
    """
    # Without a plural form there is nothing to choose from
    if not plural:
        return gettext.dgettext("libloc", singular)

    return gettext.dngettext("libloc", singular, plural, n)
37
38 # Output formatters
39
class OutputFormatter(object):
    """
        Default output formatter which prints one network per line.

        Instances are context managers: open() is called when the
        context is entered and close() is called when it is left,
        unless an exception is propagating.
    """
    def __enter__(self):
        # Open the output before handing out the formatter
        self.open()

        return self

    def __exit__(self, type, value, tb):
        # Do not close the output if the context was left
        # because of an exception
        if tb is not None:
            return

        self.close()

    def open(self):
        """
            Hook that is called when the context is entered.

            Does nothing in this class.
        """

    def close(self):
        """
            Hook that is called when the context is left
            without an exception.

            Does nothing in this class.
        """

    def network(self, network):
        """
            Prints the given network followed by a newline
        """
        print(network)
59
60
class XTGeoIPOutputFormatter(OutputFormatter):
    """
        Formats the output in such a way that it can be loaded by
        the xt_geoip kernel module from xtables-addons.
    """
    def network(self, network):
        """
            Writes the first and the last address of the given network
            in packed binary form to file descriptor 1 (standard output).

            The network is converted to a string and parsed by
            ipaddress.ip_network(), which raises ValueError if that
            string is not a valid network.
        """
        n = ipaddress.ip_network("%s" % network)

        # The packed attribute holds the binary representation of the
        # address (4 bytes for IPv4, 16 bytes for IPv6), which is what
        # inet_pton() returns for the textual form. Both addresses are
        # written at once so that only one system call is needed.
        os.write(1, n.network_address.packed + n.broadcast_address.packed)
76
77
class CLI(object):
    """
        Command line interface to query the location database
    """
    # Maps the values accepted by --output-format to formatter classes
    output_formats = {
        "list" : OutputFormatter,
        "xt_geoip" : XTGeoIPOutputFormatter,
    }

    def parse_cli(self):
        """
            Builds the argument parser and parses the command line.

            Prints the usage and exits with code 2 if no subcommand
            was given. Otherwise the parsed namespace is returned, in
            which "func" is the handler of the selected subcommand.
        """
        parser = argparse.ArgumentParser(
            description=_("Location Database Command Line Interface"),
        )
        subparsers = parser.add_subparsers()

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))

        # version
        parser.add_argument("--version", action="version",
            version="%%(prog)s %s" % location.__version__)

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # lookup an IP address
        lookup = subparsers.add_parser("lookup",
            help=_("Lookup one or multiple IP addresses"),
        )
        lookup.add_argument("address", nargs="+")
        lookup.set_defaults(func=self.handle_lookup)

        # Get AS
        get_as = subparsers.add_parser("get-as",
            help=_("Get information about one or multiple Autonomous Systems"),
        )
        get_as.add_argument("asn", nargs="+")
        get_as.set_defaults(func=self.handle_get_as)

        # Search for AS
        search_as = subparsers.add_parser("search-as",
            help=_("Search for Autonomous Systems that match the string"),
        )
        search_as.add_argument("query", nargs=1)
        search_as.set_defaults(func=self.handle_search_as)

        # List all networks in an AS
        list_networks_by_as = subparsers.add_parser("list-networks-by-as",
            help=_("Lists all networks in an AS"),
        )
        list_networks_by_as.add_argument("asn", nargs=1, type=int)
        list_networks_by_as.add_argument("--output-format",
            choices=self.output_formats.keys(), default="list")
        list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)

        # List all networks in a country
        list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
            help=_("Lists all networks in a country"),
        )
        list_networks_by_cc.add_argument("country_code", nargs=1)
        list_networks_by_cc.add_argument("--output-format",
            choices=self.output_formats.keys(), default="list")
        list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)

        args = parser.parse_args()

        # Print usage if no action was given
        if "func" not in args:
            parser.print_usage()
            sys.exit(2)

        return args

    def run(self):
        """
            Parses the command line, opens the database and calls
            the handler of the selected subcommand.

            This method always terminates the process: with code 1 if
            the database file could not be found, otherwise with the
            return value of the handler (or 0 if that is empty).
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Open database
        try:
            db = location.Database(args.database)
        except FileNotFoundError as e:
            sys.stderr.write("location-query: Could not open database %s: %s\n" \
                % (args.database, e))
            sys.exit(1)

        # Call function
        ret = args.func(db, args)

        # Return with the exit code of the handler, or zero if there is none
        sys.exit(ret or 0)

    def handle_lookup(self, db, ns):
        """
            Looks up all IP addresses in ns.address and prints the
            network (and the AS if there is one) they belong to.

            Returns 1 if any address was invalid or could not be
            found, otherwise 0.
        """
        ret = 0

        for address in ns.address:
            try:
                n = db.lookup(address)
            except ValueError:
                print(_("Invalid IP address: %s") % address, file=sys.stderr)

                # There is no lookup result for this address, so it must
                # be skipped. Otherwise n would be unset or still hold the
                # result of the previous address.
                ret = 1
                continue

            args = {
                "address" : address,
                "network" : n,
            }

            # Nothing found?
            if not n:
                print(_("Nothing found for %(address)s") % args, file=sys.stderr)
                ret = 1
                continue

            # Try to retrieve the AS if we have an AS number
            if n.asn:
                a = db.get_as(n.asn)

                # If we have found an AS we will print it in the message
                if a:
                    args.update({
                        "as" : a,
                    })

                    print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args)
                    continue

            print(_("%(address)s belongs to %(network)s") % args)

        return ret

    def handle_get_as(self, db, ns):
        """
            Gets information about Autonomous Systems

            Returns 1 if any of the AS numbers in ns.asn was not an
            integer or could not be found, otherwise 0.
        """
        ret = 0

        for asn in ns.asn:
            try:
                asn = int(asn)
            except ValueError:
                print(_("Invalid ASN: %s") % asn, file=sys.stderr)
                ret = 1
                continue

            # Fetch AS from database
            a = db.get_as(asn)

            # Nothing found
            if not a:
                print(_("Could not find AS%s") % asn, file=sys.stderr)
                ret = 1
                continue

            print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })

        return ret

    def handle_search_as(self, db, ns):
        """
            Prints all Autonomous Systems which the database returns
            for the search strings in ns.query
        """
        for query in ns.query:
            # Print all matching ASes
            for a in db.search_as(query):
                print(a)

    def __get_output_formatter(self, ns):
        """
            Returns a new instance of the formatter that was selected
            with --output-format.

            Falls back to OutputFormatter for unknown formats.
        """
        cls = self.output_formats.get(ns.output_format, OutputFormatter)

        return cls()

    def handle_list_networks_by_as(self, db, ns):
        """
            Passes all networks which the database returns for the
            AS numbers in ns.asn to the selected output formatter
        """
        with self.__get_output_formatter(ns) as f:
            for asn in ns.asn:
                # Print all matching networks
                for n in db.search_networks(asn=asn):
                    f.network(n)

    def handle_list_networks_by_cc(self, db, ns):
        """
            Passes all networks which the database returns for the
            country codes in ns.country_code to the selected output
            formatter
        """
        with self.__get_output_formatter(ns) as f:
            for country_code in ns.country_code:
                # Print all matching networks
                for n in db.search_networks(country_code=country_code):
                    f.network(n)
264
265
def main():
    """
        Creates the command line interface and runs it.
    """
    cli = CLI()
    cli.run()


# Only run the command line interface if this file is executed as a
# script, so that loading it as a module has no side effects
if __name__ == "__main__":
    main()