]> git.ipfire.org Git - people/ms/pakfire.git/blob - src/pakfire/buildservice.py
buildservice: Move submitting stats into C
[people/ms/pakfire.git] / src / pakfire / buildservice.py
1 ###############################################################################
2 # #
3 # Pakfire - The IPFire package management system #
4 # Copyright (C) 2023 Pakfire development team #
5 # #
6 # This program is free software: you can redistribute it and/or modify #
7 # it under the terms of the GNU General Public License as published by #
8 # the Free Software Foundation, either version 3 of the License, or #
9 # (at your option) any later version. #
10 # #
11 # This program is distributed in the hope that it will be useful, #
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
14 # GNU General Public License for more details. #
15 # #
16 # You should have received a copy of the GNU General Public License #
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
18 # #
19 ###############################################################################
20
21 import asyncio
22 import json
23 import kerberos
24 import logging
25 import os
26 import subprocess
27 import tempfile
28 import tornado.httpclient
29 import tornado.simple_httpclient
30 import tornado.websocket
31 import urllib.parse
32
33 from .__version__ import PAKFIRE_VERSION
34 from . import _pakfire
35 from . import util
36
# Apply defaults to every request that is sent through Tornado's HTTP client.
# Passing None as implementation keeps Tornado's default client class.
tornado.httpclient.AsyncHTTPClient.configure(
    None,
    defaults=dict(user_agent="pakfire/%s" % PAKFIRE_VERSION),
)

# Module-wide logger
log = logging.getLogger("pakfire.buildservice")
46
class AuthError(Exception):
    """
    Signals that authenticating the client against
    the build service was not successful.
    """
52
53
class TransportError(Exception):
    """
    Base class for transport-related errors.

    TemporaryConnectionError derives from this class.
    """
56
57
class TemporaryConnectionError(TransportError):
    """
    Signals a connection problem that is only temporary,
    i.e. the caller should send the same request again.
    """
64
65
class BuildService(_pakfire.BuildService):
    """
    This wraps the parts of the build service
    that has been implemented in libpakfire.

    On top of that, this class sends HTTP and WebSocket requests to the
    build service using Tornado and authenticates them using Kerberos.
    """
    def __init__(self):
        super().__init__()

        # Initialise the HTTP client
        self.client = tornado.httpclient.AsyncHTTPClient()

    async def _socket(self, path, **kwargs):
        """
        Opens a WebSocket connection to path.

        Any keyword arguments are passed on to _request().
        """
        return await self._request(
            "GET",
            path,

            # Enable websocket and ping once every ten seconds
            websocket=True,
            websocket_ping_interval=10,
            websocket_ping_timeout=60,

            **kwargs,
        )

    async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
            websocket_ping_timeout=None, authenticate=True,
            body=None, body_producer=None, on_message_callback=None, **kwargs):
        """
        Sends a request to the build service.

        Arguments:
            method: The HTTP method
            path: The path which is joined onto self.url
            websocket: If true, a WebSocket connection is opened instead of
                sending a regular HTTP request
            websocket_ping_interval, websocket_ping_timeout, on_message_callback:
                Passed to tornado.websocket.websocket_connect()
            authenticate: If true, a Kerberos "Negotiate" header is sent and
                the response of the server is validated (mutual authentication)
            body, body_producer: Passed to tornado.httpclient.HTTPRequest
            **kwargs: All arguments that are not None are URL-encoded. They are
                appended as query string for GET, PUT and DELETE, and become
                the body of POST requests if no body has been passed.

        Returns:
            The WebSocket connection if websocket is true. Otherwise the
            decoded JSON body, or an empty dictionary if the body was empty.

        Raises:
            TemporaryConnectionError: The server responded with 502 or 503
            AuthError: Kerberos authentication failed
            tornado.httpclient.HTTPError: For any other HTTP errors
        """
        headers = {}

        # Make absolute URL
        # NOTE(review): self.url is not set in this class - presumably it is
        # provided by _pakfire.BuildService
        url = urllib.parse.urljoin(self.url, path)

        # Change scheme for websocket (Tornado only accepts ws:// and wss://)
        if websocket:
            if url.startswith("https://"):
                url = "wss://" + url.removeprefix("https://")
            elif url.startswith("http://"):
                url = "ws://" + url.removeprefix("http://")

        # Filter all query arguments that are None and encode the rest
        query_args = urllib.parse.urlencode(
            { arg : value for arg, value in kwargs.items() if value is not None },
            doseq=True,
        )

        # Add query arguments (but avoid a dangling "?" if there are none)
        if method in ("GET", "PUT", "DELETE"):
            if query_args:
                url = "%s?%s" % (url, query_args)

        # Add any arguments to the body
        elif method == "POST":
            if body is None:
                body = query_args

        # Perform Kerberos authentication
        if authenticate:
            krb5_context = self._setup_krb5_context(url)

            # Fetch the Kerberos client response
            krb5_client_response = kerberos.authGSSClientResponse(krb5_context)

            # Set the Negotiate header
            headers |= {
                "Authorization" : "Negotiate %s" % krb5_client_response,
            }

        # Make the request
        req = tornado.httpclient.HTTPRequest(
            method=method, url=url, headers=headers, body=body,

            # Give the server more time to respond
            request_timeout=60,

            # Add all the rest
            body_producer=body_producer,
        )

        # Is this a web socket request?
        # We return here, so the mutual authentication below is not
        # performed for WebSocket connections.
        if websocket:
            return await tornado.websocket.websocket_connect(
                req,
                ping_interval=websocket_ping_interval,
                ping_timeout=websocket_ping_timeout,
                on_message_callback=on_message_callback,
            )

        # Send the request and wait for a response
        try:
            res = await self.client.fetch(req)

        # Catch any HTTP errors
        except tornado.httpclient.HTTPError as e:
            # 502 and 503 are considered temporary
            if e.code in (502, 503):
                raise TemporaryConnectionError from e

            # Re-raise anything else
            raise

        # Perform mutual authentication
        if authenticate:
            for header in res.headers.get_list("WWW-Authenticate"):
                # Skip anything that isn't a Negotiate header
                if not header.startswith("Negotiate "):
                    continue

                # Fetch the server response
                krb5_server_response = header.removeprefix("Negotiate ")

                # Validate the server response
                result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
                if result != kerberos.AUTH_GSS_COMPLETE:
                    raise AuthError("Could not verify the Kerberos server response")

                log.debug("Kerberos Server Response validating succeeded")

                # Call this so that we won't end in the else block
                break

            # If there were no (Negotiate) headers
            else:
                raise AuthError("Mutual authentication failed")

        # Decode JSON response
        if res.body:
            return json.loads(res.body)

        # Empty response
        return {}

    async def _proxy(self, cls, *args, **kwargs):
        """
        Creates an instance of cls (which receives this service followed by
        all other arguments), connects it and returns it.
        """
        conn = cls(self, *args, **kwargs)

        # Create the initial connection
        await conn.reconnect()

        return conn

    def _setup_krb5_context(self, url):
        """
        Creates the Kerberos context that can be used to perform client
        authentication against the server, and mutual authentication for the server.

        The service name is "HTTP@<hostname of url>".

        Raises AuthError if the context could not be set up.
        """
        # Parse the input URL
        url = urllib.parse.urlparse(url)

        # Create a new client context
        result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)

        if result != kerberos.AUTH_GSS_COMPLETE:
            raise AuthError("Could not create Kerberos Client context")

        # Next step...
        try:
            result = kerberos.authGSSClientStep(krb5_context, "")

        except kerberos.GSSError as e:
            log.error("Kerberos authentication failed: %s", e)

            raise AuthError("%s" % e) from e

        if result != kerberos.AUTH_GSS_CONTINUE:
            raise AuthError("Could not continue Kerberos authentication")

        return krb5_context

    # Builder

    async def control(self, *args, **kwargs):
        """
        Creates a control connection
        """
        return await self._proxy(ControlConnection, *args, **kwargs)

    async def job(self, *args, **kwargs):
        """
        Creates a control connection for a certain job
        """
        return await self._proxy(JobControlConnection, *args, **kwargs)
246
247
class Connection(object):
    """
    Base class for WebSocket connections to the build service.

    Subclasses provide the "url" to connect to and implement init()
    where they register their callbacks by message type.
    """
    def __init__(self, service, *args, **kwargs):
        self.service = service

        # The active connection
        self.conn = None

        # Callbacks (message type -> callable which receives the message data)
        self.callbacks = {}

        # Perform custom initialization
        self.init(*args, **kwargs)

    def init(self, *args, **kwargs):
        """
        Hook for subclasses which is called at the end of the constructor
        with all arguments but the service.
        """
        pass

    @property
    def url(self):
        """
        The path this connection connects to. Must be provided by subclasses.
        """
        raise NotImplementedError

    async def connect(self):
        """
        This will create a connection
        """
        return await self.service._socket(self.url,
            on_message_callback=self.on_message_callback)

    async def reconnect(self):
        """
        Tries to reconnect for forever

        After a timeout, or if the server responded with 502 or 503, we will
        wait for a moment and try again. Any other exceptions are raised.
        """
        attempts = 0

        while True:
            attempts += 1

            log.debug("Trying to reconnect (attempt %s)...", attempts)

            try:
                self.conn = await self.connect()

            # The web service did not respond in time
            #
            # HTTPTimeoutError is a subclass of HTTPClientError (with code 599)
            # and therefore must be caught before the generic handler below.
            except tornado.simple_httpclient.HTTPTimeoutError:
                await asyncio.sleep(30)

            # The web service responded with some error
            except tornado.httpclient.HTTPClientError as e:
                log.error("%s: Received HTTP Error %s", self.url, e.code)

                # Raise any unhandled errors
                if e.code not in (502, 503):
                    raise

                # 502 - Proxy Error
                # 503 - Service Unavailable
                await asyncio.sleep(10)

            # If the connection was established successfully, we return
            else:
                return

    def close(self):
        """
        Closes the connection
        """
        if self.conn:
            self.conn.close()

    def on_message_callback(self, message):
        """
        Called for every received message. Decodes the message and
        calls the callback that has been registered for its type.

        Raises NotImplementedError if no callbacks have been set.
        """
        # Fail if no callbacks have been set
        if not self.callbacks:
            raise NotImplementedError

        # Decode the message
        message = self._decode_json_message(message)

        # Ignore empty messages
        if message is None:
            return

        # Log the received message
        log.debug("Received message:\n%s", json.dumps(message, indent=4))

        # Only JSON objects can carry a type and data
        if not isinstance(message, dict):
            log.error("Received message which is not a JSON object")
            return

        # Fetch the message type & data
        message_type = message.get("type")
        data = message.get("data")

        # Find a suitable callback
        try:
            callback = self.callbacks[message_type]

        # Log an error for unknown messages and ignore them
        except KeyError:
            log.error("Received message of unknown type '%s'", message_type)
            return

        # Call the callback
        callback(data)

    @staticmethod
    def _decode_json_message(message):
        """
        Takes a received message and decodes it

        Returns None if the message is None or could not be decoded.
        """
        # Ignore empty messages
        if message is None:
            return

        try:
            return json.loads(message)
        except json.JSONDecodeError:
            log.error("Could not decode JSON message:\n%s", message)

    async def write_message(self, message, **kwargs):
        """
        Sends a message but encodes it into JSON first

        If the connection has been closed, we will reconnect
        and try sending the message again.
        """
        # This should never happen
        if not self.conn:
            raise RuntimeError("Not connected")

        if isinstance(message, dict):
            message = tornado.escape.json_encode(message)

        # Loop until the message could be sent (avoids unbounded recursion)
        while True:
            try:
                return await self.conn.write_message(message, **kwargs)

            # Try to reconnect and then send the message again
            except tornado.websocket.WebSocketClosedError:
                await self.reconnect()
389
390
class ControlConnection(Connection):
    """
    The control connection of a builder.

    Messages of type "job" are handed to the daemon's job_received().
    """
    url = "/api/v1/builders/control"

    def init(self, daemon):
        # Keep a reference to the daemon
        self.daemon = daemon

        # Register all callbacks
        self.callbacks = dict(job=self.daemon.job_received)
401
402
class JobControlConnection(Connection):
    """
    Proxy for Build Jobs

    Messages of type "abort" are handed to the worker's abort().
    """
    def init(self, id, worker):
        # The ID of the job
        self.id = id

        # Register all callbacks
        self.callbacks = dict(abort=worker.abort)

    @property
    def url(self):
        """
        The path of this job
        """
        return "/api/v1/jobs/%s" % self.id

    async def finished(self, success, packages=None, logfile=None):
        """
        Will tell the hub that a job has finished

        The log file and all packages are uploaded first. Sending the
        final request is repeated for as long as there are temporary
        connection errors.
        """
        # Upload the log file and continue with whatever upload() returned
        if logfile:
            logfile = await self.service.upload(logfile, filename="%s.log" % self.id)

        # Upload the packages
        # NOTE(review): The results of these uploads are discarded and the
        # original "packages" value is sent below - confirm that this is intended
        for package in packages or ():
            await self.service.upload(package)

        path = "/api/v1/jobs/%s/finished" % self.id
        status = "1" if success else "0"

        sent = False

        while not sent:
            try:
                # Send the request
                await self.service._request(
                    "POST", path, success=status, logfile=logfile, packages=packages,
                )

            # Try again after a short moment on connection errors
            except TemporaryConnectionError:
                await asyncio.sleep(5)

            else:
                sent = True

        # Handle the response
        # XXX TODO

    async def log(self, timestamp, level, message):
        """
        Sends a log message to the hub
        """
        data = {
            "timestamp" : timestamp,
            "level" : level,
            "message" : message,
        }

        await self.write_message({ "type" : "log", "data" : data })