]> git.ipfire.org Git - location/libloc.git/blob - src/python/location-query.in
Fix off_t casting in tests
[location/libloc.git] / src / python / location-query.in
1 #!/usr/bin/python3
2 ###############################################################################
3 # #
4 # libloc - A library to determine the location of someone on the Internet #
5 # #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org> #
7 # #
8 # This library is free software; you can redistribute it and/or #
9 # modify it under the terms of the GNU Lesser General Public #
10 # License as published by the Free Software Foundation; either #
11 # version 2.1 of the License, or (at your option) any later version. #
12 # #
13 # This library is distributed in the hope that it will be useful, #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
16 # Lesser General Public License for more details. #
17 # #
18 ###############################################################################
19
20 import argparse
21 import gettext
22 import sys
23 import syslog
24
25 # Load our location module
26 import location
27
28 # i18n
29 def _(singular, plural=None, n=None):
30 if plural:
31 return gettext.dngettext("libloc", singular, plural, n)
32
33 return gettext.dgettext("libloc", singular)
34
35 class CLI(object):
36 def parse_cli(self):
37 parser = argparse.ArgumentParser(
38 description=_("Location Database Command Line Interface"),
39 )
40 subparsers = parser.add_subparsers()
41
42 # Global configuration flags
43 parser.add_argument("--debug", action="store_true",
44 help=_("Enable debug output"))
45
46 # version
47 parser.add_argument("--version", action="version",
48 version="%%(prog)s %s" % location.__version__)
49
50 # database
51 parser.add_argument("--database", "-d",
52 default="@databasedir@/database.db", help=_("Path to database"),
53 )
54
55 # lookup an IP address
56 lookup = subparsers.add_parser("lookup",
57 help=_("Lookup one or multiple IP addresses"),
58 )
59 lookup.add_argument("address", nargs="+")
60 lookup.set_defaults(func=self.handle_lookup)
61
62 # Get AS
63 get_as = subparsers.add_parser("get-as",
64 help=_("Get information about one or multiple Autonomous Systems"),
65 )
66 get_as.add_argument("asn", nargs="+")
67 get_as.set_defaults(func=self.handle_get_as)
68
69 # Search for AS
70 search_as = subparsers.add_parser("search-as",
71 help=_("Search for Autonomous Systems that match the string"),
72 )
73 search_as.add_argument("query", nargs=1)
74 search_as.set_defaults(func=self.handle_search_as)
75
76 # List all networks in an AS
77 list_networks_by_as = subparsers.add_parser("list-networks-by-as",
78 help=_("Lists all networks in an AS"),
79 )
80 list_networks_by_as.add_argument("asn", nargs=1, type=int)
81 list_networks_by_as.set_defaults(func=self.handle_list_networks_by_as)
82
83 # List all networks in a country
84 list_networks_by_cc = subparsers.add_parser("list-networks-by-cc",
85 help=_("Lists all networks in a country"),
86 )
87 list_networks_by_cc.add_argument("country_code", nargs=1)
88 list_networks_by_cc.set_defaults(func=self.handle_list_networks_by_cc)
89
90 args = parser.parse_args()
91
92 # Print usage if no action was given
93 if not "func" in args:
94 parser.print_usage()
95 sys.exit(2)
96
97 return args
98
99 def run(self):
100 # Parse command line arguments
101 args = self.parse_cli()
102
103 # Open database
104 try:
105 db = location.Database(args.database)
106 except FileNotFoundError as e:
107 sys.stderr.write("location-query: Could not open database %s: %s\n" \
108 % (args.database, e))
109 sys.exit(1)
110
111 # Call function
112 ret = args.func(db, args)
113
114 # Return with exit code
115 if ret:
116 sys.exit(ret)
117
118 # Otherwise just exit
119 sys.exit(0)
120
121 def handle_lookup(self, db, ns):
122 ret = 0
123
124 for address in ns.address:
125 try:
126 n = db.lookup(address)
127 except ValueError:
128 print(_("Invalid IP address: %s") % address, file=sys.stderr)
129
130 args = {
131 "address" : address,
132 "network" : n,
133 }
134
135 # Nothing found?
136 if not n:
137 print(_("Nothing found for %(address)s") % args, file=sys.stderr)
138 ret = 1
139 continue
140
141 # Try to retrieve the AS if we have an AS number
142 if n.asn:
143 a = db.get_as(n.asn)
144
145 # If we have found an AS we will print it in the message
146 if a:
147 args.update({
148 "as" : a,
149 })
150
151 print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args)
152 continue
153
154 print(_("%(address)s belongs to %(network)s") % args)
155
156 return ret
157
158 def handle_get_as(self, db, ns):
159 """
160 Gets information about Autonomous Systems
161 """
162 ret = 0
163
164 for asn in ns.asn:
165 try:
166 asn = int(asn)
167 except ValueError:
168 print(_("Invalid ASN: %s") % asn, file=sys.stderr)
169 ret = 1
170 continue
171
172 # Fetch AS from database
173 a = db.get_as(asn)
174
175 # Nothing found
176 if not a:
177 print(_("Could not find AS%s") % asn, file=sys.stderr)
178 ret = 1
179 continue
180
181 print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })
182
183 return ret
184
185 def handle_search_as(self, db, ns):
186 for query in ns.query:
187 # Print all matches ASes
188 for a in db.search_as(query):
189 print(a)
190
191 def handle_list_networks_by_as(self, db, ns):
192 for asn in ns.asn:
193 # Print all matching networks
194 for n in db.search_networks(asn=asn):
195 print(n)
196
197 def handle_list_networks_by_cc(self, db, ns):
198 for country_code in ns.country_code:
199 # Print all matching networks
200 for n in db.search_networks(country_code=country_code):
201 print(n)
202
203 def main():
204 # Run the command line interface
205 c = CLI()
206 c.run()
207
208 main()