]> git.ipfire.org Git - pakfire.git/blame - src/pakfire/hub.py
hub: Drop unused methods
[pakfire.git] / src / pakfire / hub.py
CommitLineData
96a8f2a4
MT
1#!/usr/bin/python3
2###############################################################################
3# #
4# Pakfire - The IPFire package management system #
5# Copyright (C) 2013 Pakfire development team #
6# #
7# This program is free software: you can redistribute it and/or modify #
8# it under the terms of the GNU General Public License as published by #
9# the Free Software Foundation, either version 3 of the License, or #
10# (at your option) any later version. #
11# #
12# This program is distributed in the hope that it will be useful, #
13# but WITHOUT ANY WARRANTY; without even the implied warranty of #
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15# GNU General Public License for more details. #
16# #
17# You should have received a copy of the GNU General Public License #
18# along with this program. If not, see <http://www.gnu.org/licenses/>. #
19# #
20###############################################################################
21
1479df35 22import asyncio
da170b36 23import cpuinfo
a0dc7d43 24import functools
96a8f2a4 25import hashlib
a0dc7d43 26import json
6284f1f2 27import kerberos
96a8f2a4 28import logging
96a8f2a4 29import os.path
364b7040 30import progressbar2 as progressbar
da170b36 31import psutil
1479df35
MT
32import subprocess
33import tempfile
c8d43da3 34import tornado.escape
a0dc7d43 35import tornado.httpclient
1479df35 36import tornado.simple_httpclient
24763c22 37import tornado.websocket
a0dc7d43 38import urllib.parse
96a8f2a4 39
dec08a5c 40from . import _pakfire
7899002d 41from . import util
96a8f2a4 42from .constants import *
dec08a5c 43from .i18n import _
96a8f2a4 44
# Module-wide logger
log = logging.getLogger("pakfire.hub")

# Send our own User-Agent with every request that is made through Tornado's
# asynchronous HTTP client (passing None keeps the default implementation)
tornado.httpclient.AsyncHTTPClient.configure(
    None,
    defaults={"user_agent": f"pakfire/{PAKFIRE_VERSION}"},
)
96a8f2a4 54
252fa19c
MT
class AuthError(Exception):
    """
    Signals that the client failed to authenticate against the hub
    """
60
61
1dd799f3
MT
class TransportError(Exception):
    """
    Generic error for problems with the connection to the hub
    """
64
65
class TemporaryConnectionError(TransportError):
    """
    Signals a connection problem that is expected to go away,
    so that the caller should send the same request again.
    """
72
73
class Hub(object):
    """
    Client for the HTTP and websocket API of the hub

    Requests are sent using Tornado's asynchronous HTTP client and are
    authenticated with Kerberos (Negotiate) unless this is disabled
    for an individual request.
    """

    def __init__(self, url, keytab=None):
        """
        url is the base URL that all request paths are joined onto.

        keytab is an optional path to a Kerberos keytab. If it is set, a
        private credentials cache is created and a ticket is fetched
        immediately.
        """
        self.url = url

        # Store path to keytab
        self.keytab = keytab

        # Initialise the HTTP client
        self.client = tornado.httpclient.AsyncHTTPClient()

        # XXX support proxies

        # Fetch a TGT with the given keytab
        if self.keytab:
            self._setup_credentials_cache()
            self._fetch_kerberos_ticket()

    def _setup_credentials_cache(self):
        """
        Create a temporary file to be used as Kerberos credentials cache

        The file object is kept on the instance and its name is exported
        as KRB5CCNAME in the environment of this process.
        """
        self.credentials_cache = tempfile.NamedTemporaryFile()

        os.environ["KRB5CCNAME"] = self.credentials_cache.name

    def _fetch_kerberos_ticket(self):
        """
        Runs kinit with the configured keytab

        Raises subprocess.CalledProcessError if kinit exits with an error.
        """
        command = ["kinit", "-k", "-t", self.keytab]

        try:
            subprocess.run(command, check=True, capture_output=True, text=True)

        except subprocess.CalledProcessError as e:
            # The output has been captured and would otherwise be lost
            log.error("kinit failed: %s", e.stderr)
            raise

    async def _socket(self, path, **kwargs):
        """
        Opens a websocket connection to path

        Any extra keyword arguments are passed on to _request().
        """
        return await self._request("GET", path,

            # Enable websocket and ping once every ten seconds
            websocket=True,
            websocket_ping_interval=10,
            websocket_ping_timeout=60,

            **kwargs,
        )

    async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
            websocket_ping_timeout=None, authenticate=True,
            body=None, body_producer=None, on_message_callback=None, **kwargs):
        """
        Sends a request to the hub

        All extra keyword arguments that are not None are sent as request
        arguments: as query string for GET, PUT and DELETE, and as the body
        of a POST request unless a body has been passed explicitly.

        Returns the decoded JSON response (an empty dict if the response has
        no body), or the websocket connection if websocket is True.

        Raises TemporaryConnectionError if the server responded with 502 or
        503 and AuthError if Kerberos authentication failed. Any other HTTP
        error is passed on unchanged.
        """
        headers = {}

        # Make absolute URL
        url = urllib.parse.urljoin(self.url, path)

        # Change scheme for websocket (only the scheme, not the rest of the URL)
        if websocket:
            if url.startswith("https://"):
                url = "wss://" + url.removeprefix("https://")
            elif url.startswith("http://"):
                url = "ws://" + url.removeprefix("http://")

        # Filter all query arguments and skip anything that is None
        query_args = {
            arg: value for arg, value in kwargs.items() if value is not None
        }

        # Encode query arguments
        query_args = urllib.parse.urlencode(query_args, doseq=True)

        # Add query arguments
        if method in ("GET", "PUT", "DELETE"):
            # Do not leave a dangling separator if there is nothing to send
            if query_args:
                url = "%s%s%s" % (url, "&" if "?" in url else "?", query_args)

        # Add any arguments to the body
        elif method == "POST":
            if body is None:
                body = query_args

        # Perform Kerberos authentication
        if authenticate:
            krb5_context = self._setup_krb5_context(url)

            # Fetch the Kerberos client response
            krb5_client_response = kerberos.authGSSClientResponse(krb5_context)

            # Set the Negotiate header
            headers["Authorization"] = "Negotiate %s" % krb5_client_response

        # Make the request
        req = tornado.httpclient.HTTPRequest(
            method=method, url=url, headers=headers, body=body,

            # Give the server more time to respond
            request_timeout=60,

            # Add all the rest
            body_producer=body_producer,
        )

        # Is this a web socket request?
        if websocket:
            return await tornado.websocket.websocket_connect(
                req,
                ping_interval=websocket_ping_interval,
                ping_timeout=websocket_ping_timeout,
                on_message_callback=on_message_callback,
            )

        # Send the request and wait for a response
        try:
            res = await self.client.fetch(req)

        # Catch any HTTP errors
        except tornado.httpclient.HTTPError as e:
            if e.code in (502, 503):
                raise TemporaryConnectionError from e

            # Re-raise anything else
            raise

        # Perform mutual authentication
        if authenticate:
            for header in res.headers.get_list("WWW-Authenticate"):
                # Skip anything that isn't a Negotiate header
                if not header.startswith("Negotiate "):
                    continue

                # Fetch the server response
                krb5_server_response = header.removeprefix("Negotiate ")

                # Validate the server response
                result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
                if not result == kerberos.AUTH_GSS_COMPLETE:
                    raise AuthError("Could not verify the Kerberos server response")

                log.debug("Kerberos Server Response validating succeeded")

                # Call this so that we won't end in the else block
                break

            # If there were no headers
            else:
                raise AuthError("Mutual authentication failed")

        # Decode JSON response
        if res.body:
            return json.loads(res.body)

        # Empty response
        return {}

    async def _proxy(self, cls, *args, **kwargs):
        """
        Creates an instance of cls for this hub and connects it
        """
        conn = cls(self, *args, **kwargs)

        # Create the initial connection
        await conn.reconnect()

        return conn

    def _setup_krb5_context(self, url):
        """
        Creates the Kerberos context that can be used to perform client
        authentication against the server, and mutual authentication for the server.

        Raises AuthError if any of the steps fails.
        """
        # Parse the input URL
        url = urllib.parse.urlparse(url)

        # Create a new client context
        result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)

        if not result == kerberos.AUTH_GSS_COMPLETE:
            raise AuthError("Could not create Kerberos Client context")

        # Next step...
        try:
            result = kerberos.authGSSClientStep(krb5_context, "")

        except kerberos.GSSError as e:
            log.error("Kerberos authentication failed: %s", e)

            raise AuthError("%s" % e) from e

        if not result == kerberos.AUTH_GSS_CONTINUE:
            raise AuthError("Could not continue Kerberos authentication")

        return krb5_context

    # Uploads

    @staticmethod
    def _make_progressbar(filename, size):
        """
        Creates a new progressbar for uploading a file of the given size
        """
        return progressbar.ProgressBar(
            max_value=size,
            widgets=[
                progressbar.FormatCustomText(_("Uploading %s") % filename),
                progressbar.Percentage(),
                progressbar.Bar(),
                progressbar.FileTransferSpeed(),
                progressbar.DataSize(),
                progressbar.AdaptiveETA(),
            ],
        )

    async def upload(self, path, filename=None, show_progress=True):
        """
        Uploads the file to the hub returning the upload ID

        If filename is not set, the basename of path is used. The upload is
        repeated every five seconds for as long as the hub responds with a
        temporary connection error.
        """
        log.debug("Uploading %s...", path)

        # Use the basename of the file if no name was given
        if filename is None:
            filename = os.path.basename(path)

        # Determine the filesize
        size = os.path.getsize(path)

        # Compute a digest
        digest = self._compute_digest("blake2b", path)

        while True:
            # Make a fresh progressbar for every attempt, because streaming
            # the file finishes the progressbar even when the upload failed
            p = self._make_progressbar(filename, size) if show_progress else None

            # Prepare the file for streaming
            body_producer = functools.partial(self._stream_file, path, size, p)

            # Perform upload
            try:
                response = await self._request("PUT", "/api/v1/uploads",
                    body_producer=body_producer,
                    filename=filename, size=size, digest=digest
                )

            # On temporary issues, try again after a few seconds
            except TemporaryConnectionError:
                await asyncio.sleep(5)

            else:
                break

        # Return the upload ID
        return response.get("id")

    async def delete_upload(self, upload_id):
        """
        Deletes the upload with the given ID from the hub
        """
        await self._request("DELETE", "/api/v1/uploads/%s" % upload_id)

    async def upload_multi(self, *paths, show_progress=True):
        """
        Upload multiple files

        If one file could not be uploaded, any other uploads will be deleted
        and the exception of the failed upload is raised.

        Returns the list of upload IDs in the order of paths.
        """
        uploads = []

        # Upload everything
        try:
            for path in paths:
                upload = await self.upload(path, show_progress=show_progress)

                # Store the upload ID
                uploads.append(upload)

        except Exception:
            # Remove any previous uploads. Errors are collected instead of
            # being raised so that they cannot replace the original exception.
            results = await asyncio.gather(
                *(self.delete_upload(upload) for upload in uploads),
                return_exceptions=True,
            )

            for upload, result in zip(uploads, results):
                if isinstance(result, BaseException):
                    log.warning("Could not delete upload %s: %s", upload, result)

            # Raise the exception
            raise

        # Return the IDs of the uploads
        return uploads

    @staticmethod
    def _stream_file(path, size, p, write):
        """
        Reads the file at path in chunks of 64 KiB and passes each to write()

        If p is set, it is advanced by the length of every chunk and it is
        always finished at the end, even if reading or writing failed.
        size is not used.
        """
        try:
            with open(path, "rb") as f:
                while buf := f.read(64 * 1024):
                    # Update progressbar
                    if p:
                        p.increment(len(buf))

                    write(buf)
        finally:
            # Finish the progressbar
            if p:
                p.finish()

    @staticmethod
    def _compute_digest(algo, path):
        """
        Returns the digest of the file at path as "<algo>:<hexdigest>"
        """
        h = hashlib.new(algo)

        with open(path, "rb") as f:
            while buf := f.read(64 * 1024):
                h.update(buf)

        return "%s:%s" % (algo, h.hexdigest())

    @staticmethod
    def _decode_json_message(message):
        """
        Takes a received message and decodes it.

        Returns the decoded message, or None if the message was None or
        could not be decoded as JSON (which is logged as an error).
        """
        # Ignore empty messages
        if message is None:
            return

        try:
            message = json.loads(message)
        except json.JSONDecodeError:
            log.error("Could not decode JSON message:\n%s", message)
            return

        return message

    # Builder

    async def control(self, *args, **kwargs):
        """
        Creates a control connection
        """
        return await self._proxy(ControlConnection, *args, **kwargs)

    async def job(self, *args, **kwargs):
        """
        Creates a control connection for a certain job
        """
        return await self._proxy(JobControlConnection, *args, **kwargs)
1479df35
MT
413
414
class HubObject(object):
    """
    Base class for objects that talk to the hub over a websocket connection

    Subclasses have to provide url, may implement init() for their own
    initialization, and register handlers for incoming messages in
    self.callbacks, keyed by message type.
    """
    # Disable Nagle's algorithm?
    nodelay = False

    def __init__(self, hub, *args, **kwargs):
        self.hub = hub

        # The active connection
        self.conn = None

        # Callbacks
        self.callbacks = {}

        # Perform custom initialization
        self.init(*args, **kwargs)

    def init(self, *args, **kwargs):
        """
        Called at the end of the constructor with all extra arguments
        """
        pass

    @property
    def url(self):
        """
        The path of the websocket endpoint (to be provided by subclasses)
        """
        raise NotImplementedError

    async def connect(self):
        """
        This will create a connection
        """
        conn = await self.hub._socket(self.url,
            on_message_callback=self.on_message_callback)

        # Disable Nagle's algorithm
        if self.nodelay:
            conn.set_nodelay(True)

        return conn

    async def reconnect(self):
        """
        Tries to reconnect for forever

        The attempt is repeated after 30 seconds if the request timed out and
        after 10 seconds if the hub responded with 502 or 503. Any other
        exception is raised.
        """
        attempts = 0

        while True:
            attempts += 1

            log.debug("Trying to reconnect (attempt %s)...", attempts)

            try:
                self.conn = await self.connect()

            # The web service did not respond in time
            # This has to come first, because HTTPTimeoutError is a
            # subclass of HTTPClientError and would never be reached otherwise
            except tornado.simple_httpclient.HTTPTimeoutError:
                await asyncio.sleep(30)

            # The web service responded with some error
            except tornado.httpclient.HTTPClientError as e:
                log.error("%s: Received HTTP Error %s", self.url, e.code)

                # Raise any unhandled errors
                # 502 - Proxy Error
                # 503 - Service Unavailable
                if e.code not in (502, 503):
                    raise

                await asyncio.sleep(10)

            # If the connection was established successfully, we return
            else:
                return

    def close(self):
        """
        Closes the connection
        """
        if self.conn:
            self.conn.close()

    def on_message_callback(self, message):
        """
        Decodes a received message and passes its data to the callback
        that has been registered for the type of the message

        Messages that cannot be decoded or that have an unknown type
        are ignored.
        """
        # Fail if no callbacks have been set
        if not self.callbacks:
            raise NotImplementedError

        # Decode the message
        message = self.hub._decode_json_message(message)

        # Ignore empty messages
        if message is None:
            return

        # Log the received message
        log.debug("Received message:\n%s", json.dumps(message, indent=4))

        # Fetch the message type & data
        message_type = message.get("type")
        data = message.get("data")

        # Find a suitable callback
        try:
            callback = self.callbacks[message_type]

        # Log an error for unknown messages and ignore them
        except KeyError:
            log.error("Received message of unknown type '%s'", message_type)
            return

        # Call the callback
        callback(data)

    async def write_message(self, message, **kwargs):
        """
        Sends a message but encodes it into JSON first

        If the connection has been closed, it is re-established and
        the message is sent again.
        """
        # This should never happen
        if not self.conn:
            raise RuntimeError("Not connected")

        if isinstance(message, dict):
            message = tornado.escape.json_encode(message)

        while True:
            try:
                return await self.conn.write_message(message, **kwargs)

            except tornado.websocket.WebSocketClosedError:
                # Try to reconnect and send the message again
                await self.reconnect()
548
549
9216b623
MT
class ControlConnection(HubObject):
    """
    The control connection of a builder

    Received "job" messages are passed to the daemon, and statistics
    about this system can be sent using submit_stats().
    """
    url = "/api/v1/builders/control"

    def init(self, daemon):
        self.daemon = daemon

        # Callbacks
        self.callbacks = {
            "job" : self.daemon.job_received,
        }

        # Fetch processor information
        self.cpu = cpuinfo.get_cpu_info()

        # Fetch the native architecture
        self.native_arch = _pakfire.native_arch()

    async def submit_stats(self):
        """
        Sends stats about this builder
        """
        log.debug("Sending stats...")

        # Fetch processor information
        cpu_times = psutil.cpu_times_percent()

        # Fetch memory/swap information
        mem = psutil.virtual_memory()
        swap = psutil.swap_memory()

        # Fetch load average
        loadavg = psutil.getloadavg()

        # Newer versions of py-cpuinfo call this field "brand_raw";
        # fall back to the old name for older versions
        cpu_model = self.cpu.get("brand_raw", self.cpu.get("brand"))

        await self.write_message({
            "type" : "stats",
            "data" : {
                # CPU info
                "cpu_model"       : cpu_model,
                "cpu_count"       : self.cpu.get("count"),
                "cpu_arch"        : self.native_arch,

                # Pakfire + OS
                "pakfire_version" : PAKFIRE_VERSION,
                "os_name"         : util.get_distro_name(),

                # CPU Times
                "cpu_user"        : cpu_times.user,
                "cpu_nice"        : cpu_times.nice,
                "cpu_system"      : cpu_times.system,
                "cpu_idle"        : cpu_times.idle,
                "cpu_iowait"      : cpu_times.iowait,
                "cpu_irq"         : cpu_times.irq,
                "cpu_softirq"     : cpu_times.softirq,
                "cpu_steal"       : cpu_times.steal,
                "cpu_guest"       : cpu_times.guest,
                "cpu_guest_nice"  : cpu_times.guest_nice,

                # Load average
                "loadavg1"        : loadavg[0],
                "loadavg5"        : loadavg[1],
                "loadavg15"       : loadavg[2],

                # Memory
                "mem_total"       : mem.total,
                "mem_available"   : mem.available,
                "mem_used"        : mem.used,
                "mem_free"        : mem.free,
                "mem_active"      : mem.active,
                "mem_inactive"    : mem.inactive,
                "mem_buffers"     : mem.buffers,
                "mem_cached"      : mem.cached,
                "mem_shared"      : mem.shared,

                # Swap
                "swap_total"      : swap.total,
                "swap_used"       : swap.used,
                "swap_free"       : swap.free,
            },
        })
76960e90 629
2465b05c 630
class JobControlConnection(HubObject):
    """
    Control connection for a single build job
    """
    def init(self, id, worker):
        self.id = id

        # Incoming "abort" messages are handed to the worker
        self.callbacks = {"abort" : worker.abort}

    @property
    def url(self):
        return f"/api/v1/jobs/{self.id}"

    async def finished(self, success, packages=None, logfile=None):
        """
        Reports to the hub that this job is done

        The log file and the packages are uploaded first (if given) and the
        resulting upload IDs are sent along. The request is repeated every
        five seconds for as long as the hub is temporarily unavailable.
        """
        # The log file goes first...
        if logfile:
            logfile = await self.hub.upload(logfile, filename=f"{self.id}.log")

        # ... followed by all packages
        if packages:
            packages = await self.hub.upload_multi(*packages)

        path = f"/api/v1/jobs/{self.id}/finished"
        status = "1" if success else "0"

        while True:
            try:
                response = await self.hub._request("POST", path,
                    success=status, logfile=logfile, packages=packages,
                )

            # Wait a short moment on connection errors and start over
            except TemporaryConnectionError:
                await asyncio.sleep(5)
                continue

            # The request went through
            break

        # Handle the response
        # XXX TODO

    async def log(self, timestamp, level, message):
        """
        Passes one log message on to the hub
        """
        payload = {
            "timestamp" : timestamp,
            "level"     : level,
            "message"   : message,
        }

        await self.write_message({"type" : "log", "data" : payload})