From d47d1c5756235d4e953a5dcaed43a13fe25c3584 Mon Sep 17 00:00:00 2001 From: Ilya Leoshkevich Date: Fri, 3 Jun 2022 15:38:19 +0200 Subject: [PATCH] Add a test for concurrently modifying deflate() input The test simulates what one of the QEMU live migration tests is doing: increments each buffer byte by 1 while deflate()ing it. The test tries to produce a race condition and therefore is probabilistic. The longer it runs, the better are the chances to catch an issue. The scenario in question is known to be broken on IBM Z with DFLTCC, and there it is caught in 100ms most of the time. The run time is therefore set to 1 second in order to balance usability and reliability. --- test/CMakeLists.txt | 13 ++- test/test_deflate_concurrency.cc | 170 +++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 3 deletions(-) create mode 100644 test/test_deflate_concurrency.cc diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index fc06ce7b..3a47e8b2 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -22,9 +22,6 @@ if(NOT TARGET GTest::GTest) # Prevent overriding the parent project's compiler/linker settings for Windows set(gtest_force_shared_crt ON CACHE BOOL "Use shared (DLL) run-time lib even when Google Test is built as static lib." FORCE) - # Disable pthreads for simplicity - set(gtest_disable_pthreads ON CACHE BOOL - "Disable uses of pthreads in gtest." FORCE) # Allow specifying alternative Google test repository if(NOT DEFINED GTEST_REPOSITORY) @@ -100,6 +97,16 @@ endif() target_link_libraries(gtest_zlib zlibstatic GTest::GTest) +find_package(Threads) +if(Threads_FOUND) + target_sources(gtest_zlib PRIVATE test_deflate_concurrency.cc) + if(UNIX AND NOT APPLE) + # On Linux, use a workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=52590 + target_link_libraries(gtest_zlib -Wl,--whole-archive -lpthread -Wl,--no-whole-archive) + endif() + target_link_libraries(gtest_zlib Threads::Threads) +endif() + if(ZLIB_ENABLE_TESTS) add_test(NAME gtest_zlib COMMAND ${CMAKE_CROSSCOMPILING_EMULATOR} $) diff --git a/test/test_deflate_concurrency.cc b/test/test_deflate_concurrency.cc new file mode 100644 index 00000000..1297aee6 --- /dev/null +++ b/test/test_deflate_concurrency.cc @@ -0,0 +1,170 @@ +/* Test deflate() on concurrently modified next_in. + * + * Plain zlib does not document that this is supported, but in practice it tolerates this, and QEMU live migration is + * known to rely on this. Make sure zlib-ng tolerates this as well. + */ + +#include "zbuild.h" +#ifdef ZLIB_COMPAT +#include "zlib.h" +#else +#include "zlib-ng.h" +#endif + +#include + +#include +#include +#include +#include + +static uint8_t buf[8 * 1024]; +static uint8_t zbuf[4 * 1024]; +static uint8_t tmp[8 * 1024]; + +/* Thread that increments all bytes in buf by 1. */ +class Mutator { + enum class State { + PAUSED, + RUNNING, + STOPPED, + }; + +public: + Mutator() + : m_state(State::PAUSED), m_target_state(State::PAUSED), + m_thread(&Mutator::run, this) {} + ~Mutator() { + transition(State::STOPPED); + m_thread.join(); + } + + void pause() { + transition(State::PAUSED); + } + + void resume() { + transition(State::RUNNING); + } + +private: + void run() { + while (true) { + m_state.store(m_target_state); + if (m_state == State::PAUSED) + continue; + if (m_state == State::STOPPED) + break; + for (uint8_t & i: buf) + i++; + } + } + + void transition(State target_state) { + m_target_state = target_state; + while (m_state != target_state) { + } + } + + std::atomic m_state, m_target_state; + std::thread m_thread; +}; + +TEST(deflate, concurrency) { +#ifdef S390_DFLTCC_DEFLATE + GTEST_SKIP() << "Known to be broken with S390_DFLTCC_DEFLATE"; +#endif + + /* Create reusable mutator and streams. */ + Mutator mutator; + + PREFIX3(stream) dstrm; + memset(&dstrm, 0, sizeof(dstrm)); + int err = PREFIX(deflateInit2)(&dstrm, Z_BEST_SPEED, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); + ASSERT_EQ(Z_OK, err) << dstrm.msg; + + PREFIX3(stream) istrm; + memset(&istrm, 0, sizeof(istrm)); + err = PREFIX(inflateInit2)(&istrm, -15); + ASSERT_EQ(Z_OK, err) << istrm.msg; + + /* Iterate for a certain amount of time. */ + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1); + while (std::chrono::steady_clock::now() < deadline) { + /* Start each iteration with a fresh stream state. */ + err = PREFIX(deflateReset)(&dstrm); + ASSERT_EQ(Z_OK, err) << dstrm.msg; + + err = PREFIX(inflateReset)(&istrm); + ASSERT_EQ(Z_OK, err) << istrm.msg; + + /* Mutate and compress the first half of buf concurrently. + * Decompress and throw away the results, which are unpredictable. + */ + mutator.resume(); + dstrm.next_in = buf; + dstrm.avail_in = sizeof(buf) / 2; + while (dstrm.avail_in > 0) { + dstrm.next_out = zbuf; + dstrm.avail_out = sizeof(zbuf); + err = PREFIX(deflate)(&dstrm, Z_NO_FLUSH); + ASSERT_EQ(Z_OK, err) << dstrm.msg; + istrm.next_in = zbuf; + istrm.avail_in = sizeof(zbuf) - dstrm.avail_out; + while (istrm.avail_in > 0) { + istrm.next_out = tmp; + istrm.avail_out = sizeof(tmp); + err = PREFIX(inflate)(&istrm, Z_NO_FLUSH); + ASSERT_EQ(Z_OK, err) << istrm.msg; + } + } + + /* Stop mutation and compress the second half of buf. + * Decompress and check that the result matches. + */ + mutator.pause(); + dstrm.next_in = buf + sizeof(buf) / 2; + dstrm.avail_in = sizeof(buf) - sizeof(buf) / 2; + while (dstrm.avail_in > 0) { + dstrm.next_out = zbuf; + dstrm.avail_out = sizeof(zbuf); + err = PREFIX(deflate)(&dstrm, Z_FINISH); + if (err == Z_STREAM_END) + ASSERT_EQ(0u, dstrm.avail_in); + else + ASSERT_EQ(Z_OK, err) << dstrm.msg; + istrm.next_in = zbuf; + istrm.avail_in = sizeof(zbuf) - dstrm.avail_out; + while (istrm.avail_in > 0) { + size_t orig_total_out = istrm.total_out; + istrm.next_out = tmp; + istrm.avail_out = sizeof(tmp); + err = PREFIX(inflate)(&istrm, Z_NO_FLUSH); + if (err == Z_STREAM_END) + ASSERT_EQ(0u, istrm.avail_in); + else + ASSERT_EQ(Z_OK, err) << istrm.msg; + size_t concurrent_size = sizeof(buf) - sizeof(buf) / 2; + if (istrm.total_out > concurrent_size) { + size_t tmp_offset, buf_offset, size; + if (orig_total_out >= concurrent_size) { + tmp_offset = 0; + buf_offset = orig_total_out - concurrent_size; + size = istrm.total_out - orig_total_out; + } else { + tmp_offset = concurrent_size - orig_total_out; + buf_offset = 0; + size = istrm.total_out - concurrent_size; + } + ASSERT_EQ(0, memcmp(tmp + tmp_offset, buf + sizeof(buf) / 2 + buf_offset, size)); + } + } + } + } + + err = PREFIX(inflateEnd)(&istrm); + ASSERT_EQ(Z_OK, err) << istrm.msg; + + err = PREFIX(deflateEnd)(&dstrm); + ASSERT_EQ(Z_OK, err) << istrm.msg; +} -- 2.47.3