]> git.ipfire.org Git - pbs.git/blob - src/buildservice/mirrors.py
builds: Load all builds with the group
[pbs.git] / src / buildservice / mirrors.py
1 #!/usr/bin/python
2
3 import asyncio
4 import datetime
5 import functools
6 import ipaddress
7 import logging
8 import os.path
9 import random
10 import socket
11 import tornado.netutil
12 import urllib.parse
13
14 import location
15
16 import sqlalchemy
17 from sqlalchemy import ARRAY, Boolean, Column, DateTime, Double, ForeignKey, Integer, Text
18 from sqlalchemy.dialects.postgresql import INET
19
20 from . import base
21 from . import database
22 from . import httpclient
23
24 from .decorators import *
25
# Module-level logger shared by everything in this file
log = logging.getLogger(name="pbs.mirrors")
28
class Mirrors(base.Object):
	"""
		Collection of all mirrors
	"""
	def __aiter__(self):
		"""
			Iterates over all mirrors that have not been deleted, ordered by hostname
		"""
		query = sqlalchemy.select(Mirror)

		# Skip any deleted mirrors
		query = query.where(Mirror.deleted_at == None)

		# Order them by hostname
		query = query.order_by(Mirror.hostname)

		# Hand back whatever db.fetch() returns (used with "async for")
		return self.db.fetch(query)

	async def get_by_hostname(self, hostname):
		"""
			Returns the non-deleted mirror with the given hostname

			The lookup goes through db.fetch_one(), so the result for an unknown
			hostname is whatever that returns (presumably None - TODO confirm).
		"""
		query = sqlalchemy.select(Mirror).where(
			# Only consider mirrors that have not been deleted
			Mirror.deleted_at == None,

			# Match by hostname
			Mirror.hostname == hostname,
		)

		return await self.db.fetch_one(query)

	async def create(self, hostname, path, owner, contact, notes, user=None, check=True):
		"""
			Creates a new mirror

			The mirror is inserted with the given attributes and "user" as its
			creator. Unless "check" is False, a forced first check is run right
			away. Returns the new mirror.
		"""
		attributes = {
			"hostname"   : hostname,
			"path"       : path,
			"owner"      : owner,
			"contact"    : contact,
			"notes"      : notes,
			"created_by" : user,
		}

		mirror = await self.db.insert(Mirror, **attributes)

		log.info("Mirror %s has been created" % mirror)

		# Run the first check immediately, ignoring any rate limits
		if check:
			await mirror.check(force=True)

		return mirror

	async def get_mirrors_for_address(self, address):
		"""
			Returns all mirrors in random order with preferred mirrors first

			Each mirror is ranked by a random value in [0, 1) plus its integer
			preference score for "address", so mirrors with the same score are
			shuffled among each other while better scores always come first.
		"""
		mirrors = [mirror async for mirror in self]

		def rank(mirror):
			return random.random() + mirror.is_preferred_for_address(address)

		mirrors.sort(key=rank, reverse=True)

		return mirrors

	@functools.cached_property
	def location(self):
		"""
			The location database (opened once and then cached)
		"""
		return location.Database("/var/lib/location/database.db")

	@functools.cached_property
	def resolver(self):
		"""
			A DNS resolver (created once and then cached)
		"""
		return tornado.netutil.ThreadedResolver()

	async def check(self, *args, **kwargs):
		"""
			Runs the mirror check for all mirrors concurrently

			All arguments are passed on to each mirror's check(). Because the
			checks run inside an asyncio.TaskGroup, a failure in one of them
			cancels the remaining ones and is raised from here.
		"""
		async with asyncio.TaskGroup() as group:
			async for mirror in self:
				group.create_task(mirror.check(*args, **kwargs))
122
123
class Mirror(database.Base, database.BackendMixin, database.SoftDeleteMixin):
	"""
		A (soft-deletable) mirror server, stored in the "mirrors" table
	"""
	__tablename__ = "mirrors"

	def __str__(self):
		return self.hostname

	def __lt__(self, other):
		# Mirrors are ordered by hostname
		if isinstance(other, self.__class__):
			return self.hostname < other.hostname

		return NotImplemented

	# ID

	id = Column(Integer, primary_key=True)

	# Hostname

	hostname = Column(Text, unique=True, nullable=False)

	# XXX Must be unique over non-deleted items

	# Path

	path = Column(Text, nullable=False)

	# URL

	@property
	def url(self):
		"""
			The base URL of this mirror
		"""
		return self.make_url()

	def make_url(self, *paths):
		"""
			Returns the HTTPS URL of the given path components on this mirror

			Without any arguments, the base URL of the mirror is returned.
		"""
		url = "https://%s%s/" % (
			self.hostname,
			self.path
		)

		# Merge the path together. os.path.join() requires at least one
		# argument, so start with an empty string to support make_url().
		path = os.path.join("", *paths)

		# Remove any leading slashes, because urljoin() would otherwise
		# replace the mirror's own path with an absolute one
		path = path.lstrip("/")

		return urllib.parse.urljoin(url, path)

	# Last Check Success - True if the last check was successful

	last_check_success = Column(Boolean, nullable=False, default=False)

	# Last Check At

	last_check_at = Column(DateTime(timezone=False))

	# Error Message when the check has been unsuccessful

	error = Column(Text, nullable=False, default="")

	# Created At

	created_at = Column(
		DateTime(timezone=False), nullable=False, server_default=sqlalchemy.func.current_timestamp(),
	)

	# Created By ID

	created_by_id = Column(Integer, ForeignKey("users.id"), nullable=False)

	# Created By

	created_by = sqlalchemy.orm.relationship(
		"User", foreign_keys=[created_by_id], lazy="joined", innerjoin=True,
	)

	# Deleted By ID

	deleted_by_id = Column(Integer, ForeignKey("users.id"))

	# Deleted By

	deleted_by = sqlalchemy.orm.relationship(
		"User", foreign_keys=[deleted_by_id], lazy="selectin",
	)

	def has_perm(self, user):
		"""
			Returns True if "user" has permission on this mirror (admins only)
		"""
		# Anonymous users have no permission
		if not user:
			return False

		# Admins have all permissions
		return user.is_admin()

	# Owner

	owner = Column(Text, nullable=False)

	# Contact

	contact = Column(Text, nullable=False)

	# Notes

	notes = Column(Text, nullable=False, default="")

	# Country Code

	country_code = Column(Text)

	# ASN

	asn = Column(Integer)

	# Addresses IPv6

	addresses_ipv6 = Column(ARRAY(INET), nullable=False, default=[])

	def supports_ipv6(self):
		"""
			Returns True if this mirror supports IPv6
		"""
		return bool(self.addresses_ipv6)

	# Addresses IPv4

	addresses_ipv4 = Column(ARRAY(INET), nullable=False, default=[])

	def supports_ipv4(self):
		"""
			Returns True if this mirror supports IPv4
		"""
		return bool(self.addresses_ipv4)

	# Addresses

	@property
	def addresses(self):
		"""
			All addresses of the mirror (IPv6 first), regardless of family
		"""
		# The column defaults only apply on INSERT, so tolerate unset values
		return (self.addresses_ipv6 or []) + (self.addresses_ipv4 or [])

	async def _update_country_code_and_asn(self):
		"""
			Resolves the hostname and updates addresses, country code and ASN

			All resolved IPv6/IPv4 addresses are stored. Country code and ASN
			are taken from the first address the location database has a
			country code for. If the hostname does not exist, nothing is changed.

			Raises socket.gaierror for any other resolver error.
		"""
		# Resolve the hostname
		try:
			addresses = await self.backend.mirrors.resolver.resolve(self.hostname, port=443)

		# XXX Catch this!
		except socket.gaierror as e:
			# Name or service not known
			if e.errno == socket.EAI_NONAME:
				log.error("Could not resolve %s: %s" % (self, e))
				return

			# Raise anything else
			raise

		log.debug("%s resolved to %s" % (self, addresses))

		# Store all IP addresses (the first item of each socket address is the host)
		self.addresses_ipv6 = [
			sockaddr[0] for family, sockaddr in addresses if family == socket.AF_INET6
		]
		self.addresses_ipv4 = [
			sockaddr[0] for family, sockaddr in addresses if family == socket.AF_INET
		]

		# Lookup the country code and ASN
		for address in self.addresses:
			network = self.backend.mirrors.location.lookup("%s" % address)

			# Try the next IP address if we didn't find any data
			if not network or not network.country_code:
				continue

			# Store the country code
			self.country_code = network.country_code

			# Store the ASN
			self.asn = network.asn

			# Once is enough
			break

	def is_preferred_for_address(self, address):
		"""
			Returns a score of how much the mirror is preferred for this address

			4: a mirror address is inside the client's network
			3: same AS
			2: same country
			1: same continent
			0: no match, or the client is not in the location database
		"""
		# Lookup the client
		network = self.backend.mirrors.location.lookup("%s" % address)

		# Return if we could not find the client
		if not network:
			return 0

		first_address = ipaddress.ip_address(network.first_address)
		last_address = ipaddress.ip_address(network.last_address)

		# Check if the mirror is on the same network
		for mirror_address in self.addresses:
			# Addresses may be strings (freshly resolved) or ipaddress objects
			# (loaded from the database), so normalise them first
			try:
				mirror_address = ipaddress.ip_interface("%s" % mirror_address).ip
			except ValueError:
				continue

			# Skip incompatible families (they cannot be compared)
			if mirror_address.version != first_address.version:
				continue

			# Check if the address is within the network
			if first_address <= mirror_address <= last_address:
				return 4

		# If the AS matches, we will prefer this
		if self.asn and self.asn == network.asn:
			return 3

		# If the mirror and client are in the same country, we prefer this
		if self.country_code and self.country_code == network.country_code:
			return 2

		# Check if we are on the same continent
		if self._continent_match(self.country_code, network.country_code):
			return 1

		return 0

	def _continent_match(self, cc1, cc2):
		"""
			Checks if the two given country codes are on the same continent

			Returns False if either code is missing or unknown.
		"""
		# Without both country codes, we cannot know
		if not cc1 or not cc2:
			return False

		country1 = self.backend.mirrors.location.get_country(cc1)
		country2 = self.backend.mirrors.location.get_country(cc2)

		# If we are missing either country, we don't know
		if not country1 or not country2:
			return False

		# Return True if both countries are on the same continent
		return country1.continent_code == country2.continent_code

	# Check

	async def check(self, force=False):
		"""
			Checks whether this mirror is reachable and when it last synced

			The country code and ASN are refreshed first. Then the ".timestamp"
			file is fetched from the mirror. The outcome is logged as a
			MirrorCheck and copied into this mirror's status columns, all in
			one transaction.

			Unless "force" is True, mirrors that passed their last check are
			checked at most once an hour and mirrors that failed it at most
			once every 15 minutes. Mirrors that were never checked are always
			checked.

			Raises socket.gaierror for DNS errors other than an unknown name.
		"""
		now = datetime.datetime.utcnow()

		# Ratelimit checks somewhat (only if there has been a check before)
		if not force and self.last_check_at is not None:
			# Check mirrors that are up only once an hour
			if self.last_check_success is True:
				if self.last_check_at + datetime.timedelta(hours=1) > now:
					log.debug("Skipping check for %s" % self)
					return

			# Check mirrors that are down once every 15 minutes
			elif self.last_check_success is False:
				if self.last_check_at + datetime.timedelta(minutes=15) > now:
					log.debug("Skipping check for %s" % self)
					return

		log.debug("Running mirror check for %s" % self.hostname)

		# Wrap this into one large transaction
		async with await self.db.transaction():
			# Update the country code & ASN
			await self._update_country_code_and_asn()

			# Make URL for .timestamp
			url = self.make_url(".timestamp")

			response = None
			error = None

			# Was this check successful?
			success = False

			# When was the last sync?
			timestamp = None

			try:
				response = await self.backend.httpclient.fetch(
					url,

					# Allow a moment to connect and get a response
					connect_timeout=10,
					request_timeout=10,
				)

			# Catch anything that isn't 200 OK
			except httpclient.HTTPError as e:
				log.error("%s: %s" % (self, e))
				error = "%s" % e

			# Catch DNS Errors
			except socket.gaierror as e:
				# Name or service not known
				if e.errno == socket.EAI_NONAME:
					log.error("Could not resolve %s: %s" % (self, e))

					# Store the error
					error = "%s" % e

				# Raise anything else
				else:
					raise

			# We got a response, now try to parse it
			else:
				timestamp = self._parse_timestamp(response.body)

				# A mirror that does not serve a valid timestamp has failed
				if timestamp is None:
					error = "Invalid timestamp received"

				# Success!
				else:
					success = True

			# Log this check
			await self.db.insert(
				MirrorCheck,
				mirror_id     = self.id,
				success       = success,
				response_time = response.request_time if response is not None else None,
				http_status   = response.code if response is not None else None,
				last_sync_at  = timestamp,
				error         = error,
			)

			# Update the main table
			self.last_check_at = sqlalchemy.func.current_timestamp()
			self.last_check_success = success

			# NOTE(review): Mirror declares no last_sync_at column, so this is
			# not persisted as it stands - TODO confirm whether one is missing
			self.last_sync_at = timestamp

			# The error column is NOT NULL and uses "" for "no error"
			self.error = error or ""

	def _parse_timestamp(self, body):
		"""
			Parses the body of a ".timestamp" file into a naive UTC datetime

			Returns None (and logs an error) if the body does not contain a
			usable UNIX timestamp.
		"""
		try:
			return datetime.datetime.utcfromtimestamp(int(body))

		# int() fails on garbage, utcfromtimestamp() on out-of-range values
		except (TypeError, ValueError, OverflowError, OSError) as e:
			log.error("%s responded with an invalid timestamp: %s" % (self, e))

			return None

	async def get_uptime_since(self, t):
		"""
			Returns the share of time (0 to 1) this mirror was up since "t"

			"t" may be a datetime (compared against the naive checked_at column,
			so presumably naive UTC - TODO confirm) or a timedelta relative to
			now. Each check counts for the time until the next check (or until
			now for the latest one), capped at one hour. With no checks since
			"t" the SQL result is NULL.
		"""
		# Convert timedeltas to absolute time
		if isinstance(t, datetime.timedelta):
			t = datetime.datetime.utcnow() - t

		# CTE with uptimes
		uptimes = sqlalchemy.select(
			MirrorCheck.success,
			sqlalchemy.func.least(
				sqlalchemy.func.lead(
					MirrorCheck.checked_at,
					1,
					sqlalchemy.func.current_timestamp(),
				).over(
					order_by=MirrorCheck.checked_at.asc(),
				)
				-
				MirrorCheck.checked_at,
				sqlalchemy.text("INTERVAL '1 hour'"),
			).label("uptime"),
		).where(
			MirrorCheck.mirror_id == self.id,
			MirrorCheck.checked_at >= t,
		).cte("uptimes")

		# Check the percentage of how many checks have been successful
		stmt = sqlalchemy.select(
			(
				sqlalchemy.func.extract(
					"epoch",
					sqlalchemy.func.sum(
						uptimes.c.uptime,
					).filter(
						uptimes.c.success == True,
					),
				)
				/
				sqlalchemy.func.extract(
					"epoch",
					sqlalchemy.func.sum(
						uptimes.c.uptime
					),
				)
			).label("uptime")
		).select_from(uptimes)

		# Run the statement
		return await self.db.select_one(stmt, "uptime")

	async def serves_file(self, path):
		"""
			Checks if this mirror serves the file at "path"

			A cached result is used if there is one, otherwise the mirror is
			asked (and the answer cached).
		"""
		# XXX Skip this if the mirror is not online

		# Make the URL
		url = self.make_url(path)

		# Make the cache key
		cache_key = "file-check:%s" % url

		# Check if we have something in the cache
		serves_file = await self.backend.cache.get(cache_key)

		# Nothing in cache, let's run the check
		if serves_file is None:
			serves_file = await self._serves_file(url, cache_key=cache_key)

		return serves_file

	async def _serves_file(self, url, cache_key=None):
		"""
			Sends a HEAD request for "url" and returns whether it succeeded

			Returns False if the mirror answered with an HTTP error, True
			otherwise. If "cache_key" is given, the result is cached for 24
			hours if positive or six hours if negative.
		"""
		# Make sure the module whose exception we catch below is loaded,
		# rather than relying on it having been imported elsewhere
		import tornado.httpclient

		serves_file = None

		# Send a HEAD request for the URL
		try:
			response = await self.backend.httpclient.fetch(
				url, method="HEAD", follow_redirects=True,

				# Don't allow too much time for the mirror to respond
				connect_timeout=5, request_timeout=5,

				# Ensure the server responds to all types or requests
				headers = {
					"Accept"        : "*/*",
					"Cache-Control" : "no-cache",
				}
			)

		# Catch any HTTP errors
		except tornado.httpclient.HTTPClientError as e:
			log.error("Mirror %s returned %s for %s" % (self, e.code, url))
			serves_file = False

		# If there was no error, we assume this file can be downloaded
		else:
			log.debug("Mirror %s seems to be serving %s" % (self, url))
			serves_file = True

		# Store positive responses in the cache for 24 hours
		# and negative responses for six hours.
		if cache_key:
			if serves_file:
				ttl = 86400 # 24h
			else:
				ttl = 21600 # 6h

			# Store in cache
			await self.backend.cache.set(cache_key, serves_file, ttl)

		return serves_file
582
583
class MirrorCheck(database.Base):
	"""
		An object that represents a single mirror check

		Rows are identified by (mirror_id, checked_at) and are inserted by
		Mirror.check().
	"""
	__tablename__ = "mirror_checks"

	# Mirror ID

	mirror_id = Column(Integer, ForeignKey("mirrors.id"), primary_key=True, nullable=False)

	# Checked At - filled in by the database when the row is inserted

	checked_at = Column(DateTime(timezone=False), primary_key=True, nullable=False,
		server_default=sqlalchemy.func.current_timestamp())

	# Success

	success = Column(Boolean, nullable=False)

	# Response Time - as reported by the HTTP client (presumably seconds - TODO confirm)

	response_time = Column(Double)

	# HTTP Status - NULL if there was no response

	http_status = Column(Integer)

	# Last Sync At - parsed from the mirror's ".timestamp" file, NULL if unknown

	last_sync_at = Column(DateTime(timezone=False))

	# Error - NULL if the check was successful

	error = Column(Text)