]> git.ipfire.org Git - ipfire.org.git/blob - src/backend/util.py
util: Optionally rotate JPEG images if coded into EXIF
[ipfire.org.git] / src / backend / util.py
1 #!/usr/bin/python3
2
3 import PIL.ExifTags
4 import PIL.Image
5 import PIL.ImageFilter
6 import PIL.ImageOps
7 import datetime
8 import io
9 import ipaddress
10 import location
11 import logging
12 import pycares
13 import random
14 import re
15 import socket
16 import string
17 import unicodedata
18
19 from .decorators import *
20 from .misc import Object
21
# These lists are used to block access to the webapp
# NOTE(review): nothing in this part of the file reads BLOCKLISTS; it is
# presumably consumed by another module - confirm before changing it.
BLOCKLISTS = (
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
)
27
# DNS blacklist zones that Address.get_blacklists() checks an address against.
# Each entry is appended as the suffix of the reversed-address query name.
BLACKLISTS = (
    "b.barracudacentral.org",
    "bl.spamcop.net",
    "bl.blocklist.de",
    "cbl.abuseat.org",
    "dnsbl-1.uceprotect.net",
    "dnsbl-2.uceprotect.net",
    "dnsbl-3.uceprotect.net",
    "dnsbl.abuse.ch",
    "ix.dnsbl.manitu.net",
    "pbl.spamhaus.org",
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
    "zen.spamhaus.org",
)
43
class Address(Object):
    """
    An IP address with helpers for location lookups and DNS blacklist checks.

    The methods read ``self.backend.location`` and ``self.backend.resolver``.
    NOTE(review): ``self.backend`` is presumably set by the ``Object`` base
    class, which presumably also calls ``init()`` - confirm in ``misc``.
    """

    def init(self, address):
        # ipaddress.ip_address() raises ValueError for anything that is
        # neither a valid IPv4 nor a valid IPv6 address
        self.address = ipaddress.ip_address(address)

    def __str__(self):
        return "%s" % self.address

    @property
    def family(self):
        """
        The socket address family (AF_INET6 or AF_INET) of this address.
        """
        if isinstance(self.address, ipaddress.IPv6Address):
            return socket.AF_INET6
        elif isinstance(self.address, ipaddress.IPv4Address):
            return socket.AF_INET

    @lazy_property
    def network(self):
        """
        The result of looking this address up in the location database.
        """
        return self.backend.location.lookup("%s" % self.address)

    @property
    def country_code(self):
        """
        The country code of the matching network, or None without a network.
        """
        if self.network:
            return self.network.country_code

    @lazy_property
    def asn(self):
        """
        The AS number of the matching network, or None without a network.
        """
        if self.network:
            return self.network.asn

    @lazy_property
    def autonomous_system(self):
        """
        The AS object for self.asn, or None if no AS number is known.
        """
        if self.asn:
            return self.backend.location.get_as(self.asn)

    # The flag helpers below return None (not False) if no network was found

    def is_anonymous_proxy(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)

    def is_satellite_provider(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)

    def is_anycast(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)

    # Blacklist

    def _make_blacklist_rr(self, blacklist):
        """
        Builds the DNS name to query on the given blacklist zone.

        IPv4 addresses are split into their four octets and IPv6 addresses
        into their 32 hex nibbles. The labels are reversed and the
        blacklist zone is appended.

        Raises NotImplementedError for an unknown address family.
        """
        if self.family == socket.AF_INET6:
            labels = list(self.address.exploded.replace(":", ""))
        elif self.family == socket.AF_INET:
            labels = str(self.address).split(".")
        else:
            raise NotImplementedError("Unknown IP protocol")

        # Reverse the list
        labels.reverse()

        # Append suffix
        labels.append(blacklist)

        return ".".join(labels)

    async def _resolve_blacklist(self, blacklist):
        """
        Checks whether this address is listed on the given blacklist.

        Returns a tuple (return_code, reason):

          * (None, None) if the address is not listed
          * (None, error message) if the lookup itself failed
          * (return code, first TXT record or None) if the address is listed
        """
        return_code = None

        # Get resource record name
        rr = self._make_blacklist_rr(blacklist)

        # DNS blacklists publish their return codes as A records, for IPv4
        # and IPv6 entries alike (RFC 5782, section 2.4). Asking for AAAA
        # would never find a listed IPv6 address.
        try:
            res = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_A)
        except IOError as e:
            logging.warning(e)

            return return_code, "%s" % e

        # Not found
        if not res:
            logging.debug("%s is not blacklisted on %s", self, blacklist)
            return return_code, None

        # Extract return code from the first record of the DNS response
        for row in res:
            return_code = row.host
            break

        # If the IP address is on a blacklist, we will try to fetch the TXT
        # record. This is best-effort: a failure here must not discard the
        # return code that we already have.
        try:
            reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
        except IOError as e:
            logging.warning(e)
            reason = None

        # Log result
        logging.debug("%s is blacklisted on %s: %s", self, blacklist, reason or "N/A")

        # Take the first reason
        for record in reason or ():
            return return_code, record.text

        # Blocked, but no reason
        return return_code, None

    async def get_blacklists(self):
        """
        Checks this address against all BLACKLISTS, one after the other.

        Returns a dict that maps each blacklist to the result tuple of
        _resolve_blacklist().
        """
        return { bl : await self._resolve_blacklist(bl) for bl in BLACKLISTS }
157
158
def format_size(s, max_unit=None):
    """
    Formats a size in bytes as a short human-readable string.

    The value is divided by 1024 until it is smaller than 1024, the
    largest unit (TB) has been reached, or the unit equals max_unit.
    The number is printed without decimals, e.g. "1kB" or "5120MB".
    """
    units = ("B", "kB", "MB", "GB", "TB")

    i = 0

    # Checking max_unit before dividing makes max_unit="B" work as well
    while s >= 1024 and i < len(units) - 1 and units[i] != max_unit:
        s /= 1024
        i += 1

    return "%.0f%s" % (s, units[i])
171
def format_time(s, shorter=True):
    """
    Formats a duration, rounded to the nearest minute.

    s is a number of seconds or a datetime.timedelta. If shorter is set
    and the duration is less than one hour, "N min" is returned.
    Otherwise the result is "H:MM hrs".
    """
    # No translation is applied here; messages pass through unchanged
    def _(message):
        return message

    if isinstance(s, datetime.timedelta):
        s = s.total_seconds()

    # Round to whole minutes first (30 seconds and more round up), so that
    # 59 minutes and 30+ seconds carry over into the next hour instead of
    # being shown as 60 minutes
    total_minutes = int((s + 30) // 60)
    hrs, mins = divmod(total_minutes, 60)

    if shorter and not hrs:
        return _("%(min)d min") % { "min" : mins }

    return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs, "min" : mins}
189
def random_string(length=8):
    """
    Returns a string of the given length made of random ASCII letters
    and digits.

    The characters are drawn with the random module, which is not a
    cryptographically secure source.
    """
    alphabet = string.ascii_letters + string.digits

    chars = []
    for _ in range(length):
        chars.append(random.choice(alphabet))

    return "".join(chars)
196
def normalize(s):
    """
    Turns a string into a dash-separated form, e.g. "Hello, World!"
    becomes "Hello-World".

    The string is NFKD-normalised, every run of non-word characters is
    replaced by a space, and the remaining words are joined with "-".
    """
    # Decompose compatibility characters (e.g. ligatures)
    # NOTE(review): this does not remove non-ASCII characters. Combining
    # marks left behind by the decomposition are presumably not matched
    # by \w and would then split a word - confirm if that is intended.
    try:
        s = unicodedata.normalize("NFKD", s)
    except TypeError:
        pass

    # Anything that is not a word character becomes whitespace
    cleaned = re.sub(r"[^\w]+", " ", s)

    # Splitting drops leading, trailing and repeated whitespace
    words = cleaned.split()

    return "-".join(words)
208
def generate_thumbnail(data, size, square=False, **args):
    """
    Creates a thumbnail of an image and returns it as bytes.

    data is the encoded source image and size is the maximum edge length
    in pixels. If square is set, the image is cropped and scaled to
    exactly size x size; otherwise it is scaled down, keeping its aspect
    ratio. Any further keyword arguments are passed on to the save() call
    of the image.

    The thumbnail is written in the format of the source image. Data that
    cannot be identified as an image is returned unchanged.
    """
    assert data, "No image data received"

    try:
        image = PIL.Image.open(io.BytesIO(data))

    # If we cannot open the image, we return it in raw form
    except PIL.UnidentifiedImageError:
        return data

    # Save the image format now. Operations that return a new image
    # (rotate, convert, fit, filter) do not carry it over, so image.format
    # cannot be relied upon any more after the first of them.
    image_format = image.format
    is_jpeg = image_format == "JPEG"

    # Fetch the EXIF Orientation tag (0x0112), which may well be absent
    orientation = image.getexif().get(0x0112)

    # Rotate the image (degrees are counter-clockwise)
    degrees = {3 : 180, 6 : 270, 8 : 90}.get(orientation)
    if degrees:
        image = image.rotate(degrees, expand=True)

    # Remove any alpha-channels
    if is_jpeg and not image.mode == "RGB":
        # Make a white background
        background = PIL.Image.new("RGBA", image.size, (255,255,255))

        # Convert image to RGBA if not in RGBA, yet
        if not image.mode == "RGBA":
            image = image.convert("RGBA")

        # Flatten both images together
        flattened_image = PIL.Image.alpha_composite(background, image)

        # Remove the alpha channel
        image = flattened_image.convert("RGB")

    # Resize the image to the desired resolution
    if square:
        image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
    else:
        image.thumbnail((size, size), PIL.Image.LANCZOS)

    if is_jpeg:
        # Apply a gaussian blur to make compression easier
        image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))

        # Arguments to optimise the compression
        args.update({
            "subsampling" : "4:2:0",
            "quality"     : 70,
        })

    with io.BytesIO() as f:
        # If writing out the image does not work with optimization,
        # we try to write it out without any optimization.
        try:
            image.save(f, image_format, optimize=True, **args)
        except Exception as e:
            logging.debug("Could not save optimized image: %s", e)

            # Throw away anything the failed attempt has written
            f.seek(0)
            f.truncate()

            image.save(f, image_format, **args)

        return f.getvalue()