git.ipfire.org Git - pbs.git/blob - src/buildservice/__init__.py
backend: Add function to create parent directories
[pbs.git] / src / buildservice / __init__.py
1 #!/usr/bin/python
2
3 import asyncio
4 import configparser
5 import inspect
6 import logging
7 import os
8 import pakfire
9 import shutil
10 import ssl
11 import systemd.journal
12 import tempfile
13 import urllib.parse
14
15 from . import aws
16 from . import bugtracker
17 from . import builders
18 from . import builds
19 from . import cache
20 from . import config
21 from . import database
22 from . import distribution
23 from . import events
24 from . import jobqueue
25 from . import jobs
26 from . import keys
27 from . import messages
28 from . import mirrors
29 from . import packages
30 from . import repository
31 from . import settings
32 from . import sessions
33 from . import sources
34 from . import uploads
35 from . import users
36
37 log = logging.getLogger("pakfire.buildservice")
38
39 # Import version
40 from .__version__ import VERSION as __version__
41
42 from .decorators import *
43 from .constants import *
44
class Backend(object):
    """
    Central object of the build service.

    Reads the configuration file, provides the (lazily created) database
    connection and instantiates one manager object per sub-module (builds,
    jobs, repositories, ...). It also offers a couple of helpers to run
    background tasks, execute shell commands and manipulate files below
    the configured base path.
    """
    version = __version__

    # A list of any background tasks
    # (kept on the class, so the set is shared by all Backend instances)
    __tasks = set()

    def __init__(self, config_file, test=False):
        """
        Sets up the backend

        config_file is the path handed to read_config(); it may be empty.
        test is stored as self.test and not evaluated any further in this class.
        """
        self.test = test

        # Read configuration file
        self.config = self.read_config(config_file)

        # Fetch the base path
        self.basepath = self.config.get("global", "basepath")

        # Global pakfire settings (from database).
        self.settings = settings.Settings(self)

        self.aws = aws.AWS(self)
        self.builds = builds.Builds(self)
        self.cache = cache.Cache(self)
        self.jobs = jobs.Jobs(self)
        self.builders = builders.Builders(self)
        self.distros = distribution.Distributions(self)
        self.events = events.Events(self)
        self.jobqueue = jobqueue.JobQueue(self)
        self.keys = keys.Keys(self)
        self.messages = messages.Messages(self)
        self.mirrors = mirrors.Mirrors(self)
        self.packages = packages.Packages(self)
        self.repos = repository.Repositories(self)
        self.sessions = sessions.Sessions(self)
        self.sources = sources.Sources(self)
        self.uploads = uploads.Uploads(self)
        self.users = users.Users(self)

        # Open a connection to bugzilla.
        self.bugzilla = bugtracker.Bugzilla(self)

        log.info("Pakfire Build Service initialized at %s" % self.basepath)

    def read_config(self, path):
        """
        Parses the configuration file at path and returns the parser

        If path is empty, an empty configuration is returned.
        """
        c = configparser.ConfigParser()

        # Read configuration from file
        # ConfigParser.read() silently skips unreadable files and returns the
        # list of files it could parse, so we log if nothing has been read
        if path:
            if not c.read(path):
                log.warning("Could not read configuration file %s" % path)

        return c

    @lazy_property
    def db(self):
        """
        The database connection built from the [database] section of the
        configuration file

        Raises configparser.Error if any of the required settings is missing.
        """
        try:
            name = self.config.get("database", "name")
            hostname = self.config.get("database", "hostname")
            user = self.config.get("database", "user")
            password = self.config.get("database", "password")
        except configparser.Error as e:
            log.error("Error parsing the config: %s" % e.message)

            # We cannot connect without these settings. Re-raise the original
            # error instead of failing later on an unbound variable.
            raise

        log.debug("Connecting to database %s @ %s" % (name, hostname))

        return database.Connection(hostname, name, user=user, password=password)

    def path(self, *args):
        """
        Takes a relative path and makes it absolute
        """
        return os.path.join(self.basepath, *args)

    def path_to_url(self, path):
        """
        Takes a path to a file on the file system and converts it into a URL
        """
        # The base URL
        baseurl = self.settings.get("baseurl")

        # Path to package
        path = os.path.join(
            "files", os.path.relpath(path, self.basepath),
        )

        # Join it all together
        return urllib.parse.urljoin(baseurl, path)

    def pakfire(self, *args, **kwargs):
        """
        Launches a new Pakfire instance with the given configuration
        """
        return config.PakfireConfig(self, *args, **kwargs)

    # Functions to run something in background

    def run_task(self, callback, *args):
        """
        Runs the given coroutine in the background

        callback is called with args and must return a coroutine.
        Returns the created asyncio task.
        """
        # Create a new task
        task = asyncio.create_task(callback(*args))

        # Keep a reference to the task and remove it when the task has finished
        self.__tasks.add(task)
        task.add_done_callback(self.__tasks.discard)

        return task

    def run_periodic_task(self, delay, callback, *args):
        """
        Calls the given callback periodically in the background

        Returns the asyncio task that drives the timer.
        """
        return self.run_task(self._periodic_task, delay, callback, *args)

    async def _periodic_task(self, delay, callback, *args):
        """
        Helper function for run_periodic_task() that will call the given
        callback regularly after the timer has expired.
        """
        log.debug("Periodic callback %r started" % callback)

        while True:
            # Wait a little moment
            await asyncio.sleep(delay)

            try:
                ret = callback(*args)

                # Await ret if callback is a coroutine
                if inspect.isawaitable(ret):
                    await ret

            # Never let a failing callback stop the timer
            except Exception:
                log.error("Exception in periodic callback %r" % callback, exc_info=True)

    # Commands

    async def command(self, *args, krb5_auth=False, **kwargs):
        """
        Runs this shell command

        args is the command line (program and arguments), kwargs are passed
        on to asyncio.create_subprocess_exec(). If krb5_auth is set,
        krb5_auth() is called before the command is launched.

        The output (stdout and stderr) is sent to the logger line by line.

        Returns the exit code of the process.
        """
        # Authenticate using Kerberos
        if krb5_auth:
            await self.krb5_auth()

        # Arguments do not have to be strings (e.g. path-like objects)
        log.debug("Running command: %s" % " ".join(str(arg) for arg in args))

        # Fork child process
        process = await asyncio.create_subprocess_exec(
            *args,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            **kwargs,
        )

        # Fetch output of command and send it to the logger
        while True:
            line = await process.stdout.readline()
            if not line:
                break

            # Decode line
            # Replace anything that cannot be decoded so that broken output
            # does not abort the loop and leave the process behind
            line = line.decode(errors="replace")

            # Strip newline
            line = line.rstrip()

            log.info(line)

        # Wait until the process has finished
        returncode = await process.wait()

        if returncode:
            log.warning("Command %s exited with code %s" % (args[0], returncode))

        return returncode

    async def krb5_auth(self):
        """
        Fetches a Kerberos ticket using the keytab and principal from the settings

        Does nothing (apart from logging a warning) if either of them is not configured.
        """
        log.debug("Performing Kerberos authentication...")

        # Fetch path to keytab
        keytab = self.settings.get("krb5-keytab")
        if not keytab:
            log.warning("No keytab configured")
            return

        # Fetch Kerberos principal
        principal = self.settings.get("krb5-principal")
        if not principal:
            log.warning("No Kerberos principal configured")
            return

        # Fetch a Kerberos ticket
        await self.command(
            "kinit", "-k", "-t", keytab, principal,
        )

    async def copy(self, src, dst):
        """
        Copies a file from src to dst
        """
        log.debug("Copying %s to %s" % (src, dst))

        # Create parent directory
        await self.make_parent_directory(dst)

        # Copy data without any metadata
        await asyncio.to_thread(shutil.copyfile, src, dst)

    async def make_parent_directory(self, path):
        """
        Creates the parent directory of path
        """
        path = os.path.dirname(path)

        # A bare filename has no parent directory that we could create
        # (os.makedirs("") would raise FileNotFoundError)
        if not path:
            return

        # Create destination path (if it does not exist)
        await asyncio.to_thread(os.makedirs, path, exist_ok=True)

    async def unlink(self, path):
        """
        Unlinks path

        Raises OSError if path is not located inside the base path.
        """
        # Normalize the path
        path = os.path.abspath(path)
        basepath = os.path.abspath(self.basepath)

        # Check if the path is within our base directory
        # This compares whole path components, so that /pub-old/file is
        # not considered to be inside /pub
        if os.path.commonpath((basepath, path)) != basepath:
            raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))

        log.debug("Unlinking %s" % path)

        await asyncio.to_thread(self._unlink, path)

    def _unlink(self, path):
        """
        Helper function for unlink() which removes the file and any
        parent directories that became empty (up to the base path)
        """
        # Unlink the file we were asked to unlink
        try:
            os.unlink(path)
        except FileNotFoundError:
            return
        except OSError as e:
            log.warning("Could not unlink %s: %s" % (path, e))
            return

        # Compare against the normalized base path so that a trailing slash
        # in the configuration does not make us miss it
        basepath = os.path.abspath(self.basepath)

        # Try to delete any empty parent directories
        while True:
            # Get the parent directory
            parent = os.path.dirname(path)

            # Break if we reached the base path (or the root of the file system)
            if parent == basepath or parent == path:
                break

            # Call rmdir()
            # This fails if the directory is not empty which ends the cleanup
            try:
                os.rmdir(parent)
            except OSError:
                break

            log.debug(" Cleaned up %s..." % parent)

            # Continue one level further up
            path = parent

    def _write_tempfile(self, content):
        """
        Writes the content to a temporary file and returns its path

        The file is not deleted automatically.
        """
        # Close the file even if writing fails
        with tempfile.NamedTemporaryFile(delete=False) as t:
            # Write the content
            if content:
                t.write(content.encode())

        return t.name

    async def open(self, path):
        """
        Opens a package and returns the archive
        """
        return await asyncio.to_thread(self._open, path)

    def _open(self, path):
        """
        Helper function for open() which is being run in a separate thread
        """
        log.debug("Opening %s..." % path)

        # Open the archive
        with self.pakfire() as p:
            return p.open(path)

    @property
    def ssl_context(self):
        """
        Returns a new SSL context which has the client certificate
        from the settings loaded (if one is configured)
        """
        # Create SSL context
        context = ssl.create_default_context()

        # Fetch client certificate
        certificate = self.settings.get("client-certificate", None)
        key = self.settings.get("client-key", None)

        # Apply client certificate
        # load_cert_chain() only accepts paths, so certificate and key are
        # written to temporary files which are removed when the block is left
        if certificate and key:
            with tempfile.NamedTemporaryFile(mode="w") as f_cert, \
                    tempfile.NamedTemporaryFile(mode="w") as f_key:
                f_cert.write(certificate)
                f_cert.flush()

                f_key.write(key)
                f_key.flush()

                context.load_cert_chain(f_cert.name, f_key.name)

        return context

    async def load_certificate(self, certfile, keyfile):
        """
        Reads certificate and key from the given files and stores them
        in the settings
        """
        with self.db.transaction():
            # Load certificate
            with open(certfile) as f:
                self.settings.set("client-certificate", f.read())

            # Load key file
            with open(keyfile) as f:
                self.settings.set("client-key", f.read())

        log.info("Updated certificates")

    async def cleanup(self):
        """
        Called regularly to cleanup any left-over resources
        """
        # Messages
        await self.messages.queue.cleanup()

        # Sessions
        await self.sessions.cleanup()

        # Uploads
        await self.uploads.cleanup()

    async def sync(self):
        """
        Syncs any repository that should be mirrored

        Returns 0 if no sync target is configured and the exit code
        of rsync otherwise.
        """
        log.info("Syncing mirrors...")

        # Fetch the sync target
        target = self.settings.get("sync-target")
        if not target:
            log.warning("No sync target configured")
            return 0

        commandline = [
            "rsync",

            # Show what is being transferred
            #"--verbose",

            # Compress any transferred data
            "--compress",

            # Enable archive mode
            "--archive",

            # Preserve hardlinks, ACLs & XATTRs
            "--hard-links",
            "--acls",
            "--xattrs",

            # Delete any files that we have deleted
            "--delete",
            "--delete-excluded",

            # Remove any empty directories
            "--prune-empty-dirs",

            # Make the transaction atomic
            "--delay-updates",

            # Add source & target
            "%s/" % self.basepath,
            target,
        ]

        # Add all mirrored repositories
        for repo in self.repos.mirrored:
            path = os.path.relpath(repo.local_path(), self.basepath)

            commandline.append("--include=%s***" % path)

        # Exclude everything that hasn't been included
        commandline += ("--include=*/", "--exclude=*")

        # Run command
        return await self.command(*commandline, krb5_auth=True)
430
431
def setup_logging():
    """
    Prepares the logger of the build service backend

    Everything from debug level upwards is sent to the systemd journal
    and nothing is handed on to any parent loggers.
    """
    # Send all messages to the journal
    journal = systemd.journal.JournalHandler(
        SYSLOG_IDENTIFIER="pakfire-build-service",
    )
    log.addHandler(journal)

    # Log debug messages, too
    log.setLevel(logging.DEBUG)

    # Keep our messages away from any Pakfire instances
    log.propagate = 0


# Configure logging as soon as the module is being imported
setup_logging()