]> git.ipfire.org Git - ipfire.org.git/blobdiff - src/backend/util.py
util: Optionally rotate JPEG images if coded into EXIF
[ipfire.org.git] / src / backend / util.py
index 5ad6cb618bb1bd0370b8ae38c2d71886149e35a6..cddd677b48e0bb05773ae0964ba89de4f039df86 100644 (file)
 #!/usr/bin/python3
 
+import PIL.ExifTags
 import PIL.Image
 import PIL.ImageFilter
+import PIL.ImageOps
+import datetime
 import io
+import ipaddress
+import location
 import logging
+import pycares
 import random
 import re
+import socket
 import string
 import unicodedata
 
-def parse_search_query(query):
-       q = []
-       for word in query.split():
-               # Is this lexeme negated?
-               negated = word.startswith("!")
+from .decorators import *
+from .misc import Object
+
+# These lists are used to block access to the webapp
+BLOCKLISTS = (
+       "sbl.spamhaus.org",
+       "xbl.spamhaus.org",
+)
+
+BLACKLISTS = (
+       "b.barracudacentral.org",
+       "bl.spamcop.net",
+       "bl.blocklist.de",
+       "cbl.abuseat.org",
+       "dnsbl-1.uceprotect.net",
+       "dnsbl-2.uceprotect.net",
+       "dnsbl-3.uceprotect.net",
+       "dnsbl.abuse.ch",
+       "ix.dnsbl.manitu.net",
+       "pbl.spamhaus.org",
+       "sbl.spamhaus.org",
+       "xbl.spamhaus.org",
+       "zen.spamhaus.org",
+)
+
+class Address(Object):
+       def init(self, address):
+               self.address = ipaddress.ip_address(address)
+
+       def __str__(self):
+               return "%s" % self.address
+
+       @property
+       def family(self):
+               if isinstance(self.address, ipaddress.IPv6Address):
+                       return socket.AF_INET6
+               elif isinstance(self.address, ipaddress.IPv4Address):
+                       return socket.AF_INET
+
+       @lazy_property
+       def network(self):
+               return self.backend.location.lookup("%s" % self.address)
+
+       @property
+       def country_code(self):
+               if self.network:
+                       return self.network.country_code
+
+       @lazy_property
+       def asn(self):
+               if self.network:
+                       return self.network.asn
+
+       @lazy_property
+       def autonomous_system(self):
+               if self.asn:
+                       return self.backend.location.get_as(self.asn)
+
+       def is_anonymous_proxy(self):
+               if self.network:
+                       return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
+
+       def is_satellite_provider(self):
+               if self.network:
+                       return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
+
+       def is_anycast(self):
+               if self.network:
+                       return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)
+
+       # Blacklist
+
+       def _make_blacklist_rr(self, blacklist):
+               if self.family == socket.AF_INET6:
+                       octets = list(self.address.exploded.replace(":", ""))
+               elif self.family == socket.AF_INET:
+                       octets = str(self.address).split(".")
+               else:
+                       raise NotImplementedError("Unknown IP protocol")
+
+               # Reverse the list
+               octets.reverse()
+
+               # Append suffix
+               octets.append(blacklist)
+
+               return ".".join(octets)
+
+       async def _resolve_blacklist(self, blacklist):
+               return_code = None
+
+               # Get resource record name
+               rr = self._make_blacklist_rr(blacklist)
+
+               # Get query type from IP protocol version
+               if self.family == socket.AF_INET6:
+                       type = pycares.QUERY_TYPE_AAAA
+               elif self.family == socket.AF_INET:
+                       type = pycares.QUERY_TYPE_A
+               else:
+                       raise NotImplementedError("Unknown IP protocol")
+
+               # Run query
+               try:
+                       res = await self.backend.resolver.query(rr, type=type)
+               except IOError as e:
+                       logging.warning(e)
+
+                       return return_code, "%s" % e
+
+               # Not found
+               if not res:
+                       logging.debug("%s is not blacklisted on %s" % (self, blacklist))
+                       return return_code, None
+
+               # Extract return code from DNS response
+               for row in res:
+                       return_code = row.host
+                       break
+
+               # If the IP address is on a blacklist, we will try to fetch the TXT record
+               reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
+
+               # Log result
+               logging.debug("%s is blacklisted on %s: %s" % (self, blacklist, reason or "N/A"))
 
-               # Remove any special characters
-               word = re.sub(r"\W+", "", word, flags=re.UNICODE)
-               if not word:
-                       continue
+               # Take the first reason
+               if reason:
+                       for i in reason:
+                               return return_code, i.text
 
-               # Restore negation
-               if negated:
-                       word = "!%s" % word
+               # Blocked, but no reason
+               return return_code, None
 
-               q.append(word)
+       async def get_blacklists(self):
+               blacklists = { bl : await self._resolve_blacklist(bl) for bl in BLACKLISTS }
+
+               return blacklists
 
-       return " & ".join(q)
 
 def format_size(s, max_unit=None):
        units = ("B", "kB", "MB", "GB", "TB")
@@ -45,6 +173,9 @@ def format_time(s, shorter=True):
        #_ = handler.locale.translate
        _ = lambda x: x
 
+       if isinstance(s, datetime.timedelta):
+               s = s.total_seconds()
+
        hrs, s = divmod(s, 3600)
        min, s = divmod(s, 60)
 
@@ -78,11 +209,30 @@ def normalize(s):
 def generate_thumbnail(data, size, square=False, **args):
        assert data, "No image data received"
 
-       image = PIL.Image.open(io.BytesIO(data))
+       try:
+               image = PIL.Image.open(io.BytesIO(data))
+
+       # If we cannot open the image, we return it in raw form
+       except PIL.UnidentifiedImageError as e:
+               return data
 
        # Save image format
        format = image.format
 
+       # Fetch any EXIF data
+       exif = image._getexif()
+
+       # Rotate the image
+       if exif:
+               for tag in PIL.ExifTags.TAGS:
+                       if PIL.ExifTags.TAGS[tag] == "Orientation":
+                               if exif[tag] == 3:
+                                       image = image.rotate(180, expand=True)
+                               elif exif[tag] == 6:
+                                       image = image.rotate(270, expand=True)
+                               elif exif[tag] == 8:
+                                       image = image.rotate( 90, expand=True)
+
        # Remove any alpha-channels
        if image.format == "JPEG" and not image.mode == "RGB":
                # Make a white background
@@ -100,7 +250,7 @@ def generate_thumbnail(data, size, square=False, **args):
 
        # Resize the image to the desired resolution
        if square:
-               thumbnail = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
+               image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
        else:
                image.thumbnail((size, size), PIL.Image.LANCZOS)