# Source: git.ipfire.org Git - location/libloc.git/blob - src/python/export.py
# Blob: 4702bcfa08edf0ec7a1c27e8797011d625f8f2dc
# Path: [location/libloc.git] / src / python / export.py
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2020 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import io
21 import ipaddress
22 import logging
23 import os
24 import socket
25
26 import _location
27
# Module-level logger; its records are also passed on to the handlers
# of the ancestor loggers
log = logging.getLogger("location.export")
log.propagate = True
31
# Maps each network flag to the "fake" country code under which the
# networks carrying that flag are exported
FLAGS = {
    _location.NETWORK_FLAG_ANONYMOUS_PROXY: "A1",
    _location.NETWORK_FLAG_SATELLITE_PROVIDER: "A2",
    _location.NETWORK_FLAG_ANYCAST: "A3",
}
37
class OutputWriter:
    """
    Writes networks to a file object, one network per line.

    This is the plain "list" format and the base class of the other
    formats. Subclasses change the output by overriding
    _write_header(), _write_footer() and write().
    """
    # Used by the Exporter as the suffix of the file name
    suffix = "networks"

    # Mode in which open() opens the output file
    mode = "w"

    def __init__(self, f, prefix=None):
        """
        Creates a writer on top of the writable file object f.

        prefix is a name that subclasses put into their output;
        this class does not use it itself.
        """
        self.f, self.prefix = f, prefix

        # Immediately write the header
        self._write_header()

    @classmethod
    def open(cls, filename, **kwargs):
        """
        Convenience function to open a file

        The file is opened in the mode of this class and any keyword
        arguments are passed on to the constructor.
        """
        f = open(filename, cls.mode)

        try:
            return cls(f, **kwargs)

        # Do not leak the file handle if the writer could not be set up
        # (e.g. because writing the header has failed)
        except BaseException:
            f.close()
            raise

    def __repr__(self):
        return "<%s f=%s>" % (self.__class__.__name__, self.f)

    def _write_header(self):
        """
        The header of the file
        """
        pass

    def _write_footer(self):
        """
        The footer of the file
        """
        pass

    def write(self, network):
        """
        Writes one network to the file
        """
        self.f.write("%s\n" % network)

    def finish(self):
        """
        Called when all data has been written

        Writes the footer and closes the file. The file is closed
        even if writing the footer fails.
        """
        try:
            self._write_footer()

        # Close the file
        finally:
            self.f.close()
83
84
class IpsetOutputWriter(OutputWriter):
    """
    Writes a "create" line followed by one "add" line per network,
    for ipset
    """
    suffix = "ipset"

    def _write_header(self):
        # NOTE(review): the set is always created with "family inet",
        # although the Exporter uses the same writer for every address
        # family - confirm that IPv6 exports can be loaded
        header = "create %s hash:net family inet hashsize 1024 maxelem 65536\n" % self.prefix

        self.f.write(header)

    def write(self, network):
        # The prefix is the name of the set
        line = "add %s %s\n" % (self.prefix, network)

        self.f.write(line)
96
97
class NftablesOutputWriter(OutputWriter):
    """
    Writes the networks as one "define" block, for nftables
    """
    suffix = "set"

    def _write_header(self):
        # Opens the block which is named after the prefix
        opening = "define %s = {\n" % self.prefix

        self.f.write(opening)

    def write(self, network):
        # One element per line
        element = " %s,\n" % network

        self.f.write(element)

    def _write_footer(self):
        # Closes the block that has been opened by the header
        self.f.write("}\n")
112
113
class XTGeoIPOutputWriter(OutputWriter):
    """
    Produces files that can be loaded by the xt_geoip kernel module
    from xtables-addons.
    """
    suffix = "iv"

    # The file is opened in binary mode
    mode = "wb"

    def write(self, network):
        # Each network is stored as its first address followed by its
        # last address (presumably as raw bytes - TODO confirm)
        first, last = network._first_address, network._last_address

        self.f.write(first)
        self.f.write(last)
125
126
# All available output formats by name
formats = {
    "ipset": IpsetOutputWriter,
    "list": OutputWriter,
    "nftables": NftablesOutputWriter,
    "xt_geoip": XTGeoIPOutputWriter,
}
133
class Exporter:
    """
    Exports networks from the database into one file per country
    or ASN and address family.
    """
    def __init__(self, db, writer):
        """
        db has to provide search_networks(). writer is the writer class
        (not an instance) that is used to create the files.
        """
        self.db, self.writer = db, writer

    def export(self, directory, families, countries, asns):
        """
        Writes all matching networks into files in directory.

        families  - the address families to export
        countries - the country codes to export; these may include the
                    special codes from FLAGS
        asns      - the ASNs to export; they are compared to the asn
                    attribute of the networks

        Every file is closed when this method returns, no matter
        whether the export has been successful or not.
        """
        # Filter countries from special country codes
        special_codes = set(FLAGS.values())

        country_codes = [
            country_code for country_code in countries if country_code not in special_codes
        ]

        for family in families:
            log.debug("Exporting family %s", family)

            writers = {}

            try:
                self._open_writers(writers, directory, family, countries, asns)

                # Get all networks that match the family
                networks = self.db.search_networks(family=family,
                    country_codes=country_codes, asns=asns, flatten=True)

                # Walk through all networks
                for network in networks:
                    self._write_network(writers, network)

                # Write everything to the filesystem
                for writer in writers.values():
                    writer.finish()

            # finish() has already closed the files if everything went
            # fine, but they must not stay open after an error either
            finally:
                for writer in writers.values():
                    writer.f.close()

    def _open_writers(self, writers, directory, family, countries, asns):
        """
        Opens one writer per country and ASN and stores it in writers.

        The dictionary is filled in place so that the caller is able to
        close whatever has been opened if this fails half-way.
        """
        # Create writers for countries
        for country_code in countries:
            filename = self._make_filename(
                directory, prefix=country_code, suffix=self.writer.suffix, family=family,
            )

            writers[country_code] = self.writer.open(filename, prefix="CC_%s" % country_code)

        # Create writers for ASNs
        for asn in asns:
            filename = self._make_filename(
                directory, prefix="AS%s" % asn, suffix=self.writer.suffix, family=family,
            )

            writers[asn] = self.writer.open(filename, prefix="AS%s" % asn)

    @staticmethod
    def _write_network(writers, network):
        """
        Writes network to every writer that it matches.
        """
        # Matching country and ASN
        keys = [network.country_code, network.asn]

        # Handle flags by adding their "fake" country codes
        keys.extend(code for flag, code in FLAGS.items() if network.has_flag(flag))

        for key in keys:
            # There is no writer for anything that has not been requested.
            # This is looked up without catching KeyError so that an
            # error that is raised while writing is not swallowed.
            writer = writers.get(key)

            if writer is not None:
                writer.write(network)

    def _make_filename(self, directory, prefix, suffix, family):
        """
        Returns the path of the file in directory, which is named
        "<prefix>.<suffix>6" for IPv6 and "<prefix>.<suffix>4" for
        any other family.
        """
        filename = "%s.%s%s" % (
            prefix, suffix, "6" if family == socket.AF_INET6 else "4"
        )

        return os.path.join(directory, filename)