]> git.ipfire.org Git - people/ms/pakfire.git/blob - python/pakfire/transport.py
daemon: Improve killing the keepalive process.
[people/ms/pakfire.git] / python / pakfire / transport.py
1 #!/usr/bin/python
2 ###############################################################################
3 # #
4 # Pakfire - The IPFire package management system #
5 # Copyright (C) 2013 Pakfire development team #
6 # #
7 # This program is free software: you can redistribute it and/or modify #
8 # it under the terms of the GNU General Public License as published by #
9 # the Free Software Foundation, either version 3 of the License, or #
10 # (at your option) any later version. #
11 # #
12 # This program is distributed in the hope that it will be useful, #
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of #
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
15 # GNU General Public License for more details. #
16 # #
17 # You should have received a copy of the GNU General Public License #
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. #
19 # #
20 ###############################################################################
21
22 from __future__ import division
23
24 import base64
25 import hashlib
26 import json
27 import os
28 import time
29 import urlgrabber
30 import urllib
31 import urlparse
32
33 import pakfire.downloader
34 import pakfire.util
35
36 from pakfire.constants import *
37 from pakfire.i18n import _
38
39 import logging
40 log = logging.getLogger("pakfire.transport")
41
42
43 class PakfireHubTransportUploader(object):
44 """
45 Handles the upload of a single file to the hub.
46 """
47
48 def __init__(self, transport, filename):
49 self.transport = transport
50 self.filename = filename
51
52 def get_upload_id(self):
53 """
54 Gets an upload from the pakfire hub.
55 """
56 # Calculate the SHA1 sum of the file to upload.
57 h = hashlib.new("sha1")
58 with open(self.filename, "rb") as f:
59 while True:
60 buf = f.read(CHUNK_SIZE)
61 if not buf:
62 break
63
64 h.update(buf)
65
66 data = {
67 "filename" : os.path.basename(self.filename),
68 "filesize" : os.path.getsize(self.filename),
69 "hash" : h.hexdigest(),
70 }
71
72 upload_id = self.transport.get("/uploads/create", data=data)
73 log.debug("Got upload id: %s" % upload_id)
74
75 return upload_id
76
77 def send_file(self, upload_id, progress_callback=None):
78 """
79 Sends the file content to the server.
80
81 The data is splitted into chunks, which are
82 sent one after an other.
83 """
84 with open(self.filename, "rb") as f:
85 # Initial chunk size.
86 chunk_size = CHUNK_SIZE
87
88 # Count the already transmitted bytes.
89 transferred = 0
90
91 while True:
92 chunk = f.read(chunk_size)
93 if not chunk:
94 break
95
96 log.debug("Got chunk of %s bytes" % len(chunk))
97
98 # Save the time when we started to send this bit.
99 time_started = time.time()
100
101 # Send the chunk to the server.
102 self.send_chunk(upload_id, chunk)
103
104 # Save the duration.time after the chunk has been transmitted
105 # and adjust chunk size to send one chunk per second.
106 duration = time.time() - time_started
107 chunk_size = int(chunk_size / duration)
108
109 # Never let chunk_size drop under CHUNK_SIZE:
110 if chunk_size < CHUNK_SIZE:
111 chunk_size = CHUNK_SIZE
112
113 # Add up the send amount of data.
114 transferred += len(chunk)
115 if progress_callback:
116 progress_callback(transferred)
117
118 def send_chunk(self, upload_id, data):
119 """
120 Sends a piece of the file to the server.
121 """
122 # Calculate checksum over the chunk data.
123 h = hashlib.new("sha512")
124 h.update(data)
125 chksum = h.hexdigest()
126
127 # Encode data in base64.
128 data = base64.b64encode(data)
129
130 # Send chunk data to the server.
131 self.transport.post("/uploads/%s/sendchunk" % upload_id,
132 data={ "chksum" : chksum, "data" : data })
133
134 def destroy_upload(self, upload_id):
135 """
136 Destroys the upload on the server.
137 """
138 self.transport.get("/uploads/%s/destroy" % upload_id)
139
140 def finish_upload(self, upload_id):
141 """
142 Signals to the server, that the upload has finished.
143 """
144 self.transport.get("/uploads/%s/finished" % upload_id)
145
146 def run(self):
147 upload_id = None
148
149 # Create a progress bar.
150 progress = pakfire.util.make_progress(
151 os.path.basename(self.filename), os.path.getsize(self.filename), speed=True, eta=True,
152 )
153
154 try:
155 # Get an upload ID.
156 upload_id = self.get_upload_id()
157
158 # Send the file content.
159 if progress:
160 self.send_file(upload_id, progress_callback=progress.update)
161 else:
162 self.send_file(upload_id)
163
164 except:
165 if progress:
166 progress.finish()
167
168 # Remove broken upload from server.
169 if upload_id:
170 self.destroy_upload(upload_id)
171
172 # XXX catch fatal errors
173 raise
174
175 else:
176 if progress:
177 progress.finish()
178
179 # If no exception was raised, the upload
180 # has finished.
181 self.finish_upload(upload_id)
182
183 # Return the upload id so some code can actually do something
184 # with the file on the server.
185 return upload_id
186
187
188 class PakfireHubTransport(object):
189 """
190 Connection to the pakfire hub.
191 """
192
193 def __init__(self, config):
194 self.config = config
195
196 # Create connection to the hub.
197 self.grabber = pakfire.downloader.PakfireGrabber(
198 self.config, prefix=self.url,
199 )
200
201 @property
202 def url(self):
203 """
204 Construct a right URL out of the given
205 server, username and password.
206
207 Basicly this just adds the credentials
208 to the URL.
209 """
210 # Get credentials.
211 server, username, password = self.config.get_hub_credentials()
212
213 # Parse the given URL.
214 url = urlparse.urlparse(server)
215 assert url.scheme in ("http", "https")
216
217 # Build new URL.
218 ret = "%s://" % url.scheme
219
220 # Add credentials if provided.
221 if username and password:
222 ret += "%s:%s@" % (username, password)
223
224 # Add path components.
225 ret += url.netloc
226
227 return ret
228
229 def one_request(self, url, **kwargs):
230 try:
231 return self.grabber.urlread(url, **kwargs)
232
233 except urlgrabber.grabber.URLGrabError, e:
234 # Timeout
235 if e.errno == 12:
236 raise TransportConnectionTimeoutError, e
237
238 # Handle common HTTP errors
239 elif e.errno == 14:
240 # Connection errors
241 if e.code == 5:
242 raise TransportConnectionProxyError, url
243 elif e.code == 6:
244 raise TransportConnectionDNSError, url
245 elif e.code == 7:
246 raise TransportConnectionResetError, url
247 elif e.code == 23:
248 raise TransportConnectionWriteError, url
249 elif e.code == 26:
250 raise TransportConnectionReadError, url
251
252 # SSL errors
253 elif e.code == 52:
254 raise TransportSSLCertificateExpiredError, url
255
256 # HTTP error codes
257 elif e.code == 403:
258 raise TransportForbiddenError, url
259 elif e.code == 404:
260 raise TransportNotFoundError, url
261 elif e.code == 500:
262 raise TransportInternalServerError, url
263 elif e.code in (502, 503):
264 raise TransportBadGatewayError, url
265 elif e.code == 504:
266 raise TransportConnectionTimeoutError, url
267
268 # All other exceptions...
269 raise
270
271 def request(self, url, tries=None, **kwargs):
272 # tries = None implies wait infinitely
273
274 while tries or tries is None:
275 if tries:
276 tries -= 1
277
278 try:
279 return self.one_request(url, **kwargs)
280
281 # 500 - Internal Server Error, 502 + 503 Bad Gateway Error
282 except (TransportInternalServerError, TransportBadGatewayError), e:
283 log.exception("%s" % e.__class__.__name__)
284
285 # Wait a minute before trying again.
286 time.sleep(60)
287
288 # Retry on connection problems.
289 except TransportConnectionError, e:
290 log.exception("%s" % e.__class__.__name__)
291
292 # Wait for 10 seconds.
293 time.sleep(10)
294
295 except (KeyboardInterrupt, SystemExit):
296 break
297
298 raise TransportMaxTriesExceededError
299
300 def escape_args(self, **kwargs):
301 return urllib.urlencode(kwargs)
302
303 def get(self, url, data={}, **kwargs):
304 """
305 Sends a HTTP GET request to the given URL.
306
307 All given keyword arguments are considered as form data.
308 """
309 params = self.escape_args(**data)
310
311 if params:
312 url = "%s?%s" % (url, params)
313
314 return self.request(url, **kwargs)
315
316 def post(self, url, data={}, **kwargs):
317 """
318 Sends a HTTP POST request to the given URL.
319
320 All keyword arguments are considered as form data.
321 """
322 params = self.escape_args(**data)
323 if params:
324 kwargs.update({
325 "data" : params,
326 })
327
328 return self.request(url, **kwargs)
329
330 def upload_file(self, filename):
331 """
332 Uploads the given file to the server.
333 """
334 uploader = PakfireHubTransportUploader(self, filename)
335 upload_id = uploader.run()
336
337 return upload_id
338
339 def get_json(self, *args, **kwargs):
340 res = self.get(*args, **kwargs)
341
342 # Decode JSON.
343 if res:
344 return json.loads(res)
345
346 ### Misc. actions
347
348 def noop(self):
349 """
350 No operation. Just to check if the connection is
351 working. Returns a random number.
352 """
353 return self.get("/noop")
354
355 def test_code(self, error_code):
356 assert error_code >= 100 and error_code <= 999
357
358 self.get("/error/test/%s" % error_code)
359
360 # Build actions
361
362 def build_create(self, filename, build_type, arches=None, distro=None):
363 """
364 Create a new build on the hub.
365 """
366 assert build_type in ("scratch", "release")
367
368 # XXX Check for permission to actually create a build.
369
370 # Upload the source file to the server.
371 upload_id = self.upload_file(filename)
372
373 data = {
374 "arches" : ",".join(arches or []),
375 "build_type" : build_type,
376 "distro" : distro or "",
377 "upload_id" : upload_id,
378 }
379
380 # Then create the build.
381 build_id = self.get("/builds/create", data=data)
382
383 return build_id or None
384
385 def build_get(self, build_uuid):
386 return self.get_json("/builds/%s" % build_uuid)
387
388 # Job actions
389
390 def job_get(self, job_uuid):
391 return self.get_json("/jobs/%s" % job_uuid)
392
393 # Package actions
394
395 def package_get(self, package_uuid):
396 return self.get_json("/packages/%s" % package_uuid)