]>
git.ipfire.org Git - people/ms/pakfire.git/blob - src/pakfire/buildservice.py
1 ###############################################################################
3 # Pakfire - The IPFire package management system #
4 # Copyright (C) 2023 Pakfire development team #
6 # This program is free software: you can redistribute it and/or modify #
7 # it under the terms of the GNU General Public License as published by #
8 # the Free Software Foundation, either version 3 of the License, or #
9 # (at your option) any later version. #
11 # This program is distributed in the hope that it will be useful, #
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
14 # GNU General Public License for more details. #
16 # You should have received a copy of the GNU General Public License #
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
19 ###############################################################################
28 import tornado
.httpclient
29 import tornado
.simple_httpclient
30 import tornado
.websocket
33 from .__version
__ import PAKFIRE_VERSION
34 from . import _pakfire
37 # Configure some useful defaults for all requests
38 tornado
.httpclient
.AsyncHTTPClient
.configure(
40 "user_agent" : "pakfire/%s" % PAKFIRE_VERSION
,
# Module-level logger shared by everything in this file
log = logging.getLogger("pakfire.buildservice")
class AuthError(Exception):
    """
        Raised when the client could not authenticate against the build service
    """
    pass
class TransportError(Exception):
    """
        Base class for transport errors.

        TemporaryConnectionError in this module derives from it.
    """
    pass
class TemporaryConnectionError(TransportError):
    """
        Raised when there is a temporary connection issue and
        the request should be tried again.
    """
    pass
66 class BuildService(_pakfire
.BuildService
):
68 This wraps the parts of the build service
69 that has been implemented in libpakfire.
74 # Initialise the HTTP client
75 self
.client
= tornado
.httpclient
.AsyncHTTPClient()
77 async def _socket(self
, path
, **kwargs
):
78 return await self
._request
("GET", path
,
80 # Enable websocket and ping once every ten seconds
82 websocket_ping_interval
=10,
83 websocket_ping_timeout
=60,
88 async def _request(self
, method
, path
, websocket
=False, websocket_ping_interval
=None,
89 websocket_ping_timeout
=None, authenticate
=True,
90 body
=None, body_producer
=None, on_message_callback
=None, **kwargs
):
95 url
= urllib
.parse
.urljoin(self
.url
, path
)
97 # Change scheme for websocket
98 if websocket
and url
.startswith("https://"):
99 url
= url
.replace("https://", "wss://")
101 # Filter all query arguments
103 # Skip anything that is None
104 if kwargs
[arg
] is None:
107 # Add to query arguments
108 query_args
[arg
] = kwargs
[arg
]
110 # Encode query arguments
111 query_args
= urllib
.parse
.urlencode(query_args
, doseq
=True)
113 # Add query arguments
114 if method
in ("GET", "PUT", "DELETE"):
115 url
= "%s?%s" % (url
, query_args
)
117 # Add any arguments to the body
118 elif method
== "POST":
122 # Perform Kerberos authentication
124 krb5_context
= self
._setup
_krb
5_context
(url
)
126 # Fetch the Kerberos client response
127 krb5_client_response
= kerberos
.authGSSClientResponse(krb5_context
)
129 # Set the Negotiate header
131 "Authorization" : "Negotiate %s" % krb5_client_response
,
135 req
= tornado
.httpclient
.HTTPRequest(
136 method
=method
, url
=url
, headers
=headers
, body
=body
,
138 # Give the server more time to respond
142 body_producer
=body_producer
,
145 # Is this a web socket request?
147 return await tornado
.websocket
.websocket_connect(
149 ping_interval
=websocket_ping_interval
,
150 ping_timeout
=websocket_ping_timeout
,
151 on_message_callback
=on_message_callback
,
154 # Send the request and wait for a response
156 res
= await self
.client
.fetch(req
)
158 # Catch any HTTP errors
159 except tornado
.httpclient
.HTTPError
as e
:
160 if e
.code
in (502, 503):
161 raise TemporaryConnectionError
from e
163 # Re-raise anything else
166 # Perform mutual authentication
168 for header
in res
.headers
.get_list("WWW-Authenticate"):
169 # Skip anything that isn't a Negotiate header
170 if not header
.startswith("Negotiate "):
173 # Fetch the server response
174 krb5_server_response
= header
.removeprefix("Negotiate ")
176 # Validate the server response
177 result
= kerberos
.authGSSClientStep(krb5_context
, krb5_server_response
)
178 if not result
== kerberos
.AUTH_GSS_COMPLETE
:
179 raise AuthError("Could not verify the Kerberos server response")
181 log
.debug("Kerberos Server Response validating succeeded")
183 # Call this so that we won't end in the else block
186 # If there were no headers
188 raise AuthError("Mutual authentication failed")
190 # Decode JSON response
192 return json
.loads(res
.body
)
197 async def _proxy(self
, cls
, *args
, **kwargs
):
198 conn
= cls(self
, *args
, **kwargs
)
200 # Create the initial connection
201 await conn
.reconnect()
def _setup_krb5_context(self, url):
    """
        Creates the Kerberos context that can be used to perform client
        authentication against the server, and mutual authentication for the server.

        The service principal is built as "HTTP@<hostname>" from the
        hostname part of the given URL.

        Raises AuthError if the context could not be created, if the first
        GSS step raised kerberos.GSSError, or if that step did not report
        AUTH_GSS_CONTINUE.
    """
    # Parse the input URL
    url = urllib.parse.urlparse(url)

    # Create a new client context
    result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)

    if not result == kerberos.AUTH_GSS_COMPLETE:
        raise AuthError("Could not create Kerberos Client context")

    # Perform the first client step with an empty challenge
    try:
        result = kerberos.authGSSClientStep(krb5_context, "")

    except kerberos.GSSError as e:
        log.error("Kerberos authentication failed: %s" % e)

        raise AuthError("%s" % e) from e

    if not result == kerberos.AUTH_GSS_CONTINUE:
        raise AuthError("Could not continue Kerberos authentication")

    # NOTE(review): the return statement was restored from context
    # (_request() assigns the result of this call to krb5_context and
    # passes it on to authGSSClientResponse); confirm upstream.
    return krb5_context
async def control(self, *args, **kwargs):
    """
        Creates a control connection

        All arguments are handed to _proxy(), which passes them on
        when it creates the ControlConnection.
    """
    return await self._proxy(ControlConnection, *args, **kwargs)
async def job(self, *args, **kwargs):
    """
        Creates a control connection for a certain job

        All arguments are handed to _proxy(), which passes them on
        when it creates the JobControlConnection.
    """
    return await self._proxy(JobControlConnection, *args, **kwargs)
248 class Connection(object):
249 def __init__(self
, service
, *args
, **kwargs
):
250 self
.service
= service
252 # The active connection
258 # Perform custom initialization
259 self
.init(*args
, **kwargs
)
261 def init(self
, *args
, **kwargs
):
266 raise NotImplementedError
async def connect(self):
    """
        This will create a connection

        The socket is opened through the service's _socket() for self.url,
        with self.on_message_callback registered as the message callback.
        The result of _socket() is returned unchanged.
    """
    return await self.service._socket(self.url,
        on_message_callback=self.on_message_callback)
275 async def reconnect(self
):
277 Tries to reconnect for forever
284 log
.debug("Trying to reconnect (attempt %s)..." % attempts
)
287 self
.conn
= await self
.connect()
289 # The web service responded with some error
290 except tornado
.httpclient
.HTTPClientError
as e
:
291 log
.error("%s: Received HTTP Error %s" % (self
.url
, e
.code
))
294 # 503 - Service Unavailable
295 if e
.code
in (502, 503):
296 await asyncio
.sleep(10)
298 # Raise any unhandled errors
302 # The web service did not respond in time
303 except tornado
.simple_httpclient
.HTTPTimeoutError
as e
:
304 await asyncio
.sleep(30)
306 # Raise all other exceptions
307 except Exception as e
:
310 # If the connection was established successfully, we return
316 Closes the connection
321 def on_message_callback(self
, message
):
322 # Fail if no callbacks have been set
323 if not self
.callbacks
:
324 raise NotImplementedError
327 message
= self
._decode
_json
_message
(message
)
329 # Ignore empty messages
333 # Log the received message
334 log
.debug("Received message:\n%s" % json
.dumps(message
, indent
=4))
336 # Fetch the message type & data
337 type = message
.get("type")
338 data
= message
.get("data")
340 # Find a suitable callback
342 callback
= self
.callbacks
[type]
344 # Log an error for unknown messages and ignore them
346 log
.error("Received message of unknown type '%s'" % type)
353 def _decode_json_message(message
):
355 Takes a received message and decodes it
357 # Ignore empty messages
362 message
= json
.loads(message
)
363 except json
.JSONDecodeError
:
364 log
.error("Could not decode JSON message:\n%s" % message
)
369 async def write_message(self
, message
, **kwargs
):
371 Sends a message but encodes it into JSON first
373 # This should never happen
375 raise RuntimeError("Not connected")
377 if isinstance(message
, dict):
378 message
= tornado
.escape
.json_encode(message
)
381 return await self
.conn
.write_message(message
, **kwargs
)
383 except tornado
.websocket
.WebSocketClosedError
as e
:
385 await self
.reconnect()
387 # Try to send the message again
388 return await self
.write_message(message
, **kwargs
)
391 class ControlConnection(Connection
):
392 url
= "/api/v1/builders/control"
394 def init(self
, daemon
):
399 "job" : self
.daemon
.job_received
,
403 class JobControlConnection(Connection
):
407 def init(self
, id, worker
):
412 "abort" : worker
.abort
,
417 return "/api/v1/jobs/%s" % self
.id
419 async def finished(self
, success
, packages
=None, logfile
=None):
421 Will tell the hub that a job has finished
423 # Upload the log file
425 logfile
= await self
.service
.upload(logfile
, filename
="%s.log" % self
.id)
427 # Upload the packages
429 for package
in packages
:
430 await self
.service
.upload(package
)
435 response
= await self
.service
._request
("POST", "/api/v1/jobs/%s/finished" % self
.id,
436 success
="1" if success
else "0", logfile
=logfile
, packages
=packages
,
439 # Try again after a short moment on connection errors
440 except TemporaryConnectionError
as e
:
441 await asyncio
.sleep(5)
446 # Handle the response
449 async def log(self
, timestamp
, level
, message
):
451 Sends a log message to the hub
453 await self
.write_message({
456 "timestamp" : timestamp
,