]> git.ipfire.org Git - people/stevee/pakfire.git/blob - python/pakfire/transport.py
transport: Catch 503 Bad Gateway HTTP errors.
[people/stevee/pakfire.git] / python / pakfire / transport.py
1 #!/usr/bin/python
2 ###############################################################################
3 # #
4 # Pakfire - The IPFire package management system #
5 # Copyright (C) 2013 Pakfire development team #
6 # #
7 # This program is free software: you can redistribute it and/or modify #
8 # it under the terms of the GNU General Public License as published by #
9 # the Free Software Foundation, either version 3 of the License, or #
10 # (at your option) any later version. #
11 # #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
16 # #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
19 # #
20 ###############################################################################
21
22 from __future__ import division
23
24 import base64
25 import hashlib
26 import json
27 import os
28 import time
29 import urlgrabber
30 import urllib
31 import urlparse
32
33 import pakfire.downloader
34 import pakfire.util
35
36 from pakfire.constants import *
37 from pakfire.i18n import _
38
39 import logging
40 log = logging.getLogger("pakfire.transport")
41
42
class PakfireHubTransportUploader(object):
    """
        Handles the upload of a single file to the hub.

        The upload is registered at the hub, the file content is sent
        in base64 encoded chunks and finally the upload is either marked
        as finished or destroyed again if anything went wrong.
    """

    def __init__(self, transport, filename):
        # Object that provides the get() and post() methods which are
        # used to talk to the hub.
        self.transport = transport

        # Path of the local file that is going to be uploaded.
        self.filename = filename

    def get_upload_id(self):
        """
            Gets an upload from the pakfire hub.

            Sends the basename, the size and the SHA1 checksum of the
            file to "/uploads/create" and returns whatever the transport
            returns for that request (used as the upload id).
        """
        # Calculate the SHA1 sum of the file to upload.
        h = hashlib.sha1()
        with open(self.filename, "rb") as f:
            # Read the file piece by piece until read() returns
            # an empty string (end of file).
            for buf in iter(lambda: f.read(CHUNK_SIZE), b""):
                h.update(buf)

        data = {
            "filename" : os.path.basename(self.filename),
            "filesize" : os.path.getsize(self.filename),
            "hash"     : h.hexdigest(),
        }

        upload_id = self.transport.get("/uploads/create", data=data)
        log.debug("Got upload id: %s" % upload_id)

        return upload_id

    @staticmethod
    def _adjust_chunk_size(chunk_size, duration, minimum):
        """
            Returns the size of the next chunk.

            chunk_size is the size that was used for the last chunk and
            duration the time in seconds it took to send it. The result
            is the amount of data that could have been sent in one
            second, but never less than minimum.

            If no usable duration could be measured (zero or negative,
            e.g. because of the clock resolution or a clock adjustment),
            the current size is kept instead of dividing by zero.
        """
        if duration <= 0:
            return max(chunk_size, minimum)

        return max(int(chunk_size / float(duration)), minimum)

    def send_file(self, upload_id, progress_callback=None):
        """
            Sends the file content to the server.

            The data is split into chunks, which are
            sent one after another.

            If progress_callback is given, it is called after every
            chunk with the total number of bytes sent so far.
        """
        with open(self.filename, "rb") as f:
            # Initial chunk size.
            chunk_size = CHUNK_SIZE

            # Count the already transmitted bytes.
            transferred = 0

            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break

                log.debug("Got chunk of %s bytes" % len(chunk))

                # Save the time when we started to send this bit.
                time_started = time.time()

                # Send the chunk to the server.
                self.send_chunk(upload_id, chunk)

                # Measure how long the transmission took and adjust the
                # chunk size to send about one chunk per second.
                # The chunk size never drops under CHUNK_SIZE.
                duration = time.time() - time_started
                chunk_size = self._adjust_chunk_size(chunk_size, duration, CHUNK_SIZE)

                # Add up the sent amount of data.
                transferred += len(chunk)
                if progress_callback:
                    progress_callback(transferred)

    def send_chunk(self, upload_id, data):
        """
            Sends a piece of the file to the server.

            The chunk is posted base64 encoded together with the
            SHA512 checksum of the raw (not encoded) data.
        """
        # Calculate checksum over the chunk data.
        chksum = hashlib.sha512(data).hexdigest()

        # Encode data in base64.
        data = base64.b64encode(data)

        # Send chunk data to the server.
        self.transport.post("/uploads/%s/sendchunk" % upload_id,
            data={ "chksum" : chksum, "data" : data })

    def destroy_upload(self, upload_id):
        """
            Destroys the upload on the server.
        """
        self.transport.get("/uploads/%s/destroy" % upload_id)

    def finish_upload(self, upload_id):
        """
            Signals to the server, that the upload has finished.
        """
        self.transport.get("/uploads/%s/finished" % upload_id)

    def _abort_upload(self, upload_id):
        """
            Tries to destroy a broken upload on the server.

            Errors are only logged, so that the error which made the
            upload fail in the first place is not replaced by a
            follow-up error of the cleanup.

            This lives in its own method on purpose: the caller uses
            a bare "raise" afterwards, which would re-raise the error
            handled here if the try/except block was written inline.
        """
        try:
            self.destroy_upload(upload_id)
        except Exception:
            log.exception("Could not destroy upload %s" % upload_id)

    def run(self):
        """
            Uploads the file and returns the upload id.

            If getting the upload id or sending the data fails, the
            upload is destroyed on the server (if it has already been
            created) and the original exception is re-raised.
        """
        upload_id = None

        # Create a progress bar.
        progress = pakfire.util.make_progress(
            os.path.basename(self.filename), os.path.getsize(self.filename), speed=True, eta=True,
        )

        # There might be no progress bar at all.
        progress_callback = progress.update if progress else None

        try:
            # Get an upload ID.
            upload_id = self.get_upload_id()

            # Send the file content.
            self.send_file(upload_id, progress_callback=progress_callback)

        # The bare except is intended: the broken upload should also be
        # removed when the user interrupts the process. The exception is
        # always re-raised.
        except:
            if progress:
                progress.finish()

            # Remove broken upload from server.
            if upload_id:
                self._abort_upload(upload_id)

            # XXX catch fatal errors
            raise

        else:
            if progress:
                progress.finish()

            # If no exception was raised, the upload
            # has finished.
            self.finish_upload(upload_id)

        # Return the upload id so some code can actually do something
        # with the file on the server.
        return upload_id
186
187
class PakfireHubTransport(object):
    """
        Connection to the pakfire hub.

        Wraps a PakfireGrabber and offers methods to send GET and POST
        requests, to upload files and to call some actions of the hub.
    """

    def __init__(self, config):
        self.config = config

        # Create connection to the hub.
        self.grabber = pakfire.downloader.PakfireGrabber(
            self.config, prefix=self.url,
        )

    @staticmethod
    def _quote_credential(value):
        """
            Percent-encodes a username or password, so that it can be
            safely embedded into the userinfo part of a URL.
        """
        # Python 2's urllib.quote() cannot handle unicode strings with
        # non-ASCII characters, so these are converted to UTF-8 first.
        if not isinstance(value, str):
            value = value.encode("utf-8")

        return urllib.quote(value, safe="")

    @property
    def url(self):
        """
            Construct a right URL out of the given
            server, username and password.

            Basically this just adds the credentials
            to the URL. Only the scheme and the network location
            of the configured server URL are used.
        """
        # Get credentials.
        server, username, password = self.config.get_hub_credentials()

        # Parse the given URL.
        url = urlparse.urlparse(server)
        assert url.scheme in ("http", "https")

        # Build new URL.
        ret = "%s://" % url.scheme

        # Add credentials if provided.
        # They get escaped, because characters like "@", ":" or "/"
        # would otherwise be taken as delimiters of the URL.
        # NOTE(review): assumes the grabber URL-decodes the userinfo
        # part again (libcurl does) - confirm against PakfireGrabber.
        if username and password:
            ret += "%s:%s@" % (
                self._quote_credential(username),
                self._quote_credential(password),
            )

        # Add host (and port).
        ret += url.netloc

        return ret

    def one_request(self, url, **kwargs):
        """
            Sends exactly one request and returns the response.

            Known errors of the grabber are translated into the
            matching Transport*Error exceptions. Everything else
            is re-raised untouched.
        """
        try:
            return self.grabber.urlread(url, **kwargs)

        except urlgrabber.grabber.URLGrabError as e:
            # Timeout
            if e.errno == 12:
                raise TransportConnectionTimeoutError(e)

            # Handle common HTTP errors
            elif e.errno == 14:
                # Connection errors
                if e.code == 5:
                    raise TransportConnectionProxyError(url)
                elif e.code == 6:
                    raise TransportConnectionDNSError(url)
                elif e.code == 7:
                    raise TransportConnectionResetError(url)
                elif e.code == 23:
                    raise TransportConnectionWriteError(url)
                elif e.code == 26:
                    raise TransportConnectionReadError(url)

                # SSL errors
                elif e.code == 52:
                    raise TransportSSLCertificateExpiredError(url)

                # HTTP error codes
                elif e.code == 403:
                    raise TransportForbiddenError(url)
                elif e.code == 404:
                    raise TransportNotFoundError(url)
                elif e.code == 500:
                    raise TransportInternalServerError(url)
                # NOTE(review): HTTP 503 is "Service Unavailable",
                # "Bad Gateway" is 502. Check if 502 should be
                # handled here as well.
                elif e.code == 503:
                    raise TransportBadGatewayError(url)
                elif e.code == 504:
                    raise TransportConnectionTimeoutError(url)

            # All other exceptions...
            raise

    def request(self, url, tries=None, **kwargs):
        """
            Sends a request and tries again if the server or
            the connection has got problems.

            tries is the maximum number of attempts. None implies
            to try infinitely. When all attempts have been used up,
            TransportMaxTriesExceededError is raised.
        """
        while tries or tries is None:
            if tries:
                tries -= 1

            try:
                return self.one_request(url, **kwargs)

            # 500 - Internal Server Error
            except TransportInternalServerError as e:
                log.exception("%s" % e.__class__.__name__)

                # Wait a minute before trying again.
                delay = 60

            # Retry on connection problems.
            except TransportConnectionError as e:
                log.exception("%s" % e.__class__.__name__)

                # Wait for 10 seconds.
                delay = 10

            # Only wait if there is another attempt to come. It is
            # pointless to sleep right before giving up.
            if tries is None or tries:
                time.sleep(delay)

        raise TransportMaxTriesExceededError

    def escape_args(self, **kwargs):
        """
            Returns the given keyword arguments as an
            URL-encoded string.
        """
        return urllib.urlencode(kwargs)

    def get(self, url, data=None, **kwargs):
        """
            Sends a HTTP GET request to the given URL.

            The items of data are appended to the URL as query string.
            All other keyword arguments are passed on to request().
        """
        params = self.escape_args(**(data or {}))

        if params:
            url = "%s?%s" % (url, params)

        return self.request(url, **kwargs)

    def post(self, url, data=None, **kwargs):
        """
            Sends a HTTP POST request to the given URL.

            The items of data are sent URL-encoded as form data.
            All other keyword arguments are passed on to request().
        """
        params = self.escape_args(**(data or {}))

        if params:
            kwargs["data"] = params

        return self.request(url, **kwargs)

    def upload_file(self, filename):
        """
            Uploads the given file to the server
            and returns the upload id.
        """
        uploader = PakfireHubTransportUploader(self, filename)

        return uploader.run()

    def get_json(self, *args, **kwargs):
        """
            Sends a GET request (see get()) and returns the
            decoded JSON response or None if the response was empty.
        """
        res = self.get(*args, **kwargs)

        # Nothing to decode.
        if not res:
            return None

        # Decode JSON.
        return json.loads(res)

    ### Misc. actions

    def noop(self):
        """
            No operation. Just to check if the connection is
            working. Returns the response to "/noop".
        """
        return self.get("/noop")

    def test_code(self, error_code):
        """
            Requests "/error/test/<error_code>" from the hub.

            error_code must be a number from 100 to 999.
        """
        assert 100 <= error_code <= 999

        self.get("/error/test/%s" % error_code)

    # Build actions

    def build_create(self, filename, build_type, arches=None, distro=None):
        """
            Create a new build on the hub.

            The source file is uploaded first and the build is created
            from that upload afterwards. build_type must be either
            "scratch" or "release".

            Returns the response to "/builds/create" or None
            if the response was empty.
        """
        assert build_type in ("scratch", "release")

        # XXX Check for permission to actually create a build.

        # Upload the source file to the server.
        upload_id = self.upload_file(filename)

        data = {
            "arches"     : ",".join(arches or []),
            "build_type" : build_type,
            "distro"     : distro or "",
            "upload_id"  : upload_id,
        }

        # Then create the build.
        build_id = self.get("/builds/create", data=data)

        return build_id or None

    def build_get(self, build_uuid):
        """
            Returns the decoded JSON response to "/builds/<build_uuid>".
        """
        return self.get_json("/builds/%s" % build_uuid)

    # Job actions

    def job_get(self, job_uuid):
        """
            Returns the decoded JSON response to "/jobs/<job_uuid>".
        """
        return self.get_json("/jobs/%s" % job_uuid)

    # Package actions

    def package_get(self, package_uuid):
        """
            Returns the decoded JSON response to "/packages/<package_uuid>".
        """
        return self.get_json("/packages/%s" % package_uuid)