]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Fix memory usage issues. 333/head
authorNick Terrell <terrelln@fb.com>
Sat, 3 Sep 2016 03:11:22 +0000 (20:11 -0700)
committerNick Terrell <terrelln@fb.com>
Sat, 3 Sep 2016 03:11:22 +0000 (20:11 -0700)
contrib/pzstd/Makefile
contrib/pzstd/Pzstd.cpp
contrib/pzstd/utils/WorkQueue.h
contrib/pzstd/utils/test/WorkQueueTest.cpp

index c59a6d107a76835315fdce41d4dc887a8686caf0..d71cf5b340c38320641a2f3d5e7d36ca66b04333 100644 (file)
@@ -63,7 +63,9 @@ googletest:
        @cd googletest/build && cmake .. && make
 
 test: libzstd.a Pzstd.o Options.o SkippableFrame.o
+       $(MAKE) -C utils/test clean
        $(MAKE) -C utils/test test
+       $(MAKE) -C test clean
        $(MAKE) -C test test
 
 clean:
index 84f6a2e4c444615bcd3c6e16ba2388bf5ce15f84..ddfa595567e9768dbcc1060d88c80aaa379406ad 100644 (file)
@@ -67,11 +67,14 @@ size_t pzstdMain(const Options& options, ErrorHolder& errorHolder) {
 
   // WorkQueue outlives ThreadPool so in the case of error we are certain
   // we don't accidently try to call push() on it after it is destroyed.
-  WorkQueue<std::shared_ptr<BufferWorkQueue>> outs;
+  WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{2 * options.numThreads};
   size_t bytesWritten;
   {
-    // Initialize the thread pool with numThreads
-    ThreadPool executor(options.numThreads);
+    // Initialize the thread pool with numThreads + 1
+    // We add one because the read thread spends most of its time waiting.
+    // This also sets the minimum number of threads to 2, so the algorithm
+    // doesn't deadlock.
+    ThreadPool executor(options.numThreads + 1);
     if (!options.decompress) {
       // Add a job that reads the input and starts all the compression jobs
       executor.add(
@@ -229,6 +232,15 @@ calculateStep(size_t size, size_t numThreads, const ZSTD_parameters& params) {
 
 namespace {
 enum class FileStatus { Continue, Done, Error };
+/// Determines the status of the file descriptor `fd`.
+FileStatus fileStatus(FILE* fd) {
+  if (std::feof(fd)) {
+    return FileStatus::Done;
+  } else if (std::ferror(fd)) {
+    return FileStatus::Error;
+  }
+  return FileStatus::Continue;
+}
 } // anonymous namespace
 
 /**
@@ -243,10 +255,9 @@ readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) {
     auto bytesRead =
         std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
     queue.push(buffer.splitAt(bytesRead));
-    if (std::feof(fd)) {
-      return FileStatus::Done;
-    } else if (std::ferror(fd) || bytesRead == 0) {
-      return FileStatus::Error;
+    auto status = fileStatus(fd);
+    if (status != FileStatus::Continue) {
+      return status;
     }
   }
   return FileStatus::Continue;
@@ -388,6 +399,7 @@ void asyncDecompressFrames(
       // frameSize is 0 if the frame info can't be decoded.
       Buffer buffer(SkippableFrame::kSize);
       auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
+      status = fileStatus(fd);
       if (bytesRead == 0 && status != FileStatus::Continue) {
         break;
       }
@@ -395,6 +407,12 @@ void asyncDecompressFrames(
       frameSize = SkippableFrame::tryRead(buffer.range());
       in->push(std::move(buffer));
     }
+    if (frameSize == 0) {
+      // We hit a non SkippableFrame, so this will be the last job.
+      // Make sure that we don't use too much memory
+      in->setMaxSize(64);
+      out->setMaxSize(64);
+    }
     // Start decompression in the thread pool
     executor.add([&errorHolder, in, out] {
       return decompress(errorHolder, std::move(in), std::move(out));
index 2fa417f419fec09a58080029238e46e5136aebbd..538213500425cf16e35171be27fa2220d85e0fb0 100644 (file)
@@ -99,6 +99,19 @@ class WorkQueue {
     return true;
   }
 
+  /**
+   * Sets the maximum queue size.  If `maxSize == 0` then it is unbounded.
+   *
+   * @param maxSize The new maximum queue size.
+   */
+  void setMaxSize(std::size_t maxSize) {
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      maxSize_ = maxSize;
+    }
+    writerCv_.notify_all();
+  }
+
   /**
    * Promise that `push()` won't be called again, so once the queue is empty
    * there will never any more work.
@@ -149,6 +162,10 @@ class BufferWorkQueue {
     return result;
   }
 
+  void setMaxSize(std::size_t maxSize) {
+    queue_.setMaxSize(maxSize);
+  }
+
   void finish() {
     queue_.finish();
   }
index 074891fda9bb0076d5b700aa7f06767e0142f354..84d8573c321ed25cdabfa235c0c307e162172330 100644 (file)
@@ -175,6 +175,27 @@ TEST(WorkQueue, BoundedSizePushAfterFinish) {
   pusher.join();
 }
 
+TEST(WorkQueue, SetMaxSize) {
+  WorkQueue<int> queue(2);
+  int result;
+  queue.push(5);
+  queue.push(6);
+  queue.setMaxSize(1);
+  std::thread pusher([&queue] {
+    queue.push(7);
+  });
+  // Dirtily try and make sure that pusher has run.
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+  queue.finish();
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(5, result);
+  EXPECT_TRUE(queue.pop(result));
+  EXPECT_EQ(6, result);
+  EXPECT_FALSE(queue.pop(result));
+
+  pusher.join();
+}
+
 TEST(WorkQueue, BoundedSizeMPMC) {
   WorkQueue<int> queue(100);
   std::vector<int> results(10000, -1);