]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[pzstd] Fix frame size for small files + add logging
authorNick Terrell <terrelln@fb.com>
Wed, 16 Nov 2016 00:39:09 +0000 (16:39 -0800)
committerNick Terrell <terrelln@fb.com>
Wed, 16 Nov 2016 00:39:09 +0000 (16:39 -0800)
contrib/pzstd/Pzstd.cpp
contrib/pzstd/Pzstd.h

index c5b4ce4cb0a0a3a2ec746f843b945c6bc575e903..1778fef25e7b11d67125c2a6a40648a4aa3fb607 100644 (file)
@@ -341,15 +341,7 @@ static size_t calculateStep(
     std::uintmax_t size,
     size_t numThreads,
     const ZSTD_parameters &params) {
-  size_t step = size_t{1} << (params.cParams.windowLog + 2);
-  // If file size is known, see if a smaller step will spread work more evenly
-  if (size != 0) {
-    const std::uintmax_t newStep = size / numThreads;
-    if (newStep != 0 && newStep <= std::numeric_limits<size_t>::max()) {
-      step = std::min(step, static_cast<size_t>(newStep));
-    }
-  }
-  return step;
+  return size_t{1} << (params.cParams.windowLog + 2);
 }
 
 namespace {
@@ -401,6 +393,7 @@ std::uint64_t asyncCompressChunks(
   // Break the input up into chunks of size `step` and compress each chunk
   // independently.
   size_t step = calculateStep(size, numThreads, params);
+  state.log(DEBUG, "Chosen frame size: %zu\n", step);
   auto status = FileStatus::Continue;
   while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
     // Make a new input queue that we will put the chunk's input data into.
@@ -415,6 +408,7 @@ std::uint64_t asyncCompressChunks(
     });
     // Pass the output queue to the writer thread.
     chunks.push(std::move(out));
+    state.log(VERBOSE, "Starting a new frame\n");
     // Fill the input queue for the compression job we just started
     status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
   }
@@ -551,11 +545,14 @@ std::uint64_t asyncDecompressFrames(
     if (frameSize == 0) {
       // We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
       // Pass the rest of the source to this decompression task
+      state.log(VERBOSE,
+          "Input not in pzstd format, falling back to serial decompression\n");
       while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
         status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
       }
       break;
     }
+    state.log(VERBOSE, "Decompressing a frame of size %zu", frameSize);
     // Fill the input queue for the decompression job we just started
     status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
   }
index 9fb2c488405279ac3a93552bf8d60c153fe426c6..dc60dd9b862c11fc8ad081dfd5c70bbe69227174 100644 (file)
@@ -40,7 +40,8 @@ class SharedState {
     if (!options.decompress) {
       auto parameters = options.determineParameters();
       cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
-          [parameters]() -> ZSTD_CStream* {
+          [this, parameters]() -> ZSTD_CStream* {
+            this->log(VERBOSE, "Creating new ZSTD_CStream\n");
             auto zcs = ZSTD_createCStream();
             if (zcs) {
               auto err = ZSTD_initCStream_advanced(
@@ -57,7 +58,8 @@ class SharedState {
           }});
     } else {
       dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
-          []() -> ZSTD_DStream* {
+          [this]() -> ZSTD_DStream* {
+            this->log(VERBOSE, "Creating new ZSTD_DStream\n");
             auto zds = ZSTD_createDStream();
             if (zds) {
               auto err = ZSTD_initDStream(zds);
@@ -74,6 +76,12 @@ class SharedState {
     }
   }
 
+  ~SharedState() {
+    // The resource pools have references to this, so destroy them first.
+    cStreamPool.reset();
+    dStreamPool.reset();
+  }
+
   Logger log;
   ErrorHolder errorHolder;
   std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;