]> git.ipfire.org Git - people/ms/pakfire.git/blob - src/pakfire/buildservice.py
buildservice: Add function to signal finished build jobs
[people/ms/pakfire.git] / src / pakfire / buildservice.py
1 ###############################################################################
2 # #
3 # Pakfire - The IPFire package management system #
4 # Copyright (C) 2023 Pakfire development team #
5 # #
6 # This program is free software: you can redistribute it and/or modify #
7 # it under the terms of the GNU General Public License as published by #
8 # the Free Software Foundation, either version 3 of the License, or #
9 # (at your option) any later version. #
10 # #
11 # This program is distributed in the hope that it will be useful, #
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
14 # GNU General Public License for more details. #
15 # #
16 # You should have received a copy of the GNU General Public License #
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
18 # #
19 ###############################################################################
20
21 import asyncio
22 import json
23 import kerberos
24 import logging
25 import os
26
27 import subprocess
28 import tempfile
29 import tornado.httpclient
30 import tornado.simple_httpclient
31 import tornado.websocket
32 import urllib.parse
33
34 from .__version__ import PAKFIRE_VERSION
35 from . import _pakfire
36 from . import util
37
# Every request made through tornado's AsyncHTTPClient identifies itself
# with a pakfire user agent that carries the version
tornado.httpclient.AsyncHTTPClient.configure(
    None,
    defaults={"user_agent": "pakfire/%s" % PAKFIRE_VERSION},
)

# Logger shared by everything in this module
log = logging.getLogger("pakfire.buildservice")
47
class AuthError(Exception):
    """
    Signals that authenticating against the build service has failed
    """
53
54
class TransportError(Exception):
    """
    Base class for errors on the transport level
    (TemporaryConnectionError is derived from this)
    """
57
58
class TemporaryConnectionError(TransportError):
    """
    Signals a connection problem which is only temporary,
    so that the request should be sent again.
    """
65
66
class BuildService(_pakfire.BuildService):
    """
    This wraps the parts of the build service
    that have been implemented in libpakfire.

    On top of that, it adds asynchronous HTTP and WebSocket requests
    (using tornado) which authenticate using Kerberos (Negotiate).
    """
    def __init__(self):
        super().__init__()

        # Initialise the HTTP client
        self.client = tornado.httpclient.AsyncHTTPClient()

    async def _socket(self, path, **kwargs):
        """
        Opens a WebSocket connection to path and returns it.

        All keyword arguments are passed on to _request().
        """
        return await self._request("GET", path,
            # Enable websocket and ping once every ten seconds
            websocket=True,
            websocket_ping_interval=10,
            websocket_ping_timeout=60,

            **kwargs,
        )

    async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
            websocket_ping_timeout=None, authenticate=True,
            body=None, body_producer=None, on_message_callback=None, **kwargs):
        """
        Sends a request to the build service.

        path is joined with self.url (not set in this class; presumably
        provided by the libpakfire base class). Any extra keyword arguments
        that are not None are sent as query arguments for GET, PUT and
        DELETE, and as the body for POST unless a body has been passed.

        If websocket is set, the WebSocket connection is returned. Otherwise
        the decoded JSON response is returned, or an empty dict if the
        response had no body.

        Raises TemporaryConnectionError if the server responded with
        502 or 503, and AuthError if Kerberos authentication failed.
        Any other HTTP errors are passed on to the caller.
        """
        headers = {}

        # Make absolute URL
        url = urllib.parse.urljoin(self.url, path)

        # Change scheme for websocket, but only touch the scheme itself
        # and never anything that follows in the URL
        if websocket:
            for scheme, ws_scheme in (("https://", "wss://"), ("http://", "ws://")):
                if url.startswith(scheme):
                    url = ws_scheme + url.removeprefix(scheme)
                    break

        # Filter out anything that is None and encode the query arguments
        query_args = urllib.parse.urlencode(
            { arg : value for arg, value in kwargs.items() if value is not None },
            doseq=True,
        )

        # Add query arguments
        if method in ("GET", "PUT", "DELETE"):
            # Don't leave a dangling "?" if there is nothing to send and
            # don't add a second one if the URL has a query string already
            if query_args:
                url = "%s%s%s" % (url, "&" if "?" in url else "?", query_args)

        # Add any arguments to the body
        elif method == "POST":
            if body is None:
                body = query_args

        # Perform Kerberos authentication
        if authenticate:
            krb5_context = self._setup_krb5_context(url)

            # Fetch the Kerberos client response
            krb5_client_response = kerberos.authGSSClientResponse(krb5_context)

            # Set the Negotiate header
            headers["Authorization"] = "Negotiate %s" % krb5_client_response

        # Make the request
        req = tornado.httpclient.HTTPRequest(
            method=method, url=url, headers=headers, body=body,

            # Give the server more time to respond
            request_timeout=60,

            # Add all the rest
            body_producer=body_producer,
        )

        # Is this a web socket request?
        if websocket:
            return await tornado.websocket.websocket_connect(
                req,
                ping_interval=websocket_ping_interval,
                ping_timeout=websocket_ping_timeout,
                on_message_callback=on_message_callback,
            )

        # Send the request and wait for a response
        try:
            res = await self.client.fetch(req)

        # Catch any HTTP errors
        except tornado.httpclient.HTTPError as e:
            if e.code in (502, 503):
                raise TemporaryConnectionError from e

            # Re-raise anything else (keeping the original traceback)
            raise

        # Perform mutual authentication
        if authenticate:
            for header in res.headers.get_list("WWW-Authenticate"):
                # Skip anything that isn't a Negotiate header
                if not header.startswith("Negotiate "):
                    continue

                # Fetch the server response
                krb5_server_response = header.removeprefix("Negotiate ")

                # Validate the server response
                result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
                if result != kerberos.AUTH_GSS_COMPLETE:
                    raise AuthError("Could not verify the Kerberos server response")

                log.debug("Kerberos Server Response validating succeeded")

                # Call this so that we won't end in the else block
                break

            # If there was no Negotiate header
            else:
                raise AuthError("Mutual authentication failed")

        # Decode JSON response
        if res.body:
            return json.loads(res.body)

        # Empty response
        return {}

    async def _proxy(self, cls, *args, **kwargs):
        """
        Creates an instance of cls (passing this service and all arguments),
        establishes its connection and returns it.
        """
        conn = cls(self, *args, **kwargs)

        # Create the initial connection
        await conn.reconnect()

        return conn

    def _setup_krb5_context(self, url):
        """
        Creates the Kerberos context that can be used to perform client
        authentication against the server, and mutual authentication for the server.

        Raises AuthError if the context could not be set up.
        """
        # Parse the input URL
        url = urllib.parse.urlparse(url)

        # Create a new client context
        result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)

        if result != kerberos.AUTH_GSS_COMPLETE:
            raise AuthError("Could not create Kerberos Client context")

        # Next step...
        try:
            result = kerberos.authGSSClientStep(krb5_context, "")

        except kerberos.GSSError as e:
            log.error("Kerberos authentication failed: %s" % e)

            raise AuthError("%s" % e) from e

        if result != kerberos.AUTH_GSS_CONTINUE:
            raise AuthError("Could not continue Kerberos authentication")

        return krb5_context

    # Builder

    async def control(self, *args, **kwargs):
        """
        Creates a control connection
        """
        return await self._proxy(ControlConnection, *args, **kwargs)

    async def job(self, *args, **kwargs):
        """
        Creates a control connection for a certain job
        """
        return await self._proxy(JobControlConnection, *args, **kwargs)
247
248
class Connection(object):
    """
    Base class for WebSocket connections to the build service.

    Subclasses have to provide the URL to connect to (url) and register
    their handlers for incoming messages in self.callbacks, which maps
    the type of a message to a callable that takes the data of the message.
    """
    def __init__(self, service, *args, **kwargs):
        self.service = service

        # The active connection
        self.conn = None

        # Callbacks
        self.callbacks = {}

        # Perform custom initialization
        self.init(*args, **kwargs)

    def init(self, *args, **kwargs):
        """
        Hook for subclasses which is called with all extra
        arguments of the constructor. Does nothing by default.
        """
        pass

    @property
    def url(self):
        """
        The URL to connect to. Must be provided by subclasses.
        """
        raise NotImplementedError

    async def connect(self):
        """
        This will create a connection
        """
        return await self.service._socket(self.url,
            on_message_callback=self.on_message_callback)

    async def reconnect(self):
        """
        Tries to reconnect for forever

        Timeouts and HTTP errors 502 and 503 are retried after a pause.
        Any other exception is passed on to the caller.
        """
        attempts = 0

        while True:
            attempts += 1

            log.debug("Trying to reconnect (attempt %s)..." % attempts)

            try:
                self.conn = await self.connect()

            # The web service did not respond in time
            #
            # This handler must come first, because HTTPTimeoutError is
            # a subclass of HTTPClientError (with code 599) and would
            # otherwise be caught below and raised.
            except tornado.simple_httpclient.HTTPTimeoutError:
                await asyncio.sleep(30)

            # The web service responded with some error
            except tornado.httpclient.HTTPClientError as e:
                log.error("%s: Received HTTP Error %s" % (self.url, e.code))

                # Raise any unhandled errors
                if e.code not in (502, 503):
                    raise

                # 502 - Proxy Error
                # 503 - Service Unavailable
                await asyncio.sleep(10)

            # If the connection was established successfully, we return
            else:
                return

    def close(self):
        """
        Closes the connection
        """
        if self.conn:
            self.conn.close()

    def on_message_callback(self, message):
        """
        Decodes a received message and passes its data to the callback
        that has been registered for the type of the message.

        Empty messages, messages that cannot be decoded and messages of
        an unknown type are ignored. Raises NotImplementedError if no
        callbacks have been set.
        """
        # Fail if no callbacks have been set
        if not self.callbacks:
            raise NotImplementedError

        # Decode the message
        message = self._decode_json_message(message)

        # Ignore empty messages
        if message is None:
            return

        # Log the received message
        log.debug("Received message:\n%s" % json.dumps(message, indent=4))

        # Only JSON objects can carry a type and data
        if not isinstance(message, dict):
            log.error("Received message which is not a JSON object")
            return

        # Fetch the message type & data
        message_type = message.get("type")
        data = message.get("data")

        # Find a suitable callback
        try:
            callback = self.callbacks[message_type]

        # Log an error for unknown messages and ignore them
        except KeyError:
            log.error("Received message of unknown type '%s'" % message_type)
            return

        # Call the callback
        callback(data)

    @staticmethod
    def _decode_json_message(message):
        """
        Takes a received message and decodes it

        Returns None if the message is empty or could not be decoded.
        """
        # Ignore empty messages
        if message is None:
            return

        try:
            message = json.loads(message)

        # json.JSONDecodeError is a ValueError, and so is the
        # UnicodeDecodeError that is raised for undecodable binary messages
        except ValueError:
            log.error("Could not decode JSON message:\n%s" % message)
            return

        return message

    async def write_message(self, message, **kwargs):
        """
        Sends a message but encodes it into JSON first

        If the connection has been closed, this will reconnect
        and try to send the message again.
        """
        # This should never happen
        if not self.conn:
            raise RuntimeError("Not connected")

        if isinstance(message, dict):
            message = tornado.escape.json_encode(message)

        # Loop instead of recursing, so that many retries
        # cannot exhaust the recursion limit
        while True:
            try:
                return await self.conn.write_message(message, **kwargs)

            except tornado.websocket.WebSocketClosedError:
                # Try to reconnect and then send the message again
                await self.reconnect()
390
391
class ControlConnection(Connection):
    """
    The control connection of a builder
    """
    url = "/api/v1/builders/control"

    def init(self, daemon):
        self.daemon = daemon

        # Messages of type "job" are handed over to the daemon
        self.callbacks = { "job" : daemon.job_received }
402
403
class JobControlConnection(Connection):
    """
    Control connection which belongs to one single build job
    """
    def init(self, id, worker):
        # Messages of type "abort" are handed over to the worker
        self.callbacks = {
            "abort" : worker.abort,
        }

        # Remember which job this connection is for
        self.id = id

    @property
    def url(self):
        """
        The URL of the job, which contains its ID
        """
        return "/api/v1/jobs/%s" % self.id

    async def finished(self, success, packages=None, logfile=None):
        """
        Uploads the log file and all packages (if any were passed)
        and then reports the job as finished to the service.
        """
        # The log file is replaced by whatever the upload returned
        if logfile:
            logfile = await self.service.upload(logfile, filename="%s.log" % self.id)

        # NOTE(review): What upload() returns for the packages is discarded
        # and the packages are passed on as they came in - confirm that
        # this is what job_finished() expects.
        for package in packages or ():
            await self.service.upload(package)

        # Send request
        self.service.job_finished(self.id, success=success, logfile=logfile, packages=packages)

    async def log(self, timestamp, level, message):
        """
        Sends one log message (timestamp, level and text) as a message of type "log"
        """
        payload = {
            "timestamp" : timestamp,
            "level"     : level,
            "message"   : message,
        }

        await self.write_message({ "type" : "log", "data" : payload })