data = lzd.decompress(compressed, chunk_size)
self.assertEqual(len(data), chunk_size)
output.append(data)
+ # Read attributes concurrently with other threads decompressing
+ self.assertEqual(lzd.check, lzma.CHECK_CRC64)
+ self.assertIsInstance(lzd.eof, bool)
+ self.assertIsInstance(lzd.needs_input, bool)
+ self.assertIsInstance(lzd.unused_data, bytes)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
self.assertEqual(len(output), NTHREADS)
# Verify the expected chunks (order doesn't matter due to append race)
self.assertSetEqual(set(output), set(chunks))
+ self.assertEqual(lzd.check, lzma.CHECK_CRC64)
+ self.assertTrue(lzd.eof)
+ self.assertFalse(lzd.needs_input)
+ # Each thread appends the full compressed data to the internal buffer, but
+ # only one copy is consumed to produce the output; the rest remains as
+ # unused_data.
+ self.assertEqual(
+ len(lzd.unused_data), len(compressed) * (NTHREADS - 1)
+ )
if __name__ == "__main__":
#include "pycore_long.h" // _PyLong_UInt32_Converter()
// Blocks output buffer wrappers
#include "pycore_blocks_output_buffer.h"
+#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_STORE_*_RELAXED
#if OUTPUT_BUFFER_MAX_BLOCK_SIZE > SIZE_MAX
#error "The maximum block size accepted by liblzma is SIZE_MAX."
goto error;
}
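+ /* check, eof and needs_input may be read concurrently by other threads as
+    plain attributes, so store them with relaxed atomics on the free-threaded
+    build (these macros are no-ops on the default build). */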
if (lzret == LZMA_GET_CHECK || lzret == LZMA_NO_CHECK) {
- d->check = lzma_get_check(&d->lzs);
+ FT_ATOMIC_STORE_INT_RELAXED(d->check, lzma_get_check(&d->lzs));
}
if (lzret == LZMA_STREAM_END) {
- d->eof = 1;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->eof, 1);
break;
} else if (lzs->avail_out == 0) {
/* Need to check lzs->avail_out before lzs->avail_in.
}
if (d->eof) {
- d->needs_input = 0;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0);
if (lzs->avail_in > 0) {
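+ /* Create the bytes object before Py_XSETREF() so that an allocation
+    failure leaves the previous (never NULL) unused_data in place. */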
- Py_XSETREF(d->unused_data,
- PyBytes_FromStringAndSize((char *)lzs->next_in, lzs->avail_in));
- if (d->unused_data == NULL) {
+ PyObject *unused_data = PyBytes_FromStringAndSize(
+ (char *)lzs->next_in, lzs->avail_in);
+ if (unused_data == NULL) {
goto error;
}
+ Py_XSETREF(d->unused_data, unused_data);
}
}
else if (lzs->avail_in == 0) {
/* (avail_in==0 && avail_out==0)
Maybe lzs's internal state still have a few bytes can
be output, try to output them next time. */
- d->needs_input = 0;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0);
/* If max_length < 0, lzs->avail_out always > 0 */
assert(max_length >= 0);
} else {
/* Input buffer exhausted, output buffer has space. */
- d->needs_input = 1;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 1);
}
}
else {
- d->needs_input = 0;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0);
/* If we did not use the input buffer, we now have
to copy the tail from the caller's buffer into the
PyDoc_STRVAR(Decompressor_unused_data_doc,
"Data found after the end of the compressed stream.");
+static PyObject *
+Decompressor_unused_data_get(PyObject *op, void *Py_UNUSED(closure))
+{
+ Decompressor *self = Decompressor_CAST(op);
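+ /* unused_data stays empty until the end of the stream is reached, so
+    before EOF return the empty-bytes constant without taking the mutex. */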
+ if (!FT_ATOMIC_LOAD_CHAR_RELAXED(self->eof)) {
+ return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
+ }
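+ /* Lock so that reading the pointer and taking a new reference cannot
+    interleave with decompress() swapping in a new unused_data object. */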
+ PyMutex_Lock(&self->mutex);
+ assert(self->unused_data != NULL);
+ PyObject *result = Py_NewRef(self->unused_data);
+ PyMutex_Unlock(&self->mutex);
+ return result;
+}
+
+static PyGetSetDef Decompressor_getset[] = {
+ {"unused_data", Decompressor_unused_data_get, NULL,
+ Decompressor_unused_data_doc},
+ {NULL}
+};
+
static PyMemberDef Decompressor_members[] = {
{"check", Py_T_INT, offsetof(Decompressor, check), Py_READONLY,
Decompressor_check_doc},
Decompressor_eof_doc},
{"needs_input", Py_T_BOOL, offsetof(Decompressor, needs_input), Py_READONLY,
Decompressor_needs_input_doc},
- {"unused_data", Py_T_OBJECT_EX, offsetof(Decompressor, unused_data), Py_READONLY,
- Decompressor_unused_data_doc},
{NULL}
};
{Py_tp_new, _lzma_LZMADecompressor},
{Py_tp_doc, (char *)_lzma_LZMADecompressor__doc__},
{Py_tp_members, Decompressor_members},
+ {Py_tp_getset, Decompressor_getset},
{0, 0}
};