std::uintmax_t size,
size_t numThreads,
const ZSTD_parameters ¶ms) {
- size_t step = size_t{1} << (params.cParams.windowLog + 2);
- // If file size is known, see if a smaller step will spread work more evenly
- if (size != 0) {
- const std::uintmax_t newStep = size / numThreads;
- if (newStep != 0 && newStep <= std::numeric_limits<size_t>::max()) {
- step = std::min(step, static_cast<size_t>(newStep));
- }
- }
- return step;
+ return size_t{1} << (params.cParams.windowLog + 2);
}
namespace {
// Break the input up into chunks of size `step` and compress each chunk
// independently.
size_t step = calculateStep(size, numThreads, params);
+ state.log(DEBUG, "Chosen frame size: %zu\n", step);
auto status = FileStatus::Continue;
while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
// Make a new input queue that we will put the chunk's input data into.
});
// Pass the output queue to the writer thread.
chunks.push(std::move(out));
+ state.log(VERBOSE, "Starting a new frame\n");
// Fill the input queue for the compression job we just started
status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
}
if (frameSize == 0) {
// We hit a non SkippableFrame ==> not compressed by pzstd or corrupted
// Pass the rest of the source to this decompression task
+ state.log(VERBOSE,
+ "Input not in pzstd format, falling back to serial decompression\n");
while (status == FileStatus::Continue && !state.errorHolder.hasError()) {
status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
}
break;
}
+ state.log(VERBOSE, "Decompressing a frame of size %zu", frameSize);
// Fill the input queue for the decompression job we just started
status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
}
if (!options.decompress) {
auto parameters = options.determineParameters();
cStreamPool.reset(new ResourcePool<ZSTD_CStream>{
- [parameters]() -> ZSTD_CStream* {
+ [this, parameters]() -> ZSTD_CStream* {
+ this->log(VERBOSE, "Creating new ZSTD_CStream\n");
auto zcs = ZSTD_createCStream();
if (zcs) {
auto err = ZSTD_initCStream_advanced(
}});
} else {
dStreamPool.reset(new ResourcePool<ZSTD_DStream>{
- []() -> ZSTD_DStream* {
+ [this]() -> ZSTD_DStream* {
+ this->log(VERBOSE, "Creating new ZSTD_DStream\n");
auto zds = ZSTD_createDStream();
if (zds) {
auto err = ZSTD_initDStream(zds);
}
}
+ ~SharedState() {
+ // The resource pools have references to this, so destroy them first.
+ cStreamPool.reset();
+ dStreamPool.reset();
+ }
+
Logger log;
ErrorHolder errorHolder;
std::unique_ptr<ResourcePool<ZSTD_CStream>> cStreamPool;