19 from .decorators
import *
20 from .misc
import Object
22 # These lists are used to block access to the webapp
28 class Address(Object
):
def init(self, address):
    """
    Parse *address* and keep the result on the instance as ``self.address``.

    address: anything ``ipaddress.ip_address()`` accepts, e.g. a textual
    IPv4 or IPv6 address.

    Raises ValueError if *address* is not a valid IP address.
    """
    self.address = ipaddress.ip_address(address)
33 return "%s" % self
.address
37 if isinstance(self
.address
, ipaddress
.IPv6Address
):
38 return socket
.AF_INET6
39 elif isinstance(self
.address
, ipaddress
.IPv4Address
):
44 return self
.backend
.location
.lookup("%s" % self
.address
)
def country_code(self):
    """
    Return the country code of the network this address belongs to.

    Returns None when no network is known for the address.
    """
    network = self.network

    # NOTE(review): the location lookup presumably yields None for
    # addresses outside of any known network - confirm against the backend.
    if network is None:
        return None

    return network.country_code
54 return self
.network
.asn
def autonomous_system(self):
    """
    Look up the autonomous system that announces this address.

    Returns whatever the location backend's ``get_as()`` returns for
    ``self.asn``, or None when the address has no AS number.
    """
    asn = self.asn

    # Without an AS number there is nothing to look up
    if not asn:
        return None

    return self.backend.location.get_as(asn)
def is_anonymous_proxy(self):
    """
    Tell whether the address's network carries the anonymous-proxy flag.

    Returns the result of the network's ``has_flag()``, or None when no
    network is known for the address.
    """
    network = self.network

    # NOTE(review): presumably None when the address is not part of any
    # known network - confirm against the backend.
    if network is None:
        return None

    return network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
def is_satellite_provider(self):
    """
    Tell whether the address's network carries the satellite-provider flag.

    Returns the result of the network's ``has_flag()``, or None when no
    network is known for the address.
    """
    network = self.network

    # NOTE(review): presumably None when the address is not part of any
    # known network - confirm against the backend.
    if network is None:
        return None

    return network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
71 return self
.network
.has_flag(location
.NETWORK_FLAG_ANYCAST
)
def _make_blacklist_rr(self, blacklist):
    """
    Build the DNS name to query in order to check this address against
    the DNS blacklist zone *blacklist*.

    For IPv4 the labels are the four decimal octets; for IPv6 they are the
    32 hex nibbles of the fully expanded address. The labels are put in
    reverse order and the zone name is appended, e.g. 192.0.2.1 checked
    against "bl.example.org" becomes "1.2.0.192.bl.example.org".

    Raises NotImplementedError if the address family is neither IPv4 nor
    IPv6.
    """
    if self.family == socket.AF_INET6:
        # One label per hex nibble of the exploded address
        labels = list(self.address.exploded.replace(":", ""))
    elif self.family == socket.AF_INET:
        labels = str(self.address).split(".")
    else:
        raise NotImplementedError("Unknown IP protocol")

    # DNS blacklist zones are queried least-significant label first
    labels.reverse()

    # Append the zone of the blacklist
    labels.append(blacklist)

    return ".".join(labels)
91 async def _resolve_blacklist(self
, blacklist
):
94 # Get resource record name
95 rr
= self
._make
_blacklist
_rr
(blacklist
)
97 # Get query type from IP protocol version
98 if self
.family
== socket
.AF_INET6
:
99 type = pycares
.QUERY_TYPE_AAAA
100 elif self
.family
== socket
.AF_INET
:
101 type = pycares
.QUERY_TYPE_A
103 raise NotImplementedError("Unknown IP protocol")
107 res
= await self
.backend
.resolver
.query(rr
, type=type)
111 return return_code
, "%s" % e
115 logging
.debug("%s is not blacklisted on %s" % (self
, blacklist
))
116 return return_code
, None
118 # Extract return code from DNS response
120 return_code
= row
.host
123 # If the IP address is on a blacklist, we will try to fetch the TXT record
124 reason
= await self
.backend
.resolver
.query(rr
, type=pycares
.QUERY_TYPE_TXT
)
127 logging
.debug("%s is blacklisted on %s: %s" % (self
, blacklist
, reason
or "N/A"))
129 # Take the first reason
132 return return_code
, i
.text
134 # Blocked, but no reason
135 return return_code
, None
def format_size(s, max_unit=None):
    """
    Format a size given in bytes as a short human-readable string.

    The value is divided by 1024 until it drops below 1024, the largest
    unit ("TB") is reached, or the unit named by *max_unit* is reached. It
    is then printed without decimals, directly followed by the unit, e.g.
    "5MB".

    s: the size in bytes.
    max_unit: optional unit ("B", "kB", "MB", "GB" or "TB") beyond which
    the value is not scaled any further.
    """
    units = ("B", "kB", "MB", "GB", "TB")

    i = 0
    while s >= 1024 and i < len(units) - 1:
        # Stop scaling once the requested unit has been reached
        if max_unit and units[i] == max_unit:
            break

        s /= 1024
        i += 1

    return "%.0f%s" % (s, units[i])
def format_time(s, shorter=True):
    """
    Format a duration as "<minutes> min" or "<hours>:<minutes> hrs".

    s: the duration, either as a number of seconds or as a
    ``datetime.timedelta``.
    shorter: if true, durations of less than one hour are shown in
    minutes only.

    Seconds that do not make up a full minute are dropped.
    """
    # The per-request translation hook (handler.locale.translate) is not
    # used here. Pick up a module-level gettext-style "_" if one exists
    # and fall back to the untranslated message otherwise.
    try:
        translate = _
    except NameError:
        def translate(message):
            return message

    if isinstance(s, datetime.timedelta):
        s = s.total_seconds()

    hours, s = divmod(s, 3600)
    minutes, s = divmod(s, 60)

    if shorter and not hours:
        return translate("%(min)d min") % {"min": minutes}

    return translate("%(hrs)d:%(min)02d hrs") % {"hrs": hours, "min": minutes}
def random_string(length=8):
    """
    Return a random string of *length* ASCII letters and digits.

    The characters are drawn with the ``random`` module, which is not
    cryptographically secure - do not use the result as a secret.
    """
    input_chars = string.ascii_letters + string.digits

    return "".join(random.choice(input_chars) for _ in range(length))
177 # Remove any non-ASCII characters
179 s
= unicodedata
.normalize("NFKD", s
)
183 # Remove excessive whitespace
184 s
= re
.sub(r
"[^\w]+", " ", s
)
186 return "-".join(s
.split())
188 def generate_thumbnail(image
, size
, square
=False, format
=None, quality
=None, **args
):
189 assert image
, "No image data received"
191 if not isinstance(image
, PIL
.Image
.Image
):
192 image
= io
.BytesIO(image
)
195 image
= PIL
.Image
.open(image
)
197 # If we cannot open the image, we return it in raw form
198 except PIL
.UnidentifiedImageError
as e
:
199 return image
.getvalue()
202 format
= format
or image
.format
or "JPEG"
204 # Fetch any EXIF data
206 exif
= image
._getexif
()
207 except AttributeError as e
:
212 for tag
in PIL
.ExifTags
.TAGS
:
213 if PIL
.ExifTags
.TAGS
[tag
] == "Orientation":
216 image
= image
.rotate(180, expand
=True)
218 image
= image
.rotate(270, expand
=True)
220 image
= image
.rotate( 90, expand
=True)
222 # Ignore if the orientation isn't encoded
226 # Remove any alpha-channels
227 if format
== "JPEG" and not image
.mode
== "RGB":
228 # Make a white background
229 background
= PIL
.Image
.new("RGBA", image
.size
, (255,255,255))
231 # Convert image to RGBA if not in RGBA, yet
232 if not image
.mode
== "RGBA":
233 image
= image
.convert("RGBA")
235 # Flatten both images together
236 flattened_image
= PIL
.Image
.alpha_composite(background
, image
)
238 # Remove the alpha channel
239 image
= flattened_image
.convert("RGB")
241 # Resize the image to the desired resolution
243 image
= PIL
.ImageOps
.fit(image
, (size
, size
), PIL
.Image
.LANCZOS
)
245 image
.thumbnail((size
, size
), PIL
.Image
.LANCZOS
)
247 # Apply a gaussian blur to make compression easier
249 image
= image
.filter(PIL
.ImageFilter
.GaussianBlur(radius
=0.05))
253 # Arguments to optimise the compression
255 "subsampling" : "4:2:0",
256 "quality" : quality
or 72,
259 if image
.format
== "JPEG":
261 "qtables" : "web_low",
264 elif image
.format
== "WEBP":
269 with io
.BytesIO() as f
:
270 # If writing out the image does not work with optimization,
271 # we try to write it out without any optimization.
273 image
.save(f
, format
, optimize
=True, **args
)
275 image
.save(f
, format
, **args
)