]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[pzstd] Print (de)compression results
authorNick Terrell <terrelln@fb.com>
Fri, 23 Sep 2016 19:55:21 +0000 (12:55 -0700)
committerNick Terrell <terrelln@fb.com>
Fri, 23 Sep 2016 20:42:50 +0000 (13:42 -0700)
contrib/pzstd/Pzstd.cpp
contrib/pzstd/Pzstd.h
contrib/pzstd/test/PzstdTest.cpp

index ccd4f6266483803fda99413dd45bb7c7de8acb57..5de90e8b68e9b0c59b7d55447bc7ca319b3fc2e5 100644 (file)
@@ -52,16 +52,18 @@ static std::uintmax_t fileSizeOrZero(const std::string &file) {
   return size;
 }
 
-static size_t handleOneInput(const Options &options,
+static std::uint64_t handleOneInput(const Options &options,
                              const std::string &inputFile,
                              FILE* inputFd,
+                             const std::string &outputFile,
                              FILE* outputFd,
                              ErrorHolder &errorHolder) {
   auto inputSize = fileSizeOrZero(inputFile);
   // WorkQueue outlives ThreadPool so in the case of error we are certain
   // we don't accidently try to call push() on it after it is destroyed.
   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
-  size_t bytesWritten;
+  std::uint64_t bytesRead;
+  std::uint64_t bytesWritten;
   {
     // Initialize the thread pool with numThreads + 1
     // We add one because the read thread spends most of its time waiting.
@@ -71,8 +73,9 @@ static size_t handleOneInput(const Options &options,
     if (!options.decompress) {
       // Add a job that reads the input and starts all the compression jobs
       executor.add(
-          [&errorHolder, &outs, &executor, inputFd, inputSize, &options] {
-            asyncCompressChunks(
+          [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
+                                                               &bytesRead] {
+            bytesRead = asyncCompressChunks(
                 errorHolder,
                 outs,
                 executor,
@@ -85,13 +88,27 @@ static size_t handleOneInput(const Options &options,
       bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress);
     } else {
       // Add a job that reads the input and starts all the decompression jobs
-      executor.add([&errorHolder, &outs, &executor, inputFd] {
-        asyncDecompressFrames(errorHolder, outs, executor, inputFd);
+      executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
+        bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
       });
       // Start writing
       bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress);
     }
   }
+  if (options.verbosity > 1 && !errorHolder.hasError()) {
+    std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
+    std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
+    if (!options.decompress) {
+      double ratio = static_cast<double>(bytesWritten) /
+                     static_cast<double>(bytesRead + !bytesRead);
+      std::fprintf(stderr, "%-20s :%6.2f%%   (%6llu => %6llu bytes, %s)\n",
+                   inputFileName.c_str(), ratio * 100, bytesRead, bytesWritten,
+                   outputFileName.c_str());
+    } else {
+      std::fprintf(stderr, "%-20s: %llu bytes \n",
+                   inputFileName.c_str(),bytesWritten);
+    }
+  }
   return bytesWritten;
 }
 
@@ -185,7 +202,7 @@ int pzstdMain(const Options &options) {
     }
     auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
     // (de)compress the file
-    handleOneInput(options, input, inputFd, outputFd, errorHolder);
+    handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
     if (errorHolder.hasError()) {
       continue;
     }
@@ -359,11 +376,13 @@ FileStatus fileStatus(FILE* fd) {
  * Returns the status of the file after all of the reads have occurred.
  */
 static FileStatus
-readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) {
+readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
+         std::uint64_t *totalBytesRead) {
   Buffer buffer(size);
   while (!buffer.empty()) {
     auto bytesRead =
         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
+    *totalBytesRead += bytesRead;
     queue.push(buffer.splitAt(bytesRead));
     auto status = fileStatus(fd);
     if (status != FileStatus::Continue) {
@@ -373,7 +392,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) {
   return FileStatus::Continue;
 }
 
-void asyncCompressChunks(
+std::uint64_t asyncCompressChunks(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
     ThreadPool& executor,
@@ -382,6 +401,7 @@ void asyncCompressChunks(
     size_t numThreads,
     ZSTD_parameters params) {
   auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
+  std::uint64_t bytesRead = 0;
 
   // Break the input up into chunks of size `step` and compress each chunk
   // independently.
@@ -401,9 +421,10 @@ void asyncCompressChunks(
     // Pass the output queue to the writer thread.
     chunks.push(std::move(out));
     // Fill the input queue for the compression job we just started
-    status = readData(*in, ZSTD_CStreamInSize(), step, fd);
+    status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
   }
   errorHolder.check(status != FileStatus::Error, "Error reading input");
+  return bytesRead;
 }
 
 /**
@@ -484,12 +505,14 @@ static void decompress(
   }
 }
 
-void asyncDecompressFrames(
+std::uint64_t asyncDecompressFrames(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
     ThreadPool& executor,
     FILE* fd) {
   auto framesGuard = makeScopeGuard([&] { frames.finish(); });
+  std::uint64_t totalBytesRead = 0;
+
   // Split the source up into its component frames.
   // If we find our recognized skippable frame we know the next frames size
   // which means that we can decompress each standard frame in independently.
@@ -509,6 +532,7 @@ void asyncDecompressFrames(
       // frameSize is 0 if the frame info can't be decoded.
       Buffer buffer(SkippableFrame::kSize);
       auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
+      totalBytesRead += bytesRead;
       status = fileStatus(fd);
       if (bytesRead == 0 && status != FileStatus::Continue) {
         break;
@@ -533,14 +557,15 @@ void asyncDecompressFrames(
       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
       // Pass the rest of the source to this decompression task
       while (status == FileStatus::Continue && !errorHolder.hasError()) {
-        status = readData(*in, chunkSize, chunkSize, fd);
+        status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
       }
       break;
     }
     // Fill the input queue for the decompression job we just started
-    status = readData(*in, chunkSize, frameSize, fd);
+    status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
   }
   errorHolder.check(status != FileStatus::Error, "Error reading input");
+  return totalBytesRead;
 }
 
 /// Write `data` to `fd`, returns true iff success.
@@ -554,12 +579,12 @@ static bool writeData(ByteRange data, FILE* fd) {
   return true;
 }
 
-size_t writeFile(
+std::uint64_t writeFile(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
     FILE* outputFd,
     bool decompress) {
-  size_t bytesWritten = 0;
+  std::uint64_t bytesWritten = 0;
   std::shared_ptr<BufferWorkQueue> out;
   // Grab the output queue for each decompression job (in order).
   while (outs.pop(out) && !errorHolder.hasError()) {
index 0c21d13522a06066c33472f84abbb1f11aa77468..c3b2926b6a307d0ec95603e47c60d8a843cb2d00 100644 (file)
@@ -45,8 +45,9 @@ int pzstdMain(const Options& options);
  * @param size         The size of the input file if known, 0 otherwise
  * @param numThreads   The number of threads in the thread pool
  * @param parameters   The zstd parameters to use for compression
+ * @returns            The number of bytes read from the file
  */
-void asyncCompressChunks(
+std::uint64_t asyncCompressChunks(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
     ThreadPool& executor,
@@ -66,8 +67,9 @@ void asyncCompressChunks(
  *                      as soon as it is available
  * @param executor     The thread pool to run compression jobs in
  * @param fd           The input file descriptor
+ * @returns            The number of bytes read from the file
  */
-void asyncDecompressFrames(
+std::uint64_t asyncDecompressFrames(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
     ThreadPool& executor,
@@ -84,7 +86,7 @@ void asyncDecompressFrames(
  * @param decompress   Are we decompressing?
  * @returns            The number of bytes written
  */
-std::size_t writeFile(
+std::uint64_t writeFile(
     ErrorHolder& errorHolder,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
     FILE* outputFd,
index 64bcf9cab3db42e1f0a68128c91920ce96cdc406..c85f73a39b60ef3a564f181263ff1beeace6f220 100644 (file)
@@ -54,6 +54,7 @@ TEST(Pzstd, SmallSizes) {
           options.inputFiles = {inputFile};
           options.numThreads = numThreads;
           options.compressionLevel = level;
+          options.verbosity = 1;
           ASSERT_TRUE(roundTrip(options));
           errorGuard.dismiss();
         }
@@ -91,6 +92,7 @@ TEST(Pzstd, LargeSizes) {
           options.inputFiles = {inputFile};
           options.numThreads = std::min(numThreads, options.numThreads);
           options.compressionLevel = level;
+          options.verbosity = 1;
           ASSERT_TRUE(roundTrip(options));
           errorGuard.dismiss();
         }