From: Alper Date: Mon, 27 Oct 2025 13:52:30 +0000 (-0700) Subject: gh-116738: Use PyMutex for bz2 module (gh-140555) X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=9479a62a51e6fbcb3372f5ac07b3ab861e7d17e3;p=thirdparty%2FPython%2Fcpython.git gh-116738: Use PyMutex for bz2 module (gh-140555) The methods are already wrapped with a lock, which makes them thread-safe in free-threaded build. This replaces `PyThread_acquire_lock` with `PyMutex` and removes some macros and allocation handling code. Also add a test for free-threading to ensure we aren't getting data races and that the locking is working. --- diff --git a/Lib/test/test_free_threading/test_bz2.py b/Lib/test/test_free_threading/test_bz2.py new file mode 100644 index 000000000000..0e09c64d5610 --- /dev/null +++ b/Lib/test/test_free_threading/test_bz2.py @@ -0,0 +1,53 @@ +import unittest + +from test.support import import_helper, threading_helper +from test.support.threading_helper import run_concurrently + +bz2 = import_helper.import_module("bz2") +from bz2 import BZ2Compressor, BZ2Decompressor + +from test.test_bz2 import ext_decompress, BaseTest + + +NTHREADS = 10 +TEXT = BaseTest.TEXT + + +@threading_helper.requires_working_threading() +class TestBZ2(unittest.TestCase): + def test_compressor(self): + bz2c = BZ2Compressor() + + def worker(): + # it should return empty bytes as it buffers data internally + data = bz2c.compress(TEXT) + self.assertEqual(data, b"") + + run_concurrently(worker_func=worker, nthreads=NTHREADS) + data = bz2c.flush() + # The decompressed data should be TEXT repeated NTHREADS times + decompressed = ext_decompress(data) + self.assertEqual(decompressed, TEXT * NTHREADS) + + def test_decompressor(self): + chunk_size = 128 + chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)] + input_data = b"".join(chunks) + compressed = bz2.compress(input_data) + + bz2d = BZ2Decompressor() + output = [] + + def worker(): + data = bz2d.decompress(compressed, chunk_size) + self.assertEqual(len(data), chunk_size) + output.append(data) + + run_concurrently(worker_func=worker, nthreads=NTHREADS) + self.assertEqual(len(output), NTHREADS) + # Verify the expected chunks (order doesn't matter due to append race) + self.assertEqual(set(output), set(chunks)) + + +if __name__ == "__main__": + unittest.main() diff --git a/Modules/_bz2module.c b/Modules/_bz2module.c index 9721b493a199..452b88dfed29 100644 --- a/Modules/_bz2module.c +++ b/Modules/_bz2module.c @@ -97,20 +97,11 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer) #endif /* ! BZ_CONFIG_ERROR */ -#define ACQUIRE_LOCK(obj) do { \ - if (!PyThread_acquire_lock((obj)->lock, 0)) { \ - Py_BEGIN_ALLOW_THREADS \ - PyThread_acquire_lock((obj)->lock, 1); \ - Py_END_ALLOW_THREADS \ - } } while (0) -#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock) - - typedef struct { PyObject_HEAD bz_stream bzs; int flushed; - PyThread_type_lock lock; + PyMutex mutex; } BZ2Compressor; typedef struct { @@ -126,7 +117,7 @@ typedef struct { separately. Conversion and looping is encapsulated in decompress_buf() */ size_t bzs_avail_in_real; - PyThread_type_lock lock; + PyMutex mutex; } BZ2Decompressor; #define _BZ2Compressor_CAST(op) ((BZ2Compressor *)(op)) @@ -271,12 +262,12 @@ _bz2_BZ2Compressor_compress_impl(BZ2Compressor *self, Py_buffer *data) { PyObject *result = NULL; - ACQUIRE_LOCK(self); + PyMutex_Lock(&self->mutex); if (self->flushed) PyErr_SetString(PyExc_ValueError, "Compressor has been flushed"); else result = compress(self, data->buf, data->len, BZ_RUN); - RELEASE_LOCK(self); + PyMutex_Unlock(&self->mutex); return result; } @@ -296,14 +287,14 @@ _bz2_BZ2Compressor_flush_impl(BZ2Compressor *self) { PyObject *result = NULL; - ACQUIRE_LOCK(self); + PyMutex_Lock(&self->mutex); if (self->flushed) PyErr_SetString(PyExc_ValueError, "Repeated call to flush()"); else { self->flushed = 1; result = compress(self, NULL, 0, BZ_FINISH); } - RELEASE_LOCK(self); + PyMutex_Unlock(&self->mutex); return result; } @@ -357,13 +348,7 @@ _bz2_BZ2Compressor_impl(PyTypeObject *type, int compresslevel) return NULL; } - self->lock = PyThread_allocate_lock(); - if (self->lock == NULL) { - Py_DECREF(self); - PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock"); - return NULL; - } - + self->mutex = (PyMutex){0}; self->bzs.opaque = NULL; self->bzs.bzalloc = BZ2_Malloc; self->bzs.bzfree = BZ2_Free; @@ -382,10 +367,8 @@ static void BZ2Compressor_dealloc(PyObject *op) { BZ2Compressor *self = _BZ2Compressor_CAST(op); + assert(!PyMutex_IsLocked(&self->mutex)); BZ2_bzCompressEnd(&self->bzs); - if (self->lock != NULL) { - PyThread_free_lock(self->lock); - } PyTypeObject *tp = Py_TYPE(self); tp->tp_free((PyObject *)self); Py_DECREF(tp); @@ -619,12 +602,12 @@ _bz2_BZ2Decompressor_decompress_impl(BZ2Decompressor *self, Py_buffer *data, { PyObject *result = NULL; - ACQUIRE_LOCK(self); + PyMutex_Lock(&self->mutex); if (self->eof) PyErr_SetString(PyExc_EOFError, "End of stream already reached"); else result = decompress(self, data->buf, data->len, max_length); - RELEASE_LOCK(self); + PyMutex_Unlock(&self->mutex); return result; } @@ -650,13 +633,7 @@ _bz2_BZ2Decompressor_impl(PyTypeObject *type) return NULL; } - self->lock = PyThread_allocate_lock(); - if (self->lock == NULL) { - Py_DECREF(self); - PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock"); - return NULL; - } - + self->mutex = (PyMutex){0}; self->needs_input = 1; self->bzs_avail_in_real = 0; self->input_buffer = NULL; @@ -678,15 +655,13 @@ static void BZ2Decompressor_dealloc(PyObject *op) { BZ2Decompressor *self = _BZ2Decompressor_CAST(op); + assert(!PyMutex_IsLocked(&self->mutex)); if(self->input_buffer != NULL) { PyMem_Free(self->input_buffer); } BZ2_bzDecompressEnd(&self->bzs); Py_CLEAR(self->unused_data); - if (self->lock != NULL) { - PyThread_free_lock(self->lock); - } PyTypeObject *tp = Py_TYPE(self); tp->tp_free((PyObject *)self);