]>
Commit | Line | Data |
---|---|---|
96a8f2a4 MT |
1 | #!/usr/bin/python3 |
2 | ############################################################################### | |
3 | # # | |
4 | # Pakfire - The IPFire package management system # | |
5 | # Copyright (C) 2013 Pakfire development team # | |
6 | # # | |
7 | # This program is free software: you can redistribute it and/or modify # | |
8 | # it under the terms of the GNU General Public License as published by # | |
9 | # the Free Software Foundation, either version 3 of the License, or # | |
10 | # (at your option) any later version. # | |
11 | # # | |
12 | # This program is distributed in the hope that it will be useful, # | |
13 | # but WITHOUT ANY WARRANTY; without even the implied warranty of # | |
14 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # | |
15 | # GNU General Public License for more details. # | |
16 | # # | |
17 | # You should have received a copy of the GNU General Public License # | |
18 | # along with this program. If not, see <http://www.gnu.org/licenses/>. # | |
19 | # # | |
20 | ############################################################################### | |
21 | ||
1479df35 | 22 | import asyncio |
da170b36 | 23 | import cpuinfo |
a0dc7d43 | 24 | import functools |
96a8f2a4 | 25 | import hashlib |
a0dc7d43 | 26 | import json |
6284f1f2 | 27 | import kerberos |
96a8f2a4 | 28 | import logging |
96a8f2a4 | 29 | import os.path |
364b7040 | 30 | import progressbar2 as progressbar |
da170b36 | 31 | import psutil |
1479df35 MT |
32 | import subprocess |
33 | import tempfile | |
c8d43da3 | 34 | import tornado.escape |
a0dc7d43 | 35 | import tornado.httpclient |
1479df35 | 36 | import tornado.simple_httpclient |
24763c22 | 37 | import tornado.websocket |
a0dc7d43 | 38 | import urllib.parse |
96a8f2a4 | 39 | |
dec08a5c | 40 | from . import _pakfire |
7899002d | 41 | from . import util |
96a8f2a4 | 42 | from .constants import * |
dec08a5c | 43 | from .i18n import _ |
96a8f2a4 | 44 | |
a0dc7d43 | 45 | # Setup logging |
96a8f2a4 | 46 | log = logging.getLogger("pakfire.hub") |
a0dc7d43 MT |
47 | |
48 | # Configure some useful defaults for all requests | |
49 | tornado.httpclient.AsyncHTTPClient.configure( | |
50 | None, defaults = { | |
51 | "user_agent" : "pakfire/%s" % PAKFIRE_VERSION, | |
52 | }, | |
53 | ) | |
96a8f2a4 | 54 | |
252fa19c MT |
55 | class AuthError(Exception): |
56 | """ | |
57 | Raised when the client could not authenticate against the hub | |
58 | """ | |
59 | pass | |
60 | ||
61 | ||
1dd799f3 MT |
62 | class TransportError(Exception): |
63 | pass | |
64 | ||
65 | ||
66 | class TemporaryConnectionError(TransportError): | |
67 | """ | |
68 | Raised when there is a temporary connection issue and | |
69 | the request should be tried again. | |
70 | """ | |
71 | pass | |
72 | ||
73 | ||
96a8f2a4 | 74 | class Hub(object): |
6284f1f2 | 75 | def __init__(self, url, keytab=None): |
a0dc7d43 | 76 | self.url = url |
6284f1f2 MT |
77 | |
78 | # Store path to keytab | |
b6258da2 | 79 | self.keytab = keytab |
96a8f2a4 MT |
80 | |
81 | # Initialise the HTTP client | |
a0dc7d43 | 82 | self.client = tornado.httpclient.AsyncHTTPClient() |
96a8f2a4 | 83 | |
a0dc7d43 | 84 | # XXX support proxies |
ec5917f3 | 85 | |
de84c866 MT |
86 | # Fetch a TGT with the given keytab |
87 | if self.keytab: | |
88 | self._setup_credentials_cache() | |
89 | ||
90 | self._fetch_kerberos_ticket() | |
91 | ||
92 | def _setup_credentials_cache(self): | |
93 | """ | |
94 | Create a temporary file to be used as Kerberos credentials cache | |
95 | """ | |
96 | self.credentials_cache = tempfile.NamedTemporaryFile() | |
97 | ||
98 | os.environ["KRB5CCNAME"] = self.credentials_cache.name | |
99 | ||
100 | def _fetch_kerberos_ticket(self): | |
101 | command = ["kinit", "-k", "-t", self.keytab] | |
102 | ||
103 | p = subprocess.run(command, check=True, capture_output=True, text=True) | |
104 | ||
1479df35 MT |
105 | async def _socket(self, path, **kwargs): |
106 | return await self._request("GET", path, | |
107 | ||
108 | # Enable websocket and ping once every ten seconds | |
109 | websocket=True, | |
110 | websocket_ping_interval=10, | |
111 | websocket_ping_timeout=60, | |
112 | ||
113 | **kwargs, | |
114 | ) | |
115 | ||
116 | async def _request(self, method, path, websocket=False, websocket_ping_interval=None, | |
117 | websocket_ping_timeout=None, authenticate=True, | |
24763c22 | 118 | body=None, body_producer=None, on_message_callback=None, **kwargs): |
6284f1f2 | 119 | headers = {} |
966f54c8 | 120 | query_args = {} |
6284f1f2 | 121 | |
a0dc7d43 MT |
122 | # Make absolute URL |
123 | url = urllib.parse.urljoin(self.url, path) | |
96a8f2a4 | 124 | |
24763c22 MT |
125 | # Change scheme for websocket |
126 | if websocket and url.startswith("https://"): | |
127 | url = url.replace("https://", "wss://") | |
128 | ||
966f54c8 MT |
129 | # Filter all query arguments |
130 | for arg in kwargs: | |
131 | # Skip anything that is None | |
132 | if kwargs[arg] is None: | |
133 | continue | |
134 | ||
135 | # Add to query arguments | |
136 | query_args[arg] = kwargs[arg] | |
137 | ||
138 | # Encode query arguments | |
139 | query_args = urllib.parse.urlencode(query_args, doseq=True) | |
6a577fa0 | 140 | |
a0dc7d43 | 141 | # Add query arguments |
21a47e81 | 142 | if method in ("GET", "PUT", "DELETE"): |
6a577fa0 MT |
143 | url = "%s?%s" % (url, query_args) |
144 | ||
145 | # Add any arguments to the body | |
146 | elif method == "POST": | |
147 | if body is None: | |
148 | body = query_args | |
96a8f2a4 | 149 | |
6284f1f2 MT |
150 | # Perform Kerberos authentication |
151 | if authenticate: | |
152 | krb5_context = self._setup_krb5_context(url) | |
153 | ||
24622c91 MT |
154 | # Fetch the Kerberos client response |
155 | krb5_client_response = kerberos.authGSSClientResponse(krb5_context) | |
156 | ||
6284f1f2 MT |
157 | # Set the Negotiate header |
158 | headers |= { | |
24622c91 | 159 | "Authorization" : "Negotiate %s" % krb5_client_response, |
6284f1f2 MT |
160 | } |
161 | ||
a0dc7d43 MT |
162 | # Make the request |
163 | req = tornado.httpclient.HTTPRequest( | |
6284f1f2 | 164 | method=method, url=url, headers=headers, body=body, |
ec5917f3 | 165 | |
1dd799f3 MT |
166 | # Give the server more time to respond |
167 | request_timeout=60, | |
168 | ||
6284f1f2 | 169 | # Add all the rest |
a0dc7d43 MT |
170 | body_producer=body_producer, |
171 | ) | |
172 | ||
24763c22 MT |
173 | # Is this a web socket request? |
174 | if websocket: | |
175 | return await tornado.websocket.websocket_connect( | |
176 | req, | |
1479df35 MT |
177 | ping_interval=websocket_ping_interval, |
178 | ping_timeout=websocket_ping_timeout, | |
24763c22 MT |
179 | on_message_callback=on_message_callback, |
180 | ) | |
181 | ||
a0dc7d43 | 182 | # Send the request and wait for a response |
1dd799f3 MT |
183 | try: |
184 | res = await self.client.fetch(req) | |
a0dc7d43 | 185 | |
1dd799f3 MT |
186 | # Catch any HTTP errors |
187 | except tornado.httpclient.HTTPError as e: | |
188 | if e.code in (502, 503): | |
189 | raise TemporaryConnectionError from e | |
190 | ||
191 | # Re-raise anything else | |
192 | raise e | |
a0dc7d43 | 193 | |
24622c91 MT |
194 | # Perform mutual authentication |
195 | if authenticate: | |
196 | for header in res.headers.get_list("WWW-Authenticate"): | |
197 | # Skip anything that isn't a Negotiate header | |
198 | if not header.startswith("Negotiate "): | |
199 | continue | |
200 | ||
201 | # Fetch the server response | |
202 | krb5_server_response = header.removeprefix("Negotiate ") | |
203 | ||
204 | # Validate the server response | |
205 | result = kerberos.authGSSClientStep(krb5_context, krb5_server_response) | |
206 | if not result == kerberos.AUTH_GSS_COMPLETE: | |
252fa19c | 207 | raise AuthError("Could not verify the Kerberos server response") |
24622c91 MT |
208 | |
209 | log.debug("Kerberos Server Response validating succeeded") | |
210 | ||
211 | # Call this so that we won't end in the else block | |
212 | break | |
213 | ||
214 | # If there were no headers | |
215 | else: | |
252fa19c | 216 | raise AuthError("Mutual authentication failed") |
24622c91 | 217 | |
a0dc7d43 MT |
218 | # Decode JSON response |
219 | if res.body: | |
220 | return json.loads(res.body) | |
221 | ||
222 | # Empty response | |
223 | return {} | |
ec5917f3 | 224 | |
1479df35 MT |
225 | async def _proxy(self, cls, *args, **kwargs): |
226 | conn = cls(self, *args, **kwargs) | |
227 | ||
228 | # Create the initial connection | |
229 | await conn.reconnect() | |
230 | ||
231 | return conn | |
232 | ||
6284f1f2 MT |
233 | def _setup_krb5_context(self, url): |
234 | """ | |
235 | Creates the Kerberos context that can be used to perform client | |
236 | authentication against the server, and mutual authentication for the server. | |
237 | """ | |
238 | # Parse the input URL | |
239 | url = urllib.parse.urlparse(url) | |
240 | ||
241 | # Create a new client context | |
242 | result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname) | |
243 | ||
244 | if not result == kerberos.AUTH_GSS_COMPLETE: | |
252fa19c | 245 | raise AuthError("Could not create Kerberos Client context") |
6284f1f2 MT |
246 | |
247 | # Next step... | |
248 | try: | |
249 | result = kerberos.authGSSClientStep(krb5_context, "") | |
250 | ||
251 | except kerberos.GSSError as e: | |
252 | log.error("Kerberos authentication failed: %s" % e) | |
252fa19c MT |
253 | |
254 | raise AuthError("%s" % e) from e | |
6284f1f2 MT |
255 | |
256 | if not result == kerberos.AUTH_GSS_CONTINUE: | |
252fa19c | 257 | raise AuthError("Cloud not continue Kerberos authentication") |
6284f1f2 MT |
258 | |
259 | return krb5_context | |
260 | ||
3cafdb83 MT |
261 | # Uploads |
262 | ||
dec08a5c | 263 | async def upload(self, path, filename=None, show_progress=True): |
a0dc7d43 MT |
264 | """ |
265 | Uploads the file to the hub returning the upload ID | |
266 | """ | |
267 | log.debug("Uploading %s..." % path) | |
268 | ||
269 | # Use the basename of the file if no name was given | |
270 | if filename is None: | |
271 | filename = os.path.basename(path) | |
272 | ||
273 | # Determine the filesize | |
274 | size = os.path.getsize(path) | |
275 | ||
dec08a5c MT |
276 | # Make progressbar |
277 | if show_progress: | |
364b7040 MT |
278 | p = progressbar.ProgressBar( |
279 | max_value=size, | |
280 | widgets=[ | |
281 | progressbar.FormatCustomText(_("Uploading %s") % filename), | |
282 | progressbar.Percentage(), | |
283 | progressbar.Bar(), | |
284 | progressbar.FileTransferSpeed(), | |
285 | progressbar.DataSize(), | |
286 | progressbar.AdaptiveETA(), | |
287 | ], | |
288 | ) | |
dec08a5c MT |
289 | else: |
290 | p = None | |
291 | ||
a0dc7d43 MT |
292 | # Compute a digest |
293 | digest = self._compute_digest("blake2b", path) | |
294 | ||
1dd799f3 MT |
295 | while True: |
296 | # Prepare the file for streaming | |
297 | body_producer = functools.partial(self._stream_file, path, size, p) | |
a0dc7d43 | 298 | |
1dd799f3 MT |
299 | # Perform upload |
300 | try: | |
301 | response = await self._request("PUT", "/api/v1/uploads", | |
302 | body_producer=body_producer, | |
303 | filename=filename, size=size, digest=digest | |
304 | ) | |
305 | ||
306 | # On temporary issues, try again after a few seconds | |
307 | except TemporaryConnectionError as e: | |
a67d9b7f | 308 | await asyncio.sleep(5) |
1dd799f3 MT |
309 | |
310 | else: | |
311 | break | |
a0dc7d43 MT |
312 | |
313 | # Return the upload ID | |
314 | return response.get("id") | |
315 | ||
21a47e81 | 316 | async def delete_upload(self, upload_id): |
1479df35 | 317 | await self._request("DELETE", "/api/v1/uploads/%s" % upload_id) |
21a47e81 | 318 | |
688e6846 MT |
319 | async def upload_multi(self, *paths, show_progress=True): |
320 | """ | |
321 | Upload multiple files | |
322 | ||
323 | If one file could not be uploaded, any other uploads will be deleted | |
324 | """ | |
325 | uploads = [] | |
326 | ||
327 | # Upload everything | |
328 | try: | |
329 | for path in paths: | |
330 | upload = await self.upload(path, show_progress=show_progress) | |
331 | ||
332 | # Store the upload ID | |
333 | uploads.append(upload) | |
334 | ||
335 | except Exception as e: | |
336 | # Remove any previous uploads | |
337 | await asyncio.gather( | |
338 | *(self.delete_upload(upload) for upload in uploads), | |
339 | ) | |
340 | ||
341 | # Raise the exception | |
342 | raise e | |
343 | ||
344 | # Return the IDs of the uploads | |
345 | return uploads | |
346 | ||
a0dc7d43 | 347 | @staticmethod |
dec08a5c | 348 | def _stream_file(path, size, p, write): |
dec08a5c MT |
349 | try: |
350 | with open(path, "rb") as f: | |
351 | while True: | |
352 | buf = f.read(64 * 1024) | |
353 | if not buf: | |
354 | break | |
355 | ||
356 | # Update progressbar | |
357 | if p: | |
358 | l = len(buf) | |
359 | p.increment(l) | |
360 | ||
361 | write(buf) | |
362 | finally: | |
363 | # Finish the progressbar | |
364 | if p: | |
365 | p.finish() | |
a0dc7d43 MT |
366 | |
367 | @staticmethod | |
368 | def _compute_digest(algo, path): | |
369 | h = hashlib.new(algo) | |
96a8f2a4 | 370 | |
a0dc7d43 MT |
371 | with open(path, "rb") as f: |
372 | while True: | |
373 | buf = f.read(64 * 1024) | |
374 | if not buf: | |
375 | break | |
96a8f2a4 | 376 | |
a0dc7d43 | 377 | h.update(buf) |
96a8f2a4 | 378 | |
a0dc7d43 | 379 | return "%s:%s" % (algo, h.hexdigest()) |
da170b36 | 380 | |
1479df35 | 381 | @staticmethod |
28041406 | 382 | def _decode_json_message(message): |
1479df35 MT |
383 | """ |
384 | Takes a received message and decodes it. | |
385 | ||
386 | It will then call the callback with the decoded message. | |
387 | """ | |
388 | # Ignore empty messages | |
389 | if message is None: | |
390 | return | |
391 | ||
392 | try: | |
393 | message = json.loads(message) | |
394 | except json.JSONDecodeError: | |
395 | log.error("Could not decode JSON message:\n%s" % message) | |
396 | return | |
397 | ||
28041406 MT |
398 | return message |
399 | ||
400 | # Builder | |
401 | ||
402 | async def control(self, *args, **kwargs): | |
403 | """ | |
404 | Creates a control connection | |
405 | """ | |
406 | return await self._proxy(ControlConnection, *args, **kwargs) | |
1479df35 | 407 | |
4f872b98 | 408 | async def job(self, *args, **kwargs): |
1479df35 | 409 | """ |
28041406 | 410 | Creates a control connection for a certain job |
1479df35 | 411 | """ |
4f872b98 | 412 | return await self._proxy(JobControlConnection, *args, **kwargs) |
1479df35 MT |
413 | |
414 | ||
415 | class HubObject(object): | |
416 | # Disable Nagle's algorithm? | |
417 | nodelay = False | |
418 | ||
28041406 | 419 | def __init__(self, hub, *args, **kwargs): |
1479df35 MT |
420 | self.hub = hub |
421 | ||
422 | # The active connection | |
423 | self.conn = None | |
424 | ||
a04ccbf5 MT |
425 | # Callbacks |
426 | self.callbacks = {} | |
427 | ||
1479df35 MT |
428 | # Perform custom initialization |
429 | self.init(*args, **kwargs) | |
430 | ||
431 | def init(self, *args, **kwargs): | |
432 | pass | |
433 | ||
434 | @property | |
435 | def url(self): | |
436 | raise NotImplementedError | |
437 | ||
438 | async def connect(self): | |
439 | """ | |
440 | This will create a connection | |
441 | """ | |
442 | conn = await self.hub._socket(self.url, | |
443 | on_message_callback=self.on_message_callback) | |
444 | ||
445 | # Disable Nagle's algorithm | |
446 | if self.nodelay: | |
447 | conn.set_nodelay(True) | |
448 | ||
449 | return conn | |
450 | ||
451 | async def reconnect(self): | |
452 | """ | |
453 | Tries to reconnect for forever | |
454 | """ | |
455 | attempts = 0 | |
456 | ||
457 | while True: | |
458 | attempts += 1 | |
459 | ||
460 | log.debug("Trying to reconnect (attempt %s)..." % attempts) | |
461 | ||
462 | try: | |
463 | self.conn = await self.connect() | |
464 | ||
465 | # The web service responded with some error | |
466 | except tornado.httpclient.HTTPClientError as e: | |
467 | log.error("%s: Received HTTP Error %s" % (self.url, e.code)) | |
468 | ||
2eb22ec1 MT |
469 | # 502 - Proxy Error |
470 | # 503 - Service Unavailable | |
471 | if e.code in (502, 503): | |
1479df35 MT |
472 | await asyncio.sleep(10) |
473 | ||
474 | # Raise any unhandled errors | |
475 | else: | |
476 | raise e | |
477 | ||
478 | # The web service did not respond in time | |
479 | except tornado.simple_httpclient.HTTPTimeoutError as e: | |
480 | await asyncio.sleep(30) | |
481 | ||
482 | # Raise all other exceptions | |
483 | except Exception as e: | |
484 | raise e | |
485 | ||
486 | # If the connection was established successfully, we return | |
487 | else: | |
488 | return | |
489 | ||
9216b623 MT |
490 | def close(self): |
491 | """ | |
492 | Closes the connection | |
493 | """ | |
494 | if self.conn: | |
495 | self.conn.close() | |
496 | ||
a04ccbf5 MT |
497 | def on_message_callback(self, message): |
498 | # Fail if no callbacks have been set | |
499 | if not self.callbacks: | |
500 | raise NotImplementedError | |
501 | ||
502 | # Decode the message | |
503 | message = self.hub._decode_json_message(message) | |
504 | ||
505 | # Ignore empty messages | |
506 | if message is None: | |
507 | return | |
508 | ||
509 | # Log the received message | |
510 | log.debug("Received message:\n%s" % json.dumps(message, indent=4)) | |
511 | ||
512 | # Fetch the message type & data | |
513 | type = message.get("type") | |
514 | data = message.get("data") | |
515 | ||
516 | # Find a suitable callback | |
517 | try: | |
518 | callback = self.callbacks[type] | |
519 | ||
520 | # Log an error for unknown messages and ignore them | |
521 | except KeyError: | |
522 | log.error("Received message of unknown type '%s'" % type) | |
523 | return | |
524 | ||
525 | # Call the callback | |
526 | callback(data) | |
9216b623 | 527 | |
1479df35 MT |
528 | async def write_message(self, message, **kwargs): |
529 | """ | |
530 | Sends a message but encodes it into JSON first | |
531 | """ | |
532 | # This should never happen | |
533 | if not self.conn: | |
534 | raise RuntimeError("Not connected") | |
535 | ||
536 | if isinstance(message, dict): | |
537 | message = tornado.escape.json_encode(message) | |
538 | ||
539 | try: | |
540 | return await self.conn.write_message(message, **kwargs) | |
da170b36 | 541 | |
1479df35 MT |
542 | except tornado.websocket.WebSocketClosedError as e: |
543 | # Try to reconnect | |
544 | await self.reconnect() | |
da170b36 | 545 | |
1479df35 MT |
546 | # Try to send the message again |
547 | return await self.write_message(message, **kwargs) | |
548 | ||
549 | ||
9216b623 MT |
550 | class ControlConnection(HubObject): |
551 | url = "/api/v1/builders/control" | |
1479df35 | 552 | |
9216b623 MT |
553 | def init(self, daemon): |
554 | self.daemon = daemon | |
1479df35 | 555 | |
a04ccbf5 MT |
556 | # Callbacks |
557 | self.callbacks = { | |
558 | "job" : self.daemon.job_received, | |
559 | } | |
560 | ||
1479df35 MT |
561 | # Fetch processor information |
562 | self.cpu = cpuinfo.get_cpu_info() | |
563 | ||
564 | # Fetch the native architecture | |
565 | self.native_arch = _pakfire.native_arch() | |
566 | ||
9216b623 MT |
567 | async def submit_stats(self): |
568 | """ | |
569 | Sends stats about this builder | |
570 | """ | |
571 | log.debug("Sending stats...") | |
da170b36 MT |
572 | |
573 | # Fetch processor information | |
1479df35 | 574 | cpu_times = psutil.cpu_times_percent() |
da170b36 MT |
575 | |
576 | # Fetch memory/swap information | |
577 | mem = psutil.virtual_memory() | |
578 | swap = psutil.swap_memory() | |
579 | ||
580 | # Fetch load average | |
581 | loadavg = psutil.getloadavg() | |
582 | ||
1479df35 | 583 | await self.write_message({ |
28041406 MT |
584 | "type" : "stats", |
585 | "data" : { | |
586 | # CPU info | |
e9a944c7 | 587 | "cpu_model" : self.cpu.get("brand"), |
28041406 MT |
588 | "cpu_count" : self.cpu.get("count"), |
589 | "cpu_arch" : self.native_arch, | |
590 | ||
591 | # Pakfire + OS | |
592 | "pakfire_version" : PAKFIRE_VERSION, | |
593 | "os_name" : util.get_distro_name(), | |
594 | ||
595 | # CPU Times | |
596 | "cpu_user" : cpu_times.user, | |
597 | "cpu_nice" : cpu_times.nice, | |
598 | "cpu_system" : cpu_times.system, | |
599 | "cpu_idle" : cpu_times.idle, | |
600 | "cpu_iowait" : cpu_times.iowait, | |
601 | "cpu_irq" : cpu_times.irq, | |
602 | "cpu_softirq" : cpu_times.softirq, | |
603 | "cpu_steal" : cpu_times.steal, | |
604 | "cpu_guest" : cpu_times.guest, | |
605 | "cpu_guest_nice" : cpu_times.guest_nice, | |
606 | ||
607 | # Load average | |
608 | "loadavg1" : loadavg[0], | |
609 | "loadavg5" : loadavg[1], | |
610 | "loadavg15" : loadavg[2], | |
611 | ||
612 | # Memory | |
613 | "mem_total" : mem.total, | |
614 | "mem_available" : mem.available, | |
615 | "mem_used" : mem.used, | |
616 | "mem_free" : mem.free, | |
617 | "mem_active" : mem.active, | |
618 | "mem_inactive" : mem.inactive, | |
619 | "mem_buffers" : mem.buffers, | |
620 | "mem_cached" : mem.cached, | |
621 | "mem_shared" : mem.shared, | |
622 | ||
623 | # Swap | |
624 | "swap_total" : swap.total, | |
625 | "swap_used" : swap.used, | |
626 | "swap_free" : swap.free, | |
627 | }, | |
1479df35 | 628 | }) |
76960e90 | 629 | |
2465b05c | 630 | |
28041406 | 631 | class JobControlConnection(HubObject): |
1479df35 MT |
632 | """ |
633 | Proxy for Build Jobs | |
634 | """ | |
a04ccbf5 | 635 | def init(self, id, worker): |
1479df35 | 636 | self.id = id |
76960e90 | 637 | |
a04ccbf5 | 638 | # Callbacks |
4f872b98 | 639 | self.callbacks = { |
a04ccbf5 | 640 | "abort" : worker.abort, |
4f872b98 MT |
641 | } |
642 | ||
1479df35 MT |
643 | @property |
644 | def url(self): | |
645 | return "/api/v1/jobs/%s" % self.id | |
c8d43da3 | 646 | |
1479df35 | 647 | async def finished(self, success, packages=None, logfile=None): |
2dc6117b MT |
648 | """ |
649 | Will tell the hub that a job has finished | |
650 | """ | |
651 | # Upload the log file | |
1479df35 MT |
652 | if logfile: |
653 | logfile = await self.hub.upload(logfile, filename="%s.log" % self.id) | |
2dc6117b | 654 | |
688e6846 MT |
655 | # Upload the packages |
656 | if packages: | |
1479df35 | 657 | packages = await self.hub.upload_multi(*packages) |
688e6846 | 658 | |
1dd799f3 MT |
659 | while True: |
660 | try: | |
661 | # Send the request | |
662 | response = await self.hub._request("POST", "/api/v1/jobs/%s/finished" % self.id, | |
663 | success="1" if success else "0", logfile=logfile, packages=packages, | |
664 | ) | |
665 | ||
666 | # Try again after a short moment on connection errors | |
667 | except TemporaryConnectionError as e: | |
668 | await asyncio.sleep(5) | |
669 | ||
670 | else: | |
671 | break | |
2dc6117b MT |
672 | |
673 | # Handle the response | |
674 | # XXX TODO | |
675 | ||
1479df35 | 676 | async def log(self, timestamp, level, message): |
c8d43da3 MT |
677 | """ |
678 | Sends a log message to the hub | |
679 | """ | |
1479df35 | 680 | await self.write_message({ |
c21e39c1 MT |
681 | "type" : "log", |
682 | "data" : { | |
683 | "timestamp" : timestamp, | |
684 | "level" : level, | |
fae7d9b8 | 685 | "message" : message, |
c21e39c1 | 686 | }, |
c8d43da3 | 687 | }) |