]> git.ipfire.org Git - pbs.git/blob - src/web/jobs.py
builds: Move job queue
[pbs.git] / src / web / jobs.py
1 #!/usr/bin/python3
2
3 import logging
4 import tornado.web
5 import tornado.websocket
6
7 from .. import misc
8 from . import base
9 from . import ui_modules
10
11 # Setup logging
12 log = logging.getLogger("pbs.web.jobs")
13
14 class APIv1ControlHandler(base.APIMixin, tornado.websocket.WebSocketHandler):
15 """
16 Builders connect to this handler when they are running a build.
17
18 We can pass information about this build around in real time.
19 """
20 # Don't allow users to authenticate
21 allow_users = False
22
23 @tornado.web.authenticated
24 def open(self, job_id):
25 self.job = self.backend.jobs.get_by_uuid(job_id)
26 if not self.job:
27 raise tornado.web.HTTPError(404, "Could not find job %s" % job_id)
28
29 # Check permissions
30 if not self.job.has_perm(self.current_user):
31 raise tornado.web.HTTPError(403, "%s cannot control job %s" \
32 % (self.current_user, self.job))
33
34 # Consider the job connected
35 self.job.connected(self)
36
37 # Open a new log stream
38 self.logstream = self.backend.logstreams.open(self.job)
39
40 def on_close(self):
41 # Drop the connection to the builder
42 self.job.disconnected()
43
44 # Close the logstream
45 self.logstream.close()
46
47 async def on_message(self, message):
48 message = self._decode_json_message(message)
49
50 # Get message type & data
51 type = message.get("type")
52 data = message.get("data")
53
54 # Handle log messages
55 if type == "log":
56 await self._handle_log(**data)
57
58 # Unknown message
59 else:
60 log.warning("Received a message of an unknown type: %s" % t)
61
62 async def _handle_log(self, timestamp=None, level=None, message=None, **kwargs):
63 """
64 Called when a new log message has been received
65 """
66 await self.logstream.message(timestamp, level, message)
67
68
69 class APIv1FinishedHandler(base.APIMixin, tornado.web.RequestHandler):
70 @tornado.web.authenticated
71 async def post(self, uuid):
72 job = self.backend.jobs.get_by_uuid(uuid)
73 if not job:
74 raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
75
76 # Check permissions
77 if not job.has_perm(self.current_user):
78 raise tornado.web.HTTPError(403, "%s cannot edit job %s" % (self.current_user, job))
79
80 # Success Status
81 success = self.get_argument_bool("success")
82
83 # Fetch the logfile
84 logfile = self.get_argument_upload("logfile")
85
86 # Fetch the packages
87 packages = self.get_argument_uploads("packages")
88
89 # Mark the job as finished
90 with self.db.transaction():
91 builds = await job.finished(success=success,
92 logfile=logfile, packages=packages)
93
94 # Try to dispatch the next job
95 await self.backend.jobs.queue.dispatch()
96
97 # Launch any (test) builds
98 if builds:
99 self.backend.run_task(self.backend.builds.launch, builds)
100
101 # Send something back to the builder
102 self.finish({
103 "status" : "ok",
104 })
105
106
107 class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandler):
108 # No authentication required
109 async def open(self, uuid):
110 job = self.backend.jobs.get_by_uuid(uuid)
111 if not job:
112 raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
113
114 # How many messages should be initially sent?
115 limit = self.get_argument_int("limit", None)
116
117 # Join the stream
118 self.stream = await self.backend.logstreams.join(job, self, limit=limit)
119 if not self.stream:
120 raise tornado.web.HTTPError(400, "Could not join stream for %s" % job)
121
122 # Send messages without any delay
123 self.set_nodelay(True)
124
125 def on_close(self):
126 """
127 Leave the stream
128 """
129 self.stream.leave(self)
130
131 async def message(self, message):
132 """
133 Called when there is a new message to be sent to the client
134 """
135 try:
136 await self.write_message(message)
137
138 # Ignore if the message could not be sent
139 except tornado.websocket.WebSocketClosedError as e:
140 pass
141
142
143 class IndexHandler(base.BaseHandler):
144 def get(self):
145 # Pagination
146 offset = self.get_argument_int("offset", None) or 0
147 limit = self.get_argument_int("limit", None) or 50
148
149 # Filter
150 failed_only = self.get_argument_bool("failed_only")
151
152 with self.db.transaction():
153 jobs = self.backend.jobs.get_finished(failed_only=failed_only,
154 limit=limit, offset=offset)
155
156 # Group jobs by date
157 jobs = misc.group(jobs, lambda job: job.finished_at.date())
158
159 self.render("jobs/index.html", jobs=jobs, limit=limit, offset=offset,
160 failed_only=failed_only)
161
162
163 class QueueHandler(base.BaseHandler):
164 def get(self):
165 self.render("jobs/queue.html", queue=self.backend.jobs.queue)
166
167
168 class LogHandler(base.BaseHandler):
169 async def get(self, uuid):
170 job = self.backend.jobs.get_by_uuid(uuid)
171 if not job:
172 raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
173
174 # Stream the log if the job is running
175 if job.is_running():
176 self.render("jobs/log-stream.html", job=job)
177 return
178
179 tail = self.get_argument_int("tail", None)
180
181 # Should we tail the log, or stream the entire file?
182 try:
183 log = await job.tail_log(tail) if tail else await job.open_log()
184
185 # Send 404 if there is no log file
186 except FileNotFoundError as e:
187 raise tornado.web.HTTPError(404, "Could not find log for %s" % job) from e
188
189 # Set Content-Type header
190 self.set_header("Content-Type", "text/plain")
191
192 # Stream the entire log
193 for line in log:
194 self.write(line)
195
196
197 class AbortHandler(base.BaseHandler):
198 @tornado.web.authenticated
199 def get(self, uuid):
200 job = self.backend.jobs.get_by_uuid(uuid)
201 if not job:
202 raise tornado.web.HTTPError(404, "Job not found: %s" % uuid)
203
204 # Check for permissions
205 if not job.has_perm(self.current_user):
206 raise tornado.web.HTTPError(403)
207
208 self.render("jobs/abort.html", job=job)
209
210 @tornado.web.authenticated
211 async def post(self, uuid):
212 job = self.backend.jobs.get_by_uuid(uuid)
213 if not job:
214 raise tornado.web.HTTPError(404, "Job not found: %s" % uuid)
215
216 # Check for permissions
217 if not job.has_perm(self.current_user):
218 raise tornado.web.HTTPError(403)
219
220 with self.db.transaction():
221 await job.abort(self.current_user)
222
223 self.redirect("/builds/%s" % job.build.uuid)
224
225
226 class RetryHandler(base.BaseHandler):
227 @tornado.web.authenticated
228 def get(self, uuid):
229 job = self.backend.jobs.get_by_uuid(uuid)
230 if not job:
231 raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
232
233 # Check for permissions
234 if not job.has_perm(self.current_user):
235 raise tornado.web.HTTPError(403)
236
237 self.render("jobs/retry.html", job=job)
238
239 @tornado.web.authenticated
240 async def post(self, uuid):
241 job = self.backend.jobs.get_by_uuid(uuid)
242 if not job:
243 raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
244
245 with self.db.transaction():
246 job = await job.retry(self.current_user)
247
248 # Launch the newly created job
249 await self.backend.jobs.launch([job])
250
251 # Redirect back to the build page
252 self.redirect("/builds/%s" % job.build.uuid)
253
254
255 class ListModule(ui_modules.UIModule):
256 def render(self, jobs, show_arch_only=False, show_packages=False):
257 return self.render_string("jobs/modules/list.html", jobs=jobs,
258 show_arch_only=show_arch_only, show_packages=show_packages)
259
260
261 class QueueModule(ui_modules.UIModule):
262 def render(self, jobs):
263 return self.render_string("jobs/modules/queue.html", jobs=jobs)
264
265
266 class LogStreamModule(ui_modules.UIModule):
267 def render(self, job, limit=None, small=False):
268 return self.render_string("jobs/modules/log-stream.html",
269 job=job, limit=limit, small=small)
270
271 def javascript_files(self):
272 return [
273 "js/job-log-stream.min.js",
274 ]