2 ###############################################################################
4 # libloc - A library to determine the location of someone on the Internet #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
18 ###############################################################################
27 # Load our location module
31 def _(singular
, plural
=None, n
=None):
33 return gettext
.dngettext("libloc", singular
, plural
, n
)
35 return gettext
.dgettext("libloc", singular
)
39 class OutputFormatter(object):
40 def __init__(self
, ns
):
49 def __exit__(self
, type, value
, tb
):
55 if "country_code" in self
.ns
:
56 return "networks_country_%s" % self
.ns
.country_code
[0]
58 elif "asn" in self
.ns
:
59 return "networks_AS%s" % self
.ns
.asn
[0]
67 def network(self
, network
):
71 class IpsetOutputFormatter(OutputFormatter
):
76 print("create %s hash:net family inet hashsize 1024 maxelem 65536" % self
.name
)
78 def network(self
, network
):
79 print("add %s %s" % (self
.name
, network
))
82 class NftablesOutputFormatter(OutputFormatter
):
87 print("define %s = {" % self
.name
)
92 def network(self
, network
):
93 print(" %s," % network
)
96 class XTGeoIPOutputFormatter(OutputFormatter
):
98 Formats the output in that way, that it can be loaded by
99 the xt_geoip kernel module from xtables-addons.
101 def network(self
, network
):
102 n
= ipaddress
.ip_network("%s" % network
)
104 for address
in (n
.network_address
, n
.broadcast_address
):
105 bytes
= socket
.inet_pton(
106 socket
.AF_INET6
if address
.version
== 6 else socket
.AF_INET
,
115 "ipset" : IpsetOutputFormatter
,
116 "list" : OutputFormatter
,
117 "nftables" : NftablesOutputFormatter
,
118 "xt_geoip" : XTGeoIPOutputFormatter
,
122 parser
= argparse
.ArgumentParser(
123 description
=_("Location Database Command Line Interface"),
125 subparsers
= parser
.add_subparsers()
127 # Global configuration flags
128 parser
.add_argument("--debug", action
="store_true",
129 help=_("Enable debug output"))
132 parser
.add_argument("--version", action
="version",
133 version
="%(prog)s @VERSION@")
136 parser
.add_argument("--database", "-d",
137 default
="@databasedir@/database.db", help=_("Path to database"),
141 parser
.add_argument("--public-key", "-k",
142 default
="@databasedir@/signing-key.pem", help=_("Public Signing Key"),
145 # lookup an IP address
146 lookup
= subparsers
.add_parser("lookup",
147 help=_("Lookup one or multiple IP addresses"),
149 lookup
.add_argument("address", nargs
="+")
150 lookup
.set_defaults(func
=self
.handle_lookup
)
153 get_as
= subparsers
.add_parser("get-as",
154 help=_("Get information about one or multiple Autonomous Systems"),
156 get_as
.add_argument("asn", nargs
="+")
157 get_as
.set_defaults(func
=self
.handle_get_as
)
160 search_as
= subparsers
.add_parser("search-as",
161 help=_("Search for Autonomous Systems that match the string"),
163 search_as
.add_argument("query", nargs
=1)
164 search_as
.set_defaults(func
=self
.handle_search_as
)
166 # List all networks in an AS
167 list_networks_by_as
= subparsers
.add_parser("list-networks-by-as",
168 help=_("Lists all networks in an AS"),
170 list_networks_by_as
.add_argument("asn", nargs
=1, type=int)
171 list_networks_by_as
.add_argument("--family", choices
=("ipv6", "ipv4"))
172 list_networks_by_as
.add_argument("--output-format",
173 choices
=self
.output_formats
.keys(), default
="list")
174 list_networks_by_as
.set_defaults(func
=self
.handle_list_networks_by_as
)
176 # List all networks in a country
177 list_networks_by_cc
= subparsers
.add_parser("list-networks-by-cc",
178 help=_("Lists all networks in a country"),
180 list_networks_by_cc
.add_argument("country_code", nargs
=1)
181 list_networks_by_cc
.add_argument("--family", choices
=("ipv6", "ipv4"))
182 list_networks_by_cc
.add_argument("--output-format",
183 choices
=self
.output_formats
.keys(), default
="list")
184 list_networks_by_cc
.set_defaults(func
=self
.handle_list_networks_by_cc
)
186 # List all networks with flags
187 list_networks_by_flags
= subparsers
.add_parser("list-networks-by-flags",
188 help=_("Lists all networks with flags"),
190 list_networks_by_flags
.add_argument("--anonymous-proxy",
191 action
="store_true", help=_("Anonymous Proxies"),
193 list_networks_by_flags
.add_argument("--satellite-provider",
194 action
="store_true", help=_("Satellite Providers"),
196 list_networks_by_flags
.add_argument("--anycast",
197 action
="store_true", help=_("Anycasts"),
199 list_networks_by_flags
.add_argument("--family", choices
=("ipv6", "ipv4"))
200 list_networks_by_flags
.add_argument("--output-format",
201 choices
=self
.output_formats
.keys(), default
="list")
202 list_networks_by_flags
.set_defaults(func
=self
.handle_list_networks_by_flags
)
204 args
= parser
.parse_args()
206 # Print usage if no action was given
207 if not "func" in args
:
214 # Parse command line arguments
215 args
= self
.parse_cli()
219 db
= location
.Database(args
.database
)
220 except FileNotFoundError
as e
:
221 sys
.stderr
.write("location-query: Could not open database %s: %s\n" \
222 % (args
.database
, e
))
225 # Translate family (if present)
227 if args
.family
== "ipv6":
228 args
.family
= socket
.AF_INET6
229 elif args
.family
== "ipv4":
230 args
.family
= socket
.AF_INET
235 ret
= args
.func(db
, args
)
237 # Return with exit code
241 # Otherwise just exit
244 def handle_lookup(self
, db
, ns
):
247 for address
in ns
.address
:
249 n
= db
.lookup(address
)
251 print(_("Invalid IP address: %s") % address
, file=sys
.stderr
)
260 print(_("Nothing found for %(address)s") % args
, file=sys
.stderr
)
264 # Try to retrieve the AS if we have an AS number
268 # If we have found an AS we will print it in the message
274 print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args
)
277 print(_("%(address)s belongs to %(network)s") % args
)
281 def handle_get_as(self
, db
, ns
):
283 Gets information about Autonomous Systems
291 print(_("Invalid ASN: %s") % asn
, file=sys
.stderr
)
295 # Fetch AS from database
300 print(_("Could not find AS%s") % asn
, file=sys
.stderr
)
304 print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a
.number
, "name" : a
.name
})
308 def handle_search_as(self
, db
, ns
):
309 for query
in ns
.query
:
310 # Print all matches ASes
311 for a
in db
.search_as(query
):
314 def __get_output_formatter(self
, ns
):
316 cls
= self
.output_formats
[ns
.output_format
]
318 cls
= OutputFormatter
322 def handle_list_networks_by_as(self
, db
, ns
):
323 with self
.__get
_output
_formatter
(ns
) as f
:
325 # Print all matching networks
326 for n
in db
.search_networks(asn
=asn
, family
=ns
.family
):
329 def handle_list_networks_by_cc(self
, db
, ns
):
330 with self
.__get
_output
_formatter
(ns
) as f
:
331 for country_code
in ns
.country_code
:
332 # Print all matching networks
333 for n
in db
.search_networks(country_code
=country_code
, family
=ns
.family
):
336 def handle_list_networks_by_flags(self
, db
, ns
):
339 if ns
.anonymous_proxy
:
340 flags |
= location
.NETWORK_FLAG_ANONYMOUS_PROXY
342 if ns
.satellite_provider
:
343 flags |
= location
.NETWORK_FLAG_SATELLITE_PROVIDER
346 flags |
= location
.NETWORK_FLAG_ANYCAST
348 with self
.__get
_output
_formatter
(ns
) as f
:
349 for n
in db
.search_networks(flags
=flags
, family
=ns
.family
):
354 # Run the command line interface