]> git.ipfire.org Git - pbs.git/blob - src/buildservice/__init__.py
sync: Move this out of the repository stuff
[pbs.git] / src / buildservice / __init__.py
1 #!/usr/bin/python
2
3 import asyncio
4 import configparser
5 import inspect
6 import logging
7 import os
8 import pakfire
9 import shutil
10 import tempfile
11 import urllib.parse
12
13 from . import aws
14 from . import bugtracker
15 from . import builders
16 from . import builds
17 from . import cache
18 from . import database
19 from . import distribution
20 from . import events
21 from . import jobqueue
22 from . import jobs
23 from . import keys
24 from . import logs
25 from . import messages
26 from . import mirrors
27 from . import packages
28 from . import repository
29 from . import settings
30 from . import sessions
31 from . import sources
32 from . import updates
33 from . import uploads
34 from . import users
35
# Logger that is used by everything in this module
log = logging.getLogger("backend")

# Pass all messages on to the handlers of the parent loggers, too.
# propagate is a boolean flag, so we set it to True instead of 1.
log.propagate = True
38
39 # Import version
40 from .__version__ import VERSION as __version__
41
42 from .decorators import *
43 from .constants import *
44
class Backend(object):
    """
    Central object of the build service.

    It reads the configuration, creates one manager object for each
    subsystem (builds, jobs, repositories, users, ...) and provides
    helpers to run tasks in the background, to execute shell commands
    and to handle files.
    """

    version = __version__

    # A set of any background tasks. The event loop only keeps weak
    # references to tasks, so we hold on to them until they have finished.
    __tasks = set()

    def __init__(self, config_file=None):
        """
        Sets up the backend.

        config_file is an optional path to a configuration file which is
        read after the default configuration file.
        """
        # Read configuration file.
        self.config = self.read_config(config_file)

        # Global pakfire settings (from database).
        self.settings = settings.Settings(self)

        self.aws = aws.AWS(self)
        self.builds = builds.Builds(self)
        self.cache = cache.Cache(self)
        self.jobs = jobs.Jobs(self)
        self.builders = builders.Builders(self)
        self.distros = distribution.Distributions(self)
        self.events = events.Events(self)
        self.jobqueue = jobqueue.JobQueue(self)
        self.keys = keys.Keys(self)
        self.messages = messages.Messages(self)
        self.mirrors = mirrors.Mirrors(self)
        self.packages = packages.Packages(self)
        self.repos = repository.Repositories(self)
        self.sessions = sessions.Sessions(self)
        self.sources = sources.Sources(self)
        self.updates = updates.Updates(self)
        self.uploads = uploads.Uploads(self)
        self.users = users.Users(self)

        # Open a connection to bugzilla.
        self.bugzilla = bugtracker.Bugzilla(self)

    @lazy_property
    def _environment_configuration(self):
        """
        Collects any configuration that is passed in environment variables.

        Returns a dictionary which maps section names to a dictionary of
        options. Options whose variable is not set have the value None.
        """
        return {
            # Database configuration
            "database" : {
                "name"     : os.environ.get("PBS_DATABASE_NAME"),
                "hostname" : os.environ.get("PBS_DATABASE_HOSTNAME"),
                "user"     : os.environ.get("PBS_DATABASE_USER"),
                "password" : os.environ.get("PBS_DATABASE_PASSWORD"),
            },
        }

    def read_config(self, path):
        """
        Reads the configuration and returns it as a ConfigParser object.

        The values from the environment are imported first, followed by
        the default configuration file and finally the file at path (if
        given). Whatever is read later overwrites what has been read
        before. Files that do not exist are skipped after an error has
        been logged.
        """
        # SafeConfigParser has been removed in Python 3.12.
        # ConfigParser is the same implementation under its current name.
        c = configparser.ConfigParser()

        # Import configuration from environment
        for section, options in self._environment_configuration.items():
            c.add_section(section)

            for k, v in options.items():
                # The parser only takes strings, so any unset variables
                # are stored as an empty string
                c.set(section, k, v or "")

        # Load default configuration file first
        paths = [
            os.path.join(CONFIGSDIR, "pbs.conf"),
        ]

        if path:
            paths.append(path)

        # Load all configuration files
        for p in paths:
            if os.path.exists(p):
                log.debug("Loading configuration from %s" % p)
                c.read(p)
            else:
                log.error("No such file %s" % p)

        return c

    @lazy_property
    def db(self):
        """
        Opens a connection to the database with the credentials from the
        configuration and returns it.

        Raises configparser.Error if the credentials could not be read.
        """
        try:
            name     = self.config.get("database", "name")
            hostname = self.config.get("database", "hostname")
            user     = self.config.get("database", "user")
            password = self.config.get("database", "password")
        except configparser.Error as e:
            log.error("Error parsing the config: %s" % e.message)

            # There is nothing to connect to without these values.
            # Carrying on would only end in an UnboundLocalError below.
            raise

        log.debug("Connecting to database %s @ %s" % (name, hostname))

        return database.Connection(hostname, name, user=user, password=password)

    def path_to_url(self, path):
        """
        Takes a path to a file on the file system and converts it into a URL

        The path is made relative to PAKFIRE_DIR, prefixed with "files"
        and then joined with the "baseurl" setting.
        """
        # The base URL
        baseurl = self.settings.get("baseurl")

        # Path to package
        path = os.path.join(
            "files", os.path.relpath(path, PAKFIRE_DIR),
        )

        # Join it all together
        return urllib.parse.urljoin(baseurl, path)

    def pakfire(self, config, offline=True, **kwargs):
        """
        Launches a new Pakfire instance with the given configuration

        config is the content of the configuration file as a string. It
        is written to a temporary file which is removed again as soon as
        the instance has been created (or creating it has failed). Any
        extra keyword arguments are passed on to pakfire.Pakfire().
        """
        log.debug("Launching pakfire with configuration:\n%s" % config)

        # Write configuration to file
        t = self._write_tempfile(config)

        # Launch a new Pakfire instance
        try:
            return pakfire.Pakfire(conf=t, logger=log.log, offline=offline, **kwargs)

        finally:
            # Delete the configuration file
            os.unlink(t)

    # Functions to run something in background

    def run_task(self, callback, *args):
        """
        Runs the given coroutine in the background

        callback is called with args and has to return a coroutine which
        is wrapped into a task. The task is returned.
        """
        # Create a new task
        task = asyncio.create_task(callback(*args))

        # Keep a reference to the task and remove it when the task has finished
        self.__tasks.add(task)
        task.add_done_callback(self.__tasks.discard)

        return task

    def run_periodic_task(self, delay, callback, *args):
        """
        Calls the given callback periodically in the background

        delay is the time in seconds to wait before each call. The task
        that runs the loop is returned so that it can be cancelled.
        """
        return self.run_task(self._periodic_task, delay, callback, *args)

    async def _periodic_task(self, delay, callback, *args):
        """
        Helper function for run_periodic_task() that will call the given
        callback regularly after the timer has expired.

        This never returns. Any exception that is raised by the callback
        is logged and the loop carries on.
        """
        log.debug("Periodic callback %r started" % callback)

        while True:
            # Wait a little moment
            await asyncio.sleep(delay)

            try:
                ret = callback(*args)

                # Await ret if callback is a coroutine
                if inspect.isawaitable(ret):
                    await ret

            # One failed run must not stop any future runs
            except Exception:
                log.error("Exception in periodic callback %r" % callback, exc_info=True)

    # Commands

    async def command(self, *args, krb5_auth=False, **kwargs):
        """
        Runs this shell command

        args is the program followed by its arguments. If krb5_auth is
        set, krb5_auth() is called before the command is launched. Any
        other keyword arguments are passed on to
        asyncio.create_subprocess_exec().

        The output of the command (stdout and stderr) is sent to the log
        line by line. The exit code of the command is returned.
        """
        # Authenticate using Kerberos
        if krb5_auth:
            await self.krb5_auth()

        log.debug("Running command: %s" % " ".join(args))

        # Fork child process
        process = await asyncio.create_subprocess_exec(
            *args,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            **kwargs,
        )

        # Fetch output of command and send it to the logger
        while True:
            line = await process.stdout.readline()
            if not line:
                break

            # Decode line. The command may print anything, so we replace
            # what cannot be decoded instead of aborting half way through
            # with the process still running.
            line = line.decode(errors="replace")

            # Strip newline
            line = line.rstrip()

            log.info(line)

        # Wait until the process has finished
        returncode = await process.wait()

        # Leave a trace if the command has failed
        if returncode:
            log.warning("Command %s exited with code %s" % (" ".join(args), returncode))

        return returncode

    async def krb5_auth(self):
        """
        Fetches a Kerberos ticket by calling kinit with the keytab and
        principal from the settings.

        Nothing is done (except logging a warning) if either of the two
        settings is not configured.
        """
        log.debug("Performing Kerberos authentication...")

        # Fetch path to keytab
        keytab = self.settings.get("krb5-keytab")
        if not keytab:
            log.warning("No keytab configured")
            return

        # Fetch Kerberos principal
        principal = self.settings.get("krb5-principal")
        if not principal:
            log.warning("No Kerberos principal configured")
            return

        # Fetch a Kerberos ticket
        await self.command(
            "kinit", "-k", "-t", keytab, principal,
        )

    async def copy(self, src, dst):
        """
        Copies a file from src to dst

        The directory of dst is created if it does not exist. Only the
        content of the file is copied, but none of its metadata.
        """
        log.debug("Copying %s to %s" % (src, dst))

        path = os.path.dirname(dst)

        # Create destination path (if it does not exist). There is nothing
        # to create if dst has no directory part and os.makedirs() would
        # fail on the empty path.
        if path:
            await asyncio.to_thread(os.makedirs, path, exist_ok=True)

        # Copy data without any metadata
        await asyncio.to_thread(shutil.copyfile, src, dst)

    async def unlink(self, path):
        """
        Unlinks path

        This never raises an error if the file could not be removed.
        """
        log.debug("Unlinking %s" % path)

        try:
            await asyncio.to_thread(os.unlink, path)

        # Nothing to do if the file is gone already
        except FileNotFoundError:
            pass

        # Do not fail on anything else, but do not hide it either
        except OSError as e:
            log.warning("Could not unlink %s: %s" % (path, e))

    def _write_tempfile(self, content):
        """
        Writes the content to a temporary file and returns its path

        content is a string. The file is not removed automatically, so
        the caller has to unlink it.
        """
        # The context manager closes the file even if writing fails
        with tempfile.NamedTemporaryFile(delete=False) as t:
            # Write the content
            t.write(content.encode())

        return t.name

    async def open(self, path):
        """
        Opens a package and returns the archive

        The work is done in a separate thread by _open().
        """
        return await asyncio.to_thread(self._open, path)

    def _open(self, path):
        """
        Synchronous helper function for open()
        """
        # Create a dummy Pakfire instance
        p = pakfire.Pakfire(offline=True)

        # Open the archive
        return p.open(path)

    async def cleanup(self):
        """
        Called regularly to cleanup any left-over resources
        """
        # Sessions
        await self.sessions.cleanup()

        # Uploads
        await self.uploads.cleanup()

    async def sync(self):
        """
        Syncs any repository that should be mirrored

        Runs rsync to copy all mirrored repositories from PAKFIRE_DIR to
        the target from the "sync-target" setting. Kerberos authentication
        is performed before.

        Returns 0 if no target is configured and the exit code of rsync
        otherwise.
        """
        log.info("Syncing mirrors...")

        # Fetch the sync target
        target = self.settings.get("sync-target")
        if not target:
            log.warning("No sync target configured")
            return 0

        commandline = [
            "rsync",

            # Show what is being transferred
            #"--verbose",

            # Compress any transferred data
            "--compress",

            # Enable archive mode
            "--archive",

            # Preserve hardlinks, ACLs & XATTRs
            "--hard-links",
            "--acls",
            "--xattrs",

            # Delete any files that we have deleted
            "--delete",
            "--delete-excluded",

            # Remove any empty directories
            "--prune-empty-dirs",

            # Make the transaction atomic
            "--delay-updates",

            # Add source & target
            "%s/" % PAKFIRE_DIR,
            target,
        ]

        # Add all mirrored repositories
        for repo in self.repos.mirrored:
            path = os.path.relpath(repo.local_path(), PAKFIRE_DIR)

            commandline.append("--include=%s***" % path)

        # Exclude everything that hasn't been included
        commandline += ("--include=*/", "--exclude=*")

        # Run command
        return await self.command(*commandline, krb5_auth=True)