]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[pzstd] Run the reading thread separately
authorNick Terrell <terrelln@fb.com>
Fri, 7 Oct 2016 22:04:34 +0000 (15:04 -0700)
committerNick Terrell <terrelln@fb.com>
Fri, 14 Oct 2016 22:26:55 +0000 (15:26 -0700)
contrib/pzstd/Pzstd.cpp

index 68f5bb9778afce40d1b5ea260cb116c076ca5919..db9b8c85b877065e2038165a816eff6f80594b20 100644 (file)
@@ -62,19 +62,18 @@ static std::uint64_t handleOneInput(const Options &options,
                              ErrorHolder &errorHolder) {
   auto inputSize = fileSizeOrZero(inputFile);
   // WorkQueue outlives ThreadPool so in the case of error we are certain
-  // we don't accidently try to call push() on it after it is destroyed.
+  // we don't accidently try to call push() on it after it is destroyed
   WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
   std::uint64_t bytesRead;
   std::uint64_t bytesWritten;
   {
-    // Initialize the thread pool with numThreads + 1
-    // We add one because the read thread spends most of its time waiting.
-    // This also sets the minimum number of threads to 2, so the algorithm
-    // doesn't deadlock.
-    ThreadPool executor(options.numThreads + 1);
+    // Initialize the (de)compression thread pool with numThreads
+    ThreadPool executor(options.numThreads);
+    // Run the reader thread on an extra thread
+    ThreadPool readExecutor(1);
     if (!options.decompress) {
       // Add a job that reads the input and starts all the compression jobs
-      executor.add(
+      readExecutor.add(
           [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
                                                                &bytesRead] {
             bytesRead = asyncCompressChunks(
@@ -91,7 +90,7 @@ static std::uint64_t handleOneInput(const Options &options,
                                options.verbosity);
     } else {
       // Add a job that reads the input and starts all the decompression jobs
-      executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
+      readExecutor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
         bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
       });
       // Start writing