]> git.ipfire.org Git - location/libloc.git/blob - src/python/location-exporter.in
54545619f39a3a8dddbe0981e0f85f721223030f
[location/libloc.git] / src / python / location-exporter.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2019 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import io
22 import ipaddress
23 import logging
24 import os.path
25 import re
26 import socket
27 import sys
28
29 # Load our location module
30 import location
31 from location.i18n import _
32
33 # Initialise logging
34 log = logging.getLogger("location.exporter")
35 log.propagate = 1
36
37 class OutputWriter(object):
38 suffix = "networks"
39
40 def __init__(self, family, country_code=None, asn=None):
41 self.family, self.country_code, self.asn = family, country_code, asn
42
43 self.f = io.BytesIO()
44
45 def write_out(self, directory):
46 # Make the output filename
47 filename = os.path.join(
48 directory, self._make_filename(),
49 )
50
51 with open(filename, "wb") as f:
52 self._write_header(f)
53
54 # Copy all data into the file
55 f.write(self.f.getbuffer())
56
57 self._write_footer(f)
58
59 def _make_filename(self):
60 return "%s.%s%s" % (
61 self.country_code or "AS%s" % self.asn,
62 self.suffix,
63 "6" if self.family == socket.AF_INET6 else "4"
64 )
65
66 @property
67 def name(self):
68 if self.country_code:
69 return "CC_%s" % self.country_code
70
71 if self.asn:
72 return "AS%s" % self.asn
73
74 def _write_header(self, f):
75 """
76 The header of the file
77 """
78 pass
79
80 def _write_footer(self, f):
81 """
82 The footer of the file
83 """
84 pass
85
86 def write(self, network):
87 s = "%s\n" % network
88
89 self.f.write(s.encode("ascii"))
90
91
92 class IpsetOutputWriter(OutputWriter):
93 """
94 For ipset
95 """
96 suffix = "ipset"
97
98 def _write_header(self, f):
99 h = "create %s hash:net family inet hashsize 1024 maxelem 65536\n" % self.name
100
101 f.write(h.encode("ascii"))
102
103 def write(self, network):
104 s = "add %s %s\n" % (self.name, network)
105
106 self.f.write(s.encode("ascii"))
107
108
109 class NftablesOutputWriter(OutputWriter):
110 """
111 For nftables
112 """
113 suffix = "set"
114
115 def _write_header(self, f):
116 h = "define %s = {\n" % self.name
117
118 f.write(h.encode("ascii"))
119
120 def _write_footer(self, f):
121 f.write(b"}")
122
123 def write(self, network):
124 s = " %s,\n" % network
125
126 self.f.write(s.encode("ascii"))
127
128
129 class XTGeoIPOutputWriter(OutputWriter):
130 """
131 Formats the output in that way, that it can be loaded by
132 the xt_geoip kernel module from xtables-addons.
133 """
134 suffix = "iv"
135
136 def write(self, network):
137 n = ipaddress.ip_network("%s" % network)
138
139 for address in (n.network_address, n.broadcast_address):
140 bytes = socket.inet_pton(
141 socket.AF_INET6 if address.version == 6 else socket.AF_INET,
142 "%s" % address,
143 )
144
145 self.f.write(bytes)
146
147
148 class Exporter(object):
149 def __init__(self, db, writer):
150 self.db = db
151 self.writer = writer
152
153 def export(self, directory, families, countries, asns):
154 for family in families:
155 log.debug("Exporting family %s" % family)
156
157 writers = {}
158
159 # Create writers for countries
160 for country_code in countries:
161 writers[country_code] = self.writer(family, country_code=country_code)
162
163 # Create writers for ASNs
164 for asn in asns:
165 writers[asn] = self.writer(family, asn=asn)
166
167 # Get all networks that match the family
168 networks = self.db.search_networks(family=family)
169
170 # Walk through all networks
171 for network in networks:
172 # Write matching countries
173 if network.country_code in countries:
174 writers[network.country_code].write(network)
175
176 # Write matching ASNs
177 if network.asn in asns:
178 writers[network.asn].write(network)
179
180 # Write everything to the filesystem
181 for writer in writers.values():
182 writer.write_out(directory)
183
184
185 class CLI(object):
186 output_formats = {
187 "ipset" : IpsetOutputWriter,
188 "list" : OutputWriter,
189 "nftables" : NftablesOutputWriter,
190 "xt_geoip" : XTGeoIPOutputWriter,
191 }
192
193 def parse_cli(self):
194 parser = argparse.ArgumentParser(
195 description=_("Location Exporter Command Line Interface"),
196 )
197
198 # Global configuration flags
199 parser.add_argument("--debug", action="store_true",
200 help=_("Enable debug output"))
201 parser.add_argument("--quiet", action="store_true",
202 help=_("Enable quiet mode"))
203
204 # version
205 parser.add_argument("--version", action="version",
206 version="%(prog)s @VERSION@")
207
208 # database
209 parser.add_argument("--database", "-d",
210 default="@databasedir@/database.db", help=_("Path to database"),
211 )
212
213 # format
214 parser.add_argument("--format", help=_("Output format"),
215 default="list", choices=self.output_formats.keys())
216
217 # directory
218 parser.add_argument("--directory", help=_("Output directory"), required=True)
219
220 # family
221 parser.add_argument("--family", help=_("Specify address family"), choices=("ipv6", "ipv4"))
222
223 # Countries and Autonomous Systems
224 parser.add_argument("objects", nargs="+")
225
226 args = parser.parse_args()
227
228 # Configure logging
229 if args.debug:
230 location.logger.set_level(logging.DEBUG)
231 elif args.quiet:
232 location.logger.set_level(logging.WARNING)
233
234 return args
235
236 def run(self):
237 # Parse command line arguments
238 args = self.parse_cli()
239
240 # Call function
241 ret = self.handle_export(args)
242
243 # Return with exit code
244 if ret:
245 sys.exit(ret)
246
247 # Otherwise just exit
248 sys.exit(0)
249
250 def handle_export(self, ns):
251 countries, asns = [], []
252
253 # Translate family
254 if ns.family == "ipv6":
255 families = [ socket.AF_INET6 ]
256 elif ns.family == "ipv4":
257 families = [ socket.AF_INET ]
258 else:
259 families = [ socket.AF_INET6, socket.AF_INET ]
260
261 for object in ns.objects:
262 m = re.match("^AS(\d+)$", object)
263 if m:
264 object = int(m.group(1))
265
266 asns.append(object)
267
268 elif location.country_code_is_valid(object) \
269 or object in ("A1", "A2", "A3"):
270 countries.append(object)
271
272 else:
273 log.error("Invalid argument: %s" % object)
274 return 2
275
276 # Open the database
277 try:
278 db = location.Database(ns.database)
279 except FileNotFoundError as e:
280 log.error("Count not open database: %s" % ns.database)
281 return 1
282
283 # Select the output format
284 writer = self.output_formats.get(ns.format)
285 assert writer
286
287 e = Exporter(db, writer)
288 e.export(ns.directory, countries=countries, asns=asns, families=families)
289
290
291 def main():
292 # Run the command line interface
293 c = CLI()
294 c.run()
295
296 main()