#include "Python.h"
#include "pycore_crossinterp.h" // struct _xid
+#define REGISTERS_HEAP_TYPES
#include "_interpreters_common.h"
+#undef REGISTERS_HEAP_TYPES
#define MODULE_NAME _xxinterpqueues
}
+static int
+ensure_highlevel_module_loaded(void)
+{
+ PyObject *highlevel = PyImport_ImportModule("interpreters.queues");
+ if (highlevel == NULL) {
+ PyErr_Clear();
+ highlevel = PyImport_ImportModule("test.support.interpreters.queues");
+ if (highlevel == NULL) {
+ return -1;
+ }
+ }
+ Py_DECREF(highlevel);
+ return 0;
+}
+
+
/* module state *************************************************************/
typedef struct {
{
/* external types */
if (state->queue_type != NULL) {
- (void)_PyCrossInterpreterData_UnregisterClass(state->queue_type);
+ (void)clear_xid_class(state->queue_type);
}
Py_CLEAR(state->queue_type);
// single-queue errors
#define ERR_QUEUE_EMPTY (-21)
#define ERR_QUEUE_FULL (-22)
+#define ERR_QUEUE_NEVER_BOUND (-23)
+
+static int ensure_external_exc_types(module_state *);
static int
resolve_module_errcode(module_state *state, int errcode, int64_t qid,
msg = PyUnicode_FromFormat("queue %" PRId64 " not found", qid);
break;
case ERR_QUEUE_EMPTY:
+ if (ensure_external_exc_types(state) < 0) {
+ return -1;
+ }
exctype = state->QueueEmpty;
msg = PyUnicode_FromFormat("queue %" PRId64 " is empty", qid);
break;
case ERR_QUEUE_FULL:
+ if (ensure_external_exc_types(state) < 0) {
+ return -1;
+ }
exctype = state->QueueFull;
msg = PyUnicode_FromFormat("queue %" PRId64 " is full", qid);
break;
+ case ERR_QUEUE_NEVER_BOUND:
+ exctype = state->QueueError;
+ msg = PyUnicode_FromFormat("queue %" PRId64 " never bound", qid);
+ break;
default:
PyErr_Format(PyExc_ValueError,
"unsupported error code %d", errcode);
#define PREFIX "test.support.interpreters."
#define ADD_EXCTYPE(NAME, BASE, DOC) \
+ assert(state->NAME == NULL); \
if (add_exctype(mod, &state->NAME, PREFIX #NAME, DOC, BASE) < 0) { \
return -1; \
}
ADD_EXCTYPE(QueueError, PyExc_RuntimeError,
"Indicates that a queue-related error happened.")
ADD_EXCTYPE(QueueNotFoundError, state->QueueError, NULL)
- ADD_EXCTYPE(QueueEmpty, state->QueueError, NULL)
- ADD_EXCTYPE(QueueFull, state->QueueError, NULL)
+ // QueueEmpty and QueueFull are set by set_external_exc_types().
+ state->QueueEmpty = NULL;
+ state->QueueFull = NULL;
#undef ADD_EXCTYPE
#undef PREFIX
return 0;
}
+static int
+set_external_exc_types(module_state *state,
+ PyObject *emptyerror, PyObject *fullerror)
+{
+ if (state->QueueEmpty != NULL) {
+ assert(state->QueueFull != NULL);
+ Py_CLEAR(state->QueueEmpty);
+ Py_CLEAR(state->QueueFull);
+ }
+ else {
+ assert(state->QueueFull == NULL);
+ }
+ assert(PyObject_IsSubclass(emptyerror, state->QueueError));
+ assert(PyObject_IsSubclass(fullerror, state->QueueError));
+ state->QueueEmpty = Py_NewRef(emptyerror);
+ state->QueueFull = Py_NewRef(fullerror);
+ return 0;
+}
+
+static int
+ensure_external_exc_types(module_state *state)
+{
+ if (state->QueueEmpty != NULL) {
+ assert(state->QueueFull != NULL);
+ return 0;
+ }
+ assert(state->QueueFull == NULL);
+
+ // Force the module to be loaded, to register the type.
+ if (ensure_highlevel_module_loaded() < 0) {
+ return -1;
+ }
+ assert(state->QueueEmpty != NULL);
+ assert(state->QueueFull != NULL);
+ return 0;
+}
+
static int
handle_queue_error(int err, PyObject *mod, int64_t qid)
{
/* the queue */
+
typedef struct _queue {
Py_ssize_t num_waiters; // protected by global lock
PyThread_type_lock mutex;
*queue = (_queue){0};
}
+static void _queue_free(_queue *);
+
static void
_queue_kill_and_wait(_queue *queue)
{
return ref;
}
+static void
+_queuerefs_clear(_queueref *head)
+{
+ _queueref *next = head;
+ while (next != NULL) {
+ _queueref *ref = next;
+ next = ref->next;
+
+#ifdef Py_DEBUG
+ int64_t qid = ref->qid;
+ fprintf(stderr, "queue %ld still exists\n", qid);
+#endif
+ _queue *queue = ref->queue;
+ GLOBAL_FREE(ref);
+
+ _queue_kill_and_wait(queue);
+#ifdef Py_DEBUG
+ if (queue->items.count > 0) {
+ fprintf(stderr, "queue %ld still holds %ld items\n",
+ qid, queue->items.count);
+ }
+#endif
+ _queue_free(queue);
+ }
+}
+
/* a collection of queues ***************************************************/
static void
_queues_fini(_queues *queues)
{
- assert(queues->count == 0);
- assert(queues->head == NULL);
+ if (queues->count > 0) {
+ PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
+ assert((queues->count == 0) != (queues->head != NULL));
+ _queueref *head = queues->head;
+ queues->head = NULL;
+ queues->count = 0;
+ PyThread_release_lock(queues->mutex);
+ _queuerefs_clear(head);
+ }
if (queues->mutex != NULL) {
PyThread_free_lock(queues->mutex);
queues->mutex = NULL;
return res;
}
-static void _queue_free(_queue *);
-
-static void
+static int
_queues_decref(_queues *queues, int64_t qid)
{
+ int res = -1;
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
_queueref *prev = NULL;
_queueref *ref = _queuerefs_find(queues->head, qid, &prev);
if (ref == NULL) {
assert(!PyErr_Occurred());
- // Already destroyed.
- // XXX Warn?
+ res = ERR_QUEUE_NOT_FOUND;
+ goto finally;
+ }
+ if (ref->refcount == 0) {
+ res = ERR_QUEUE_NEVER_BOUND;
goto finally;
}
assert(ref->refcount > 0);
_queue_kill_and_wait(queue);
_queue_free(queue);
- return;
+ return 0;
}
+ res = 0;
finally:
PyThread_release_lock(queues->mutex);
+ return res;
}
struct queue_id_and_fmt {
PyObject *, _PyCrossInterpreterData *);
static int
-set_external_queue_type(PyObject *module, PyTypeObject *queue_type)
+set_external_queue_type(module_state *state, PyTypeObject *queue_type)
{
- module_state *state = get_module_state(module);
-
// Clear the old value if the .py module was reloaded.
if (state->queue_type != NULL) {
- (void)_PyCrossInterpreterData_UnregisterClass(
- state->queue_type);
+ (void)clear_xid_class(state->queue_type);
Py_CLEAR(state->queue_type);
}
PyTypeObject *cls = state->queue_type;
if (cls == NULL) {
// Force the module to be loaded, to register the type.
- PyObject *highlevel = PyImport_ImportModule("interpreters.queue");
- if (highlevel == NULL) {
- PyErr_Clear();
- highlevel = PyImport_ImportModule("test.support.interpreters.queue");
- if (highlevel == NULL) {
- return NULL;
- }
+ if (ensure_highlevel_module_loaded() < 0) {
+ return NULL;
}
- Py_DECREF(highlevel);
cls = state->queue_type;
assert(cls != NULL);
}
int64_t qid = ((struct _queueid_xid *)data)->qid;
PyMem_RawFree(data);
_queues *queues = _get_global_queues();
- _queues_decref(queues, qid);
+ int res = _queues_decref(queues, qid);
+ if (res == ERR_QUEUE_NOT_FOUND) {
+ // Already destroyed.
+ // XXX Warn?
+ }
+ else {
+ assert(res == 0);
+ }
}
static PyObject *
}
PyDoc_STRVAR(queuesmod_create_doc,
-"create() -> qid\n\
+"create(maxsize, fmt) -> qid\n\
\n\
Create a new cross-interpreter queue and return its unique generated ID.\n\
-It is a new reference as though bind() had been called on the queue.");
+It is a new reference as though bind() had been called on the queue.\n\
+\n\
+The caller is responsible for calling destroy() for the new queue\n\
+before the runtime is finalized.");
static PyObject *
queuesmod_destroy(PyObject *self, PyObject *args, PyObject *kwds)
}
PyDoc_STRVAR(queuesmod_list_all_doc,
-"list_all() -> [qid]\n\
+"list_all() -> [(qid, fmt)]\n\
\n\
-Return the list of IDs for all queues.");
+Return the list of IDs for all queues.\n\
+Each corresponding default format is also included.");
static PyObject *
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
/* Queue up the object. */
int err = queue_put(&_globals.queues, qid, obj, fmt);
+ // This is the only place that raises QueueFull.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
}
PyDoc_STRVAR(queuesmod_put_doc,
-"put(qid, obj, sharedonly=False)\n\
+"put(qid, obj, fmt)\n\
\n\
Add the object's data to the queue.");
static PyObject *
queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"qid", "default", NULL};
+ static char *kwlist[] = {"qid", NULL};
qidarg_converter_data qidarg;
- PyObject *dflt = NULL;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:get", kwlist,
- qidarg_converter, &qidarg, &dflt)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:get", kwlist,
+ qidarg_converter, &qidarg)) {
return NULL;
}
int64_t qid = qidarg.id;
PyObject *obj = NULL;
int fmt = 0;
int err = queue_get(&_globals.queues, qid, &obj, &fmt);
- if (err == ERR_QUEUE_EMPTY && dflt != NULL) {
- assert(obj == NULL);
- obj = Py_NewRef(dflt);
- }
- else if (handle_queue_error(err, self, qid)) {
+ // This is the only place that raises QueueEmpty.
+ if (handle_queue_error(err, self, qid)) {
return NULL;
}
}
PyDoc_STRVAR(queuesmod_get_doc,
-"get(qid, [default]) -> obj\n\
+"get(qid) -> (obj, fmt)\n\
\n\
Return a new object from the data at the front of the queue.\n\
+The object's format is also returned.\n\
\n\
-If there is nothing to receive then raise QueueEmpty, unless\n\
-a default value is provided. In that case return it.");
+If there is nothing to receive then raise QueueEmpty.");
static PyObject *
queuesmod_bind(PyObject *self, PyObject *args, PyObject *kwds)
// XXX Check module state if bound already.
// XXX Update module state.
- _queues_decref(&_globals.queues, qid);
+ int err = _queues_decref(&_globals.queues, qid);
+ if (handle_queue_error(err, self, qid)) {
+ return NULL;
+ }
Py_RETURN_NONE;
}
Return the maximum number of items in the queue.");
static PyObject *
-queuesmod_get_default_fmt(PyObject *self, PyObject *args, PyObject *kwds)
+queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"qid", NULL};
qidarg_converter_data qidarg;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&:get_default_fmt", kwlist,
+ "O&:get_queue_defaults", kwlist,
qidarg_converter, &qidarg)) {
return NULL;
}
}
int fmt = queue->fmt;
_queue_unmark_waiter(queue, _globals.queues.mutex);
- return PyLong_FromLong(fmt);
+
+ PyObject *fmt_obj = PyLong_FromLong(fmt);
+ if (fmt_obj == NULL) {
+ return NULL;
+ }
+ // For now queues only have one default.
+ PyObject *res = PyTuple_Pack(1, fmt_obj);
+ Py_DECREF(fmt_obj);
+ return res;
}
-PyDoc_STRVAR(queuesmod_get_default_fmt_doc,
-"get_default_fmt(qid)\n\
+PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,
+"get_queue_defaults(qid)\n\
\n\
-Return the default format to use for the queue.");
+Return the queue's default values, set when it was created.");
static PyObject *
queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds)
Return the number of items in the queue.");
static PyObject *
-queuesmod__register_queue_type(PyObject *self, PyObject *args, PyObject *kwds)
+queuesmod__register_heap_types(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"queuetype", NULL};
+ static char *kwlist[] = {"queuetype", "emptyerror", "fullerror", NULL};
PyObject *queuetype;
+ PyObject *emptyerror;
+ PyObject *fullerror;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O:_register_queue_type", kwlist,
- &queuetype)) {
+ "OOO:_register_heap_types", kwlist,
+ &queuetype, &emptyerror, &fullerror)) {
return NULL;
}
if (!PyType_Check(queuetype)) {
- PyErr_SetString(PyExc_TypeError, "expected a type for 'queuetype'");
+ PyErr_SetString(PyExc_TypeError,
+ "expected a type for 'queuetype'");
+ return NULL;
+ }
+ if (!PyExceptionClass_Check(emptyerror)) {
+ PyErr_SetString(PyExc_TypeError,
+ "expected an exception type for 'emptyerror'");
+ return NULL;
+ }
+ if (!PyExceptionClass_Check(fullerror)) {
+ PyErr_SetString(PyExc_TypeError,
+ "expected an exception type for 'fullerror'");
return NULL;
}
- PyTypeObject *cls_queue = (PyTypeObject *)queuetype;
- if (set_external_queue_type(self, cls_queue) < 0) {
+ module_state *state = get_module_state(self);
+
+ if (set_external_queue_type(state, (PyTypeObject *)queuetype) < 0) {
+ return NULL;
+ }
+ if (set_external_exc_types(state, emptyerror, fullerror) < 0) {
return NULL;
}
METH_VARARGS | METH_KEYWORDS, queuesmod_destroy_doc},
{"list_all", queuesmod_list_all,
METH_NOARGS, queuesmod_list_all_doc},
- {"put", _PyCFunction_CAST(queuesmod_put),
+ {"put", _PyCFunction_CAST(queuesmod_put),
METH_VARARGS | METH_KEYWORDS, queuesmod_put_doc},
- {"get", _PyCFunction_CAST(queuesmod_get),
+ {"get", _PyCFunction_CAST(queuesmod_get),
METH_VARARGS | METH_KEYWORDS, queuesmod_get_doc},
- {"bind", _PyCFunction_CAST(queuesmod_bind),
+ {"bind", _PyCFunction_CAST(queuesmod_bind),
METH_VARARGS | METH_KEYWORDS, queuesmod_bind_doc},
{"release", _PyCFunction_CAST(queuesmod_release),
METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc},
{"get_maxsize", _PyCFunction_CAST(queuesmod_get_maxsize),
METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc},
- {"get_default_fmt", _PyCFunction_CAST(queuesmod_get_default_fmt),
- METH_VARARGS | METH_KEYWORDS, queuesmod_get_default_fmt_doc},
+ {"get_queue_defaults", _PyCFunction_CAST(queuesmod_get_queue_defaults),
+ METH_VARARGS | METH_KEYWORDS, queuesmod_get_queue_defaults_doc},
{"is_full", _PyCFunction_CAST(queuesmod_is_full),
METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc},
{"get_count", _PyCFunction_CAST(queuesmod_get_count),
METH_VARARGS | METH_KEYWORDS, queuesmod_get_count_doc},
- {"_register_queue_type", _PyCFunction_CAST(queuesmod__register_queue_type),
+ {"_register_heap_types", _PyCFunction_CAST(queuesmod__register_heap_types),
METH_VARARGS | METH_KEYWORDS, NULL},
{NULL, NULL} /* sentinel */