return size;
}
-static size_t handleOneInput(const Options &options,
+static std::uint64_t handleOneInput(const Options &options,
const std::string &inputFile,
FILE* inputFd,
+ const std::string &outputFile,
FILE* outputFd,
ErrorHolder &errorHolder) {
auto inputSize = fileSizeOrZero(inputFile);
// WorkQueue outlives ThreadPool so that in the case of error we are certain
// we don't accidentally try to call push() on it after it is destroyed.
WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{options.numThreads + 1};
- size_t bytesWritten;
+ std::uint64_t bytesRead;
+ std::uint64_t bytesWritten;
{
// Initialize the thread pool with numThreads + 1
// We add one because the read thread spends most of its time waiting.
if (!options.decompress) {
// Add a job that reads the input and starts all the compression jobs
executor.add(
-        [&errorHolder, &outs, &executor, inputFd, inputSize, &options] {
-          asyncCompressChunks(
+        [&errorHolder, &outs, &executor, inputFd, inputSize, &options,
+         &bytesRead] {
+          bytesRead = asyncCompressChunks(
              errorHolder,
              outs,
              executor,
              inputFd,
              inputSize,
              options.numThreads,
              options.determineParameters());
        });
    // Start writing
    bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress);
} else {
// Add a job that reads the input and starts all the decompression jobs
- executor.add([&errorHolder, &outs, &executor, inputFd] {
- asyncDecompressFrames(errorHolder, outs, executor, inputFd);
+ executor.add([&errorHolder, &outs, &executor, inputFd, &bytesRead] {
+ bytesRead = asyncDecompressFrames(errorHolder, outs, executor, inputFd);
});
// Start writing
bytesWritten = writeFile(errorHolder, outs, outputFd, options.decompress);
}
}
+  if (options.verbosity > 1 && !errorHolder.hasError()) {
+    std::string inputFileName = inputFile == "-" ? "stdin" : inputFile;
+    std::string outputFileName = outputFile == "-" ? "stdout" : outputFile;
+    if (!options.decompress) {
+      // Adding !bytesRead avoids division by zero when the input is empty.
+      double ratio = static_cast<double>(bytesWritten) /
+                     static_cast<double>(bytesRead + !bytesRead);
+      // Cast to unsigned long long so %llu is portable: std::uint64_t is not
+      // guaranteed to be unsigned long long on every platform.
+      std::fprintf(stderr, "%-20s :%6.2f%% (%6llu => %6llu bytes, %s)\n",
+                   inputFileName.c_str(), ratio * 100,
+                   static_cast<unsigned long long>(bytesRead),
+                   static_cast<unsigned long long>(bytesWritten),
+                   outputFileName.c_str());
+    } else {
+      std::fprintf(stderr, "%-20s: %llu bytes\n", inputFileName.c_str(),
+                   static_cast<unsigned long long>(bytesWritten));
+    }
+  }
return bytesWritten;
}
}
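A standalone sketch of the division-by-zero guard used in the ratio
computation above (illustration only, not part of the patch): for an empty
input, bytesRead + !bytesRead evaluates to 1, so the division stays finite.

    #include <cstdint>

    // Mirrors the guarded ratio from handleOneInput(): the denominator is
    // bumped to 1 only when bytesRead == 0.
    double compressionRatio(std::uint64_t bytesRead,
                            std::uint64_t bytesWritten) {
      return static_cast<double>(bytesWritten) /
             static_cast<double>(bytesRead + !bytesRead);
    }

    // e.g. compressionRatio(1000, 352) yields 0.352, and
    //      compressionRatio(0, 13) yields 13.0 instead of dividing by zero.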
auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });
// (de)compress the file
- handleOneInput(options, input, inputFd, outputFd, errorHolder);
+ handleOneInput(options, input, inputFd, outputFile, outputFd, errorHolder);
if (errorHolder.hasError()) {
continue;
}
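makeScopeGuard (used above to close outputFd) is pzstd's own utility; for
readers following the cleanup logic, a generic sketch of the usual shape such
a guard takes (not the patch's implementation):

    #include <utility>

    // Runs the stored callback when the guard goes out of scope.
    template <typename F>
    struct ScopeGuard {
      F f;
      ~ScopeGuard() { f(); }
    };

    template <typename F>
    ScopeGuard<F> makeScopeGuard(F f) {
      return {std::move(f)};
    }

    // Usage mirrors the call site above:
    //   auto closeOutputGuard = makeScopeGuard([&] { std::fclose(outputFd); });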
* Returns the status of the file after all of the reads have occurred.
*/
static FileStatus
-readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd) {
+readData(BufferWorkQueue& queue, size_t chunkSize, size_t size, FILE* fd,
+ std::uint64_t *totalBytesRead) {
Buffer buffer(size);
while (!buffer.empty()) {
auto bytesRead =
std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
+ *totalBytesRead += bytesRead;
queue.push(buffer.splitAt(bytesRead));
auto status = fileStatus(fd);
if (status != FileStatus::Continue) {
      return status;
    }
  }
  return FileStatus::Continue;
}
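The helper fileStatus() is not part of this excerpt; a minimal sketch of what
it presumably does, mapping stdio stream state onto the FileStatus values used
above (Done is assumed alongside the Continue and Error seen here):

    #include <cstdio>

    enum class FileStatus { Continue, Done, Error };

    static FileStatus fileStatus(FILE* fd) {
      if (std::feof(fd)) {
        return FileStatus::Done;      // clean end of file
      }
      if (std::ferror(fd)) {
        return FileStatus::Error;     // read failed; callers report the error
      }
      return FileStatus::Continue;    // more data may still be available
    }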
-void asyncCompressChunks(
+std::uint64_t asyncCompressChunks(
ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& chunks,
    ThreadPool& executor,
    FILE* fd,
    size_t size,
    size_t numThreads,
ZSTD_parameters params) {
auto chunksGuard = makeScopeGuard([&] { chunks.finish(); });
+ std::uint64_t bytesRead = 0;
// Break the input up into chunks of size `step` and compress each chunk
// independently.
// Pass the output queue to the writer thread.
chunks.push(std::move(out));
// Fill the input queue for the compression job we just started
- status = readData(*in, ZSTD_CStreamInSize(), step, fd);
+ status = readData(*in, ZSTD_CStreamInSize(), step, fd, &bytesRead);
}
errorHolder.check(status != FileStatus::Error, "Error reading input");
+ return bytesRead;
}
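Illustrative arithmetic only (the names here are assumptions, not helpers from
the patch): since the loop above carves the input into independent frames of
`step` bytes, the number of frames produced is a ceiling division.

    #include <cstdint>

    // Number of step-sized frames needed to cover inputSize bytes.
    std::uint64_t numFrames(std::uint64_t inputSize, std::uint64_t step) {
      return step == 0 ? 0 : (inputSize + step - 1) / step;
    }

    // e.g. numFrames(10 << 20, 4 << 20) == 3: two 4 MiB frames plus one
    // 2 MiB frame.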
/**
}
}
-void asyncDecompressFrames(
+std::uint64_t asyncDecompressFrames(
ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& frames,
ThreadPool& executor,
FILE* fd) {
auto framesGuard = makeScopeGuard([&] { frames.finish(); });
+ std::uint64_t totalBytesRead = 0;
+
// Split the source up into its component frames.
// If we find our recognized skippable frame we know the next frame's size,
// which means that we can decompress each standard frame independently.
// frameSize is 0 if the frame info can't be decoded.
Buffer buffer(SkippableFrame::kSize);
auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
+ totalBytesRead += bytesRead;
status = fileStatus(fd);
if (bytesRead == 0 && status != FileStatus::Continue) {
break;
    // We hit a non-SkippableFrame ==> not compressed by pzstd or corrupted
// Pass the rest of the source to this decompression task
while (status == FileStatus::Continue && !errorHolder.hasError()) {
- status = readData(*in, chunkSize, chunkSize, fd);
+ status = readData(*in, chunkSize, chunkSize, fd, &totalBytesRead);
}
break;
}
// Fill the input queue for the decompression job we just started
- status = readData(*in, chunkSize, frameSize, fd);
+ status = readData(*in, chunkSize, frameSize, fd, &totalBytesRead);
}
errorHolder.check(status != FileStatus::Error, "Error reading input");
+ return totalBytesRead;
}
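For context, a sketch of decoding the 12-byte skippable frame
(SkippableFrame::kSize) that pzstd writes ahead of each compressed frame: a
4-byte little-endian skippable magic, a 4-byte content size (always 4 here),
and a 4-byte payload carrying the next frame's compressed size. The exact
magic variant is an assumption (zstd reserves 0x184D2A50 through 0x184D2A5F),
and the real logic lives in SkippableFrame, which this excerpt doesn't show.

    #include <cstdint>

    static std::uint32_t readLE32(const unsigned char* p) {
      return static_cast<std::uint32_t>(p[0]) |
             (static_cast<std::uint32_t>(p[1]) << 8) |
             (static_cast<std::uint32_t>(p[2]) << 16) |
             (static_cast<std::uint32_t>(p[3]) << 24);
    }

    // Returns the next frame's size, or 0 if the header can't be decoded,
    // matching the "frameSize is 0" convention in the comments above.
    std::uint32_t parseSkippableFrame(const unsigned char (&header)[12]) {
      const std::uint32_t kSkippableMagic = 0x184D2A50; // assumed variant
      if (readLE32(header) != kSkippableMagic || readLE32(header + 4) != 4) {
        return 0;
      }
      return readLE32(header + 8);
    }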
/// Write `data` to `fd`; returns true iff the write succeeds.
return true;
}
-size_t writeFile(
+std::uint64_t writeFile(
ErrorHolder& errorHolder,
WorkQueue<std::shared_ptr<BufferWorkQueue>>& outs,
FILE* outputFd,
bool decompress) {
- size_t bytesWritten = 0;
+ std::uint64_t bytesWritten = 0;
std::shared_ptr<BufferWorkQueue> out;
// Grab the output queue for each decompression job (in order).
while (outs.pop(out) && !errorHolder.hasError()) {