]>
git.ipfire.org Git - pbs.git/blob - src/buildservice/mirrors.py
11 import tornado
.netutil
17 from sqlalchemy
import ARRAY
, Boolean
, Column
, DateTime
, Double
, ForeignKey
, Integer
, Text
18 from sqlalchemy
.dialects
.postgresql
import INET
21 from . import database
22 from . import httpclient
24 from .decorators
import *
27 log
= logging
.getLogger("pbs.mirrors")
29 class Mirrors(base
.Object
):
35 Mirror
.deleted_at
== None,
38 # Order them by hostname
39 .order_by(Mirror
.hostname
)
43 return self
.db
.fetch(stmt
)
45 async def get_by_hostname(self
, hostname
):
50 Mirror
.deleted_at
== None,
53 Mirror
.hostname
== hostname
,
57 return await self
.db
.fetch_one(stmt
)
59 async def create(self
, hostname
, path
, owner
, contact
, notes
, user
=None, check
=True):
63 # Create the new mirror
64 mirror
= await self
.db
.insert(
74 log
.info("Mirror %s has been created" % mirror
)
76 # Perform the first check
78 await mirror
.check(force
=True)
82 async def get_mirrors_for_address(self
, address
):
84 Returns all mirrors in random order with preferred mirrors first
87 # Generate some random value for each mirror
90 # Add the preference score
91 r
+= mirror
.is_preferred_for_address(address
)
95 # Fetch all mirrors and shuffle them, but put preferred mirrors first
96 return sorted([mirror
async for mirror
in self
], key
=__sort
, reverse
=True)
98 @functools.cached_property
101 The location database
103 return location
.Database("/var/lib/location/database.db")
105 @functools.cached_property
110 return tornado
.netutil
.ThreadedResolver()
112 async def check(self
, *args
, **kwargs
):
114 Runs the mirror check for all mirrors
116 # Check all mirrors concurrently
117 async with asyncio
.TaskGroup() as tg
:
118 async for mirror
in self
:
120 mirror
.check(*args
, **kwargs
),
124 class Mirror(database
.Base
, database
.BackendMixin
, database
.SoftDeleteMixin
):
125 __tablename__
= "mirrors"
def __lt__(self, other):
    """
    Sorts mirrors by their hostname.

    Returns NotImplemented for anything that is not an instance of
    this object's class so that Python can try the reflected comparison.
    """
    # We can only compare ourselves with objects of our own kind
    if not isinstance(other, self.__class__):
        return NotImplemented

    return self.hostname < other.hostname
138 id = Column(Integer
, primary_key
=True)
142 hostname
= Column(Text
, unique
=True, nullable
=False)
144 # XXX Must be unique over non-deleted items
148 path
= Column(Text
, nullable
=False)
154 return self
.make_url()
156 def make_url(self
, *paths
):
157 url
= "https://%s%s/" % (
162 # Merge the path together
163 path
= os
.path
.join(*paths
)
165 # Remove any leading slashes
166 if path
.startswith("/"):
169 return urllib
.parse
.urljoin(url
, path
)
171 # Last Check Success - True if the last check was successful
173 last_check_success
= Column(Boolean
, nullable
=False, default
=False)
177 last_check_at
= Column(DateTime(timezone
=False))
179 # Error Message when the check has been unsuccessful
181 error
= Column(Text
, nullable
=False, default
="")
186 DateTime(timezone
=False), nullable
=False, server_default
=sqlalchemy
.func
.current_timestamp(),
191 created_by_id
= Column(Integer
, ForeignKey("users.id"), nullable
=False)
195 created_by
= sqlalchemy
.orm
.relationship(
196 "User", foreign_keys
=[created_by_id
], lazy
="joined", innerjoin
=True,
201 deleted_by_id
= Column(Integer
, ForeignKey("users.id"))
205 deleted_by
= sqlalchemy
.orm
.relationship(
206 "User", foreign_keys
=[deleted_by_id
], lazy
="selectin",
209 def has_perm(self
, user
):
210 # Anonymous users have no permission
214 # Admins have all permissions
215 return user
.is_admin()
219 owner
= Column(Text
, nullable
=False)
223 contact
= Column(Text
, nullable
=False)
227 notes
= Column(Text
, nullable
=False, default
="")
231 country_code
= Column(Text
)
235 asn
= Column(Integer
)
239 addresses_ipv6
= Column(ARRAY(INET
), nullable
=False, default
=[])
241 def supports_ipv6(self
):
243 Returns True if this mirror supports IPv6
245 if self
.addresses_ipv6
:
252 addresses_ipv4
= Column(ARRAY(INET
), nullable
=False, default
=[])
254 def supports_ipv4(self
):
256 Returns True if this mirror supports IPv4
258 if self
.addresses_ipv4
:
268 All addresses of the mirror, regardless of family
270 return self
.addresses_ipv6
+ self
.addresses_ipv4
272 async def _update_country_code_and_asn(self
):
274 Updates the country code of this mirror
276 # Resolve the hostname
278 addresses
= await self
.backend
.mirrors
.resolver
.resolve(self
.hostname
, port
=443)
281 except socket
.gaierror
as e
:
282 # Name or service not known
284 log
.error("Could not resolve %s: %s" % (self
, e
))
287 # Raise anything else
292 # Store all IP addresses
293 self
.addresses_ipv6
= [
294 address
[0] for family
, address
in addresses
if family
== socket
.AF_INET6
296 self
.addresses_ipv4
= [
297 address
[0] for family
, address
in addresses
if family
== socket
.AF_INET
300 # Lookup the country code and ASN
301 for address
in self
.addresses
:
302 network
= self
.backend
.mirrors
.location
.lookup(address
)
304 # Try the next IP address if we didn't find any data
305 if not network
or not network
.country_code
:
308 # Store the country code
309 self
.country_code
= network
.country_code
312 self
.asn
= network
.asn
317 def is_preferred_for_address(self
, address
):
319 Returns a score of how much the mirror is preferred for this address
322 network
= self
.backend
.mirrors
.location
.lookup("%s" % address
)
324 # Return if we could not find the client
328 first_address
= ipaddress
.ip_address(network
.first_address
)
329 last_address
= ipaddress
.ip_address(network
.last_address
)
331 # Check if the mirror is on the same network
332 for address
in self
.addresses
:
333 # Skip incompatible families
334 if isinstance(address
, ipaddress
.IPv6Address
):
335 if not network
.family
== socket
.AF_INET6
:
337 elif isinstance(address
, ipaddress
.IPv4Address
):
338 if not network
.family
== socket
.AF_INET
:
341 # Check if the address is within the network
342 if first_address
<= address
<= last_address
:
345 # If the AS matches, we will prefer this
346 if self
.asn
and self
.asn
== network
.asn
:
349 # If the mirror and client are in the same country, we prefer this
350 if self
.country_code
and self
.country_code
== network
.country_code
:
353 # Check if we are on the same continent
354 if self
._continent
_match
(self
.country_code
, network
.country_code
):
359 def _continent_match(self
, cc1
, cc2
):
361 Checks if the two given country codes are on the same continent
363 country1
= self
.backend
.mirrors
.location
.get_country(cc1
)
364 country2
= self
.backend
.mirrors
.location
.get_country(cc2
)
366 # If we are missing either country, we don't know
367 if not country1
or not country2
:
370 # Return True if both countries are on the same continent
371 return country1
.continent_code
== country2
.continent_code
375 async def check(self
, force
=False):
376 t
= datetime
.datetime
.utcnow()
378 # Ratelimit checks somewhat
380 # Check mirrors that are up only once an hour
381 if self
.last_check_success
is True:
382 if self
.last_check_at
+ datetime
.timedelta(hours
=1) > t
:
383 log
.debug("Skipping check for %s" % self
)
386 # Check mirrors that are down once every 15 minutes
387 elif self
.last_check_success
is False:
388 if self
.last_check_at
+ datetime
.timedelta(minutes
=15) > t
:
389 log
.debug("Skipping check for %s" % self
)
392 log
.debug("Running mirror check for %s" % self
.hostname
)
394 # Wrap this into one large transaction
395 async with await self
.db
.transaction():
396 # Update the country code & ASN
397 await self
._update
_country
_code
_and
_asn
()
399 # Make URL for .timestamp
400 url
= self
.make_url(".timestamp")
405 # Was this check successful?
408 # When was the last sync?
412 response
= await self
.backend
.httpclient
.fetch(
415 # Allow a moment to connect and get a response
420 # Try to parse the response
422 timestamp
= int(response
.body
)
424 except (TypeError, ValueError) as e
:
# Name the mirror that sent the bad response; the message has a %s
# placeholder that was previously never filled in
log.error("%s responded with an invalid timestamp" % self)
427 raise ValueError("Invalid timestamp received") from e
429 # Convert into datetime
430 timestamp
= datetime
.datetime
.utcfromtimestamp(timestamp
)
432 # Catch anything that isn't 200 OK
433 except httpclient
.HTTPError
as e
:
434 log
.error("%s: %s" % (self
, e
))
438 except socket
.gaierror
as e
:
439 # Name or service not known
441 log
.error("Could not resolve %s: %s" % (self
, e
))
446 # Raise anything else
452 # This check was successful!
456 await self
.db
.insert(
460 response_time
= response
.request_time
if response
else None,
461 http_status
= response
.code
if response
else None,
462 last_sync_at
= timestamp
,
466 # Update the main table
467 self
.last_check_at
= sqlalchemy
.func
.current_timestamp()
468 self
.last_check_success
= success
469 self
.last_sync_at
= timestamp
472 async def get_uptime_since(self
, t
):
473 # Convert timedeltas to absolute time
474 if isinstance(t
, datetime
.timedelta
):
475 t
= datetime
.datetime
.utcnow() - t
478 uptimes
= sqlalchemy
.select(
480 sqlalchemy
.func
.least(
481 sqlalchemy
.func
.lead(
482 MirrorCheck
.checked_at
,
484 sqlalchemy
.func
.current_timestamp(),
486 order_by
=MirrorCheck
.checked_at
.asc(),
489 MirrorCheck
.checked_at
,
490 sqlalchemy
.text("INTERVAL '1 hour'"),
493 MirrorCheck
.mirror_id
== self
.id,
494 MirrorCheck
.checked_at
>= t
,
497 # Check the percentage of how many checks have been successful
498 stmt
= sqlalchemy
.select(
500 sqlalchemy
.func
.extract(
505 uptimes
.c
.success
== True,
509 sqlalchemy
.func
.extract(
516 ).select_from(uptimes
)
519 return await self
.db
.select_one(stmt
, "uptime")
521 async def serves_file(self
, path
):
523 Checks if this mirror serves the file
525 # XXX Skip this if the mirror is not online
528 url
= self
.make_url(path
)
531 cache_key
= "file-check:%s" % url
533 # Check if we have something in the cache
534 serves_file
= await self
.backend
.cache
.get(cache_key
)
536 # Nothing in cache, let's run the check
537 if serves_file
is None:
538 serves_file
= await self
._serves
_file
(url
, cache_key
=cache_key
)
542 async def _serves_file(self
, url
, cache_key
=None):
545 # Send a HEAD request for the URL
547 response
= await self
.backend
.httpclient
.fetch(
548 url
, method
="HEAD", follow_redirects
=True,
550 # Don't allow too much time for the mirror to respond
551 connect_timeout
=5, request_timeout
=5,
553 # Ensure the server responds to all types or requests
556 "Cache-Control" : "no-cache",
560 # Catch any HTTP errors
561 except tornado
.httpclient
.HTTPClientError
as e
:
562 log
.error("Mirror %s returned %s for %s" % (self
, e
.code
, url
))
565 # If there was no error, we assume this file can be downloaded
# Fill in both placeholders (mirror and checked URL), matching the
# error message logged when the request fails
log.debug("Mirror %s seems to be serving %s" % (self, url))
570 # Store positive responses in the cache for 24 hours
571 # and negative responses for six hours.
579 await self
.backend
.cache
.set(cache_key
, serves_file
, ttl
)
class MirrorCheck(database.Base):
    """
    An object that represents a single mirror check

    Rows are identified by the composite primary key (mirror_id, checked_at).
    """

    __tablename__ = "mirror_checks"

    # Mirror ID - references the mirror this check belongs to
    mirror_id = Column(Integer, ForeignKey("mirrors.id"), primary_key=True, nullable=False)

    # Checked At - defaults to the database's current timestamp.
    # Stored without timezone, like the timestamps of the mirrors table.
    checked_at = Column(DateTime(timezone=False), primary_key=True, nullable=False,
        server_default=sqlalchemy.func.current_timestamp())

    # Success - whether the check passed
    success = Column(Boolean, nullable=False)

    # Response Time - may be NULL
    response_time = Column(Double)

    # HTTP Status - may be NULL
    http_status = Column(Integer)

    # Last Sync At - may be NULL
    last_sync_at = Column(DateTime(timezone=False))