]> git.ipfire.org Git - ipfire.org.git/blame - src/backend/util.py
wiki: Only match usernames when a word starts with @
[ipfire.org.git] / src / backend / util.py
CommitLineData
9523790a 1#!/usr/bin/python3
66862195 2
6317802a 3import PIL.ExifTags
5ef115cd
MT
4import PIL.Image
5import PIL.ImageFilter
2b72638d 6import PIL.ImageOps
d6c41da2 7import datetime
5ef115cd 8import io
440aba92
MT
9import ipaddress
10import location
5ef115cd 11import logging
440aba92 12import pycares
e96e445b 13import random
9523790a 14import re
440aba92 15import socket
e96e445b 16import string
75d9b3da 17import unicodedata
e96e445b 18
440aba92
MT
19from .decorators import *
20from .misc import Object
21
22# These lists are used to block access to the webapp
23BLOCKLISTS = (
24 "sbl.spamhaus.org",
25 "xbl.spamhaus.org",
26)
27
440aba92
MT
28class Address(Object):
29 def init(self, address):
30 self.address = ipaddress.ip_address(address)
31
32 def __str__(self):
33 return "%s" % self.address
34
35 @property
36 def family(self):
37 if isinstance(self.address, ipaddress.IPv6Address):
38 return socket.AF_INET6
39 elif isinstance(self.address, ipaddress.IPv4Address):
40 return socket.AF_INET
41
42 @lazy_property
43 def network(self):
7b05edde 44 return self.backend.location.lookup("%s" % self.address)
440aba92
MT
45
46 @property
47 def country_code(self):
48 if self.network:
49 return self.network.country_code
50
51 @lazy_property
52 def asn(self):
53 if self.network:
54 return self.network.asn
55
7b05edde
MT
56 @lazy_property
57 def autonomous_system(self):
58 if self.asn:
59 return self.backend.location.get_as(self.asn)
60
7ac8b12f 61 def is_anonymous_proxy(self):
1c22909c
MT
62 if self.network:
63 return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
7ac8b12f
MT
64
65 def is_satellite_provider(self):
1c22909c
MT
66 if self.network:
67 return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
7ac8b12f
MT
68
69 def is_anycast(self):
1c22909c
MT
70 if self.network:
71 return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)
7ac8b12f 72
440aba92
MT
73 # Blacklist
74
75 def _make_blacklist_rr(self, blacklist):
76 if self.family == socket.AF_INET6:
77 octets = list(self.address.exploded.replace(":", ""))
78 elif self.family == socket.AF_INET:
79 octets = str(self.address).split(".")
80 else:
81 raise NotImplementedError("Unknown IP protocol")
82
83 # Reverse the list
84 octets.reverse()
85
86 # Append suffix
87 octets.append(blacklist)
88
89 return ".".join(octets)
90
91 async def _resolve_blacklist(self, blacklist):
92 return_code = None
93
94 # Get resource record name
95 rr = self._make_blacklist_rr(blacklist)
96
97 # Get query type from IP protocol version
98 if self.family == socket.AF_INET6:
99 type = pycares.QUERY_TYPE_AAAA
100 elif self.family == socket.AF_INET:
101 type = pycares.QUERY_TYPE_A
102 else:
103 raise NotImplementedError("Unknown IP protocol")
104
105 # Run query
106 try:
107 res = await self.backend.resolver.query(rr, type=type)
108 except IOError as e:
109 logging.warning(e)
110
111 return return_code, "%s" % e
112
113 # Not found
114 if not res:
115 logging.debug("%s is not blacklisted on %s" % (self, blacklist))
116 return return_code, None
117
118 # Extract return code from DNS response
119 for row in res:
120 return_code = row.host
121 break
122
123 # If the IP address is on a blacklist, we will try to fetch the TXT record
124 reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
125
126 # Log result
127 logging.debug("%s is blacklisted on %s: %s" % (self, blacklist, reason or "N/A"))
128
129 # Take the first reason
130 if reason:
131 for i in reason:
132 return return_code, i.text
133
134 # Blocked, but no reason
135 return return_code, None
136
440aba92 137
84604476
MT
138def format_size(s, max_unit=None):
139 units = ("B", "kB", "MB", "GB", "TB")
66862195
MT
140
141 i = 0
142 while s >= 1024 and i < len(units) - 1:
143 s /= 1024
144 i += 1
145
84604476
MT
146 if max_unit and units[i] == max_unit:
147 break
148
66862195
MT
149 return "%.0f%s" % (s, units[i])
150
5ac74b02 151def format_time(s, shorter=True):
66862195
MT
152 #_ = handler.locale.translate
153 _ = lambda x: x
154
d6c41da2
MT
155 if isinstance(s, datetime.timedelta):
156 s = s.total_seconds()
157
66862195
MT
158 hrs, s = divmod(s, 3600)
159 min, s = divmod(s, 60)
160
161 if s >= 30:
162 min += 1
163
164 if shorter and not hrs:
165 return _("%(min)d min") % { "min" : min }
166
167 return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs, "min" : min}
e96e445b
MT
168
169def random_string(length=8):
170 input_chars = string.ascii_letters + string.digits
171
172 r = (random.choice(input_chars) for i in range(length))
173
174 return "".join(r)
75d9b3da
MT
175
176def normalize(s):
177 # Remove any non-ASCII characters
178 try:
179 s = unicodedata.normalize("NFKD", s)
180 except TypeError:
181 pass
182
183 # Remove excessive whitespace
184 s = re.sub(r"[^\w]+", " ", s)
185
186 return "-".join(s.split())
5ef115cd 187
60b2a858 188def generate_thumbnail(image, size, square=False, format=None, quality=None, **args):
221cd6bc 189 assert image, "No image data received"
5ef115cd 190
221cd6bc
MT
191 if not isinstance(image, PIL.Image.Image):
192 image = io.BytesIO(image)
193
194 try:
195 image = PIL.Image.open(image)
4bb517e3 196
221cd6bc
MT
197 # If we cannot open the image, we return it in raw form
198 except PIL.UnidentifiedImageError as e:
e6340233 199 return image.getvalue()
5ef115cd
MT
200
201 # Save image format
221cd6bc 202 format = format or image.format or "JPEG"
5ef115cd 203
6317802a 204 # Fetch any EXIF data
221cd6bc
MT
205 try:
206 exif = image._getexif()
207 except AttributeError as e:
208 exif = None
6317802a
MT
209
210 # Rotate the image
211 if exif:
212 for tag in PIL.ExifTags.TAGS:
213 if PIL.ExifTags.TAGS[tag] == "Orientation":
fb2118a7
MT
214 try:
215 if exif[tag] == 3:
216 image = image.rotate(180, expand=True)
217 elif exif[tag] == 6:
218 image = image.rotate(270, expand=True)
219 elif exif[tag] == 8:
220 image = image.rotate( 90, expand=True)
221
222 # Ignore if the orientation isn't encoded
223 except KeyError:
224 pass
6317802a 225
5ef115cd 226 # Remove any alpha-channels
221cd6bc 227 if format == "JPEG" and not image.mode == "RGB":
5ef115cd
MT
228 # Make a white background
229 background = PIL.Image.new("RGBA", image.size, (255,255,255))
230
231 # Convert image to RGBA if not in RGBA, yet
232 if not image.mode == "RGBA":
233 image = image.convert("RGBA")
234
235 # Flatten both images together
236 flattened_image = PIL.Image.alpha_composite(background, image)
237
238 # Remove the alpha channel
239 image = flattened_image.convert("RGB")
240
241 # Resize the image to the desired resolution
2de3dacc 242 if square:
7e222133 243 image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
2de3dacc
MT
244 else:
245 image.thumbnail((size, size), PIL.Image.LANCZOS)
5ef115cd 246
98c8bd26 247 # Apply a gaussian blur to make compression easier
3e3886a9
MT
248 try:
249 image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))
250 except ValueError:
251 pass
98c8bd26
MT
252
253 # Arguments to optimise the compression
254 args.update({
255 "subsampling" : "4:2:0",
60b2a858 256 "quality" : quality or 72,
98c8bd26
MT
257 })
258
5ef115cd 259 if image.format == "JPEG":
98c8bd26 260 args.update({
60b2a858 261 "qtables" : "web_low",
98c8bd26 262 })
5ef115cd 263
98c8bd26 264 elif image.format == "WEBP":
5ef115cd 265 args.update({
60b2a858 266 "lossless" : False,
5ef115cd
MT
267 })
268
269 with io.BytesIO() as f:
270 # If writing out the image does not work with optimization,
271 # we try to write it out without any optimization.
272 try:
273 image.save(f, format, optimize=True, **args)
274 except:
275 image.save(f, format, **args)
276
277 return f.getvalue()