]> git.ipfire.org Git - people/ms/libloc.git/blob - src/python/location-exporter.in
location-exporter: ipset: Add line break after header
[people/ms/libloc.git] / src / python / location-exporter.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2019 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import gettext
22 import io
23 import ipaddress
24 import logging
25 import logging.handlers
26 import os.path
27 import socket
28 import sys
29
30 # Load our location module
31 import location
32
def setup_logging(level=logging.INFO):
    """
    Configures and returns the logger used by this script.

    Two handlers are attached: a console handler that passes everything
    from DEBUG upwards, and a syslog handler (socket /dev/log, daemon
    facility) that passes everything from INFO upwards.

    level: threshold that is set on the logger itself

    Returns the configured logging.Logger object.
    """
    # The syslog formatter below tags all messages as "location-exporter",
    # so the logger carries the same name
    logger = logging.getLogger("location-exporter")
    logger.setLevel(level)

    # Log to console
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    logger.addHandler(console)

    # Log to syslog
    syslog = logging.handlers.SysLogHandler(address="/dev/log",
        facility=logging.handlers.SysLogHandler.LOG_DAEMON)
    syslog.setLevel(logging.INFO)
    logger.addHandler(syslog)

    # Format syslog messages (the console handler keeps the default format)
    formatter = logging.Formatter("location-exporter[%(process)d]: %(message)s")
    syslog.setFormatter(formatter)

    return logger


# Initialise logging
log = setup_logging()
56
57 # i18n
def _(singular, plural=None, n=None):
    """
    Translates a message using the "libloc" text domain.

    Without a plural form, the singular message is looked up as is.
    When a plural form is passed, n selects between the two forms.
    """
    if not plural:
        return gettext.dgettext("libloc", singular)

    return gettext.dngettext("libloc", singular, plural, n)
63
class OutputWriter(object):
    """
    Writes networks as a plain list, one network per line.

    Networks are collected in an in-memory buffer by write() and are
    only written to the filesystem by write_out(). The header and
    footer hooks write nothing in this class.
    """
    suffix = "networks"

    def __init__(self, family, country_code=None, asn=None):
        self.family = family
        self.country_code = country_code
        self.asn = asn

        # Holds everything passed to write() until write_out() is called
        self.f = io.BytesIO()

    def write_out(self, directory):
        """
        Writes header, all buffered data and footer into a file
        in the given directory
        """
        path = os.path.join(directory, self._make_filename())

        with open(path, "wb") as out:
            self._write_header(out)

            # Copy all data into the file
            out.write(self.f.getbuffer())

            self._write_footer(out)

    def _make_filename(self):
        """
        Returns the name of the output file:
        country code or "AS<number>", then the suffix and "4" or "6"
        """
        if self.country_code:
            prefix = self.country_code
        else:
            prefix = "AS%s" % self.asn

        if self.family == socket.AF_INET6:
            version = "6"
        else:
            version = "4"

        return "{}.{}{}".format(prefix, self.suffix, version)

    @property
    def name(self):
        """
        "CC_<country code>" or "AS<number>", or None if neither is set
        """
        if self.country_code:
            return "CC_%s" % self.country_code

        if self.asn:
            return "AS%s" % self.asn

        return None

    def _write_header(self, f):
        """
        The header of the file
        """

    def _write_footer(self, f):
        """
        The footer of the file
        """

    def write(self, network):
        """
        Appends one network to the buffer
        """
        line = "%s\n" % network

        self.f.write(line.encode("ascii"))
117
118
class IpsetOutputWriter(OutputWriter):
    """
    For ipset

    Writes a "create" command as header, followed by one "add"
    command per network.
    """
    suffix = "ipset"

    def _write_header(self, f):
        """
        Writes the command that creates the set
        """
        # A hash:net set only accepts addresses of the family it was created
        # for, so IPv6 exports must not be declared as "family inet"
        family = "inet6" if self.family == socket.AF_INET6 else "inet"

        # NOTE(review): hashsize and maxelem are fixed values; presumably
        # exports with more than 65536 networks need a larger maxelem - confirm
        h = "create %s hash:net family %s hashsize 1024 maxelem 65536\n" % (
            self.name, family,
        )

        f.write(h.encode("ascii"))

    def write(self, network):
        """
        Buffers the command that adds the network to the set
        """
        s = "add %s %s\n" % (self.name, network)

        self.f.write(s.encode("ascii"))
134
135
class NftablesOutputWriter(OutputWriter):
    """
    For nftables

    Writes all networks as one "define <name> = { ... }" block.
    """
    suffix = "set"

    def _write_header(self, f):
        """
        Opens the definition
        """
        h = "define %s = {\n" % self.name

        f.write(h.encode("ascii"))

    def _write_footer(self, f):
        """
        Closes the definition
        """
        # Terminate the last line so that the file ends with a line break
        # and the closing brace cannot run into whatever follows it
        f.write(b"}\n")

    def write(self, network):
        """
        Buffers the network as one element of the definition
        """
        s = " %s,\n" % network

        self.f.write(s.encode("ascii"))
154
155
class XTGeoIPOutputWriter(OutputWriter):
    """
    Formats the output in that way, that it can be loaded by
    the xt_geoip kernel module from xtables-addons.
    """
    suffix = "iv"

    def write(self, network):
        """
        Buffers the first and the last address of the network
        in their packed binary form, one directly after the other
        """
        parsed = ipaddress.ip_network(str(network))

        # 4 bytes per address for IPv4 and 16 bytes for IPv6
        self.f.write(parsed.network_address.packed)
        self.f.write(parsed.broadcast_address.packed)
173
174
class Exporter(object):
    """
    Exports the networks of selected countries and Autonomous Systems
    from the database, using one writer object per country/AS and
    address family.
    """
    def __init__(self, db, writer):
        # writer is the class that is instantiated for every output file
        self.db, self.writer = db, writer

    def export(self, directory, countries, asns):
        """
        Writes one file per country/AS into directory,
        first for IPv6 and then for IPv4
        """
        for family in (socket.AF_INET6, socket.AF_INET):
            log.debug("Exporting family %s" % family)

            # Create writers for countries
            writers = {
                cc: self.writer(family, country_code=cc) for cc in countries
            }

            # Create writers for ASNs
            writers.update(
                (number, self.writer(family, asn=number)) for number in asns
            )

            # Walk through all networks that match the family
            for network in self.db.search_networks(family=family):
                # Write matching countries
                if network.country_code in countries:
                    writers[network.country_code].write(network)

                # Write matching ASNs
                if network.asn in asns:
                    writers[network.asn].write(network)

            # Write everything to the filesystem
            for w in writers.values():
                w.write_out(directory)
210
211
class CLI(object):
    """
    The command line interface of the exporter
    """
    # Maps the value of --format to the writer class that implements it
    output_formats = {
        "ipset" : IpsetOutputWriter,
        "list" : OutputWriter,
        "nftables" : NftablesOutputWriter,
        "xt_geoip" : XTGeoIPOutputWriter,
    }

    def parse_cli(self):
        """
        Parses the command line and returns the argparse namespace.

        Also raises the log level to DEBUG when --debug was passed.
        """
        parser = argparse.ArgumentParser(
            description=_("Location Exporter Command Line Interface"),
        )

        # Global configuration flags
        parser.add_argument("--debug", action="store_true",
            help=_("Enable debug output"))

        # version
        parser.add_argument("--version", action="version",
            version="%%(prog)s %s" % location.__version__)

        # database
        parser.add_argument("--database", "-d",
            default="@databasedir@/database.db", help=_("Path to database"),
        )

        # format
        # Without a default, omitting --format left no writer to select, and
        # without choices, unknown formats were only detected after parsing
        parser.add_argument("--format", help=_("Output format"),
            default="list", choices=sorted(self.output_formats))

        # directory
        parser.add_argument("--directory", help=_("Output directory"), required=True)

        # Countries and Autonomous Systems
        parser.add_argument("objects", nargs="+")

        args = parser.parse_args()

        # Enable debug logging
        if args.debug:
            log.setLevel(logging.DEBUG)

        return args

    def run(self):
        """
        Parses the command line, runs the export and exits the process
        with the code returned by handle_export() (0 if it returned nothing)
        """
        # Parse command line arguments
        args = self.parse_cli()

        # Call function
        ret = self.handle_export(args)

        # Return with exit code
        if ret:
            sys.exit(ret)

        # Otherwise just exit
        sys.exit(0)

    def handle_export(self, ns):
        """
        Runs the export that is described by the parsed arguments in ns.

        Every element of ns.objects is either "AS<number>" or a
        two-letter country code.

        Returns 2 on an invalid argument or an unknown output format,
        1 if the database file was not found and None on success.
        """
        countries, asns = [], []

        for arg in ns.objects:
            # "AS" on its own is the country code of American Samoa,
            # so only longer arguments are parsed as AS numbers
            if arg.startswith("AS") and len(arg) > 2:
                try:
                    asn = int(arg[2:])
                except ValueError:
                    log.error("Invalid argument: %s" % arg)
                    return 2

                asns.append(asn)

            elif len(arg) == 2:
                countries.append(arg)

            else:
                log.error("Invalid argument: %s" % arg)
                return 2

        # Open the database
        try:
            db = location.Database(ns.database)
        except FileNotFoundError:
            log.error("Could not open database: %s" % ns.database)
            return 1

        # Select the output format
        # (checked explicitly because assert statements are removed by -O)
        writer = self.output_formats.get(ns.format)
        if writer is None:
            log.error("Unknown output format: %s" % ns.format)
            return 2

        exporter = Exporter(db, writer)
        exporter.export(ns.directory, countries=countries, asns=asns)
303
304
def main():
    """
    Entry point: creates the command line interface and runs it.

    Does not return, because CLI.run() always ends in sys.exit().
    """
    # Run the command line interface
    c = CLI()
    c.run()


# Only start the command line interface when executed as a script,
# so that loading this file as a module has no side effects
if __name__ == "__main__":
    main()