std::shared_ptr<BufferWorkQueue> out,
size_t maxInputSize) {
auto& errorHolder = state.errorHolder;
- auto guard = makeScopeGuard([&] { out->finish(); });
+ auto guard = makeScopeGuard([&] {
+ in->finish();
+ out->finish();
+ });
// Initialize the CCtx
auto ctx = state.cStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_CStream")) {
std::shared_ptr<BufferWorkQueue> in,
std::shared_ptr<BufferWorkQueue> out) {
auto& errorHolder = state.errorHolder;
- auto guard = makeScopeGuard([&] { out->finish(); });
+ auto guard = makeScopeGuard([&] {
+ in->finish();
+ out->finish();
+ });
// Initialize the DCtx
auto ctx = state.dStreamPool->get();
if (!errorHolder.check(ctx != nullptr, "Failed to allocate ZSTD_DStream")) {
FILE* outputFd,
bool decompress) {
auto& errorHolder = state.errorHolder;
+ auto outsFinishGuard = makeScopeGuard([&outs] { outs.finish(); });
auto lineClearGuard = makeScopeGuard([&state] {
state.log.clear(kLogInfo);
});
std::shared_ptr<BufferWorkQueue> out;
// Grab the output queue for each decompression job (in order).
while (outs.pop(out)) {
+ auto outFinishGuard = makeScopeGuard([&out] { out->finish(); });
if (errorHolder.hasError()) {
continue;
}
}
/**
- * Promise that `push()` won't be called again, so once the queue is empty
- * there will never any more work.
+ * Promise that either the reader side or the writer side is done.
+ * If the writer is done, `push()` won't be called again, so once the queue
+ * is empty there will never be any more work. If the reader is done, `pop()`
+ * won't be called again, so further items pushed will just be ignored.
*/
void finish() {
{
std::lock_guard<std::mutex> lock(mutex_);
- assert(!done_);
done_ = true;
}
readerCv_.notify_all();