]> git.ipfire.org Git - pakfire.git/blame - pakfire/server.py
Fix server downloader which has no Pakfire reference.
[pakfire.git] / pakfire / server.py
CommitLineData
3ad4bb5a
MT
1#!/usr/bin/python
2
3import hashlib
4import logging
8276111d 5import os
3ad4bb5a
MT
6import random
7import socket
8import subprocess
9import tempfile
10import time
11import xmlrpclib
12
13import pakfire.api
14import pakfire.base
15import pakfire.config
16import pakfire.downloader
17import pakfire.packages
18import pakfire.repository
19import pakfire.util
20
21from pakfire.constants import *
22
23CHUNK_SIZE = 1024**2 # 1M
24
class Source(object):
	"""
	A local working copy of a remote git source repository.

	On creation, the repository is cloned to (or refreshed in) a
	cache directory derived from its URL. Changed makefiles can then
	be "dist"ed into source packages and collected in a temporary
	source repository.
	"""

	def __init__(self, pakfire, id, name, url, path, targetpath, revision, branch):
		self.pakfire = pakfire
		self.id = id
		self.name = name
		self.url = url
		self.targetpath = targetpath
		self.revision = revision
		self.branch = branch

		# NOTE(review): the "path" argument is accepted for API
		# compatibility but not stored - the checkout location is
		# always derived from the URL (see the "path" property).

		# If the repository is not yet checked out, we create a local clone
		# from it to work with it.
		if not self.is_cloned():
			self.clone()
		else:
			# Always refresh the repository to have the recent commits.
			self.fetch()

	def is_cloned(self):
		# The repository counts as cloned as soon as its cache
		# directory exists.
		return os.path.exists(self.path)

	def clone(self):
		"""
		Create the initial local clone of the remote repository.
		"""
		if self.is_cloned():
			return

		dirname = os.path.dirname(self.path)
		basename = os.path.basename(self.path)

		if not os.path.exists(dirname):
			os.makedirs(dirname)

		# Clone into the parent directory, naming the checkout after
		# the hashed target directory.
		self._git("clone %s %s" % (self.url, basename), path=dirname)

	def fetch(self):
		# Fetch new objects and refs from the remote repository.
		self._git("fetch")

	@property
	def path(self):
		"""
		The local cache directory of this repository, derived from a
		SHA1 hash of its URL so every source gets a stable location.
		"""
		# Hash the URL bytes; encode first so this also works when the
		# URL is a unicode string.
		url = self.url
		if not isinstance(url, bytes):
			url = url.encode("utf-8")
		h = hashlib.sha1(url)

		# XXX path is to be changed
		return "/var/cache/pakfire/sources/%s" % h.hexdigest()

	def _git(self, cmd, path=None):
		"""
		Run a git command in the checkout (or in the given path) and
		return its output.
		"""
		if not path:
			path = self.path

		cmd = "cd %s && git %s" % (path, cmd)

		logging.debug("Running command: %s" % cmd)

		return subprocess.check_output(["/bin/sh", "-c", cmd])

	def _git_changed_files(self, revision1, revision2=""):
		# Return the absolute paths of all files that differ between
		# the two given revisions.
		files = self._git("diff --name-only %s %s" % (revision1, revision2))

		return [os.path.join(self.path, f) for f in files.splitlines()]

	def _git_checkout_revision(self, revision):
		self._git("checkout %s" % revision)

	def update_revision(self, revision, **pakfire_args):
		"""
		Check out the given revision and dist all makefiles that were
		changed by it. Returns the temporary source repository (or
		None when nothing changed).
		"""
		# Checkout the revision we want to work with.
		self._git_checkout_revision(revision)

		# Get list of all changed files between the current revision and
		# the previous one.
		files = self._git_changed_files("HEAD^", "HEAD")

		# Update all changed makefiles and return a repository with them.
		return self.update_files([f for f in files if f.endswith(".%s" % MAKEFILE_EXTENSION)],
			**pakfire_args)

	def update_files(self, files, **pakfire_args):
		"""
		Dist all given makefiles that still exist and collect the
		resulting source packages in a temporary repository.
		"""
		# A random suffix for the repository name; the result directory
		# itself is created race-free by tempfile.
		rnd = random.randint(0, 1024**2)
		tmpdir = tempfile.mkdtemp(prefix="pakfire-source-")

		# Only makefiles that still exist can be dist'ed.
		pkgs = [filename for filename in files if os.path.exists(filename)]
		# XXX not sure what to do here
		# Deleted packages are currently ignored; they should probably
		# be reported to the server so they can be removed there, too.

		if not pkgs:
			return

		# XXX This totally ignores the local configuration.
		pakfire.api.dist(pkgs, resultdirs=[tmpdir,], **pakfire_args)

		# Create a kind of dummy repository to link the packages against it.
		# "build_id" is only understood by dist() above and must not be
		# passed on to repo_create().
		if "build_id" in pakfire_args:
			del pakfire_args["build_id"]
		pakfire_args["mode"] = "server"

		repo = pakfire.api.repo_create("source-%s" % rnd, [tmpdir,], type="source",
			**pakfire_args)

		return repo

	def update_all(self):
		"""
		Dist every makefile found anywhere in the checkout.
		"""
		_files = []
		for dirpath, subdirs, filenames in os.walk(self.path):
			for f in filenames:
				# Only makefiles are of interest.
				if not f.endswith(".%s" % MAKEFILE_EXTENSION):
					continue

				_files.append(os.path.join(dirpath, f))

		return self.update_files(_files)
139
140
class XMLRPCTransport(xmlrpclib.Transport):
	"""
	Transport for XMLRPC connections that retries requests which
	failed because of (presumably transient) network errors.
	"""

	# Identify ourselves to the server with our own user agent string.
	user_agent = "pakfire/%s" % PAKFIRE_VERSION

	def single_request(self, *args, **kwargs):
		"""
		Like xmlrpclib.Transport.single_request, but retries the
		request - sleeping 30 seconds between attempts - when a
		recoverable socket error occurs.

		An optional "tries" keyword argument (default 100) limits the
		number of attempts; when it is used up, a generic Exception
		is raised. XMLRPC protocol errors are logged and re-raised
		immediately.
		"""
		ret = None

		# Tries can be passed to this method.
		tries = kwargs.pop("tries", 100)

		while tries:
			try:
				ret = xmlrpclib.Transport.single_request(self, *args, **kwargs)

			except socket.error, e:
				# These kinds of errors are not fatal, but they can happen on
				# a bad internet connection or whatever.
				#  32 Broken pipe
				# 110 Connection timeout
				# 111 Connection refused
				# Anything else is considered fatal and re-raised.
				if not e.errno in (32, 110, 111,):
					raise

			except xmlrpclib.ProtocolError, e:
				# Log all XMLRPC protocol errors.
				logging.error("XMLRPC protocol error:")
				logging.error(" URL: %s" % e.url)
				logging.error(" HTTP headers:")
				for header in e.headers.items():
					logging.error(" %s: %s" % header)
				logging.error(" Error code: %s" % e.errcode)
				logging.error(" Error message: %s" % e.errmsg)
				raise

			else:
				# If request was successful, we can break the loop.
				break

			# If the request was not successful, we wait a little time to try
			# it again.
			logging.debug("Request was not successful, we wait a little bit and try it again.")
			time.sleep(30)
			tries -= 1

		else:
			# The while loop ran out of tries without a single
			# successful request (while/else runs only when the loop
			# ends normally, i.e. "tries" reached zero).
			logging.error("Maximum number of tries was reached. Giving up.")
			# XXX need better exception here.
			raise Exception, "Could not fulfill request."

		return ret
190
191
class Server(object):
	"""
	Client side of the connection between a pakfire build slave and
	the master (hub) server.

	Provides host information updates, chunked file uploads, build
	job processing and repository mirroring over XMLRPC.
	"""

	def __init__(self, **pakfire_args):
		self.config = pakfire.config.Config()

		# The address of the master server comes from the slave
		# configuration.
		server = self.config._slave.get("server")

		logging.info("Establishing RPC connection to: %s" % server)

		# Use our own transport class which retries requests on
		# transient network errors.
		self.conn = xmlrpclib.ServerProxy(server, transport=XMLRPCTransport(),
			allow_none=True)

	@property
	def hostname(self):
		"""
		Return the host's name.
		"""
		return socket.gethostname()

	def update_info(self):
		"""
		Collect information about this host (load average, CPU model,
		memory size and supported architectures) and send it to the
		master server.
		"""
		# Get the current load average.
		loadavg = ", ".join(["%.2f" % l for l in os.getloadavg()])

		# Get all supported architectures.
		arches = sorted([a for a in self.config.supported_arches])
		arches = " ".join(arches)

		# Determine CPU model.
		cpuinfo = {}
		with open("/proc/cpuinfo") as f:
			for line in f.readlines():
				# Strip the trailing newline; otherwise the empty-line
				# check below can never trigger.
				line = line.strip()

				# Break at an empty line, because all information after that
				# is redundant (the block repeats for every core).
				if not line:
					break

				# Split only at the first colon so values containing
				# colons stay intact. Lines without a colon are skipped
				# entirely instead of reusing stale key/value data.
				try:
					key, value = line.split(":", 1)
				except ValueError:
					continue # Skip invalid lines.

				key, value = key.strip(), value.strip()

				cpuinfo[key] = value

		cpu_model = cpuinfo.get("model name", "Could not be determined")

		# Determine memory size from the first line of /proc/meminfo
		# (value is in kB).
		memory = 0
		with open("/proc/meminfo") as f:
			line = f.readline()

			try:
				a, b, c = line.split()
			except ValueError:
				pass
			else:
				memory = int(b)

		self.conn.update_host_info(loadavg, cpu_model, memory, arches)

	def upload_file(self, filename, build_id):
		"""
		Upload the given file to the master server in CHUNK_SIZE
		pieces and associate it with the given build.

		Raises Exception when the server rejects the finished upload.
		"""
		# Get the hash of the file.
		hash1 = pakfire.util.calc_hash1(filename)

		# Get the size of the file.
		size = os.path.getsize(filename)

		# Get an upload ID from the server.
		upload_id = self.conn.get_upload_cookie(os.path.basename(filename),
			size, hash1)

		# Calculate the number of chunks, rounding up, so that a file
		# that is an exact multiple of CHUNK_SIZE is not overcounted.
		chunks = (size + CHUNK_SIZE - 1) // CHUNK_SIZE

		# Cut the file in pieces and upload them one after another.
		# Open in binary mode because we are uploading raw bytes.
		with open(filename, "rb") as f:
			chunk = 0
			while True:
				data = f.read(CHUNK_SIZE)
				if not data:
					break

				chunk += 1
				logging.info("Uploading chunk %s/%s of %s." % (chunk, chunks,
					os.path.basename(filename)))

				data = xmlrpclib.Binary(data)
				self.conn.upload_chunk(upload_id, data)

		# Tell the server, that we finished the upload.
		ret = self.conn.finish_upload(upload_id, build_id)

		# If the server sends false, something happened with the upload that
		# could not be recovered.
		if not ret:
			raise Exception("Upload failed.")

	def update_build_status(self, build_id, status, message=""):
		"""
		Send a new status for the given build to the server.

		Raises BuildAbortedException when the server does not
		acknowledge the update.
		"""
		ret = self.conn.update_build_state(build_id, status, message)

		# If the server returns False, then it did not acknowledge our status
		# update and the build has to be aborted.
		if not ret:
			raise BuildAbortedException("The build was aborted by the master server.")

	def build_job(self, type=None):
		"""
		Fetch one build job from the server, dispatch it to the
		matching handler and report the outcome back to the server.
		"""
		build = self.conn.build_job() # XXX type=None

		# If the server has got no job for us, we end right here.
		if not build:
			return

		# Dispatch table from job type to its handler.
		job_types = {
			"binary" : self.build_binary_job,
			"source" : self.build_source_job,
		}

		build_id = build["id"]
		build_type = build["type"]

		try:
			func = job_types[build_type]
		except KeyError:
			# Report the type the server actually sent (the "type"
			# argument of this method is unused and was previously
			# reported here by mistake).
			raise Exception("Build type not supported: %s" % build_type)

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		try:
			func(build_id, build)

		except DependencyError:
			# This has already been reported by func.
			raise

		except Exception as e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_build_status(build_id, "failed", message)
			raise

		else:
			self.update_build_status(build_id, "finished")

	def build_binary_job(self, build_id, build):
		"""
		Process a binary build job: download and verify the source
		package, build it and upload all resulting files (plus the
		build log) to the server.
		"""
		arch = build["arch"] # Part of the job payload; currently unused here.
		filename = build["name"]
		download = build["download"]
		hash1 = build["hash1"]

		# Create a temporary file and a directory for the resulting files.
		tmpdir = tempfile.mkdtemp()
		tmpfile = os.path.join(tmpdir, filename)
		logfile = os.path.join(tmpdir, "build.log")

		# Get a package grabber and add mirror download capabilities to it.
		grabber = pakfire.downloader.PackageDownloader(self.config)

		try:
			# Download the source.
			grabber.urlgrab(download, filename=tmpfile)

			# Check if the download checksum matches.
			if pakfire.util.calc_hash1(tmpfile) == hash1:
				logging.info("Checksum matches: %s" % hash1)
			else:
				raise DownloadError("Download was corrupted")

			# Update the build status on the server.
			self.update_build_status(build_id, "running")

			# Run the build.
			pakfire.api.build(tmpfile, build_id=build_id,
				resultdirs=[tmpdir,], logfile=logfile)

			self.update_build_status(build_id, "uploading")

			# Walk through the result directory and upload all (binary) files.
			# The downloaded source package and the logfile are skipped;
			# the logfile is uploaded separately in the finally block.
			for dirpath, subdirs, filenames in os.walk(tmpdir):
				for fname in filenames:
					fname = os.path.join(dirpath, fname)
					if fname in (logfile, tmpfile,):
						continue

					self.upload_file(fname, build_id)

		except DependencyError as e:
			message = "%s: %s" % (e.__class__.__name__, e)
			self.update_build_status(build_id, "dependency_error", message)
			raise

		finally:
			# Upload the logfile in any case and if it exists.
			if os.path.exists(logfile):
				self.upload_file(logfile, build_id)

			# Cleanup the files we created.
			pakfire.util.rm(tmpdir)

	def build_source_job(self, build_id, build):
		"""
		Process a source build job: check out the requested revision
		of the source repository, dist the changed makefiles and
		upload the resulting source packages.
		"""
		# Update the build status on the server.
		self.update_build_status(build_id, "running")

		source = Source(self, **build["source"])

		repo = source.update_revision(build["revision"], build_id=build_id)

		try:
			# Upload all files in the repository.
			for pkg in repo:
				path = os.path.join(pkg.repo.path, pkg.filename)
				self.upload_file(path, build_id)
		finally:
			# Remove the temporary repository again in any case.
			repo.remove()

	def update_repositories(self, limit=2):
		"""
		Fetch up to "limit" repositories from the server and recreate
		them locally for every architecture they carry.
		"""
		repos = self.conn.get_repos(limit)

		for repo in repos:
			files = self.conn.get_repo_packages(repo["id"])

			for arch in repo["arches"]:
				path = "/pakfire/repositories/%s/%s/%s" % \
					(repo["distro"]["sname"], repo["name"], arch)

				pakfire.api.repo_create(path, files)