]> git.ipfire.org Git - ipfire.org.git/blob - src/backend/util.py
Remove unused blacklist feature
[ipfire.org.git] / src / backend / util.py
1 #!/usr/bin/python3
2
3 import PIL.Image
4 import PIL.ImageFilter
5 import PIL.ImageOps
6 import io
7 import ipaddress
8 import location
9 import logging
10 import pycares
11 import random
12 import re
13 import socket
14 import string
15 import unicodedata
16
17 from .decorators import *
18 from .misc import Object
19
# These lists are used to block access to the webapp
BLOCKLISTS = (
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
)

# DNS blacklists that Address.get_blacklists() checks an address against
BLACKLISTS = (
    "b.barracudacentral.org",
    "bl.spamcop.net",
    "bl.blocklist.de",
    "cbl.abuseat.org",
    "dnsbl-1.uceprotect.net",
    "dnsbl-2.uceprotect.net",
    "dnsbl-3.uceprotect.net",
    "dnsbl.abuse.ch",
    "ix.dnsbl.manitu.net",
    "pbl.spamhaus.org",
    "sbl.spamhaus.org",
    "xbl.spamhaus.org",
    "zen.spamhaus.org",
)
41
# Open the location database once at import time; this handle is shared by
# Address.network and format_asn() below
db = location.Database("/var/lib/location/database.db")
44
class Address(Object):
    """
    An IP address (IPv4 or IPv6) with helpers to look up its location
    data and to check it against DNS blacklists.

    NOTE(review): init() and self.backend are presumably wired up by the
    Object base class, which is defined outside of this file - confirm.
    """

    def init(self, address):
        # ipaddress.ip_address() accepts IPv4 and IPv6 and raises ValueError
        # for anything it cannot parse
        self.address = ipaddress.ip_address(address)

    def __str__(self):
        return "%s" % self.address

    @property
    def family(self):
        """
        The socket address family of this address
        (socket.AF_INET6 or socket.AF_INET)
        """
        if isinstance(self.address, ipaddress.IPv6Address):
            return socket.AF_INET6
        elif isinstance(self.address, ipaddress.IPv4Address):
            return socket.AF_INET

    @lazy_property
    def network(self):
        """
        Whatever the location database returns for this address
        """
        return db.lookup("%s" % self.address)

    @property
    def country_code(self):
        """
        The country code of the network, or None if no network was found
        """
        if self.network:
            return self.network.country_code

    @lazy_property
    def asn(self):
        """
        The AS number of the network, or None if no network was found
        """
        if self.network:
            return self.network.asn

    # Blacklist

    def _make_blacklist_rr(self, blacklist):
        """
        Builds the DNS name that has to be queried to find out whether
        this address is listed on the given blacklist.

        The name consists of the reversed parts of the address (octets for
        IPv4, hexadecimal nibbles for IPv6) followed by the blacklist zone,
        e.g. 192.0.2.1 on "bl.example.org" becomes "1.2.0.192.bl.example.org".

        Raises NotImplementedError if the address family is unknown.
        """
        if self.family == socket.AF_INET6:
            # One label per hexadecimal digit of the fully expanded address
            octets = list(self.address.exploded.replace(":", ""))
        elif self.family == socket.AF_INET:
            octets = str(self.address).split(".")
        else:
            raise NotImplementedError("Unknown IP protocol")

        # Reverse the list
        octets.reverse()

        # Append suffix
        octets.append(blacklist)

        return ".".join(octets)

    async def _resolve_blacklist(self, blacklist):
        """
        Checks whether this address is listed on the given blacklist.

        Returns a tuple (return_code, reason):

          * (None, None) if the address is not listed
          * (None, error message) if the lookup failed
          * (return code, text of the first TXT record or None) if listed
        """
        return_code = None

        # Get resource record name (raises for unknown address families)
        rr = self._make_blacklist_rr(blacklist)

        # DNS blacklists publish their return codes as A records for IPv4
        # and IPv6 entries alike (RFC 5782), so we always have to ask for
        # A records. Asking for AAAA would never find a listed IPv6 address.
        query_type = pycares.QUERY_TYPE_A

        # Run query
        try:
            res = await self.backend.resolver.query(rr, type=query_type)
        except IOError as e:
            logging.warning(e)

            return return_code, "%s" % e

        # Not found
        if not res:
            logging.debug("%s is not blacklisted on %s", self, blacklist)
            return return_code, None

        # Extract return code from DNS response
        for row in res:
            return_code = row.host
            break

        # If the IP address is on a blacklist, we will try to fetch the TXT
        # record. The listing is already known at this point, so a failure
        # to fetch the reason must not discard it.
        try:
            reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
        except IOError as e:
            logging.warning(e)
            reason = None

        # Log result
        logging.debug("%s is blacklisted on %s: %s", self, blacklist, reason or "N/A")

        # Take the first reason
        if reason:
            for i in reason:
                return return_code, i.text

        # Blocked, but no reason
        return return_code, None

    async def get_blacklists(self):
        """
        Checks this address against all lists in BLACKLISTS, one after the
        other, and returns a dict which maps the name of each blacklist to
        the (return_code, reason) tuple of _resolve_blacklist().
        """
        blacklists = { bl : await self._resolve_blacklist(bl) for bl in BLACKLISTS }

        return blacklists
141
142
def format_asn(asn):
    """
    Returns a printable representation of the given AS number.

    If the location database knows the AS, its string form is returned,
    otherwise the plain number prefixed with "AS".
    """
    autonomous_system = db.get_as(asn)

    # Nothing known about this AS, so all we can show is the number
    if not autonomous_system:
        return "AS%s" % asn

    return "%s" % autonomous_system
150
def format_size(s, max_unit=None):
    """
    Formats a size in bytes as a short human-readable string, e.g. "12MB".

    s is divided by 1024 until it is smaller than 1024, the largest known
    unit (TB) has been reached, or the unit equals max_unit. The value is
    printed without any decimal places.

    max_unit is one of "B", "kB", "MB", "GB", "TB" or None for no limit.
    """
    units = ("B", "kB", "MB", "GB", "TB")

    i = 0
    while s >= 1024 and i < len(units) - 1:
        # Check the limit before moving on to the next unit, so that
        # max_unit="B" is honoured as well
        if max_unit and units[i] == max_unit:
            break

        s /= 1024
        i += 1

    return "%.0f%s" % (s, units[i])
163
def format_time(s, shorter=True):
    """
    Formats a duration given in seconds, rounded to the nearest minute.

    Returns "<M> min" if shorter is true and the duration is less than
    one hour, and "<H>:<MM> hrs" otherwise.
    """
    # Translation is not hooked up here, so strings are passed through as is
    #_ = handler.locale.translate
    _ = lambda x: x

    # Round to full minutes first so that the carry makes it into the hours
    # (3599 seconds are "1:00 hrs" and not "60 min")
    minutes = (s + 30) // 60

    hrs, minutes = divmod(minutes, 60)

    if shorter and not hrs:
        return _("%(min)d min") % { "min" : minutes }

    return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs, "min" : minutes}
178
def random_string(length=8):
    """
    Returns a string of the given length which consists of randomly
    chosen ASCII letters and digits.

    NOTE(review): This draws from the random module, which the Python
    documentation does not recommend for security purposes - check what
    the callers use these strings for.
    """
    alphabet = string.ascii_letters + string.digits

    # Pick one character at a time
    picked = [random.choice(alphabet) for _ in range(length)]

    return "".join(picked)
185
def normalize(s):
    """
    Turns a string into a slug: accents are stripped from characters and
    every run of non-word characters becomes a single dash. Leading and
    trailing separators are dropped.

    Letters, digits and underscores (everything matched by \\w) are kept,
    which includes non-ASCII letters that have no decomposition.
    """
    try:
        # Split accented characters into the base character and its
        # combining marks...
        s = unicodedata.normalize("NFKD", s)

        # ...and drop the marks. They are not word characters and would
        # otherwise be treated as separators and tear words apart
        # ("resume" with accents would become "re-sume").
        s = "".join(c for c in s if not unicodedata.combining(c))
    except TypeError:
        pass

    # Collapse anything that is not a word character into a single space
    s = re.sub(r"[^\w]+", " ", s)

    return "-".join(s.split())
197
def generate_thumbnail(data, size, square=False, **args):
    """
    Scales down an image and returns it encoded in its original format.

    data is the encoded image as bytes, size is the maximum edge length
    of the thumbnail in pixels. If square is true, the image is cropped
    and scaled to exactly size x size (PIL.ImageOps.fit), otherwise it
    is scaled down so that it fits into size x size (Image.thumbnail).

    Any further keyword arguments are passed on to Image.save(). For JPEG
    images, "subsampling" and "quality" are always overwritten.
    """
    assert data, "No image data received"

    image = PIL.Image.open(io.BytesIO(data))

    # Save image format. It has to be remembered here because images that
    # are returned by convert(), fit() and filter() are new objects which
    # no longer carry the format of the source file.
    image_format = image.format
    is_jpeg = image_format == "JPEG"

    # Remove any alpha-channels
    if is_jpeg and image.mode != "RGB":
        # Make a white background
        background = PIL.Image.new("RGBA", image.size, (255,255,255))

        # Convert image to RGBA if not in RGBA, yet
        if image.mode != "RGBA":
            image = image.convert("RGBA")

        # Flatten both images together
        flattened_image = PIL.Image.alpha_composite(background, image)

        # Remove the alpha channel
        image = flattened_image.convert("RGB")

    # Resize the image to the desired resolution
    if square:
        image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
    else:
        image.thumbnail((size, size), PIL.Image.LANCZOS)

    # Check the saved format and not image.format, which is lost as soon
    # as the image has been converted or fitted above
    if is_jpeg:
        # Apply a gaussian blur to make compression easier
        image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05))

        # Arguments to optimise the compression
        args.update({
            "subsampling" : "4:2:0",
            "quality"     : 70,
        })

    with io.BytesIO() as f:
        # If writing out the image does not work with optimization,
        # we try to write it out without any optimization.
        try:
            image.save(f, image_format, optimize=True, **args)
        except Exception:
            # Throw away whatever the failed attempt has already written
            f.seek(0)
            f.truncate()

            image.save(f, image_format, **args)

        return f.getvalue()