]> git.ipfire.org Git - pbs.git/blob - src/buildservice/__init__.py
backend: command: Add option to return output
[pbs.git] / src / buildservice / __init__.py
1 #!/usr/bin/python
2
3 import asyncio
4 import configparser
5 import inspect
6 import logging
7 import os
8 import pakfire
9 import shutil
10 import ssl
11 import systemd.journal
12 import tempfile
13 import urllib.parse
14
15 from . import aws
16 from . import bugtracker
17 from . import builders
18 from . import builds
19 from . import cache
20 from . import config
21 from . import database
22 from . import distribution
23 from . import events
24 from . import jobqueue
25 from . import jobs
26 from . import keys
27 from . import messages
28 from . import mirrors
29 from . import packages
30 from . import repository
31 from . import settings
32 from . import sessions
33 from . import sources
34 from . import uploads
35 from . import users
36
37 log = logging.getLogger("pakfire.buildservice")
38
39 # Import version
40 from .__version__ import VERSION as __version__
41
42 from .decorators import *
43 from .constants import *
44
class Backend(object):
    """
    Central object of the build service

    It reads the configuration file, creates one instance of every
    sub-system (builds, jobs, repositories, users, ...) and offers helpers
    to run background tasks, external commands and basic file operations
    below the base path.
    """
    version = __version__

    # Any background tasks that are currently running. This is a class
    # attribute, so the set is shared by all Backend instances.
    __tasks = set()

    def __init__(self, config_file, test=False):
        """
        Reads config_file and sets up all sub-systems

        test is only stored on the instance as self.test.
        """
        self.test = test

        # Read configuration file
        self.config = self.read_config(config_file)

        # Fetch the base path
        self.basepath = self.config.get("global", "basepath")

        # Global pakfire settings (from database).
        self.settings = settings.Settings(self)

        self.aws = aws.AWS(self)
        self.builds = builds.Builds(self)
        self.cache = cache.Cache(self)
        self.jobs = jobs.Jobs(self)
        self.builders = builders.Builders(self)
        self.distros = distribution.Distributions(self)
        self.events = events.Events(self)
        self.jobqueue = jobqueue.JobQueue(self)
        self.keys = keys.Keys(self)
        self.messages = messages.Messages(self)
        self.mirrors = mirrors.Mirrors(self)
        self.packages = packages.Packages(self)
        self.repos = repository.Repositories(self)
        self.sessions = sessions.Sessions(self)
        self.sources = sources.Sources(self)
        self.uploads = uploads.Uploads(self)
        self.users = users.Users(self)

        # Open a connection to bugzilla.
        self.bugzilla = bugtracker.Bugzilla(self)

        log.info("Pakfire Build Service initialized at %s", self.basepath)

    def read_config(self, path):
        """
        Parses the configuration file at path and returns the parser

        If path is empty, an empty configuration is returned. A file that
        cannot be opened does not raise an error, but a warning is logged.
        """
        c = configparser.ConfigParser()

        # Read configuration from file. read() skips any files it cannot
        # open and returns the list of files that it has parsed.
        if path and not c.read(path):
            log.warning("Could not read configuration from %s", path)

        return c

    @lazy_property
    def db(self):
        """
        The database connection which is set up from the [database]
        section of the configuration

        Raises configparser.Error if any of the required settings
        (name, hostname, user, password) is missing.
        """
        try:
            name = self.config.get("database", "name")
            hostname = self.config.get("database", "hostname")
            user = self.config.get("database", "user")
            password = self.config.get("database", "password")
        except configparser.Error as e:
            log.error("Error parsing the config: %s", e)

            # There is nothing to connect to without these settings
            raise

        log.debug("Connecting to database %s @ %s", name, hostname)

        return database.Connection(hostname, name, user=user, password=password)

    def path(self, *args):
        """
        Takes a relative path and makes it absolute
        by joining it to the base path
        """
        return os.path.join(self.basepath, *args)

    def path_to_url(self, path):
        """
        Takes a path to a file on the file system and converts it into a URL

        The path is made relative to the base path, prefixed with "files"
        and then joined to the "baseurl" setting.
        """
        # The base URL
        baseurl = self.settings.get("baseurl")

        # Path to package
        path = os.path.join(
            "files", os.path.relpath(path, self.basepath),
        )

        # Join it all together
        return urllib.parse.urljoin(baseurl, path)

    def pakfire(self, *args, **kwargs):
        """
        Returns a new config.PakfireConfig object for this backend

        All arguments are passed on to PakfireConfig.
        """
        return config.PakfireConfig(self, *args, **kwargs)

    # Functions to run something in background

    def run_task(self, callback, *args):
        """
        Runs the given coroutine function in the background
        and returns the asyncio task

        This must be called while an event loop is running.
        """
        # Create a new task
        task = asyncio.create_task(callback(*args))

        # Keep a reference to the task and remove it when the task has finished
        self.__tasks.add(task)
        task.add_done_callback(self.__tasks.discard)

        return task

    def run_periodic_task(self, delay, callback, *args):
        """
        Calls the given callback periodically in the background

        The callback is called with args every delay seconds. The task
        which drives the timer is returned so that it can be cancelled.
        """
        return self.run_task(self._periodic_task, delay, callback, *args)

    async def _periodic_task(self, delay, callback, *args):
        """
        Helper function for run_periodic_task() that will call the given
        callback regularly after the timer has expired.
        """
        log.debug("Periodic callback %r started", callback)

        while True:
            # Wait a little moment
            await asyncio.sleep(delay)

            try:
                ret = callback(*args)

                # Await ret if callback is a coroutine
                if inspect.isawaitable(ret):
                    await ret

            # Log the problem, but keep the timer running
            except Exception:
                log.exception("Exception in periodic callback %r", callback)

    # Commands

    async def command(self, *args, krb5_auth=False, return_output=False, **kwargs):
        """
        Runs the command given as args (no shell is involved)

        If krb5_auth is set, krb5_auth() is awaited before the command is
        launched. Standard error is merged into standard output and every
        line is sent to the debug log. Any extra keyword arguments are
        passed to asyncio.create_subprocess_exec().

        Returns the output as one string if return_output is set,
        otherwise None. A non-zero exit code is logged as a warning, but
        does not raise an exception.
        """
        stdout = []

        # Authenticate using Kerberos
        if krb5_auth:
            await self.krb5_auth()

        # Arguments are not necessarily strings (e.g. path objects)
        log.debug("Running command: %s", " ".join(str(arg) for arg in args))

        # Fork child process
        process = await asyncio.create_subprocess_exec(
            *args,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            **kwargs,
        )

        # Fetch output of command and send it to the logger
        async for line in process.stdout:
            # Decode line and strip newline. Undecodable bytes are replaced
            # because the command is free to print anything and we must
            # keep reading until it has closed its output.
            line = line.decode(errors="replace").rstrip()

            # Log the output
            log.debug(line)

            # Store the output if requested
            if return_output:
                stdout.append(line)

        # Wait until the process has finished
        returncode = await process.wait()

        # Leave a trace if the command has failed
        if returncode:
            log.warning("Command %s exited with code %s", args[0], returncode)

        if return_output:
            return "\n".join(stdout)

    async def krb5_auth(self):
        """
        Fetches a Kerberos ticket by calling kinit with the keytab and
        principal from the settings

        Only logs a warning and returns if either of them is not configured.
        """
        log.debug("Performing Kerberos authentication...")

        # Fetch path to keytab
        keytab = self.settings.get("krb5-keytab")
        if not keytab:
            log.warning("No keytab configured")
            return

        # Fetch Kerberos principal
        principal = self.settings.get("krb5-principal")
        if not principal:
            log.warning("No Kerberos principal configured")
            return

        # Fetch a Kerberos ticket
        await self.command(
            "kinit", "-k", "-t", keytab, principal,
        )

    async def copy(self, src, dst):
        """
        Copies a file from src to dst

        The parent directory of dst is created if necessary.
        """
        log.debug("Copying %s to %s", src, dst)

        # Create parent directory
        await self.make_parent_directory(dst)

        # Copy data without any metadata
        await asyncio.to_thread(shutil.copyfile, src, dst)

    async def make_parent_directory(self, path):
        """
        Creates the parent directory of path
        """
        path = os.path.dirname(path)

        # A bare filename has no parent directory that could be created
        if not path:
            return

        # Create destination path (if it does not exist)
        try:
            await asyncio.to_thread(os.makedirs, path)
        except FileExistsError:
            pass

    async def unlink(self, path):
        """
        Unlinks path and removes any parent directories that became empty

        Raises OSError if path is not located below the base path.
        """
        # Normalize the paths
        path = os.path.abspath(path)
        base = os.path.abspath(self.basepath)

        # Check if the path is within our base directory. Whole path
        # components are compared, because a plain prefix check would
        # accept a sibling like "<basepath>-something", too.
        try:
            inside = os.path.commonpath((path, base)) == base
        except ValueError:
            inside = False

        if not inside:
            raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))

        log.debug("Unlinking %s", path)

        await asyncio.to_thread(self._unlink, path)

    def _unlink(self, path):
        """
        Blocking part of unlink()

        Nothing else is touched if the file itself could not be removed.
        """
        # Unlink the file we were asked to unlink
        try:
            os.unlink(path)
        except OSError:
            return

        # Normalize the base path so that the comparison below also
        # matches if it has been configured with a trailing slash
        base = os.path.abspath(self.basepath)

        # Try to delete any empty parent directories
        while True:
            # Get the parent directory
            path = os.path.dirname(path)

            # Break if we reached the base path
            if path == base:
                break

            # Call rmdir() which fails if the directory is not empty
            try:
                os.rmdir(path)
            except OSError:
                break

            log.debug(" Cleaned up %s...", path)

    def _write_tempfile(self, content):
        """
        Writes the content to a temporary file and returns its path

        The file is not deleted automatically.
        """
        # Close the file even if writing the content fails
        with tempfile.NamedTemporaryFile(delete=False) as t:
            # Write the content
            if content:
                t.write(content.encode())

        return t.name

    async def open(self, path):
        """
        Opens a package and returns the archive
        """
        return await asyncio.to_thread(self._open, path)

    def _open(self, path):
        """
        Blocking part of open()
        """
        log.debug("Opening %s...", path)

        # Open the archive
        with self.pakfire() as p:
            return p.open(path)

    @property
    def ssl_context(self):
        """
        A new default SSL context

        The client certificate is loaded into it if both the
        "client-certificate" and "client-key" settings are set.
        """
        # Create SSL context
        context = ssl.create_default_context()

        # Fetch client certificate
        certificate = self.settings.get("client-certificate", None)
        key = self.settings.get("client-key", None)

        # Apply client certificate. load_cert_chain() wants file names, so
        # both are written to temporary files which are removed again at
        # the end of the with block.
        if certificate and key:
            with tempfile.NamedTemporaryFile(mode="w") as f_cert, \
                    tempfile.NamedTemporaryFile(mode="w") as f_key:
                f_cert.write(certificate)
                f_cert.flush()

                f_key.write(key)
                f_key.flush()

                context.load_cert_chain(f_cert.name, f_key.name)

        return context

    async def load_certificate(self, certfile, keyfile):
        """
        Reads certfile and keyfile and stores their content as the
        "client-certificate" and "client-key" settings
        """
        with self.db.transaction():
            # Load certificate
            with open(certfile) as f:
                self.settings.set("client-certificate", f.read())

            # Load key file
            with open(keyfile) as f:
                self.settings.set("client-key", f.read())

        log.info("Updated certificates")

    async def cleanup(self):
        """
        Called regularly to cleanup any left-over resources
        """
        # Messages
        await self.messages.queue.cleanup()

        # Sessions
        await self.sessions.cleanup()

        # Uploads
        await self.uploads.cleanup()

    async def sync(self):
        """
        Syncs any repository that should be mirrored

        The base path is copied to the "sync-target" setting using rsync,
        limited to the mirrored repositories. Returns 0 without doing
        anything if no target has been configured, otherwise None.
        """
        log.info("Syncing mirrors...")

        # Fetch the sync target
        target = self.settings.get("sync-target")
        if not target:
            log.warning("No sync target configured")
            return 0

        commandline = [
            "rsync",

            # Show what is being transferred
            #"--verbose",

            # Compress any transferred data
            "--compress",

            # Enable archive mode
            "--archive",

            # Preserve hardlinks, ACLs & XATTRs
            "--hard-links",
            "--acls",
            "--xattrs",

            # Delete any files that we have deleted
            "--delete",
            "--delete-excluded",

            # Remove any empty directories
            "--prune-empty-dirs",

            # Make the transaction atomic
            "--delay-updates",

            # Add source & target
            "%s/" % self.basepath,
            target,
        ]

        # Add all mirrored repositories
        # NOTE(review): without a slash in front of "***" this pattern
        # presumably also matches siblings that share the same name
        # prefix - confirm against the rsync filter rules
        for repo in self.repos.mirrored:
            path = os.path.relpath(repo.local_path(), self.basepath)

            commandline.append("--include=%s***" % path)

        # Exclude everything that hasn't been included
        commandline += ("--include=*/", "--exclude=*")

        # Run command
        await self.command(*commandline, krb5_auth=True)
440
441
def setup_logging():
    """
    Sets up the build service logger so that all of its
    messages (including debug output) end up in the journal
    """
    # Enable debug logging
    log.setLevel(logging.DEBUG)

    # Nothing from the build service is propagated up to any Pakfire instances
    log.propagate = 0

    # Log everything to journal
    log.addHandler(
        systemd.journal.JournalHandler(
            SYSLOG_IDENTIFIER="pakfire-build-service",
        ),
    )


# Setup logging
setup_logging()