git.ipfire.org Git - people/ms/libloc.git/blob - src/python/export.py
export: Skip writing any subnets
[people/ms/libloc.git] / src / python / export.py
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2020 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import io
21 import ipaddress
22 import logging
23 import os
24 import socket
25
26 import _location
27
# Module-level logger; its records are handed on to the parent loggers
log = logging.getLogger("location.export")
log.propagate = True
31
# Maps each network flag to the "fake" country code under which
# flagged networks are exported
flags = {
    _location.NETWORK_FLAG_ANONYMOUS_PROXY: "A1",
    _location.NETWORK_FLAG_SATELLITE_PROVIDER: "A2",
    _location.NETWORK_FLAG_ANYCAST: "A3",
}
37
class OutputWriter(object):
    """
    Writes networks to a file, one network per line.

    This is the "list" format and the base class of the other writers,
    which change the output by overriding _write_header(),
    _write_footer() and _write_network().

    f is the file object to write to, prefix is a name that subclasses
    embed in their output, and flatten controls whether networks that
    are subnets of the previously written network are skipped.
    """
    # Suffix of the names of files in this format
    suffix = "networks"

    # Mode in which open() opens the output file
    mode = "w"

    def __init__(self, f, prefix=None, flatten=True):
        self.f, self.prefix, self.flatten = f, prefix, flatten

        # The previously written network
        self._last_network = None

        # Immediately write the header
        self._write_header()

    @classmethod
    def open(cls, filename, **kwargs):
        """
        Convenience function to open a file

        Any keyword arguments are passed on to the constructor.
        """
        f = open(filename, cls.mode)

        # Do not leak the file handle if the writer could not be set up
        # (e.g. because writing the header has failed)
        try:
            return cls(f, **kwargs)
        except BaseException:
            f.close()
            raise

    def __repr__(self):
        return "<%s f=%s>" % (self.__class__.__name__, self.f)

    def _flatten(self, network):
        """
        Checks if the given network needs to be written to file,
        or if it is a subnet of the previously written network.

        Returns True if the network can be skipped.
        """
        if self._last_network and network.is_subnet_of(self._last_network):
            return True

        # Remember this network for the next call
        self._last_network = network
        return False

    def _write_header(self):
        """
        The header of the file
        """
        pass

    def _write_footer(self):
        """
        The footer of the file
        """
        pass

    def _write_network(self, network):
        """
        Writes a single network in the format of this writer
        """
        self.f.write("%s\n" % network)

    def write(self, network):
        """
        Writes the given network unless flattening is enabled
        and the network is already covered by the previous one
        """
        if self.flatten and self._flatten(network):
            # Let the logging module format the message only when needed
            log.debug("Skipping writing network %s", network)
            return

        # Write the network to file
        self._write_network(network)

    def finish(self):
        """
        Called when all data has been written
        """
        try:
            self._write_footer()

        # Close the file, even if the footer could not be written
        finally:
            self.f.close()
106
107
class IpsetOutputWriter(OutputWriter):
    """
    For ipset

    The "create" line has to name the address family of the set, which
    this writer only learns from the networks it is given. Therefore the
    header is not written immediately, but together with the first
    network. A set that never receives a network is created when the
    footer is written and falls back to family "inet".
    """
    suffix = "ipset"

    # Becomes True once the "create" line has been written
    _header_written = False

    def _write_header(self):
        # Deferred until the address family is known (see _create_set)
        pass

    def _create_set(self, family):
        """
        Writes the line that creates the set for the given family
        ("inet" or "inet6")
        """
        self.f.write("create %s hash:net family %s hashsize 1024 maxelem 65536\n"
            % (self.prefix, family))

        self._header_written = True

    def _write_network(self, network):
        if not self._header_written:
            # Only IPv6 networks have colons in their string representation
            self._create_set("inet6" if ":" in "%s" % network else "inet")

        self.f.write("add %s %s\n" % (self.prefix, network))

    def _write_footer(self):
        # No network has been written, but the set should still exist
        if not self._header_written:
            self._create_set("inet")
119
120
class NftablesOutputWriter(OutputWriter):
    """
    Writes the networks as an nftables "define" statement
    that is named after the prefix
    """
    suffix = "set"

    def _write_header(self):
        # Opens the definition
        opening = "define %s = {\n" % self.prefix
        self.f.write(opening)

    def _write_network(self, network):
        # One element per line, each followed by a comma
        element = " %s,\n" % network
        self.f.write(element)

    def _write_footer(self):
        # Closes the definition
        closing = "}\n"
        self.f.write(closing)
135
136
class XTGeoIPOutputWriter(OutputWriter):
    """
    Formats the output in that way, that it can be loaded by
    the xt_geoip kernel module from xtables-addons.
    """
    suffix = "iv"

    # The output is binary
    mode = "wb"

    def _write_network(self, network):
        """
        Writes the first and the last address of the network
        in their packed binary form
        """
        n = ipaddress.ip_network("%s" % network)

        # packed is the binary representation of the address (most
        # significant octet first), which is what socket.inet_pton()
        # returns for the string representation of the same address
        for address in (n.network_address, n.broadcast_address):
            self.f.write(address.packed)
155
156
# Maps the name of each output format to the writer class implementing it
formats = {
    "ipset": IpsetOutputWriter,
    "list": OutputWriter,
    "nftables": NftablesOutputWriter,
    "xt_geoip": XTGeoIPOutputWriter,
}
163
class Exporter(object):
    """
    Exports the networks of a database into one file per
    country code and ASN for each address family.
    """
    def __init__(self, db, writer):
        # db is searched for networks; writer is the class that is
        # used to create the output files
        self.db, self.writer = db, writer

    def export(self, directory, families, countries, asns):
        """
        Writes the networks of all given families that match any of
        the given country codes or ASNs into files in directory.

        Networks that carry one of the flags in "flags" are additionally
        written to the file of the matching "fake" country code, if that
        code is one of the given countries.
        """
        for family in families:
            log.debug("Exporting family %s", family)

            # Maps country codes and ASNs to the writer of their file
            writers = {}

            try:
                self._open_writers(writers, directory, family, countries, asns)
                self._write_networks(writers, family)

            # Do not leak any file handles if the export has failed. No
            # footers are written since the files are incomplete.
            except BaseException:
                for writer in writers.values():
                    writer.f.close()

                raise

            # Write everything to the filesystem
            for writer in writers.values():
                writer.finish()

    def _open_writers(self, writers, directory, family, countries, asns):
        """
        Opens one output file for each country code and each ASN
        and adds the writers to the given dictionary
        """
        # Create writers for countries
        for country_code in countries:
            filename = self._make_filename(
                directory, prefix=country_code, suffix=self.writer.suffix, family=family,
            )

            writers[country_code] = self.writer.open(filename, prefix="CC_%s" % country_code)

        # Create writers for ASNs
        # NOTE(review): networks are looked up by network.asn, so the ASNs
        # presumably have to be of the same type - confirm against caller
        for asn in asns:
            filename = self._make_filename(
                directory, prefix="AS%s" % asn, suffix=self.writer.suffix, family=family,
            )

            writers[asn] = self.writer.open(filename, prefix="AS%s" % asn)

    def _write_networks(self, writers, family):
        """
        Passes every network of the given family to all writers it matches
        """
        # Walk through all networks that match the family
        for network in self.db.search_networks(family=family):
            # The country code, the ASN and the "fake" country codes
            # of all flags that are set on this network
            keys = [network.country_code, network.asn]
            keys += [code for flag, code in flags.items() if network.has_flag(flag)]

            for key in keys:
                # Only the lookup may miss. Errors that are raised
                # when writing the network must not be swallowed.
                writer = writers.get(key)

                if writer is not None:
                    writer.write(network)

    def _make_filename(self, directory, prefix, suffix, family):
        """
        Returns the path of the file in directory, which is named
        "<prefix>.<suffix>" followed by "6" for IPv6 and "4" otherwise
        """
        filename = "%s.%s%s" % (
            prefix, suffix, "6" if family == socket.AF_INET6 else "4"
        )

        return os.path.join(directory, filename)