]>
Commit | Line | Data |
---|---|---|
fd43d5e1 MT |
1 | #!/usr/bin/python |
2 | ||
ac10fd43 MT |
3 | import logging |
4 | ||
fd43d5e1 MT |
5 | from . import base |
6 | ||
a329017a | 7 | log = logging.getLogger("pakfire.buildservice.jobqueue") |
ac10fd43 | 8 | |
class JobQueue(base.Object):
    """
    Gives access to the jobs listed in job_queue and keeps track of the
    builder connections that those jobs are dispatched to.
    """

    # All currently open builder connections
    # NOTE: This is a class attribute, so the list is shared by all
    # instances of JobQueue.
    connections = []

    def __iter__(self):
        """
        Iterates over all jobs that are currently in the queue
        """
        jobs = self.backend.jobs._get_jobs("SELECT jobs.* FROM job_queue queue \
            LEFT JOIN jobs ON queue.job_id = jobs.id")

        return iter(jobs)

    def __len__(self):
        """
        Returns the number of entries in the queue
        """
        res = self.db.get("SELECT COUNT(*) AS len FROM job_queue")

        return res.len

    def pop(self, builder):
        """
        Returns the next build job that matches the architectures
        supported by the given builder

        This only looks the job up. The query does not remove anything
        from job_queue.
        """
        return self.backend.jobs._get_job("""
            SELECT
                jobs.*
            FROM
                job_queue queue
            LEFT JOIN
                jobs ON queue.job_id = jobs.id
            WHERE
                queue.arch = ANY(%s)
            LIMIT 1""",
            builder.supported_arches,
        )

    async def open(self, builder, connection):
        """
        Called when a builder opens a connection

        Any connections that were previously registered for the same
        builder are closed, the new connection is registered, and
        pending jobs are dispatched straight away.
        """
        log.debug("Connection opened by %s", builder)

        # Find any previous connections of this builder and close them
        # We walk over a copy of the list in case closing a connection
        # calls back into close(), which modifies the list
        for c in list(self.connections):
            if not c.builder == builder:
                continue

            log.warning("Closing connection to builder %s because it is being replaced", builder)

            # Close the previous connection
            c.close(code=1000, reason="Replaced by a new connection")

        # Add this connection to the list
        self.connections.append(connection)

        # Dispatch any jobs immediately
        await self.dispatch_jobs()

    def close(self, builder, connection):
        """
        Called when the connection to a builder has been closed

        Removing a connection that is not registered (any more) is
        silently ignored.
        """
        log.debug("Connection to %s closed", builder)

        # Remove the connection
        try:
            self.connections.remove(connection)

        # list.remove() raises ValueError (not IndexError) if the
        # connection is not in the list
        except ValueError:
            pass

    async def dispatch_jobs(self):
        """
        Will be called regularly and will dispatch any pending jobs to any
        available builders

        Builders with fewer jobs are processed first. A builder that is
        ready but has no matching job in the queue is stopped.
        """
        log.debug("Dispatching jobs...")

        # Exit if there are no builders connected
        if not self.connections:
            log.debug(" No connections open")
            return

        # Map all connections by builder
        # If a builder has more than one connection in the list, the one
        # that was added last is used
        builders = { c.builder : c for c in self.connections }

        # Process all builders and assign jobs
        # We prioritize builders with fewer jobs
        for builder in sorted(builders, key=lambda b: len(b.jobs)):
            log.debug(" Processing builder %s", builder)

            # Find the connection
            connection = builders[builder]

            with self.backend.db.transaction():
                if not builder.is_ready():
                    log.debug(" Builder %s is not ready", builder)
                    continue

                # We are ready for a new job
                job = self.pop(builder)
                if job:
                    connection.assign_job(job)
                    continue

                log.debug(" No jobs processable for %s", builder)

                # If there is no job for the builder, we might as well shut it down
                await builder.stop()