From: Joel Rosdahl Date: Wed, 29 Oct 2025 14:33:49 +0000 (+0100) Subject: test: Add unit tests for util::ThreadPool X-Git-Tag: v4.13~87 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=85a561f56ddcfdc3f360d7ed35fb3ecfdc2f7890;p=thirdparty%2Fccache.git test: Add unit tests for util::ThreadPool --- diff --git a/unittest/CMakeLists.txt b/unittest/CMakeLists.txt index 524304fb..98e9fdc8 100644 --- a/unittest/CMakeLists.txt +++ b/unittest/CMakeLists.txt @@ -32,6 +32,7 @@ set( test_util_path.cpp test_util_string.cpp test_util_texttable.cpp + test_util_threadpool.cpp test_util_tokenizer.cpp test_util_xxh3_128.cpp test_util_xxh3_64.cpp diff --git a/unittest/test_util_threadpool.cpp b/unittest/test_util_threadpool.cpp new file mode 100644 index 00000000..08eea374 --- /dev/null +++ b/unittest/test_util_threadpool.cpp @@ -0,0 +1,357 @@ +// Copyright (C) 2025 Joel Rosdahl and other contributors +// +// See doc/authors.adoc for a complete list of contributors. +// +// This program is free software; you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation; either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along with +// this program; if not, write to the Free Software Foundation, Inc., 51 +// Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +TEST_SUITE_BEGIN("util"); + +TEST_CASE("ThreadPool basic functionality") +{ + SUBCASE("single task execution") + { + util::ThreadPool pool(2); + std::atomic counter{0}; + + pool.enqueue([&counter] { ++counter; }); + pool.shut_down(); + + CHECK(counter == 1); + } + + SUBCASE("multiple tasks execution") + { + util::ThreadPool pool(2); + std::atomic counter{0}; + const int num_tasks = 10; + + for (int i = 0; i < num_tasks; ++i) { + pool.enqueue([&counter] { ++counter; }); + } + pool.shut_down(); + + CHECK(counter == num_tasks); + } + + SUBCASE("task execution with multiple threads") + { + util::ThreadPool pool(4); + std::atomic counter{0}; + const int num_tasks = 100; + + for (int i = 0; i < num_tasks; ++i) { + pool.enqueue([&counter] { ++counter; }); + } + pool.shut_down(); + + CHECK(counter == num_tasks); + } + + SUBCASE("minimum thread count is 1") + { + util::ThreadPool pool(0); // Should create at least 1 thread + std::atomic executed{false}; + + pool.enqueue([&executed] { executed = true; }); + pool.shut_down(); + + CHECK(executed); + } +} + +TEST_CASE("ThreadPool task queue limits") +{ + SUBCASE("respects maximum queue size") + { + const size_t max_queue_size = 5; + util::ThreadPool pool(1, max_queue_size); + std::atomic counter{0}; + + // Fill the queue and keep the worker busy. + std::mutex mutex; + std::condition_variable cv; + bool first_task_can_finish = false; + + pool.enqueue([&mutex, &cv, &first_task_can_finish] { + std::unique_lock lock(mutex); + cv.wait(lock, [&] { return first_task_can_finish; }); + }); + + // Enqueue tasks to fill the queue. + for (size_t i = 0; i < max_queue_size; ++i) { + pool.enqueue([&counter] { ++counter; }); + } + + // Try to enqueue one more task in a separate thread - it should block. + std::atomic extra_task_enqueued{false}; + std::thread enqueue_thread([&pool, &counter, &extra_task_enqueued] { + pool.enqueue([&counter] { ++counter; }); + extra_task_enqueued = true; + }); + + // Give the enqueue thread a chance to block (not ideal but necessary). + std::this_thread::yield(); + std::this_thread::yield(); + CHECK(!extra_task_enqueued); + + // Allow tasks to complete. + { + std::lock_guard lock(mutex); + first_task_can_finish = true; + } + cv.notify_one(); + enqueue_thread.join(); + + pool.shut_down(); + CHECK(counter == max_queue_size + 1); + CHECK(extra_task_enqueued); + } + + SUBCASE("unlimited queue size by default") + { + util::ThreadPool pool(1); + std::atomic counter{0}; + + // Keep worker busy. + std::mutex mutex; + std::condition_variable cv; + bool can_finish = false; + + pool.enqueue([&mutex, &cv, &can_finish] { + std::unique_lock lock(mutex); + cv.wait(lock, [&] { return can_finish; }); + }); + + // Enqueue many tasks - should not block. + const int num_tasks = 1000; + for (int i = 0; i < num_tasks; ++i) { + pool.enqueue([&counter] { ++counter; }); + } + + { + std::lock_guard lock(mutex); + can_finish = true; + } + cv.notify_one(); + pool.shut_down(); + + CHECK(counter == num_tasks); + } +} + +TEST_CASE("ThreadPool shutdown behavior") +{ + SUBCASE("shutdown waits for all tasks to complete") + { + util::ThreadPool pool(2); + std::atomic counter{0}; + const int num_tasks = 10; + + // Use a condition variable to ensure tasks take some time but without + // arbitrary sleeps. + std::mutex mutex; + std::condition_variable cv; + std::atomic ready_count{0}; + + for (int i = 0; i < num_tasks; ++i) { + pool.enqueue([&counter, &mutex, &cv, &ready_count] { + // Signal that this task is running. + ready_count++; + cv.notify_all(); + + // Wait briefly to simulate work. + std::unique_lock lock(mutex); + cv.wait_for(lock, 10ms); + + ++counter; + }); + } + + pool.shut_down(); + CHECK(counter == num_tasks); + } + + SUBCASE("enqueue after shutdown does nothing") + { + util::ThreadPool pool(2); + std::atomic counter{0}; + + pool.shut_down(); + + pool.enqueue([&counter] { ++counter; }); + + // No need to wait - the enqueue after shutdown should be a no-op. + CHECK(counter == 0); + } + + SUBCASE("multiple shutdown calls are safe") + { + util::ThreadPool pool(2); + std::atomic counter{0}; + + pool.enqueue([&counter] { ++counter; }); + + pool.shut_down(); + pool.shut_down(); // Should be safe to call multiple times + pool.shut_down(); + + CHECK(counter == 1); + } + + SUBCASE("destructor calls shutdown") + { + std::atomic counter{0}; + + { + util::ThreadPool pool(2); + pool.enqueue([&counter] { ++counter; }); + // Destructor should call shut_down(). + } + + CHECK(counter == 1); + } +} + +TEST_CASE("ThreadPool exception handling") +{ + SUBCASE("exception in task does not crash thread pool") + { + util::ThreadPool pool(2); + std::atomic counter{0}; + + pool.enqueue([] { throw std::runtime_error("Test exception"); }); + + pool.enqueue([&counter] { ++counter; }); + pool.enqueue([&counter] { ++counter; }); + + pool.shut_down(); + + // Tasks after the exception should still execute. + CHECK(counter == 2); + } + + SUBCASE("multiple exceptions do not crash thread pool") + { + util::ThreadPool pool(2); + std::atomic counter{0}; + + for (int i = 0; i < 5; ++i) { + pool.enqueue([] { throw std::runtime_error("Test exception"); }); + pool.enqueue([&counter] { ++counter; }); + } + + pool.shut_down(); + + CHECK(counter == 5); + } + + SUBCASE("unknown exception does not crash thread pool") + { + util::ThreadPool pool(2); + std::atomic counter{0}; + + pool.enqueue([] { throw 42; }); + pool.enqueue([&counter] { ++counter; }); + + pool.shut_down(); + + CHECK(counter == 1); + } +} + +TEST_CASE("ThreadPool concurrent access") +{ + SUBCASE("multiple threads can safely enqueue tasks") + { + util::ThreadPool pool(4); + std::atomic counter{0}; + const int num_producer_threads = 8; + const int tasks_per_thread = 50; + + std::vector producer_threads; + for (int i = 0; i < num_producer_threads; ++i) { + producer_threads.emplace_back([&] { + for (int j = 0; j < tasks_per_thread; ++j) { + pool.enqueue([&counter] { ++counter; }); + } + }); + } + + for (auto& thread : producer_threads) { + thread.join(); + } + + pool.shut_down(); + + CHECK(counter == num_producer_threads * tasks_per_thread); + } +} + +TEST_CASE("ThreadPool task ordering") +{ + SUBCASE("tasks execute in FIFO order on single thread") + { + util::ThreadPool pool(1); + std::vector execution_order; + std::mutex order_mutex; + + // Keep worker busy initially. + std::mutex start_mutex; + std::condition_variable start_cv; + bool can_start = false; + + pool.enqueue([&start_mutex, &start_cv, &can_start] { + std::unique_lock lock(start_mutex); + start_cv.wait(lock, [&] { return can_start; }); + }); + + // Enqueue tasks in order. + const int num_tasks = 10; + for (int i = 0; i < num_tasks; ++i) { + pool.enqueue([&execution_order, &order_mutex, i] { + std::lock_guard lock(order_mutex); + execution_order.push_back(i); + }); + } + + { + std::lock_guard lock(start_mutex); + can_start = true; + } + start_cv.notify_one(); + pool.shut_down(); + + REQUIRE(execution_order.size() == num_tasks); + for (int i = 0; i < num_tasks; ++i) { + CHECK(execution_order[i] == i); + } + } +} + +TEST_SUITE_END();