X-Git-Url: http://git.ipfire.org/?a=blobdiff_plain;f=src%2Fbackend%2Futil.py;h=370c63a096c00d8071b03ac2a8ee47df77748a20;hb=1c22909c6b68bb6c7d799de455083f8e39558941;hp=bcd8ca56111de46b9daaf4f2cb1ae22cac79b57e;hpb=eea71144d64063e086fba33b740884367f0f9ff5;p=ipfire.org.git diff --git a/src/backend/util.py b/src/backend/util.py index bcd8ca56..370c63a0 100644 --- a/src/backend/util.py +++ b/src/backend/util.py @@ -1,13 +1,178 @@ -#!/usr/bin/python +#!/usr/bin/python3 -def format_size(s): - units = ("B", "k", "M", "G", "T") +import PIL.Image +import PIL.ImageFilter +import PIL.ImageOps +import io +import ipaddress +import location +import logging +import pycares +import random +import re +import socket +import string +import unicodedata + +from .decorators import * +from .misc import Object + +# These lists are used to block access to the webapp +BLOCKLISTS = ( + "sbl.spamhaus.org", + "xbl.spamhaus.org", +) + +BLACKLISTS = ( + "b.barracudacentral.org", + "bl.spamcop.net", + "bl.blocklist.de", + "cbl.abuseat.org", + "dnsbl-1.uceprotect.net", + "dnsbl-2.uceprotect.net", + "dnsbl-3.uceprotect.net", + "dnsbl.abuse.ch", + "ix.dnsbl.manitu.net", + "pbl.spamhaus.org", + "sbl.spamhaus.org", + "xbl.spamhaus.org", + "zen.spamhaus.org", +) + +class Address(Object): + def init(self, address): + self.address = ipaddress.ip_address(address) + + def __str__(self): + return "%s" % self.address + + @property + def family(self): + if isinstance(self.address, ipaddress.IPv6Address): + return socket.AF_INET6 + elif isinstance(self.address, ipaddress.IPv4Address): + return socket.AF_INET + + @lazy_property + def network(self): + return self.backend.location.lookup("%s" % self.address) + + @property + def country_code(self): + if self.network: + return self.network.country_code + + @lazy_property + def asn(self): + if self.network: + return self.network.asn + + @lazy_property + def autonomous_system(self): + if self.asn: + return self.backend.location.get_as(self.asn) + + def is_anonymous_proxy(self): + if self.network: + return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY) + + def is_satellite_provider(self): + if self.network: + return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER) + + def is_anycast(self): + if self.network: + return self.network.has_flag(location.NETWORK_FLAG_ANYCAST) + + # Blacklist + + def _make_blacklist_rr(self, blacklist): + if self.family == socket.AF_INET6: + octets = list(self.address.exploded.replace(":", "")) + elif self.family == socket.AF_INET: + octets = str(self.address).split(".") + else: + raise NotImplementedError("Unknown IP protocol") + + # Reverse the list + octets.reverse() + + # Append suffix + octets.append(blacklist) + + return ".".join(octets) + + async def _resolve_blacklist(self, blacklist): + return_code = None + + # Get resource record name + rr = self._make_blacklist_rr(blacklist) + + # Get query type from IP protocol version + if self.family == socket.AF_INET6: + type = pycares.QUERY_TYPE_AAAA + elif self.family == socket.AF_INET: + type = pycares.QUERY_TYPE_A + else: + raise NotImplementedError("Unknown IP protocol") + + # Run query + try: + res = await self.backend.resolver.query(rr, type=type) + except IOError as e: + logging.warning(e) + + return return_code, "%s" % e + + # Not found + if not res: + logging.debug("%s is not blacklisted on %s" % (self, blacklist)) + return return_code, None + + # Extract return code from DNS response + for row in res: + return_code = row.host + break + + # If the IP address is on a blacklist, we will try to fetch the TXT record + reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT) + + # Log result + logging.debug("%s is blacklisted on %s: %s" % (self, blacklist, reason or "N/A")) + + # Take the first reason + if reason: + for i in reason: + return return_code, i.text + + # Blocked, but no reason + return return_code, None + + async def get_blacklists(self): + blacklists = { bl : await self._resolve_blacklist(bl) for bl in BLACKLISTS } + + return blacklists + + +def format_asn(asn): + network = db.get_as(asn) + + if network: + return "%s" % network + + return "AS%s" % asn + +def format_size(s, max_unit=None): + units = ("B", "kB", "MB", "GB", "TB") i = 0 while s >= 1024 and i < len(units) - 1: s /= 1024 i += 1 + if max_unit and units[i] == max_unit: + break + return "%.0f%s" % (s, units[i]) def format_time(s, shorter=True): @@ -24,3 +189,71 @@ def format_time(s, shorter=True): return _("%(min)d min") % { "min" : min } return _("%(hrs)d:%(min)02d hrs") % {"hrs" : hrs, "min" : min} + +def random_string(length=8): + input_chars = string.ascii_letters + string.digits + + r = (random.choice(input_chars) for i in range(length)) + + return "".join(r) + +def normalize(s): + # Remove any non-ASCII characters + try: + s = unicodedata.normalize("NFKD", s) + except TypeError: + pass + + # Remove excessive whitespace + s = re.sub(r"[^\w]+", " ", s) + + return "-".join(s.split()) + +def generate_thumbnail(data, size, square=False, **args): + assert data, "No image data received" + + image = PIL.Image.open(io.BytesIO(data)) + + # Save image format + format = image.format + + # Remove any alpha-channels + if image.format == "JPEG" and not image.mode == "RGB": + # Make a white background + background = PIL.Image.new("RGBA", image.size, (255,255,255)) + + # Convert image to RGBA if not in RGBA, yet + if not image.mode == "RGBA": + image = image.convert("RGBA") + + # Flatten both images together + flattened_image = PIL.Image.alpha_composite(background, image) + + # Remove the alpha channel + image = flattened_image.convert("RGB") + + # Resize the image to the desired resolution + if square: + image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS) + else: + image.thumbnail((size, size), PIL.Image.LANCZOS) + + if image.format == "JPEG": + # Apply a gaussian blur to make compression easier + image = image.filter(PIL.ImageFilter.GaussianBlur(radius=0.05)) + + # Arguments to optimise the compression + args.update({ + "subsampling" : "4:2:0", + "quality" : 70, + }) + + with io.BytesIO() as f: + # If writing out the image does not work with optimization, + # we try to write it out without any optimization. + try: + image.save(f, format, optimize=True, **args) + except: + image.save(f, format, **args) + + return f.getvalue()