]> git.ipfire.org Git - pbs.git/blob - src/buildservice/builders.py
93260c5f6911d62070e528bf37852f3a279f4ba7
[pbs.git] / src / buildservice / builders.py
1 #!/usr/bin/python
2
import asyncio
import datetime
import hashlib
import hmac
import logging
import random
import string
import time

import botocore.exceptions

from . import base
from . import logs
from . import misc

from .decorators import *
17
18 # Setup logging
19 log = logging.getLogger("pakfire.buildservice.builders")
20
class Builders(base.Object):
    """
    Collection class that provides access to all builders and
    implements autoscaling of the build infrastructure on AWS.
    """
    def _get_builder(self, query, *args):
        # Fetches a single row and wraps it into a Builder object (or None)
        res = self.db.get(query, *args)

        if res:
            return Builder(self.backend, res.id, data=res)

    def _get_builders(self, query, *args):
        # Fetches any matching rows and yields them as Builder objects
        res = self.db.query(query, *args)

        for row in res:
            yield Builder(self.backend, row.id, data=row)

    def __iter__(self):
        # Iterates over all builders that are not deleted, sorted by name
        builders = self._get_builders("SELECT * FROM builders \
            WHERE deleted IS FALSE ORDER BY name")

        return iter(builders)

    def create(self, name, user=None, log=True):
        """
        Creates a new builder.
        """
        builder = self._get_builder("INSERT INTO builders(name) \
            VALUES(%s) RETURNING *", name)

        # Log what we have done.
        if log:
            builder.log("created", user=user)

        # Return the newly created Builder object.
        return builder

    def get_by_id(self, builder_id):
        # Returns the builder with the given ID (even if deleted)
        return self._get_builder("SELECT * FROM builders WHERE id = %s", builder_id)

    def get_by_name(self, name):
        # Returns the builder with the given name (ignoring deleted builders)
        return self._get_builder("SELECT * FROM builders \
            WHERE name = %s AND deleted IS FALSE", name)

    def get_history(self, limit=None, offset=None, builder=None, user=None):
        """
        Returns the builders history, optionally filtered by builder
        and/or user, newest entries first.
        """
        query = "SELECT * FROM builders_history"
        args = []

        conditions = []

        if builder:
            conditions.append("builder_id = %s")
            args.append(builder.id)

        if user:
            conditions.append("user_id = %s")
            args.append(user.id)

        if conditions:
            query += " WHERE %s" % " AND ".join(conditions)

        query += " ORDER BY time DESC"

        if limit:
            if offset:
                # PostgreSQL does not support the MySQL-style
                # "LIMIT offset, count" syntax - use LIMIT/OFFSET instead
                query += " LIMIT %s OFFSET %s"
                args += [limit, offset]
            else:
                query += " LIMIT %s"
                args += [limit]

        entries = []
        for entry in self.db.query(query, *args):
            entry = logs.BuilderLogEntry(self.pakfire, entry)
            entries.append(entry)

        return entries

    async def sync(self, *args, **kwargs):
        """
        Synchronize any state with AWS
        """
        log.info("Syncing state with AWS")

        # Sync all builders
        await asyncio.gather(*(builder.sync() for builder in self))

    async def autoscale(self, wait=False):
        """
        This method performs two tasks:

        * It will launch any new builders if more are required

        * It will shutdown any builders which are no longer required
        """
        log.debug("Running autoscaling scheduler")

        # XXX max queue length
        threshold = datetime.timedelta(minutes=5)

        # Fetch all enabled builders and whether they are running or not
        builders = {
            builder : await builder.is_running() for builder in self if builder.enabled
        }

        # Sort all running builders to the beginning of the list so that
        # already-running instances are filled before any new ones are
        # launched (negate the flag because False sorts before True)
        builders = sorted(builders, key=lambda b: (not builders[b], b))

        # Store the length of the queue for each builder
        queue = {
            builder : datetime.timedelta(0) for builder in builders
        }

        # Run through all build jobs and allocate them to a builder.
        # If a builder is full (i.e. reaches the threshold of its build time),
        # we move on to the next builder until that is full and so on.
        for job in self.backend.jobqueue:
            log.debug("Processing job %s..." % job)

            for builder in builders:
                # Skip disabled builders (defensive - the list should
                # already contain enabled builders only)
                if not builder.enabled:
                    continue

                # Check if this builder is already at capacity
                if queue[builder] / builder.max_jobs >= threshold:
                    log.debug("Builder %s is already full" % builder)
                    continue

                # Skip if this builder cannot build this job
                if not builder.can_build(job):
                    continue

                log.debug("Builder %s can build %s" % (builder, job))

                # Add the job to the total build time
                queue[builder] += job.estimated_build_time
                break

        # Find all builders that should be running
        builders_to_be_launched = [
            builder for builder in builders if queue[builder]
        ]

        # Find all builders that are no longer needed and can be shut down
        builders_to_be_shut_down = [
            builder for builder in builders if not queue[builder] and not len(builder.jobs)
        ]

        # Start all builders that have been allocated at least one job
        await asyncio.gather(
            *(builder.start(wait=wait) for builder in builders_to_be_launched),
        )

        # Shutdown the rest
        await asyncio.gather(
            *(builder.stop(wait=wait) for builder in builders_to_be_shut_down),
        )

    # Stats

    @property
    def total_build_time(self):
        """
        Returns the total build time
        """
        res = self.db.get("""
            SELECT
                SUM(
                    COALESCE(jobs.finished_at, CURRENT_TIMESTAMP)
                    -
                    jobs.started_at
                ) AS t
            FROM
                jobs
            WHERE
                started_at IS NOT NULL""",
        )

        return res.t

    @property
    def total_build_time_by_arch(self):
        """
        Returns a dict with the total build times grouped by architecture
        """
        res = self.db.query("""
            SELECT
                jobs.arch AS arch,
                SUM(
                    COALESCE(jobs.finished_at, CURRENT_TIMESTAMP)
                    -
                    jobs.started_at
                ) AS t
            FROM
                jobs
            WHERE
                started_at IS NOT NULL
            GROUP BY
                jobs.arch
            ORDER BY
                jobs.arch""",
        )

        return { row.arch : row.t for row in res }
222
223
224
class Builder(base.DataObject):
    """
    A single build machine, backed by one row of the "builders" table.

    When an EC2 instance ID is configured, the builder can additionally
    be started and stopped on AWS (see the "AWS" section below).
    """
    table = "builders"

    def __lt__(self, other):
        # Builders are ordered alphabetically by name
        if isinstance(other, self.__class__):
            return self.name < other.name

        return NotImplemented

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.hostname)

    def __str__(self):
        return self.hostname

    def log(self, action, user=None):
        """
        Stores the given action in the builders history,
        optionally attributed to the given user.
        """
        user_id = None
        if user:
            user_id = user.id

        self.db.execute("INSERT INTO builders_history(builder_id, action, user_id, time) \
            VALUES(%s, %s, %s, NOW())", self.id, action, user_id)

    # Description

    def set_description(self, description):
        self._set_attribute("description", description)

    # Free-text description; defaults to the empty string when unset
    description = property(lambda s: s.data.description or "", set_description)

    def is_online(self):
        """
        Returns True if the builder is online
        (i.e. it has submitted stats within the last ten minutes,
        see the "stats" property below)
        """
        if self.stats:
            return True

        return False

    def set_online_until(self, online_until):
        self._set_attribute("online_until", online_until)

    online_until = property(lambda s: s.data.online_until, set_online_until)

    def update_info(self, cpu_model=None, cpu_count=None, cpu_arch=None,
            pakfire_version=None, os_name=None):
        """
        Stores the hardware/software information that the builder
        reports about itself and bumps the updated_at timestamp.
        """
        self.db.execute("""
            UPDATE
                builders
            SET
                updated_at = CURRENT_TIMESTAMP,
                cpu_model = %s,
                cpu_count = %s,
                cpu_arch = %s,
                pakfire_version = %s,
                os_name = %s
            WHERE
                id = %s""",
            cpu_model,
            cpu_count,
            cpu_arch,
            pakfire_version,
            os_name,
            self.id,
        )

    def log_stats(self, cpu_user, cpu_nice, cpu_system, cpu_idle, cpu_iowait,
            cpu_irq, cpu_softirq, cpu_steal, cpu_guest, cpu_guest_nice,
            loadavg1, loadavg5, loadavg15, mem_total, mem_available, mem_used,
            mem_free, mem_active, mem_inactive, mem_buffers, mem_cached, mem_shared,
            swap_total, swap_used, swap_free):
        """
        Logs some stats about this builder

        Inserts one row of CPU, load average, memory and swap readings
        into builder_stats.
        """
        self.db.execute("""
            INSERT INTO
                builder_stats
            (
                builder_id,
                cpu_user,
                cpu_nice,
                cpu_system,
                cpu_idle,
                cpu_iowait,
                cpu_irq,
                cpu_softirq,
                cpu_steal,
                cpu_guest,
                cpu_guest_nice,
                loadavg1,
                loadavg5,
                loadavg15,
                mem_total,
                mem_available,
                mem_used,
                mem_free,
                mem_active,
                mem_inactive,
                mem_buffers,
                mem_cached,
                mem_shared,
                swap_total,
                swap_used,
                swap_free
            )
            VALUES (
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
            )""",
            self.id,
            cpu_user,
            cpu_nice,
            cpu_system,
            cpu_idle,
            cpu_iowait,
            cpu_irq,
            cpu_softirq,
            cpu_steal,
            cpu_guest,
            cpu_guest_nice,
            loadavg1,
            loadavg5,
            loadavg15,
            mem_total,
            mem_available,
            mem_used,
            mem_free,
            mem_active,
            mem_inactive,
            mem_buffers,
            mem_cached,
            mem_shared,
            swap_total,
            swap_used,
            swap_free,
        )

    @lazy_property
    def stats(self):
        """
        Returns the latest stats data (if any)

        Only rows created within the last ten minutes are considered,
        so this doubles as the "is the builder alive?" signal.
        """
        return self.db.get("""
            SELECT
                *
            FROM
                builder_stats
            WHERE
                builder_id = %s
            AND
                created_at >= CURRENT_TIMESTAMP - INTERVAL '10 minutes'
            ORDER BY
                created_at DESC
            LIMIT 1""",
            self.id,
        )

    # Enabled

    def set_enabled(self, enabled):
        self._set_attribute("enabled", enabled)

    enabled = property(lambda s: s.data.enabled, set_enabled)

    # Permissions

    def has_perm(self, user):
        """
        Returns True if the given user may manage this builder.
        """
        # Anonymous users have no permissions
        if not user:
            return False

        # Admins have all permissions
        return user.is_admin()

    @property
    def arch(self):
        """
        The native architecture - Alias for "cpu_arch"
        """
        return self.cpu_arch

    @property
    def supported_arches(self):
        """
        Returns a list of all architectures this builder can natively build for
        """
        # Every builder supports noarch
        arches = ["noarch"]

        # We can always build our native architeture
        if self.arch:
            arches.append(self.arch)

        return sorted(arches)

    def can_build(self, job):
        # A job can be built here if its architecture is supported
        return job.arch in self.supported_arches

    def set_testmode(self, testmode):
        self._set_attribute("testmode", testmode)

    testmode = property(lambda s: s.data.testmode, set_testmode)

    # Jobs

    @property
    def jobs(self):
        """
        Returns all jobs that are currently running on this builder
        (started but not yet finished), newest first.
        """
        jobs = self.backend.jobs._get_jobs("""
            SELECT
                jobs.*
            FROM
                jobs
            WHERE
                started_at IS NOT NULL
            AND
                finished_at IS NULL
            AND
                builder_id = %s
            ORDER BY
                started_at DESC""",
            self.id,
        )

        return list(jobs)

    # Max Jobs

    def get_max_jobs(self):
        return self.data.max_jobs

    def set_max_jobs(self, value):
        self._set_attribute("max_jobs", value)

    # Maximum number of jobs that may run on this builder in parallel
    max_jobs = property(get_max_jobs, set_max_jobs)

    @property
    def name(self):
        return self.data.name

    @property
    def hostname(self):
        # The hostname is simply the builder's name
        return self.name

    @property
    def pakfire_version(self):
        return self.data.pakfire_version or ""

    @property
    def os_name(self):
        return self.data.os_name or ""

    @property
    def cpu_model(self):
        return self.data.cpu_model or ""

    @property
    def cpu_count(self):
        return self.data.cpu_count

    @property
    def cpu_arch(self):
        return self.data.cpu_arch

    @property
    def host_key_id(self):
        return self.data.host_key_id

    @property
    def state(self):
        """
        Returns a short state string: "disabled", "offline" or "online".
        """
        if not self.enabled:
            return "disabled"

        # NOTE(review): relies on a time_keepalive column that is not
        # written anywhere in this file - presumably set by the builder's
        # keepalive handshake; confirm the column still exists
        if self.data.time_keepalive is None:
            return "offline"

        #if self.data.updated >= 5*60:
        #	return "offline"

        return "online"

    def get_history(self, *args, **kwargs):
        """
        Returns the history of this builder
        (convenience wrapper around Builders.get_history).
        """
        kwargs["builder"] = self

        return self.pakfire.builders.get_history(*args, **kwargs)

    def is_ready(self):
        """
        Returns True if this builder can accept another job.
        """
        # If the builder is not enabled, we are obviously not ready
        if not self.enabled:
            return False

        # Has this builder reached its job limit?
        if len(self.jobs) >= self.max_jobs:
            return False

        # Looks like we are ready
        return True

    # AWS

    @property
    def instance_id(self):
        # EC2 instance ID (None for builders that are not hosted on AWS)
        return self.data.instance_id

    @property
    def instance_type(self):
        # Desired EC2 instance type (e.g. applied before starting)
        return self.data.instance_type

    @lazy_property
    def instance(self):
        """
        Returns the boto3 EC2 Instance resource (or None if
        no instance ID is configured).
        """
        if self.instance_id:
            return self.backend.aws.ec2.Instance(self.instance_id)

    async def sync(self):
        """
        Fetches and logs the current EC2 state of this builder.
        """
        log.info("Syncing AWS state for %s" % self)

        if not self.instance:
            log.debug("%s does not have an instance ID" % self)
            return

        # This callback is being executed in a separate thread
        # because boto3 is not thread-safe
        def callback():
            log.debug("%s is currently in state: %s" % (self, self.instance.state))

        # Launch in a separate thread
        await asyncio.to_thread(callback)

    async def is_running(self):
        """
        Returns True if this builder is currently running
        """
        state = await asyncio.to_thread(self._fetch_state)

        return state in ("pending", "running")

    async def is_shutdown(self):
        """
        Returns True if this builder is shut down
        """
        state = await asyncio.to_thread(self._fetch_state)

        return state == "stopped"

    async def is_shutting_down(self):
        """
        Returns True if this builder is currently shutting down
        """
        state = await asyncio.to_thread(self._fetch_state)

        return state in ("shutting-down", "stopping")

    def _fetch_state(self):
        """
        Returns the current state of this instance
        (EC2 state name, or None if there is no instance).
        Blocking - call from a thread.
        """
        if self.instance:
            return self.instance.state.get("Name")

    async def start(self, wait=True):
        """
        Starts the instance on AWS
        """
        await asyncio.to_thread(self._start, wait=wait)

    def _start(self, wait):
        # Blocking start implementation, run in a thread by start()

        # Requires an instance
        if not self.instance:
            return

        log.info("Starting %s" % self)

        # Set correct instance type
        self._set_instance_type()

        # Send the start signal
        self.instance.start()

        # End here if we don't want to wait
        if not wait:
            return

        log.debug("Waiting until %s has started" % self)

        # And wait until the instance is running
        self.instance.wait_until_running()

        log.debug("%s has been started" % self)

    def _set_instance_type(self):
        """
        Changes the type of this instance
        """
        # Don't try setting instance type if nothing is configured
        if not self.instance_type:
            return

        # Check if this needs changing at all
        if self.instance.instance_type == self.instance_type:
            return

        log.debug("Changing instance type of %s to %s" % (self, self.instance_type))

        # Send the change
        try:
            self.instance.modify_attribute(
                InstanceType={
                    "Value" : self.instance_type,
                }
            )

        # Log an error if this request wasn't successful
        except botocore.exceptions.ClientError as e:
            log.warning("Could not change instance type of %s: %s" % (self, e))

    async def stop(self, wait=True):
        """
        Stops this instance on AWS
        """
        await asyncio.to_thread(self._stop, wait=wait)

    def _stop(self, wait):
        # Blocking stop implementation, run in a thread by stop()

        # Requires an instance
        if not self.instance:
            return

        log.info("Stopping %s" % self)

        # Send the stop signal
        self.instance.stop()

        # End here if we don't want to wait
        if not wait:
            return

        log.debug("Waiting until %s has stopped" % self)

        # And wait until the instance has been stopped
        self.instance.wait_until_stopped()

        log.debug("%s has been stopped" % self)

    # Stats

    @lazy_property
    def total_build_time(self):
        """
        Returns the total build time of all jobs ever run on this builder
        (still-running jobs are counted up to the current time).
        """
        res = self.db.get("""
            SELECT
                SUM(
                    COALESCE(jobs.finished_at, CURRENT_TIMESTAMP)
                    -
                    jobs.started_at
                ) AS t
            FROM
                jobs
            WHERE
                started_at IS NOT NULL
            AND
                builder_id = %s""",
            self.id,
        )

        return res.t
687
688
def generate_password_hash(password, salt=None, algo="sha512"):
    """
    This function creates a salted digest of the given password.

    Returns a string of the form "<algo>$<salt>$<hexdigest>".

    Raises ValueError if the hash algorithm is not supported.
    """
    # Generate the salt (length = 16) if none was given.
    if salt is None:
        salt = misc.generate_random_string(length=16)

    # hashlib.algorithms was removed in Python 3;
    # algorithms_available lists everything this build supports.
    if algo not in hashlib.algorithms_available:
        raise ValueError("Unsupported password hash algorithm: %s" % algo)

    # Calculate the digest of <SALT> + <PASSWORD>.
    # hashlib operates on bytes, so str inputs must be encoded first.
    h = hashlib.new(algo)
    h.update(salt.encode("utf-8") if isinstance(salt, str) else salt)
    h.update(password.encode("utf-8") if isinstance(password, str) else password)

    # Output string is of kind "<algo>$<salt>$<hash>".
    return "$".join((algo, salt, h.hexdigest()))
709
def check_password_hash(password, password_hash):
    """
    Check a plain-text password with the given digest.

    Returns True if the password matches, False otherwise
    (including when the stored hash is malformed).
    """
    # Handle plaintext passwords (plain$<password>).
    # Compare in constant time to avoid leaking information
    # through timing differences.
    if password_hash.startswith("plain$"):
        return hmac.compare_digest(
            password_hash[6:].encode("utf-8"), password.encode("utf-8"))

    try:
        algo, salt, digest = password_hash.split("$", 2)
    except ValueError:
        logging.warning("Unknown password hash: %s" % password_hash)
        return False

    # Re-generate the password hash and compare the result
    # (again in constant time).
    return hmac.compare_digest(password_hash.encode("utf-8"),
        generate_password_hash(password, salt=salt, algo=algo).encode("utf-8"))