# encoding: utf-8
import datetime
+import json
import ldap
import ldap.modlist
import logging
import phonenumbers
import sshpubkeys
import time
+import tornado.httpclient
import urllib.parse
import urllib.request
import zxcvbn
"(&(objectClass=inetOrgPerson)(|(sipAuthenticationUser=%s)(telephoneNumber=%s)(homePhone=%s)(mobile=%s)))" \
% (number, number, number, number))
+ @tornado.gen.coroutine
+ def check_spam(self, uid, email, address):
+ sfs = StopForumSpam(self.backend, uid, email, address)
+
+ # Get spam score
+ score = yield sfs.check()
+
+ return score >= 50
+
# Registration
def register(self, uid, email, first_name, last_name):
])
+class StopForumSpam(Object):
+ def init(self, uid, email, address):
+ self.uid, self.email, self.address = uid, email, address
+
+ @tornado.gen.coroutine
+ def send_request(self, **kwargs):
+ arguments = {
+ "json" : "1",
+ }
+ arguments.update(kwargs)
+
+ # Create request
+ request = tornado.httpclient.HTTPRequest(
+ "https://api.stopforumspam.org/api", method="POST")
+ request.body = urllib.parse.urlencode(arguments)
+
+ # Send the request
+ response = yield self.backend.http_client.fetch(request)
+
+ # Decode the JSON response
+ return json.loads(response.body.decode())
+
+ @tornado.gen.coroutine
+ def check_address(self):
+ response = yield self.send_request(ip=self.address)
+
+ try:
+ confidence = response["ip"]["confidence"]
+ except KeyError:
+ confidence = 100
+
+ logging.debug("Confidence for %s: %s" % (self.address, confidence))
+
+ return confidence
+
+ @tornado.gen.coroutine
+ def check_username(self):
+ response = yield self.send_request(username=self.uid)
+
+ try:
+ confidence = response["username"]["confidence"]
+ except KeyError:
+ confidence = 100
+
+ logging.debug("Confidence for %s: %s" % (self.uid, confidence))
+
+ return confidence
+
+ @tornado.gen.coroutine
+ def check_email(self):
+ response = yield self.send_request(email=self.email)
+
+ try:
+ confidence = response["email"]["confidence"]
+ except KeyError:
+ confidence = 100
+
+ logging.debug("Confidence for %s: %s" % (self.email, confidence))
+
+ return confidence
+
+ @tornado.gen.coroutine
+ def check(self, threshold=95):
+ """
+ This function tries to detect if we have a spammer.
+
+ To honour the privacy of our users, we only send the IP
+ address and username and if those are on the database, we
+ will send the email address as well.
+ """
+ confidences = yield [self.check_address(), self.check_username()]
+
+ if any((c < threshold for c in confidences)):
+ confidences += yield [self.check_email()]
+
+ # Build a score based on the lowest confidence
+ return 100 - min(confidences)
+
+
if __name__ == "__main__":
a = Accounts()
import configparser
import io
import tornado.gen
+import tornado.httpclient
from . import accounts
from . import blog
# Setup database.
self.setup_database()
+ # Create HTTPClient
+ self.http_client = tornado.httpclient.AsyncHTTPClient(
+ defaults = {
+ "User-Agent" : "IPFireWebApp",
+ }
+ )
# Initialize settings first.
self.settings = settings.Settings(self)
self.memcache = memcached.Memcached(self)
def run_task(self, task, *args, **kwargs):
tasks = {
"check-mirrors" : self.mirrors.check_all,
+ "check-spam-score" : self.accounts.check_spam_score,
"cleanup" : self.cleanup,
"scan-files" : self.releases.scan_files,
"send-all-messages" : self.messages.queue.send_all,
if not all((self.url, self.api_key, self.api_secret)):
raise RuntimeError("%s is not configured" % self.__class__.__name__)
- # Create HTTPClient
- self.http_client = tornado.httpclient.AsyncHTTPClient(
- defaults = {
- "User-Agent" : "IPFireWebApp",
- }
- )
-
def _make_signature(self, method, path, body):
# Empty since we only support POST
canonical_query = ""
)
# Send the request
- response = yield self.http_client.fetch(request)
+ response = yield self.backend.http_client.fetch(request)
# Decode the JSON response
d = json.loads(response.body.decode())
self.render("auth/register.html")
- @base.blacklisted
+ @tornado.gen.coroutine
@base.ratelimit(minutes=24*60, requests=5)
def post(self):
uid = self.get_argument("uid")
first_name = self.get_argument("first_name")
last_name = self.get_argument("last_name")
+ # Check if this is a spam account
+ is_spam = yield self.backend.accounts.check_spam(uid, email,
+ address=self.get_remote_ip())
+
+ if is_spam:
+ self.render("auth/register-spam.html")
+ return
+
# Register account
try:
with self.db.transaction():