]>
git.ipfire.org Git - people/ms/libloc.git/blob - src/python/export.py
2 ###############################################################################
4 # libloc - A library to determine the location of someone on the Internet #
6 # Copyright (C) 2020 IPFire Development Team <info@ipfire.org> #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
18 ###############################################################################
# Module-wide logger; messages from this file are emitted under the
# "location.export" logger name.
log = logging.getLogger("location.export")
30 class OutputWriter(object):
34 def __init__(self
, f
, prefix
=None):
35 self
.f
, self
.prefix
= f
, prefix
37 # Immediately write the header
def open(cls, filename, **kwargs):
    """
    Convenience function to open a file

    Opens ``filename`` using the mode declared on the writer class
    (``cls.mode``) and returns a new writer instance wrapping the file
    object. Any additional keyword arguments are passed through to the
    constructor unchanged.

    If constructing the writer raises, the file is closed again before
    the exception is propagated, so no file handle is leaked.

    NOTE(review): the first argument is ``cls``, so this is presumably
    decorated with @classmethod on the preceding line, which is not
    visible here - confirm.
    """
    f = open(filename, cls.mode)

    try:
        return cls(f, **kwargs)
    except BaseException:
        # The caller never got a writer that could close the file,
        # so release the handle here before re-raising.
        f.close()
        raise
50 return "<%s f=%s>" % (self
.__class
__.__name
__, self
.f
)
52 def _write_header(self
):
54 The header of the file
58 def _write_footer(self
):
60 The footer of the file
def write(self, network):
    """
    Write a single network to the output file.

    The network is formatted with ``%s`` and followed by a newline,
    so each call produces exactly one line.
    """
    line = "%s\n" % network
    self.f.write(line)
69 Called when all data has been written
77 class IpsetOutputWriter(OutputWriter
):
def _write_header(self):
    """
    Write the "create" command that defines the set.

    The set is named after ``self.prefix`` and is created as a
    ``hash:net`` set with a fixed hash size and maximum element count.
    """
    # NOTE(review): the set is always created with "family inet", no
    # matter which address family is being exported - confirm whether
    # IPv6 exports need "inet6" here.
    command = "create %s hash:net family inet hashsize 1024 maxelem 65536\n" % self.prefix
    self.f.write(command)
def write(self, network):
    """
    Write an "add" command that puts the network into the set.

    The set name is taken from ``self.prefix``; one command is written
    per call, terminated by a newline.
    """
    entry = "add %s %s\n" % (self.prefix, network)
    self.f.write(entry)
90 class NftablesOutputWriter(OutputWriter
):
def _write_header(self):
    """
    Open the definition block.

    Writes ``define <prefix> = {`` followed by a newline, where
    ``<prefix>`` is ``self.prefix``. The elements are added afterwards
    by write().
    """
    opening = "define %s = {\n" % self.prefix
    self.f.write(opening)
99 def _write_footer(self
):
def write(self, network):
    """
    Write the network as one element of the definition block.

    Each element goes on its own line and is followed by a comma.
    """
    element = " %s,\n" % network
    self.f.write(element)
106 class XTGeoIPOutputWriter(OutputWriter
):
108 Formats the output in such a way that it can be loaded by
109 the xt_geoip kernel module from xtables-addons.
114 def write(self
, network
):
115 n
= ipaddress
.ip_network("%s" % network
)
117 for address
in (n
.network_address
, n
.broadcast_address
):
118 bytes
= socket
.inet_pton(
119 socket
.AF_INET6
if address
.version
== 6 else socket
.AF_INET
,
127 "ipset" : IpsetOutputWriter
,
128 "list" : OutputWriter
,
129 "nftables" : NftablesOutputWriter
,
130 "xt_geoip" : XTGeoIPOutputWriter
,
133 class Exporter(object):
def __init__(self, db, writer):
    """
    Remember the database and the writer to use for exporting.

    db:     the database object; export() queries it through
            ``search_networks()``
    writer: the output writer; export() reads its ``suffix`` and calls
            its ``open()`` to create one output file per export target
    """
    self.db = db
    self.writer = writer
137 def export(self
, directory
, families
, countries
, asns
):
138 for family
in families
:
139 log
.debug("Exporting family %s" % family
)
143 # Create writers for countries
144 for country_code
in countries
:
145 filename
= self
._make
_filename
(
146 directory
, prefix
=country_code
, suffix
=self
.writer
.suffix
, family
=family
,
149 writers
[country_code
] = self
.writer
.open(filename
, prefix
="CC_%s" % country_code
)
151 # Create writers for ASNs
153 filename
= self
._make
_filename
(
154 directory
, "AS%s" % asn
, suffix
=self
.writer
.suffix
, family
=family
,
157 writers
[asn
] = self
.writer
.open(filename
, prefix
="AS%s" % asn
)
159 # Get all networks that match the family
160 networks
= self
.db
.search_networks(family
=family
)
162 # Walk through all networks
163 for network
in networks
:
164 # Write matching countries
166 writers
[network
.country_code
].write(network
)
170 # Write matching ASNs
172 writers
[network
.asn
].write(network
)
176 # Write everything to the filesystem
177 for writer
in writers
.values():
def _make_filename(self, directory, prefix, suffix, family):
    """
    Build the path of an output file.

    The file is named ``<prefix>.<suffix><v>`` where ``<v>`` is "6" if
    family is socket.AF_INET6 and "4" for anything else. The name is
    then joined onto directory.
    """
    if family == socket.AF_INET6:
        version = "6"
    else:
        version = "4"

    basename = "%s.%s%s" % (prefix, suffix, version)

    return os.path.join(directory, basename)