2 ###############################################################################
4 # libloc - A library to determine the location of someone on the Internet #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
18 ###############################################################################
27 # Load our location module
29 from location
.i18n
import _
33 class OutputFormatter(object):
34 def __init__(self
, ns
):
43 def __exit__(self
, type, value
, tb
):
49 if "country_code" in self
.ns
:
50 return "networks_country_%s" % self
.ns
.country_code
[0]
52 elif "asn" in self
.ns
:
53 return "networks_AS%s" % self
.ns
.asn
[0]
61 def network(self
, network
):
65 class IpsetOutputFormatter(OutputFormatter
):
70 print("create %s hash:net family inet hashsize 1024 maxelem 65536" % self
.name
)
72 def network(self
, network
):
73 print("add %s %s" % (self
.name
, network
))
76 class NftablesOutputFormatter(OutputFormatter
):
81 print("define %s = {" % self
.name
)
86 def network(self
, network
):
87 print(" %s," % network
)
90 class XTGeoIPOutputFormatter(OutputFormatter
):
92 Formats the output in such a way that it can be loaded by
93 the xt_geoip kernel module from xtables-addons.
95 def network(self
, network
):
96 n
= ipaddress
.ip_network("%s" % network
)
98 for address
in (n
.network_address
, n
.broadcast_address
):
99 bytes
= socket
.inet_pton(
100 socket
.AF_INET6
if address
.version
== 6 else socket
.AF_INET
,
109 "ipset" : IpsetOutputFormatter
,
110 "list" : OutputFormatter
,
111 "nftables" : NftablesOutputFormatter
,
112 "xt_geoip" : XTGeoIPOutputFormatter
,
116 parser
= argparse
.ArgumentParser(
117 description
=_("Location Database Command Line Interface"),
119 subparsers
= parser
.add_subparsers()
121 # Global configuration flags
122 parser
.add_argument("--debug", action
="store_true",
123 help=_("Enable debug output"))
124 parser
.add_argument("--quiet", action
="store_true",
125 help=_("Enable quiet mode"))
128 parser
.add_argument("--version", action
="version",
129 version
="%(prog)s @VERSION@")
132 parser
.add_argument("--database", "-d",
133 default
="@databasedir@/database.db", help=_("Path to database"),
137 parser
.add_argument("--public-key", "-k",
138 default
="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
141 # lookup an IP address
142 lookup
= subparsers
.add_parser("lookup",
143 help=_("Lookup one or multiple IP addresses"),
145 lookup
.add_argument("address", nargs
="+")
146 lookup
.set_defaults(func
=self
.handle_lookup
)
148 # Dump the whole database
149 dump
= subparsers
.add_parser("dump",
150 help=_("Dump the entire database"),
152 dump
.add_argument("output", nargs
="?", type=argparse
.FileType("w"))
153 dump
.set_defaults(func
=self
.handle_dump
)
156 get_as
= subparsers
.add_parser("get-as",
157 help=_("Get information about one or multiple Autonomous Systems"),
159 get_as
.add_argument("asn", nargs
="+")
160 get_as
.set_defaults(func
=self
.handle_get_as
)
163 search_as
= subparsers
.add_parser("search-as",
164 help=_("Search for Autonomous Systems that match the string"),
166 search_as
.add_argument("query", nargs
=1)
167 search_as
.set_defaults(func
=self
.handle_search_as
)
169 # List all networks in an AS
170 list_networks_by_as
= subparsers
.add_parser("list-networks-by-as",
171 help=_("Lists all networks in an AS"),
173 list_networks_by_as
.add_argument("asn", nargs
=1, type=int)
174 list_networks_by_as
.add_argument("--family", choices
=("ipv6", "ipv4"))
175 list_networks_by_as
.add_argument("--output-format",
176 choices
=self
.output_formats
.keys(), default
="list")
177 list_networks_by_as
.set_defaults(func
=self
.handle_list_networks_by_as
)
179 # List all networks in a country
180 list_networks_by_cc
= subparsers
.add_parser("list-networks-by-cc",
181 help=_("Lists all networks in a country"),
183 list_networks_by_cc
.add_argument("country_code", nargs
=1)
184 list_networks_by_cc
.add_argument("--family", choices
=("ipv6", "ipv4"))
185 list_networks_by_cc
.add_argument("--output-format",
186 choices
=self
.output_formats
.keys(), default
="list")
187 list_networks_by_cc
.set_defaults(func
=self
.handle_list_networks_by_cc
)
189 # List all networks with flags
190 list_networks_by_flags
= subparsers
.add_parser("list-networks-by-flags",
191 help=_("Lists all networks with flags"),
193 list_networks_by_flags
.add_argument("--anonymous-proxy",
194 action
="store_true", help=_("Anonymous Proxies"),
196 list_networks_by_flags
.add_argument("--satellite-provider",
197 action
="store_true", help=_("Satellite Providers"),
199 list_networks_by_flags
.add_argument("--anycast",
200 action
="store_true", help=_("Anycasts"),
202 list_networks_by_flags
.add_argument("--family", choices
=("ipv6", "ipv4"))
203 list_networks_by_flags
.add_argument("--output-format",
204 choices
=self
.output_formats
.keys(), default
="list")
205 list_networks_by_flags
.set_defaults(func
=self
.handle_list_networks_by_flags
)
207 args
= parser
.parse_args()
211 location
.logger
.set_level(logging
.DEBUG
)
213 location
.logger
.set_level(logging
.WARNING
)
215 # Print usage if no action was given
216 if not "func" in args
:
223 # Parse command line arguments
224 args
= self
.parse_cli()
228 db
= location
.Database(args
.database
)
229 except FileNotFoundError
as e
:
230 sys
.stderr
.write("location-query: Could not open database %s: %s\n" \
231 % (args
.database
, e
))
234 # Translate family (if present)
236 if args
.family
== "ipv6":
237 args
.family
= socket
.AF_INET6
238 elif args
.family
== "ipv4":
239 args
.family
= socket
.AF_INET
244 ret
= args
.func(db
, args
)
246 # Return with exit code
250 # Otherwise just exit
253 def handle_lookup(self
, db
, ns
):
256 format
= " %-24s: %s"
258 for address
in ns
.address
:
260 network
= db
.lookup(address
)
262 print(_("Invalid IP address: %s") % address
, file=sys
.stderr
)
271 print(_("Nothing found for %(address)s") % args
, file=sys
.stderr
)
275 print("%s:" % address
)
276 print(format
% (_("Network"), network
))
279 if network
.country_code
:
280 print(format
% (_("Country"), network
.country_code
))
282 # Print AS information
284 autonomous_system
= db
.get_as(network
.asn
)
287 _("Autonomous System"),
288 autonomous_system
or "AS%s" % network
.asn
),
293 def handle_dump(self
, db
, ns
):
294 # Use output file or write to stdout
295 f
= ns
.output
or sys
.stdout
298 f
.write("#\n# Location Database Export\n#\n")
300 f
.write("# Generated: %s\n" % time
.strftime(
301 "%a, %d %b %Y %H:%M:%S GMT", time
.gmtime(db
.created_at
),
305 f
.write("# Vendor: %s\n" % db
.vendor
)
308 f
.write("# License: %s\n" % db
.license
)
313 for line
in db
.description
.splitlines():
314 f
.write("# %s\n" % line
)
318 # Iterate over all ASes
321 f
.write("aut-num: AS%s\n" % a
.number
)
322 f
.write("name: %s\n" % a
.name
)
324 # Iterate over all networks
325 for n
in db
.networks
:
327 f
.write("net: %s\n" % n
)
330 f
.write("country: %s\n" % n
.country_code
)
333 f
.write("autnum: %s\n" % n
.asn
)
335 def handle_get_as(self
, db
, ns
):
337 Gets information about Autonomous Systems
345 print(_("Invalid ASN: %s") % asn
, file=sys
.stderr
)
349 # Fetch AS from database
354 print(_("Could not find AS%s") % asn
, file=sys
.stderr
)
358 print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a
.number
, "name" : a
.name
})
362 def handle_search_as(self
, db
, ns
):
363 for query
in ns
.query
:
364 # Print all matching ASes
365 for a
in db
.search_as(query
):
368 def __get_output_formatter(self
, ns
):
370 cls
= self
.output_formats
[ns
.output_format
]
372 cls
= OutputFormatter
376 def handle_list_networks_by_as(self
, db
, ns
):
377 with self
.__get
_output
_formatter
(ns
) as f
:
379 # Print all matching networks
380 for n
in db
.search_networks(asn
=asn
, family
=ns
.family
):
383 def handle_list_networks_by_cc(self
, db
, ns
):
384 with self
.__get
_output
_formatter
(ns
) as f
:
385 for country_code
in ns
.country_code
:
386 # Print all matching networks
387 for n
in db
.search_networks(country_code
=country_code
, family
=ns
.family
):
390 def handle_list_networks_by_flags(self
, db
, ns
):
393 if ns
.anonymous_proxy
:
394 flags |
= location
.NETWORK_FLAG_ANONYMOUS_PROXY
396 if ns
.satellite_provider
:
397 flags |
= location
.NETWORK_FLAG_SATELLITE_PROVIDER
400 flags |
= location
.NETWORK_FLAG_ANYCAST
402 with self
.__get
_output
_formatter
(ns
) as f
:
403 for n
in db
.search_networks(flags
=flags
, family
=ns
.family
):
408 # Run the command line interface