19 from .decorators
import *
20 from .misc
import Object
22 # These lists are used to block access to the webapp
29 "b.barracudacentral.org",
33 "dnsbl-1.uceprotect.net",
34 "dnsbl-2.uceprotect.net",
35 "dnsbl-3.uceprotect.net",
37 "ix.dnsbl.manitu.net",
44 class Address(Object
):
45 def init(self
, address
):
46 self
.address
= ipaddress
.ip_address(address
)
49 return "%s" % self
.address
53 if isinstance(self
.address
, ipaddress
.IPv6Address
):
54 return socket
.AF_INET6
55 elif isinstance(self
.address
, ipaddress
.IPv4Address
):
60 return self
.backend
.location
.lookup("%s" % self
.address
)
63 def country_code(self
):
65 return self
.network
.country_code
70 return self
.network
.asn
73 def autonomous_system(self
):
75 return self
.backend
.location
.get_as(self
.asn
)
77 def is_anonymous_proxy(self
):
79 return self
.network
.has_flag(location
.NETWORK_FLAG_ANONYMOUS_PROXY
)
81 def is_satellite_provider(self
):
83 return self
.network
.has_flag(location
.NETWORK_FLAG_SATELLITE_PROVIDER
)
87 return self
.network
.has_flag(location
.NETWORK_FLAG_ANYCAST
)
91 def _make_blacklist_rr(self
, blacklist
):
92 if self
.family
== socket
.AF_INET6
:
93 octets
= list(self
.address
.exploded
.replace(":", ""))
94 elif self
.family
== socket
.AF_INET
:
95 octets
= str(self
.address
).split(".")
97 raise NotImplementedError("Unknown IP protocol")
103 octets
.append(blacklist
)
105 return ".".join(octets
)
107 async def _resolve_blacklist(self
, blacklist
):
110 # Get resource record name
111 rr
= self
._make
_blacklist
_rr
(blacklist
)
113 # Get query type from IP protocol version
114 if self
.family
== socket
.AF_INET6
:
115 type = pycares
.QUERY_TYPE_AAAA
116 elif self
.family
== socket
.AF_INET
:
117 type = pycares
.QUERY_TYPE_A
119 raise NotImplementedError("Unknown IP protocol")
123 res
= await self
.backend
.resolver
.query(rr
, type=type)
127 return return_code
, "%s" % e
131 logging
.debug("%s is not blacklisted on %s" % (self
, blacklist
))
132 return return_code
, None
134 # Extract return code from DNS response
136 return_code
= row
.host
139 # If the IP address is on a blacklist, we will try to fetch the TXT record
140 reason
= await self
.backend
.resolver
.query(rr
, type=pycares
.QUERY_TYPE_TXT
)
143 logging
.debug("%s is blacklisted on %s: %s" % (self
, blacklist
, reason
or "N/A"))
145 # Take the first reason
148 return return_code
, i
.text
150 # Blocked, but no reason
151 return return_code
, None
153 async def get_blacklists(self
):
154 blacklists
= { bl
: await self
._resolve
_blacklist
(bl
) for bl
in BLACKLISTS
}
159 def format_size(s
, max_unit
=None):
160 units
= ("B", "kB", "MB", "GB", "TB")
163 while s
>= 1024 and i
< len(units
) - 1:
167 if max_unit
and units
[i
] == max_unit
:
170 return "%.0f%s" % (s
, units
[i
])
172 def format_time(s
, shorter
=True):
173 #_ = handler.locale.translate
176 if isinstance(s
, datetime
.timedelta
):
177 s
= s
.total_seconds()
179 hrs
, s
= divmod(s
, 3600)
180 min, s
= divmod(s
, 60)
185 if shorter
and not hrs
:
186 return _("%(min)d min") % { "min" : min }
188 return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs
, "min" : min}
190 def random_string(length
=8):
191 input_chars
= string
.ascii_letters
+ string
.digits
193 r
= (random
.choice(input_chars
) for i
in range(length
))
198 # Remove any non-ASCII characters
200 s
= unicodedata
.normalize("NFKD", s
)
204 # Remove excessive whitespace
205 s
= re
.sub(r
"[^\w]+", " ", s
)
207 return "-".join(s
.split())
209 def generate_thumbnail(data
, size
, square
=False, **args
):
210 assert data
, "No image data received"
213 image
= PIL
.Image
.open(io
.BytesIO(data
))
215 # If we cannot open the image, we return it in raw form
216 except PIL
.UnidentifiedImageError
as e
:
220 format
= image
.format
222 # Fetch any EXIF data
223 exif
= image
._getexif
()
227 for tag
in PIL
.ExifTags
.TAGS
:
228 if PIL
.ExifTags
.TAGS
[tag
] == "Orientation":
230 image
= image
.rotate(180, expand
=True)
232 image
= image
.rotate(270, expand
=True)
234 image
= image
.rotate( 90, expand
=True)
236 # Remove any alpha-channels
237 if image
.format
== "JPEG" and not image
.mode
== "RGB":
238 # Make a white background
239 background
= PIL
.Image
.new("RGBA", image
.size
, (255,255,255))
241 # Convert image to RGBA if not in RGBA, yet
242 if not image
.mode
== "RGBA":
243 image
= image
.convert("RGBA")
245 # Flatten both images together
246 flattened_image
= PIL
.Image
.alpha_composite(background
, image
)
248 # Remove the alpha channel
249 image
= flattened_image
.convert("RGB")
251 # Resize the image to the desired resolution
253 image
= PIL
.ImageOps
.fit(image
, (size
, size
), PIL
.Image
.LANCZOS
)
255 image
.thumbnail((size
, size
), PIL
.Image
.LANCZOS
)
257 if image
.format
== "JPEG":
258 # Apply a gaussian blur to make compression easier
259 image
= image
.filter(PIL
.ImageFilter
.GaussianBlur(radius
=0.05))
261 # Arguments to optimise the compression
263 "subsampling" : "4:2:0",
267 with io
.BytesIO() as f
:
268 # If writing out the image does not work with optimization,
269 # we try to write it out without any optimization.
271 image
.save(f
, format
, optimize
=True, **args
)
273 image
.save(f
, format
, **args
)