]> git.ipfire.org Git - ipfire.org.git/blob - src/backend/util.py
wiki: Only match usernames when a word starts with @
[ipfire.org.git] / src / backend / util.py
1 #!/usr/bin/python3
2
3 import PIL.ExifTags
4 import PIL.Image
5 import PIL.ImageFilter
6 import PIL.ImageOps
7 import datetime
8 import io
9 import ipaddress
10 import location
11 import logging
12 import pycares
13 import random
14 import re
15 import socket
16 import string
17 import unicodedata
18
19 from .decorators import *
20 from .misc import Object
21
# DNS blocklist zones that are used to block access to the webapp.
# NOTE(review): presumably each entry is handed to
# Address._resolve_blacklist() as the zone suffix by a caller that is not
# part of this file - confirm.
BLOCKLISTS = (
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
)
27
class Address(Object):
    """
    An IP address with helpers for location lookups and DNS blocklist checks.

    The location database and the DNS resolver are reached through
    ``self.backend`` (presumably provided by ``Object`` - not visible here).
    """

    def init(self, address):
        # NOTE(review): presumably invoked by Object's constructor - confirm.
        # ip_address() raises ValueError if the input is not a valid address.
        self.address = ipaddress.ip_address(address)

    def __str__(self):
        return "%s" % self.address

    @property
    def family(self):
        """
        The socket address family (AF_INET6 or AF_INET) of this address.
        """
        if isinstance(self.address, ipaddress.IPv6Address):
            return socket.AF_INET6
        elif isinstance(self.address, ipaddress.IPv4Address):
            return socket.AF_INET

    @lazy_property
    def network(self):
        """
        The result of looking up this address in the location database.
        """
        return self.backend.location.lookup("%s" % self.address)

    @property
    def country_code(self):
        """
        The country code of the network, or None if no network was found.
        """
        if self.network:
            return self.network.country_code

    @lazy_property
    def asn(self):
        """
        The AS number of the network, or None if no network was found.
        """
        if self.network:
            return self.network.asn

    @lazy_property
    def autonomous_system(self):
        """
        The AS object fetched from the location database for the ASN,
        or None if there is no ASN.
        """
        if self.asn:
            return self.backend.location.get_as(self.asn)

    # The following checks return None (not False) if no network was found

    def is_anonymous_proxy(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)

    def is_satellite_provider(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)

    def is_anycast(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)

    # Blacklist

    def _make_blacklist_rr(self, blacklist):
        """
        Build the DNS name that has to be queried to check this address
        against the given blacklist zone.

        IPv4 addresses are split into their four octets, IPv6 addresses into
        their 32 hexadecimal nibbles. The parts are reversed, joined with dots
        and suffixed with the name of the blacklist.

        Raises NotImplementedError if the address family is unknown.
        """
        if self.family == socket.AF_INET6:
            # One label per hex digit of the fully expanded address
            octets = list(self.address.exploded.replace(":", ""))
        elif self.family == socket.AF_INET:
            octets = str(self.address).split(".")
        else:
            raise NotImplementedError("Unknown IP protocol")

        # Reverse the list
        octets.reverse()

        # Append suffix
        octets.append(blacklist)

        return ".".join(octets)

    async def _resolve_blacklist(self, blacklist):
        """
        Check whether this address is listed on the given blacklist.

        Returns a tuple (return_code, reason):

        * (None, <error message>) if the DNS query failed
        * (None, None) if the address is not listed
        * (<host of the first answer>, <text of the first TXT record>) if
          the address is listed; the second element is None if no reason
          could be fetched
        """
        return_code = None

        # Get resource record name (raises for unknown IP protocols)
        rr = self._make_blacklist_rr(blacklist)

        # Blacklists publish their return codes as A records for IPv4 and
        # IPv6 entries alike (RFC 5782, section 2.4), so the query type does
        # not depend on the address family.
        query_type = pycares.QUERY_TYPE_A

        # Run query
        try:
            res = await self.backend.resolver.query(rr, type=query_type)
        except IOError as e:
            logging.warning(e)

            return return_code, "%s" % e

        # Not found
        if not res:
            logging.debug("%s is not blacklisted on %s" % (self, blacklist))
            return return_code, None

        # Extract return code from the first row of the DNS response
        for row in res:
            return_code = row.host
            break

        # If the IP address is on a blacklist, we will try to fetch the TXT
        # record. The reason is optional, so a failure here must not discard
        # the positive result from above.
        try:
            reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
        except IOError as e:
            logging.warning(e)
            reason = None

        # Log result
        logging.debug("%s is blacklisted on %s: %s" % (self, blacklist, reason or "N/A"))

        # Take the first reason
        if reason:
            for i in reason:
                return return_code, i.text

        # Blocked, but no reason
        return return_code, None
136
137
def format_size(s, max_unit=None):
    """
    Format a size given in bytes as a short human-readable string.

    The value is divided by 1024 until it is smaller than 1024, the largest
    known unit (TB) has been reached, or the unit equals max_unit. The number
    is rendered without any decimal places, e.g. "3MB".

    s        -- the size in bytes
    max_unit -- optional largest unit to use ("B", "kB", "MB", "GB" or "TB")
    """
    units = ("B", "kB", "MB", "GB", "TB")

    i = 0

    # The unit is checked before scaling so that max_unit="B" is honoured, too
    while s >= 1024 and i < len(units) - 1 and units[i] != max_unit:
        s /= 1024
        i += 1

    return "%.0f%s" % (s, units[i])
150
def format_time(s, shorter=True):
    """
    Format a duration as minutes ("5 min") or hours and minutes ("1:30 hrs").

    s       -- the duration in seconds or as a datetime.timedelta
    shorter -- if True, durations of less than one hour (after rounding)
               are rendered in the short "N min" form

    The duration is rounded to the nearest full minute.
    """
    # Translation is currently disabled (was: handler.locale.translate)
    _ = lambda x: x

    if isinstance(s, datetime.timedelta):
        s = s.total_seconds()

    # Round to full minutes *before* splitting off the hours so that the
    # carry propagates, i.e. 59:30 becomes "1:00 hrs" instead of "60 min"
    total_minutes = int((s + 30) // 60)

    hrs, mins = divmod(total_minutes, 60)

    if shorter and not hrs:
        return _("%(min)d min") % { "min" : mins }

    return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs, "min" : mins}
168
def random_string(length=8):
    """
    Return a random string of the given length which consists of ASCII
    letters and digits only.

    The characters are drawn from the operating system's random source
    instead of the seedable default generator, so that the result cannot be
    predicted and is safe to be used in codes or tokens.
    """
    input_chars = string.ascii_letters + string.digits

    rng = random.SystemRandom()

    return "".join(rng.choice(input_chars) for _ in range(length))
175
def normalize(s):
    """
    Reduce a string to its word characters, joined by dashes.

    Accented characters are reduced to their base character ("é" becomes
    "e"), any run of non-word characters is treated as a separator and the
    remaining words are joined with "-".
    """
    # Decompose characters into base character + combining marks and drop the
    # marks. Without this, they would count as non-word characters below and
    # split words apart ("éclair" would become "e-clair").
    try:
        s = unicodedata.normalize("NFKD", s)
    except TypeError:
        pass
    else:
        s = "".join(c for c in s if not unicodedata.combining(c))

    # Replace anything that is not a word character by whitespace
    s = re.sub(r"[^\w]+", " ", s)

    return "-".join(s.split())
187
def generate_thumbnail(image, size, square=False, format=None, quality=None, **args):
    """
    Scale an image down and return the encoded result as bytes.

    image   -- the raw image data (bytes) or a PIL.Image.Image
    size    -- the maximum edge length of the thumbnail in pixels
    square  -- if True, the image is cropped to size x size; otherwise
               the aspect ratio is kept
    format  -- the output format; defaults to the format of the input and
               falls back to JPEG
    quality -- the encoder quality (defaults to 72)
    args    -- any further arguments are passed to PIL's Image.save()

    If the raw data cannot be identified as an image, it is returned
    unchanged.
    """
    assert image, "No image data received"

    # Remember an image object that belongs to the caller
    source = image if isinstance(image, PIL.Image.Image) else None

    if source is None:
        data = io.BytesIO(image)

        try:
            image = PIL.Image.open(data)

        # If we cannot open the image, we return it in raw form
        except PIL.UnidentifiedImageError:
            return data.getvalue()

    # Save image format (the operations below return new image objects
    # which no longer carry the format of the original)
    format = format or image.format or "JPEG"

    # Fetch any EXIF data (not all image types have _getexif())
    try:
        exif = image._getexif()
    except AttributeError:
        exif = None

    # Rotate the image according to the EXIF orientation tag (0x0112).
    # Only plain rotations are handled; any other value is ignored.
    if exif:
        rotations = { 3 : 180, 6 : 270, 8 : 90 }

        angle = rotations.get(exif.get(0x0112))
        if angle:
            image = image.rotate(angle, expand=True)

    # Remove any alpha-channels
    if format == "JPEG" and not image.mode == "RGB":
        # Make a white background
        background = PIL.Image.new("RGBA", image.size, (255,255,255))

        # Convert image to RGBA if not in RGBA, yet
        if not image.mode == "RGBA":
            image = image.convert("RGBA")

        # Flatten both images together
        flattened_image = PIL.Image.alpha_composite(background, image)

        # Remove the alpha channel
        image = flattened_image.convert("RGB")

    # Resize the image to the desired resolution
    if square:
        image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
    else:
        # thumbnail() resizes in place, so work on a copy if this is
        # still the object that has been passed in by the caller
        if image is source:
            image = image.copy()

        image.thumbnail((size, size), PIL.Image.LANCZOS)

    # Apply a gaussian blur to make compression easier
    try:
        image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))
    except ValueError:
        pass

    # Arguments to optimise the compression
    args.update({
        "subsampling" : "4:2:0",
        "quality"     : quality or 72,
    })

    # Select encoder options by the output format. image.format cannot be
    # used here because it is unset on images that have been processed.
    if format == "JPEG":
        args.update({
            "qtables" : "web_low",
        })

    elif format == "WEBP":
        args.update({
            "lossless" : False,
        })

    with io.BytesIO() as f:
        # If writing out the image does not work with optimization,
        # we try to write it out without any optimization.
        try:
            image.save(f, format, optimize=True, **args)
        except Exception:
            # Drop anything the failed attempt might have written already
            f.seek(0)
            f.truncate()

            image.save(f, format, **args)

        return f.getvalue()