]> git.ipfire.org Git - ipfire.org.git/blame - src/backend/util.py
docs: Fix URL computation
[ipfire.org.git] / src / backend / util.py
CommitLineData
9523790a 1#!/usr/bin/python3
66862195 2
5ef115cd
MT
3import PIL.Image
4import PIL.ImageFilter
2b72638d 5import PIL.ImageOps
d6c41da2 6import datetime
5ef115cd 7import io
440aba92
MT
8import ipaddress
9import location
5ef115cd 10import logging
440aba92 11import pycares
e96e445b 12import random
9523790a 13import re
440aba92 14import socket
e96e445b 15import string
75d9b3da 16import unicodedata
e96e445b 17
440aba92
MT
18from .decorators import *
19from .misc import Object
20
# DNS blocklists which are used to block access to the webapp
BLOCKLISTS = (
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
)

# All DNS blacklists that are queried by Address.get_blacklists().
# The order is kept because the result is built in this order.
BLACKLISTS = (
    "b.barracudacentral.org",
    "bl.spamcop.net",
    "bl.blocklist.de",
    "cbl.abuseat.org",
    "dnsbl-1.uceprotect.net",
    "dnsbl-2.uceprotect.net",
    "dnsbl-3.uceprotect.net",
    "dnsbl.abuse.ch",
    "ix.dnsbl.manitu.net",
    "pbl.spamhaus.org",
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
    "zen.spamhaus.org",
)
42
440aba92
MT
class Address(Object):
    """
    Wraps a single IPv4 or IPv6 address and offers location lookups
    and DNS blacklist checks for it.

    NOTE(review): self.backend is not set in this class; it is
    presumably provided by the Object base class - confirm.
    """

    def init(self, address):
        # NOTE(review): presumably called by Object's constructor - confirm.
        # ipaddress.ip_address() raises ValueError on invalid input.
        self.address = ipaddress.ip_address(address)

    def __str__(self):
        return "%s" % self.address

    @property
    def family(self):
        """
        Returns socket.AF_INET6 or socket.AF_INET depending on the
        type of the parsed address (and None for anything else)
        """
        if isinstance(self.address, ipaddress.IPv6Address):
            return socket.AF_INET6
        elif isinstance(self.address, ipaddress.IPv4Address):
            return socket.AF_INET

    @lazy_property
    def network(self):
        """
        The result of looking up this address in the location database
        """
        return self.backend.location.lookup("%s" % self.address)

    @property
    def country_code(self):
        """
        The country code of the network (None if no network was found)
        """
        if self.network:
            return self.network.country_code

    @lazy_property
    def asn(self):
        """
        The AS number of the network (None if no network was found)
        """
        if self.network:
            return self.network.asn

    @lazy_property
    def autonomous_system(self):
        """
        The Autonomous System object for asn (None if there is no ASN)
        """
        if self.asn:
            return self.backend.location.get_as(self.asn)

    # Network flags
    #
    # All of these return None if no network could be found

    def is_anonymous_proxy(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)

    def is_satellite_provider(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)

    def is_anycast(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)

    # Blacklist

    def _make_blacklist_rr(self, blacklist):
        """
        Builds the DNS name to query for this address on the given
        blacklist: the reversed nibbles (IPv6) or octets (IPv4) of
        the address, followed by the name of the blacklist.

        Raises NotImplementedError for an unknown address family.
        """
        if self.family == socket.AF_INET6:
            # One label per hex digit of the fully expanded address
            labels = list(self.address.exploded.replace(":", ""))
        elif self.family == socket.AF_INET:
            labels = str(self.address).split(".")
        else:
            raise NotImplementedError("Unknown IP protocol")

        # Reverse the list
        labels.reverse()

        # Append suffix
        labels.append(blacklist)

        return ".".join(labels)

    async def _resolve_blacklist(self, blacklist):
        """
        Checks whether this address is listed on the given blacklist.

        Returns a tuple of (return code, reason):

          * (None, error message) if the DNS query failed
          * (None, None) if the address is not listed
          * (return code, reason) if the address is listed, where the
            reason is the first TXT record or None if there is none

        Raises NotImplementedError for an unknown address family.
        """
        # Get resource record name
        # (this raises NotImplementedError for unknown address families)
        rr = self._make_blacklist_rr(blacklist)

        # DNS blacklists publish their return codes (127.0.0.x) as A
        # records, no matter whether the listed address is IPv4 or IPv6
        # (RFC 5782, section 2.4). Asking for AAAA records would never
        # find a listed IPv6 address.
        query_type = pycares.QUERY_TYPE_A

        # Run query
        try:
            res = await self.backend.resolver.query(rr, type=query_type)
        except IOError as e:
            logging.warning(e)

            return None, "%s" % e

        # Not found
        if not res:
            logging.debug("%s is not blacklisted on %s", self, blacklist)
            return None, None

        # Extract return code from the first row of the DNS response
        return_code = next((row.host for row in res), None)

        # If the IP address is on a blacklist, we will try to fetch the
        # TXT record. A failure here must not hide the positive result.
        try:
            reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
        except IOError as e:
            logging.warning(e)
            reason = None

        # Log result
        logging.debug("%s is blacklisted on %s: %s", self, blacklist, reason or "N/A")

        # Take the first reason
        if reason:
            for record in reason:
                return return_code, record.text

        # Blocked, but no reason
        return return_code, None

    async def get_blacklists(self):
        """
        Queries all blacklists in BLACKLISTS one after another and
        returns a dictionary which maps the name of each blacklist to
        the result of _resolve_blacklist().
        """
        blacklists = {}

        for blacklist in BLACKLISTS:
            blacklists[blacklist] = await self._resolve_blacklist(blacklist)

        return blacklists
156
440aba92 157
84604476
MT
def format_size(s, max_unit=None):
    """
    Formats a size in bytes as a human-readable string using
    units of 1024 (B, kB, MB, GB, TB) without any decimal places.

    If max_unit is set to one of the unit names, the value will not
    be scaled beyond that unit (e.g. max_unit="kB" returns "5120kB"
    instead of "5MB").
    """
    units = ("B", "kB", "MB", "GB", "TB")

    i = 0
    while s >= 1024 and i < len(units) - 1:
        # Stop before scaling any further if we have reached the largest
        # unit that has been asked for. This must be checked before the
        # division, because otherwise max_unit="B" would never match.
        if max_unit and units[i] == max_unit:
            break

        s /= 1024
        i += 1

    return "%.0f%s" % (s, units[i])
170
def format_time(s, shorter=True):
    """
    Formats a duration, rounded to the nearest minute.

    s is either a number of seconds or a datetime.timedelta object.

    Returns "M min" if shorter is set and the duration is less than
    one hour, and "H:MM hrs" otherwise.
    """
    #_ = handler.locale.translate
    _ = lambda x: x

    if isinstance(s, datetime.timedelta):
        s = s.total_seconds()

    # Round to the nearest full minute before splitting into hours and
    # minutes. Rounding the minutes up afterwards could result in
    # 60 minutes (e.g. "60 min" or "1:60 hrs" for 3599 and 7199 seconds).
    total_minutes = int((s + 30) // 60)

    hrs, mins = divmod(total_minutes, 60)

    if shorter and not hrs:
        return _("%(min)d min") % { "min" : mins }

    return _("%(hrs)d:%(min)02d hrs") % { "hrs" : hrs, "min" : mins }
e96e445b
MT
188
def random_string(length=8):
    """
    Returns a string of the given length which is made up of
    randomly chosen ASCII letters and digits.

    The characters are drawn from the random module, which is not
    a cryptographically secure source of randomness.
    """
    alphabet = string.ascii_letters + string.digits

    # Pick one character at a time
    chars = []
    for _ in range(length):
        chars.append(random.choice(alphabet))

    return "".join(chars)
75d9b3da
MT
195
def normalize(s):
    """
    Turns a string into a list of words joined by dashes.

    The string is decomposed using the Unicode NFKD form and every
    run of non-word characters (whitespace, punctuation, ...) is
    treated as a separator between words.

    Note that the combining marks produced by the decomposition are
    not word characters either, so that they act as separators, too.
    """
    # Decompose any composed characters
    try:
        decomposed = unicodedata.normalize("NFKD", s)
    except TypeError:
        decomposed = s

    # Replace anything that is not a word character by whitespace
    # and split the string into words
    words = re.sub(r"[^\w]+", " ", decomposed).split()

    return "-".join(words)
5ef115cd 207
def generate_thumbnail(data, size, square=False, **args):
    """
    Scales the image in data (bytes) down so that it fits into a box
    of size x size pixels and returns the encoded result as bytes.

    If square is set, the image is cropped and scaled to exactly
    size x size pixels. Otherwise the aspect ratio is preserved.

    Any further keyword arguments are passed to PIL's Image.save().
    For JPEG images, "subsampling" and "quality" are always overwritten.

    The image is written in the format of the input. If the format
    of the input cannot be identified, data is returned unchanged.
    """
    assert data, "No image data received"

    try:
        image = PIL.Image.open(io.BytesIO(data))

    # If we cannot open the image, we return it in raw form
    except PIL.UnidentifiedImageError:
        return data

    # Save image format. This has to be remembered, because image.format
    # is None for any image that has been created by PIL itself (which
    # is what convert(), fit() and filter() return).
    image_format = image.format
    is_jpeg = image_format == "JPEG"

    # Remove any alpha-channels
    if is_jpeg and image.mode != "RGB":
        # Make a white background
        background = PIL.Image.new("RGBA", image.size, (255,255,255))

        # Convert image to RGBA if not in RGBA, yet
        if image.mode != "RGBA":
            image = image.convert("RGBA")

        # Flatten both images together
        flattened_image = PIL.Image.alpha_composite(background, image)

        # Remove the alpha channel
        image = flattened_image.convert("RGB")

    # Resize the image to the desired resolution
    if square:
        image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
    else:
        image.thumbnail((size, size), PIL.Image.LANCZOS)

    # Check against the saved format here, because image.format would
    # have been lost if the image has been converted or cropped above
    if is_jpeg:
        # Apply a gaussian blur to make compression easier
        image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))

        # Arguments to optimise the compression
        args.update({
            "subsampling" : "4:2:0",
            "quality"     : 70,
        })

    with io.BytesIO() as f:
        # If writing out the image does not work with optimization,
        # we try to write it out without any optimization.
        try:
            image.save(f, image_format, optimize=True, **args)
        except Exception:
            # Drop anything that the failed attempt might have written
            f.seek(0)
            f.truncate()

            image.save(f, image_format, **args)

        return f.getvalue()