--- /dev/null
+/* Test deflate() on concurrently modified next_in.
+ *
+ * Plain zlib does not document that this is supported, but in practice it tolerates this, and QEMU live migration is
+ * known to rely on this. Make sure zlib-ng tolerates this as well.
+ */
+
+#include "zbuild.h"
+#ifdef ZLIB_COMPAT
+#include "zlib.h"
+#else
+#include "zlib-ng.h"
+#endif
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <atomic>
+#include <cstring>
+#include <thread>
+
+static uint8_t buf[8 * 1024];
+static uint8_t zbuf[4 * 1024];
+static uint8_t tmp[8 * 1024];
+
+/* Thread that increments all bytes in buf by 1. */
+class Mutator {
+ enum class State {
+ PAUSED,
+ RUNNING,
+ STOPPED,
+ };
+
+public:
+ Mutator()
+ : m_state(State::PAUSED), m_target_state(State::PAUSED),
+ m_thread(&Mutator::run, this) {}
+ ~Mutator() {
+ transition(State::STOPPED);
+ m_thread.join();
+ }
+
+ void pause() {
+ transition(State::PAUSED);
+ }
+
+ void resume() {
+ transition(State::RUNNING);
+ }
+
+private:
+ void run() {
+ while (true) {
+ m_state.store(m_target_state);
+ if (m_state == State::PAUSED)
+ continue;
+ if (m_state == State::STOPPED)
+ break;
+ for (uint8_t & i: buf)
+ i++;
+ }
+ }
+
+ void transition(State target_state) {
+ m_target_state = target_state;
+ while (m_state != target_state) {
+ }
+ }
+
+ std::atomic<State> m_state, m_target_state;
+ std::thread m_thread;
+};
+
+TEST(deflate, concurrency) {
+#ifdef S390_DFLTCC_DEFLATE
+ GTEST_SKIP() << "Known to be broken with S390_DFLTCC_DEFLATE";
+#endif
+
+ /* Create reusable mutator and streams. */
+ Mutator mutator;
+
+ PREFIX3(stream) dstrm;
+ memset(&dstrm, 0, sizeof(dstrm));
+ int err = PREFIX(deflateInit2)(&dstrm, Z_BEST_SPEED, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
+ ASSERT_EQ(Z_OK, err) << dstrm.msg;
+
+ PREFIX3(stream) istrm;
+ memset(&istrm, 0, sizeof(istrm));
+ err = PREFIX(inflateInit2)(&istrm, -15);
+ ASSERT_EQ(Z_OK, err) << istrm.msg;
+
+ /* Iterate for a certain amount of time. */
+ auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1);
+ while (std::chrono::steady_clock::now() < deadline) {
+ /* Start each iteration with a fresh stream state. */
+ err = PREFIX(deflateReset)(&dstrm);
+ ASSERT_EQ(Z_OK, err) << dstrm.msg;
+
+ err = PREFIX(inflateReset)(&istrm);
+ ASSERT_EQ(Z_OK, err) << istrm.msg;
+
+ /* Mutate and compress the first half of buf concurrently.
+ * Decompress and throw away the results, which are unpredictable.
+ */
+ mutator.resume();
+ dstrm.next_in = buf;
+ dstrm.avail_in = sizeof(buf) / 2;
+ while (dstrm.avail_in > 0) {
+ dstrm.next_out = zbuf;
+ dstrm.avail_out = sizeof(zbuf);
+ err = PREFIX(deflate)(&dstrm, Z_NO_FLUSH);
+ ASSERT_EQ(Z_OK, err) << dstrm.msg;
+ istrm.next_in = zbuf;
+ istrm.avail_in = sizeof(zbuf) - dstrm.avail_out;
+ while (istrm.avail_in > 0) {
+ istrm.next_out = tmp;
+ istrm.avail_out = sizeof(tmp);
+ err = PREFIX(inflate)(&istrm, Z_NO_FLUSH);
+ ASSERT_EQ(Z_OK, err) << istrm.msg;
+ }
+ }
+
+ /* Stop mutation and compress the second half of buf.
+ * Decompress and check that the result matches.
+ */
+ mutator.pause();
+ dstrm.next_in = buf + sizeof(buf) / 2;
+ dstrm.avail_in = sizeof(buf) - sizeof(buf) / 2;
+ while (dstrm.avail_in > 0) {
+ dstrm.next_out = zbuf;
+ dstrm.avail_out = sizeof(zbuf);
+ err = PREFIX(deflate)(&dstrm, Z_FINISH);
+ if (err == Z_STREAM_END)
+ ASSERT_EQ(0u, dstrm.avail_in);
+ else
+ ASSERT_EQ(Z_OK, err) << dstrm.msg;
+ istrm.next_in = zbuf;
+ istrm.avail_in = sizeof(zbuf) - dstrm.avail_out;
+ while (istrm.avail_in > 0) {
+ size_t orig_total_out = istrm.total_out;
+ istrm.next_out = tmp;
+ istrm.avail_out = sizeof(tmp);
+ err = PREFIX(inflate)(&istrm, Z_NO_FLUSH);
+ if (err == Z_STREAM_END)
+ ASSERT_EQ(0u, istrm.avail_in);
+ else
+ ASSERT_EQ(Z_OK, err) << istrm.msg;
+ size_t concurrent_size = sizeof(buf) - sizeof(buf) / 2;
+ if (istrm.total_out > concurrent_size) {
+ size_t tmp_offset, buf_offset, size;
+ if (orig_total_out >= concurrent_size) {
+ tmp_offset = 0;
+ buf_offset = orig_total_out - concurrent_size;
+ size = istrm.total_out - orig_total_out;
+ } else {
+ tmp_offset = concurrent_size - orig_total_out;
+ buf_offset = 0;
+ size = istrm.total_out - concurrent_size;
+ }
+ ASSERT_EQ(0, memcmp(tmp + tmp_offset, buf + sizeof(buf) / 2 + buf_offset, size));
+ }
+ }
+ }
+ }
+
+ err = PREFIX(inflateEnd)(&istrm);
+ ASSERT_EQ(Z_OK, err) << istrm.msg;
+
+ err = PREFIX(deflateEnd)(&dstrm);
+ ASSERT_EQ(Z_OK, err) << istrm.msg;
+}