]> git.ipfire.org Git - pbs.git/blame - src/buildservice/jobqueue.py
Merge Pakfire Hub into the main webapp
[pbs.git] / src / buildservice / jobqueue.py
CommitLineData
fd43d5e1
MT
1#!/usr/bin/python
2
ac10fd43
MT
3import logging
4
fd43d5e1
MT
5from . import base
6
a329017a 7log = logging.getLogger("pakfire.buildservice.jobqueue")
ac10fd43 8
class JobQueue(base.Object):
    """
    The queue of pending build jobs plus the builder connections that
    jobs can be dispatched to.

    Jobs are read from the ``job_queue`` relation in the database;
    connections are tracked in memory in ``connections``.
    """

    # A list of all builders that have a connection
    # NOTE: this is a class attribute, so the list is shared by every
    # JobQueue instance in the process.
    connections = []

    def __iter__(self):
        """
        Iterates over all jobs that are currently in the queue
        """
        jobs = self.backend.jobs._get_jobs("SELECT jobs.* FROM job_queue queue \
            LEFT JOIN jobs ON queue.job_id = jobs.id")

        return iter(jobs)

    def __len__(self):
        """
        Returns the number of rows in job_queue
        """
        res = self.db.get("SELECT COUNT(*) AS len FROM job_queue")

        return res.len

    def pop(self, builder):
        """
        Returns the next build job that matches the given architectures

        The architectures are taken from builder.supported_arches. This
        method only selects a job; nothing visible here removes it from
        the queue.
        """
        return self.backend.jobs._get_job("""
            SELECT
                jobs.*
            FROM
                job_queue queue
            LEFT JOIN
                jobs ON queue.job_id = jobs.id
            WHERE
                queue.arch = ANY(%s)
            LIMIT 1""",
            builder.supported_arches,
        )

    async def open(self, builder, connection):
        """
        Called when a builder opens a connection

        Any connection that is already registered for the same builder is
        asked to close, the new connection is registered, and pending jobs
        are dispatched straight away.
        """
        log.debug("Connection opened by %s", builder)

        # Find any previous connections of this builder and close them.
        # We walk over a snapshot of the list: should closing a connection
        # end up calling close() on this queue synchronously, the list
        # would otherwise be modified while we iterate and entries would
        # be skipped.
        for c in list(self.connections):
            if c.builder != builder:
                continue

            log.warning("Closing connection to builder %s because it is being replaced", builder)

            # Close the previous connection
            c.close(code=1000, reason="Replaced by a new connection")

        # Add this connection to the list
        self.connections.append(connection)

        # Dispatch any jobs immediately
        await self.dispatch_jobs()

    def close(self, builder, connection):
        """
        Called when the connection to a builder has been closed

        Removes the connection from the list of connections. Connections
        that are not (or no longer) in the list are silently ignored.
        """
        log.debug("Connection to %s closed", builder)

        # Remove the connection
        # list.remove() raises ValueError (not IndexError) if the element
        # is not in the list
        try:
            self.connections.remove(connection)
        except ValueError:
            pass

    async def dispatch_jobs(self):
        """
        Will be called regularly and will dispatch any pending jobs to any
        available builders
        """
        log.debug("Dispatching jobs...")

        # Exit if there are no builders connected
        if not self.connections:
            log.debug(" No connections open")
            return

        # Map all connections by builder
        # (if a builder has more than one connection, the last one wins)
        builders = { c.builder : c for c in self.connections }

        # Process all builders and assign jobs
        # We prioritize builders with fewer jobs
        for builder in sorted(builders, key=lambda b: len(b.jobs)):
            log.debug(" Processing builder %s", builder)

            # Find the connection
            connection = builders[builder]

            with self.backend.db.transaction():
                if not builder.is_ready():
                    log.debug(" Builder %s is not ready", builder)
                    continue

                # We are ready for a new job
                job = self.pop(builder)
                if job:
                    connection.assign_job(job)
                    continue

                # NOTE(review): the indentation of this file was lost, so
                # it is not certain whether the two statements below belong
                # inside or after the transaction block - please confirm.
                log.debug(" No jobs processable for %s", builder)

                # If there is no job for the builder, we might as well shut it down
                await builder.stop()
90 log.debug(" Processing builder %s" % builder)
91
92 # Find the connection
93 connection = builders[builder]
94
95 with self.backend.db.transaction():
96 if not builder.is_ready():
97 log.debug(" Builder %s is not ready" % builder)
98 continue
99
100 # We are ready for a new job
101 job = self.pop(builder)
102 if job:
103 connection.assign_job(job)
104 continue
105
106 log.debug(" No jobs processable for %s" % builder)
107
108 # If there is no job for the builder, we might as well shut it down
109 await builder.stop()