17 from .decorators
import *
18 from .misc
import Object
20 # These lists are used to block access to the webapp
27 "b.barracudacentral.org",
31 "dnsbl-1.uceprotect.net",
32 "dnsbl-2.uceprotect.net",
33 "dnsbl-3.uceprotect.net",
35 "ix.dnsbl.manitu.net",
42 class Address(Object
):
43 def init(self
, address
):
44 self
.address
= ipaddress
.ip_address(address
)
47 return "%s" % self
.address
51 if isinstance(self
.address
, ipaddress
.IPv6Address
):
52 return socket
.AF_INET6
53 elif isinstance(self
.address
, ipaddress
.IPv4Address
):
58 return self
.backend
.location
.lookup("%s" % self
.address
)
61 def country_code(self
):
63 return self
.network
.country_code
68 return self
.network
.asn
71 def autonomous_system(self
):
73 return self
.backend
.location
.get_as(self
.asn
)
75 def is_anonymous_proxy(self
):
77 return self
.network
.has_flag(location
.NETWORK_FLAG_ANONYMOUS_PROXY
)
79 def is_satellite_provider(self
):
81 return self
.network
.has_flag(location
.NETWORK_FLAG_SATELLITE_PROVIDER
)
85 return self
.network
.has_flag(location
.NETWORK_FLAG_ANYCAST
)
89 def _make_blacklist_rr(self
, blacklist
):
90 if self
.family
== socket
.AF_INET6
:
91 octets
= list(self
.address
.exploded
.replace(":", ""))
92 elif self
.family
== socket
.AF_INET
:
93 octets
= str(self
.address
).split(".")
95 raise NotImplementedError("Unknown IP protocol")
101 octets
.append(blacklist
)
103 return ".".join(octets
)
105 async def _resolve_blacklist(self
, blacklist
):
108 # Get resource record name
109 rr
= self
._make
_blacklist
_rr
(blacklist
)
111 # Get query type from IP protocol version
112 if self
.family
== socket
.AF_INET6
:
113 type = pycares
.QUERY_TYPE_AAAA
114 elif self
.family
== socket
.AF_INET
:
115 type = pycares
.QUERY_TYPE_A
117 raise NotImplementedError("Unknown IP protocol")
121 res
= await self
.backend
.resolver
.query(rr
, type=type)
125 return return_code
, "%s" % e
129 logging
.debug("%s is not blacklisted on %s" % (self
, blacklist
))
130 return return_code
, None
132 # Extract return code from DNS response
134 return_code
= row
.host
137 # If the IP address is on a blacklist, we will try to fetch the TXT record
138 reason
= await self
.backend
.resolver
.query(rr
, type=pycares
.QUERY_TYPE_TXT
)
141 logging
.debug("%s is blacklisted on %s: %s" % (self
, blacklist
, reason
or "N/A"))
143 # Take the first reason
146 return return_code
, i
.text
148 # Blocked, but no reason
149 return return_code
, None
151 async def get_blacklists(self
):
152 blacklists
= { bl
: await self
._resolve
_blacklist
(bl
) for bl
in BLACKLISTS
}
157 def format_size(s
, max_unit
=None):
158 units
= ("B", "kB", "MB", "GB", "TB")
161 while s
>= 1024 and i
< len(units
) - 1:
165 if max_unit
and units
[i
] == max_unit
:
168 return "%.0f%s" % (s
, units
[i
])
170 def format_time(s
, shorter
=True):
171 #_ = handler.locale.translate
174 hrs
, s
= divmod(s
, 3600)
175 min, s
= divmod(s
, 60)
180 if shorter
and not hrs
:
181 return _("%(min)d min") % { "min" : min }
183 return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs
, "min" : min}
185 def random_string(length
=8):
186 input_chars
= string
.ascii_letters
+ string
.digits
188 r
= (random
.choice(input_chars
) for i
in range(length
))
193 # Remove any non-ASCII characters
195 s
= unicodedata
.normalize("NFKD", s
)
199 # Remove excessive whitespace
200 s
= re
.sub(r
"[^\w]+", " ", s
)
202 return "-".join(s
.split())
204 def generate_thumbnail(data
, size
, square
=False, **args
):
205 assert data
, "No image data received"
207 image
= PIL
.Image
.open(io
.BytesIO(data
))
210 format
= image
.format
212 # Remove any alpha-channels
213 if image
.format
== "JPEG" and not image
.mode
== "RGB":
214 # Make a white background
215 background
= PIL
.Image
.new("RGBA", image
.size
, (255,255,255))
217 # Convert image to RGBA if not in RGBA, yet
218 if not image
.mode
== "RGBA":
219 image
= image
.convert("RGBA")
221 # Flatten both images together
222 flattened_image
= PIL
.Image
.alpha_composite(background
, image
)
224 # Remove the alpha channel
225 image
= flattened_image
.convert("RGB")
227 # Resize the image to the desired resolution
229 image
= PIL
.ImageOps
.fit(image
, (size
, size
), PIL
.Image
.LANCZOS
)
231 image
.thumbnail((size
, size
), PIL
.Image
.LANCZOS
)
233 if image
.format
== "JPEG":
234 # Apply a gaussian blur to make compression easier
235 image
= image
.filter(PIL
.ImageFilter
.GaussianBlur(radius
=0.05))
237 # Arguments to optimise the compression
239 "subsampling" : "4:2:0",
243 with io
.BytesIO() as f
:
244 # If writing out the image does not work with optimization,
245 # we try to write it out without any optimization.
247 image
.save(f
, format
, optimize
=True, **args
)
249 image
.save(f
, format
, **args
)