]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[contrib/pzstd] Prevent hangs when there are errors
authorSean Bartell <yotann@yotann.org>
Thu, 20 Jun 2024 20:52:43 +0000 (15:52 -0500)
committerNick Terrell <nickrterrell@gmail.com>
Mon, 13 Jan 2025 22:24:41 +0000 (17:24 -0500)
When two threads are using a WorkQueue and the reader thread exits due
to an error, it must call WorkQueue::finish() to wake up the writer
thread. Otherwise, if the queue is full and the writer thread is waiting
for a free slot, it could hang forever.

This can happen in pratice when decompressing a large, corrupted file
that does not contain pzstd skippable frames.

contrib/pzstd/Pzstd.cpp
contrib/pzstd/utils/WorkQueue.h

index 67b9419914928c7446e1c564931c73a222c8dbcc..048a006b3b3742d0710a7029f72e6c200fac203b 100644 (file)
@@ -269,7 +269,10 @@ static void compress(
     std::shared_ptr<BufferWorkQueue> out,
     size_t maxInputSize) {
   auto& errorHolder = state.errorHolder;
-  auto guard = makeScopeGuard([&] { out->finish(); });
+  auto guard = makeScopeGuard([&] {
+    in->finish();
+    out->finish();
+  });
   // Initialize the CCtx
   auto ctx = state.cStreamPool->get();
   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
@@ -431,7 +434,10 @@ static void decompress(
     std::shared_ptr<BufferWorkQueue> in,
     std::shared_ptr<BufferWorkQueue> out) {
   auto& errorHolder = state.errorHolder;
-  auto guard = makeScopeGuard([&] { out->finish(); });
+  auto guard = makeScopeGuard([&] {
+    in->finish();
+    out->finish();
+  });
   // Initialize the DCtx
   auto ctx = state.dStreamPool->get();
   if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
@@ -578,6 +584,7 @@ std::uint64_t writeFile(
     FILE* outputFd,
     bool decompress) {
   auto& errorHolder = state.errorHolder;
+  auto outsFinishGuard = makeScopeGuard([&outs] { outs.finish(); });
   auto lineClearGuard = makeScopeGuard([&state] {
     state.log.clear(kLogInfo);
   });
@@ -585,6 +592,7 @@ std::uint64_t writeFile(
   std::shared_ptr<BufferWorkQueue> out;
   // Grab the output queue for each decompression job (in order).
   while (outs.pop(out)) {
+    auto outFinishGuard = makeScopeGuard([&out] { out->finish(); });
     if (errorHolder.hasError()) {
       continue;
     }
index d7947b814dd5305300139a2c43f89a1bc25ee6f0..07842e59817e84fdbe3ae2c6b900e7d14d7ff16a 100644 (file)
@@ -115,13 +115,14 @@ class WorkQueue {
   }
 
   /**
-   * Promise that `push()` won't be called again, so once the queue is empty
-   * there will never any more work.
+   * Promise that either the reader side or the writer side is done.
+   * If the writer is done, `push()` won't be called again, so once the queue
+   * is empty there will never be any more work. If the reader is done, `pop()`
+   * won't be called again, so further items pushed will just be ignored.
    */
   void finish() {
     {
       std::lock_guard<std::mutex> lock(mutex_);
-      assert(!done_);
       done_ = true;
     }
     readerCv_.notify_all();