--- /dev/null
+import unittest
+
+from test.support import import_helper, threading_helper
+from test.support.threading_helper import run_concurrently
+
+bz2 = import_helper.import_module("bz2")
+from bz2 import BZ2Compressor, BZ2Decompressor
+
+from test.test_bz2 import ext_decompress, BaseTest
+
+
+NTHREADS = 10
+TEXT = BaseTest.TEXT
+
+
+@threading_helper.requires_working_threading()
+class TestBZ2(unittest.TestCase):
+ def test_compressor(self):
+ bz2c = BZ2Compressor()
+
+ def worker():
+ # it should return empty bytes as it buffers data internally
+ data = bz2c.compress(TEXT)
+ self.assertEqual(data, b"")
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+ data = bz2c.flush()
+ # The decompressed data should be TEXT repeated NTHREADS times
+ decompressed = ext_decompress(data)
+ self.assertEqual(decompressed, TEXT * NTHREADS)
+
+ def test_decompressor(self):
+ chunk_size = 128
+ chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
+ input_data = b"".join(chunks)
+ compressed = bz2.compress(input_data)
+
+ bz2d = BZ2Decompressor()
+ output = []
+
+ def worker():
+ data = bz2d.decompress(compressed, chunk_size)
+ self.assertEqual(len(data), chunk_size)
+ output.append(data)
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+ self.assertEqual(len(output), NTHREADS)
+ # Verify the expected chunks (order doesn't matter due to append race)
+ self.assertEqual(set(output), set(chunks))
+
+
+if __name__ == "__main__":
+ unittest.main()
#endif /* ! BZ_CONFIG_ERROR */
-#define ACQUIRE_LOCK(obj) do { \
- if (!PyThread_acquire_lock((obj)->lock, 0)) { \
- Py_BEGIN_ALLOW_THREADS \
- PyThread_acquire_lock((obj)->lock, 1); \
- Py_END_ALLOW_THREADS \
- } } while (0)
-#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)
-
-
typedef struct {
PyObject_HEAD
bz_stream bzs;
int flushed;
- PyThread_type_lock lock;
+ PyMutex mutex;
} BZ2Compressor;
typedef struct {
separately. Conversion and looping is encapsulated in
decompress_buf() */
size_t bzs_avail_in_real;
- PyThread_type_lock lock;
+ PyMutex mutex;
} BZ2Decompressor;
#define _BZ2Compressor_CAST(op) ((BZ2Compressor *)(op))
{
PyObject *result = NULL;
- ACQUIRE_LOCK(self);
+ PyMutex_Lock(&self->mutex);
if (self->flushed)
PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
else
result = compress(self, data->buf, data->len, BZ_RUN);
- RELEASE_LOCK(self);
+ PyMutex_Unlock(&self->mutex);
return result;
}
{
PyObject *result = NULL;
- ACQUIRE_LOCK(self);
+ PyMutex_Lock(&self->mutex);
if (self->flushed)
PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
else {
self->flushed = 1;
result = compress(self, NULL, 0, BZ_FINISH);
}
- RELEASE_LOCK(self);
+ PyMutex_Unlock(&self->mutex);
return result;
}
return NULL;
}
- self->lock = PyThread_allocate_lock();
- if (self->lock == NULL) {
- Py_DECREF(self);
- PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
- return NULL;
- }
-
+ self->mutex = (PyMutex){0};
self->bzs.opaque = NULL;
self->bzs.bzalloc = BZ2_Malloc;
self->bzs.bzfree = BZ2_Free;
BZ2Compressor_dealloc(PyObject *op)
{
BZ2Compressor *self = _BZ2Compressor_CAST(op);
+ assert(!PyMutex_IsLocked(&self->mutex));
BZ2_bzCompressEnd(&self->bzs);
- if (self->lock != NULL) {
- PyThread_free_lock(self->lock);
- }
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free((PyObject *)self);
Py_DECREF(tp);
{
PyObject *result = NULL;
- ACQUIRE_LOCK(self);
+ PyMutex_Lock(&self->mutex);
if (self->eof)
PyErr_SetString(PyExc_EOFError, "End of stream already reached");
else
result = decompress(self, data->buf, data->len, max_length);
- RELEASE_LOCK(self);
+ PyMutex_Unlock(&self->mutex);
return result;
}
return NULL;
}
- self->lock = PyThread_allocate_lock();
- if (self->lock == NULL) {
- Py_DECREF(self);
- PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
- return NULL;
- }
-
+ self->mutex = (PyMutex){0};
self->needs_input = 1;
self->bzs_avail_in_real = 0;
self->input_buffer = NULL;
BZ2Decompressor_dealloc(PyObject *op)
{
BZ2Decompressor *self = _BZ2Decompressor_CAST(op);
+ assert(!PyMutex_IsLocked(&self->mutex));
if(self->input_buffer != NULL) {
PyMem_Free(self->input_buffer);
}
BZ2_bzDecompressEnd(&self->bzs);
Py_CLEAR(self->unused_data);
- if (self->lock != NULL) {
- PyThread_free_lock(self->lock);
- }
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free((PyObject *)self);