]> git.ipfire.org Git - thirdparty/zlib-ng.git/commitdiff
Add a test for concurrently modifying deflate() input
authorIlya Leoshkevich <iii@linux.ibm.com>
Fri, 3 Jun 2022 13:38:19 +0000 (15:38 +0200)
committerHans Kristian Rosbach <hk-github@circlestorm.org>
Fri, 24 Jun 2022 13:15:17 +0000 (15:15 +0200)
The test simulates what one of the QEMU live migration tests is doing:
increments each buffer byte by 1 while deflate()ing it.

The test tries to produce a race condition and therefore is
probabilistic. The longer it runs, the better are the chances to catch
an issue. The scenario in question is known to be broken on IBM Z
with DFLTCC, and there it is caught in 100ms most of the time. The
run time is therefore set to 1 second in order to balance usability and
reliability.

test/CMakeLists.txt
test/test_deflate_concurrency.cc [new file with mode: 0644]

index fc06ce7ba02349051f087f1c36bc3d2a4a12d375..3a47e8b2758ec1506d866217db8d2c36109bb41f 100644 (file)
@@ -22,9 +22,6 @@ if(NOT TARGET GTest::GTest)
     # Prevent overriding the parent project's compiler/linker settings for Windows
     set(gtest_force_shared_crt ON CACHE BOOL
         "Use shared (DLL) run-time lib even when Google Test is built as static lib." FORCE)
-    # Disable pthreads for simplicity
-    set(gtest_disable_pthreads ON CACHE BOOL
-        "Disable uses of pthreads in gtest." FORCE)
 
     # Allow specifying alternative Google test repository
     if(NOT DEFINED GTEST_REPOSITORY)
@@ -100,6 +97,16 @@ endif()
 
 target_link_libraries(gtest_zlib zlibstatic GTest::GTest)
 
+find_package(Threads)
+if(Threads_FOUND)
+    target_sources(gtest_zlib PRIVATE test_deflate_concurrency.cc)
+    if(UNIX AND NOT APPLE)
+        # On Linux, use a workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=52590
+        target_link_libraries(gtest_zlib -Wl,--whole-archive -lpthread -Wl,--no-whole-archive)
+    endif()
+    target_link_libraries(gtest_zlib Threads::Threads)
+endif()
+
 if(ZLIB_ENABLE_TESTS)
     add_test(NAME gtest_zlib
         COMMAND ${CMAKE_CROSSCOMPILING_EMULATOR} $<TARGET_FILE:gtest_zlib>)
diff --git a/test/test_deflate_concurrency.cc b/test/test_deflate_concurrency.cc
new file mode 100644 (file)
index 0000000..1297aee
--- /dev/null
@@ -0,0 +1,170 @@
+/* Test deflate() on concurrently modified next_in.
+ *
+ * Plain zlib does not document that this is supported, but in practice it tolerates this, and QEMU live migration is
+ * known to rely on this. Make sure zlib-ng tolerates this as well.
+ */
+
+#include "zbuild.h"
+#ifdef ZLIB_COMPAT
+#include "zlib.h"
+#else
+#include "zlib-ng.h"
+#endif
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <atomic>
+#include <cstring>
+#include <thread>
+
+static uint8_t buf[8 * 1024];
+static uint8_t zbuf[4 * 1024];
+static uint8_t tmp[8 * 1024];
+
+/* Thread that increments all bytes in buf by 1. */
+class Mutator {
+    enum class State {
+        PAUSED,
+        RUNNING,
+        STOPPED,
+    };
+
+public:
+    Mutator()
+        : m_state(State::PAUSED), m_target_state(State::PAUSED),
+          m_thread(&Mutator::run, this) {}
+    ~Mutator() {
+        transition(State::STOPPED);
+        m_thread.join();
+    }
+
+    void pause() {
+        transition(State::PAUSED);
+    }
+
+    void resume() {
+        transition(State::RUNNING);
+    }
+
+private:
+    void run() {
+        while (true) {
+            m_state.store(m_target_state);
+            if (m_state == State::PAUSED)
+                continue;
+            if (m_state == State::STOPPED)
+                break;
+            for (uint8_t & i: buf)
+                i++;
+        }
+    }
+
+    void transition(State target_state) {
+        m_target_state = target_state;
+        while (m_state != target_state) {
+        }
+    }
+
+    std::atomic<State> m_state, m_target_state;
+    std::thread m_thread;
+};
+
+TEST(deflate, concurrency) {
+#ifdef S390_DFLTCC_DEFLATE
+    GTEST_SKIP() << "Known to be broken with S390_DFLTCC_DEFLATE";
+#endif
+
+    /* Create reusable mutator and streams. */
+    Mutator mutator;
+
+    PREFIX3(stream) dstrm;
+    memset(&dstrm, 0, sizeof(dstrm));
+    int err = PREFIX(deflateInit2)(&dstrm, Z_BEST_SPEED, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
+    ASSERT_EQ(Z_OK, err) << dstrm.msg;
+
+    PREFIX3(stream) istrm;
+    memset(&istrm, 0, sizeof(istrm));
+    err = PREFIX(inflateInit2)(&istrm, -15);
+    ASSERT_EQ(Z_OK, err) << istrm.msg;
+
+    /* Iterate for a certain amount of time. */
+    auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1);
+    while (std::chrono::steady_clock::now() < deadline) {
+        /* Start each iteration with a fresh stream state. */
+        err = PREFIX(deflateReset)(&dstrm);
+        ASSERT_EQ(Z_OK, err) << dstrm.msg;
+
+        err = PREFIX(inflateReset)(&istrm);
+        ASSERT_EQ(Z_OK, err) << istrm.msg;
+
+        /* Mutate and compress the first half of buf concurrently.
+         * Decompress and throw away the results, which are unpredictable.
+         */
+        mutator.resume();
+        dstrm.next_in = buf;
+        dstrm.avail_in = sizeof(buf) / 2;
+        while (dstrm.avail_in > 0) {
+            dstrm.next_out = zbuf;
+            dstrm.avail_out = sizeof(zbuf);
+            err = PREFIX(deflate)(&dstrm, Z_NO_FLUSH);
+            ASSERT_EQ(Z_OK, err) << dstrm.msg;
+            istrm.next_in = zbuf;
+            istrm.avail_in = sizeof(zbuf) - dstrm.avail_out;
+            while (istrm.avail_in > 0) {
+                istrm.next_out = tmp;
+                istrm.avail_out = sizeof(tmp);
+                err = PREFIX(inflate)(&istrm, Z_NO_FLUSH);
+                ASSERT_EQ(Z_OK, err) << istrm.msg;
+            }
+        }
+
+        /* Stop mutation and compress the second half of buf.
+         * Decompress and check that the result matches.
+         */
+        mutator.pause();
+        dstrm.next_in = buf + sizeof(buf) / 2;
+        dstrm.avail_in = sizeof(buf) - sizeof(buf) / 2;
+        while (dstrm.avail_in > 0) {
+            dstrm.next_out = zbuf;
+            dstrm.avail_out = sizeof(zbuf);
+            err = PREFIX(deflate)(&dstrm, Z_FINISH);
+            if (err == Z_STREAM_END)
+                ASSERT_EQ(0u, dstrm.avail_in);
+            else
+                ASSERT_EQ(Z_OK, err) << dstrm.msg;
+            istrm.next_in = zbuf;
+            istrm.avail_in = sizeof(zbuf) - dstrm.avail_out;
+            while (istrm.avail_in > 0) {
+                size_t orig_total_out = istrm.total_out;
+                istrm.next_out = tmp;
+                istrm.avail_out = sizeof(tmp);
+                err = PREFIX(inflate)(&istrm, Z_NO_FLUSH);
+                if (err == Z_STREAM_END)
+                    ASSERT_EQ(0u, istrm.avail_in);
+                else
+                    ASSERT_EQ(Z_OK, err) << istrm.msg;
+                size_t concurrent_size = sizeof(buf) - sizeof(buf) / 2;
+                if (istrm.total_out > concurrent_size) {
+                    size_t tmp_offset, buf_offset, size;
+                    if (orig_total_out >= concurrent_size) {
+                        tmp_offset = 0;
+                        buf_offset = orig_total_out - concurrent_size;
+                        size = istrm.total_out - orig_total_out;
+                    } else {
+                        tmp_offset = concurrent_size - orig_total_out;
+                        buf_offset = 0;
+                        size = istrm.total_out - concurrent_size;
+                    }
+                    ASSERT_EQ(0, memcmp(tmp + tmp_offset, buf + sizeof(buf) / 2 + buf_offset, size));
+                }
+            }
+        }
+    }
+
+    err = PREFIX(inflateEnd)(&istrm);
+    ASSERT_EQ(Z_OK, err) << istrm.msg;
+
+    err = PREFIX(deflateEnd)(&dstrm);
+    ASSERT_EQ(Z_OK, err) << istrm.msg;
+}