]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[pzstd] Put ErrorHolder into SharedState
authorNick Terrell <terrelln@fb.com>
Wed, 12 Oct 2016 22:18:16 +0000 (15:18 -0700)
committerNick Terrell <terrelln@fb.com>
Fri, 14 Oct 2016 22:26:55 +0000 (15:26 -0700)
contrib/pzstd/Pzstd.cpp
contrib/pzstd/Pzstd.h

index db9b8c85b877065e2038165a816eff6f80594b20..70c0515b3854da0e45c8a017fb431b9160a9c4ef 100644 (file)
@@ -59,7 +59,7 @@ static std::uint64_t handleOneInput(const Options &options,
                              FILE* inputFd,
                              const std::string &outputFile,
                              FILE* outputFd,
-                             ErrorHolder &errorHolder) {
+                             SharedState& state) {
   auto inputSize = fileSizeOrZero(inputFile);
   // WorkQueue outlives ThreadPool so in the case of error we are certain
   // we don't accidently try to call push() on it after it is destroyed
@@ -74,10 +74,9 @@ static std::uint64_t handleOneInput(const Options &options,
     if (!options.decompress) {
       // Add a job that reads the input and starts all the compression jobs
       readExecutor.add(
-          [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
-                                                               &bytesRead] {
+          [&state, &outs, &executor, inputFd, inputSize, &options, &bytesRead] {
             bytesRead = asyncCompressChunks(
-                errorHolder,
+                state,
                 outs,
                 executor,
                 inputFd,
@@ -86,19 +85,19 @@ static std::uint64_t handleOneInput(const Options &options,
                 options.determineParameters());
           });
       // Start writing
-      bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
+      bytesWritten = writeFile(state, outs, outputFd, options.decompress,
                                options.verbosity);
     } else {
       // Add a job that reads the input and starts all the decompression jobs
-      readExecutor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
-        bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
+      readExecutor.add([&state, &outs, &executor, inputFd, &bytesRead] {
+        bytesRead = asyncDecompressFrames(state, outs, executor, inputFd);
       });
       // Start writing
-      bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress,
+      bytesWritten = writeFile(state, outs, outputFd, options.decompress,
                                options.verbosity);
     }
   }
-  if (options.verbosity > 1 && !errorHolder.hasError()) {
+  if (options.verbosity > 1 && !state.errorHolder.hasError()) {
     std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
     std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
     if (!options.decompress) {
@@ -176,53 +175,53 @@ int pzstdMain(const Options &options) {
   int returnCode = 0;
   for (const auto& input : options.inputFiles) {
     // Setup the error holder
-    ErrorHolder errorHolder;
+    SharedState state;
     auto printErrorGuard = makeScopeGuard([&] {
-      if (errorHolder.hasError()) {
+      if (state.errorHolder.hasError()) {
         returnCode = 1;
         if (options.verbosity > 0) {
           std::fprintf(stderr, "pzstd: %s: %s.\n", input.c_str(),
-                       errorHolder.getError().c_str());
+                       state.errorHolder.getError().c_str());
         }
       }
     });
     // Open the input file
-    auto inputFd = openInputFile(input, errorHolder);
+    auto inputFd = openInputFile(input, state.errorHolder);
     if (inputFd == nullptr) {
       continue;
     }
     auto closeInputGuard = makeScopeGuard([&] { std::fclose(inputFd); });
     // Open the output file
     auto outputFile = options.getOutputFile(input);
-    if (!errorHolder.check(outputFile != "",
+    if (!state.errorHolder.check(outputFile != "",
                            "Input file does not have extension .zst")) {
       continue;
     }
-    auto outputFd = openOutputFile(options, outputFile, errorHolder);
+    auto outputFd = openOutputFile(options, outputFile, state.errorHolder);
     if (outputFd == nullptr) {
       continue;
     }
     auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
     // (de)compress the file
-    handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
-    if (errorHolder.hasError()) {
+    handleOneInput(options, input, inputFd, outputFile, outputFd, state);
+    if (state.errorHolder.hasError()) {
       continue;
     }
     // Delete the input file if necessary
     if (!options.keepSource) {
       // Be sure that we are done and have written everything before we delete
-      if (!errorHolder.check(std::fclose(inputFd) == 0,
+      if (!state.errorHolder.check(std::fclose(inputFd) == 0,
                              "Failed to close input file")) {
         continue;
       }
       closeInputGuard.dismiss();
-      if (!errorHolder.check(std::fclose(outputFd) == 0,
+      if (!state.errorHolder.check(std::fclose(outputFd) == 0,
                              "Failed to close output file")) {
         continue;
       }
       closeOutputGuard.dismiss();
       if (std::remove(input.c_str()) != 0) {
-        errorHolder.setError("Failed to remove input file");
+        state.errorHolder.setError("Failed to remove input file");
         continue;
       }
     }
@@ -268,18 +267,19 @@ Buffer split(Buffer& buffer, ZSTD_outBuffer& outBuffer) {
 /**
  * Stream chunks of input from `in`, compress it, and stream it out to `out`.
  *
- * @param errorHolder Used to report errors and check if an error occured
+ * @param state        The shared state
  * @param in           Queue that we `pop()` input buffers from
  * @param out          Queue that we `push()` compressed output buffers to
  * @param maxInputSize An upper bound on the size of the input
  * @param parameters   The zstd parameters to use for compression
  */
 static void compress(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     std::shared_ptr<BufferWorkQueue> in,
     std::shared_ptr<BufferWorkQueue> out,
     size_t maxInputSize,
     ZSTD_parameters parameters) {
+  auto& errorHolder = state.errorHolder;
   auto guard = makeScopeGuard([&] { out->finish(); });
   // Initialize the CCtx
   std::unique_ptr<ZSTD_CStream, size_t (*)(ZSTD_CStream*)> ctx(
@@ -395,7 +395,7 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
 }
 
 std::uint64_t asyncCompressChunks(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
     ThreadPool& executor,
     FILE* fd,
@@ -409,23 +409,23 @@ std::uint64_t asyncCompressChunks(
   // independently.
   size_t step = calculateStep(size, numThreads, params);
   auto status = FileStatus::Continue;
-  while (status == FileStatus::Continue && !errorHolder.hasError()) {
+  while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
     // Make a new input queue that we will put the chunk's input data into.
     auto in = std::make_shared<BufferWorkQueue>();
     auto inGuard = makeScopeGuard([&] { in->finish(); });
     // Make a new output queue that compress will put the compressed data into.
     auto out = std::make_shared<BufferWorkQueue>();
     // Start compression in the thread pool
-    executor.add([&errorHolder, in, out, step, params] {
+    executor.add([&state, in, out, step, params] {
       return compress(
-          errorHolder, std::move(in), std::move(out), step, params);
+          state, std::move(in), std::move(out), step, params);
     });
     // Pass the output queue to the writer thread.
     chunks.push(std::move(out));
     // Fill the input queue for the compression job we just started
     status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
   }
-  errorHolder.check(status != FileStatus::Error, "Error reading input");
+  state.errorHolder.check(status != FileStatus::Error, "Error reading input");
   return bytesRead;
 }
 
@@ -433,15 +433,16 @@ std::uint64_t asyncCompressChunks(
  * Decompress a frame, whose data is streamed into `in`, and stream the output
  * to `out`.
  *
- * @param errorHolder Used to report errors and check if an error occured
+ * @param state        The shared state
  * @param in           Queue that we `pop()` input buffers from. It contains
  *                      exactly one compressed frame.
  * @param out          Queue that we `push()` decompressed output buffers to
  */
 static void decompress(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     std::shared_ptr<BufferWorkQueue> in,
     std::shared_ptr<BufferWorkQueue> out) {
+  auto& errorHolder = state.errorHolder;
   auto guard = makeScopeGuard([&] { out->finish(); });
   // Initialize the DCtx
   std::unique_ptr<ZSTD_DStream, size_t (*)(ZSTD_DStream*)> ctx(
@@ -508,7 +509,7 @@ static void decompress(
 }
 
 std::uint64_t asyncDecompressFrames(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
     ThreadPool& executor,
     FILE* fd) {
@@ -521,7 +522,7 @@ std::uint64_t asyncDecompressFrames(
   // Otherwise, we will decompress using only one decompression task.
   const size_t chunkSize = ZSTD_DStreamInSize();
   auto status = FileStatus::Continue;
-  while (status == FileStatus::Continue && !errorHolder.hasError()) {
+  while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
     // Make a new input queue that we will put the frames's bytes into.
     auto in = std::make_shared<BufferWorkQueue>();
     auto inGuard = makeScopeGuard([&] { in->finish(); });
@@ -550,15 +551,15 @@ std::uint64_t asyncDecompressFrames(
       out->setMaxSize(64);
     }
     // Start decompression in the thread pool
-    executor.add([&errorHolder, in, out] {
-      return decompress(errorHolder, std::move(in), std::move(out));
+    executor.add([&state, in, out] {
+      return decompress(state, std::move(in), std::move(out));
     });
     // Pass the output queue to the writer thread
     frames.push(std::move(out));
     if (frameSize == 0) {
       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
       // Pass the rest of the source to this decompression task
-      while (status == FileStatus::Continue && !errorHolder.hasError()) {
+      while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
         status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
       }
       break;
@@ -566,7 +567,7 @@ std::uint64_t asyncDecompressFrames(
     // Fill the input queue for the decompression job we just started
     status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
   }
-  errorHolder.check(status != FileStatus::Error, "Error reading input");
+  state.errorHolder.check(status != FileStatus::Error, "Error reading input");
   return totalBytesRead;
 }
 
@@ -598,11 +599,12 @@ void updateWritten(int verbosity, std::uint64_t bytesWritten) {
 }
 
 std::uint64_t writeFile(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
     FILE* outputFd,
     bool decompress,
     int verbosity) {
+  auto& errorHolder = state.errorHolder;
   auto lineClearGuard = makeScopeGuard([verbosity] {
     if (verbosity > 1) {
       std::fprintf(stderr, "\r%79s\r", "");
index fe44ccfde7cdf27f4fa8c31ab96848716b6bf781..469c20cd45e15574aff7bdd6c0bf8a88fffe1bf4 100644 (file)
@@ -12,6 +12,7 @@
 #include "Options.h"
 #include "utils/Buffer.h"
 #include "utils/Range.h"
+#include "utils/ResourcePool.h"
 #include "utils/ThreadPool.h"
 #include "utils/WorkQueue.h"
 #define ZSTD_STATIC_LINKING_ONLY
@@ -32,12 +33,17 @@ namespace pzstd {
  */
 int pzstdMain(const Options& options);
 
+class SharedState {
+ public:
+  ErrorHolder errorHolder;
+};
+
 /**
  * Streams input from `fd`, breaks input up into chunks, and compresses each
  * chunk independently.  Output of each chunk gets streamed to a queue, and
  * the output queues get put into `chunks` in order.
  *
- * @param errorHolder  Used to report errors and coordinate early shutdown
+ * @param state        The shared state
  * @param chunks       Each compression jobs output queue gets `pushed()` here
  *                      as soon as it is available
  * @param executor     The thread pool to run compression jobs in
@@ -48,7 +54,7 @@ int pzstdMain(const Options& options);
  * @returns            The number of bytes read from the file
  */
 std::uint64_t asyncCompressChunks(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
     ThreadPool& executor,
     FILE* fd,
@@ -62,7 +68,7 @@ std::uint64_t asyncCompressChunks(
  * decompression job.  Output of each frame gets streamed to a queue, and
  * the output queues get put into `frames` in order.
  *
- * @param errorHolder  Used to report errors and coordinate early shutdown
+ * @param state        The shared state
  * @param frames       Each decompression jobs output queue gets `pushed()` here
  *                      as soon as it is available
  * @param executor     The thread pool to run compression jobs in
@@ -70,7 +76,7 @@ std::uint64_t asyncCompressChunks(
  * @returns            The number of bytes read from the file
  */
 std::uint64_t asyncDecompressFrames(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
     ThreadPool& executor,
     FILE* fd);
@@ -79,7 +85,7 @@ std::uint64_t asyncDecompressFrames(
  * Streams input in from each queue in `outs` in order, and writes the data to
  * `outputFd`.
  *
- * @param errorHolder  Used to report errors and coordinate early exit
+ * @param state        The shared state
  * @param outs         A queue of output queues, one for each
  *                      (de)compression job.
  * @param outputFd     The file descriptor to write to
@@ -88,7 +94,7 @@ std::uint64_t asyncDecompressFrames(
  * @returns            The number of bytes written
  */
 std::uint64_t writeFile(
-    ErrorHolder& errorHolder,
+    SharedState& state,
     WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
     FILE* outputFd,
     bool decompress,