]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-116738: Make zlib module thread-safe (gh-142432)
authorAlper <alperyoney@fb.com>
Fri, 12 Dec 2025 18:14:42 +0000 (10:14 -0800)
committerGitHub <noreply@github.com>
Fri, 12 Dec 2025 18:14:42 +0000 (13:14 -0500)
Makes the zlib module thread-safe free-threading build. Even though operations
are protected by locks, attributes exposed via PyMemberDef (eof, needs_input,
unused_data, unconsumed_tail) should still be stored atomically within locked
sections, since they can be read without acquiring the lock.

Lib/test/test_free_threading/test_zlib.py [new file with mode: 0644]
Misc/NEWS.d/next/Core_and_Builtins/2025-12-08-14-14-40.gh-issue-116738.x7aaBF.rst [new file with mode: 0644]
Modules/zlibmodule.c

diff --git a/Lib/test/test_free_threading/test_zlib.py b/Lib/test/test_free_threading/test_zlib.py
new file mode 100644 (file)
index 0000000..7c4ed04
--- /dev/null
@@ -0,0 +1,80 @@
+import itertools
+import unittest
+
+from test.support import import_helper, threading_helper
+from test.support.threading_helper import run_concurrently
+
+zlib = import_helper.import_module("zlib")
+
+from test.test_zlib import HAMLET_SCENE
+
+
+NTHREADS = 10
+
+
+@threading_helper.requires_working_threading()
+class TestZlib(unittest.TestCase):
+    def test_compressor(self):
+        comp = zlib.compressobj()
+
+        # First compress() outputs zlib header
+        header = comp.compress(HAMLET_SCENE)
+        self.assertGreater(len(header), 0)
+
+        def worker():
+            # it should return empty bytes as it buffers data internally
+            data = comp.compress(HAMLET_SCENE)
+            self.assertEqual(data, b"")
+
+        run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
+        full_compressed = header + comp.flush()
+        decompressed = zlib.decompress(full_compressed)
+        # The decompressed data should be HAMLET_SCENE repeated NTHREADS times
+        self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)
+
+    def test_decompressor_concurrent_attribute_reads(self):
+        input_data = HAMLET_SCENE * NTHREADS
+        compressed = zlib.compress(input_data)
+
+        decomp = zlib.decompressobj()
+        decomp_size_per_loop = len(input_data) // 1000
+        decompressed_parts = []
+
+        def decomp_worker():
+            # Decompress in chunks, which updates eof, unused_data, unconsumed_tail
+            decompressed_parts.append(
+                decomp.decompress(compressed, decomp_size_per_loop)
+            )
+            while decomp.unconsumed_tail:
+                decompressed_parts.append(
+                    decomp.decompress(
+                        decomp.unconsumed_tail, decomp_size_per_loop
+                    )
+                )
+
+        def decomp_attr_reader():
+            # Read attributes concurrently while another thread decompresses
+            for _ in range(1000):
+                _ = decomp.unused_data
+                _ = decomp.unconsumed_tail
+                _ = decomp.eof
+
+        counter = itertools.count()
+
+        def worker():
+            # First thread decompresses, others read attributes
+            if next(counter) == 0:
+                decomp_worker()
+            else:
+                decomp_attr_reader()
+
+        run_concurrently(worker_func=worker, nthreads=NTHREADS)
+
+        self.assertTrue(decomp.eof)
+        self.assertEqual(decomp.unused_data, b"")
+        decompressed = b"".join(decompressed_parts)
+        self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-12-08-14-14-40.gh-issue-116738.x7aaBF.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-12-08-14-14-40.gh-issue-116738.x7aaBF.rst
new file mode 100644 (file)
index 0000000..dcf4d0e
--- /dev/null
@@ -0,0 +1,2 @@
+Make the attributes in :mod:`zlib` thread-safe on the :term:`free threaded
+<free threading>` build.
index 6bac09aa6c2a6c879f582327041b515ef5bf5dc7..f546f3ff1cb864f7815d58a07c54d616d8a055a0 100644 (file)
@@ -8,6 +8,7 @@
 #endif
 
 #include "Python.h"
+#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_STORE_CHAR_RELAXED
 
 #include "zlib.h"
 #include "stdbool.h"
@@ -181,15 +182,6 @@ OutputBuffer_WindowOnError(_BlocksOutputBuffer *buffer, _Uint32Window *window)
 }
 
 
-#define ENTER_ZLIB(obj) do {                      \
-    if (!PyThread_acquire_lock((obj)->lock, 0)) { \
-        Py_BEGIN_ALLOW_THREADS                    \
-        PyThread_acquire_lock((obj)->lock, 1);    \
-        Py_END_ALLOW_THREADS                      \
-    } } while (0)
-#define LEAVE_ZLIB(obj) PyThread_release_lock((obj)->lock);
-
-
 /* The following parameters are copied from zutil.h, version 0.95 */
 #define DEFLATED   8
 #if MAX_MEM_LEVEL >= 8
@@ -228,7 +220,7 @@ typedef struct
     char eof;
     bool is_initialised;
     PyObject *zdict;
-    PyThread_type_lock lock;
+    PyMutex mutex;
 } compobject;
 
 #define _compobject_CAST(op)    ((compobject *)op)
@@ -291,12 +283,7 @@ newcompobject(PyTypeObject *type)
         Py_DECREF(self);
         return NULL;
     }
-    self->lock = PyThread_allocate_lock();
-    if (self->lock == NULL) {
-        Py_DECREF(self);
-        PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
-        return NULL;
-    }
+    self->mutex = (PyMutex){0};
     return self;
 }
 
@@ -720,10 +707,10 @@ compobject_dealloc_impl(PyObject *op, int (*dealloc)(z_streamp))
     PyTypeObject *type = Py_TYPE(op);
     PyObject_GC_UnTrack(op);
     compobject *self = _compobject_CAST(op);
+    assert(!PyMutex_IsLocked(&self->mutex));
     if (self->is_initialised) {
         (void)dealloc(&self->zst);
     }
-    PyThread_free_lock(self->lock);
     Py_XDECREF(self->unused_data);
     Py_XDECREF(self->unconsumed_tail);
     Py_XDECREF(self->zdict);
@@ -777,7 +764,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
     _BlocksOutputBuffer buffer = {.writer = NULL};
     zlibstate *state = PyType_GetModuleState(cls);
 
-    ENTER_ZLIB(self);
+    PyMutex_Lock(&self->mutex);
 
     self->zst.next_in = data->buf;
     Py_ssize_t ibuflen = data->len;
@@ -819,7 +806,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls,
     OutputBuffer_OnError(&buffer);
     return_value = NULL;
  success:
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     return return_value;
 }
 
@@ -909,7 +896,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
         max_length = -1;
     }
 
-    ENTER_ZLIB(self);
+    PyMutex_Lock(&self->mutex);
 
     self->zst.next_in = data->buf;
     ibuflen = data->len;
@@ -962,7 +949,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
     if (err == Z_STREAM_END) {
         /* This is the logical place to call inflateEnd, but the old behaviour
            of only calling it on flush() is preserved. */
-        self->eof = 1;
+        FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
     } else if (err != Z_OK && err != Z_BUF_ERROR) {
         /* We will only get Z_BUF_ERROR if the output buffer was full
            but there wasn't more output when we tried again, so it is
@@ -981,7 +968,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls,
     OutputBuffer_OnError(&buffer);
     return_value = NULL;
  success:
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     return return_value;
 }
 
@@ -1014,7 +1001,7 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode)
         return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
     }
 
-    ENTER_ZLIB(self);
+    PyMutex_Lock(&self->mutex);
 
     self->zst.avail_in = 0;
 
@@ -1070,7 +1057,7 @@ error:
     OutputBuffer_OnError(&buffer);
     return_value = NULL;
 success:
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     return return_value;
 }
 
@@ -1094,9 +1081,9 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls)
     if (!return_value) return NULL;
 
     /* Copy the zstream state
-     * We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
+     * We use mutex to make this thread-safe
      */
-    ENTER_ZLIB(self);
+    PyMutex_Lock(&self->mutex);
     int err = deflateCopy(&return_value->zst, &self->zst);
     switch (err) {
     case Z_OK:
@@ -1120,11 +1107,11 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls)
     /* Mark it as being initialized */
     return_value->is_initialised = 1;
 
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     return (PyObject *)return_value;
 
 error:
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     Py_XDECREF(return_value);
     return NULL;
 }
@@ -1178,9 +1165,9 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls)
     if (!return_value) return NULL;
 
     /* Copy the zstream state
-     * We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
+     * We use mutex to make this thread-safe
      */
-    ENTER_ZLIB(self);
+    PyMutex_Lock(&self->mutex);
     int err = inflateCopy(&return_value->zst, &self->zst);
     switch (err) {
     case Z_OK:
@@ -1205,11 +1192,11 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls)
     /* Mark it as being initialized */
     return_value->is_initialised = 1;
 
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     return (PyObject *)return_value;
 
 error:
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     Py_XDECREF(return_value);
     return NULL;
 }
@@ -1282,10 +1269,10 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
         return NULL;
     }
 
-    ENTER_ZLIB(self);
+    PyMutex_Lock(&self->mutex);
 
     if (PyObject_GetBuffer(self->unconsumed_tail, &data, PyBUF_SIMPLE) == -1) {
-        LEAVE_ZLIB(self);
+        PyMutex_Unlock(&self->mutex);
         return NULL;
     }
 
@@ -1333,7 +1320,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
 
     /* If at end of stream, clean up any memory allocated by zlib. */
     if (err == Z_STREAM_END) {
-        self->eof = 1;
+        FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
         self->is_initialised = 0;
         err = inflateEnd(&self->zst);
         if (err != Z_OK) {
@@ -1352,7 +1339,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls,
     return_value = NULL;
  success:
     PyBuffer_Release(&data);
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     return return_value;
 }
 
@@ -1361,7 +1348,7 @@ typedef struct {
     PyObject_HEAD
     z_stream zst;
     PyObject *zdict;
-    PyThread_type_lock lock;
+    PyMutex mutex;
     PyObject *unused_data;
     uint8_t *input_buffer;
     Py_ssize_t input_buffer_size;
@@ -1387,7 +1374,7 @@ ZlibDecompressor_dealloc(PyObject *op)
     PyTypeObject *type = Py_TYPE(op);
     PyObject_GC_UnTrack(op);
     ZlibDecompressor *self = ZlibDecompressor_CAST(op);
-    PyThread_free_lock(self->lock);
+    assert(!PyMutex_IsLocked(&self->mutex));
     if (self->is_initialised) {
         inflateEnd(&self->zst);
     }
@@ -1545,7 +1532,7 @@ decompress_buf(ZlibDecompressor *self, Py_ssize_t max_length)
     } while(err != Z_STREAM_END && self->avail_in_real != 0);
 
     if (err == Z_STREAM_END) {
-        self->eof = 1;
+        FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
         self->is_initialised = 0;
         /* Unlike the Decompress object we call inflateEnd here as there are no
            backwards compatibility issues */
@@ -1633,7 +1620,7 @@ decompress(ZlibDecompressor *self, uint8_t *data,
     }
 
     if (self->eof) {
-        self->needs_input = 0;
+        FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);
 
         if (self->avail_in_real > 0) {
             PyObject *unused_data = PyBytes_FromStringAndSize(
@@ -1646,10 +1633,10 @@ decompress(ZlibDecompressor *self, uint8_t *data,
     }
     else if (self->avail_in_real == 0) {
         self->zst.next_in = NULL;
-        self->needs_input = 1;
+        FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 1);
     }
     else {
-        self->needs_input = 0;
+        FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);
 
         /* If we did not use the input buffer, we now have
            to copy the tail from the caller's buffer into the
@@ -1718,14 +1705,14 @@ zlib__ZlibDecompressor_decompress_impl(ZlibDecompressor *self,
 {
     PyObject *result = NULL;
 
-    ENTER_ZLIB(self);
+    PyMutex_Lock(&self->mutex);
     if (self->eof) {
         PyErr_SetString(PyExc_EOFError, "End of stream already reached");
     }
     else {
         result = decompress(self, data->buf, data->len, max_length);
     }
-    LEAVE_ZLIB(self);
+    PyMutex_Unlock(&self->mutex);
     return result;
 }
 
@@ -1767,12 +1754,7 @@ zlib__ZlibDecompressor_impl(PyTypeObject *type, int wbits, PyObject *zdict)
     self->zst.next_in = NULL;
     self->zst.avail_in = 0;
     self->unused_data = Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
-    self->lock = PyThread_allocate_lock();
-    if (self->lock == NULL) {
-        Py_DECREF(self);
-        PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
-        return NULL;
-    }
+    self->mutex = (PyMutex){0};
     int err = inflateInit2(&(self->zst), wbits);
     switch (err) {
         case Z_OK:
@@ -1827,10 +1809,36 @@ static PyMethodDef ZlibDecompressor_methods[] = {
     {NULL}
 };
 
+static PyObject *
+Decomp_unused_data_get(PyObject *op, void *Py_UNUSED(ignored))
+{
+    compobject *self = _compobject_CAST(op);
+    PyMutex_Lock(&self->mutex);
+    assert(self->unused_data != NULL);
+    PyObject *result = Py_NewRef(self->unused_data);
+    PyMutex_Unlock(&self->mutex);
+    return result;
+}
+
+static PyObject *
+Decomp_unconsumed_tail_get(PyObject *op, void *Py_UNUSED(ignored))
+{
+    compobject *self = _compobject_CAST(op);
+    PyMutex_Lock(&self->mutex);
+    assert(self->unconsumed_tail != NULL);
+    PyObject *result = Py_NewRef(self->unconsumed_tail);
+    PyMutex_Unlock(&self->mutex);
+    return result;
+}
+
+static PyGetSetDef Decomp_getset[] = {
+    {"unused_data", Decomp_unused_data_get, NULL, NULL},
+    {"unconsumed_tail", Decomp_unconsumed_tail_get, NULL, NULL},
+    {NULL},
+};
+
 #define COMP_OFF(x) offsetof(compobject, x)
 static PyMemberDef Decomp_members[] = {
-    {"unused_data",     _Py_T_OBJECT, COMP_OFF(unused_data), Py_READONLY},
-    {"unconsumed_tail", _Py_T_OBJECT, COMP_OFF(unconsumed_tail), Py_READONLY},
     {"eof",             Py_T_BOOL,   COMP_OFF(eof), Py_READONLY},
     {NULL},
 };
@@ -1844,11 +1852,26 @@ PyDoc_STRVAR(ZlibDecompressor_unused_data__doc__,
 PyDoc_STRVAR(ZlibDecompressor_needs_input_doc,
 "True if more input is needed before more decompressed data can be produced.");
 
+static PyObject *
+ZlibDecompressor_unused_data_get(PyObject *op, void *Py_UNUSED(ignored))
+{
+    ZlibDecompressor *self = ZlibDecompressor_CAST(op);
+    PyMutex_Lock(&self->mutex);
+    assert(self->unused_data != NULL);
+    PyObject *result = Py_NewRef(self->unused_data);
+    PyMutex_Unlock(&self->mutex);
+    return result;
+}
+
+static PyGetSetDef ZlibDecompressor_getset[] = {
+    {"unused_data", ZlibDecompressor_unused_data_get, NULL,
+     ZlibDecompressor_unused_data__doc__},
+    {NULL},
+};
+
 static PyMemberDef ZlibDecompressor_members[] = {
     {"eof", Py_T_BOOL, offsetof(ZlibDecompressor, eof),
      Py_READONLY, ZlibDecompressor_eof__doc__},
-    {"unused_data", Py_T_OBJECT_EX, offsetof(ZlibDecompressor, unused_data),
-     Py_READONLY, ZlibDecompressor_unused_data__doc__},
     {"needs_input", Py_T_BOOL, offsetof(ZlibDecompressor, needs_input), Py_READONLY,
      ZlibDecompressor_needs_input_doc},
     {NULL},
@@ -2074,6 +2097,7 @@ static PyType_Slot Decomptype_slots[] = {
     {Py_tp_traverse, compobject_traverse},
     {Py_tp_methods, Decomp_methods},
     {Py_tp_members, Decomp_members},
+    {Py_tp_getset, Decomp_getset},
     {0, 0},
 };
 
@@ -2093,6 +2117,7 @@ static PyType_Slot ZlibDecompressor_type_slots[] = {
     {Py_tp_dealloc, ZlibDecompressor_dealloc},
     {Py_tp_traverse, ZlibDecompressor_traverse},
     {Py_tp_members, ZlibDecompressor_members},
+    {Py_tp_getset, ZlibDecompressor_getset},
     {Py_tp_new, zlib__ZlibDecompressor},
     {Py_tp_doc, (char *)zlib__ZlibDecompressor__doc__},
     {Py_tp_methods, ZlibDecompressor_methods},