]> git.ipfire.org Git - ipfire.org.git/blob - src/backend/util.py
Update Christman campaign copy
[ipfire.org.git] / src / backend / util.py
1 #!/usr/bin/python3
2
3 import PIL.Image
4 import PIL.ImageFilter
5 import PIL.ImageOps
6 import io
7 import ipaddress
8 import location
9 import logging
10 import pycares
11 import random
12 import re
13 import socket
14 import string
15 import unicodedata
16
17 from .decorators import *
18 from .misc import Object
19
# These lists are used to block access to the webapp
BLOCKLISTS = (
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
)

# DNS blacklist zones that Address.get_blacklists() queries one by one.
# Each entry is appended as the suffix of the reversed-address lookup name.
BLACKLISTS = (
    "b.barracudacentral.org",
    "bl.spamcop.net",
    "bl.blocklist.de",
    "cbl.abuseat.org",
    "dnsbl-1.uceprotect.net",
    "dnsbl-2.uceprotect.net",
    "dnsbl-3.uceprotect.net",
    "dnsbl.abuse.ch",
    "ix.dnsbl.manitu.net",
    "pbl.spamhaus.org",
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
    "zen.spamhaus.org",
)
41
class Address(Object):
    """An IP address with location lookups and DNS blacklist checks.

    The methods read ``self.backend.location`` and ``self.backend.resolver``;
    ``backend`` is not assigned in this class (presumably provided by
    ``Object`` - confirm).
    """

    # NOTE(review): this is init(), not __init__(); presumably Object's
    # constructor forwards its arguments here - confirm.
    def init(self, address):
        # Accepts IPv4 and IPv6; ipaddress raises ValueError on invalid input
        self.address = ipaddress.ip_address(address)

    def __str__(self):
        return "%s" % self.address

    @property
    def family(self):
        """Return socket.AF_INET6 or socket.AF_INET for this address.

        Falls through to None if the address is neither type.
        """
        if isinstance(self.address, ipaddress.IPv6Address):
            return socket.AF_INET6
        elif isinstance(self.address, ipaddress.IPv4Address):
            return socket.AF_INET

    @lazy_property
    def network(self):
        """The result of looking this address up in the location database."""
        return self.backend.location.lookup("%s" % self.address)

    @property
    def country_code(self):
        """Country code of the matching network, or None without a network."""
        if self.network:
            return self.network.country_code

    @lazy_property
    def asn(self):
        """AS number of the matching network, or None without a network."""
        if self.network:
            return self.network.asn

    @lazy_property
    def autonomous_system(self):
        """The AS object for ``self.asn``, or None if the ASN is unset."""
        if self.asn:
            return self.backend.location.get_as(self.asn)

    # The three flag helpers below return None (not False) when no network
    # was found for the address.

    def is_anonymous_proxy(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)

    def is_satellite_provider(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)

    def is_anycast(self):
        if self.network:
            return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)

    # Blacklist

    def _make_blacklist_rr(self, blacklist):
        """Build the DNS name to query for this address on ``blacklist``.

        IPv6 addresses are split into their 32 hex nibbles, IPv4 addresses
        into their four decimal octets; the labels are reversed and the
        blacklist zone is appended.

        Raises NotImplementedError for any other address family.
        """
        if self.family == socket.AF_INET6:
            labels = list(self.address.exploded.replace(":", ""))
        elif self.family == socket.AF_INET:
            labels = str(self.address).split(".")
        else:
            raise NotImplementedError("Unknown IP protocol")

        # Reverse the list
        labels.reverse()

        # Append suffix
        labels.append(blacklist)

        return ".".join(labels)

    async def _resolve_blacklist(self, blacklist):
        """Check whether this address is listed on ``blacklist``.

        Returns a ``(return_code, reason)`` tuple:

        * ``(None, None)`` - not listed
        * ``(None, "<error>")`` - the lookup itself failed
        * ``(host, text)`` - listed; ``text`` is the first TXT record
          or None if no reason could be fetched

        Raises NotImplementedError for unknown address families.
        """
        return_code = None

        # Get resource record name (raises for unknown address families)
        rr = self._make_blacklist_rr(blacklist)

        # DNSxL entries are published as A records for IPv4 *and* IPv6
        # addresses (RFC 5782), so an AAAA query would never find a listing.
        query_type = pycares.QUERY_TYPE_A

        # Run query
        try:
            res = await self.backend.resolver.query(rr, type=query_type)
        except IOError as e:
            logging.warning(e)

            return return_code, "%s" % e

        # Not found
        if not res:
            logging.debug("%s is not blacklisted on %s", self, blacklist)
            return return_code, None

        # Extract return code from the first row of the DNS response
        return_code = next((row.host for row in res), None)

        # If the IP address is on a blacklist, we will try to fetch the TXT
        # record. A failure here must not hide the listing itself.
        try:
            reason = await self.backend.resolver.query(
                rr, type=pycares.QUERY_TYPE_TXT)
        except IOError as e:
            logging.warning(e)
            reason = None

        # Log result
        logging.debug("%s is blacklisted on %s: %s",
            self, blacklist, reason or "N/A")

        # Take the first reason; None means blocked, but no reason
        text = next((r.text for r in reason), None) if reason else None

        return return_code, text

    async def get_blacklists(self):
        """Query every zone in BLACKLISTS, one after the other.

        Returns a dict mapping each blacklist to the tuple returned by
        ``_resolve_blacklist()``.
        """
        return {bl: await self._resolve_blacklist(bl) for bl in BLACKLISTS}
155
156
def format_size(s, max_unit=None):
    """Format a size in bytes as a rounded, human-readable string.

    The value is divided by 1024 until it is smaller than 1024, the
    largest unit (TB) is reached, or ``max_unit`` is reached.

    s        -- the size in bytes
    max_unit -- optional unit name ("B", "kB", "MB", "GB", "TB") that
                the result must not exceed

    Returns a string like "12MB" (no decimals).
    """
    units = ("B", "kB", "MB", "GB", "TB")

    i = 0

    # Check max_unit *before* scaling so that max_unit="B" is honoured, too
    while s >= 1024 and i < len(units) - 1 and units[i] != max_unit:
        s /= 1024
        i += 1

    return "%.0f%s" % (s, units[i])
169
def format_time(s, shorter=True):
    """Format a duration in seconds, rounded to the nearest minute.

    s       -- the duration in seconds
    shorter -- if true and the duration is below one hour, return only
               the minutes ("42 min") instead of "0:42 hrs"

    Returns the formatted string.
    """
    # Placeholder for a translation function; currently the identity
    _ = lambda x: x

    # Round to whole minutes first so that a carry ends up in the hours
    # (3599 seconds is "1:00 hrs", not "60 min")
    hrs, mins = divmod((s + 30) // 60, 60)

    if shorter and not hrs:
        return _("%(min)d min") % {"min": mins}

    return _("%(hrs)d:%(min)02d hrs") % {"hrs": hrs, "min": mins}
184
def random_string(length=8):
    """Return ``length`` random characters drawn from ASCII letters and digits.

    Uses the ``random`` module, which is not a cryptographically secure
    source of randomness.
    """
    alphabet = string.ascii_letters + string.digits

    return "".join([random.choice(alphabet) for _ in range(length)])
191
def normalize(s):
    """Turn a string into a dash-separated slug.

    Accented characters are reduced to their base characters, every run
    of non-word characters becomes a separator and the remaining words
    are joined with "-".

    s -- the string to normalize

    Returns the slug, e.g. "Hello, World!" becomes "Hello-World".
    """
    # Decompose characters (e.g. "u-umlaut" becomes "u" + combining mark)
    try:
        decomposed = unicodedata.normalize("NFKD", s)
    except TypeError:
        # Not a str; leave the value alone as before
        pass
    else:
        # Drop the combining marks. They are not word characters and
        # would otherwise be turned into separators in the middle of
        # a word ("Mu-ller").
        s = "".join(c for c in decomposed if not unicodedata.combining(c))

    # Collapse everything that is not a word character into whitespace
    s = re.sub(r"[^\w]+", " ", s)

    return "-".join(s.split())
203
def generate_thumbnail(data, size, square=False, **args):
    """Scale an image down and return it re-encoded in its original format.

    data   -- the raw bytes of the source image
    size   -- the edge length in pixels of the bounding box
    square -- if true, crop/scale to exactly size x size; otherwise
              the aspect ratio is kept and the image fits into the box
    args   -- extra keyword arguments passed on to PIL's Image.save()

    Returns the encoded thumbnail as bytes.
    """
    assert data, "No image data received"

    image = PIL.Image.open(io.BytesIO(data))

    # Remember the format now: images derived through convert(), fit()
    # or filter() no longer carry it in their .format attribute
    image_format = image.format
    is_jpeg = image_format == "JPEG"

    # Remove any alpha-channels
    if is_jpeg and image.mode != "RGB":
        # Make a white background
        background = PIL.Image.new("RGBA", image.size, (255,255,255))

        # Convert image to RGBA if not in RGBA, yet
        if image.mode != "RGBA":
            image = image.convert("RGBA")

        # Flatten both images together
        flattened_image = PIL.Image.alpha_composite(background, image)

        # Remove the alpha channel
        image = flattened_image.convert("RGB")

    # Resize the image to the desired resolution
    if square:
        image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
    else:
        image.thumbnail((size, size), PIL.Image.LANCZOS)

    # Decide on the saved format; image.format may be None by now
    if is_jpeg:
        # Apply a gaussian blur to make compression easier
        image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))

        # Arguments to optimise the compression
        args.update({
            "subsampling" : "4:2:0",
            "quality"     : 70,
        })

    with io.BytesIO() as f:
        # If writing out the image does not work with optimization,
        # we try to write it out without any optimization.
        try:
            image.save(f, image_format, optimize=True, **args)
        except Exception:
            # Throw away anything the failed attempt has already written
            f.seek(0)
            f.truncate()

            image.save(f, image_format, **args)

        return f.getvalue()