data = bz2d.decompress(compressed, chunk_size)
self.assertEqual(len(data), chunk_size)
output.append(data)
+ # Read attributes concurrently with other threads decompressing
+ self.assertIsInstance(bz2d.eof, bool)
+ self.assertIsInstance(bz2d.needs_input, bool)
+ self.assertIsInstance(bz2d.unused_data, bytes)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
self.assertEqual(len(output), NTHREADS)
# Verify the expected chunks (order doesn't matter due to append race)
self.assertEqual(set(output), set(chunks))
+ self.assertTrue(bz2d.eof)
+ self.assertFalse(bz2d.needs_input)
+ # Each thread added full compressed data to the buffer, but only 1 copy
+ # is consumed to produce the output. The rest remains as unused_data.
+ self.assertEqual(
+ len(bz2d.unused_data), len(compressed) * (NTHREADS - 1)
+ )
if __name__ == "__main__":
// Blocks output buffer wrappers
#include "pycore_blocks_output_buffer.h"
+#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_STORE_CHAR_RELAXED
#if OUTPUT_BUFFER_MAX_BLOCK_SIZE > UINT32_MAX
#error "The maximum block size accepted by libbzip2 is UINT32_MAX."
if (catch_bz2_error(bzret))
goto error;
if (bzret == BZ_STREAM_END) {
- d->eof = 1;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->eof, 1);
break;
} else if (d->bzs_avail_in_real == 0) {
break;
}
if (d->eof) {
- d->needs_input = 0;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0);
if (d->bzs_avail_in_real > 0) {
Py_XSETREF(d->unused_data,
PyBytes_FromStringAndSize(bzs->next_in, d->bzs_avail_in_real));
}
else if (d->bzs_avail_in_real == 0) {
bzs->next_in = NULL;
- d->needs_input = 1;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 1);
}
else {
- d->needs_input = 0;
+ FT_ATOMIC_STORE_CHAR_RELAXED(d->needs_input, 0);
/* If we did not use the input buffer, we now have
to copy the tail from the caller's buffer into the
PyDoc_STRVAR(BZ2Decompressor_needs_input_doc,
"True if more input is needed before more decompressed data can be produced.");
+static PyObject *
+BZ2Decompressor_unused_data_get(PyObject *op, void *Py_UNUSED(ignored))
+{  /* Getter for "unused_data": returns a new reference, or NULL with AttributeError set. */
+ BZ2Decompressor *self = _BZ2Decompressor_CAST(op);
+ PyMutex_Lock(&self->mutex);  /* NOTE(review): presumably the decompress path replaces unused_data under this same mutex -- confirm */
+ PyObject *result = Py_XNewRef(self->unused_data);  /* NULL-safe: result is NULL when the field is NULL */
+ PyMutex_Unlock(&self->mutex);  /* released before any exception is raised */
+ if (result == NULL) {  /* field is unset: report it as a missing attribute */
+ PyErr_SetString(PyExc_AttributeError, "unused_data");
+ }
+ return result;  /* new reference on success; NULL means AttributeError was set above */
+}
+
+static PyGetSetDef BZ2Decompressor_getset[] = {  /* computed attributes of BZ2Decompressor */
+ {"unused_data", BZ2Decompressor_unused_data_get, NULL,  /* getter only; setter slot is NULL */
+ BZ2Decompressor_unused_data__doc__},
+ {NULL},  /* sentinel entry terminating the table */
+};
+
static PyMemberDef BZ2Decompressor_members[] = {
{"eof", Py_T_BOOL, offsetof(BZ2Decompressor, eof),
Py_READONLY, BZ2Decompressor_eof__doc__},
- {"unused_data", Py_T_OBJECT_EX, offsetof(BZ2Decompressor, unused_data),
- Py_READONLY, BZ2Decompressor_unused_data__doc__},
{"needs_input", Py_T_BOOL, offsetof(BZ2Decompressor, needs_input), Py_READONLY,
BZ2Decompressor_needs_input_doc},
{NULL}
{Py_tp_methods, BZ2Decompressor_methods},
{Py_tp_doc, (char *)_bz2_BZ2Decompressor__doc__},
{Py_tp_members, BZ2Decompressor_members},
+ {Py_tp_getset, BZ2Decompressor_getset},
{Py_tp_new, _bz2_BZ2Decompressor},
{0, 0}
};