]> git.ipfire.org Git - ipfire.org.git/blame - src/backend/util.py
location: Redesign lookup page
[ipfire.org.git] / src / backend / util.py
CommitLineData
9523790a 1#!/usr/bin/python3
66862195 2
5ef115cd
MT
3import PIL.Image
4import PIL.ImageFilter
2b72638d 5import PIL.ImageOps
5ef115cd 6import io
440aba92
MT
7import ipaddress
8import location
5ef115cd 9import logging
440aba92 10import pycares
e96e445b 11import random
9523790a 12import re
440aba92 13import socket
e96e445b 14import string
75d9b3da 15import unicodedata
e96e445b 16
440aba92
MT
17from .decorators import *
18from .misc import Object
19
20# These lists are used to block access to the webapp
21BLOCKLISTS = (
22 "sbl.spamhaus.org",
23 "xbl.spamhaus.org",
24)
25
26BLACKLISTS = (
27 "b.barracudacentral.org",
28 "bl.spamcop.net",
29 "bl.blocklist.de",
30 "cbl.abuseat.org",
31 "dnsbl-1.uceprotect.net",
32 "dnsbl-2.uceprotect.net",
33 "dnsbl-3.uceprotect.net",
34 "dnsbl.abuse.ch",
35 "ix.dnsbl.manitu.net",
36 "pbl.spamhaus.org",
37 "sbl.spamhaus.org",
38 "xbl.spamhaus.org",
39 "zen.spamhaus.org",
40)
41
440aba92
MT
42class Address(Object):
43 def init(self, address):
44 self.address = ipaddress.ip_address(address)
45
46 def __str__(self):
47 return "%s" % self.address
48
49 @property
50 def family(self):
51 if isinstance(self.address, ipaddress.IPv6Address):
52 return socket.AF_INET6
53 elif isinstance(self.address, ipaddress.IPv4Address):
54 return socket.AF_INET
55
56 @lazy_property
57 def network(self):
7b05edde 58 return self.backend.location.lookup("%s" % self.address)
440aba92
MT
59
60 @property
61 def country_code(self):
62 if self.network:
63 return self.network.country_code
64
65 @lazy_property
66 def asn(self):
67 if self.network:
68 return self.network.asn
69
7b05edde
MT
70 @lazy_property
71 def autonomous_system(self):
72 if self.asn:
73 return self.backend.location.get_as(self.asn)
74
7ac8b12f
MT
75 def is_anonymous_proxy(self):
76 return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
77
78 def is_satellite_provider(self):
79 return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
80
81 def is_anycast(self):
82 return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)
83
440aba92
MT
84 # Blacklist
85
86 def _make_blacklist_rr(self, blacklist):
87 if self.family == socket.AF_INET6:
88 octets = list(self.address.exploded.replace(":", ""))
89 elif self.family == socket.AF_INET:
90 octets = str(self.address).split(".")
91 else:
92 raise NotImplementedError("Unknown IP protocol")
93
94 # Reverse the list
95 octets.reverse()
96
97 # Append suffix
98 octets.append(blacklist)
99
100 return ".".join(octets)
101
102 async def _resolve_blacklist(self, blacklist):
103 return_code = None
104
105 # Get resource record name
106 rr = self._make_blacklist_rr(blacklist)
107
108 # Get query type from IP protocol version
109 if self.family == socket.AF_INET6:
110 type = pycares.QUERY_TYPE_AAAA
111 elif self.family == socket.AF_INET:
112 type = pycares.QUERY_TYPE_A
113 else:
114 raise NotImplementedError("Unknown IP protocol")
115
116 # Run query
117 try:
118 res = await self.backend.resolver.query(rr, type=type)
119 except IOError as e:
120 logging.warning(e)
121
122 return return_code, "%s" % e
123
124 # Not found
125 if not res:
126 logging.debug("%s is not blacklisted on %s" % (self, blacklist))
127 return return_code, None
128
129 # Extract return code from DNS response
130 for row in res:
131 return_code = row.host
132 break
133
134 # If the IP address is on a blacklist, we will try to fetch the TXT record
135 reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
136
137 # Log result
138 logging.debug("%s is blacklisted on %s: %s" % (self, blacklist, reason or "N/A"))
139
140 # Take the first reason
141 if reason:
142 for i in reason:
143 return return_code, i.text
144
145 # Blocked, but no reason
146 return return_code, None
147
148 async def get_blacklists(self):
149 blacklists = { bl : await self._resolve_blacklist(bl) for bl in BLACKLISTS }
150
151 return blacklists
152
440aba92
MT
153
154def format_asn(asn):
155 network = db.get_as(asn)
156
157 if network:
158 return "%s" % network
159
160 return "AS%s" % asn
161
84604476
MT
162def format_size(s, max_unit=None):
163 units = ("B", "kB", "MB", "GB", "TB")
66862195
MT
164
165 i = 0
166 while s >= 1024 and i < len(units) - 1:
167 s /= 1024
168 i += 1
169
84604476
MT
170 if max_unit and units[i] == max_unit:
171 break
172
66862195
MT
173 return "%.0f%s" % (s, units[i])
174
5ac74b02 175def format_time(s, shorter=True):
66862195
MT
176 #_ = handler.locale.translate
177 _ = lambda x: x
178
179 hrs, s = divmod(s, 3600)
180 min, s = divmod(s, 60)
181
182 if s >= 30:
183 min += 1
184
185 if shorter and not hrs:
186 return _("%(min)d min") % { "min" : min }
187
188 return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs, "min" : min}
e96e445b
MT
189
190def random_string(length=8):
191 input_chars = string.ascii_letters + string.digits
192
193 r = (random.choice(input_chars) for i in range(length))
194
195 return "".join(r)
75d9b3da
MT
196
197def normalize(s):
198 # Remove any non-ASCII characters
199 try:
200 s = unicodedata.normalize("NFKD", s)
201 except TypeError:
202 pass
203
204 # Remove excessive whitespace
205 s = re.sub(r"[^\w]+", " ", s)
206
207 return "-".join(s.split())
5ef115cd 208
2de3dacc 209def generate_thumbnail(data, size, square=False, **args):
5ef115cd
MT
210 assert data, "No image data received"
211
212 image = PIL.Image.open(io.BytesIO(data))
213
214 # Save image format
215 format = image.format
216
217 # Remove any alpha-channels
218 if image.format == "JPEG" and not image.mode == "RGB":
219 # Make a white background
220 background = PIL.Image.new("RGBA", image.size, (255,255,255))
221
222 # Convert image to RGBA if not in RGBA, yet
223 if not image.mode == "RGBA":
224 image = image.convert("RGBA")
225
226 # Flatten both images together
227 flattened_image = PIL.Image.alpha_composite(background, image)
228
229 # Remove the alpha channel
230 image = flattened_image.convert("RGB")
231
232 # Resize the image to the desired resolution
2de3dacc 233 if square:
7e222133 234 image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
2de3dacc
MT
235 else:
236 image.thumbnail((size, size), PIL.Image.LANCZOS)
5ef115cd
MT
237
238 if image.format == "JPEG":
239 # Apply a gaussian blur to make compression easier
240 image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))
241
242 # Arguments to optimise the compression
243 args.update({
244 "subsampling" : "4:2:0",
245 "quality" : 70,
246 })
247
248 with io.BytesIO() as f:
249 # If writing out the image does not work with optimization,
250 # we try to write it out without any optimization.
251 try:
252 image.save(f, format, optimize=True, **args)
253 except:
254 image.save(f, format, **args)
255
256 return f.getvalue()