From 1eddef81930ad9f7e2f411f153c35af16a6edf14 Mon Sep 17 00:00:00 2001 From: Alper Date: Fri, 12 Dec 2025 10:14:42 -0800 Subject: [PATCH] gh-116738: Make zlib module thread-safe (gh-142432) Makes the zlib module thread-safe on the free-threading build. Even though operations are protected by locks, attributes exposed via PyMemberDef (eof, needs_input, unused_data, unconsumed_tail) should still be stored atomically within locked sections, since they can be read without acquiring the lock. --- Lib/test/test_free_threading/test_zlib.py | 80 +++++++++++ ...-12-08-14-14-40.gh-issue-116738.x7aaBF.rst | 2 + Modules/zlibmodule.c | 133 +++++++++++------- 3 files changed, 161 insertions(+), 54 deletions(-) create mode 100644 Lib/test/test_free_threading/test_zlib.py create mode 100644 Misc/NEWS.d/next/Core_and_Builtins/2025-12-08-14-14-40.gh-issue-116738.x7aaBF.rst diff --git a/Lib/test/test_free_threading/test_zlib.py b/Lib/test/test_free_threading/test_zlib.py new file mode 100644 index 000000000000..7c4ed04f4a7c --- /dev/null +++ b/Lib/test/test_free_threading/test_zlib.py @@ -0,0 +1,80 @@ +import itertools +import unittest + +from test.support import import_helper, threading_helper +from test.support.threading_helper import run_concurrently + +zlib = import_helper.import_module("zlib") + +from test.test_zlib import HAMLET_SCENE + + +NTHREADS = 10 + + +@threading_helper.requires_working_threading() +class TestZlib(unittest.TestCase): + def test_compressor(self): + comp = zlib.compressobj() + + # First compress() outputs zlib header + header = comp.compress(HAMLET_SCENE) + self.assertGreater(len(header), 0) + + def worker(): + # it should return empty bytes as it buffers data internally + data = comp.compress(HAMLET_SCENE) + self.assertEqual(data, b"") + + run_concurrently(worker_func=worker, nthreads=NTHREADS - 1) + full_compressed = header + comp.flush() + decompressed = zlib.decompress(full_compressed) + # The decompressed data should be HAMLET_SCENE repeated NTHREADS times +
self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS) + + def test_decompressor_concurrent_attribute_reads(self): + input_data = HAMLET_SCENE * NTHREADS + compressed = zlib.compress(input_data) + + decomp = zlib.decompressobj() + decomp_size_per_loop = len(input_data) // 1000 + decompressed_parts = [] + + def decomp_worker(): + # Decompress in chunks, which updates eof, unused_data, unconsumed_tail + decompressed_parts.append( + decomp.decompress(compressed, decomp_size_per_loop) + ) + while decomp.unconsumed_tail: + decompressed_parts.append( + decomp.decompress( + decomp.unconsumed_tail, decomp_size_per_loop + ) + ) + + def decomp_attr_reader(): + # Read attributes concurrently while another thread decompresses + for _ in range(1000): + _ = decomp.unused_data + _ = decomp.unconsumed_tail + _ = decomp.eof + + counter = itertools.count() + + def worker(): + # First thread decompresses, others read attributes + if next(counter) == 0: + decomp_worker() + else: + decomp_attr_reader() + + run_concurrently(worker_func=worker, nthreads=NTHREADS) + + self.assertTrue(decomp.eof) + self.assertEqual(decomp.unused_data, b"") + decompressed = b"".join(decompressed_parts) + self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS) + + +if __name__ == "__main__": + unittest.main() diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-12-08-14-14-40.gh-issue-116738.x7aaBF.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-12-08-14-14-40.gh-issue-116738.x7aaBF.rst new file mode 100644 index 000000000000..dcf4d0efa6f2 --- /dev/null +++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-12-08-14-14-40.gh-issue-116738.x7aaBF.rst @@ -0,0 +1,2 @@ +Make the attributes in :mod:`zlib` thread-safe on the :term:`free threaded +<free threading>` build.
diff --git a/Modules/zlibmodule.c b/Modules/zlibmodule.c index 6bac09aa6c2a..f546f3ff1cb8 100644 --- a/Modules/zlibmodule.c +++ b/Modules/zlibmodule.c @@ -8,6 +8,7 @@ #endif #include "Python.h" +#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_STORE_CHAR_RELAXED #include "zlib.h" #include "stdbool.h" @@ -181,15 +182,6 @@ OutputBuffer_WindowOnError(_BlocksOutputBuffer *buffer, _Uint32Window *window) } -#define ENTER_ZLIB(obj) do { \ - if (!PyThread_acquire_lock((obj)->lock, 0)) { \ - Py_BEGIN_ALLOW_THREADS \ - PyThread_acquire_lock((obj)->lock, 1); \ - Py_END_ALLOW_THREADS \ - } } while (0) -#define LEAVE_ZLIB(obj) PyThread_release_lock((obj)->lock); - - /* The following parameters are copied from zutil.h, version 0.95 */ #define DEFLATED 8 #if MAX_MEM_LEVEL >= 8 @@ -228,7 +220,7 @@ typedef struct char eof; bool is_initialised; PyObject *zdict; - PyThread_type_lock lock; + PyMutex mutex; } compobject; #define _compobject_CAST(op) ((compobject *)op) @@ -291,12 +283,7 @@ newcompobject(PyTypeObject *type) Py_DECREF(self); return NULL; } - self->lock = PyThread_allocate_lock(); - if (self->lock == NULL) { - Py_DECREF(self); - PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock"); - return NULL; - } + self->mutex = (PyMutex){0}; return self; } @@ -720,10 +707,10 @@ compobject_dealloc_impl(PyObject *op, int (*dealloc)(z_streamp)) PyTypeObject *type = Py_TYPE(op); PyObject_GC_UnTrack(op); compobject *self = _compobject_CAST(op); + assert(!PyMutex_IsLocked(&self->mutex)); if (self->is_initialised) { (void)dealloc(&self->zst); } - PyThread_free_lock(self->lock); Py_XDECREF(self->unused_data); Py_XDECREF(self->unconsumed_tail); Py_XDECREF(self->zdict); @@ -777,7 +764,7 @@ zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls, _BlocksOutputBuffer buffer = {.writer = NULL}; zlibstate *state = PyType_GetModuleState(cls); - ENTER_ZLIB(self); + PyMutex_Lock(&self->mutex); self->zst.next_in = data->buf; Py_ssize_t ibuflen = data->len; @@ -819,7 +806,7 @@ 
zlib_Compress_compress_impl(compobject *self, PyTypeObject *cls, OutputBuffer_OnError(&buffer); return_value = NULL; success: - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); return return_value; } @@ -909,7 +896,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls, max_length = -1; } - ENTER_ZLIB(self); + PyMutex_Lock(&self->mutex); self->zst.next_in = data->buf; ibuflen = data->len; @@ -962,7 +949,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls, if (err == Z_STREAM_END) { /* This is the logical place to call inflateEnd, but the old behaviour of only calling it on flush() is preserved. */ - self->eof = 1; + FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1); } else if (err != Z_OK && err != Z_BUF_ERROR) { /* We will only get Z_BUF_ERROR if the output buffer was full but there wasn't more output when we tried again, so it is @@ -981,7 +968,7 @@ zlib_Decompress_decompress_impl(compobject *self, PyTypeObject *cls, OutputBuffer_OnError(&buffer); return_value = NULL; success: - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); return return_value; } @@ -1014,7 +1001,7 @@ zlib_Compress_flush_impl(compobject *self, PyTypeObject *cls, int mode) return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES); } - ENTER_ZLIB(self); + PyMutex_Lock(&self->mutex); self->zst.avail_in = 0; @@ -1070,7 +1057,7 @@ error: OutputBuffer_OnError(&buffer); return_value = NULL; success: - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); return return_value; } @@ -1094,9 +1081,9 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls) if (!return_value) return NULL; /* Copy the zstream state - * We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe + * We use mutex to make this thread-safe */ - ENTER_ZLIB(self); + PyMutex_Lock(&self->mutex); int err = deflateCopy(&return_value->zst, &self->zst); switch (err) { case Z_OK: @@ -1120,11 +1107,11 @@ zlib_Compress_copy_impl(compobject *self, PyTypeObject *cls) /* Mark it as being initialized */ 
return_value->is_initialised = 1; - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); return (PyObject *)return_value; error: - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); Py_XDECREF(return_value); return NULL; } @@ -1178,9 +1165,9 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls) if (!return_value) return NULL; /* Copy the zstream state - * We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe + * We use mutex to make this thread-safe */ - ENTER_ZLIB(self); + PyMutex_Lock(&self->mutex); int err = inflateCopy(&return_value->zst, &self->zst); switch (err) { case Z_OK: @@ -1205,11 +1192,11 @@ zlib_Decompress_copy_impl(compobject *self, PyTypeObject *cls) /* Mark it as being initialized */ return_value->is_initialised = 1; - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); return (PyObject *)return_value; error: - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); Py_XDECREF(return_value); return NULL; } @@ -1282,10 +1269,10 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls, return NULL; } - ENTER_ZLIB(self); + PyMutex_Lock(&self->mutex); if (PyObject_GetBuffer(self->unconsumed_tail, &data, PyBUF_SIMPLE) == -1) { - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); return NULL; } @@ -1333,7 +1320,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls, /* If at end of stream, clean up any memory allocated by zlib. 
*/ if (err == Z_STREAM_END) { - self->eof = 1; + FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1); self->is_initialised = 0; err = inflateEnd(&self->zst); if (err != Z_OK) { @@ -1352,7 +1339,7 @@ zlib_Decompress_flush_impl(compobject *self, PyTypeObject *cls, return_value = NULL; success: PyBuffer_Release(&data); - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); return return_value; } @@ -1361,7 +1348,7 @@ typedef struct { PyObject_HEAD z_stream zst; PyObject *zdict; - PyThread_type_lock lock; + PyMutex mutex; PyObject *unused_data; uint8_t *input_buffer; Py_ssize_t input_buffer_size; @@ -1387,7 +1374,7 @@ ZlibDecompressor_dealloc(PyObject *op) PyTypeObject *type = Py_TYPE(op); PyObject_GC_UnTrack(op); ZlibDecompressor *self = ZlibDecompressor_CAST(op); - PyThread_free_lock(self->lock); + assert(!PyMutex_IsLocked(&self->mutex)); if (self->is_initialised) { inflateEnd(&self->zst); } @@ -1545,7 +1532,7 @@ decompress_buf(ZlibDecompressor *self, Py_ssize_t max_length) } while(err != Z_STREAM_END && self->avail_in_real != 0); if (err == Z_STREAM_END) { - self->eof = 1; + FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1); self->is_initialised = 0; /* Unlike the Decompress object we call inflateEnd here as there are no backwards compatibility issues */ @@ -1633,7 +1620,7 @@ decompress(ZlibDecompressor *self, uint8_t *data, } if (self->eof) { - self->needs_input = 0; + FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0); if (self->avail_in_real > 0) { PyObject *unused_data = PyBytes_FromStringAndSize( @@ -1646,10 +1633,10 @@ decompress(ZlibDecompressor *self, uint8_t *data, } else if (self->avail_in_real == 0) { self->zst.next_in = NULL; - self->needs_input = 1; + FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 1); } else { - self->needs_input = 0; + FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0); /* If we did not use the input buffer, we now have to copy the tail from the caller's buffer into the @@ -1718,14 +1705,14 @@ zlib__ZlibDecompressor_decompress_impl(ZlibDecompressor 
*self, { PyObject *result = NULL; - ENTER_ZLIB(self); + PyMutex_Lock(&self->mutex); if (self->eof) { PyErr_SetString(PyExc_EOFError, "End of stream already reached"); } else { result = decompress(self, data->buf, data->len, max_length); } - LEAVE_ZLIB(self); + PyMutex_Unlock(&self->mutex); return result; } @@ -1767,12 +1754,7 @@ zlib__ZlibDecompressor_impl(PyTypeObject *type, int wbits, PyObject *zdict) self->zst.next_in = NULL; self->zst.avail_in = 0; self->unused_data = Py_GetConstant(Py_CONSTANT_EMPTY_BYTES); - self->lock = PyThread_allocate_lock(); - if (self->lock == NULL) { - Py_DECREF(self); - PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock"); - return NULL; - } + self->mutex = (PyMutex){0}; int err = inflateInit2(&(self->zst), wbits); switch (err) { case Z_OK: @@ -1827,10 +1809,36 @@ static PyMethodDef ZlibDecompressor_methods[] = { {NULL} }; +static PyObject * +Decomp_unused_data_get(PyObject *op, void *Py_UNUSED(ignored)) +{ + compobject *self = _compobject_CAST(op); + PyMutex_Lock(&self->mutex); + assert(self->unused_data != NULL); + PyObject *result = Py_NewRef(self->unused_data); + PyMutex_Unlock(&self->mutex); + return result; +} + +static PyObject * +Decomp_unconsumed_tail_get(PyObject *op, void *Py_UNUSED(ignored)) +{ + compobject *self = _compobject_CAST(op); + PyMutex_Lock(&self->mutex); + assert(self->unconsumed_tail != NULL); + PyObject *result = Py_NewRef(self->unconsumed_tail); + PyMutex_Unlock(&self->mutex); + return result; +} + +static PyGetSetDef Decomp_getset[] = { + {"unused_data", Decomp_unused_data_get, NULL, NULL}, + {"unconsumed_tail", Decomp_unconsumed_tail_get, NULL, NULL}, + {NULL}, +}; + #define COMP_OFF(x) offsetof(compobject, x) static PyMemberDef Decomp_members[] = { - {"unused_data", _Py_T_OBJECT, COMP_OFF(unused_data), Py_READONLY}, - {"unconsumed_tail", _Py_T_OBJECT, COMP_OFF(unconsumed_tail), Py_READONLY}, {"eof", Py_T_BOOL, COMP_OFF(eof), Py_READONLY}, {NULL}, }; @@ -1844,11 +1852,26 @@ 
PyDoc_STRVAR(ZlibDecompressor_unused_data__doc__, PyDoc_STRVAR(ZlibDecompressor_needs_input_doc, "True if more input is needed before more decompressed data can be produced."); +static PyObject * +ZlibDecompressor_unused_data_get(PyObject *op, void *Py_UNUSED(ignored)) +{ + ZlibDecompressor *self = ZlibDecompressor_CAST(op); + PyMutex_Lock(&self->mutex); + assert(self->unused_data != NULL); + PyObject *result = Py_NewRef(self->unused_data); + PyMutex_Unlock(&self->mutex); + return result; +} + +static PyGetSetDef ZlibDecompressor_getset[] = { + {"unused_data", ZlibDecompressor_unused_data_get, NULL, + ZlibDecompressor_unused_data__doc__}, + {NULL}, +}; + static PyMemberDef ZlibDecompressor_members[] = { {"eof", Py_T_BOOL, offsetof(ZlibDecompressor, eof), Py_READONLY, ZlibDecompressor_eof__doc__}, - {"unused_data", Py_T_OBJECT_EX, offsetof(ZlibDecompressor, unused_data), - Py_READONLY, ZlibDecompressor_unused_data__doc__}, {"needs_input", Py_T_BOOL, offsetof(ZlibDecompressor, needs_input), Py_READONLY, ZlibDecompressor_needs_input_doc}, {NULL}, @@ -2074,6 +2097,7 @@ static PyType_Slot Decomptype_slots[] = { {Py_tp_traverse, compobject_traverse}, {Py_tp_methods, Decomp_methods}, {Py_tp_members, Decomp_members}, + {Py_tp_getset, Decomp_getset}, {0, 0}, }; @@ -2093,6 +2117,7 @@ static PyType_Slot ZlibDecompressor_type_slots[] = { {Py_tp_dealloc, ZlibDecompressor_dealloc}, {Py_tp_traverse, ZlibDecompressor_traverse}, {Py_tp_members, ZlibDecompressor_members}, + {Py_tp_getset, ZlibDecompressor_getset}, {Py_tp_new, zlib__ZlibDecompressor}, {Py_tp_doc, (char *)zlib__ZlibDecompressor__doc__}, {Py_tp_methods, ZlibDecompressor_methods}, -- 2.47.3