]> git.ipfire.org Git - ipfire.org.git/blame - src/backend/util.py
Drop unused format_asn() function
[ipfire.org.git] / src / backend / util.py
CommitLineData
9523790a 1#!/usr/bin/python3
66862195 2
5ef115cd
MT
3import PIL.Image
4import PIL.ImageFilter
2b72638d 5import PIL.ImageOps
5ef115cd 6import io
440aba92
MT
7import ipaddress
8import location
5ef115cd 9import logging
440aba92 10import pycares
e96e445b 11import random
9523790a 12import re
440aba92 13import socket
e96e445b 14import string
75d9b3da 15import unicodedata
e96e445b 16
440aba92
MT
17from .decorators import *
18from .misc import Object
19
20# These lists are used to block access to the webapp
21BLOCKLISTS = (
22 "sbl.spamhaus.org",
23 "xbl.spamhaus.org",
24)
25
26BLACKLISTS = (
27 "b.barracudacentral.org",
28 "bl.spamcop.net",
29 "bl.blocklist.de",
30 "cbl.abuseat.org",
31 "dnsbl-1.uceprotect.net",
32 "dnsbl-2.uceprotect.net",
33 "dnsbl-3.uceprotect.net",
34 "dnsbl.abuse.ch",
35 "ix.dnsbl.manitu.net",
36 "pbl.spamhaus.org",
37 "sbl.spamhaus.org",
38 "xbl.spamhaus.org",
39 "zen.spamhaus.org",
40)
41
440aba92
MT
42class Address(Object):
43 def init(self, address):
44 self.address = ipaddress.ip_address(address)
45
46 def __str__(self):
47 return "%s" % self.address
48
49 @property
50 def family(self):
51 if isinstance(self.address, ipaddress.IPv6Address):
52 return socket.AF_INET6
53 elif isinstance(self.address, ipaddress.IPv4Address):
54 return socket.AF_INET
55
56 @lazy_property
57 def network(self):
7b05edde 58 return self.backend.location.lookup("%s" % self.address)
440aba92
MT
59
60 @property
61 def country_code(self):
62 if self.network:
63 return self.network.country_code
64
65 @lazy_property
66 def asn(self):
67 if self.network:
68 return self.network.asn
69
7b05edde
MT
70 @lazy_property
71 def autonomous_system(self):
72 if self.asn:
73 return self.backend.location.get_as(self.asn)
74
7ac8b12f 75 def is_anonymous_proxy(self):
1c22909c
MT
76 if self.network:
77 return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
7ac8b12f
MT
78
79 def is_satellite_provider(self):
1c22909c
MT
80 if self.network:
81 return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
7ac8b12f
MT
82
83 def is_anycast(self):
1c22909c
MT
84 if self.network:
85 return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)
7ac8b12f 86
440aba92
MT
87 # Blacklist
88
89 def _make_blacklist_rr(self, blacklist):
90 if self.family == socket.AF_INET6:
91 octets = list(self.address.exploded.replace(":", ""))
92 elif self.family == socket.AF_INET:
93 octets = str(self.address).split(".")
94 else:
95 raise NotImplementedError("Unknown IP protocol")
96
97 # Reverse the list
98 octets.reverse()
99
100 # Append suffix
101 octets.append(blacklist)
102
103 return ".".join(octets)
104
105 async def _resolve_blacklist(self, blacklist):
106 return_code = None
107
108 # Get resource record name
109 rr = self._make_blacklist_rr(blacklist)
110
111 # Get query type from IP protocol version
112 if self.family == socket.AF_INET6:
113 type = pycares.QUERY_TYPE_AAAA
114 elif self.family == socket.AF_INET:
115 type = pycares.QUERY_TYPE_A
116 else:
117 raise NotImplementedError("Unknown IP protocol")
118
119 # Run query
120 try:
121 res = await self.backend.resolver.query(rr, type=type)
122 except IOError as e:
123 logging.warning(e)
124
125 return return_code, "%s" % e
126
127 # Not found
128 if not res:
129 logging.debug("%s is not blacklisted on %s" % (self, blacklist))
130 return return_code, None
131
132 # Extract return code from DNS response
133 for row in res:
134 return_code = row.host
135 break
136
137 # If the IP address is on a blacklist, we will try to fetch the TXT record
138 reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
139
140 # Log result
141 logging.debug("%s is blacklisted on %s: %s" % (self, blacklist, reason or "N/A"))
142
143 # Take the first reason
144 if reason:
145 for i in reason:
146 return return_code, i.text
147
148 # Blocked, but no reason
149 return return_code, None
150
151 async def get_blacklists(self):
152 blacklists = { bl : await self._resolve_blacklist(bl) for bl in BLACKLISTS }
153
154 return blacklists
155
440aba92 156
84604476
MT
157def format_size(s, max_unit=None):
158 units = ("B", "kB", "MB", "GB", "TB")
66862195
MT
159
160 i = 0
161 while s >= 1024 and i < len(units) - 1:
162 s /= 1024
163 i += 1
164
84604476
MT
165 if max_unit and units[i] == max_unit:
166 break
167
66862195
MT
168 return "%.0f%s" % (s, units[i])
169
5ac74b02 170def format_time(s, shorter=True):
66862195
MT
171 #_ = handler.locale.translate
172 _ = lambda x: x
173
174 hrs, s = divmod(s, 3600)
175 min, s = divmod(s, 60)
176
177 if s >= 30:
178 min += 1
179
180 if shorter and not hrs:
181 return _("%(min)d min") % { "min" : min }
182
183 return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs, "min" : min}
e96e445b
MT
184
185def random_string(length=8):
186 input_chars = string.ascii_letters + string.digits
187
188 r = (random.choice(input_chars) for i in range(length))
189
190 return "".join(r)
75d9b3da
MT
191
192def normalize(s):
193 # Remove any non-ASCII characters
194 try:
195 s = unicodedata.normalize("NFKD", s)
196 except TypeError:
197 pass
198
199 # Remove excessive whitespace
200 s = re.sub(r"[^\w]+", " ", s)
201
202 return "-".join(s.split())
5ef115cd 203
2de3dacc 204def generate_thumbnail(data, size, square=False, **args):
5ef115cd
MT
205 assert data, "No image data received"
206
207 image = PIL.Image.open(io.BytesIO(data))
208
209 # Save image format
210 format = image.format
211
212 # Remove any alpha-channels
213 if image.format == "JPEG" and not image.mode == "RGB":
214 # Make a white background
215 background = PIL.Image.new("RGBA", image.size, (255,255,255))
216
217 # Convert image to RGBA if not in RGBA, yet
218 if not image.mode == "RGBA":
219 image = image.convert("RGBA")
220
221 # Flatten both images together
222 flattened_image = PIL.Image.alpha_composite(background, image)
223
224 # Remove the alpha channel
225 image = flattened_image.convert("RGB")
226
227 # Resize the image to the desired resolution
2de3dacc 228 if square:
7e222133 229 image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
2de3dacc
MT
230 else:
231 image.thumbnail((size, size), PIL.Image.LANCZOS)
5ef115cd
MT
232
233 if image.format == "JPEG":
234 # Apply a gaussian blur to make compression easier
235 image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))
236
237 # Arguments to optimise the compression
238 args.update({
239 "subsampling" : "4:2:0",
240 "quality" : 70,
241 })
242
243 with io.BytesIO() as f:
244 # If writing out the image does not work with optimization,
245 # we try to write it out without any optimization.
246 try:
247 image.save(f, format, optimize=True, **args)
248 except:
249 image.save(f, format, **args)
250
251 return f.getvalue()