]> git.ipfire.org Git - location/libloc.git/blob - src/python/location-exporter.in
3e53d4183c5f8e970972fb521e073db589e9e921
[location/libloc.git] / src / python / location-exporter.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2019 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import io
22 import ipaddress
23 import logging
24 import os.path
25 import socket
26 import sys
27
28 # Load our location module
29 import location
30 from location.i18n import _
31
32 # Initialise logging
# Logger shared by everything in this script. With propagation switched on,
# records are also handed to the handlers of the ancestor loggers.
log = logging.getLogger("location.exporter")
log.propagate = 1
35
class OutputWriter(object):
    """
    Collects the networks of one country or one Autonomous System
    in memory and finally stores them in a file, one network per line.

    Subclasses change the format by overriding suffix, write(),
    _write_header() and _write_footer().
    """
    suffix = "networks"

    def __init__(self, family, country_code=None, asn=None):
        """
        family is the address family (socket.AF_INET or socket.AF_INET6),
        country_code and asn identify what is being exported.
        """
        self.family = family
        self.country_code = country_code
        self.asn = asn

        # Everything passed to write() is buffered here until write_out()
        self.f = io.BytesIO()

    def write_out(self, directory):
        """
        Stores header, all buffered data and footer in a file in directory
        """
        path = os.path.join(directory, self._make_filename())

        with open(path, "wb") as out:
            self._write_header(out)

            # Copy all data into the file
            out.write(self.f.getbuffer())

            self._write_footer(out)

    def _make_filename(self):
        """
        Returns the name of the output file:
        <country code or AS number>.<suffix><4 or 6>
        """
        if self.country_code:
            stem = self.country_code
        else:
            stem = "AS%s" % self.asn

        if self.family == socket.AF_INET6:
            version = "6"
        else:
            version = "4"

        return "%s.%s%s" % (stem, self.suffix, version)

    @property
    def name(self):
        """
        The name of the exported object ("CC_<country code>" or
        "AS<number>"), or None if neither of them has been set
        """
        if self.country_code:
            return "CC_%s" % self.country_code

        if self.asn:
            return "AS%s" % self.asn

        return None

    def _write_header(self, f):
        """
        Hook which is called before the data is written to f.
        Does nothing in this class.
        """

    def _write_footer(self, f):
        """
        Hook which is called after the data has been written to f.
        Does nothing in this class.
        """

    def write(self, network):
        """
        Appends network to the buffer, followed by a newline
        """
        line = "%s\n" % network

        self.f.write(line.encode("ascii"))
89
90
class IpsetOutputWriter(OutputWriter):
    """
    For ipset

    The file starts with a "create" command for a hash:net set which is
    followed by one "add" command for every network.
    """
    suffix = "ipset"

    def _write_header(self, f):
        """
        Writes the command which creates the set to f
        """
        # A set only holds addresses of one family and has to be created
        # with "family inet6" to be able to store IPv6 networks
        family = "inet6" if self.family == socket.AF_INET6 else "inet"

        h = "create %s hash:net family %s hashsize 1024 maxelem 65536\n" \
            % (self.name, family)

        f.write(h.encode("ascii"))

    def write(self, network):
        """
        Buffers the command which adds network to the set
        """
        s = "add %s %s\n" % (self.name, network)

        self.f.write(s.encode("ascii"))
106
107
class NftablesOutputWriter(OutputWriter):
    """
    For nftables

    The networks are written as one definition:

        define <name> = {
            <network>,
            ...
        }
    """
    suffix = "set"

    def _write_header(self, f):
        """
        Opens the definition
        """
        h = "define %s = {\n" % self.name

        f.write(h.encode("ascii"))

    def _write_footer(self, f):
        """
        Closes the definition
        """
        # Terminate the last line so that the file does not end in the
        # middle of a statement
        f.write(b"}\n")

    def write(self, network):
        """
        Buffers network as one element of the definition
        """
        s = " %s,\n" % network

        self.f.write(s.encode("ascii"))
126
127
class XTGeoIPOutputWriter(OutputWriter):
    """
    Formats the output in that way, that it can be loaded by
    the xt_geoip kernel module from xtables-addons.
    """
    suffix = "iv"

    def write(self, network):
        """
        Buffers the first and the last address of network in their
        binary representation (4 bytes each for IPv4, 16 bytes for IPv6)
        """
        n = ipaddress.ip_network("%s" % network)

        for address in (n.network_address, n.broadcast_address):
            # packed is the address in network byte order, i.e. exactly
            # what socket.inet_pton() returns for the same address
            self.f.write(address.packed)
145
146
class Exporter(object):
    """
    Reads networks from a database and distributes them to one
    writer per requested country and Autonomous System.
    """

    def __init__(self, db, writer):
        """
        db is the database which is searched for networks.

        writer is the class that is instantiated for every exported
        object. It is called as writer(family, country_code=...) or
        writer(family, asn=...).
        """
        self.db = db
        self.writer = writer

    def export(self, directory, families, countries, asns):
        """
        Exports all networks that belong to any of countries or asns
        into directory. This is done once for every address family
        in families.
        """
        for family in families:
            log.debug("Exporting family %s", family)

            # Create writers for countries
            country_writers = {
                country_code: self.writer(family, country_code=country_code)
                for country_code in countries
            }

            # Create writers for ASNs
            asn_writers = {
                asn: self.writer(family, asn=asn) for asn in asns
            }

            # Walk through all networks that match the family
            for network in self.db.search_networks(family=family):
                # Looking up the writer directly avoids scanning the
                # lists of requested objects for every single network

                # Write matching countries
                writer = country_writers.get(network.country_code)
                if writer is not None:
                    writer.write(network)

                # Write matching ASNs
                writer = asn_writers.get(network.asn)
                if writer is not None:
                    writer.write(network)

            # Write everything to the filesystem
            for writers in (country_writers, asn_writers):
                for writer in writers.values():
                    writer.write_out(directory)
182
183
class CLI(object):
    """
    The command line interface of the exporter
    """

    # Maps the values of --format to the class that writes this format
    output_formats = {
        "ipset" : IpsetOutputWriter,
        "list" : OutputWriter,
        "nftables" : NftablesOutputWriter,
        "xt_geoip" : XTGeoIPOutputWriter,
    }

    def parse_cli(self):
        """
        Parses the command line and returns the resulting namespace.

        Debug logging is enabled here if --debug has been passed.
        """
        parser = argparse.ArgumentParser(
            description=_("Location Exporter Command Line Interface"),
        )

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))

        # version
        parser.add_argument("--version", action="version",
            version="%(prog)s @VERSION@")

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # format
        parser.add_argument("--format", help=_("Output format"),
            default="list", choices=self.output_formats.keys())

        # directory
        parser.add_argument("--directory", help=_("Output directory"), required=True)

        # family
        parser.add_argument("--family", help=_("Specify address family"), choices=("ipv6", "ipv4"))

        # Countries and Autonomous Systems
        parser.add_argument("objects", nargs="+")

        args = parser.parse_args()

        # Enable debug logging
        if args.debug:
            log.setLevel(logging.DEBUG)

        return args

    def run(self):
        """
        Parses the command line, runs the export and terminates the
        process with the exit code of the export (0 on success).
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Call function
        ret = self.handle_export(args)

        # Return with exit code
        if ret:
            sys.exit(ret)

        # Otherwise just exit
        sys.exit(0)

    @staticmethod
    def _parse_asn(arg):
        """
        Returns the number of an argument like "AS64496" or None
        if arg does not denote an Autonomous System.
        """
        if not arg.startswith("AS"):
            return None

        try:
            return int(arg[2:])
        except ValueError:
            # Not "AS" followed by a number. This could still be something
            # else, e.g. the country code "AS", so let the caller decide.
            return None

    def handle_export(self, ns):
        """
        Runs the export that has been requested by the parsed arguments ns.

        Returns 2 if an argument is invalid and 1 if the database
        could not be opened. Nothing is returned on success.
        """
        countries, asns = [], []

        # Translate family
        if ns.family == "ipv6":
            families = [ socket.AF_INET6 ]
        elif ns.family == "ipv4":
            families = [ socket.AF_INET ]
        else:
            families = [ socket.AF_INET6, socket.AF_INET ]

        for arg in ns.objects:
            asn = self._parse_asn(arg)

            if asn is not None:
                asns.append(asn)

            elif location.country_code_is_valid(arg) \
                    or arg in ("A1", "A2", "A3"):
                countries.append(arg)

            else:
                log.error("Invalid argument: %s" % arg)
                return 2

        # Open the database
        try:
            db = location.Database(ns.database)
        except FileNotFoundError:
            log.error("Could not open database: %s" % ns.database)
            return 1

        # Select the output format. The command line parser only accepts
        # known formats, but do not rely on an assertion here because
        # those are skipped when Python runs with -O.
        try:
            writer = self.output_formats[ns.format]
        except KeyError:
            log.error("Unknown output format: %s" % ns.format)
            return 2

        exporter = Exporter(db, writer)
        exporter.export(ns.directory, countries=countries, asns=asns, families=families)
287
288
def main():
    """
    Entry point: creates the command line interface and runs it
    """
    CLI().run()


# This file is a script, so start right away
main()