]> git.ipfire.org Git - pakfire.git/blame - src/pakfire/hub.py
libpakfire: build: Use better return codes
[pakfire.git] / src / pakfire / hub.py
CommitLineData
96a8f2a4
MT
1#!/usr/bin/python3
2###############################################################################
3# #
4# Pakfire - The IPFire package management system #
5# Copyright (C) 2013 Pakfire development team #
6# #
7# This program is free software: you can redistribute it and/or modify #
8# it under the terms of the GNU General Public License as published by #
9# the Free Software Foundation, either version 3 of the License, or #
10# (at your option) any later version. #
11# #
12# This program is distributed in the hope that it will be useful, #
13# but WITHOUT ANY WARRANTY; without even the implied warranty of #
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15# GNU General Public License for more details. #
16# #
17# You should have received a copy of the GNU General Public License #
18# along with this program. If not, see <http://www.gnu.org/licenses/>. #
19# #
20###############################################################################
21
1479df35 22import asyncio
da170b36 23import cpuinfo
a0dc7d43 24import functools
96a8f2a4 25import hashlib
a0dc7d43 26import json
6284f1f2 27import kerberos
96a8f2a4 28import logging
96a8f2a4 29import os.path
da170b36 30import psutil
1479df35
MT
31import subprocess
32import tempfile
c8d43da3 33import tornado.escape
a0dc7d43 34import tornado.httpclient
1479df35 35import tornado.simple_httpclient
24763c22 36import tornado.websocket
a0dc7d43 37import urllib.parse
96a8f2a4 38
dec08a5c 39from . import _pakfire
7899002d 40from . import util
96a8f2a4 41from .constants import *
dec08a5c 42from .i18n import _
96a8f2a4 43
# Module-wide logger for everything that talks to the hub
log = logging.getLogger("pakfire.hub")

# Every request issued through Tornado's asynchronous HTTP client identifies
# itself as Pakfire. Passing None as the implementation leaves the client
# class selection untouched and only installs the request defaults.
tornado.httpclient.AsyncHTTPClient.configure(
    None,
    defaults={"user_agent": "pakfire/%s" % PAKFIRE_VERSION},
)
96a8f2a4 53
252fa19c
MT
class AuthError(Exception):
    """
    Signals that authenticating the client against the hub failed.
    """
59
60
class Hub(object):
    """
    Client for the Pakfire hub.

    All communication goes through Tornado's asynchronous HTTP client (or
    a WebSocket for long-lived connections). Unless disabled per request,
    every request carries a Kerberos "Negotiate" header and the response of
    the server is validated as well (mutual authentication).
    """

    def __init__(self, url, keytab=None):
        """
        url    - base URL of the hub; request paths are joined onto it
        keytab - optional path to a Kerberos keytab. If given, a private
                 credentials cache is set up and a ticket is fetched
                 immediately using kinit.
        """
        self.url = url

        # Store path to keytab
        self.keytab = keytab

        # Initialise the HTTP client
        self.client = tornado.httpclient.AsyncHTTPClient()

        # XXX support proxies

        # Fetch a TGT with the given keytab
        if self.keytab:
            self._setup_credentials_cache()
            self._fetch_kerberos_ticket()

    def _setup_credentials_cache(self):
        """
        Create a temporary file to be used as Kerberos credentials cache

        The file object is kept on the instance so that the temporary file
        lives as long as this object. KRB5CCNAME is changed for the whole
        process.
        """
        self.credentials_cache = tempfile.NamedTemporaryFile()

        os.environ["KRB5CCNAME"] = self.credentials_cache.name

    def _fetch_kerberos_ticket(self):
        """
        Runs kinit with the configured keytab.

        Raises subprocess.CalledProcessError if kinit exits with a non-zero
        code. As the output of kinit is captured, it is logged before the
        exception is passed on so that the reason does not get lost.
        """
        command = ["kinit", "-k", "-t", self.keytab]

        try:
            subprocess.run(command, check=True, capture_output=True, text=True)

        except subprocess.CalledProcessError as e:
            log.error("Could not fetch Kerberos ticket: %s" % (e.stderr or e.stdout))
            raise

    async def _socket(self, path, **kwargs):
        """
        Opens a WebSocket connection to path and returns it.

        Any extra keyword arguments are passed on to _request().
        """
        return await self._request("GET", path,

            # Enable websocket and ping once every ten seconds
            websocket=True,
            websocket_ping_interval=10,
            websocket_ping_timeout=60,

            **kwargs,
        )

    async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
            websocket_ping_timeout=None, authenticate=True,
            body=None, body_producer=None, on_message_callback=None, **kwargs):
        """
        Sends a request to the hub.

        method        - HTTP method
        path          - path which is joined onto the URL of the hub
        websocket     - if True, a WebSocket connection is opened and
                        returned instead of sending a plain request
        authenticate  - if True, perform Kerberos authentication
        body, body_producer - request body (see tornado's HTTPRequest)
        on_message_callback - passed to the WebSocket connection
        **kwargs      - request arguments. Anything that is None is
                        dropped. For GET, PUT and DELETE they are sent as
                        query string; for POST they become the body unless
                        a body was passed explicitly.

        Returns the decoded JSON response, an empty dict if the response
        had no body, or the WebSocket connection.

        Raises AuthError if the response of the server could not be
        verified.
        """
        headers = {}

        # Make absolute URL
        url = urllib.parse.urljoin(self.url, path)

        # Change scheme for websocket. Only the leading scheme is rewritten,
        # and plain HTTP is mapped as well because the WebSocket client only
        # accepts ws:// and wss:// URLs.
        if websocket:
            if url.startswith("https://"):
                url = "wss://%s" % url.removeprefix("https://")
            elif url.startswith("http://"):
                url = "ws://%s" % url.removeprefix("http://")

        # Filter out anything that is None and encode the query arguments
        query_args = urllib.parse.urlencode(
            { arg : value for arg, value in kwargs.items() if value is not None },
            doseq=True,
        )

        # Add query arguments (but avoid a dangling "?" if there are none)
        if method in ("GET", "PUT", "DELETE"):
            if query_args:
                url = "%s?%s" % (url, query_args)

        # Add any arguments to the body
        elif method == "POST":
            if body is None:
                body = query_args

        # Perform Kerberos authentication
        if authenticate:
            krb5_context = self._setup_krb5_context(url)

            # Fetch the Kerberos client response
            krb5_client_response = kerberos.authGSSClientResponse(krb5_context)

            # Set the Negotiate header
            headers["Authorization"] = "Negotiate %s" % krb5_client_response

        # Make the request
        req = tornado.httpclient.HTTPRequest(
            method=method, url=url, headers=headers, body=body,

            # Add all the rest
            body_producer=body_producer,
        )

        # Is this a web socket request?
        if websocket:
            return await tornado.websocket.websocket_connect(
                req,
                ping_interval=websocket_ping_interval,
                ping_timeout=websocket_ping_timeout,
                on_message_callback=on_message_callback,
            )

        # Send the request and wait for a response
        res = await self.client.fetch(req)

        # XXX Do we have to catch any errors here?

        # Perform mutual authentication
        if authenticate:
            for header in res.headers.get_list("WWW-Authenticate"):
                # Skip anything that isn't a Negotiate header
                if not header.startswith("Negotiate "):
                    continue

                # Fetch the server response
                krb5_server_response = header.removeprefix("Negotiate ")

                # Validate the server response
                result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
                if not result == kerberos.AUTH_GSS_COMPLETE:
                    raise AuthError("Could not verify the Kerberos server response")

                log.debug("Kerberos Server Response validating succeeded")

                # Call this so that we won't end in the else block
                break

            # If there were no headers
            else:
                raise AuthError("Mutual authentication failed")

        # Decode JSON response
        if res.body:
            return json.loads(res.body)

        # Empty response
        return {}

    async def _proxy(self, cls, *args, **kwargs):
        """
        Creates an instance of cls (passing this hub as first argument),
        connects it and returns it.
        """
        conn = cls(self, *args, **kwargs)

        # Create the initial connection
        await conn.reconnect()

        return conn

    def _setup_krb5_context(self, url):
        """
        Creates the Kerberos context that can be used to perform client
        authentication against the server, and mutual authentication for the server.

        Raises AuthError if the context could not be created or the first
        step of the authentication failed.
        """
        # Parse the input URL
        url = urllib.parse.urlparse(url)

        # Create a new client context
        result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)

        if not result == kerberos.AUTH_GSS_COMPLETE:
            raise AuthError("Could not create Kerberos Client context")

        # Next step...
        try:
            result = kerberos.authGSSClientStep(krb5_context, "")

        except kerberos.GSSError as e:
            log.error("Kerberos authentication failed: %s" % e)

            raise AuthError("%s" % e) from e

        if not result == kerberos.AUTH_GSS_CONTINUE:
            raise AuthError("Could not continue Kerberos authentication")

        return krb5_context

    # Test functions

    async def test(self):
        """
        Tests the connection
        """
        return await self._request("GET", "/test")

    # Build actions

    async def get_build(self, uuid):
        """
        Fetches the build with the given UUID
        """
        # _request() always decodes JSON, so only method and path are needed
        return await self._request("GET", "/api/v1/builds/%s" % uuid)

    async def build(self, path, repo=None, arches=None,
            disable_test_builds=False):
        """
        Create a new build on the hub

        The source file at path is uploaded first. Returns whatever the
        hub responded with when the build was created.
        """
        # Upload the source file to the server
        upload_id = await self.upload(path)

        log.debug("%s has been uploaded as %s" % (path, upload_id))

        # Create a new build
        build_id = await self._request("POST", "/api/v1/builds",
            upload_id=upload_id,
            repo=repo,
            arches=arches,
            disable_test_builds=disable_test_builds,
        )

        log.debug("Build created as %s" % build_id)

        return build_id

    # Job actions

    async def get_job(self, uuid):
        """
        Fetches the job with the given UUID
        """
        return await self._request("GET", "/jobs/%s" % uuid)

    # Package actions

    async def get_package(self, uuid):
        """
        Fetches the package with the given UUID
        """
        return await self._request("GET", "/packages/%s" % uuid)

    # Uploads

    async def list_uploads(self):
        """
        Returns a list of all uploads
        """
        response = await self._request("GET", "/api/v1/uploads")

        return response.get("uploads")

    async def upload(self, path, filename=None, show_progress=True):
        """
        Uploads the file to the hub returning the upload ID

        filename defaults to the basename of path. The file is streamed
        and its size and BLAKE2b digest are sent along with the request.
        """
        log.debug("Uploading %s..." % path)

        # Use the basename of the file if no name was given
        if filename is None:
            filename = os.path.basename(path)

        # Determine the filesize
        size = os.path.getsize(path)

        # Make progressbar
        if show_progress:
            p = _pakfire.Progressbar()
            p.add_string(_("Uploading %s") % filename)
            p.add_percentage()
            p.add_bar()
            p.add_transfer_speed()
            p.add_string("|")
            p.add_bytes_transferred()
            p.add_eta()
        else:
            p = None

        # Compute a digest
        digest = self._compute_digest("blake2b", path)

        # Prepare the file for streaming
        body_producer = functools.partial(self._stream_file, path, size, p)

        # Perform upload
        response = await self._request("PUT", "/api/v1/uploads",
            body_producer=body_producer,
            filename=filename, size=size, digest=digest
        )

        # Return the upload ID
        return response.get("id")

    async def delete_upload(self, upload_id):
        """
        Deletes the upload with the given ID
        """
        await self._request("DELETE", "/api/v1/uploads/%s" % upload_id)

    async def upload_multi(self, *paths, show_progress=True):
        """
        Upload multiple files

        If one file could not be uploaded, any other uploads will be deleted
        and the exception that made the upload fail is raised.
        """
        uploads = []

        # Upload everything
        try:
            for path in paths:
                upload = await self.upload(path, show_progress=show_progress)

                # Store the upload ID
                uploads.append(upload)

        except Exception:
            # Remove any previous uploads. This is best effort: a failing
            # delete must not hide the error that brought us here.
            await asyncio.gather(
                *(self.delete_upload(upload) for upload in uploads),
                return_exceptions=True,
            )

            # Raise the original exception
            raise

        # Return the IDs of the uploads
        return uploads

    @staticmethod
    def _stream_file(path, size, p, write):
        """
        Reads the file at path in chunks of 64 KiB and passes each chunk
        to write().

        p is an optional progressbar which is started with size, advanced
        by the length of each chunk and always finished, even on error.
        """
        # Start the progressbar
        if p:
            p.start(size)

        try:
            with open(path, "rb") as f:
                while True:
                    buf = f.read(64 * 1024)
                    if not buf:
                        break

                    # Update progressbar
                    if p:
                        p.increment(len(buf))

                    write(buf)
        finally:
            # Finish the progressbar
            if p:
                p.finish()

    @staticmethod
    def _compute_digest(algo, path):
        """
        Computes the digest of the file at path using the hashlib
        algorithm algo and returns it as "<algo>:<hexdigest>".
        """
        h = hashlib.new(algo)

        with open(path, "rb") as f:
            while True:
                buf = f.read(64 * 1024)
                if not buf:
                    break

                h.update(buf)

        return "%s:%s" % (algo, h.hexdigest())

    @staticmethod
    def _decode_json_message(message):
        """
        Takes a received message and decodes it.

        Returns the decoded message, or None if the message was empty
        (None) or could not be decoded (which is logged).
        """
        # Ignore empty messages
        if message is None:
            return

        try:
            message = json.loads(message)
        except json.JSONDecodeError:
            log.error("Could not decode JSON message:\n%s" % message)
            return

        return message

    # Builder

    async def control(self, *args, **kwargs):
        """
        Creates a control connection
        """
        return await self._proxy(ControlConnection, *args, **kwargs)

    async def job(self, *args, **kwargs):
        """
        Creates a control connection for a certain job
        """
        return await self._proxy(JobControlConnection, *args, **kwargs)
1479df35
MT
435
436
class HubObject(object):
    """
    Base class for objects that hold a WebSocket connection to the hub.

    Subclasses provide the path to connect to as "url", may implement
    init() for custom initialization, and register handlers for incoming
    messages in self.callbacks (message type -> callable taking the data).
    """

    # Disable Nagle's algorithm?
    nodelay = False

    def __init__(self, hub, *args, **kwargs):
        self.hub = hub

        # The active connection
        self.conn = None

        # Callbacks
        self.callbacks = {}

        # Perform custom initialization
        self.init(*args, **kwargs)

    def init(self, *args, **kwargs):
        """
        Hook for subclasses which is called at the end of __init__()
        """
        pass

    @property
    def url(self):
        raise NotImplementedError

    async def connect(self):
        """
        This will create a connection
        """
        conn = await self.hub._socket(self.url,
            on_message_callback=self.on_message_callback)

        # Disable Nagle's algorithm
        if self.nodelay:
            conn.set_nodelay(True)

        return conn

    async def reconnect(self):
        """
        Tries to reconnect for forever

        A timeout is retried after 30 seconds and HTTP errors 502 and 503
        after 10 seconds. Any other error is raised.
        """
        attempts = 0

        while True:
            attempts += 1

            log.debug("Trying to reconnect (attempt %s)..." % attempts)

            try:
                self.conn = await self.connect()

            # The web service did not respond in time
            #
            # This has to be handled before HTTPClientError, because the
            # timeout error is a subclass of it (carrying code 599) and
            # would otherwise never reach this handler.
            except tornado.simple_httpclient.HTTPTimeoutError:
                await asyncio.sleep(30)

            # The web service responded with some error
            except tornado.httpclient.HTTPClientError as e:
                log.error("%s: Received HTTP Error %s" % (self.url, e.code))

                # 502 - Proxy Error
                # 503 - Service Unavailable
                if e.code in (502, 503):
                    await asyncio.sleep(10)

                # Raise any unhandled errors
                else:
                    raise

            # If the connection was established successfully, we return
            # (all other exceptions simply propagate)
            else:
                return

    def close(self):
        """
        Closes the connection
        """
        if self.conn:
            self.conn.close()

    def on_message_callback(self, message):
        """
        Called for every message that is received on the connection.

        The message is decoded and passed to the callback that has been
        registered for its type. Messages of unknown type are logged and
        ignored.

        Raises NotImplementedError if no callbacks have been set.
        """
        # Fail if no callbacks have been set
        if not self.callbacks:
            raise NotImplementedError

        # Decode the message
        message = self.hub._decode_json_message(message)

        # Ignore empty messages
        if message is None:
            return

        # Log the received message
        log.debug("Received message:\n%s" % json.dumps(message, indent=4))

        # Fetch the message type & data
        message_type = message.get("type")
        data = message.get("data")

        # Find a suitable callback
        try:
            callback = self.callbacks[message_type]

        # Log an error for unknown messages and ignore them
        except KeyError:
            log.error("Received message of unknown type '%s'" % message_type)
            return

        # Call the callback
        callback(data)

    async def write_message(self, message, **kwargs):
        """
        Sends a message but encodes it into JSON first

        If the connection has been closed, it is re-established and the
        message is sent again.

        Raises RuntimeError if there has never been a connection.
        """
        # This should never happen
        if not self.conn:
            raise RuntimeError("Not connected")

        if isinstance(message, dict):
            message = tornado.escape.json_encode(message)

        # Loop instead of recursing so that repeated disconnects cannot
        # pile up stack frames
        while True:
            try:
                return await self.conn.write_message(message, **kwargs)

            except tornado.websocket.WebSocketClosedError:
                # Try to reconnect and then send the message again
                await self.reconnect()
570
571
9216b623
MT
class ControlConnection(HubObject):
    """
    Control connection of a builder to the hub.

    Hands incoming "job" messages to the daemon and reports statistics
    about this machine.
    """
    url = "/api/v1/builders/control"

    def init(self, daemon):
        self.daemon = daemon

        # Incoming "job" messages are handled by the daemon
        self.callbacks = { "job" : self.daemon.job_received }

        # Processor information and native architecture are fetched once
        # and sent along with every stats message
        self.cpu = cpuinfo.get_cpu_info()
        self.native_arch = _pakfire.native_arch()

    async def submit_stats(self):
        """
        Collects CPU, load, memory and swap figures and sends them to the
        hub as a message of type "stats".
        """
        log.debug("Sending stats...")

        # Take all measurements first
        cpu_times = psutil.cpu_times_percent()
        mem = psutil.virtual_memory()
        swap = psutil.swap_memory()
        load1, load5, load15 = psutil.getloadavg()

        # The fields that are being reported for each measurement
        cpu_fields = (
            "user", "nice", "system", "idle", "iowait",
            "irq", "softirq", "steal", "guest", "guest_nice",
        )
        mem_fields = (
            "total", "available", "used", "free", "active",
            "inactive", "buffers", "cached", "shared",
        )
        swap_fields = ("total", "used", "free")

        stats = {
            # CPU info
            "cpu_model" : self.cpu.get("brand"),
            "cpu_count" : self.cpu.get("count"),
            "cpu_arch"  : self.native_arch,

            # Pakfire + OS
            "pakfire_version" : PAKFIRE_VERSION,
            "os_name"         : util.get_distro_name(),
        }

        # CPU Times
        for field in cpu_fields:
            stats["cpu_%s" % field] = getattr(cpu_times, field)

        # Load average
        stats["loadavg1"] = load1
        stats["loadavg5"] = load5
        stats["loadavg15"] = load15

        # Memory
        for field in mem_fields:
            stats["mem_%s" % field] = getattr(mem, field)

        # Swap
        for field in swap_fields:
            stats["swap_%s" % field] = getattr(swap, field)

        await self.write_message({ "type" : "stats", "data" : stats })
76960e90 651
2465b05c 652
class JobControlConnection(HubObject):
    """
    Connection to the hub for one build job.

    Used to report log messages and the result of the job; an incoming
    "abort" message is passed to the worker.
    """
    def init(self, id, worker):
        self.id = id

        # An "abort" message from the hub is handled by the worker
        self.callbacks = { "abort" : worker.abort }

    @property
    def url(self):
        return "/api/v1/jobs/%s" % self.id

    async def finished(self, success, packages=None, logfile=None):
        """
        Reports to the hub that this job has finished.

        The log file and the packages (if any) are uploaded first and the
        resulting upload IDs are sent along with the success flag.
        """
        log_upload, package_uploads = logfile, packages

        # The log file is stored as <job id>.log
        if logfile:
            log_upload = await self.hub.upload(logfile, filename="%s.log" % self.id)

        # All packages go up in one batch
        if packages:
            package_uploads = await self.hub.upload_multi(*packages)

        # Tell the hub about the result
        await self.hub._request(
            "POST",
            "/api/v1/jobs/%s/finished" % self.id,
            success="1" if success else "0",
            logfile=log_upload,
            packages=package_uploads,
        )

        # Handle the response
        # XXX TODO

    async def log(self, timestamp, level, message):
        """
        Forwards one log message to the hub as a message of type "log".
        """
        data = {
            "timestamp" : timestamp,
            "level"     : level,
            "message"   : message,
        }

        await self.write_message({ "type" : "log", "data" : data })