# git.ipfire.org Git - location/libloc.git/blob - src/python/location-exporter.in
###############################################################################
#                                                                             #
# libloc - A library to determine the location of someone on the Internet     #
#                                                                             #
# Copyright (C) 2019 IPFire Development Team <info@ipfire.org>                #
#                                                                             #
# This library is free software; you can redistribute it and/or               #
# modify it under the terms of the GNU Lesser General Public                  #
# License as published by the Free Software Foundation; either                #
# version 2.1 of the License, or (at your option) any later version.          #
#                                                                             #
# This library is distributed in the hope that it will be useful,             #
# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU            #
# Lesser General Public License for more details.                             #
#                                                                             #
###############################################################################
import argparse
import io
import ipaddress
import logging
import os.path
import re
import socket

# Load our location module
import location
from location.i18n import _
# Logger used by the exporter code in this file
log = logging.getLogger("location.exporter")
class OutputWriter(object):
    """
    Collect networks for one country or one Autonomous System and
    write them to a file, one network per line.

    write() appends to an in-memory buffer; nothing touches the disk
    until write_out() is called. Subclasses change the on-disk format by
    overriding write(), _write_header() and _write_footer().
    """

    # Middle part of the output filename (see _make_filename()).
    # NOTE(review): this attribute is not visible in the extract this class
    # was restored from; the value "networks" is assumed - confirm.
    suffix = "networks"

    def __init__(self, family, country_code=None, asn=None):
        """
        Create a writer for *family* (a socket.AF_* constant) that is
        identified by either *country_code* or *asn*.
        """
        self.family, self.country_code, self.asn = family, country_code, asn

        # Everything passed to write() is buffered here and copied into
        # the output file by write_out()
        self.f = io.BytesIO()

    def write_out(self, directory):
        """
        Write header, buffered networks and footer to a file
        inside *directory*. An existing file is overwritten.
        """
        # Make the output filename
        filename = os.path.join(
            directory, self._make_filename(),
        )

        with open(filename, "wb") as f:
            self._write_header(f)

            # Copy all data into the file
            f.write(self.f.getbuffer())

            self._write_footer(f)

    def _make_filename(self):
        """
        Return the name of the output file (without directory).

        It is made of the country code (or "AS<number>" when no country
        code is set), the format suffix, and "6" or "4" for the family.
        """
        # NOTE(review): the format string itself is not visible in the
        # extract; "<name>.<suffix><4|6>" is assumed - confirm.
        return "%s.%s%s" % (
            self.country_code or "AS%s" % self.asn,
            self.suffix,
            "6" if self.family == socket.AF_INET6 else "4",
        )

    @property
    def name(self):
        """
        Identifier used inside the generated files: "CC_<country code>"
        or "AS<number>". None if neither is set.
        """
        if self.country_code:
            return "CC_%s" % self.country_code

        if self.asn:
            return "AS%s" % self.asn

    def _write_header(self, f):
        """
        The header of the file
        """
        pass

    def _write_footer(self, f):
        """
        The footer of the file
        """
        pass

    def write(self, network):
        """
        Append *network* (anything that formats with %s) to the buffer.
        """
        s = "%s\n" % network

        self.f.write(s.encode("ascii"))
class IpsetOutputWriter(OutputWriter):
    """
    Writes the networks as a script of ipset commands: one "create"
    line followed by one "add" line per network.
    """

    # NOTE(review): the class attributes between the class line and
    # _write_header() are not visible in the extract; "ipset" is assumed.
    suffix = "ipset"

    def _write_header(self, f):
        """
        Write the "create" command for the set.
        """
        # A hash:net set only accepts members of the family it was created
        # with, so IPv6 exports must create an inet6 set
        family = "inet6" if self.family == socket.AF_INET6 else "inet"

        h = "create %s hash:net family %s hashsize 1024 maxelem 65536\n" \
            % (self.name, family)

        f.write(h.encode("ascii"))

    def write(self, network):
        """
        Buffer an "add" command that puts *network* into the set.
        """
        s = "add %s %s\n" % (self.name, network)

        self.f.write(s.encode("ascii"))
class NftablesOutputWriter(OutputWriter):
    """
    Writes the networks as an nftables "define NAME = { ... }" block
    with one comma-terminated network per line.
    """

    # NOTE(review): the class attributes between the class line and
    # _write_header() are not visible in the extract; "set" is assumed.
    suffix = "set"

    def _write_header(self, f):
        """
        Open the definition.
        """
        h = "define %s = {\n" % self.name

        f.write(h.encode("ascii"))

    def _write_footer(self, f):
        """
        Close the definition opened by _write_header().
        """
        # NOTE(review): the body of this method is not visible in the
        # extract; a closing brace is what the header requires.
        f.write(b"}\n")

    def write(self, network):
        """
        Buffer *network* as one element of the definition.
        """
        s = " %s,\n" % network

        self.f.write(s.encode("ascii"))
class XTGeoIPOutputWriter(OutputWriter):
    """
    Formats the output so that it can be loaded by the xt_geoip
    kernel module from xtables-addons.
    """

    # NOTE(review): the class attributes between the docstring and write()
    # are not visible in the extract; "iv" is assumed - confirm.
    suffix = "iv"

    def write(self, network):
        """
        Buffer the first and the last address of *network* in their
        packed binary form (4 bytes for IPv4, 16 bytes for IPv6).
        """
        n = ipaddress.ip_network("%s" % network)

        for address in (n.network_address, n.broadcast_address):
            # .packed is the same byte string socket.inet_pton() returns
            # for the address, without the detour through its text form
            self.f.write(address.packed)
class Exporter(object):
    """
    Walks through the networks of a database and hands every network
    that matches a requested country or AS to a writer.
    """

    def __init__(self, db, writer):
        """
        *db* must provide search_networks(family=...); *writer* is
        called as writer(family, country_code=...) or writer(family, asn=...)
        and must return an object with write() and write_out().
        """
        self.db, self.writer = db, writer

    def export(self, directory, families, countries, asns):
        """
        For every family, write one file per country code and per AS
        number into *directory*.

        A network is written to the writer of its country and to the
        writer of its AS if both were requested.
        """
        # Membership is tested once per network, so do it against sets.
        # The sequences passed in still decide the order in which the
        # writers are created and written out.
        wanted_countries = frozenset(countries)
        wanted_asns = frozenset(asns)

        for family in families:
            log.debug("Exporting family %s", family)

            writers = {}

            # Create writers for countries
            for country_code in countries:
                writers[country_code] = self.writer(family, country_code=country_code)

            # Create writers for ASNs
            for asn in asns:
                writers[asn] = self.writer(family, asn=asn)

            # Get all networks that match the family
            networks = self.db.search_networks(family=family)

            # Walk through all networks
            for network in networks:
                # Write matching countries
                if network.country_code in wanted_countries:
                    writers[network.country_code].write(network)

                # Write matching ASNs
                if network.asn in wanted_asns:
                    writers[network.asn].write(network)

            # Write everything to the filesystem
            for writer in writers.values():
                writer.write_out(directory)
# Maps the values accepted by --format to the writer class for that format.
# NOTE(review): the line that opens this mapping is not visible in the
# extract; the name is taken from the self.output_formats references below.
output_formats = {
    "ipset"    : IpsetOutputWriter,
    "list"     : OutputWriter,
    "nftables" : NftablesOutputWriter,
    "xt_geoip" : XTGeoIPOutputWriter,
}
def parse_cli(self):
    """
    Parse the command line, set the log level according to
    --debug/--quiet and return the parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description=_("Location Exporter Command Line Interface"),
    )

    # Global configuration flags
    parser.add_argument("--debug", action="store_true",
        help=_("Enable debug output"))
    parser.add_argument("--quiet", action="store_true",
        help=_("Enable quiet mode"))

    # version
    parser.add_argument("--version", action="version",
        version="%(prog)s @VERSION@")

    # database
    parser.add_argument("--database", "-d",
        default="@databasedir@/database.db", help=_("Path to database"),
    )

    # format
    parser.add_argument("--format", help=_("Output format"),
        default="list", choices=self.output_formats.keys())

    # directory
    parser.add_argument("--directory", help=_("Output directory"), required=True)

    # family
    parser.add_argument("--family", help=_("Specify address family"), choices=("ipv6", "ipv4"))

    # Countries and Autonomous Systems
    parser.add_argument("objects", nargs="+")

    args = parser.parse_args()

    # Configure logging
    # NOTE(review): the two conditions are not visible in the extract; they
    # are restored from the --debug/--quiet flags defined above. --debug
    # wins if both flags are given.
    if args.debug:
        location.logger.set_level(logging.DEBUG)
    elif args.quiet:
        location.logger.set_level(logging.WARNING)

    return args
# NOTE(review): the `def` line of the method these statements belong to is
# not visible in this extract.

# Parse command line arguments
args = self.parse_cli()

# Perform the export with the parsed arguments and keep what it returns
ret = self.handle_export(args)

# Return with exit code
# NOTE(review): the statement that followed this comment is not visible;
# presumably `ret` is used as the exit status when it is set - confirm.

# Otherwise just exit
# NOTE(review): the statement that followed this comment is not visible.
def handle_export(self, ns):
    """
    Export the countries and Autonomous Systems named on the command
    line.

    *ns* is the namespace returned by parse_cli(). Every entry of
    ns.objects is either "AS<number>" or a country code; anything else
    is logged and skipped.

    Returns None on success and a non-zero integer on failure.
    NOTE(review): the original return statements are not visible in the
    extract; the values 2 (nothing to export) and 1 (database not found)
    are assumed - confirm.
    """
    countries, asns = [], []

    # Translate family
    if ns.family == "ipv6":
        families = [socket.AF_INET6]
    elif ns.family == "ipv4":
        families = [socket.AF_INET]
    else:
        families = [socket.AF_INET6, socket.AF_INET]

    for name in ns.objects:
        m = re.match(r"^AS(\d+)$", name)
        if m:
            asns.append(int(m.group(1)))

        # A1, A2 and A3 are accepted in addition to the country codes
        # that the location module considers valid
        elif location.country_code_is_valid(name) \
                or name in ("A1", "A2", "A3"):
            countries.append(name)

        else:
            log.warning("Invalid argument: %s", name)

    # Catch invalid formats
    if not countries and not asns:
        log.error("Nothing to export")
        return 2

    # Open the database
    try:
        db = location.Database(ns.database)
    except FileNotFoundError:
        log.error("Could not open database: %s", ns.database)
        return 1

    # Select the output format
    writer = self.output_formats.get(ns.format)

    exporter = Exporter(db, writer)
    exporter.export(ns.directory, countries=countries, asns=asns, families=families)
296 # Run the command line interface