]> git.ipfire.org Git - ipfire.org.git/blame - src/backend/util.py
location: Disable caching for the index page
[ipfire.org.git] / src / backend / util.py
CommitLineData
9523790a 1#!/usr/bin/python3
66862195 2
5ef115cd
MT
3import PIL.Image
4import PIL.ImageFilter
2b72638d 5import PIL.ImageOps
5ef115cd 6import io
440aba92
MT
7import ipaddress
8import location
5ef115cd 9import logging
440aba92 10import pycares
e96e445b 11import random
9523790a 12import re
440aba92 13import socket
e96e445b 14import string
75d9b3da 15import unicodedata
e96e445b 16
440aba92
MT
17from .decorators import *
18from .misc import Object
19
# These lists are used to block access to the webapp
BLOCKLISTS = (
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
)

# DNS blacklist zones that Address.get_blacklists() checks an address against.
# Each name is appended as a suffix to the reversed address to form the DNS
# name that is queried.
BLACKLISTS = (
    "b.barracudacentral.org",
    "bl.spamcop.net",
    "bl.blocklist.de",
    "cbl.abuseat.org",
    "dnsbl-1.uceprotect.net",
    "dnsbl-2.uceprotect.net",
    "dnsbl-3.uceprotect.net",
    "dnsbl.abuse.ch",
    "ix.dnsbl.manitu.net",
    "pbl.spamhaus.org",
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
    "zen.spamhaus.org",
)

# Open location database
# The handle is shared module-wide: Address.network uses it to look up
# addresses and format_asn() uses it to resolve AS numbers.
db = location.Database("/var/lib/location/database.db")
44
class Address(Object):
    """
    An IP address with location database and DNS blacklist lookups.

    NOTE(review): the setup code lives in init() rather than __init__();
    presumably the Object base class calls init() and also provides
    self.backend - confirm in .misc.
    """

    def init(self, address):
        # ip_address() returns an IPv4Address or IPv6Address object and
        # raises ValueError for anything that is not a valid address
        self.address = ipaddress.ip_address(address)

    def __str__(self):
        return "%s" % self.address

    @property
    def family(self):
        """
        The socket address family of this address (AF_INET6 or AF_INET)
        """
        if isinstance(self.address, ipaddress.IPv6Address):
            return socket.AF_INET6
        elif isinstance(self.address, ipaddress.IPv4Address):
            return socket.AF_INET

    @lazy_property
    def network(self):
        """
        The entry of the location database that matches this address
        """
        return db.lookup("%s" % self.address)

    @property
    def country_code(self):
        """
        The country code of the matching network, or None if there is no match
        """
        if self.network:
            return self.network.country_code

    @lazy_property
    def asn(self):
        """
        The AS number of the matching network, or None if there is no match
        """
        if self.network:
            return self.network.asn

    # Blacklist

    def _make_blacklist_rr(self, blacklist):
        """
        Returns the DNS name to query for this address on the given blacklist.

        The name consists of the reversed address elements (nibbles for IPv6,
        decimal octets for IPv4) followed by the blacklist zone.
        """
        if self.family == socket.AF_INET6:
            # The exploded form has all 32 hex digits, one label per nibble
            octets = list(self.address.exploded.replace(":", ""))
        elif self.family == socket.AF_INET:
            octets = str(self.address).split(".")
        else:
            raise NotImplementedError("Unknown IP protocol")

        # Reverse the list
        octets.reverse()

        # Append suffix
        octets.append(blacklist)

        return ".".join(octets)

    async def _resolve_blacklist(self, blacklist):
        """
        Checks whether this address is listed on the given blacklist.

        Returns a tuple (return_code, reason):

          * (None, None) if the address is not listed
          * (None, error message) if the lookup failed
          * (return_code, reason) if the address is listed, where reason is
            the first TXT record or None if no reason could be fetched
        """
        return_code = None

        # Get resource record name
        rr = self._make_blacklist_rr(blacklist)

        # Get query type from IP protocol version
        # NOTE(review): DNS blacklists conventionally answer with A records
        # even for IPv6 lookups (RFC 5782) - confirm that querying AAAA for
        # IPv6 addresses is intended.
        if self.family == socket.AF_INET6:
            query_type = pycares.QUERY_TYPE_AAAA
        elif self.family == socket.AF_INET:
            query_type = pycares.QUERY_TYPE_A
        else:
            raise NotImplementedError("Unknown IP protocol")

        # Run query
        try:
            res = await self.backend.resolver.query(rr, type=query_type)
        except IOError as e:
            logging.warning(e)

            return return_code, "%s" % e

        # Not found
        if not res:
            logging.debug("%s is not blacklisted on %s", self, blacklist)
            return return_code, None

        # Extract return code from DNS response
        for row in res:
            return_code = row.host
            break

        # If the IP address is on a blacklist, we will try to fetch the TXT record.
        # The listing itself is already confirmed, so a failure here must not
        # discard it; the address is reported as listed without a reason.
        try:
            reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
        except IOError as e:
            logging.warning(e)
            reason = None

        # Log result
        logging.debug("%s is blacklisted on %s: %s", self, blacklist, reason or "N/A")

        # Take the first reason
        if reason:
            for i in reason:
                return return_code, i.text

        # Blocked, but no reason
        return return_code, None

    async def get_blacklists(self):
        """
        Checks this address against all BLACKLISTS, one after the other.

        Returns a dict that maps each blacklist name to the tuple returned
        by _resolve_blacklist().
        """
        blacklists = {}

        for bl in BLACKLISTS:
            blacklists[bl] = await self._resolve_blacklist(bl)

        return blacklists
141
440aba92
MT
142
def format_asn(asn):
    """
    Returns a printable representation of an AS number.

    If the location database has an entry for the AS, its string
    representation is returned; otherwise the bare "AS<number>" form.
    """
    autonomous_system = db.get_as(asn)

    # Fall back to the plain number when the database has no entry
    if not autonomous_system:
        return "AS%s" % asn

    return "%s" % autonomous_system
150
84604476
MT
def format_size(s, max_unit=None):
    """
    Formats a size in bytes as a rounded number followed by a unit.

    s is scaled down by a factor of 1024 per unit until it drops below 1024,
    the largest unit (TB) is reached, or max_unit is reached.

    max_unit is one of "B", "kB", "MB", "GB", "TB" and stops the scaling at
    that unit, e.g. format_size(5 * 1024 ** 3, max_unit="MB") gives "5120MB".
    """
    units = ("B", "kB", "MB", "GB", "TB")

    i = 0
    while s >= 1024 and i < len(units) - 1:
        # Check the limit before scaling so that it also works for the
        # very first unit (max_unit="B")
        if max_unit and units[i] == max_unit:
            break

        s /= 1024
        i += 1

    return "%.0f%s" % (s, units[i])
163
def format_time(s, shorter=True):
    """
    Formats a duration given in seconds, rounded to the nearest minute.

    Returns "<min> min" if shorter is set and the duration is below one hour,
    and "<hrs>:<min> hrs" otherwise.
    """
    # Translation is not wired up here; _ only marks the translatable strings
    _ = lambda x: x

    # Round to whole minutes first (30 seconds and more round up), so that
    # the carry reaches the hours and we never return "60 min" or "0:60 hrs"
    total_minutes = int((s + 30) // 60)

    hrs, minutes = divmod(total_minutes, 60)

    if shorter and not hrs:
        return _("%(min)d min") % { "min" : minutes }

    return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs, "min" : minutes}
e96e445b
MT
178
def random_string(length=8):
    """
    Returns a string of the given length made of random ASCII letters
    and digits.

    NOTE(review): the characters come from the random module, which is not
    meant for security purposes - confirm that no caller uses the result
    as a secret.
    """
    alphabet = string.ascii_letters + string.digits

    # Pick one character at a time
    picked = [random.choice(alphabet) for _ in range(length)]

    return "".join(picked)
75d9b3da
MT
185
def normalize(s):
    """
    Returns s as a string of words that are joined by hyphens.

    The input is decomposed with Unicode NFKD normalization and every run
    of non-word characters (anything that \\w does not match) separates two
    words. Leading and trailing separators are dropped.
    """
    # Decompose compatibility characters; input that cannot be normalized
    # is passed on unchanged
    try:
        decomposed = unicodedata.normalize("NFKD", s)
    except TypeError:
        decomposed = s

    # Turn every run of non-word characters into a single space and
    # split the result into words
    words = re.sub(r"[^\w]+", " ", decomposed).split()

    return "-".join(words)
5ef115cd 197
def generate_thumbnail(data, size, square=False, **args):
    """
    Scales an image down and returns it encoded in its original format.

    data is the encoded source image as bytes and size the edge length of
    the bounding box in pixels. If square is set, the image is cropped and
    scaled to exactly size x size; otherwise it is shrunk to fit into the
    box and keeps its aspect ratio.

    Any further keyword arguments are passed on to PIL's Image.save(). For
    JPEG images, "subsampling" and "quality" are always overwritten.

    Raises AssertionError if no data was passed.
    """
    assert data, "No image data received"

    image = PIL.Image.open(io.BytesIO(data))

    # Save image format. Pillow sets the format attribute to None on every
    # image that is returned by convert(), ImageOps.fit() or filter(), so
    # everything below has to use this copy.
    image_format = image.format

    # Remove any alpha-channels
    if image_format == "JPEG" and image.mode != "RGB":
        # Make a white background
        background = PIL.Image.new("RGBA", image.size, (255,255,255))

        # Convert image to RGBA if not in RGBA, yet
        if image.mode != "RGBA":
            image = image.convert("RGBA")

        # Flatten both images together
        flattened_image = PIL.Image.alpha_composite(background, image)

        # Remove the alpha channel
        image = flattened_image.convert("RGB")

    # Resize the image to the desired resolution
    if square:
        image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
    else:
        # thumbnail() works in place
        image.thumbnail((size, size), PIL.Image.LANCZOS)

    if image_format == "JPEG":
        # Apply a gaussian blur to make compression easier
        image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))

        # Arguments to optimise the compression
        args.update({
            "subsampling" : "4:2:0",
            "quality"     : 70,
        })

    with io.BytesIO() as f:
        # If writing out the image does not work with optimization,
        # we try to write it out without any optimization.
        try:
            image.save(f, image_format, optimize=True, **args)
        except Exception:
            # Drop anything the failed attempt might have written already
            f.seek(0)
            f.truncate()

            image.save(f, image_format, **args)

        return f.getvalue()