--- /dev/null
+import unittest
+
+from test.support import import_helper, threading_helper
+from test.support.threading_helper import run_concurrently
+
+lzma = import_helper.import_module("lzma")
+from lzma import LZMACompressor, LZMADecompressor
+
+from test.test_lzma import INPUT
+
+
+NTHREADS = 10
+
+
+@threading_helper.requires_working_threading()
+class TestLZMA(unittest.TestCase):
+ def test_compressor(self):
+ lzc = LZMACompressor()
+
+ # First compress() outputs LZMA header
+ header = lzc.compress(INPUT)
+ self.assertGreater(len(header), 0)
+
+ def worker():
+ # it should return empty bytes as it buffers data internally
+ data = lzc.compress(INPUT)
+ self.assertEqual(data, b"")
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
+ full_compressed = header + lzc.flush()
+ decompressed = lzma.decompress(full_compressed)
+ # The decompressed data should be INPUT repeated NTHREADS times
+ self.assertEqual(decompressed, INPUT * NTHREADS)
+
+ def test_decompressor(self):
+ chunk_size = 128
+ chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
+ input_data = b"".join(chunks)
+ compressed = lzma.compress(input_data)
+
+ lzd = LZMADecompressor()
+ output = []
+
+ def worker():
+ data = lzd.decompress(compressed, chunk_size)
+ self.assertEqual(len(data), chunk_size)
+ output.append(data)
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+ self.assertEqual(len(output), NTHREADS)
+ # Verify the expected chunks (order doesn't matter due to append race)
+ self.assertSetEqual(set(output), set(chunks))
+
+
+if __name__ == "__main__":
+ unittest.main()
}
-#define ACQUIRE_LOCK(obj) do { \
- if (!PyThread_acquire_lock((obj)->lock, 0)) { \
- Py_BEGIN_ALLOW_THREADS \
- PyThread_acquire_lock((obj)->lock, 1); \
- Py_END_ALLOW_THREADS \
- } } while (0)
-#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)
typedef struct {
PyTypeObject *lzma_compressor_type;
lzma_allocator alloc;
lzma_stream lzs;
int flushed;
- PyThread_type_lock lock;
+ PyMutex mutex;
} Compressor;
typedef struct {
char needs_input;
uint8_t *input_buffer;
size_t input_buffer_size;
- PyThread_type_lock lock;
+ PyMutex mutex;
} Decompressor;
#define Compressor_CAST(op) ((Compressor *)(op))
{
PyObject *result = NULL;
- ACQUIRE_LOCK(self);
+ PyMutex_Lock(&self->mutex);
if (self->flushed) {
PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
}
else {
result = compress(self, data->buf, data->len, LZMA_RUN);
}
- RELEASE_LOCK(self);
+ PyMutex_Unlock(&self->mutex);
return result;
}
{
PyObject *result = NULL;
- ACQUIRE_LOCK(self);
+ PyMutex_Lock(&self->mutex);
if (self->flushed) {
PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
} else {
self->flushed = 1;
result = compress(self, NULL, 0, LZMA_FINISH);
}
- RELEASE_LOCK(self);
+ PyMutex_Unlock(&self->mutex);
return result;
}
self->alloc.free = PyLzma_Free;
self->lzs.allocator = &self->alloc;
- self->lock = PyThread_allocate_lock();
- if (self->lock == NULL) {
- Py_DECREF(self);
- PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
- return NULL;
- }
+ self->mutex = (PyMutex){0};
self->flushed = 0;
switch (format) {
Compressor_dealloc(PyObject *op)
{
Compressor *self = Compressor_CAST(op);
+ assert(!PyMutex_IsLocked(&self->mutex));
lzma_end(&self->lzs);
- if (self->lock != NULL) {
- PyThread_free_lock(self->lock);
- }
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free(self);
Py_DECREF(tp);
{
PyObject *result = NULL;
- ACQUIRE_LOCK(self);
+ PyMutex_Lock(&self->mutex);
if (self->eof)
PyErr_SetString(PyExc_EOFError, "Already at end of stream");
else
result = decompress(self, data->buf, data->len, max_length);
- RELEASE_LOCK(self);
+ PyMutex_Unlock(&self->mutex);
return result;
}
self->lzs.allocator = &self->alloc;
self->lzs.next_in = NULL;
- self->lock = PyThread_allocate_lock();
- if (self->lock == NULL) {
- Py_DECREF(self);
- PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
- return NULL;
- }
+ self->mutex = (PyMutex){0};
self->check = LZMA_CHECK_UNKNOWN;
self->needs_input = 1;
Decompressor_dealloc(PyObject *op)
{
Decompressor *self = Decompressor_CAST(op);
+ assert(!PyMutex_IsLocked(&self->mutex));
+
if(self->input_buffer != NULL)
PyMem_Free(self->input_buffer);
lzma_end(&self->lzs);
Py_CLEAR(self->unused_data);
- if (self->lock != NULL) {
- PyThread_free_lock(self->lock);
- }
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free(self);
Py_DECREF(tp);