From 1a9cdaf63af7014dd7bd852b4d8a8c0ab98387ab Mon Sep 17 00:00:00 2001 From: Alper Date: Mon, 15 Dec 2025 09:47:04 -0800 Subject: [PATCH] gh-116738: Make _bz2 module thread-safe (gh-142756) Make the attributes in _bz2 module thread-safe on the free-threading build. Attributes (eof, needs_input, unused_data) are now stored atomically or accessed via mutex-protected getters. --- Lib/test/test_free_threading/test_bz2.py | 11 +++++++ ...-12-15-03-20-24.gh-issue-116738.NNHiTK.rst | 2 ++ Modules/_bz2module.c | 31 +++++++++++++++---- 3 files changed, 38 insertions(+), 6 deletions(-) create mode 100644 Misc/NEWS.d/next/Core_and_Builtins/2025-12-15-03-20-24.gh-issue-116738.NNHiTK.rst diff --git a/Lib/test/test_free_threading/test_bz2.py b/Lib/test/test_free_threading/test_bz2.py index 0e09c64d5610..3bb531ec04c6 100644 --- a/Lib/test/test_free_threading/test_bz2.py +++ b/Lib/test/test_free_threading/test_bz2.py @@ -42,11 +42,22 @@ class TestBZ2(unittest.TestCase): data = bz2d.decompress(compressed, chunk_size) self.assertEqual(len(data), chunk_size) output.append(data) + # Read attributes concurrently with other threads decompressing + self.assertIsInstance(bz2d.eof, bool) + self.assertIsInstance(bz2d.needs_input, bool) + self.assertIsInstance(bz2d.unused_data, bytes) run_concurrently(worker_func=worker, nthreads=NTHREADS) self.assertEqual(len(output), NTHREADS) # Verify the expected chunks (order doesn't matter due to append race) self.assertEqual(set(output), set(chunks)) + self.assertTrue(bz2d.eof) + self.assertFalse(bz2d.needs_input) + # Each thread added full compressed data to the buffer, but only 1 copy + # is consumed to produce the output. The rest remains as unused_data. 
+ self.assertEqual( + len(bz2d.unused_data), len(compressed) * (NTHREADS - 1) + ) if __name__ == "__main__": diff --git a/Misc/NEWS.d/next/Core_and_Builtins/2025-12-15-03-20-24.gh-issue-116738.NNHiTK.rst b/Misc/NEWS.d/next/Core_and_Builtins/2025-12-15-03-20-24.gh-issue-116738.NNHiTK.rst new file mode 100644 index 000000000000..bf06dfee2329 --- /dev/null +++ b/Misc/NEWS.d/next/Core_and_Builtins/2025-12-15-03-20-24.gh-issue-116738.NNHiTK.rst @@ -0,0 +1,2 @@ +Make the attributes in :mod:`bz2` thread-safe on the :term:`free threaded +<free threading>` build. diff --git a/Modules/_bz2module.c b/Modules/_bz2module.c index 452b88dfed29..f3457a13c96c 100644 --- a/Modules/_bz2module.c +++ b/Modules/_bz2module.c @@ -12,6 +12,7 @@ // Blocks output buffer wrappers #include "pycore_blocks_output_buffer.h" +#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_STORE_CHAR_RELAXED #if OUTPUT_BUFFER_MAX_BLOCK_SIZE > UINT32_MAX #error "The maximum block size accepted by libbzip2 is UINT32_MAX." @@ -437,7 +438,7 @@ decompress_buf(BZ2Decompressor *d, Py_ssize_t max_length) if (catch_bz2_error(bzret)) goto error; if (bzret == BZ_STREAM_END) { - d->eof = 1; + FT_ATOMIC_STORE_CHAR_RELAXED(d->eof, 1); break; } else if (d->bzs_avail_in_real == 0) { break; @@ -521,7 +522,7 @@ decompress(BZ2Decompressor *d, char *data, size_t len, Py_ssize_t max_length) } if (d->eof) { - d->needs_input = 0; + FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0); if (d->bzs_avail_in_real > 0) { Py_XSETREF(d->unused_data, PyBytes_FromStringAndSize(bzs->next_in, d->bzs_avail_in_real)); @@ -531,10 +532,10 @@ decompress(BZ2Decompressor *d, char *data, size_t len, Py_ssize_t max_length) } else if (d->bzs_avail_in_real == 0) { bzs->next_in = NULL; - d->needs_input = 1; + FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 1); } else { - d->needs_input = 0; + FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0); /* If we did not use the input buffer, we now have to copy the tail from the caller's buffer into the @@ -682,11 +683,28 @@
PyDoc_STRVAR(BZ2Decompressor_unused_data__doc__, PyDoc_STRVAR(BZ2Decompressor_needs_input_doc, "True if more input is needed before more decompressed data can be produced."); +static PyObject * +BZ2Decompressor_unused_data_get(PyObject *op, void *Py_UNUSED(ignored)) +{ + BZ2Decompressor *self = _BZ2Decompressor_CAST(op); + PyMutex_Lock(&self->mutex); + PyObject *result = Py_XNewRef(self->unused_data); + PyMutex_Unlock(&self->mutex); + if (result == NULL) { + PyErr_SetString(PyExc_AttributeError, "unused_data"); + } + return result; +} + +static PyGetSetDef BZ2Decompressor_getset[] = { + {"unused_data", BZ2Decompressor_unused_data_get, NULL, + BZ2Decompressor_unused_data__doc__}, + {NULL}, +}; + static PyMemberDef BZ2Decompressor_members[] = { {"eof", Py_T_BOOL, offsetof(BZ2Decompressor, eof), Py_READONLY, BZ2Decompressor_eof__doc__}, - {"unused_data", Py_T_OBJECT_EX, offsetof(BZ2Decompressor, unused_data), - Py_READONLY, BZ2Decompressor_unused_data__doc__}, {"needs_input", Py_T_BOOL, offsetof(BZ2Decompressor, needs_input), Py_READONLY, BZ2Decompressor_needs_input_doc}, {NULL} @@ -697,6 +715,7 @@ static PyType_Slot bz2_decompressor_type_slots[] = { {Py_tp_methods, BZ2Decompressor_methods}, {Py_tp_doc, (char *)_bz2_BZ2Decompressor__doc__}, {Py_tp_members, BZ2Decompressor_members}, + {Py_tp_getset, BZ2Decompressor_getset}, {Py_tp_new, _bz2_BZ2Decompressor}, {0, 0} }; -- 2.47.3