17 from .decorators
import *
18 from .misc
import Object
20 # These lists are used to block access to the webapp
27 "b.barracudacentral.org",
31 "dnsbl-1.uceprotect.net",
32 "dnsbl-2.uceprotect.net",
33 "dnsbl-3.uceprotect.net",
35 "ix.dnsbl.manitu.net",
42 class Address(Object
):
43 def init(self
, address
):
44 self
.address
= ipaddress
.ip_address(address
)
47 return "%s" % self
.address
51 if isinstance(self
.address
, ipaddress
.IPv6Address
):
52 return socket
.AF_INET6
53 elif isinstance(self
.address
, ipaddress
.IPv4Address
):
58 return self
.backend
.location
.lookup("%s" % self
.address
)
61 def country_code(self
):
63 return self
.network
.country_code
68 return self
.network
.asn
71 def autonomous_system(self
):
73 return self
.backend
.location
.get_as(self
.asn
)
75 def is_anonymous_proxy(self
):
76 return self
.network
.has_flag(location
.NETWORK_FLAG_ANONYMOUS_PROXY
)
78 def is_satellite_provider(self
):
79 return self
.network
.has_flag(location
.NETWORK_FLAG_SATELLITE_PROVIDER
)
82 return self
.network
.has_flag(location
.NETWORK_FLAG_ANYCAST
)
86 def _make_blacklist_rr(self
, blacklist
):
87 if self
.family
== socket
.AF_INET6
:
88 octets
= list(self
.address
.exploded
.replace(":", ""))
89 elif self
.family
== socket
.AF_INET
:
90 octets
= str(self
.address
).split(".")
92 raise NotImplementedError("Unknown IP protocol")
98 octets
.append(blacklist
)
100 return ".".join(octets
)
102 async def _resolve_blacklist(self
, blacklist
):
105 # Get resource record name
106 rr
= self
._make
_blacklist
_rr
(blacklist
)
108 # Get query type from IP protocol version
109 if self
.family
== socket
.AF_INET6
:
110 type = pycares
.QUERY_TYPE_AAAA
111 elif self
.family
== socket
.AF_INET
:
112 type = pycares
.QUERY_TYPE_A
114 raise NotImplementedError("Unknown IP protocol")
118 res
= await self
.backend
.resolver
.query(rr
, type=type)
122 return return_code
, "%s" % e
126 logging
.debug("%s is not blacklisted on %s" % (self
, blacklist
))
127 return return_code
, None
129 # Extract return code from DNS response
131 return_code
= row
.host
134 # If the IP address is on a blacklist, we will try to fetch the TXT record
135 reason
= await self
.backend
.resolver
.query(rr
, type=pycares
.QUERY_TYPE_TXT
)
138 logging
.debug("%s is blacklisted on %s: %s" % (self
, blacklist
, reason
or "N/A"))
140 # Take the first reason
143 return return_code
, i
.text
145 # Blocked, but no reason
146 return return_code
, None
148 async def get_blacklists(self
):
149 blacklists
= { bl
: await self
._resolve
_blacklist
(bl
) for bl
in BLACKLISTS
}
155 network
= db
.get_as(asn
)
158 return "%s" % network
162 def format_size(s
, max_unit
=None):
163 units
= ("B", "kB", "MB", "GB", "TB")
166 while s
>= 1024 and i
< len(units
) - 1:
170 if max_unit
and units
[i
] == max_unit
:
173 return "%.0f%s" % (s
, units
[i
])
175 def format_time(s
, shorter
=True):
176 #_ = handler.locale.translate
179 hrs
, s
= divmod(s
, 3600)
180 min, s
= divmod(s
, 60)
185 if shorter
and not hrs
:
186 return _("%(min)d min") % { "min" : min }
188 return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs
, "min" : min}
190 def random_string(length
=8):
191 input_chars
= string
.ascii_letters
+ string
.digits
193 r
= (random
.choice(input_chars
) for i
in range(length
))
198 # Remove any non-ASCII characters
200 s
= unicodedata
.normalize("NFKD", s
)
204 # Remove excessive whitespace
205 s
= re
.sub(r
"[^\w]+", " ", s
)
207 return "-".join(s
.split())
209 def generate_thumbnail(data
, size
, square
=False, **args
):
210 assert data
, "No image data received"
212 image
= PIL
.Image
.open(io
.BytesIO(data
))
215 format
= image
.format
217 # Remove any alpha-channels
218 if image
.format
== "JPEG" and not image
.mode
== "RGB":
219 # Make a white background
220 background
= PIL
.Image
.new("RGBA", image
.size
, (255,255,255))
222 # Convert image to RGBA if not in RGBA, yet
223 if not image
.mode
== "RGBA":
224 image
= image
.convert("RGBA")
226 # Flatten both images together
227 flattened_image
= PIL
.Image
.alpha_composite(background
, image
)
229 # Remove the alpha channel
230 image
= flattened_image
.convert("RGB")
232 # Resize the image to the desired resolution
234 image
= PIL
.ImageOps
.fit(image
, (size
, size
), PIL
.Image
.LANCZOS
)
236 image
.thumbnail((size
, size
), PIL
.Image
.LANCZOS
)
238 if image
.format
== "JPEG":
239 # Apply a gaussian blur to make compression easier
240 image
= image
.filter(PIL
.ImageFilter
.GaussianBlur(radius
=0.05))
242 # Arguments to optimise the compression
244 "subsampling" : "4:2:0",
248 with io
.BytesIO() as f
:
249 # If writing out the image does not work with optimization,
250 # we try to write it out without any optimization.
252 image
.save(f
, format
, optimize
=True, **args
)
254 image
.save(f
, format
, **args
)