location-query: Implement searching for an AS that matches a string
[people/ms/libloc.git] / src / python / location-query.in
1 #!/usr/bin/python3
2 ###############################################################################
3 #                                                                             #
4 # libloc - A library to determine the location of someone on the Internet     #
5 #                                                                             #
6 # Copyright (C) 2017 IPFire Development Team <info@ipfire.org>                #
7 #                                                                             #
8 # This library is free software; you can redistribute it and/or               #
9 # modify it under the terms of the GNU Lesser General Public                  #
10 # License as published by the Free Software Foundation; either                #
11 # version 2.1 of the License, or (at your option) any later version.          #
12 #                                                                             #
13 # This library is distributed in the hope that it will be useful,             #
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of              #
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU           #
16 # Lesser General Public License for more details.                             #
17 #                                                                             #
18 ###############################################################################
19
20 import argparse
21 import gettext
22 import sys
23 import syslog
24
25 # Load our location module
26 import location
27
28 # i18n
def _(singular, plural=None, n=None):
        """
                Translates a message using the "libloc" text domain

                If a plural form is passed, the lookup is count-aware and n
                selects the variant. Otherwise only the singular message is
                translated and n is ignored.
        """
        # Without a plural form there is nothing to choose between
        if not plural:
                return gettext.dgettext("libloc", singular)

        return gettext.dngettext("libloc", singular, plural, n)
34
class CLI(object):
        """
                Command line interface to the location database

                Every sub-command is bound to one of the handle_* methods. A
                handler receives the parsed argparse namespace and returns the
                exit code of the process (zero means success).
        """

        def __init__(self):
                # Open database
                self.db = location.Database("@databasedir@/database.db")

        def parse_cli(self):
                """
                        Parses the command line and returns the argparse namespace

                        The returned namespace always carries a "func" attribute
                        holding the handler of the selected sub-command. If no
                        sub-command was given, the usage is printed to stderr and
                        the process exits with code 2.
                """
                parser = argparse.ArgumentParser(
                        description=_("Location Database Command Line Interface"),
                )
                subparsers = parser.add_subparsers()

                # Global configuration flags
                parser.add_argument("--debug", action="store_true",
                        help=_("Enable debug output"))

                # version
                parser.add_argument("--version", action="version",
                        version="%%(prog)s %s" % location.__version__)

                # lookup an IP address
                lookup = subparsers.add_parser("lookup",
                        help=_("Lookup one or multiple IP addresses"),
                )
                lookup.add_argument("address", nargs="+")
                lookup.set_defaults(func=self.handle_lookup)

                # Get AS
                get_as = subparsers.add_parser("get-as",
                        help=_("Get information about one or multiple Autonomous Systems"),
                )
                get_as.add_argument("asn", nargs="+")
                get_as.set_defaults(func=self.handle_get_as)

                # Search for AS
                search_as = subparsers.add_parser("search-as",
                        help=_("Search for Autonomous Systems that match the string"),
                )
                search_as.add_argument("query", nargs=1)
                search_as.set_defaults(func=self.handle_search_as)

                args = parser.parse_args()

                # argparse does not insist on a sub-command. Without one no
                # callback has been set and there is nothing we could run.
                if not hasattr(args, "func"):
                        parser.print_usage(sys.stderr)
                        sys.exit(2)

                return args

        def run(self):
                """
                        Parses the command line, calls the selected handler and
                        terminates the process with the handler's exit code
                """
                # Parse command line arguments
                # (parse_cli guarantees that a callback function is set)
                args = self.parse_cli()

                # Call function
                ret = args.func(args)

                # Return with exit code (a handler returning nothing means success)
                sys.exit(ret or 0)

        def handle_lookup(self, ns):
                """
                        Looks up the network (and its AS, if known) of each address

                        Returns 1 if any address was invalid or could not be found,
                        otherwise 0.
                """
                ret = 0

                for address in ns.address:
                        try:
                                n = self.db.lookup(address)
                        except ValueError:
                                # There is no result to report for this address,
                                # so skip it but remember the failure
                                print(_("Invalid IP address: %s") % address, file=sys.stderr)
                                ret = 1
                                continue

                        args = {
                                "address" : address,
                                "network" : n,
                        }

                        # Nothing found?
                        if not n:
                                print(_("Nothing found for %(address)s") % args)
                                ret = 1
                                continue

                        # Try to retrieve the AS if we have an AS number
                        if n.asn:
                                a = self.db.get_as(n.asn)

                                # If we have found an AS we will print it in the message
                                if a:
                                        args.update({
                                                "as" : a,
                                        })

                                        print(_("%(address)s belongs to %(network)s which is a part of %(as)s") % args)
                                        continue

                        print(_("%(address)s belongs to %(network)s") % args)

                return ret

        def handle_get_as(self, ns):
                """
                        Gets information about Autonomous Systems

                        Returns 1 if any AS number was invalid or unknown,
                        otherwise 0.
                """
                ret = 0

                for asn in ns.asn:
                        try:
                                number = int(asn)
                        except ValueError:
                                print(_("Invalid ASN: %s") % asn, file=sys.stderr)
                                ret = 1
                                continue

                        # Fetch AS from database
                        a = self.db.get_as(number)

                        # Nothing found
                        if not a:
                                print(_("Could not find AS%s") % number)
                                ret = 1
                                continue

                        print(_("AS%(asn)s belongs to %(name)s") % { "asn" : a.number, "name" : a.name })

                return ret

        def handle_search_as(self, ns):
                """
                        Prints all Autonomous Systems that match the query string

                        Always returns 0, whether or not anything matched.
                """
                for query in ns.query:
                        # Print all matching ASes
                        for a in self.db.search_as(query):
                                print(a)

                return 0
163
def main():
        """
                Entry point: sets up the command line interface and runs it
        """
        CLI().run()
168
# Only start the command line interface when this file is executed as a
# script, so that merely importing it has no side effects
if __name__ == "__main__":
        main()