]> git.ipfire.org Git - location/libloc.git/blob - src/python/location-exporter.in
Add quiet mode
[location/libloc.git] / src / python / location-exporter.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2019 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import io
22 import ipaddress
23 import logging
24 import os.path
25 import socket
26 import sys
27
28 # Load our location module
29 import location
30 from location.i18n import _
31
# Set up the logger that is used throughout this script
log = logging.getLogger("location.exporter")

# Hand all records on to the handlers of the ancestor loggers
log.propagate = True
35
class OutputWriter(object):
    """
    Collects networks in memory and writes them out as a plain list
    with one network per line.

    Subclasses change the output format by overriding suffix, write(),
    _write_header() and _write_footer().
    """
    # Part of the output filename (see _make_filename())
    suffix = "networks"

    def __init__(self, family, country_code=None, asn=None):
        """
        family is compared against socket.AF_INET6 when the filename is
        built. country_code and asn determine what the output is named
        after; country_code takes precedence if both are given.
        """
        self.family = family
        self.country_code = country_code
        self.asn = asn

        # Everything passed to write() is buffered here until write_out()
        self.f = io.BytesIO()

    def write_out(self, directory):
        """
        Writes header, all buffered data and footer into a file
        in the given directory
        """
        # Make the output filename
        filename = os.path.join(directory, self._make_filename())

        with open(filename, "wb") as f:
            self._write_header(f)

            # Copy all data into the file
            f.write(self.f.getbuffer())

            self._write_footer(f)

    def _make_filename(self):
        """
        Returns "<country code or AS number>.<suffix><6 or 4>"
        """
        return "%s.%s%s" % (
            self.country_code or "AS%s" % self.asn,
            self.suffix,
            "6" if self.family == socket.AF_INET6 else "4",
        )

    @property
    def name(self):
        """
        The name of the set: "CC_<country code>" or "AS<number>".

        Returns None if neither a country code nor an ASN was given.
        """
        if self.country_code:
            return "CC_%s" % self.country_code

        # Compare against None so that AS0 is named "AS0" and not None
        if self.asn is not None:
            return "AS%s" % self.asn

        return None

    def _write_header(self, f):
        """
        The header of the file
        """
        pass

    def _write_footer(self, f):
        """
        The footer of the file
        """
        pass

    def write(self, network):
        """
        Appends the string representation of network as one line
        """
        s = "%s\n" % network

        self.f.write(s.encode("ascii"))
89
90
class IpsetOutputWriter(OutputWriter):
    """
    For ipset

    Writes one "create" command followed by one "add" command
    for every network.
    """
    suffix = "ipset"

    def _write_header(self, f):
        """
        Writes the command that creates the set
        """
        # The family of the set has to match the networks that are added
        # to it; a set created with "family inet" does not take IPv6 networks
        family = "inet6" if self.family == socket.AF_INET6 else "inet"

        # NOTE(review): the IPv4 and the IPv6 file create a set with the
        # same name - confirm that they are never loaded together
        h = "create %s hash:net family %s hashsize 1024 maxelem 65536\n" % (
            self.name, family,
        )

        f.write(h.encode("ascii"))

    def write(self, network):
        """
        Buffers the command that adds network to the set
        """
        s = "add %s %s\n" % (self.name, network)

        self.f.write(s.encode("ascii"))
106
107
class NftablesOutputWriter(OutputWriter):
    """
    For nftables

    Wraps the networks into a "define <name> = { ... }" statement
    with one comma-terminated network per line.
    """
    suffix = "set"

    def _write_header(self, f):
        """
        Opens the definition
        """
        f.write(("define %s = {\n" % self.name).encode("ascii"))

    def _write_footer(self, f):
        """
        Closes the definition
        """
        f.write(b"}")

    def write(self, network):
        """
        Buffers network as one element of the definition
        """
        element = " %s,\n" % network

        self.f.write(element.encode("ascii"))
126
127
class XTGeoIPOutputWriter(OutputWriter):
    """
    Writes every network as the packed binary form of its first and its
    last address, so that the output can be loaded by the xt_geoip
    kernel module from xtables-addons.
    """
    suffix = "iv"

    def write(self, network):
        """
        Buffers the network address followed by the broadcast address
        of network, both in packed binary form
        """
        parsed = ipaddress.ip_network("%s" % network)

        for boundary in (parsed.network_address, parsed.broadcast_address):
            if boundary.version == 6:
                af = socket.AF_INET6
            else:
                af = socket.AF_INET

            packed = socket.inet_pton(af, "%s" % boundary)

            self.f.write(packed)
145
146
class Exporter(object):
    """
    Walks through all networks of a database and hands them to one
    writer per requested country and per requested ASN.
    """
    def __init__(self, db, writer):
        """
        db has to provide search_networks(family=...).

        writer is a class (or factory) that is called as
        writer(family, country_code=...) or writer(family, asn=...) and
        returns an object with write(network) and write_out(directory).
        """
        self.db = db
        self.writer = writer

    def export(self, directory, families, countries, asns):
        """
        For every family, writes one file per country and one per ASN
        into directory. A network is written to the writer of its
        country_code and to the writer of its asn, if those were requested.
        """
        for family in families:
            log.debug("Exporting family %s", family)

            # Create writers for countries
            country_writers = {}
            for country_code in countries:
                country_writers[country_code] = self.writer(family, country_code=country_code)

            # Create writers for ASNs
            asn_writers = {}
            for asn in asns:
                asn_writers[asn] = self.writer(family, asn=asn)

            # Walk through all networks that match the family. The writers
            # are looked up in the dictionaries instead of searching the
            # countries/asns lists again for every single network.
            for network in self.db.search_networks(family=family):
                # Write matching countries
                writer = country_writers.get(network.country_code)
                if writer is not None:
                    writer.write(network)

                # Write matching ASNs
                writer = asn_writers.get(network.asn)
                if writer is not None:
                    writer.write(network)

            # Write everything to the filesystem
            for writer in country_writers.values():
                writer.write_out(directory)

            for writer in asn_writers.values():
                writer.write_out(directory)
182
183
class CLI(object):
    """
    The command line interface of the exporter: parses the arguments,
    opens the database and runs the export.
    """
    # Maps the values that --format accepts to the writer classes
    output_formats = {
        "ipset"    : IpsetOutputWriter,
        "list"     : OutputWriter,
        "nftables" : NftablesOutputWriter,
        "xt_geoip" : XTGeoIPOutputWriter,
    }

    def parse_cli(self):
        """
        Parses the command line, configures the log level
        and returns the parsed arguments
        """
        parser = argparse.ArgumentParser(
            description=_("Location Exporter Command Line Interface"),
        )

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))
        parser.add_argument("--quiet", action="store_true",
            help=_("Enable quiet mode"))

        # version
        parser.add_argument("--version", action="version",
            version="%(prog)s @VERSION@")

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # format
        parser.add_argument("--format", help=_("Output format"),
            default="list", choices=self.output_formats.keys())

        # directory
        parser.add_argument("--directory", help=_("Output directory"), required=True)

        # family
        parser.add_argument("--family", help=_("Specify address family"), choices=("ipv6", "ipv4"))

        # Countries and Autonomous Systems
        parser.add_argument("objects", nargs="+")

        args = parser.parse_args()

        # Configure logging (--debug wins over --quiet)
        if args.debug:
            location.logger.set_level(logging.DEBUG)
        elif args.quiet:
            location.logger.set_level(logging.WARNING)

        return args

    def run(self):
        """
        Parses the command line, runs the export and terminates the
        process with the code returned by handle_export() (0 on success)
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Call function
        ret = self.handle_export(args)

        # Exit with the returned code, or with 0 if nothing was returned
        sys.exit(ret or 0)

    def handle_export(self, ns):
        """
        Runs the export that is described by the parsed arguments in ns.

        Returns 2 if an argument is invalid, 1 if the database file was
        not found and None on success.
        """
        countries, asns = [], []

        # Translate family (both families are exported by default)
        if ns.family == "ipv6":
            families = [ socket.AF_INET6 ]
        elif ns.family == "ipv4":
            families = [ socket.AF_INET ]
        else:
            families = [ socket.AF_INET6, socket.AF_INET ]

        for arg in ns.objects:
            # Only "AS" followed by a number is an Autonomous System.
            # Anything else that starts with "AS" goes through the country
            # code check, because "AS" on its own is the ISO 3166 code of
            # American Samoa and must not be rejected as a broken ASN.
            number = arg[2:]

            if arg.startswith("AS") and number.isdecimal():
                asns.append(int(number))

            elif location.country_code_is_valid(arg) \
                    or arg in ("A1", "A2", "A3"):
                countries.append(arg)

            else:
                log.error("Invalid argument: %s", arg)
                return 2

        # Open the database
        try:
            db = location.Database(ns.database)
        except FileNotFoundError:
            log.error("Could not open database: %s", ns.database)
            return 1

        # Select the output format. The command line parser only accepts
        # known formats, but this does not rely on an assert which would
        # be stripped when running with -O.
        writer = self.output_formats.get(ns.format)
        if writer is None:
            log.error("Invalid output format: %s", ns.format)
            return 2

        exporter = Exporter(db, writer)
        exporter.export(ns.directory, countries=countries, asns=asns, families=families)
291
292
def main():
    """
    Runs the command line interface. Does not return, because
    CLI.run() terminates the process.
    """
    c = CLI()
    c.run()

# Only run when executed as a script, so that loading this file
# as a module does not start an export
if __name__ == "__main__":
    main()