--- /dev/null
+import itertools
+import unittest
+
+from test.support import import_helper, threading_helper
+from test.support.threading_helper import run_concurrently
+
+zlib = import_helper.import_module("zlib")
+
+from test.test_zlib import HAMLET_SCENE
+
+
+NTHREADS = 10
+
+
+@threading_helper.requires_working_threading()
+class TestZlib(unittest.TestCase):
+ def test_compressor(self):
+ comp = zlib.compressobj()
+
+ # First compress() outputs zlib header
+ header = comp.compress(HAMLET_SCENE)
+ self.assertGreater(len(header), 0)
+
+ def worker():
+ # it should return empty bytes as it buffers data internally
+ data = comp.compress(HAMLET_SCENE)
+ self.assertEqual(data, b"")
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
+ full_compressed = header + comp.flush()
+ decompressed = zlib.decompress(full_compressed)
+ # The decompressed data should be HAMLET_SCENE repeated NTHREADS times
+ self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)
+
+ def test_decompressor_concurrent_attribute_reads(self):
+ input_data = HAMLET_SCENE * NTHREADS
+ compressed = zlib.compress(input_data)
+
+ decomp = zlib.decompressobj()
+ decomp_size_per_loop = len(input_data) // 1000
+ decompressed_parts = []
+
+ def decomp_worker():
+ # Decompress in chunks, which updates eof, unused_data, unconsumed_tail
+ decompressed_parts.append(
+ decomp.decompress(compressed, decomp_size_per_loop)
+ )
+ while decomp.unconsumed_tail:
+ decompressed_parts.append(
+ decomp.decompress(
+ decomp.unconsumed_tail, decomp_size_per_loop
+ )
+ )
+
+ def decomp_attr_reader():
+ # Read attributes concurrently while another thread decompresses
+ for _ in range(1000):
+ _ = decomp.unused_data
+ _ = decomp.unconsumed_tail
+ _ = decomp.eof
+
+ counter = itertools.count()
+
+ def worker():
+ # First thread decompresses, others read attributes
+ if next(counter) == 0:
+ decomp_worker()
+ else:
+ decomp_attr_reader()
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+
+ self.assertTrue(decomp.eof)
+ self.assertEqual(decomp.unused_data, b"")
+ decompressed = b"".join(decompressed_parts)
+ self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)
+
+
+if __name__ == "__main__":
+ unittest.main()
#endif
#include "Python.h"
+#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_STORE_CHAR_RELAXED
#include "zlib.h"
#include "stdbool.h"
}
-#define ENTER_ZLIB(obj) do { \
- if (!PyThread_acquire_lock((obj)->lock, 0)) { \
- Py_BEGIN_ALLOW_THREADS \
- PyThread_acquire_lock((obj)->lock, 1); \
- Py_END_ALLOW_THREADS \
- } } while (0)
-#define LEAVE_ZLIB(obj) PyThread_release_lock((obj)->lock);
-
-
/* The following parameters are copied from zutil.h, version 0.95 */
#define DEFLATED 8
#if MAX_MEM_LEVEL >= 8
char eof;
bool is_initialised;
PyObject *zdict;
- PyThread_type_lock lock;
+ PyMutex mutex;
} compobject;
#define _compobject_CAST(op) ((compobject *)op)
Py_DECREF(self);
return NULL;
}
- self->lock = PyThread_allocate_lock();
- if (self->lock == NULL) {
- Py_DECREF(self);
- PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
- return NULL;
- }
+ self->mutex = (PyMutex){0};
return self;
}
PyTypeObject *type = Py_TYPE(op);
PyObject_GC_UnTrack(op);
compobject *self = _compobject_CAST(op);
+ assert(!PyMutex_IsLocked(&self->mutex));
if (self->is_initialised) {
(void)dealloc(&self->zst);
}
- PyThread_free_lock(self->lock);
Py_XDECREF(self->unused_data);
Py_XDECREF(self->unconsumed_tail);
Py_XDECREF(self->zdict);
_BlocksOutputBuffer buffer = {.writer = NULL};
zlibstate *state = PyType_GetModuleState(cls);
- ENTER_ZLIB(self);
+ PyMutex_Lock(&self->mutex);
self->zst.next_in = data->buf;
Py_ssize_t ibuflen = data->len;
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
return return_value;
}
max_length = -1;
}
- ENTER_ZLIB(self);
+ PyMutex_Lock(&self->mutex);
self->zst.next_in = data->buf;
ibuflen = data->len;
if (err == Z_STREAM_END) {
/* This is the logical place to call inflateEnd, but the old behaviour
of only calling it on flush() is preserved. */
- self->eof = 1;
+ FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
} else if (err != Z_OK && err != Z_BUF_ERROR) {
/* We will only get Z_BUF_ERROR if the output buffer was full
but there wasn't more output when we tried again, so it is
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
return return_value;
}
return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
}
- ENTER_ZLIB(self);
+ PyMutex_Lock(&self->mutex);
self->zst.avail_in = 0;
OutputBuffer_OnError(&buffer);
return_value = NULL;
success:
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
return return_value;
}
if (!return_value) return NULL;
/* Copy the zstream state
- * We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
+ * We use mutex to make this thread-safe
*/
- ENTER_ZLIB(self);
+ PyMutex_Lock(&self->mutex);
int err = deflateCopy(&return_value->zst, &self->zst);
switch (err) {
case Z_OK:
/* Mark it as being initialized */
return_value->is_initialised = 1;
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
return (PyObject *)return_value;
error:
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
Py_XDECREF(return_value);
return NULL;
}
if (!return_value) return NULL;
/* Copy the zstream state
- * We use ENTER_ZLIB / LEAVE_ZLIB to make this thread-safe
+ * We use mutex to make this thread-safe
*/
- ENTER_ZLIB(self);
+ PyMutex_Lock(&self->mutex);
int err = inflateCopy(&return_value->zst, &self->zst);
switch (err) {
case Z_OK:
/* Mark it as being initialized */
return_value->is_initialised = 1;
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
return (PyObject *)return_value;
error:
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
Py_XDECREF(return_value);
return NULL;
}
return NULL;
}
- ENTER_ZLIB(self);
+ PyMutex_Lock(&self->mutex);
if (PyObject_GetBuffer(self->unconsumed_tail, &data, PyBUF_SIMPLE) == -1) {
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
return NULL;
}
/* If at end of stream, clean up any memory allocated by zlib. */
if (err == Z_STREAM_END) {
- self->eof = 1;
+ FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
self->is_initialised = 0;
err = inflateEnd(&self->zst);
if (err != Z_OK) {
return_value = NULL;
success:
PyBuffer_Release(&data);
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
return return_value;
}
PyObject_HEAD
z_stream zst;
PyObject *zdict;
- PyThread_type_lock lock;
+ PyMutex mutex;
PyObject *unused_data;
uint8_t *input_buffer;
Py_ssize_t input_buffer_size;
PyTypeObject *type = Py_TYPE(op);
PyObject_GC_UnTrack(op);
ZlibDecompressor *self = ZlibDecompressor_CAST(op);
- PyThread_free_lock(self->lock);
+ assert(!PyMutex_IsLocked(&self->mutex));
if (self->is_initialised) {
inflateEnd(&self->zst);
}
} while(err != Z_STREAM_END && self->avail_in_real != 0);
if (err == Z_STREAM_END) {
- self->eof = 1;
+ FT_ATOMIC_STORE_CHAR_RELAXED(self->eof, 1);
self->is_initialised = 0;
/* Unlike the Decompress object we call inflateEnd here as there are no
backwards compatibility issues */
}
if (self->eof) {
- self->needs_input = 0;
+ FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);
if (self->avail_in_real > 0) {
PyObject *unused_data = PyBytes_FromStringAndSize(
}
else if (self->avail_in_real == 0) {
self->zst.next_in = NULL;
- self->needs_input = 1;
+ FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 1);
}
else {
- self->needs_input = 0;
+ FT_ATOMIC_STORE_CHAR_RELAXED(self->needs_input, 0);
/* If we did not use the input buffer, we now have
to copy the tail from the caller's buffer into the
{
PyObject *result = NULL;
- ENTER_ZLIB(self);
+ PyMutex_Lock(&self->mutex);
if (self->eof) {
PyErr_SetString(PyExc_EOFError, "End of stream already reached");
}
else {
result = decompress(self, data->buf, data->len, max_length);
}
- LEAVE_ZLIB(self);
+ PyMutex_Unlock(&self->mutex);
return result;
}
self->zst.next_in = NULL;
self->zst.avail_in = 0;
self->unused_data = Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
- self->lock = PyThread_allocate_lock();
- if (self->lock == NULL) {
- Py_DECREF(self);
- PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
- return NULL;
- }
+ self->mutex = (PyMutex){0};
int err = inflateInit2(&(self->zst), wbits);
switch (err) {
case Z_OK:
{NULL}
};
+static PyObject *
+Decomp_unused_data_get(PyObject *op, void *Py_UNUSED(ignored))
+{
+ compobject *self = _compobject_CAST(op);
+ PyMutex_Lock(&self->mutex);
+ assert(self->unused_data != NULL);
+ PyObject *result = Py_NewRef(self->unused_data);
+ PyMutex_Unlock(&self->mutex);
+ return result;
+}
+
+static PyObject *
+Decomp_unconsumed_tail_get(PyObject *op, void *Py_UNUSED(ignored))
+{
+ compobject *self = _compobject_CAST(op);
+ PyMutex_Lock(&self->mutex);
+ assert(self->unconsumed_tail != NULL);
+ PyObject *result = Py_NewRef(self->unconsumed_tail);
+ PyMutex_Unlock(&self->mutex);
+ return result;
+}
+
+static PyGetSetDef Decomp_getset[] = {
+ {"unused_data", Decomp_unused_data_get, NULL, NULL},
+ {"unconsumed_tail", Decomp_unconsumed_tail_get, NULL, NULL},
+ {NULL},
+};
+
#define COMP_OFF(x) offsetof(compobject, x)
static PyMemberDef Decomp_members[] = {
- {"unused_data", _Py_T_OBJECT, COMP_OFF(unused_data), Py_READONLY},
- {"unconsumed_tail", _Py_T_OBJECT, COMP_OFF(unconsumed_tail), Py_READONLY},
{"eof", Py_T_BOOL, COMP_OFF(eof), Py_READONLY},
{NULL},
};
PyDoc_STRVAR(ZlibDecompressor_needs_input_doc,
"True if more input is needed before more decompressed data can be produced.");
+static PyObject *
+ZlibDecompressor_unused_data_get(PyObject *op, void *Py_UNUSED(ignored))
+{
+ ZlibDecompressor *self = ZlibDecompressor_CAST(op);
+ PyMutex_Lock(&self->mutex);
+ assert(self->unused_data != NULL);
+ PyObject *result = Py_NewRef(self->unused_data);
+ PyMutex_Unlock(&self->mutex);
+ return result;
+}
+
+static PyGetSetDef ZlibDecompressor_getset[] = {
+ {"unused_data", ZlibDecompressor_unused_data_get, NULL,
+ ZlibDecompressor_unused_data__doc__},
+ {NULL},
+};
+
static PyMemberDef ZlibDecompressor_members[] = {
{"eof", Py_T_BOOL, offsetof(ZlibDecompressor, eof),
Py_READONLY, ZlibDecompressor_eof__doc__},
- {"unused_data", Py_T_OBJECT_EX, offsetof(ZlibDecompressor, unused_data),
- Py_READONLY, ZlibDecompressor_unused_data__doc__},
{"needs_input", Py_T_BOOL, offsetof(ZlibDecompressor, needs_input), Py_READONLY,
ZlibDecompressor_needs_input_doc},
{NULL},
{Py_tp_traverse, compobject_traverse},
{Py_tp_methods, Decomp_methods},
{Py_tp_members, Decomp_members},
+ {Py_tp_getset, Decomp_getset},
{0, 0},
};
{Py_tp_dealloc, ZlibDecompressor_dealloc},
{Py_tp_traverse, ZlibDecompressor_traverse},
{Py_tp_members, ZlibDecompressor_members},
+ {Py_tp_getset, ZlibDecompressor_getset},
{Py_tp_new, zlib__ZlibDecompressor},
{Py_tp_doc, (char *)zlib__ZlibDecompressor__doc__},
{Py_tp_methods, ZlibDecompressor_methods},