// WorkQueue outlives ThreadPool so that in the case of an error we are certain
// we don't accidentally try to call push() on it after it is destroyed.
- WorkQueue<std::shared_ptr<BufferWorkQueue>> outs;
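+ // Bound the queue to 2 * numThreads entries to cap the number of
+ // outstanding output queues.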
+ WorkQueue<std::shared_ptr<BufferWorkQueue>> outs{2 * options.numThreads};
size_t bytesWritten;
{
- // Initialize the thread pool with numThreads
- ThreadPool executor(options.numThreads);
+ // Initialize the thread pool with numThreads + 1
+ // We add one because the read thread spends most of its time waiting.
+ // This also sets the minimum number of threads to 2, so the algorithm
+ // doesn't deadlock: the read job occupies its thread for its whole lifetime,
+ // so at least one other thread must be free to run the jobs it queues.
+ ThreadPool executor(options.numThreads + 1);
if (!options.decompress) {
// Add a job that reads the input and starts all the compression jobs
executor.add(
namespace {
enum class FileStatus { Continue, Done, Error };
+/// Determines the status of the file stream `fd`.
+FileStatus fileStatus(FILE* fd) {
+ if (std::feof(fd)) {
+ return FileStatus::Done;
+ } else if (std::ferror(fd)) {
+ return FileStatus::Error;
+ }
+ return FileStatus::Continue;
+}
} // anonymous namespace
/**
auto bytesRead =
std::fread(buffer.data(), 1, std::min(chunkSize, buffer.size()), fd);
queue.push(buffer.splitAt(bytesRead));
- if (std::feof(fd)) {
- return FileStatus::Done;
- } else if (std::ferror(fd) || bytesRead == 0) {
- return FileStatus::Error;
+ auto status = fileStatus(fd);
+ if (status != FileStatus::Continue) {
+ return status;
}
}
return FileStatus::Continue;
// frameSize is 0 if the frame info can't be decoded.
Buffer buffer(SkippableFrame::kSize);
auto bytesRead = std::fread(buffer.data(), 1, buffer.size(), fd);
+ status = fileStatus(fd);
if (bytesRead == 0 && status != FileStatus::Continue) {
break;
}
frameSize = SkippableFrame::tryRead(buffer.range());
in->push(std::move(buffer));
}
+ if (frameSize == 0) {
+ // We hit a frame that isn't a SkippableFrame, so this will be the last job.
+ // Make sure that we don't use too much memory.
+ in->setMaxSize(64);
+ out->setMaxSize(64);
+ }
// Start decompression in the thread pool
executor.add([&errorHolder, in, out] {
return decompress(errorHolder, std::move(in), std::move(out));
return true;
}
+ /**
+ * Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
+ *
+ * @param maxSize The new maximum queue size.
+ */
+ void setMaxSize(std::size_t maxSize) {
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ maxSize_ = maxSize;
+ }
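+ // Wake any writers blocked under the old limit, in case the new limit
+ // lets them proceed.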
+ writerCv_.notify_all();
+ }
+
/**
* Promise that `push()` won't be called again, so once the queue is empty
* there will never be any more work.
return result;
}
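+ /// Sets the maximum queue size; forwards to the underlying WorkQueue.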
+ void setMaxSize(std::size_t maxSize) {
+ queue_.setMaxSize(maxSize);
+ }
+
void finish() {
queue_.finish();
}
pusher.join();
}
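+// Shrinking maxSize below the current size keeps the already queued items but
+// blocks further pushes, so after finish() only 5 and 6 come out.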
+TEST(WorkQueue, SetMaxSize) {
+ WorkQueue<int> queue(2);
+ int result;
+ queue.push(5);
+ queue.push(6);
+ queue.setMaxSize(1);
+ std::thread pusher([&queue] {
+ queue.push(7);
+ });
+ // Crudely try to make sure that pusher has run.
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ queue.finish();
+ EXPECT_TRUE(queue.pop(result));
+ EXPECT_EQ(5, result);
+ EXPECT_TRUE(queue.pop(result));
+ EXPECT_EQ(6, result);
+ EXPECT_FALSE(queue.pop(result));
+
+ pusher.join();
+}
+
TEST(WorkQueue, BoundedSizeMPMC) {
WorkQueue<int> queue(100);
std::vector<int> results(10000, -1);