]>
git.ipfire.org Git - pbs.git/blob - src/buildservice/builders.py
93260c5f6911d62070e528bf37852f3a279f4ba7
4 import botocore
.exceptions
16 from .decorators
import *
19 log
= logging
.getLogger("pakfire.buildservice.builders")
21 class Builders(base
.Object
):
22 def _get_builder(self
, query
, *args
):
23 res
= self
.db
.get(query
, *args
)
26 return Builder(self
.backend
, res
.id, data
=res
)
28 def _get_builders(self
, query
, *args
):
29 res
= self
.db
.query(query
, *args
)
32 yield Builder(self
.backend
, row
.id, data
=row
)
35 builders
= self
._get
_builders
("SELECT * FROM builders \
36 WHERE deleted IS FALSE ORDER BY name")
40 def create(self
, name
, user
=None, log
=True):
42 Creates a new builder.
44 builder
= self
._get
_builder
("INSERT INTO builders(name) \
45 VALUES(%s) RETURNING *", name
)
47 # Log what we have done.
49 builder
.log("created", user
=user
)
51 # The Builder object and the passphrase are returned.
54 def get_by_id(self
, builder_id
):
55 return self
._get
_builder
("SELECT * FROM builders WHERE id = %s", builder_id
)
57 def get_by_name(self
, name
):
58 return self
._get
_builder
("SELECT * FROM builders \
59 WHERE name = %s AND deleted IS FALSE", name
)
61 def get_history(self
, limit
=None, offset
=None, builder
=None, user
=None):
62 query
= "SELECT * FROM builders_history"
68 conditions
.append("builder_id = %s")
69 args
.append(builder
.id)
72 conditions
.append("user_id = %s")
76 query
+= " WHERE %s" % " AND ".join(conditions
)
78 query
+= " ORDER BY time DESC"
82 query
+= " LIMIT %s,%s"
83 args
+= [offset
, limit
,]
89 for entry
in self
.db
.query(query
, *args
):
90 entry
= logs
.BuilderLogEntry(self
.pakfire
, entry
)
95 async def sync(self
, *args
, **kwargs
):
97 Synchronize any state with AWS
99 log
.info("Syncing state with AWS")
102 await asyncio
.gather(*(builder
.sync() for builder
in self
))
104 async def autoscale(self
, wait
=False):
106 This method performs two tasks:
108 * It will launch any new builders if more are required
110 * It will shutdown any builders which are no longer required
112 log
.debug("Running autoscaling schedulder")
114 # XXX max queue length
115 threshold
= datetime
.timedelta(minutes
=5)
117 # Fetch all enabled builders and whether they are running or not
119 builder
: await builder
.is_running() for builder
in self
if builder
.enabled
122 # Sort all running builders to the beginning of the list
123 builders
= sorted(builders
, key
=lambda b
: (builders
[b
], b
))
125 # Store the length of the queue for each builder
127 builder
: datetime
.timedelta(0) for builder
in builders
130 # Run through all build jobs and allocate them to a builder.
131 # If a builder is full (i.e. reaches the threshold of its build time),
132 # we move on to the next builder until that is full and so on.
133 for job
in self
.backend
.jobqueue
:
134 log
.debug("Processing job %s..." % job
)
136 for builder
in builders
:
137 # Skip disabled builders
138 if not builder
.enabled
:
141 # Check if this builder is already at capacity
142 if queue
[builder
] / builder
.max_jobs
>= threshold
:
143 log
.debug("Builder %s is already full" % builder
)
146 # Skip if this builder cannot build this job
147 if not builder
.can_build(job
):
150 log
.debug("Builder %s can build %s" % (builder
, job
))
152 # Add the job to the total build time
153 queue
[builder
] += job
.estimated_build_time
156 # Find all builders that should be running
157 builders_to_be_launched
= [
158 builder
for builder
in builders
if queue
[builder
]
161 # Find all builders that are no longer needed and can be shut down
162 builders_to_be_shut_down
= [
163 builder
for builder
in builders
if not queue
[builder
] and not len(builder
.jobs
)
166 # Start all builders that have been allocated at least one job
167 await asyncio
.gather(
168 *(builder
.start(wait
=wait
) for builder
in builders_to_be_launched
),
172 await asyncio
.gather(
173 *(builder
.stop(wait
=wait
) for builder
in builders_to_be_shut_down
),
179 def total_build_time(self
):
181 Returns the total build time
183 res
= self
.db
.get("""
186 COALESCE(jobs.finished_at, CURRENT_TIMESTAMP)
193 started_at IS NOT NULL""",
199 def total_build_time_by_arch(self
):
201 Returns a dict with the total build times grouped by architecture
203 res
= self
.db
.query("""
207 COALESCE(jobs.finished_at, CURRENT_TIMESTAMP)
214 started_at IS NOT NULL
221 return { row
.arch
: row
.t
for row
in res
}
225 class Builder(base
.DataObject
):
228 def __lt__(self
, other
):
229 if isinstance(other
, self
.__class
__):
230 return self
.name
< other
.name
232 return NotImplemented
235 return "<%s %s>" % (self
.__class
__.__name
__, self
.hostname
)
240 def log(self
, action
, user
=None):
245 self
.db
.execute("INSERT INTO builders_history(builder_id, action, user_id, time) \
246 VALUES(%s, %s, %s, NOW())", self
.id, action
, user_id
)
250 def set_description(self
, description
):
251 self
._set
_attribute
("description", description
)
253 description
= property(lambda s
: s
.data
.description
or "", set_description
)
257 Returns True if the builder is online
264 def set_online_until(self
, online_until
):
265 self
._set
_attribute
("online_until", online_until
)
267 online_until
= property(lambda s
: s
.data
.online_until
, set_online_until
)
269 def update_info(self
, cpu_model
=None, cpu_count
=None, cpu_arch
=None,
270 pakfire_version
=None, os_name
=None):
275 updated_at = CURRENT_TIMESTAMP,
279 pakfire_version = %s,
291 def log_stats(self
, cpu_user
, cpu_nice
, cpu_system
, cpu_idle
, cpu_iowait
,
292 cpu_irq
, cpu_softirq
, cpu_steal
, cpu_guest
, cpu_guest_nice
,
293 loadavg1
, loadavg5
, loadavg15
, mem_total
, mem_available
, mem_used
,
294 mem_free
, mem_active
, mem_inactive
, mem_buffers
, mem_cached
, mem_shared
,
295 swap_total
, swap_used
, swap_free
):
297 Logs some stats about this builder
331 %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
332 %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
365 Returns the latest stats data (if any)
367 return self
.db
.get("""
375 created_at >= CURRENT_TIMESTAMP - INTERVAL '10 minutes'
384 def set_enabled(self
, enabled
):
385 self
._set
_attribute
("enabled", enabled
)
387 enabled
= property(lambda s
: s
.data
.enabled
, set_enabled
)
391 def has_perm(self
, user
):
392 # Anonymous users have no permissions
396 # Admins have all permissions
397 return user
.is_admin()
402 The native architecture - Alias for "cpu_arch"
407 def supported_arches(self
):
409 Returns a list of all architectures this builder can natively build for
411 # Every builder supports noarch
414 # We can always build our native architeture
416 arches
.append(self
.arch
)
418 return sorted(arches
)
420 def can_build(self
, job
):
421 return job
.arch
in self
.supported_arches
423 def set_testmode(self
, testmode
):
424 self
._set
_attribute
("testmode", testmode
)
426 testmode
= property(lambda s
: s
.data
.testmode
, set_testmode
)
432 jobs
= self
.backend
.jobs
._get
_jobs
("""
438 started_at IS NOT NULL
452 def get_max_jobs(self
):
453 return self
.data
.max_jobs
455 def set_max_jobs(self
, value
):
456 self
._set
_attribute
("max_jobs", value
)
458 max_jobs
= property(get_max_jobs
, set_max_jobs
)
462 return self
.data
.name
469 def pakfire_version(self
):
470 return self
.data
.pakfire_version
or ""
474 return self
.data
.os_name
or ""
478 return self
.data
.cpu_model
or ""
482 return self
.data
.cpu_count
486 return self
.data
.cpu_arch
489 def host_key_id(self
):
490 return self
.data
.host_key_id
497 if self
.data
.time_keepalive
is None:
500 #if self.data.updated >= 5*60:
505 def get_history(self
, *args
, **kwargs
):
506 kwargs
["builder"] = self
508 return self
.pakfire
.builders
.get_history(*args
, **kwargs
)
511 # If the builder is not enabled, we are obviously not ready
515 # Does this builder have reached its job limit?
516 if len(self
.jobs
) >= self
.max_jobs
:
519 # Looks like we are ready
525 def instance_id(self
):
526 return self
.data
.instance_id
529 def instance_type(self
):
530 return self
.data
.instance_type
535 return self
.backend
.aws
.ec2
.Instance(self
.instance_id
)
537 async def sync(self
):
538 log
.info("Syncing AWS state for %s" % self
)
540 if not self
.instance
:
541 log
.debug("%s does not have an instance ID" % self
)
544 # This callback is being executed in a separate thread
545 # because boto3 is not thread-safe
547 log
.debug("%s is currently in state: %s" % (self
, self
.instance
.state
))
549 # Launch in a separate thread
550 await asyncio
.to_thread(callback
)
552 async def is_running(self
):
554 Returns True if this builder is currently running
556 state
= await asyncio
.to_thread(self
._fetch
_state
)
558 return state
in ("pending", "running")
560 async def is_shutdown(self
):
562 Returns True if this builder is shut down
564 state
= await asyncio
.to_thread(self
._fetch
_state
)
566 return state
== "stopped"
568 async def is_shutting_down(self
):
570 Returns True if this builder is currently shutting down
572 state
= await asyncio
.to_thread(self
._fetch
_state
)
574 return state
in ("shutting-down", "stopping")
576 def _fetch_state(self
):
578 Returns the current state of this instance
581 return self
.instance
.state
.get("Name")
583 async def start(self
, wait
=True):
585 Starts the instance on AWS
587 await asyncio
.to_thread(self
._start
, wait
=wait
)
589 def _start(self
, wait
):
590 # Requires an instance
591 if not self
.instance
:
594 log
.info("Starting %s" % self
)
596 # Set correct instance type
597 self
._set
_instance
_type
()
599 # Send the start signal
600 self
.instance
.start()
602 # End here if we don't want to wait
606 log
.debug("Waiting until %s has started" % self
)
608 # And wait until the instance is running
609 self
.instance
.wait_until_running()
611 log
.debug("%s has been started" % self
)
613 def _set_instance_type(self
):
615 Changes the type of this instance
617 # Don't try setting instance type if nothing is configured
618 if not self
.instance_type
:
621 # Check if this needs changing at all
622 if self
.instance
.instance_type
== self
.instance_type
:
625 log
.debug("Changing instance type of %s to %s" % (self
, self
.instance_type
))
629 self
.instance
.modify_attribute(
631 "Value" : self
.instance_type
,
635 # Log an error if this request wasn't successful
636 except botocore
.exceptions
.ClientError
as e
:
637 log
.warning("Could not change instance type of %s: %s" % (self
, e
))
639 async def stop(self
, wait
=True):
641 Stops this instance on AWS
643 await asyncio
.to_thread(self
._stop
, wait
=wait
)
645 def _stop(self
, wait
):
646 # Requires an instance
647 if not self
.instance
:
650 log
.info("Stopping %s" % self
)
652 # Send the stop signal
655 # End here if we don't want to wait
659 log
.debug("Waiting until %s has stopped" % self
)
661 # And wait until the instance has been stopped
662 self
.instance
.wait_until_stopped()
664 log
.debug("%s has been stopped" % self
)
669 def total_build_time(self
):
670 res
= self
.db
.get("""
673 COALESCE(jobs.finished_at, CURRENT_TIMESTAMP)
680 started_at IS NOT NULL
689 def generate_password_hash(password
, salt
=None, algo
="sha512"):
691 This function creates a salted digest of the given password.
693 # Generate the salt (length = 16) of none was given.
695 salt
= misc
.generate_random_string(length
=16)
698 # <SALT> + <PASSWORD>
699 if not algo
in hashlib
.algorithms
:
700 raise Exception("Unsupported password hash algorithm: %s" % algo
)
702 # Calculate the digest.
703 h
= hashlib
.new(algo
)
707 # Output string is of kind "<algo>$<salt>$<hash>".
708 return "$".join((algo
, salt
, h
.hexdigest()))
710 def check_password_hash(password
, password_hash
):
712 Check a plain-text password with the given digest.
714 # Handle plaintext passwords (plain$<password>).
715 if password_hash
.startswith("plain$"):
716 return password_hash
[6:] == password
719 algo
, salt
, digest
= password_hash
.split("$", 2)
721 logging
.warning("Unknown password hash: %s" % password_hash
)
724 # Re-generate the password hash and compare the result.
725 return password_hash
== generate_password_hash(password
, salt
=salt
, algo
=algo
)