import datetime
import logging
+import iso3166
import math
import os.path
import random
import socket
import time
+import ssl
import tornado.httpclient
+import tornado.iostream
import tornado.netutil
import urllib.parse
+from . import countries
+from . import util
from .misc import Object
-
-class Downloads(Object):
- @property
- def total(self):
- ret = self.db.get("SELECT COUNT(*) AS total FROM log_download")
-
- return ret.total
-
- @property
- def today(self):
- ret = self.db.get("SELECT COUNT(*) AS today FROM log_download WHERE date::date = NOW()::date")
-
- return ret.today
-
- @property
- def yesterday(self):
- ret = self.db.get("SELECT COUNT(*) AS yesterday FROM log_download WHERE date::date = (NOW() - INTERVAL '1 day')::date")
-
- return ret.yesterday
-
- @property
- def daily_map(self):
- ret = self.db.query("WITH downloads AS (SELECT * FROM log_download \
- WHERE DATE(date) BETWEEN (NOW()::date - INTERVAL '30 days') AND DATE(NOW())) \
- SELECT DATE(date) AS date, COUNT(*) AS count FROM downloads \
- GROUP BY DATE(date) ORDER BY date")
-
- return ret
-
- def get_countries(self, duration="all"):
- query = "SELECT country_code, count(country_code) AS count FROM log_download"
-
- if duration == "today":
- query += " WHERE date::date = NOW()::date"
-
- query += " GROUP BY country_code ORDER BY count DESC"
-
- results = self.db.query(query)
- ret = []
-
- count = sum([o.count for o in results])
- if count:
- for res in results:
- ret.append((res.country_code, res.count / count))
-
- return ret
-
- def get_mirror_load(self, duration="all"):
- query = "SELECT mirror, COUNT(mirror) AS count FROM log_download"
-
- if duration == "today":
- query += " WHERE date::date = NOW()::date"
-
- query += " GROUP BY mirror ORDER BY count DESC"
-
- results = self.db.query(query)
- ret = {}
-
- count = sum([o.count for o in results])
- if count:
- for res in results:
- mirror = self.mirrors.get(res.mirror)
- ret[mirror.hostname] = res.count / count
-
- return ret
-
+from .decorators import *
class Mirrors(Object):
    """
        Access to the mirrors stored in the database.

        Iterating over this object yields every enabled mirror,
        ordered by hostname.
    """

    def _get_mirrors(self, query, *args):
        """
            Runs the given query and lazily yields one Mirror
            for every row of the result.
        """
        for row in self.db.query(query, *args):
            yield Mirror(self.backend, row.id, data=row)

    def _get_mirror(self, query, *args):
        """
            Runs a query that fetches a single row and returns it
            as a Mirror, or None if the query returned nothing.
        """
        row = self.db.get(query, *args)
        if not row:
            return None

        return Mirror(self.backend, row.id, data=row)

    def __iter__(self):
        return iter(self._get_mirrors("SELECT * FROM mirrors \
            WHERE enabled IS TRUE ORDER BY hostname"))

    async def check_all(self):
        """
            Checks all enabled mirrors one after the other,
            each one inside its own database transaction.
        """
        for mirror in self:
            with self.db.transaction():
                await mirror.check()

    def get_for_download(self, filename, country_code=None):
        """
            Returns a randomly chosen mirror that is enabled, UP and
            lists the given file.

            If a country code is passed, mirrors located in the same zone
            as that country are tried first. Returns None if no mirror
            serves the file.
        """
        if country_code:
            # Try to find a good mirror for this country first
            zone = countries.get_zone(country_code)

            nearby = self._get_mirror("SELECT mirrors.* FROM mirror_files files \
                LEFT JOIN mirrors ON files.mirror = mirrors.id \
                WHERE files.filename = %s \
                AND mirrors.enabled IS TRUE AND mirrors.state = %s \
                AND mirrors.country_code = ANY(%s) \
                ORDER BY RANDOM() LIMIT 1", filename, "UP",
                countries.get_in_zone(zone))

            if nearby:
                return nearby

        # Fall back to any random mirror that serves the file
        return self._get_mirror("SELECT mirrors.* FROM mirror_files files \
            LEFT JOIN mirrors ON files.mirror = mirrors.id \
            WHERE files.filename = %s \
            AND mirrors.enabled IS TRUE AND mirrors.state = %s \
            ORDER BY RANDOM() LIMIT 1", filename, "UP")

    def get_by_hostname(self, hostname):
        """
            Returns the mirror with the given hostname or None.
        """
        return self._get_mirror("SELECT * FROM mirrors \
            WHERE hostname = %s", hostname)

    def get_by_countries(self):
        """
            Returns a dictionary that maps each country to the list
            of enabled mirrors located in it.
        """
        grouped = {}

        for mirror in self:
            grouped.setdefault(mirror.country, []).append(mirror)

        return grouped
class Mirror(Object):
def init(self, id, data=None):
    """
        Stores the ID of this mirror and, if given, the data
        record that belongs to it.
    """
    self.id, self.data = id, data
def __str__(self):
    """
        A mirror is represented by its hostname.
    """
    name = self.hostname

    return name
def __repr__(self):
    """
        Shows the class name together with the URL of the mirror.
    """
    class_name = self.__class__.__name__

    return "<%s %s>" % (class_name, self.url)
def __eq__(self, other):
    """
        Two mirrors are equal if they have the same ID.

        For objects of any other type NotImplemented is returned,
        so that Python can try the reflected comparison and then
        fall back to its default instead of receiving None.
    """
    if isinstance(other, self.__class__):
        return self.id == other.id

    return NotImplemented
def __lt__(self, other):
    """
        Mirrors are ordered by their hostname.

        For objects of any other type NotImplemented is returned,
        so that ordering a mirror against something unrelated raises
        a TypeError instead of silently evaluating to None.
    """
    if isinstance(other, self.__class__):
        return self.hostname < other.hostname

    return NotImplemented
def __hash__(self):
    """
        Mirrors are hashed by their ID, which is also what
        __eq__ compares.
    """
    mirror_id = self.id

    return mirror_id
@lazy_property
def url(self):
    """
        Returns the base URL of this mirror.

        The URL uses https if the mirror supports it and http otherwise.
        It consists of the hostname followed by the path of the mirror
        and always ends with exactly one slash.
    """
    scheme = "https" if self.supports_https else "http"

    # Remove the slashes on both ends of the path, so that an empty
    # path or "/" does not result in a double slash after the hostname
    path = (self.path or "").strip("/")

    url = "%s://%s/" % (scheme, self.hostname)

    if path:
        url += "%s/" % path

    return url
@property
def hostname(self):
    """
        Returns the hostname field of this mirror's data record.
    """
    record = self.data

    return record.hostname
+
@lazy_property
def address(self):
    """
        Returns the stored address wrapped into a util.Address
        object, or None if no address has been stored.
    """
    stored = self.data.address
    if not stored:
        return None

    return util.Address(self.backend, stored)
@property
def path(self):
    """
        Returns the path field of this mirror's data record.
    """
    record = self.data

    return record.path
@property
def supports_https(self):
    """
        Returns the supports_https field of this mirror's data record.
    """
    record = self.data

    return record.supports_https
@property
def owner(self):
    """
        Returns the owner field of this mirror's data record.
    """
    record = self.data

    return record.owner
@property
def country(self):
    """
        Returns the iso3166 country that belongs to the country code
        of this mirror, or None if the mirror has no country code.
    """
    code = self.country_code

    # country_code is None for mirrors that have neither a stored
    # country code nor an address, so there is nothing to look up
    if not code:
        return None

    return iso3166.countries.get(code)
@property
def country_code(self):
    """
        Returns the country code stored for this mirror. If none is
        stored, the country code of the mirror's address is returned.

        Returns None if neither of them is available.
    """
    stored = self.data.country_code
    if stored:
        return stored

    # Fall back to the country code of the address
    address = self.address
    if address:
        return address.country_code

    return None
@property
def zone(self):
    """
        Returns what countries.get_zone() reports for the country
        code of this mirror.
    """
    # The lookup is keyed by country code (Mirrors.get_for_download
    # passes one as well); this class has no country_name attribute
    return countries.get_zone(self.country_code)
@property
def asn(self):
    """
        Returns the asn attribute of this mirror's address, or None
        if the mirror has no address.
    """
    address = self.address
    if not address:
        return None

    return address.asn
@property
def filelist(self):
def prefix(self):
    # Prefix that check_filelist() joins in front of every filename
    # from the mirror's filelist. It is always the empty string.
    # NOTE(review): check_filelist() reads this as an attribute, so a
    # @property decorator is presumably just outside the visible lines
    return ""
def build_url(self, filename):
    """
        Returns the URL of the given file on this mirror by joining
        the filename onto the base URL of the mirror.
    """
    base = self.url

    return urllib.parse.urljoin(base, filename)
@property
def last_update(self):
    """
        Returns the last_update field of this mirror's data record.
    """
    record = self.data

    return record.last_update
@property
def state(self):
    """
        Returns the state field of this mirror's data record.
    """
    record = self.data

    return record.state
def set_state(self, state):
    """
        Sets the state of this mirror in the database and in the
        loaded data record.

        Nothing is written if the mirror already is in that state.
    """
    logging.debug("Setting state of %s to %s" % (self.hostname, state))

    if self.state != state:
        self.db.execute("UPDATE mirrors SET state = %s WHERE id = %s", state, self.id)

        # Keep the loaded record in sync with the database
        self.data["state"] = state
@property
def enabled(self):
    """
        Returns the enabled field of this mirror's data record.
    """
    record = self.data

    return record.enabled
@property
def disabled(self):
    """
        The opposite of enabled.
    """
    is_enabled = self.enabled

    return not is_enabled
async def check(self):
    """
        Runs all checks for this mirror.

        The address the hostname resolves to is stored in the database
        first. Then the timestamp is checked and, only if that was
        successful, the filelist is updated.
    """
    logging.debug("Running check for mirror %s" % self.hostname)

    # Store the address the hostname currently resolves to
    address = await self.resolve()
    self.db.execute("UPDATE mirrors SET address = %s WHERE id = %s",
        address, self.id)

    # There is no point in fetching the filelist if the timestamp failed
    if await self.check_timestamp():
        await self.check_filelist()
- def check_state(self):
+ def check_state(self, last_update):
logging.debug("Checking state of mirror %s" % self.id)
if not self.enabled:
now = datetime.datetime.utcnow()
- time_delta = now - self.last_update
+ time_delta = now - last_update
time_diff = time_delta.total_seconds()
time_down = self.settings.get_int("mirrors_time_down", 3*24*60*60)
self.set_state("UP")
- def check_timestamp(self):
+ async def check_timestamp(self):
http = tornado.httpclient.AsyncHTTPClient()
- http.fetch(self.url + ".timestamp",
- headers={ "Pragma" : "no-cache" },
- callback=self.__check_timestamp_response)
+ try:
+ response = await http.fetch(self.url + ".timestamp",
+ headers={ "Pragma" : "no-cache" })
+ except tornado.httpclient.HTTPError as e:
+ logging.warning("Error getting timestamp from %s: %s" % (self.hostname, e))
+ self.set_state("DOWN")
+ return False
+
+ except ssl.SSLError as e:
+ logging.warning("SSL error when getting timestamp from %s: %s" % (self.hostname, e))
+ self.set_state("DOWN")
+ return False
+
+ except tornado.iostream.StreamClosedError as e:
+ logging.warning("Connection closed unexpectedly for %s: %s" % (self.hostname, e))
+ self.set_state("DOWN")
+ return False
+
+ except OSError as e:
+ logging.warning("Could not connect to %s: %s" % (self.hostname, e))
+ self.set_state("DOWN")
+ return False
- def __check_timestamp_response(self, response):
if response.error:
- logging.debug("Error getting timestamp from %s" % self.hostname)
+ logging.warning("Error getting timestamp from %s" % self.hostname)
self.set_state("DOWN")
return
self.db.execute("UPDATE mirrors SET last_update = %s WHERE id = %s",
timestamp, self.id)
- # Reload changed settings
- if hasattr(self, "_info"):
- self._info["timestamp"] = timestamp
+ # Update state
+ self.check_state(timestamp)
- self.check_state()
+ logging.debug("Successfully updated timestamp from %s" % self.hostname)
- logging.info("Successfully updated timestamp from %s" % self.hostname)
+ return True
async def check_filelist(self):
    """
        Downloads the .filelist of this mirror and replaces all
        entries of this mirror in mirror_files with its content.

        Nothing is done for mirrors that are not enabled. If the
        filelist cannot be downloaded, the mirror is set to DOWN and
        the stored filelist is left untouched.
    """
    # XXX need to remove data from disabled mirrors
    if not self.enabled:
        return

    http = tornado.httpclient.AsyncHTTPClient()

    try:
        response = await http.fetch(self.url + ".filelist",
            headers={ "Pragma" : "no-cache" })

    # Handle connection problems in the same way as check_timestamp()
    # does, so that an unreachable mirror is marked as DOWN instead of
    # the exception aborting the whole check
    except (tornado.httpclient.HTTPError, ssl.SSLError,
            tornado.iostream.StreamClosedError, OSError) as e:
        logging.warning("Error getting filelist from %s: %s" % (self.hostname, e))
        self.set_state("DOWN")
        return

    if response.error:
        logging.debug("Error getting filelist from %s" % self.hostname)
        return

    # Drop the old filelist
    self.db.execute("DELETE FROM mirror_files WHERE mirror = %s", self.id)

    # Add them all again
    for line in response.body.splitlines():
        # Blank lines do not name a file
        if not line.strip():
            continue

        filename = os.path.join(self.prefix, line.decode())

        self.db.execute("INSERT INTO mirror_files(mirror, filename) \
            VALUES(%s, %s)", self.id, filename)

    logging.debug("Successfully updated mirror filelist from %s" % self.hostname)
@property
def development(self):
    """
        Returns the mirrorlist_devel field of this mirror's data
        record, or False if the record has no such field.
    """
    record = self.data

    return record.get("mirrorlist_devel", False)
@property
def mirrorlist(self):
    """
        Returns the mirrorlist field of this mirror's data record,
        or False if the record has no such field.
    """
    record = self.data

    return record.get("mirrorlist", False)
async def resolve(self):
    """
        Returns a single IP address of this mirror

        The hostname is looked up with the resolver of the backend and
        the host part of the first result is returned. Returns None if
        the resolver did not return any addresses.
    """
    addresses = await self.backend.resolver.resolve(self.hostname, 0)

    # Return the first address
    for family, address in addresses:
        # The address is a (host, port) pair for IPv4, but carries
        # additional fields for IPv6, so only the first one is picked
        return address[0]

    return None