]> git.ipfire.org Git - pbs.git/blob - src/buildservice/__init__.py
http: Build a custom HTTP client based on cURL
[pbs.git] / src / buildservice / __init__.py
1 #!/usr/bin/python
2
3 import asyncio
4 import configparser
5 import inspect
6 import logging
7 import os
8 import pakfire
9 import shutil
10 import ssl
11 import systemd.journal
12 import tempfile
13 import urllib.parse
14
15 from . import aws
16 from . import bugtracker
17 from . import builders
18 from . import builds
19 from . import cache
20 from . import config
21 from . import database
22 from . import distribution
23 from . import events
24 from . import httpclient
25 from . import jobs
26 from . import keys
27 from . import logstreams
28 from . import messages
29 from . import mirrors
30 from . import packages
31 from . import releasemonitoring
32 from . import repository
33 from . import settings
34 from . import sessions
35 from . import sources
36 from . import uploads
37 from . import users
38
# Logger shared by every component of the build service
log = logging.getLogger(name="pbs")
41
42 # Import version
43 from .__version__ import VERSION as __version__
44
45 from .decorators import *
46 from .constants import *
47
class Backend(object):
    """
    The central object of the Pakfire Build Service

    It reads the configuration file, creates one instance of every subsystem
    (builds, jobs, repositories, users, ...) and offers shared helpers for
    file handling, running external commands and scheduling background tasks.
    """
    version = __version__

    # Strong references to all running background tasks so that they are not
    # garbage-collected while still running (this set is shared by all instances)
    __tasks = set()

    def __init__(self, config_file, test=False):
        self.test = test

        # Read configuration file
        self.config = self.read_config(config_file)

        # Fetch the base path
        self.basepath = self.config.get("global", "basepath")

        # Global pakfire settings (from database).
        self.settings = settings.Settings(self)

        # Initialize the HTTP Client
        self.httpclient = httpclient.HTTPClient(self)

        self.aws = aws.AWS(self)
        self.builds = builds.Builds(self)
        self.cache = cache.Cache(self)
        self.jobs = jobs.Jobs(self)
        self.builders = builders.Builders(self)
        self.distros = distribution.Distributions(self)
        self.events = events.Events(self)
        self.keys = keys.Keys(self)
        self.logstreams = logstreams.LogStreams(self)
        self.messages = messages.Messages(self)
        self.mirrors = mirrors.Mirrors(self)
        self.packages = packages.Packages(self)
        self.releasemonitoring = releasemonitoring.ReleaseMonitoring(self)
        self.repos = repository.Repositories(self)
        self.sessions = sessions.Sessions(self)
        self.sources = sources.Sources(self)
        self.uploads = uploads.Uploads(self)
        self.users = users.Users(self)

        # Open a connection to bugzilla.
        self.bugzilla = bugtracker.Bugzilla(self)

        # Create a temporary directory
        self._create_tmp_path()

        log.info("Pakfire Build Service initialized at %s" % self.basepath)

    def read_config(self, path):
        """
        Parses the INI-style configuration file at path

        A false path (e.g. None) yields an empty configuration. Note that
        ConfigParser.read() silently skips files it cannot open.
        """
        c = configparser.ConfigParser()

        # Read configuration from file
        if path:
            c.read(path)

        return c

    @lazy_property
    def db(self):
        """
        The database connection, built from the [database] section of the
        configuration (presumably created once and cached by @lazy_property)

        Raises configparser.Error if any of the required options is missing.
        """
        try:
            name = self.config.get("database", "name")
            hostname = self.config.get("database", "hostname")
            user = self.config.get("database", "user")
            password = self.config.get("database", "password")

        # We cannot connect without these settings. Log the problem and re-raise.
        # (Python 3 exceptions have no .message attribute, and carrying on would
        # only fail later on the unset variables.)
        except configparser.Error as e:
            log.error("Error parsing the config: %s" % e)
            raise

        log.debug("Connecting to database %s @ %s" % (name, hostname))

        return database.Connection(hostname, name, user=user, password=password)

    def _create_tmp_path(self):
        """
        Creates the shared temporary directory <basepath>/tmp with
        sticky, world-writable permissions (mode 1777)
        """
        path = self.path("tmp")

        try:
            os.mkdir(path, mode=0o1777)

        # Ignore if the directory already exists
        except FileExistsError:
            return

        # The mode passed to mkdir() is masked by the umask, and bits beyond
        # the last nine may be ignored, so apply the intended mode explicitly
        os.chmod(path, 0o1777)

    def path(self, *args):
        """
        Takes a relative path and makes it absolute (relative to the base path)
        """
        return os.path.join(self.basepath, *args)

    def url_to(self, url):
        """
        Takes a relative URL and makes it absolute (relative to the "baseurl" setting)
        """
        # The base URL
        baseurl = self.settings.get("baseurl")

        # Join it all together
        return urllib.parse.urljoin(baseurl, url)

    def path_to_url(self, path):
        """
        Takes a path to a file on the file system and converts it into a URL
        below "files/"
        """
        # Path to package
        path = os.path.join(
            "files", os.path.relpath(path, self.basepath),
        )

        return self.url_to(path)

    def pakfire(self, *args, **kwargs):
        """
        Launches a new Pakfire instance with the given configuration
        """
        return config.PakfireConfig(self, *args, **kwargs)

    # Functions to run something in background

    def run_task(self, callback, *args):
        """
        Runs the given coroutine function in the background and returns its task
        """
        # Create a new task
        task = asyncio.create_task(callback(*args))

        # Keep a reference to the task and remove it when the task has finished
        self.__tasks.add(task)
        task.add_done_callback(self.__tasks.discard)

        return task

    def run_periodic_task(self, delay, callback, *args):
        """
        Calls the given callback every delay seconds in the background

        Returns the background task, so that callers may cancel it.
        """
        return self.run_task(self._periodic_task, delay, callback, *args)

    async def _periodic_task(self, delay, callback, *args):
        """
        Helper function for run_periodic_task() that calls the given callback
        regularly after the timer has expired. The callback may be a plain
        function or return an awaitable; exceptions are logged and do not stop
        the loop.
        """
        log.debug("Periodic callback %r started" % callback)

        while True:
            # Wait a little moment
            await asyncio.sleep(delay)

            try:
                ret = callback(*args)

                # Await ret if callback is a coroutine
                if inspect.isawaitable(ret):
                    await ret

            except Exception:
                log.error("Exception in periodic callback %r" % callback, exc_info=True)

    # Commands

    async def command(self, *command, krb5_auth=False, **kwargs):
        """
        Runs this command in a minimal environment with its own temporary directory

        If krb5_auth is True, a Kerberos ticket is fetched first and stored in
        that temporary directory. Any extra keyword arguments are passed on to
        _command().
        """
        with tempfile.TemporaryDirectory() as tmp:
            # Create a minimal environment
            env = {
                "HOME" : os.environ.get("HOME", tmp),

                # Tell the system where to put temporary files
                "TMPDIR" : tmp,

                # Store any Kerberos credentials here
                "KRB5CCNAME" : os.path.join(tmp, ".krb5cc"),
            }

            # Authenticate using Kerberos
            if krb5_auth:
                await self._krb5_auth(env=env)

            # Run the command
            return await self._command(*command, env=env, **kwargs)

    async def _command(self, *command, return_output=False, **kwargs):
        """
        Runs command (without a shell) and logs its standard output

        Returns the standard output (lines joined by newlines) if return_output
        is True, otherwise None. Raises CommandExecutionError with the return
        code and the standard error output if the command exits non-zero.
        """
        log.debug("Running command: %s" % " ".join(command))

        # Fork child process
        process = await asyncio.create_subprocess_exec(
            *command,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            **kwargs,
        )

        # Read standard output and standard error at the same time. Reading
        # stderr only after stdout has ended can deadlock: once the child has
        # filled the stderr pipe it blocks and never closes its stdout.
        stdout, stderr = await asyncio.gather(
            self._log_output(process.stdout, keep=return_output),
            process.stderr.read(),
        )

        # Wait until the process has finished
        returncode = await process.wait()

        # Check the return code
        if returncode:
            stderr = stderr.decode()

            # Log the error
            log.error("Error running command: %s (code=%s)" % (
                " ".join(command), returncode,
            ))
            if stderr:
                log.error(stderr)

            raise CommandExecutionError(returncode, stderr)

        # Return output if requested
        if return_output:
            return "\n".join(stdout)

    async def _log_output(self, stream, keep=False):
        """
        Reads stream line by line until EOF and logs every line at debug level

        Returns the decoded lines (trailing whitespace stripped) if keep is
        True, otherwise an empty list.
        """
        lines = []

        while True:
            line = await stream.readline()
            if not line:
                break

            # Decode line and strip the trailing newline
            line = line.decode().rstrip()

            # Log the output
            log.debug(line)

            # Store the output if requested
            if keep:
                lines.append(line)

        return lines

    async def _krb5_auth(self, **kwargs):
        """
        Fetches a Kerberos ticket with kinit using the configured keytab and principal

        Does nothing (besides logging a warning) if either setting is missing.
        Keyword arguments are passed on to _command().
        """
        log.debug("Performing Kerberos authentication...")

        # Fetch path to keytab
        keytab = self.settings.get("krb5-keytab")
        if not keytab:
            log.warning("No keytab configured")
            return

        # Fetch Kerberos principal
        principal = self.settings.get("krb5-principal")
        if not principal:
            log.warning("No Kerberos principal configured")
            return

        # Fetch a Kerberos ticket
        await self._command(
            "kinit", "-k", "-t", keytab, principal, **kwargs,
        )

    async def copy(self, src, dst, mode=None):
        """
        Copies a file from src to dst (data only, no metadata), creating
        the parent directory of dst if necessary and applying mode if given
        """
        log.debug("Copying %s to %s" % (src, dst))

        # Create parent directory
        await self.make_parent_directory(dst)

        # Copy data without any metadata
        await asyncio.to_thread(shutil.copyfile, src, dst)

        # Set mode (0 is a valid mode, so compare against None)
        if mode is not None:
            await asyncio.to_thread(os.chmod, dst, mode)

    async def make_parent_directory(self, path):
        """
        Creates the parent directory of path (and any missing ancestors)
        """
        path = os.path.dirname(path)

        # A bare file name has no parent directory to create
        if not path:
            return

        # Create destination path (if it does not exist)
        try:
            await asyncio.to_thread(os.makedirs, path)
        except FileExistsError:
            pass

    def _is_within_basepath(self, path):
        """
        Returns True if the absolute path is the base path or lies below it
        """
        basepath = os.path.abspath(self.basepath)

        # A plain prefix check would also accept siblings like "<basepath>-other"
        try:
            return os.path.commonpath((basepath, path)) == basepath
        except ValueError:
            return False

    async def unlink(self, path):
        """
        Unlinks path and removes any parent directories that became empty

        Raises OSError if path is outside of the base path.
        """
        # Normalize the path
        path = os.path.abspath(path)

        # Check if the path is within our base directory
        if not self._is_within_basepath(path):
            raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))

        log.debug("Unlinking %s" % path)

        await asyncio.to_thread(self._unlink, path)

    def _unlink(self, path):
        """
        Removes path, then any empty parent directories up to (but excluding)
        the base path. This is best effort: errors are silently ignored.
        """
        # Unlink the file we were asked to unlink
        try:
            os.unlink(path)
        except OSError:
            return

        basepath = os.path.abspath(self.basepath)

        # Try to delete any empty parent directories
        while True:
            # Get the parent directory
            path = os.path.dirname(path)

            # Break if we reached the base path, and never walk outside of it
            if path == basepath or not self._is_within_basepath(path):
                break

            # Call rmdir()
            try:
                os.rmdir(path)
            except OSError:
                break

            log.debug(" Cleaned up %s..." % path)

    def tempfile(self, mode="w+b", delete=True):
        """
        Returns an open file handle to a new temporary file in <basepath>/tmp
        """
        path = self.path("tmp")

        return tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)

    def _write_tempfile(self, content):
        """
        Writes the (text) content to a temporary file and returns its path

        The file is not deleted automatically; the caller owns it.
        """
        # Closing the file (also on error) does not delete it because delete=False
        with self.tempfile(delete=False) as t:
            # Write the content
            if content:
                t.write(content.encode())

        return t.name

    async def open(self, path):
        """
        Opens a package and returns the archive
        """
        return await asyncio.to_thread(self._open, path)

    def _open(self, path):
        log.debug("Opening %s..." % path)

        # Open the archive
        with self.pakfire() as p:
            return p.open(path)

    @property
    def ssl_context(self):
        """
        A new default SSL context, with the client certificate and key from
        the settings loaded if both are configured
        """
        # Create SSL context
        context = ssl.create_default_context()

        # Fetch client certificate
        certificate = self.settings.get("client-certificate", None)
        key = self.settings.get("client-key", None)

        # Apply client certificate
        if certificate and key:
            # load_cert_chain() only accepts file names, so stage both in temporary files
            with tempfile.NamedTemporaryFile(mode="w") as f_cert:
                f_cert.write(certificate)
                f_cert.flush()

                with tempfile.NamedTemporaryFile(mode="w") as f_key:
                    f_key.write(key)
                    f_key.flush()

                    context.load_cert_chain(f_cert.name, f_key.name)

        return context

    async def load_certificate(self, certfile, keyfile):
        """
        Stores the client certificate and key from the given files in the settings
        """
        with self.db.transaction():
            # Load certificate
            with open(certfile) as f:
                self.settings.set("client-certificate", f.read())

            # Load key file
            with open(keyfile) as f:
                self.settings.set("client-key", f.read())

        log.info("Updated certificates")

    async def cleanup(self):
        """
        Called regularly to cleanup any left-over resources
        """
        # Messages
        await self.messages.queue.cleanup()

        # Sessions
        await self.sessions.cleanup()

        # Uploads
        await self.uploads.cleanup()

    async def sync(self):
        """
        Syncs any repository that should be mirrored to the configured
        "sync-target" using rsync (with Kerberos authentication)

        Returns 0 without doing anything if no sync target is configured.
        """
        log.info("Syncing mirrors...")

        # Fetch the sync target
        target = self.settings.get("sync-target")
        if not target:
            log.warning("No sync target configured")
            return 0

        commandline = [
            "rsync",

            # Show what is being transferred
            #"--verbose",

            # Compress any transferred data
            "--compress",

            # Enable archive mode
            "--archive",

            # Preserve hardlinks, ACLs & XATTRs
            "--hard-links",
            "--acls",
            "--xattrs",

            # Delete any files that we have deleted
            "--delete",
            "--delete-excluded",

            # Remove any empty directories
            "--prune-empty-dirs",

            # Make the transaction atomic
            "--delay-updates",

            # Add source & target
            "%s/" % self.basepath,
            target,
        ]

        # Add all mirrored repositories
        for repo in self.repos.mirrored:
            path = os.path.relpath(repo.local_path(), self.basepath)

            # NOTE(review): "%s***" also matches sibling paths sharing this
            # prefix (e.g. ".../stable" matches ".../stable-testing"); rsync's
            # documented "directory and everything below" form is "%s/***".
            # Confirm which one is intended.
            commandline.append("--include=%s***" % path)

        # Exclude everything that hasn't been included
        commandline += ("--include=*/", "--exclude=*")

        # Run command
        await self.command(*commandline, krb5_auth=True)
513
514
def setup_logging():
    """
    Routes every record of the build service logger to the systemd journal,
    tagged with the "pakfire-build-service" syslog identifier
    """
    journal_handler = systemd.journal.JournalHandler(
        SYSLOG_IDENTIFIER="pakfire-build-service",
    )

    log.addHandler(journal_handler)


# Attach the journal handler as soon as this module is imported
setup_logging()