]>
git.ipfire.org Git - people/ms/pakfire.git/blob - src/pakfire/buildservice.py
###############################################################################
# Pakfire - The IPFire package management system #
# Copyright (C) 2023 Pakfire development team #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/>. #
###############################################################################
29 import tornado
.httpclient
30 import tornado
.simple_httpclient
31 import tornado
.websocket
34 from .__version
__ import PAKFIRE_VERSION
35 from . import _pakfire
38 # Configure some useful defaults for all requests
39 tornado
.httpclient
.AsyncHTTPClient
.configure(
41 "user_agent" : "pakfire/%s" % PAKFIRE_VERSION
,
# Module-level logger used by everything in this file
log = logging.getLogger("pakfire.buildservice")
class AuthError(Exception):
    """
        Raised when the client could not authenticate against the build service
    """
class TransportError(Exception):
    """
        Base class for transport errors; TemporaryConnectionError derives from it
    """
class TemporaryConnectionError(TransportError):
    """
        Raised when there is a temporary connection issue and
        the request should be tried again.
    """
67 class BuildService(_pakfire
.BuildService
):
69 This wraps the parts of the build service
70 that has been implemented in libpakfire.
75 # Initialise the HTTP client
76 self
.client
= tornado
.httpclient
.AsyncHTTPClient()
78 async def _socket(self
, path
, **kwargs
):
79 return await self
._request
("GET", path
,
81 # Enable websocket and ping once every ten seconds
83 websocket_ping_interval
=10,
84 websocket_ping_timeout
=60,
89 async def _request(self
, method
, path
, websocket
=False, websocket_ping_interval
=None,
90 websocket_ping_timeout
=None, authenticate
=True,
91 body
=None, body_producer
=None, on_message_callback
=None, **kwargs
):
96 url
= urllib
.parse
.urljoin(self
.url
, path
)
98 # Change scheme for websocket
99 if websocket
and url
.startswith("https://"):
100 url
= url
.replace("https://", "wss://")
102 # Filter all query arguments
104 # Skip anything that is None
105 if kwargs
[arg
] is None:
108 # Add to query arguments
109 query_args
[arg
] = kwargs
[arg
]
111 # Encode query arguments
112 query_args
= urllib
.parse
.urlencode(query_args
, doseq
=True)
114 # Add query arguments
115 if method
in ("GET", "PUT", "DELETE"):
116 url
= "%s?%s" % (url
, query_args
)
118 # Add any arguments to the body
119 elif method
== "POST":
123 # Perform Kerberos authentication
125 krb5_context
= self
._setup
_krb
5_context
(url
)
127 # Fetch the Kerberos client response
128 krb5_client_response
= kerberos
.authGSSClientResponse(krb5_context
)
130 # Set the Negotiate header
132 "Authorization" : "Negotiate %s" % krb5_client_response
,
136 req
= tornado
.httpclient
.HTTPRequest(
137 method
=method
, url
=url
, headers
=headers
, body
=body
,
139 # Give the server more time to respond
143 body_producer
=body_producer
,
146 # Is this a web socket request?
148 return await tornado
.websocket
.websocket_connect(
150 ping_interval
=websocket_ping_interval
,
151 ping_timeout
=websocket_ping_timeout
,
152 on_message_callback
=on_message_callback
,
155 # Send the request and wait for a response
157 res
= await self
.client
.fetch(req
)
159 # Catch any HTTP errors
160 except tornado
.httpclient
.HTTPError
as e
:
161 if e
.code
in (502, 503):
162 raise TemporaryConnectionError
from e
164 # Re-raise anything else
167 # Perform mutual authentication
169 for header
in res
.headers
.get_list("WWW-Authenticate"):
170 # Skip anything that isn't a Negotiate header
171 if not header
.startswith("Negotiate "):
174 # Fetch the server response
175 krb5_server_response
= header
.removeprefix("Negotiate ")
177 # Validate the server response
178 result
= kerberos
.authGSSClientStep(krb5_context
, krb5_server_response
)
179 if not result
== kerberos
.AUTH_GSS_COMPLETE
:
180 raise AuthError("Could not verify the Kerberos server response")
182 log
.debug("Kerberos Server Response validating succeeded")
184 # Call this so that we won't end in the else block
187 # If there were no headers
189 raise AuthError("Mutual authentication failed")
191 # Decode JSON response
193 return json
.loads(res
.body
)
198 async def _proxy(self
, cls
, *args
, **kwargs
):
199 conn
= cls(self
, *args
, **kwargs
)
201 # Create the initial connection
202 await conn
.reconnect()
def _setup_krb5_context(self, url):
    """
        Creates the Kerberos context that can be used to perform client
        authentication against the server, and mutual authentication for the server.

        The service principal is derived from the hostname of url
        ("HTTP@<hostname>"). Returns the Kerberos client context after the
        first client step has been performed.

        Raises AuthError if the context could not be created, if the first
        step raised a GSSError, or if the first step did not ask to continue.
    """
    # Parse the input URL
    url = urllib.parse.urlparse(url)

    # Create a new client context
    result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)

    if result != kerberos.AUTH_GSS_COMPLETE:
        raise AuthError("Could not create Kerberos Client context")

    # Perform the first client step (there is no server token yet)
    try:
        result = kerberos.authGSSClientStep(krb5_context, "")

    except kerberos.GSSError as e:
        log.error("Kerberos authentication failed: %s", e)

        raise AuthError("%s" % e) from e

    if result != kerberos.AUTH_GSS_CONTINUE:
        raise AuthError("Could not continue Kerberos authentication")

    return krb5_context
async def control(self, *args, **kwargs):
    """
        Creates a control connection

        All arguments are passed on to ControlConnection.
    """
    return await self._proxy(ControlConnection, *args, **kwargs)
async def job(self, *args, **kwargs):
    """
        Creates a control connection for a certain job

        All arguments are passed on to JobControlConnection.
    """
    return await self._proxy(JobControlConnection, *args, **kwargs)
249 class Connection(object):
250 def __init__(self
, service
, *args
, **kwargs
):
251 self
.service
= service
253 # The active connection
259 # Perform custom initialization
260 self
.init(*args
, **kwargs
)
262 def init(self
, *args
, **kwargs
):
267 raise NotImplementedError
269 async def connect(self
):
271 This will create a connection
273 return await self
.service
._socket
(self
.url
,
274 on_message_callback
=self
.on_message_callback
)
276 async def reconnect(self
):
278 Tries to reconnect for forever
285 log
.debug("Trying to reconnect (attempt %s)..." % attempts
)
288 self
.conn
= await self
.connect()
290 # The web service responded with some error
291 except tornado
.httpclient
.HTTPClientError
as e
:
292 log
.error("%s: Received HTTP Error %s" % (self
.url
, e
.code
))
295 # 503 - Service Unavailable
296 if e
.code
in (502, 503):
297 await asyncio
.sleep(10)
299 # Raise any unhandled errors
303 # The web service did not respond in time
304 except tornado
.simple_httpclient
.HTTPTimeoutError
as e
:
305 await asyncio
.sleep(30)
307 # Raise all other exceptions
308 except Exception as e
:
311 # If the connection was established successfully, we return
317 Closes the connection
322 def on_message_callback(self
, message
):
323 # Fail if no callbacks have been set
324 if not self
.callbacks
:
325 raise NotImplementedError
328 message
= self
._decode
_json
_message
(message
)
330 # Ignore empty messages
334 # Log the received message
335 log
.debug("Received message:\n%s" % json
.dumps(message
, indent
=4))
337 # Fetch the message type & data
338 type = message
.get("type")
339 data
= message
.get("data")
341 # Find a suitable callback
343 callback
= self
.callbacks
[type]
345 # Log an error for unknown messages and ignore them
347 log
.error("Received message of unknown type '%s'" % type)
354 def _decode_json_message(message
):
356 Takes a received message and decodes it
358 # Ignore empty messages
363 message
= json
.loads(message
)
364 except json
.JSONDecodeError
:
365 log
.error("Could not decode JSON message:\n%s" % message
)
370 async def write_message(self
, message
, **kwargs
):
372 Sends a message but encodes it into JSON first
374 # This should never happen
376 raise RuntimeError("Not connected")
378 if isinstance(message
, dict):
379 message
= tornado
.escape
.json_encode(message
)
382 return await self
.conn
.write_message(message
, **kwargs
)
384 except tornado
.websocket
.WebSocketClosedError
as e
:
386 await self
.reconnect()
388 # Try to send the message again
389 return await self
.write_message(message
, **kwargs
)
392 class ControlConnection(Connection
):
393 url
= "/api/v1/builders/control"
395 def init(self
, daemon
):
400 "job" : self
.daemon
.job_received
,
404 class JobControlConnection(Connection
):
408 def init(self
, id, worker
):
413 "abort" : worker
.abort
,
418 return "/api/v1/jobs/%s" % self
.id
420 async def finished(self
, success
, packages
=None, logfile
=None):
422 Will tell the hub that a job has finished
424 # Upload the log file
426 logfile
= await self
.service
.upload(logfile
, filename
="%s.log" % self
.id)
428 # Upload the packages
430 for package
in packages
:
431 await self
.service
.upload(package
)
434 self
.service
.job_finished(self
.id, success
=success
, logfile
=logfile
, packages
=packages
)
436 async def log(self
, timestamp
, level
, message
):
438 Sends a log message to the hub
440 await self
.write_message({
443 "timestamp" : timestamp
,