]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-116738: Use PyMutex for bz2 module (gh-140555)
authorAlper <alperyoney@fb.com>
Mon, 27 Oct 2025 13:52:30 +0000 (06:52 -0700)
committerGitHub <noreply@github.com>
Mon, 27 Oct 2025 13:52:30 +0000 (09:52 -0400)
The methods are already wrapped with a lock, which makes them thread-safe in
the free-threaded build. This replaces `PyThread_acquire_lock` with `PyMutex` and
removes some macros and the lock allocation handling code.

Also add a free-threading test to ensure that there are no data races and
that the locking works.

Lib/test/test_free_threading/test_bz2.py [new file with mode: 0644]
Modules/_bz2module.c

diff --git a/Lib/test/test_free_threading/test_bz2.py b/Lib/test/test_free_threading/test_bz2.py
new file mode 100644 (file)
index 0000000..0e09c64
--- /dev/null
@@ -0,0 +1,53 @@
+import unittest
+
+from test.support import import_helper, threading_helper
+from test.support.threading_helper import run_concurrently
+
+bz2 = import_helper.import_module("bz2")
+from bz2 import BZ2Compressor, BZ2Decompressor
+
+from test.test_bz2 import ext_decompress, BaseTest
+
+
+NTHREADS = 10
+TEXT = BaseTest.TEXT
+
+
+@threading_helper.requires_working_threading()
+class TestBZ2(unittest.TestCase):
+    def test_compressor(self):
+        bz2c = BZ2Compressor()
+
+        def worker():
+            # it should return empty bytes as it buffers data internally
+            data = bz2c.compress(TEXT)
+            self.assertEqual(data, b"")
+
+        run_concurrently(worker_func=worker, nthreads=NTHREADS)
+        data = bz2c.flush()
+        # The decompressed data should be TEXT repeated NTHREADS times
+        decompressed = ext_decompress(data)
+        self.assertEqual(decompressed, TEXT * NTHREADS)
+
+    def test_decompressor(self):
+        chunk_size = 128
+        chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
+        input_data = b"".join(chunks)
+        compressed = bz2.compress(input_data)
+
+        bz2d = BZ2Decompressor()
+        output = []
+
+        def worker():
+            data = bz2d.decompress(compressed, chunk_size)
+            self.assertEqual(len(data), chunk_size)
+            output.append(data)
+
+        run_concurrently(worker_func=worker, nthreads=NTHREADS)
+        self.assertEqual(len(output), NTHREADS)
+        # Verify the expected chunks (order doesn't matter due to append race)
+        self.assertEqual(set(output), set(chunks))
+
+
+if __name__ == "__main__":
+    unittest.main()
index 9721b493a19956bd6a64f083fbd48559485c0beb..452b88dfed29cedbdb72dc5c3df55615bcdf2b29 100644 (file)
@@ -97,20 +97,11 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer)
 #endif  /* ! BZ_CONFIG_ERROR */
 
 
-#define ACQUIRE_LOCK(obj) do { \
-    if (!PyThread_acquire_lock((obj)->lock, 0)) { \
-        Py_BEGIN_ALLOW_THREADS \
-        PyThread_acquire_lock((obj)->lock, 1); \
-        Py_END_ALLOW_THREADS \
-    } } while (0)
-#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)
-
-
 typedef struct {
     PyObject_HEAD
     bz_stream bzs;
     int flushed;
-    PyThread_type_lock lock;
+    PyMutex mutex;
 } BZ2Compressor;
 
 typedef struct {
@@ -126,7 +117,7 @@ typedef struct {
        separately. Conversion and looping is encapsulated in
        decompress_buf() */
     size_t bzs_avail_in_real;
-    PyThread_type_lock lock;
+    PyMutex mutex;
 } BZ2Decompressor;
 
 #define _BZ2Compressor_CAST(op)     ((BZ2Compressor *)(op))
@@ -271,12 +262,12 @@ _bz2_BZ2Compressor_compress_impl(BZ2Compressor *self, Py_buffer *data)
 {
     PyObject *result = NULL;
 
-    ACQUIRE_LOCK(self);
+    PyMutex_Lock(&self->mutex);
     if (self->flushed)
         PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
     else
         result = compress(self, data->buf, data->len, BZ_RUN);
-    RELEASE_LOCK(self);
+    PyMutex_Unlock(&self->mutex);
     return result;
 }
 
@@ -296,14 +287,14 @@ _bz2_BZ2Compressor_flush_impl(BZ2Compressor *self)
 {
     PyObject *result = NULL;
 
-    ACQUIRE_LOCK(self);
+    PyMutex_Lock(&self->mutex);
     if (self->flushed)
         PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
     else {
         self->flushed = 1;
         result = compress(self, NULL, 0, BZ_FINISH);
     }
-    RELEASE_LOCK(self);
+    PyMutex_Unlock(&self->mutex);
     return result;
 }
 
@@ -357,13 +348,7 @@ _bz2_BZ2Compressor_impl(PyTypeObject *type, int compresslevel)
         return NULL;
     }
 
-    self->lock = PyThread_allocate_lock();
-    if (self->lock == NULL) {
-        Py_DECREF(self);
-        PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
-        return NULL;
-    }
-
+    self->mutex = (PyMutex){0};
     self->bzs.opaque = NULL;
     self->bzs.bzalloc = BZ2_Malloc;
     self->bzs.bzfree = BZ2_Free;
@@ -382,10 +367,8 @@ static void
 BZ2Compressor_dealloc(PyObject *op)
 {
     BZ2Compressor *self = _BZ2Compressor_CAST(op);
+    assert(!PyMutex_IsLocked(&self->mutex));
     BZ2_bzCompressEnd(&self->bzs);
-    if (self->lock != NULL) {
-        PyThread_free_lock(self->lock);
-    }
     PyTypeObject *tp = Py_TYPE(self);
     tp->tp_free((PyObject *)self);
     Py_DECREF(tp);
@@ -619,12 +602,12 @@ _bz2_BZ2Decompressor_decompress_impl(BZ2Decompressor *self, Py_buffer *data,
 {
     PyObject *result = NULL;
 
-    ACQUIRE_LOCK(self);
+    PyMutex_Lock(&self->mutex);
     if (self->eof)
         PyErr_SetString(PyExc_EOFError, "End of stream already reached");
     else
         result = decompress(self, data->buf, data->len, max_length);
-    RELEASE_LOCK(self);
+    PyMutex_Unlock(&self->mutex);
     return result;
 }
 
@@ -650,13 +633,7 @@ _bz2_BZ2Decompressor_impl(PyTypeObject *type)
         return NULL;
     }
 
-    self->lock = PyThread_allocate_lock();
-    if (self->lock == NULL) {
-        Py_DECREF(self);
-        PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
-        return NULL;
-    }
-
+    self->mutex = (PyMutex){0};
     self->needs_input = 1;
     self->bzs_avail_in_real = 0;
     self->input_buffer = NULL;
@@ -678,15 +655,13 @@ static void
 BZ2Decompressor_dealloc(PyObject *op)
 {
     BZ2Decompressor *self = _BZ2Decompressor_CAST(op);
+    assert(!PyMutex_IsLocked(&self->mutex));
 
     if(self->input_buffer != NULL) {
         PyMem_Free(self->input_buffer);
     }
     BZ2_bzDecompressEnd(&self->bzs);
     Py_CLEAR(self->unused_data);
-    if (self->lock != NULL) {
-        PyThread_free_lock(self->lock);
-    }
 
     PyTypeObject *tp = Py_TYPE(self);
     tp->tp_free((PyObject *)self);