]> git.ipfire.org Git - location/libloc.git/blob - src/python/export.py
3b9e1e0262634a0fded1c43a2b7f0dce6ed8fe12
[location/libloc.git] / src / python / export.py
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2020-2021 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import io
21 import ipaddress
22 import logging
23 import os
24 import socket
25
26 import _location
27
# Logger that is shared by everything in this module
log = logging.getLogger("location.export")

# Pass all records on to the handlers of the parent loggers
log.propagate = 1
31
# Maps each network flag to the two-letter code that is used in place of
# a country code when networks carrying that flag are exported.
FLAGS = {
    _location.NETWORK_FLAG_ANONYMOUS_PROXY: "A1",
    _location.NETWORK_FLAG_SATELLITE_PROVIDER: "A2",
    _location.NETWORK_FLAG_ANYCAST: "A3",
    _location.NETWORK_FLAG_DROP: "XD",
}
38
class OutputWriter(object):
    """
    Writes networks into a file, one network per line.

    This is also the base class of the other output formats, which
    customise the output by overriding _write_header(), _write_footer()
    and write().

    suffix is the file extension that belongs to the format and mode
    is the mode in which open() opens the file.
    """
    suffix = "networks"
    mode = "w"

    def __init__(self, f, prefix=None):
        """
        Takes an open, writable file object and an optional prefix
        which is available to subclasses as self.prefix.

        The header is written to the file straight away.
        """
        self.f, self.prefix = f, prefix

        # Immediately write the header
        self._write_header()

    @classmethod
    def open(cls, filename, **kwargs):
        """
        Convenience function to open a file

        Any keyword arguments are passed on to the constructor.
        """
        f = open(filename, cls.mode)

        try:
            return cls(f, **kwargs)

        # Do not leak the file handle if the writer could not be
        # created (e.g. because writing the header has failed)
        except BaseException:
            f.close()
            raise

    def __repr__(self):
        return "<%s f=%s>" % (self.__class__.__name__, self.f)

    def _write_header(self):
        """
        The header of the file
        """
        pass

    def _write_footer(self):
        """
        The footer of the file
        """
        pass

    def write(self, network):
        """
        Writes a single network to the file
        """
        self.f.write("%s\n" % network)

    def finish(self):
        """
        Called when all data has been written

        Writes the footer and closes the file. The file is closed
        even if writing the footer has raised an exception.
        """
        try:
            self._write_footer()

        # Always close the file
        finally:
            self.f.close()
84
85
class IpsetOutputWriter(OutputWriter):
    """
    Writes the networks as a list of ipset commands
    """
    suffix = "ipset"

    def _write_header(self):
        # The set is named after the prefix and has to be created
        # before any networks can be added to it
        #
        # NOTE(review): "family inet" and the hashsize/maxelem values are
        # fixed here regardless of what is being exported - confirm that
        # this is intended for IPv6 exports and for very large sets.
        header = "create %s hash:net family inet hashsize 1024 maxelem 65536\n" % self.prefix

        self.f.write(header)

    def write(self, network):
        # One "add" command per network
        command = "add %s %s\n" % (self.prefix, network)

        self.f.write(command)
97
98
class NftablesOutputWriter(OutputWriter):
    """
    Writes the networks as a "define" statement for nftables
    """
    suffix = "set"

    def _write_header(self):
        # Open the definition which is named after the prefix
        opening = "define %s = {\n" % self.prefix

        self.f.write(opening)

    def _write_footer(self):
        # Close the definition again
        closing = "}\n"

        self.f.write(closing)

    def write(self, network):
        # Every element goes on its own line, followed by a comma
        element = " %s,\n" % network

        self.f.write(element)
113
114
class XTGeoIPOutputWriter(OutputWriter):
    """
    Writes the networks in a format that can be loaded by the
    xt_geoip kernel module from xtables-addons.
    """
    suffix = "iv"

    # The file is opened in binary mode
    mode = "wb"

    def write(self, network):
        out = self.f

        # A network is stored as its first address
        # immediately followed by its last address
        out.write(network._first_address)
        out.write(network._last_address)
126
127
# All available output formats by name, each
# pointing to the writer class that implements it
formats = {
    "ipset": IpsetOutputWriter,
    "list": OutputWriter,
    "nftables": NftablesOutputWriter,
    "xt_geoip": XTGeoIPOutputWriter,
}
134
class Exporter(object):
    """
    Exports networks from a database into one file per country,
    flag and ASN.

    db is the database that is searched for networks and writer is the
    writer class (not an instance) which is used to create the files.
    """
    def __init__(self, db, writer):
        self.db, self.writer = db, writer

    def export(self, directory, families, countries, asns):
        """
        Writes all matching networks into files in directory.

        For every address family in families, one file is created for
        each country code in countries and for each ASN in asns.
        Country codes that are values of FLAGS select networks by
        flag instead of by country.

        If anything fails while a family is being exported, all files
        that have been opened for it are closed (without writing a
        footer) and the exception is raised again.
        """
        # The "fake" country codes that stand for flags
        special_codes = frozenset(FLAGS.values())

        for family in families:
            log.debug("Exporting family %s", family)

            writers = {}

            try:
                # Create writers for countries
                for country_code in countries:
                    filename = self._make_filename(
                        directory, prefix=country_code, suffix=self.writer.suffix, family=family,
                    )

                    writers[country_code] = self.writer.open(filename, prefix="CC_%s" % country_code)

                # Create writers for ASNs
                for asn in asns:
                    filename = self._make_filename(
                        directory, "AS%s" % asn, suffix=self.writer.suffix, family=family,
                    )

                    writers[asn] = self.writer.open(filename, prefix="AS%s" % asn)

                # Filter countries from special country codes
                country_codes = [
                    country_code for country_code in countries if country_code not in special_codes
                ]

                # Get all networks that match the family
                networks = self.db.search_networks(family=family,
                    country_codes=country_codes, asns=asns, flatten=True)

                # Walk through all networks
                for network in networks:
                    self._write_network(writers, network)

            # Do not leak any open files if something went wrong. The footer
            # is deliberately not written so that a truncated file cannot be
            # mistaken for a complete one.
            except BaseException:
                for writer in writers.values():
                    writer.f.close()

                raise

            # Write everything to the filesystem
            for writer in writers.values():
                writer.finish()

    def _write_network(self, writers, network):
        """
        Passes network to all writers it belongs to
        """
        # Write matching countries
        self._write_to(writers, network.country_code, network)

        # Write matching ASNs
        self._write_to(writers, network.asn, network)

        # Handle flags
        for flag, country in FLAGS.items():
            # Write to the "fake" country code of the flag
            if network.has_flag(flag):
                self._write_to(writers, country, network)

    @staticmethod
    def _write_to(writers, key, network):
        """
        Writes network to the writer that is stored as key (if any)
        """
        # Only the lookup may miss. Errors that are raised
        # by the writer itself are not swallowed.
        writer = writers.get(key)

        if writer is not None:
            writer.write(network)

    def _make_filename(self, directory, prefix, suffix, family):
        """
        Returns the path of the file in directory which is named
        "<prefix>.<suffix>" followed by "6" if family is
        socket.AF_INET6 and by "4" for any other family.
        """
        filename = "%s.%s%s" % (
            prefix, suffix, "6" if family == socket.AF_INET6 else "4"
        )

        return os.path.join(directory, filename)