def log_digest_blake2s(self):
return self.data.log_digest_blake2s
+ async def open_log(self):
+ """
+ Opens the log file, and returns an open file handle
+ """
+ # Raise an error, if we don't have a log file
+ if not self.has_log():
+ raise FileNotFoundError
+
+ return await asyncio.to_thread(self._open_log)
+
+ def _open_log(self):
+ path = self.log_path
+
+ # Open gzip-compressed files
+ if path.endswith(".gz"):
+ return gzip.open(path)
+
+ # Open uncompressed files
+ else:
+ return open(open)
+
async def _import_log(self, upload):
# Create some destination path
path = self.backend.path(
#!/usr/bin/python3
+import io
import unittest
import test
log=log,
)
+ async def test_log(self):
+ """
+ This test creates a build and tries to access the log
+ """
+ path = self.source_path("tests/data/beep-1.3-2.ip3.src.pfm")
+
+ # Create the build
+ with self.db.transaction():
+ build = await self._create_build(path)
+
+ # Assign all jobs to the default builder
+ for job in build.jobs:
+ job.assign(self.builder)
+
+ # This job should now be running
+ self.assertTrue(job.is_running())
+
+ # The builder should match
+ self.assertEqual(job.builder, self.builder)
+
+ path = self.source_path("tests/data/beep-1.3-2.ip3.x86_64.pfm")
+
+ # Pick the first job
+ job = build.jobs[0]
+
+ # Upload the log
+ log = await self._create_upload(
+ self.source_path("tests/data/beep-1.3-2.ip3.x86_64.log"),
+ )
+
+ # Pretend that the job has failed
+ await job.finished(
+ success=False,
+ log=log,
+ )
+
+ # Check if we have a log
+ self.assertTrue(job.has_log())
+
+ # Check if the log path is set
+ self.assertIsNotNone(job.log_path)
+
+ # Check if log size is set
+ self.assertGreater(job.log_size, 0)
+
+ # Check if the digest has been set
+ self.assertIsNotNone(job.log_digest_blake2s)
+
+ # Try to open the log file again
+ with await job.open_log() as f:
+ self.assertIsInstance(f, io.IOBase)
+
+
if __name__ == "__main__":
unittest.main()