gh-113884: Refactor `queue.SimpleQueue` to use a ring buffer to store items (#114259)
author     mpage <mpage@meta.com>
           Fri, 19 Jan 2024 12:17:51 +0000 (04:17 -0800)
committer  GitHub <noreply@github.com>
           Fri, 19 Jan 2024 12:17:51 +0000 (12:17 +0000)
Use a ring buffer instead of a Python list to simplify making
queue.SimpleQueue thread-safe in free-threaded builds. The ring buffer
implementation has no points at which a critical section may be
released.
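
To make the new data structure concrete, the sketch below is a minimal,
self-contained ring buffer over plain ints that uses the same
put_idx/get_idx modulo arithmetic as the RingBuf type added in this
patch. The IntRing/ring_put/ring_get names, the fixed capacity of 4 and
the int payload are illustrative assumptions only; the real RingBuf
stores PyObject * references and grows on demand instead of rejecting
puts when full.

    #include <assert.h>
    #include <stdio.h>

    #define CAP 4

    typedef struct {
        int items[CAP];
        int put_idx, get_idx, num_items;
    } IntRing;

    static int ring_put(IntRing *r, int v)
    {
        if (r->num_items == CAP) {
            return -1;                    /* full; the real RingBuf grows */
        }
        r->items[r->put_idx] = v;
        r->put_idx = (r->put_idx + 1) % CAP;
        r->num_items++;
        return 0;
    }

    static int ring_get(IntRing *r)
    {
        assert(r->num_items > 0);
        int v = r->items[r->get_idx];
        r->get_idx = (r->get_idx + 1) % CAP;
        r->num_items--;
        return v;
    }

    int main(void)
    {
        IntRing r = {0};
        for (int i = 1; i <= 4; i++) {
            ring_put(&r, i);              /* fill slots 0..3 with 1..4 */
        }
        int a = ring_get(&r);
        int b = ring_get(&r);
        printf("%d %d\n", a, b);          /* 1 2 -- FIFO order */
        ring_put(&r, 5);                  /* put_idx wraps around to slot 0 */
        ring_put(&r, 6);
        while (r.num_items > 0) {
            printf("%d ", ring_get(&r));  /* 3 4 5 6 */
        }
        printf("\n");
        return 0;
    }

The invariant the sketch demonstrates is the one the patch relies on
throughout: the live items occupy num_items consecutive slots starting
at get_idx, modulo the capacity.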

Modules/_queuemodule.c

index 81a06cdb79a4f25f5d4525be3a01db77c68d356b..8fca3cdd0deb188177a3c400e2a5ba39e3af8629 100644
@@ -7,6 +7,7 @@
 #include "pycore_moduleobject.h"  // _PyModule_GetState()
 #include "pycore_time.h"          // _PyTime_t
 
+#include <stdbool.h>
 #include <stddef.h>               // offsetof()
 
 typedef struct {
@@ -25,12 +26,167 @@ static struct PyModuleDef queuemodule;
 #define simplequeue_get_state_by_type(type) \
     (simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule)))
 
+static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8;
+
+typedef struct {
+    // Where to place the next item
+    Py_ssize_t put_idx;
+
+    // Where to get the next item
+    Py_ssize_t get_idx;
+
+    PyObject **items;
+
+    // Total number of items that may be stored
+    Py_ssize_t items_cap;
+
+    // Number of items stored
+    Py_ssize_t num_items;
+} RingBuf;
+
+static int
+RingBuf_Init(RingBuf *buf)
+{
+    buf->put_idx = 0;
+    buf->get_idx = 0;
+    buf->items_cap = INITIAL_RING_BUF_CAPACITY;
+    buf->num_items = 0;
+    buf->items = PyMem_Calloc(buf->items_cap, sizeof(PyObject *));
+    if (buf->items == NULL) {
+        PyErr_NoMemory();
+        return -1;
+    }
+    return 0;
+}
+
+static PyObject *
+RingBuf_At(RingBuf *buf, Py_ssize_t idx)
+{
+    assert(idx >= 0 && idx < buf->num_items);
+    return buf->items[(buf->get_idx + idx) % buf->items_cap];
+}
+
+static void
+RingBuf_Fini(RingBuf *buf)
+{
+    PyObject **items = buf->items;
+    Py_ssize_t num_items = buf->num_items;
+    Py_ssize_t cap = buf->items_cap;
+    Py_ssize_t idx = buf->get_idx;
+    buf->items = NULL;
+    buf->put_idx = 0;
+    buf->get_idx = 0;
+    buf->num_items = 0;
+    buf->items_cap = 0;
+    for (Py_ssize_t n = num_items; n > 0; idx = (idx + 1) % cap, n--) {
+        Py_DECREF(items[idx]);
+    }
+    PyMem_Free(items);
+}
+
+// Resize the underlying items array of buf to the new capacity and arrange
+// the items contiguously in the new items array.
+//
+// Returns -1 on allocation failure or 0 on success.
+static int
+resize_ringbuf(RingBuf *buf, Py_ssize_t capacity)
+{
+    Py_ssize_t new_capacity = Py_MAX(INITIAL_RING_BUF_CAPACITY, capacity);
+    if (new_capacity == buf->items_cap) {
+        return 0;
+    }
+    assert(buf->num_items <= new_capacity);
+
+    PyObject **new_items = PyMem_Calloc(new_capacity, sizeof(PyObject *));
+    if (new_items == NULL) {
+        return -1;
+    }
+
+    // Copy the "tail" of the old items array. This corresponds to the
+    // "head" of the abstract ring buffer.
+    Py_ssize_t tail_size =
+        Py_MIN(buf->num_items, buf->items_cap - buf->get_idx);
+    if (tail_size > 0) {
+        memcpy(new_items, buf->items + buf->get_idx,
+               tail_size * sizeof(PyObject *));
+    }
+
+    // Copy the "head" of the old items array, if any. This corresponds to the
+    // "tail" of the abstract ring buffer.
+    Py_ssize_t head_size = buf->num_items - tail_size;
+    if (head_size > 0) {
+        memcpy(new_items + tail_size, buf->items,
+               head_size * sizeof(PyObject *));
+    }
+
+    PyMem_Free(buf->items);
+    buf->items = new_items;
+    buf->items_cap = new_capacity;
+    buf->get_idx = 0;
+    buf->put_idx = buf->num_items;
+
+    return 0;
+}
+
+// Returns a strong reference from the head of the buffer.
+static PyObject *
+RingBuf_Get(RingBuf *buf)
+{
+    assert(buf->num_items > 0);
+
+    if (buf->num_items < (buf->items_cap / 4)) {
+        // The items array is less than 25% occupied; shrink it by 50%. This
+        // allows for growth without immediately needing to resize the
+        // underlying items array.
+        //
+        // It's safe to ignore allocation failures here; shrinking is an
+        // optimization that isn't required for correctness.
+        (void)resize_ringbuf(buf, buf->items_cap / 2);
+    }
+
+    PyObject *item = buf->items[buf->get_idx];
+    buf->items[buf->get_idx] = NULL;
+    buf->get_idx = (buf->get_idx + 1) % buf->items_cap;
+    buf->num_items--;
+    return item;
+}
+
+// Returns 0 on success or -1 if the buffer failed to grow
+static int
+RingBuf_Put(RingBuf *buf, PyObject *item)
+{
+    assert(buf->num_items <= buf->items_cap);
+
+    if (buf->num_items == buf->items_cap) {
+        // Buffer is full, grow it.
+        if (resize_ringbuf(buf, buf->items_cap * 2) < 0) {
+            PyErr_NoMemory();
+            return -1;
+        }
+    }
+    buf->items[buf->put_idx] = Py_NewRef(item);
+    buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
+    buf->num_items++;
+    return 0;
+}
+
+static Py_ssize_t
+RingBuf_Len(RingBuf *buf)
+{
+    return buf->num_items;
+}
+
+static bool
+RingBuf_IsEmpty(RingBuf *buf)
+{
+    return buf->num_items == 0;
+}
+
 typedef struct {
     PyObject_HEAD
     PyThread_type_lock lock;
     int locked;
-    PyObject *lst;
-    Py_ssize_t lst_pos;
+    RingBuf buf;
     PyObject *weakreflist;
 } simplequeueobject;
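
To make the two memcpy calls in resize_ringbuf above concrete, here is a
hypothetical walk-through (the numbers are illustrative, not taken from the
patch): suppose items_cap = 8, get_idx = 6 and num_items = 4, so the live
items occupy physical slots 6, 7, 0 and 1 in FIFO order. Then

    tail_size = Py_MIN(4, 8 - 6) = 2;   /* old slots 6,7 -> new_items[0..1] */
    head_size = 4 - 2 = 2;              /* old slots 0,1 -> new_items[2..3] */

and after the two copies the items sit contiguously at the start of
new_items, which is why get_idx is reset to 0 and put_idx to num_items.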
 
@@ -43,7 +199,7 @@ class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(ty
 static int
 simplequeue_clear(simplequeueobject *self)
 {
-    Py_CLEAR(self->lst);
+    RingBuf_Fini(&self->buf);
     return 0;
 }
 
@@ -69,7 +225,10 @@ simplequeue_dealloc(simplequeueobject *self)
 static int
 simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
 {
-    Py_VISIT(self->lst);
+    RingBuf *buf = &self->buf;
+    for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) {
+        Py_VISIT(RingBuf_At(buf, i));
+    }
     Py_VISIT(Py_TYPE(self));
     return 0;
 }
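
Because the items are no longer held by a Python list that the garbage
collector would traverse on its own, simplequeue_traverse above has to visit
each stored reference itself. RingBuf_At maps a logical queue position to a
physical slot; for example (values hypothetical), with get_idx = 5,
items_cap = 8 and num_items = 5, logical index 0 is slot 5 (the item
RingBuf_Get would return next) and logical index 4 is slot (5 + 4) % 8 = 1.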
@@ -90,15 +249,13 @@ simplequeue_new_impl(PyTypeObject *type)
     self = (simplequeueobject *) type->tp_alloc(type, 0);
     if (self != NULL) {
         self->weakreflist = NULL;
-        self->lst = PyList_New(0);
         self->lock = PyThread_allocate_lock();
-        self->lst_pos = 0;
         if (self->lock == NULL) {
             Py_DECREF(self);
             PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
             return NULL;
         }
-        if (self->lst == NULL) {
+        if (RingBuf_Init(&self->buf) < 0) {
             Py_DECREF(self);
             return NULL;
         }
@@ -126,7 +283,7 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
 /*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
 {
     /* BEGIN GIL-protected critical section */
-    if (PyList_Append(self->lst, item) < 0)
+    if (RingBuf_Put(&self->buf, item) < 0)
         return NULL;
     if (self->locked) {
         /* A get() may be waiting, wake it up */
@@ -155,33 +312,6 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
     return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
 }
 
-static PyObject *
-simplequeue_pop_item(simplequeueobject *self)
-{
-    Py_ssize_t count, n;
-    PyObject *item;
-
-    n = PyList_GET_SIZE(self->lst);
-    assert(self->lst_pos < n);
-
-    item = PyList_GET_ITEM(self->lst, self->lst_pos);
-    Py_INCREF(Py_None);
-    PyList_SET_ITEM(self->lst, self->lst_pos, Py_None);
-    self->lst_pos += 1;
-    count = n - self->lst_pos;
-    if (self->lst_pos > count) {
-        /* The list is more than 50% empty, reclaim space at the beginning */
-        if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) {
-            /* Undo pop */
-            self->lst_pos -= 1;
-            PyList_SET_ITEM(self->lst, self->lst_pos, item);
-            return NULL;
-        }
-        self->lst_pos = 0;
-    }
-    return item;
-}
-
 /*[clinic input]
 _queue.SimpleQueue.get
 
@@ -249,7 +379,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
      * So we simply try to acquire the lock in a loop, until the condition
      * (queue non-empty) becomes true.
      */
-    while (self->lst_pos == PyList_GET_SIZE(self->lst)) {
+    while (RingBuf_IsEmpty(&self->buf)) {
         /* First a simple non-blocking try without releasing the GIL */
         r = PyThread_acquire_lock_timed(self->lock, 0, 0);
         if (r == PY_LOCK_FAILURE && microseconds != 0) {
@@ -279,8 +409,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
     }
 
     /* BEGIN GIL-protected critical section */
-    assert(self->lst_pos < PyList_GET_SIZE(self->lst));
-    item = simplequeue_pop_item(self);
+    item = RingBuf_Get(&self->buf);
     if (self->locked) {
         PyThread_release_lock(self->lock);
         self->locked = 0;
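
The removed simplequeue_pop_item compacted the list with PyList_SetSlice once
it was more than 50% empty and had to undo the pop if that compaction failed;
RingBuf_Get has no such failure path, because shrinking the ring buffer is
purely an optimization and a failed reallocation is simply ignored. As a
hypothetical illustration of the shrink policy: with items_cap = 32, a shrink
happens only once num_items drops below 32 / 4 = 8, and the new capacity is
Py_MAX(INITIAL_RING_BUF_CAPACITY, 32 / 2) = 16, so the at most 7 remaining
items fill less than half of the shrunken array and subsequent puts do not
immediately force a regrow.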
@@ -320,7 +449,7 @@ static int
 _queue_SimpleQueue_empty_impl(simplequeueobject *self)
 /*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
 {
-    return self->lst_pos == PyList_GET_SIZE(self->lst);
+    return RingBuf_IsEmpty(&self->buf);
 }
 
 /*[clinic input]
@@ -333,7 +462,7 @@ static Py_ssize_t
 _queue_SimpleQueue_qsize_impl(simplequeueobject *self)
 /*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
 {
-    return PyList_GET_SIZE(self->lst) - self->lst_pos;
+    return RingBuf_Len(&self->buf);
 }
 
 static int