#!/usr/bin/python

import aiofiles
import asyncio
import configparser
import datetime
import inspect
import logging
import os
import pakfire
import shutil
import ssl
import systemd.journal
import tempfile
import urllib.parse

from . import aws
from . import bugtracker
from . import builders
from . import builds
from . import cache
from . import config
from . import database
from . import distribution
from . import events
from . import httpclient
from . import jobs
from . import keys
from . import logstreams
from . import messages
from . import mirrors
from . import packages
from . import ratelimiter
from . import releasemonitoring
from . import repository
from . import settings
from . import sessions
from . import sources
from . import uploads
from . import users

# Setup logging
log = logging.getLogger("pbs")

# Import version
from .__version__ import VERSION as __version__

from .decorators import *
from .constants import *

class Backend(object):
	version = __version__

	# A set of all running background tasks
	__tasks = set()

	def __init__(self, config_file, test=False):
		self.test = test

		# Read configuration file
		self.config = self.read_config(config_file)

		# Fetch the base path
		self.basepath = self.config.get("global", "basepath")

		# Global pakfire settings (from database).
		self.settings = settings.Settings(self)

		# Initialize the HTTP Client
		self.httpclient = httpclient.HTTPClient(self)

		self.aws = aws.AWS(self)
		self.builds = builds.Builds(self)
		self.cache = cache.Cache(self)
		self.jobs = jobs.Jobs(self)
		self.builders = builders.Builders(self)
		self.distros = distribution.Distributions(self)
		self.events = events.Events(self)
		self.keys = keys.Keys(self)
		self.logstreams = logstreams.LogStreams(self)
		self.messages = messages.Messages(self)
		self.mirrors = mirrors.Mirrors(self)
		self.packages = packages.Packages(self)
		self.monitorings = releasemonitoring.Monitorings(self)
		self.ratelimiter = ratelimiter.RateLimiter(self)
		self.repos = repository.Repositories(self)
		self.sessions = sessions.Sessions(self)
		self.sources = sources.Sources(self)
		self.uploads = uploads.Uploads(self)
		self.users = users.Users(self)

		# Open a connection to bugzilla.
		self.bugzilla = bugtracker.Bugzilla(self)

		# Create a temporary directory
		self._create_tmp_path()

		log.info("Pakfire Build Service initialized at %s" % self.basepath)

	def launch_background_tasks(self):
		# Launch some initial tasks
		self.run_task(self.users.generate_vapid_keys)
		self.run_task(self.builders.autoscale)

		# Regularly check the mirrors
		self.run_periodic_task(300, self.mirrors.check)

		# Regularly fetch sources
		self.run_periodic_task(300, self.sources.fetch)

		# Regularly check for new releases
		self.run_periodic_task(300, self.monitorings.check)

	def read_config(self, path):
		c = configparser.ConfigParser()

		# Read configuration from file
		if path:
			c.read(path)

		return c

	@lazy_property
	def db(self):
		try:
			name = self.config.get("database", "name")
			hostname = self.config.get("database", "hostname")
			user = self.config.get("database", "user")
			password = self.config.get("database", "password")
		except configparser.Error as e:
			log.error("Error parsing the config: %s" % e.message)

			# Re-raise, since we cannot connect without this configuration
			raise

		log.debug("Connecting to database %s @ %s" % (name, hostname))

		return database.Connection(self, hostname, name, user=user, password=password)

	def _create_tmp_path(self):
		"""
		Creates the temporary directory with the correct permissions
		"""
		path = self.path("tmp")

		try:
			os.mkdir(path, mode=0o1777)

		# Ignore if the directory already exists
		except FileExistsError:
			pass

	def path(self, *args):
		"""
		Takes a relative path and makes it absolute
		"""
		return os.path.join(self.basepath, *args)

	def url_to(self, url):
		"""
		Takes a relative URL and makes it absolute
		"""
		# The base URL
		baseurl = self.settings.get("baseurl")

		# Join it all together
		return urllib.parse.urljoin(baseurl, url)

	def path_to_url(self, path):
		"""
		Takes a path to a file on the file system and converts it into a URL
		"""
		# Make the path relative to the base path and serve it from files/
		path = os.path.join(
			"files", os.path.relpath(path, self.basepath),
		)

		return self.url_to(path)

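	# An illustrative example for url_to() and path_to_url() (the values below
	# are assumptions, not actual configuration): with a basepath of
	# "/var/lib/pakfire" and the "baseurl" setting "https://example.com/", the
	# file "/var/lib/pakfire/repos/x/package.file" would be converted into
	# "https://example.com/files/repos/x/package.file".
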
	def pakfire(self, *args, **kwargs):
		"""
		Launches a new Pakfire instance with the given configuration
		"""
		return config.PakfireConfig(self, *args, **kwargs)

	# Functions to run something in background

	def run_task(self, callback, *args):
		"""
		Runs the given coroutine in the background
		"""
		# Create a new task
		task = asyncio.create_task(callback(*args))

		# Keep a reference to the task and remove it when the task has finished
		self.__tasks.add(task)
		task.add_done_callback(self.__tasks.discard)

		return task

	def run_periodic_task(self, delay, callback, *args):
		"""
		Calls the given callback periodically in the background
		"""
		return self.run_task(self._periodic_task, delay, callback, *args)

	async def _periodic_task(self, delay, callback, *args):
		"""
		Helper function for run_periodic_task() that will call the given
		callback regularly after the timer has expired.
		"""
		log.debug("Periodic callback %r started" % callback)

		while True:
			try:
				ret = callback(*args)

				# Await ret if callback is a coroutine
				if inspect.isawaitable(ret):
					await ret

			except Exception:
				log.error("Exception in periodic callback %r" % callback, exc_info=True)

			# Wait until the next iteration is due
			await asyncio.sleep(delay)

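	# A brief sketch of how the task helpers above are used (mirroring the calls
	# in launch_background_tasks()): a one-off coroutine is scheduled with
	#
	#   self.run_task(self.builders.autoscale)
	#
	# while run_periodic_task() re-runs its callback forever, sleeping for the
	# given number of seconds between iterations:
	#
	#   self.run_periodic_task(300, self.mirrors.check)
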
	# Commands

	async def command(self, *command, krb5_auth=False, **kwargs):
		"""
		Runs the given command in a new subprocess
		"""
		async with self.tempdir() as tmp:
			# Create a minimal environment
			env = {
				"HOME" : os.environ.get("HOME", tmp),

				# Tell the system where to put temporary files
				"TMPDIR" : tmp,

				# Store any Kerberos credentials here
				"KRB5CCNAME" : os.path.join(tmp, ".krb5cc"),
			}

			# Authenticate using Kerberos
			if krb5_auth:
				await self._krb5_auth(env=env)

			# Run the command
			return await self._command(*command, env=env, **kwargs)

	async def _command(self, *command, return_output=False, input=None, **kwargs):
		log.debug("Running command: %s" % " ".join(command))

		# Fork child process
		process = await asyncio.create_subprocess_exec(
			*command,
			stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
			stdout=asyncio.subprocess.PIPE,
			stderr=asyncio.subprocess.PIPE,
			**kwargs,
		)

		# Send input
		if input:
			# Convert to bytes
			if not isinstance(input, bytes):
				input = input.encode()

			# Write the entire data chunk by chunk
			while input:
				chunk, input = input[0:64], input[64:]

				process.stdin.write(chunk)
				await process.stdin.drain()

			# Close the input once we are done
			process.stdin.close()

		stdout = []

		# Fetch output of command and send it to the logger
		while True:
			line = await process.stdout.readline()
			if not line:
				break

			# Decode line
			line = line.decode()

			# Strip newline
			line = line.rstrip()

			# Log the output
			log.debug(line)

			# Store the output if requested
			if return_output:
				stdout.append(line)

		# Wait until the process has finished
		returncode = await process.wait()

		# Check the return code
		if returncode:
			# Fetch any output from the standard error output
			stderr = await process.stderr.read()
			stderr = stderr.decode()

			# Log the error
			log.error("Error running command: %s (code=%s)" % (
				" ".join(command), returncode,
			))
			if stderr:
				log.error(stderr)

			raise CommandExecutionError(returncode, stderr)

		# Return output if requested
		if return_output:
			return "\n".join(stdout)

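	# A short usage sketch for command() (the command line shown here is purely
	# illustrative): output can be captured with return_output=True, and
	# krb5_auth=True fetches a Kerberos ticket before the command is run, e.g.
	#
	#   output = await self.command("uname", "-a", return_output=True)
	#
	# A non-zero exit code raises CommandExecutionError.
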
	async def _krb5_auth(self, **kwargs):
		"""
		Fetches a Kerberos ticket for the configured principal using the keytab
		"""
		log.debug("Performing Kerberos authentication...")

		# Fetch path to keytab
		keytab = self.settings.get("krb5-keytab")
		if not keytab:
			log.warning("No keytab configured")
			return

		# Fetch Kerberos principal
		principal = self.settings.get("krb5-principal")
		if not principal:
			log.warning("No Kerberos principal configured")
			return

		# Fetch a Kerberos ticket
		await self._command(
			"kinit", "-k", "-t", keytab, principal, **kwargs,
		)

	async def exists(self, *args, **kwargs):
		"""
		Checks whether a file exists
		"""
		return await asyncio.to_thread(os.path.exists, *args, **kwargs)

	async def makedirs(self, path, **kwargs):
		"""
		Creates a new directory
		"""
		return await asyncio.to_thread(os.makedirs, path, **kwargs)

	async def copy(self, src, dst, mode=None):
		"""
		Copies a file from src to dst
		"""
		log.debug("Copying %s to %s" % (src, dst))

		# Create parent directory
		await self.make_parent_directory(dst)

		# Copy data without any metadata
		await asyncio.to_thread(shutil.copyfile, src, dst)

		# Set mode
		if mode:
			await asyncio.to_thread(os.chmod, dst, mode)

	async def move(self, src, dst, **kwargs):
		"""
		Moves something from src to dst
		"""
		log.debug("Moving %s to %s" % (src, dst))

		# Create parent directory
		await self.make_parent_directory(dst)

		# Move!
		await asyncio.to_thread(shutil.move, src, dst, **kwargs)

	async def make_parent_directory(self, path):
		"""
		Creates the parent directory of path
		"""
		path = os.path.dirname(path)

		return await self.makedirs(path, exist_ok=True)

	async def unlink(self, path):
		"""
		Unlinks path
		"""
		# Normalize the path
		path = os.path.abspath(path)

		# Check if the path is within our base directory
		if not path.startswith(self.basepath):
			raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))

		log.debug("Unlinking %s" % path)

		await asyncio.to_thread(self._unlink, path)

	def _unlink(self, path):
		# Unlink the file we were asked to unlink
		try:
			os.unlink(path)
		except OSError:
			return

		# Try to delete any empty parent directories
		while True:
			# Get the parent directory
			path = os.path.dirname(path)

			# Break if we reached the base path
			if path == self.basepath or path == self.path("tmp"):
				break

			# Call rmdir()
			try:
				os.rmdir(path)
			except OSError:
				break

			log.debug("  Cleaned up %s..." % path)

	async def rmtree(self, path):
		"""
		Removes a directory recursively
		"""
		# Normalize the path
		path = os.path.abspath(path)

		# Check if the path is within our base directory
		if not path.startswith(self.basepath):
			raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))

		log.debug("Removing %s..." % path)

		try:
			await asyncio.to_thread(shutil.rmtree, path)

		# Ignore if path didn't exist in the first place
		except FileNotFoundError:
			pass

	async def chmod(self, *args, **kwargs):
		"""
		Changes the mode of the given file
		"""
		return await asyncio.to_thread(os.chmod, *args, **kwargs)

	def tempfile(self, mode="w+b", delete=True):
		"""
		Opens a new temporary file (to be used with "async with")
		"""
		path = self.path("tmp")

		return aiofiles.tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)

	def tempdir(self, **kwargs):
		"""
		Asynchronously creates a new temporary directory
		"""
		# Default to our own tmp directory
		path = self.path("tmp")

		return aiofiles.tempfile.TemporaryDirectory(dir=path, **kwargs)

	def _write_tempfile(self, content):
		"""
		Writes the content to a temporary file and returns its path
		"""
		# Use the synchronous tempfile module here, since this is not a coroutine
		t = tempfile.NamedTemporaryFile(dir=self.path("tmp"), delete=False)

		# Write the content
		if content:
			t.write(content.encode())

		# Close the file
		t.close()

		return t.name

	async def open(self, path):
		"""
		Opens a package and returns the archive
		"""
		log.debug("Opening %s..." % path)

		# Open the archive
		async with self.pakfire() as p:
			return await asyncio.to_thread(p.open, path)

	@property
	def ssl_context(self):
		# Create SSL context
		context = ssl.create_default_context()

		# Fetch client certificate
		certificate = self.settings.get("client-certificate", None)
		key = self.settings.get("client-key", None)

		# Apply client certificate
		if certificate and key:
			with tempfile.NamedTemporaryFile(mode="w") as f_cert:
				f_cert.write(certificate)
				f_cert.flush()

				with tempfile.NamedTemporaryFile(mode="w") as f_key:
					f_key.write(key)
					f_key.flush()

					context.load_cert_chain(f_cert.name, f_key.name)

		return context

	async def load_certificate(self, certfile, keyfile):
		with self.db.transaction():
			# Load certificate
			with open(certfile) as f:
				self.settings.set("client-certificate", f.read())

			# Load key file
			with open(keyfile) as f:
				self.settings.set("client-key", f.read())

			log.info("Updated certificates")

	async def cleanup(self):
		"""
		Called regularly to clean up any left-over resources
		"""
		# Messages
		await self.messages.queue.cleanup()

		# Sessions
		await self.sessions.cleanup()

		# Uploads
		await self.uploads.cleanup()

	async def sync(self):
		"""
		Syncs any repository that should be mirrored
		"""
		log.info("Syncing mirrors...")

		# Fetch the sync target
		target = self.settings.get("sync-target")
		if not target:
			log.warning("No sync target configured")
			return 0

		# Update the timestamp
		await self._update_timestamp()

		commandline = [
			"rsync",

			# Show what is being transferred
			"--verbose",

			# Compress any transferred data
			"--compress",

			# Enable archive mode
			"--archive",

			# Preserve hardlinks, ACLs & XATTRs
			"--hard-links",
			"--acls",
			"--xattrs",

			# Delete any files that we have deleted
			"--delete",
			"--delete-excluded",

			# Remove any empty directories
			"--prune-empty-dirs",

			# Make the transaction atomic
			"--delay-updates",

			# Add source & target
			"%s/" % self.basepath,
			target,

			# Sync the .timestamp
			"--include=.timestamp",
		]

		# Add all mirrored repositories
		for repo in self.repos.mirrored:
			path = os.path.relpath(repo.local_path(), self.basepath)

			commandline.append("--include=%s***" % path)

		# Exclude everything that hasn't been included
		commandline += ("--include=*/", "--exclude=*")

		# Run command
		await self.command(*commandline, krb5_auth=True)

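	# For illustration (the repository path is an assumption): with a single
	# mirrored repository at "repos/stable", sync() above builds the following
	# filter rules, which rsync evaluates first-match-wins:
	#
	#   --include=.timestamp --include=repos/stable*** --include=*/ --exclude=*
	#
	# i.e. the timestamp file, everything below the mirrored repositories and
	# the directory tree itself are transferred; everything else is excluded
	# (and removed on the target because of --delete-excluded).
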
	async def _update_timestamp(self):
		"""
		Updates the .timestamp file in the root of the exported files
		"""
		# Make filename
		path = os.path.join(self.basepath, ".timestamp")

		t = datetime.datetime.now(datetime.timezone.utc)

		# Write the current time as seconds since epoch
		async with aiofiles.open(path, mode="w") as f:
			await f.write("%d" % t.timestamp())


def setup_logging():
	"""
	Configures the logger for the buildservice backend
	"""
	# Log everything to journal
	handler = systemd.journal.JournalHandler(
		SYSLOG_IDENTIFIER="pakfire-build-service",
	)
	log.addHandler(handler)


# Setup logging
setup_logging()
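
# A minimal usage sketch (illustrative only; the configuration path is an
# assumption): the backend is constructed with a path to its configuration file
# and its background tasks are launched from inside a running asyncio event
# loop, roughly like this:
#
#   async def main():
#       backend = Backend("/etc/pakfire/pbs.conf")
#       backend.launch_background_tasks()
#
#       # Keep the service running until cancelled
#       await asyncio.Event().wait()
#
#   asyncio.run(main())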