]> git.ipfire.org Git - ipfire.org.git/blob - src/backend/util.py
a44f228946ed1f37c5f40209c8a207e6801e0123
[ipfire.org.git] / src / backend / util.py
1 #!/usr/bin/python3
2
3 import PIL.Image
4 import PIL.ImageFilter
5 import PIL.ImageOps
6 import io
7 import ipaddress
8 import location
9 import logging
10 import pycares
11 import random
12 import re
13 import socket
14 import string
15 import unicodedata
16
17 from .decorators import *
18 from .misc import Object
19
# These lists are used to block access to the webapp
BLOCKLISTS = (
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
)

# DNS blacklist zones that Address.get_blacklists() checks an address
# against, one lookup per zone
BLACKLISTS = (
    "b.barracudacentral.org",
    "bl.spamcop.net",
    "bl.blocklist.de",
    "cbl.abuseat.org",
    "dnsbl-1.uceprotect.net",
    "dnsbl-2.uceprotect.net",
    "dnsbl-3.uceprotect.net",
    "dnsbl.abuse.ch",
    "ix.dnsbl.manitu.net",
    "pbl.spamhaus.org",
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
    "zen.spamhaus.org",
)
41
class Address(Object):
    """
    A single IPv4 or IPv6 address with helpers for location database
    lookups and DNS blacklist (DNSxL) checks.

    Lookups go through ``self.backend`` (``backend.location`` and
    ``backend.resolver``), which this class expects to be set already.
    """

    def init(self, address):
        # NOTE(review): presumably invoked by Object's constructor - confirm.
        # ipaddress.ip_address() raises ValueError for anything that is not
        # a valid IPv4/IPv6 address.
        self.address = ipaddress.ip_address(address)

    def __str__(self):
        return "%s" % self.address

    @property
    def family(self):
        """
        Returns socket.AF_INET6 or socket.AF_INET depending on the
        IP protocol version of this address (None for anything else).
        """
        if isinstance(self.address, ipaddress.IPv6Address):
            return socket.AF_INET6
        elif isinstance(self.address, ipaddress.IPv4Address):
            return socket.AF_INET

    @lazy_property
    def network(self):
        """
        The result of looking up this address in the location database
        """
        return self.backend.location.lookup("%s" % self.address)

    @property
    def country_code(self):
        """
        The country code of the network, or None if no network was found
        """
        if self.network:
            return self.network.country_code

    @lazy_property
    def asn(self):
        """
        The AS number of the network, or None if no network was found
        """
        if self.network:
            return self.network.asn

    @lazy_property
    def autonomous_system(self):
        """
        The Autonomous System object for self.asn, or None if there is no ASN
        """
        if self.asn:
            return self.backend.location.get_as(self.asn)

    def _has_flag(self, flag):
        """
        Returns True if the network of this address carries the given flag.

        Addresses without a network carry no flags, which matches how
        country_code and asn treat a missing network.
        """
        network = self.network
        if not network:
            return False

        return network.has_flag(flag)

    def is_anonymous_proxy(self):
        return self._has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)

    def is_satellite_provider(self):
        return self._has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)

    def is_anycast(self):
        return self._has_flag(location.NETWORK_FLAG_ANYCAST)

    # Blacklist

    def _make_blacklist_rr(self, blacklist):
        """
        Builds the DNS name to query on the given blacklist zone:
        the reversed nibbles (IPv6) or octets (IPv4) of the address,
        followed by the zone name.

        Raises NotImplementedError for an unknown IP protocol.
        """
        if self.family == socket.AF_INET6:
            # All 32 hex digits of the fully expanded address, one per label
            labels = list(self.address.exploded.replace(":", ""))
        elif self.family == socket.AF_INET:
            labels = str(self.address).split(".")
        else:
            raise NotImplementedError("Unknown IP protocol")

        # Reverse the list
        labels.reverse()

        # Append suffix
        labels.append(blacklist)

        return ".".join(labels)

    async def _resolve_blacklist(self, blacklist):
        """
        Checks whether this address is listed on the given blacklist.

        Returns a tuple (return_code, reason):
          * (None, error message) if the resolver raised an IOError
          * (None, None) if the address is not listed
          * (return code, text of the first TXT record) if it is listed,
            where the text is None if no TXT record could be fetched
        """
        # Get resource record name
        rr = self._make_blacklist_rr(blacklist)

        # Blacklists publish their result as an A record (127.0.0.x) for
        # IPv4 and IPv6 addresses alike (RFC 5782, section 2.4), so the
        # query type does not depend on the IP protocol version.
        query_type = pycares.QUERY_TYPE_A

        # Run query
        try:
            res = await self.backend.resolver.query(rr, type=query_type)
        except IOError as e:
            logging.warning(e)

            return None, "%s" % e

        # Not found
        if not res:
            logging.debug("%s is not blacklisted on %s", self, blacklist)
            return None, None

        # Extract return code from the first record of the DNS response
        return_code = None
        for row in res:
            return_code = row.host
            break

        # If the IP address is on a blacklist, we will try to fetch the
        # TXT record. A failure here must not hide the listing itself.
        try:
            reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
        except IOError as e:
            logging.warning(e)
            reason = None

        # Log result
        logging.debug("%s is blacklisted on %s: %s", self, blacklist, reason or "N/A")

        # Take the first reason
        if reason:
            for record in reason:
                return return_code, record.text

        # Blocked, but no reason
        return return_code, None

    async def get_blacklists(self):
        """
        Checks this address against all BLACKLISTS, one after the other,
        and returns a dict that maps each blacklist to the
        (return_code, reason) tuple from _resolve_blacklist().
        """
        blacklists = {}

        for bl in BLACKLISTS:
            blacklists[bl] = await self._resolve_blacklist(bl)

        return blacklists
152
153
def format_asn(asn):
    """
    Returns the string representation of the Autonomous System with the
    given number, falling back to "AS<number>" if the lookup finds nothing.
    """
    # NOTE(review): "db" is not bound anywhere in the visible part of this
    # file; unless the star import from .decorators provides it, this call
    # raises NameError - confirm where it is supposed to come from.
    network = db.get_as(asn)

    return ("%s" % network) if network else ("AS%s" % asn)
161
def format_size(s, max_unit=None):
    """
    Formats a size in bytes as a rounded, human-readable string
    using 1024-based steps, e.g. 2048 -> "2kB".

    s is the size in bytes. max_unit optionally names the largest unit
    to use ("B", "kB", "MB", "GB" or "TB"); the value is not scaled
    beyond it. An unknown unit name is ignored.
    """
    units = ("B", "kB", "MB", "GB", "TB")

    i = 0

    # Check max_unit before scaling, so that it also works for the very
    # first unit (max_unit="B")
    while s >= 1024 and i < len(units) - 1 and units[i] != max_unit:
        s /= 1024
        i += 1

    return "%.0f%s" % (s, units[i])
174
def format_time(s, shorter=True):
    """
    Formats a duration given in seconds, rounded to the nearest minute.

    Returns "<m> min" if shorter is set and the rounded duration is below
    one hour, and "<h>:<mm> hrs" otherwise.
    """
    # No locale is available here, so the strings are passed through
    # unchanged. They remain wrapped in _() to keep them marked.
    def _(text):
        return text

    # Round to full minutes before splitting off the hours, so that
    # e.g. 59m 45s becomes 1:00 hrs instead of "60 min"
    total_minutes = (s + 30) // 60

    hrs, minutes = divmod(total_minutes, 60)

    if shorter and not hrs:
        return _("%(min)d min") % {"min": minutes}

    return _("%(hrs)d:%(min)02d hrs") % {"hrs": hrs, "min": minutes}
189
def random_string(length=8):
    """
    Returns a random string of the given length which consists of
    ASCII letters and digits only.
    """
    input_chars = string.ascii_letters + string.digits

    # Draw from the operating system's random source instead of the
    # predictable default generator.
    # NOTE(review): this matters if the result is used as a token or
    # password - confirm against the callers.
    rng = random.SystemRandom()

    return "".join(rng.choice(input_chars) for _ in range(length))
196
def normalize(s):
    """
    Turns a string into a slug: accents are stripped from letters,
    every run of characters that are not word characters becomes a
    separator and the remaining words are joined with "-".

    Raises TypeError if s is not a string.
    """
    # Decompose accented characters into base character + combining marks
    s = unicodedata.normalize("NFKD", s)

    # Drop the combining marks. Otherwise they would be treated as
    # separators below and split words apart ("für" -> "fu-r").
    s = "".join(c for c in s if not unicodedata.combining(c))

    # Replace anything that is not a word character by whitespace
    s = re.sub(r"[^\w]+", " ", s)

    # Join the words, which also removes excessive whitespace
    return "-".join(s.split())
208
def generate_thumbnail(data, size, square=False, **args):
    """
    Scales the image in data down to at most size x size pixels and
    returns the encoded result as bytes in the format of the source image.

    If square is set, the image is cropped and scaled to exactly
    size x size pixels. Otherwise it is shrunk in place so that it fits
    into that box. Any further keyword arguments are passed on to
    PIL's Image.save().

    JPEG images are flattened onto a white background, slightly blurred
    and always written with quality 70 and 4:2:0 chroma subsampling.
    """
    assert data, "No image data received"

    image = PIL.Image.open(io.BytesIO(data))

    # Save the image format now. Images that are returned by convert(),
    # ImageOps.fit(), filter() etc. no longer carry the format of the
    # file, so image.format must not be consulted again later on.
    image_format = image.format
    is_jpeg = image_format == "JPEG"

    # Remove any alpha-channels
    if is_jpeg and image.mode != "RGB":
        # Make a white background
        background = PIL.Image.new("RGBA", image.size, (255, 255, 255))

        # Convert image to RGBA if not in RGBA, yet
        if image.mode != "RGBA":
            image = image.convert("RGBA")

        # Flatten both images together and remove the alpha channel
        image = PIL.Image.alpha_composite(background, image).convert("RGB")

    # Resize the image to the desired resolution
    if square:
        image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
    else:
        image.thumbnail((size, size), PIL.Image.LANCZOS)

    if is_jpeg:
        # Apply a gaussian blur to make compression easier
        image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))

        # Arguments to optimise the compression
        args.update({
            "subsampling" : "4:2:0",
            "quality"     : 70,
        })

    with io.BytesIO() as f:
        # If writing out the image does not work with optimization,
        # we try to write it out without any optimization.
        try:
            image.save(f, image_format, optimize=True, **args)
        except Exception:
            # Discard whatever the failed attempt has already written
            f.seek(0)
            f.truncate()

            image.save(f, image_format, **args)

        return f.getvalue()