]> git.ipfire.org Git - pakfire.git/blame - src/pakfire/hub.py
hub: Don't use a default keytab
[pakfire.git] / src / pakfire / hub.py
CommitLineData
96a8f2a4
MT
1#!/usr/bin/python3
2###############################################################################
3# #
4# Pakfire - The IPFire package management system #
5# Copyright (C) 2013 Pakfire development team #
6# #
7# This program is free software: you can redistribute it and/or modify #
8# it under the terms of the GNU General Public License as published by #
9# the Free Software Foundation, either version 3 of the License, or #
10# (at your option) any later version. #
11# #
12# This program is distributed in the hope that it will be useful, #
13# but WITHOUT ANY WARRANTY; without even the implied warranty of #
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15# GNU General Public License for more details. #
16# #
17# You should have received a copy of the GNU General Public License #
18# along with this program. If not, see <http://www.gnu.org/licenses/>. #
19# #
20###############################################################################
21
1479df35 22import asyncio
da170b36 23import cpuinfo
a0dc7d43 24import functools
96a8f2a4 25import hashlib
a0dc7d43 26import json
6284f1f2 27import kerberos
96a8f2a4 28import logging
96a8f2a4 29import os.path
da170b36 30import psutil
1479df35
MT
31import subprocess
32import tempfile
c8d43da3 33import tornado.escape
a0dc7d43 34import tornado.httpclient
1479df35 35import tornado.simple_httpclient
24763c22 36import tornado.websocket
a0dc7d43 37import urllib.parse
96a8f2a4 38
dec08a5c 39from . import _pakfire
7899002d 40from . import util
96a8f2a4 41from .constants import *
dec08a5c 42from .i18n import _
96a8f2a4 43
a0dc7d43 44# Setup logging
96a8f2a4 45log = logging.getLogger("pakfire.hub")
a0dc7d43
MT
46
47# Configure some useful defaults for all requests
48tornado.httpclient.AsyncHTTPClient.configure(
49 None, defaults = {
50 "user_agent" : "pakfire/%s" % PAKFIRE_VERSION,
51 },
52)
96a8f2a4 53
252fa19c
MT
54class AuthError(Exception):
55 """
56 Raised when the client could not authenticate against the hub
57 """
58 pass
59
60
96a8f2a4 61class Hub(object):
6284f1f2 62 def __init__(self, url, keytab=None):
a0dc7d43 63 self.url = url
6284f1f2
MT
64
65 # Store path to keytab
b6258da2 66 self.keytab = keytab
96a8f2a4
MT
67
68 # Initialise the HTTP client
a0dc7d43 69 self.client = tornado.httpclient.AsyncHTTPClient()
96a8f2a4 70
a0dc7d43 71 # XXX support proxies
ec5917f3 72
1479df35
MT
73 async def _socket(self, path, **kwargs):
74 return await self._request("GET", path,
75
76 # Enable websocket and ping once every ten seconds
77 websocket=True,
78 websocket_ping_interval=10,
79 websocket_ping_timeout=60,
80
81 **kwargs,
82 )
83
84 async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
85 websocket_ping_timeout=None, authenticate=True,
24763c22 86 body=None, body_producer=None, on_message_callback=None, **kwargs):
6284f1f2 87 headers = {}
966f54c8 88 query_args = {}
6284f1f2 89
a0dc7d43
MT
90 # Make absolute URL
91 url = urllib.parse.urljoin(self.url, path)
96a8f2a4 92
24763c22
MT
93 # Change scheme for websocket
94 if websocket and url.startswith("https://"):
95 url = url.replace("https://", "wss://")
96
966f54c8
MT
97 # Filter all query arguments
98 for arg in kwargs:
99 # Skip anything that is None
100 if kwargs[arg] is None:
101 continue
102
103 # Add to query arguments
104 query_args[arg] = kwargs[arg]
105
106 # Encode query arguments
107 query_args = urllib.parse.urlencode(query_args, doseq=True)
6a577fa0 108
a0dc7d43 109 # Add query arguments
21a47e81 110 if method in ("GET", "PUT", "DELETE"):
6a577fa0
MT
111 url = "%s?%s" % (url, query_args)
112
113 # Add any arguments to the body
114 elif method == "POST":
115 if body is None:
116 body = query_args
96a8f2a4 117
6284f1f2
MT
118 # Perform Kerberos authentication
119 if authenticate:
120 krb5_context = self._setup_krb5_context(url)
121
24622c91
MT
122 # Fetch the Kerberos client response
123 krb5_client_response = kerberos.authGSSClientResponse(krb5_context)
124
6284f1f2
MT
125 # Set the Negotiate header
126 headers |= {
24622c91 127 "Authorization" : "Negotiate %s" % krb5_client_response,
6284f1f2
MT
128 }
129
a0dc7d43
MT
130 # Make the request
131 req = tornado.httpclient.HTTPRequest(
6284f1f2 132 method=method, url=url, headers=headers, body=body,
ec5917f3 133
6284f1f2 134 # Add all the rest
a0dc7d43
MT
135 body_producer=body_producer,
136 )
137
24763c22
MT
138 # Is this a web socket request?
139 if websocket:
140 return await tornado.websocket.websocket_connect(
141 req,
1479df35
MT
142 ping_interval=websocket_ping_interval,
143 ping_timeout=websocket_ping_timeout,
24763c22
MT
144 on_message_callback=on_message_callback,
145 )
146
a0dc7d43
MT
147 # Send the request and wait for a response
148 res = await self.client.fetch(req)
149
150 # XXX Do we have to catch any errors here?
151
24622c91
MT
152 # Perform mutual authentication
153 if authenticate:
154 for header in res.headers.get_list("WWW-Authenticate"):
155 # Skip anything that isn't a Negotiate header
156 if not header.startswith("Negotiate "):
157 continue
158
159 # Fetch the server response
160 krb5_server_response = header.removeprefix("Negotiate ")
161
162 # Validate the server response
163 result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
164 if not result == kerberos.AUTH_GSS_COMPLETE:
252fa19c 165 raise AuthError("Could not verify the Kerberos server response")
24622c91
MT
166
167 log.debug("Kerberos Server Response validating succeeded")
168
169 # Call this so that we won't end in the else block
170 break
171
172 # If there were no headers
173 else:
252fa19c 174 raise AuthError("Mutual authentication failed")
24622c91 175
a0dc7d43
MT
176 # Decode JSON response
177 if res.body:
178 return json.loads(res.body)
179
180 # Empty response
181 return {}
ec5917f3 182
1479df35
MT
183 async def _proxy(self, cls, *args, **kwargs):
184 conn = cls(self, *args, **kwargs)
185
186 # Create the initial connection
187 await conn.reconnect()
188
189 return conn
190
6284f1f2
MT
191 def _setup_krb5_context(self, url):
192 """
193 Creates the Kerberos context that can be used to perform client
194 authentication against the server, and mutual authentication for the server.
195 """
196 # Parse the input URL
197 url = urllib.parse.urlparse(url)
198
199 # Create a new client context
200 result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)
201
202 if not result == kerberos.AUTH_GSS_COMPLETE:
252fa19c 203 raise AuthError("Could not create Kerberos Client context")
6284f1f2
MT
204
205 # Next step...
206 try:
207 result = kerberos.authGSSClientStep(krb5_context, "")
208
209 except kerberos.GSSError as e:
210 log.error("Kerberos authentication failed: %s" % e)
252fa19c
MT
211
212 raise AuthError("%s" % e) from e
6284f1f2
MT
213
214 if not result == kerberos.AUTH_GSS_CONTINUE:
252fa19c 215 raise AuthError("Cloud not continue Kerberos authentication")
6284f1f2
MT
216
217 return krb5_context
218
96a8f2a4
MT
219 # Test functions
220
6284f1f2 221 async def test(self):
96a8f2a4
MT
222 """
223 Tests the connection
224 """
6284f1f2 225 return await self._request("GET", "/test")
96a8f2a4
MT
226
227 # Build actions
228
229 def get_build(self, uuid):
1479df35 230 return self._request("/api/v1/builds/%s" % uuid, decode="json")
96a8f2a4 231
4240696b 232 async def build(self, path, repo=None, arches=None):
96a8f2a4
MT
233 """
234 Create a new build on the hub
235 """
dcc48721
MT
236 # Upload the souce file to the server
237 upload_id = await self.upload(path)
96a8f2a4 238
dcc48721 239 log.debug("%s has been uploaded as %s" % (path, upload_id))
96a8f2a4 240
dcc48721 241 # Create a new build
1479df35 242 build_id = await self._request("POST", "/api/v1/builds",
4240696b 243 upload_id=upload_id, repo=repo, arches=arches)
96a8f2a4 244
dcc48721 245 log.debug("Build creates as %s" % build_id)
96a8f2a4 246
dcc48721 247 return build_id
96a8f2a4
MT
248
249 # Job actions
250
251 def get_job(self, uuid):
252 return self._request("/jobs/%s" % uuid, decode="json")
253
254 # Package actions
255
256 def get_package(self, uuid):
257 return self._request("/packages/%s" % uuid, decode="json")
258
3cafdb83
MT
259 # Uploads
260
261 async def list_uploads(self):
262 """
263 Returns a list of all uploads
264 """
1479df35 265 response = await self._request("GET", "/api/v1/uploads")
3cafdb83
MT
266
267 return response.get("uploads")
96a8f2a4 268
dec08a5c 269 async def upload(self, path, filename=None, show_progress=True):
a0dc7d43
MT
270 """
271 Uploads the file to the hub returning the upload ID
272 """
273 log.debug("Uploading %s..." % path)
274
275 # Use the basename of the file if no name was given
276 if filename is None:
277 filename = os.path.basename(path)
278
279 # Determine the filesize
280 size = os.path.getsize(path)
281
dec08a5c
MT
282 # Make progressbar
283 if show_progress:
284 p = _pakfire.Progressbar()
285 p.add_string(_("Uploading %s") % filename)
286 p.add_percentage()
287 p.add_bar()
288 p.add_transfer_speed()
289 p.add_string("|")
290 p.add_bytes_transferred()
291 p.add_eta()
292 else:
293 p = None
294
a0dc7d43
MT
295 # Compute a digest
296 digest = self._compute_digest("blake2b", path)
297
298 # Prepare the file for streaming
dec08a5c 299 body_producer = functools.partial(self._stream_file, path, size, p)
a0dc7d43
MT
300
301 # Perform upload
1479df35 302 response = await self._request("PUT", "/api/v1/uploads",
a0dc7d43
MT
303 body_producer=body_producer,
304 filename=filename, size=size, digest=digest
305 )
306
307 # Return the upload ID
308 return response.get("id")
309
21a47e81 310 async def delete_upload(self, upload_id):
1479df35 311 await self._request("DELETE", "/api/v1/uploads/%s" % upload_id)
21a47e81 312
688e6846
MT
313 async def upload_multi(self, *paths, show_progress=True):
314 """
315 Upload multiple files
316
317 If one file could not be uploaded, any other uploads will be deleted
318 """
319 uploads = []
320
321 # Upload everything
322 try:
323 for path in paths:
324 upload = await self.upload(path, show_progress=show_progress)
325
326 # Store the upload ID
327 uploads.append(upload)
328
329 except Exception as e:
330 # Remove any previous uploads
331 await asyncio.gather(
332 *(self.delete_upload(upload) for upload in uploads),
333 )
334
335 # Raise the exception
336 raise e
337
338 # Return the IDs of the uploads
339 return uploads
340
a0dc7d43 341 @staticmethod
dec08a5c
MT
342 def _stream_file(path, size, p, write):
343 # Start the progressbar
344 if p:
345 p.start(size)
346
347 try:
348 with open(path, "rb") as f:
349 while True:
350 buf = f.read(64 * 1024)
351 if not buf:
352 break
353
354 # Update progressbar
355 if p:
356 l = len(buf)
357 p.increment(l)
358
359 write(buf)
360 finally:
361 # Finish the progressbar
362 if p:
363 p.finish()
a0dc7d43
MT
364
365 @staticmethod
366 def _compute_digest(algo, path):
367 h = hashlib.new(algo)
96a8f2a4 368
a0dc7d43
MT
369 with open(path, "rb") as f:
370 while True:
371 buf = f.read(64 * 1024)
372 if not buf:
373 break
96a8f2a4 374
a0dc7d43 375 h.update(buf)
96a8f2a4 376
a0dc7d43 377 return "%s:%s" % (algo, h.hexdigest())
da170b36 378
1479df35 379 @staticmethod
28041406 380 def _decode_json_message(message):
1479df35
MT
381 """
382 Takes a received message and decodes it.
383
384 It will then call the callback with the decoded message.
385 """
386 # Ignore empty messages
387 if message is None:
388 return
389
390 try:
391 message = json.loads(message)
392 except json.JSONDecodeError:
393 log.error("Could not decode JSON message:\n%s" % message)
394 return
395
28041406
MT
396 return message
397
398 # Builder
399
400 async def control(self, *args, **kwargs):
401 """
402 Creates a control connection
403 """
404 return await self._proxy(ControlConnection, *args, **kwargs)
1479df35 405
4f872b98 406 async def job(self, *args, **kwargs):
1479df35 407 """
28041406 408 Creates a control connection for a certain job
1479df35 409 """
4f872b98 410 return await self._proxy(JobControlConnection, *args, **kwargs)
1479df35
MT
411
412
413class HubObject(object):
414 # Disable Nagle's algorithm?
415 nodelay = False
416
28041406 417 def __init__(self, hub, *args, **kwargs):
1479df35
MT
418 self.hub = hub
419
420 # The active connection
421 self.conn = None
422
a04ccbf5
MT
423 # Callbacks
424 self.callbacks = {}
425
1479df35
MT
426 # Perform custom initialization
427 self.init(*args, **kwargs)
428
429 def init(self, *args, **kwargs):
430 pass
431
432 @property
433 def url(self):
434 raise NotImplementedError
435
436 async def connect(self):
437 """
438 This will create a connection
439 """
440 conn = await self.hub._socket(self.url,
441 on_message_callback=self.on_message_callback)
442
443 # Disable Nagle's algorithm
444 if self.nodelay:
445 conn.set_nodelay(True)
446
447 return conn
448
449 async def reconnect(self):
450 """
451 Tries to reconnect for forever
452 """
453 attempts = 0
454
455 while True:
456 attempts += 1
457
458 log.debug("Trying to reconnect (attempt %s)..." % attempts)
459
460 try:
461 self.conn = await self.connect()
462
463 # The web service responded with some error
464 except tornado.httpclient.HTTPClientError as e:
465 log.error("%s: Received HTTP Error %s" % (self.url, e.code))
466
2eb22ec1
MT
467 # 502 - Proxy Error
468 # 503 - Service Unavailable
469 if e.code in (502, 503):
1479df35
MT
470 await asyncio.sleep(10)
471
472 # Raise any unhandled errors
473 else:
474 raise e
475
476 # The web service did not respond in time
477 except tornado.simple_httpclient.HTTPTimeoutError as e:
478 await asyncio.sleep(30)
479
480 # Raise all other exceptions
481 except Exception as e:
482 raise e
483
484 # If the connection was established successfully, we return
485 else:
486 return
487
9216b623
MT
488 def close(self):
489 """
490 Closes the connection
491 """
492 if self.conn:
493 self.conn.close()
494
a04ccbf5
MT
495 def on_message_callback(self, message):
496 # Fail if no callbacks have been set
497 if not self.callbacks:
498 raise NotImplementedError
499
500 # Decode the message
501 message = self.hub._decode_json_message(message)
502
503 # Ignore empty messages
504 if message is None:
505 return
506
507 # Log the received message
508 log.debug("Received message:\n%s" % json.dumps(message, indent=4))
509
510 # Fetch the message type & data
511 type = message.get("type")
512 data = message.get("data")
513
514 # Find a suitable callback
515 try:
516 callback = self.callbacks[type]
517
518 # Log an error for unknown messages and ignore them
519 except KeyError:
520 log.error("Received message of unknown type '%s'" % type)
521 return
522
523 # Call the callback
524 callback(data)
9216b623 525
1479df35
MT
526 async def write_message(self, message, **kwargs):
527 """
528 Sends a message but encodes it into JSON first
529 """
530 # This should never happen
531 if not self.conn:
532 raise RuntimeError("Not connected")
533
534 if isinstance(message, dict):
535 message = tornado.escape.json_encode(message)
536
537 try:
538 return await self.conn.write_message(message, **kwargs)
da170b36 539
1479df35
MT
540 except tornado.websocket.WebSocketClosedError as e:
541 # Try to reconnect
542 await self.reconnect()
da170b36 543
1479df35
MT
544 # Try to send the message again
545 return await self.write_message(message, **kwargs)
546
547
9216b623
MT
548class ControlConnection(HubObject):
549 url = "/api/v1/builders/control"
1479df35 550
9216b623
MT
551 def init(self, daemon):
552 self.daemon = daemon
1479df35 553
a04ccbf5
MT
554 # Callbacks
555 self.callbacks = {
556 "job" : self.daemon.job_received,
557 }
558
1479df35
MT
559 # Fetch processor information
560 self.cpu = cpuinfo.get_cpu_info()
561
562 # Fetch the native architecture
563 self.native_arch = _pakfire.native_arch()
564
9216b623
MT
565 async def submit_stats(self):
566 """
567 Sends stats about this builder
568 """
569 log.debug("Sending stats...")
da170b36
MT
570
571 # Fetch processor information
1479df35 572 cpu_times = psutil.cpu_times_percent()
da170b36
MT
573
574 # Fetch memory/swap information
575 mem = psutil.virtual_memory()
576 swap = psutil.swap_memory()
577
578 # Fetch load average
579 loadavg = psutil.getloadavg()
580
1479df35 581 await self.write_message({
28041406
MT
582 "type" : "stats",
583 "data" : {
584 # CPU info
585 "cpu_model" : self.cpu.get("brand_raw"),
586 "cpu_count" : self.cpu.get("count"),
587 "cpu_arch" : self.native_arch,
588
589 # Pakfire + OS
590 "pakfire_version" : PAKFIRE_VERSION,
591 "os_name" : util.get_distro_name(),
592
593 # CPU Times
594 "cpu_user" : cpu_times.user,
595 "cpu_nice" : cpu_times.nice,
596 "cpu_system" : cpu_times.system,
597 "cpu_idle" : cpu_times.idle,
598 "cpu_iowait" : cpu_times.iowait,
599 "cpu_irq" : cpu_times.irq,
600 "cpu_softirq" : cpu_times.softirq,
601 "cpu_steal" : cpu_times.steal,
602 "cpu_guest" : cpu_times.guest,
603 "cpu_guest_nice" : cpu_times.guest_nice,
604
605 # Load average
606 "loadavg1" : loadavg[0],
607 "loadavg5" : loadavg[1],
608 "loadavg15" : loadavg[2],
609
610 # Memory
611 "mem_total" : mem.total,
612 "mem_available" : mem.available,
613 "mem_used" : mem.used,
614 "mem_free" : mem.free,
615 "mem_active" : mem.active,
616 "mem_inactive" : mem.inactive,
617 "mem_buffers" : mem.buffers,
618 "mem_cached" : mem.cached,
619 "mem_shared" : mem.shared,
620
621 # Swap
622 "swap_total" : swap.total,
623 "swap_used" : swap.used,
624 "swap_free" : swap.free,
625 },
1479df35 626 })
76960e90 627
2465b05c 628
28041406 629class JobControlConnection(HubObject):
1479df35
MT
630 """
631 Proxy for Build Jobs
632 """
a04ccbf5 633 def init(self, id, worker):
1479df35 634 self.id = id
76960e90 635
a04ccbf5 636 # Callbacks
4f872b98 637 self.callbacks = {
a04ccbf5 638 "abort" : worker.abort,
4f872b98
MT
639 }
640
1479df35
MT
641 @property
642 def url(self):
643 return "/api/v1/jobs/%s" % self.id
c8d43da3 644
1479df35 645 async def finished(self, success, packages=None, logfile=None):
2dc6117b
MT
646 """
647 Will tell the hub that a job has finished
648 """
649 # Upload the log file
1479df35
MT
650 if logfile:
651 logfile = await self.hub.upload(logfile, filename="%s.log" % self.id)
2dc6117b 652
688e6846
MT
653 # Upload the packages
654 if packages:
1479df35 655 packages = await self.hub.upload_multi(*packages)
688e6846 656
2dc6117b 657 # Send the request
1479df35 658 await self.write_message({
c21e39c1
MT
659 "type" : "finished",
660 "data" : {
661 "success" : success,
662 "logfile" : logfile,
663 "packages" : packages,
664 },
1479df35 665 })
2dc6117b
MT
666
667 # Handle the response
668 # XXX TODO
669
1479df35 670 async def log(self, timestamp, level, message):
c8d43da3
MT
671 """
672 Sends a log message to the hub
673 """
1479df35 674 await self.write_message({
c21e39c1
MT
675 "type" : "log",
676 "data" : {
677 "timestamp" : timestamp,
678 "level" : level,
fae7d9b8 679 "message" : message,
c21e39c1 680 },
c8d43da3 681 })