#!/usr/bin/python3
+import PIL.ExifTags
import PIL.Image
import PIL.ImageFilter
+import PIL.ImageOps
+import datetime
import io
+import ipaddress
+import location
import logging
+import pycares
import random
import re
+import socket
import string
import unicodedata
-def parse_search_query(query):
- q = []
- for word in query.split():
- # Is this lexeme negated?
- negated = word.startswith("!")
+from .decorators import *
+from .misc import Object
+
+# These lists are used to block access to the webapp
+BLOCKLISTS = (
+ "sbl.spamhaus.org",
+ "xbl.spamhaus.org",
+)
+
+BLACKLISTS = (
+ "b.barracudacentral.org",
+ "bl.spamcop.net",
+ "bl.blocklist.de",
+ "cbl.abuseat.org",
+ "dnsbl-1.uceprotect.net",
+ "dnsbl-2.uceprotect.net",
+ "dnsbl-3.uceprotect.net",
+ "dnsbl.abuse.ch",
+ "ix.dnsbl.manitu.net",
+ "pbl.spamhaus.org",
+ "sbl.spamhaus.org",
+ "xbl.spamhaus.org",
+ "zen.spamhaus.org",
+)
+
+class Address(Object):
+ def init(self, address):
+ self.address = ipaddress.ip_address(address)
+
+ def __str__(self):
+ return "%s" % self.address
+
+ @property
+ def family(self):
+ if isinstance(self.address, ipaddress.IPv6Address):
+ return socket.AF_INET6
+ elif isinstance(self.address, ipaddress.IPv4Address):
+ return socket.AF_INET
+
+ @lazy_property
+ def network(self):
+ return self.backend.location.lookup("%s" % self.address)
+
+ @property
+ def country_code(self):
+ if self.network:
+ return self.network.country_code
+
+ @lazy_property
+ def asn(self):
+ if self.network:
+ return self.network.asn
+
+ @lazy_property
+ def autonomous_system(self):
+ if self.asn:
+ return self.backend.location.get_as(self.asn)
+
+ def is_anonymous_proxy(self):
+ if self.network:
+ return self.network.has_flag(location.NETWORK_FLAG_ANONYMOUS_PROXY)
+
+ def is_satellite_provider(self):
+ if self.network:
+ return self.network.has_flag(location.NETWORK_FLAG_SATELLITE_PROVIDER)
+
+ def is_anycast(self):
+ if self.network:
+ return self.network.has_flag(location.NETWORK_FLAG_ANYCAST)
+
+ # Blacklist
+
+ def _make_blacklist_rr(self, blacklist):
+ if self.family == socket.AF_INET6:
+ octets = list(self.address.exploded.replace(":", ""))
+ elif self.family == socket.AF_INET:
+ octets = str(self.address).split(".")
+ else:
+ raise NotImplementedError("Unknown IP protocol")
+
+ # Reverse the list
+ octets.reverse()
+
+ # Append suffix
+ octets.append(blacklist)
+
+ return ".".join(octets)
+
+ async def _resolve_blacklist(self, blacklist):
+ return_code = None
+
+ # Get resource record name
+ rr = self._make_blacklist_rr(blacklist)
+
+ # Get query type from IP protocol version
+ if self.family == socket.AF_INET6:
+ type = pycares.QUERY_TYPE_AAAA
+ elif self.family == socket.AF_INET:
+ type = pycares.QUERY_TYPE_A
+ else:
+ raise NotImplementedError("Unknown IP protocol")
+
+ # Run query
+ try:
+ res = await self.backend.resolver.query(rr, type=type)
+ except IOError as e:
+ logging.warning(e)
+
+ return return_code, "%s" % e
+
+ # Not found
+ if not res:
+ logging.debug("%s is not blacklisted on %s" % (self, blacklist))
+ return return_code, None
+
+ # Extract return code from DNS response
+ for row in res:
+ return_code = row.host
+ break
+
+ # If the IP address is on a blacklist, we will try to fetch the TXT record
+ reason = await self.backend.resolver.query(rr, type=pycares.QUERY_TYPE_TXT)
+
+ # Log result
+ logging.debug("%s is blacklisted on %s: %s" % (self, blacklist, reason or "N/A"))
- # Remove any special characters
- word = re.sub(r"\W+", "", word, flags=re.UNICODE)
- if not word:
- continue
+ # Take the first reason
+ if reason:
+ for i in reason:
+ return return_code, i.text
- # Restore negation
- if negated:
- word = "!%s" % word
+ # Blocked, but no reason
+ return return_code, None
- q.append(word)
+ async def get_blacklists(self):
+ blacklists = { bl : await self._resolve_blacklist(bl) for bl in BLACKLISTS }
+
+ return blacklists
- return " & ".join(q)
def format_size(s, max_unit=None):
units = ("B", "kB", "MB", "GB", "TB")
#_ = handler.locale.translate
_ = lambda x: x
+ if isinstance(s, datetime.timedelta):
+ s = s.total_seconds()
+
hrs, s = divmod(s, 3600)
min, s = divmod(s, 60)
def generate_thumbnail(data, size, square=False, **args):
assert data, "No image data received"
- image = PIL.Image.open(io.BytesIO(data))
+ try:
+ image = PIL.Image.open(io.BytesIO(data))
+
+ # If we cannot open the image, we return it in raw form
+ except PIL.UnidentifiedImageError as e:
+ return data
# Save image format
format = image.format
+ # Fetch any EXIF data
+ exif = image._getexif()
+
+ # Rotate the image
+ if exif:
+ for tag in PIL.ExifTags.TAGS:
+ if PIL.ExifTags.TAGS[tag] == "Orientation":
+ if exif[tag] == 3:
+ image = image.rotate(180, expand=True)
+ elif exif[tag] == 6:
+ image = image.rotate(270, expand=True)
+ elif exif[tag] == 8:
+ image = image.rotate( 90, expand=True)
+
# Remove any alpha-channels
if image.format == "JPEG" and not image.mode == "RGB":
# Make a white background
# Resize the image to the desired resolution
if square:
- thumbnail = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
+ image = PIL.ImageOps.fit(image, (size, size), PIL.Image.LANCZOS)
else:
image.thumbnail((size, size), PIL.Image.LANCZOS)