]>
git.ipfire.org Git - pbs.git/blob - src/buildservice/__init__.py
13 import systemd
.journal
18 from . import bugtracker
19 from . import builders
23 from . import database
24 from . import distribution
26 from . import httpclient
29 from . import logstreams
30 from . import messages
32 from . import packages
33 from . import ratelimiter
34 from . import releasemonitoring
35 from . import repository
36 from . import settings
37 from . import sessions
43 log
= logging
.getLogger("pbs")
46 from .__version
__ import VERSION
as __version__
48 from .decorators
import *
49 from .constants
import *
51 class Backend(object):
54 # A list of any background tasks
57 def __init__(self
, config_file
, test
=False):
60 # Read configuration file
61 self
.config
= self
.read_config(config_file
)
64 self
.basepath
= self
.config
.get("global", "basepath")
66 # Global pakfire settings (from database).
67 self
.settings
= settings
.Settings(self
)
69 # Initialize the HTTP Client
70 self
.httpclient
= httpclient
.HTTPClient(self
)
72 self
.aws
= aws
.AWS(self
)
73 self
.builds
= builds
.Builds(self
)
74 self
.cache
= cache
.Cache(self
)
75 self
.jobs
= jobs
.Jobs(self
)
76 self
.builders
= builders
.Builders(self
)
77 self
.distros
= distribution
.Distributions(self
)
78 self
.events
= events
.Events(self
)
79 self
.keys
= keys
.Keys(self
)
80 self
.logstreams
= logstreams
.LogStreams(self
)
81 self
.messages
= messages
.Messages(self
)
82 self
.mirrors
= mirrors
.Mirrors(self
)
83 self
.packages
= packages
.Packages(self
)
84 self
.monitorings
= releasemonitoring
.Monitorings(self
)
85 self
.ratelimiter
= ratelimiter
.RateLimiter(self
)
86 self
.repos
= repository
.Repositories(self
)
87 self
.sessions
= sessions
.Sessions(self
)
88 self
.sources
= sources
.Sources(self
)
89 self
.uploads
= uploads
.Uploads(self
)
90 self
.users
= users
.Users(self
)
92 # Open a connection to bugzilla.
93 self
.bugzilla
= bugtracker
.Bugzilla(self
)
95 # Create a temporary directory
96 self
._create
_tmp
_path
()
98 log
.info("Pakfire Build Service initialized at %s" % self
.basepath
)
100 def launch_background_tasks(self
):
101 # Launch some initial tasks
102 self
.run_task(self
.users
.generate_vapid_keys
)
103 self
.run_task(self
.builders
.autoscale
)
105 # Regularly check the mirrors
106 self
.run_periodic_task(300, self
.mirrors
.check
)
108 # Regularly fetch sources
109 self
.run_periodic_task(300, self
.sources
.fetch
)
111 # Regularly check for new releases
112 self
.run_periodic_task(300, self
.monitorings
.check
)
114 def read_config(self
, path
):
115 c
= configparser
.ConfigParser()
117 # Read configuration from file
126 name
= self
.config
.get("database", "name")
127 hostname
= self
.config
.get("database", "hostname")
128 user
= self
.config
.get("database", "user")
129 password
= self
.config
.get("database", "password")
130 except configparser
.Error
as e
:
131 log
.error("Error parsing the config: %s" % e
.message
)
133 log
.debug("Connecting to database %s @ %s" % (name
, hostname
))
135 return database
.Connection(self
, hostname
, name
, user
=user
, password
=password
)
137 def _create_tmp_path(self
):
139 This function will create some temporary space with the correct permissions.
141 path
= self
.path("tmp")
144 os
.mkdir(path
, mode
=0o1777)
146 # Ignore if the directory already exists
147 except FileExistsError
:
150 def path(self
, *args
):
152 Takes a relative path and makes it absolute
154 return os
.path
.join(self
.basepath
, *args
)
156 def url_to(self
, url
):
158 Takes a relative URL and makes it absolute
161 baseurl
= self
.settings
.get("baseurl")
163 # Join it all together
164 return urllib
.parse
.urljoin(baseurl
, url
)
166 def path_to_url(self
, path
):
168 Takes a path to a file on the file system and converts it into a URL
172 "files", os
.path
.relpath(path
, self
.basepath
),
175 return self
.url_to(path
)
177 def pakfire(self
, *args
, **kwargs
):
179 Launches a new Pakfire instance with the given configuration
181 return config
.PakfireConfig(self
, *args
, **kwargs
)
183 # Functions to run something in background
185 def run_task(self
, callback
, *args
):
187 Runs the given coroutine in the background
190 task
= asyncio
.create_task(callback(*args
))
192 # Keep a reference to the task and remove it when the task has finished
193 self
.__tasks
.add(task
)
194 task
.add_done_callback(self
.__tasks
.discard
)
198 def run_periodic_task(self
, delay
, callback
, *args
):
200 Calls the given callback periodically in the background
202 self
.run_task(self
._periodic
_task
, delay
, callback
, *args
)
204 async def _periodic_task(self
, delay
, callback
, *args
):
206 Helper function for run_periodic_task() that will call the given
207 callback regulary after the timer has expired.
209 log
.debug("Periodic callback %r started" % callback
)
213 ret
= callback(*args
)
215 # Await ret if callback is a coroutine
216 if inspect
.isawaitable(ret
):
219 except Exception as e
:
220 log
.error("Exception in periodic callback %r" % callback
, exc_info
=True)
222 # Wait a little moment
223 await asyncio
.sleep(delay
)
227 async def command(self
, *command
, krb5_auth
=False, **kwargs
):
229 Runs this shell command
231 async with self
.tempdir() as tmp
:
232 # Create a minimal environment
234 "HOME" : os
.environ
.get("HOME", tmp
),
236 # Tell the system where to put temporary files
239 # Store any Kerberos credentials here
240 "KRB5CCNAME" : os
.path
.join(tmp
, ".krb5cc"),
243 # Authenticate using Kerberos
245 await self
._krb
5_auth
(env
=env
)
248 return await self
._command
(*command
, env
=env
, **kwargs
)
250 async def _command(self
, *command
, return_output
=False, input=None, **kwargs
):
251 log
.debug("Running command: %s" % " ".join(command
))
254 process
= await asyncio
.create_subprocess_exec(
256 stdin
=asyncio
.subprocess
.PIPE
if input else asyncio
.subprocess
.DEVNULL
,
257 stdout
=asyncio
.subprocess
.PIPE
,
258 stderr
=asyncio
.subprocess
.PIPE
,
265 if not isinstance(input, bytes
):
266 input = input.encode()
268 # Write the entire data chunk by chunk
270 chunk
, input = input[0:64], input[64:]
274 process
.stdin
.write(chunk
)
275 await process
.stdin
.drain()
277 # Close the input once we are done
278 process
.stdin
.close()
282 # Fetch output of command and send it to the logger
284 line
= await process
.stdout
.readline()
297 # Store the output if requested
301 # Wait until the process has finished
302 returncode
= await process
.wait()
304 # Check the return code
306 # Fetch any output from the standard error output
307 stderr
= await process
.stderr
.read()
308 stderr
= stderr
.decode()
311 log
.error("Error running command: %s (code=%s)" % (
312 " ".join(command
), returncode
,
317 raise CommandExecutionError(returncode
, stderr
)
319 # Return output if requested
321 return "\n".join(stdout
)
323 async def _krb5_auth(self
, **kwargs
):
324 log
.debug("Performing Kerberos authentication...")
326 # Fetch path to keytab
327 keytab
= self
.settings
.get("krb5-keytab")
329 log
.warning("No keytab configured")
332 # Fetch Kerberos principal
333 principal
= self
.settings
.get("krb5-principal")
335 log
.warning("No Kerberos principal configured")
338 # Fetch a Kerberos ticket
340 "kinit", "-k", "-t", keytab
, principal
, **kwargs
,
343 async def exists(self
, *args
, **kwargs
):
345 Checks whether a file exists
347 return await asyncio
.to_thread(os
.path
.exists
, *args
, **kwargs
)
349 async def makedirs(self
, path
, **kwargs
):
351 Creates a new directory
353 return await asyncio
.to_thread(os
.makedirs
, path
, **kwargs
)
355 async def copy(self
, src
, dst
, mode
=None):
357 Copies a file from src to dst
359 log
.debug("Copying %s to %s" % (src
, dst
))
361 # Create parent directory
362 await self
.make_parent_directory(dst
)
364 # Copy data without any metadata
365 await asyncio
.to_thread(shutil
.copyfile
, src
, dst
)
369 await asyncio
.to_thread(os
.chmod
, dst
, mode
)
371 async def move(self
, src
, dst
, **kwargs
):
373 Moves something from src to dst
375 log
.debug("Moving %s to %s" % (src
, dst
))
377 # Create parent directory
378 await self
.make_parent_directory(dst
)
381 await asyncio
.to_thread(shutil
.move
, src
, dst
, **kwargs
)
383 async def make_parent_directory(self
, path
):
385 Creates the parent directory of path
387 path
= os
.path
.dirname(path
)
389 return await self
.makedirs(path
, exist_ok
=True)
391 async def unlink(self
, path
):
396 path
= os
.path
.abspath(path
)
398 # Check if the path is within our base directory
399 if not path
.startswith(self
.basepath
):
400 raise OSError("Cannot delete %s which is outside %s" % (path
, self
.basepath
))
402 log
.debug("Unlinking %s" % path
)
404 await asyncio
.to_thread(self
._unlink
, path
)
406 def _unlink(self
, path
):
407 # Unlink the file we were asked to unlink
413 # Try to delete any empty parent directories
415 # Get the parent directory
416 path
= os
.path
.dirname(path
)
418 # Break if we reached the base path
419 if path
== self
.basepath
or path
== self
.path("tmp"):
428 log
.debug(" Cleaned up %s..." % path
)
430 async def rmtree(self
, path
):
432 Removes a directory recursively
435 path
= os
.path
.abspath(path
)
437 # Check if the path is within our base directory
438 if not path
.startswith(self
.basepath
):
439 raise OSError("Cannot delete %s which is outside %s" % (path
, self
.basepath
))
441 log
.debug("Removing %s..." % path
)
444 await asyncio
.to_thread(shutil
.rmtree
, path
)
446 # Ignore if path didn't exist in the first place
447 except FileNotFoundError
:
450 async def chmod(self
, *args
, **kwargs
):
451 return await asyncio
.to_thread(os
.chmod
, *args
, **kwargs
)
453 def tempfile(self
, mode
="w+b", delete
=True):
455 Returns an open file handle to a new temporary file
457 path
= self
.path("tmp")
459 return aiofiles
.tempfile
.NamedTemporaryFile(mode
=mode
, dir=path
, delete
=delete
)
461 def tempdir(self
, **kwargs
):
463 Asynchronously creates a new temporary directory
465 # Default to our own tmp directory
466 path
= self
.path("tmp")
468 return aiofiles
.tempfile
.TemporaryDirectory(dir=path
, **kwargs
)
470 def _write_tempfile(self
, content
):
472 Writes the content to a temporary file and returns its path
474 t
= self
.tempfile(delete
=False)
478 t
.write(content
.encode())
485 async def open(self
, path
):
487 Opens a package and returns the archive
489 log
.debug("Opening %s..." % path
)
492 async with self
.pakfire() as p
:
493 return await asyncio
.to_thread(p
.open, path
)
496 def ssl_context(self
):
498 context
= ssl
.create_default_context()
500 # Fetch client certificate
501 certificate
= self
.settings
.get("client-certificate", None)
502 key
= self
.settings
.get("client-key", None)
504 # Apply client certificate
505 if certificate
and key
:
506 with tempfile
.NamedTemporaryFile(mode
="w") as f_cert
:
507 f_cert
.write(certificate
)
510 with tempfile
.NamedTemporaryFile(mode
="w") as f_key
:
514 context
.load_cert_chain(f_cert
.name
, f_key
.name
)
518 async def load_certificate(self
, certfile
, keyfile
):
519 with self
.db
.transaction():
521 with
open(certfile
) as f
:
522 self
.settings
.set("client-certificate", f
.read())
525 with
open(keyfile
) as f
:
526 self
.settings
.set("client-key", f
.read())
528 log
.info("Updated certificates")
530 async def cleanup(self
):
532 Called regularly to cleanup any left-over resources
535 await self
.messages
.queue
.cleanup()
538 await self
.sessions
.cleanup()
541 await self
.uploads
.cleanup()
543 async def sync(self
):
545 Syncs any repository that should be mirrored
547 log
.info("Syncing mirrors...")
549 # Fetch the sync target
550 target
= self
.settings
.get("sync-target")
552 log
.warning("No sync target configured")
555 # Update the timestamp
556 await self
._update
_timestamp
()
561 # Show what is being transferred
564 # Compress any transferred data
567 # Enable archive mode
570 # Preserve hardlinks, ACLs & XATTRs
575 # Delete any files that we have deleted
579 # Remove any empty directories
580 "--prune-empty-dirs",
582 # Make the transaction atomic
585 # Add source & target
586 "%s/" % self
.basepath
,
589 # Sync the .timestamp
590 "--include=.timestamp",
593 # Add all mirrored repositories
594 for repo
in self
.repos
.mirrored
:
595 path
= os
.path
.relpath(repo
.local_path(), self
.basepath
)
597 commandline
.append("--include=%s***" % path
)
599 # Exclude everything that hasn't been included
600 commandline
+= ("--include=*/", "--exclude=*")
603 await self
.command(*commandline
, krb5_auth
=True)
605 async def _update_timestamp(self
):
607 Updates the .timestamp file in the root of the exported files
610 path
= os
.path
.join(self
.basepath
, ".timestamp")
612 t
= datetime
.datetime
.utcnow()
614 # Write the current time as seconds since epoch
615 async with aiofiles
.open(path
, mode
="w") as f
:
616 await f
.write(t
.strftime("%s"))
621 Configures the logger for the buildservice backend
623 # Log everything to journal
624 handler
= systemd
.journal
.JournalHandler(
625 SYSLOG_IDENTIFIER
="pakfire-build-service",
627 log
.addHandler(handler
)