]>
git.ipfire.org Git - location/libloc.git/blob - src/python/location-exporter.in
2 ###############################################################################
4 # libloc - A library to determine the location of someone on the Internet #
6 # Copyright (C) 2019 IPFire Development Team <info@ipfire.org> #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
18 ###############################################################################
28 # Load our location module
30 from location
.i18n
import _
33 log
= logging
.getLogger("location.exporter")
36 class OutputWriter(object):
39 def __init__(self
, family
, country_code
=None, asn
=None):
40 self
.family
, self
.country_code
, self
.asn
= family
, country_code
, asn
44 def write_out(self
, directory
):
45 # Make the output filename
46 filename
= os
.path
.join(
47 directory
, self
._make
_filename
(),
50 with
open(filename
, "wb") as f
:
53 # Copy all data into the file
54 f
.write(self
.f
.getbuffer())
58 def _make_filename(self
):
60 self
.country_code
or "AS%s" % self
.asn
,
62 "6" if self
.family
== socket
.AF_INET6
else "4"
68 return "CC_%s" % self
.country_code
71 return "AS%s" % self
.asn
73 def _write_header(self
, f
):
75 The header of the file
79 def _write_footer(self
, f
):
81 The footer of the file
85 def write(self
, network
):
88 self
.f
.write(s
.encode("ascii"))
91 class IpsetOutputWriter(OutputWriter
):
97 def _write_header(self
, f
):
98 h
= "create %s hash:net family inet hashsize 1024 maxelem 65536\n" % self
.name
100 f
.write(h
.encode("ascii"))
102 def write(self
, network
):
103 s
= "add %s %s\n" % (self
.name
, network
)
105 self
.f
.write(s
.encode("ascii"))
108 class NftablesOutputWriter(OutputWriter
):
114 def _write_header(self
, f
):
115 h
= "define %s = {\n" % self
.name
117 f
.write(h
.encode("ascii"))
119 def _write_footer(self
, f
):
122 def write(self
, network
):
123 s
= " %s,\n" % network
125 self
.f
.write(s
.encode("ascii"))
128 class XTGeoIPOutputWriter(OutputWriter
):
130 Formats the output in that way, that it can be loaded by
131 the xt_geoip kernel module from xtables-addons.
135 def write(self
, network
):
136 n
= ipaddress
.ip_network("%s" % network
)
138 for address
in (n
.network_address
, n
.broadcast_address
):
139 bytes
= socket
.inet_pton(
140 socket
.AF_INET6
if address
.version
== 6 else socket
.AF_INET
,
147 class Exporter(object):
148 def __init__(self
, db
, writer
):
152 def export(self
, directory
, families
, countries
, asns
):
153 for family
in families
:
154 log
.debug("Exporting family %s" % family
)
158 # Create writers for countries
159 for country_code
in countries
:
160 writers
[country_code
] = self
.writer(family
, country_code
=country_code
)
162 # Create writers for ASNs
164 writers
[asn
] = self
.writer(family
, asn
=asn
)
166 # Get all networks that match the family
167 networks
= self
.db
.search_networks(family
=family
)
169 # Walk through all networks
170 for network
in networks
:
171 # Write matching countries
172 if network
.country_code
in countries
:
173 writers
[network
.country_code
].write(network
)
175 # Write matching ASNs
176 if network
.asn
in asns
:
177 writers
[network
.asn
].write(network
)
179 # Write everything to the filesystem
180 for writer
in writers
.values():
181 writer
.write_out(directory
)
186 "ipset" : IpsetOutputWriter
,
187 "list" : OutputWriter
,
188 "nftables" : NftablesOutputWriter
,
189 "xt_geoip" : XTGeoIPOutputWriter
,
193 parser
= argparse
.ArgumentParser(
194 description
=_("Location Exporter Command Line Interface"),
197 # Global configuration flags
198 parser
.add_argument("--debug", action
="store_true",
199 help=_("Enable debug output"))
202 parser
.add_argument("--version", action
="version",
203 version
="%(prog)s @VERSION@")
206 parser
.add_argument("--database", "-d",
207 default
="@databasedir@/database.db", help=_("Path to database"),
211 parser
.add_argument("--format", help=_("Output format"),
212 default
="list", choices
=self
.output_formats
.keys())
215 parser
.add_argument("--directory", help=_("Output directory"), required
=True)
218 parser
.add_argument("--family", help=_("Specify address family"), choices
=("ipv6", "ipv4"))
220 # Countries and Autonomous Systems
221 parser
.add_argument("objects", nargs
="+")
223 args
= parser
.parse_args()
225 # Enable debug logging
227 log
.setLevel(logging
.DEBUG
)
232 # Parse command line arguments
233 args
= self
.parse_cli()
236 ret
= self
.handle_export(args
)
238 # Return with exit code
242 # Otherwise just exit
245 def handle_export(self
, ns
):
246 countries
, asns
= [], []
249 if ns
.family
== "ipv6":
250 families
= [ socket
.AF_INET6
]
251 elif ns
.family
== "ipv4":
252 families
= [ socket
.AF_INET
]
254 families
= [ socket
.AF_INET6
, socket
.AF_INET
]
256 for object in ns
.objects
:
257 if object.startswith("AS"):
259 object = int(object[2:])
261 log
.error("Invalid argument: %s" % object)
266 elif location
.country_code_is_valid(object) \
267 or object in ("A1", "A2", "A3"):
268 countries
.append(object)
271 log
.error("Invalid argument: %s" % object)
276 db
= location
.Database(ns
.database
)
277 except FileNotFoundError
as e
:
278 log
.error("Count not open database: %s" % ns
.database
)
281 # Select the output format
282 writer
= self
.output_formats
.get(ns
.format
)
285 e
= Exporter(db
, writer
)
286 e
.export(ns
.directory
, countries
=countries
, asns
=asns
, families
=families
)
290 # Run the command line interface