##################################
# helpers
+def recv_wait(cid):
+ while True:
+ try:
+ return channels.recv(cid)
+ except channels.ChannelEmptyError:
+ time.sleep(0.1)
+
#@contextmanager
#def run_threaded(id, source, **shared):
# def run():
def _run_action(cid, action, end, state):
if action == 'use':
if end == 'send':
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
return state.incr()
elif end == 'recv':
if not state.pending:
chan = channels.create()
obj = channels.create()
- channels.send(chan, obj)
+ channels.send(chan, obj, blocking=False)
got = channels.recv(chan)
self.assertEqual(got, obj)
"""Test basic listing channel interpreters."""
interp0 = interpreters.get_main()
cid = channels.create()
- channels.send(cid, "send")
+ channels.send(cid, "send", blocking=False)
# Test for a channel that has one end associated to an interpreter.
send_interps = channels.list_interpreters(cid, send=True)
recv_interps = channels.list_interpreters(cid, send=False)
interp3 = interpreters.create()
cid = channels.create()
- channels.send(cid, "send")
+ channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
import _xxinterpchannels as _channels
- _channels.send({cid}, "send")
+ _channels.send({cid}, "send", blocking=False)
"""))
_run_output(interp2, dedent(f"""
import _xxinterpchannels as _channels
interp0 = interpreters.get_main()
interp1 = interpreters.create()
cid = channels.create()
- channels.send(cid, "send")
+ channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
import _xxinterpchannels as _channels
obj = _channels.recv({cid})
interp1 = interpreters.create()
interp2 = interpreters.create()
cid = channels.create()
- channels.send(cid, "data")
+ channels.send(cid, "data", blocking=False)
_run_output(interp1, dedent(f"""
import _xxinterpchannels as _channels
obj = _channels.recv({cid})
"""))
- channels.send(cid, "data")
+ channels.send(cid, "data", blocking=False)
_run_output(interp2, dedent(f"""
import _xxinterpchannels as _channels
obj = _channels.recv({cid})
interp1 = interpreters.create()
cid = channels.create()
# Put something in the channel so that it's not empty.
- channels.send(cid, "send")
+ channels.send(cid, "send", blocking=False)
# Check initial state.
send_interps = channels.list_interpreters(cid, send=True)
interp1 = interpreters.create()
cid = channels.create()
# Put something in the channel so that it's not empty.
- channels.send(cid, "send")
+ channels.send(cid, "send", blocking=False)
# Check initial state.
send_interps = channels.list_interpreters(cid, send=True)
def test_send_recv_main(self):
cid = channels.create()
orig = b'spam'
- channels.send(cid, orig)
+ channels.send(cid, orig, blocking=False)
obj = channels.recv(cid)
self.assertEqual(obj, orig)
import _xxinterpchannels as _channels
cid = _channels.create()
orig = b'spam'
- _channels.send(cid, orig)
+ _channels.send(cid, orig, blocking=False)
obj = _channels.recv(cid)
assert obj is not orig
assert obj == orig
id1 = interpreters.create()
out = _run_output(id1, dedent(f"""
import _xxinterpchannels as _channels
- _channels.send({cid}, b'spam')
+ _channels.send({cid}, b'spam', blocking=False)
"""))
obj = channels.recv(cid)
cid = channels.create()
def f():
- while True:
- try:
- obj = channels.recv(cid)
- break
- except channels.ChannelEmptyError:
- time.sleep(0.1)
+ obj = recv_wait(cid)
channels.send(cid, obj)
t = threading.Thread(target=f)
t.start()
channels.send(cid, b'spam')
+ obj = recv_wait(cid)
t.join()
- obj = channels.recv(cid)
self.assertEqual(obj, b'spam')
t.start()
channels.send(cid, b'spam')
+ obj = recv_wait(cid)
t.join()
- obj = channels.recv(cid)
self.assertEqual(obj, b'eggs')
default = object()
cid = channels.create()
obj1 = channels.recv(cid, default)
- channels.send(cid, None)
- channels.send(cid, 1)
- channels.send(cid, b'spam')
- channels.send(cid, b'eggs')
+ channels.send(cid, None, blocking=False)
+ channels.send(cid, 1, blocking=False)
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'eggs', blocking=False)
obj2 = channels.recv(cid, default)
obj3 = channels.recv(cid, default)
obj4 = channels.recv(cid)
interp = interpreters.create()
interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
- _channels.send({cid1}, b'spam')
+ _channels.send({cid1}, b'spam', blocking=False)
"""))
interpreters.destroy(interp)
interp = interpreters.create()
interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
- _channels.send({cid2}, b'spam')
+ _channels.send({cid2}, b'spam', blocking=False)
"""))
- channels.send(cid2, b'eggs')
+ channels.send(cid2, b'eggs', blocking=False)
interpreters.destroy(interp)
channels.recv(cid2)
def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
cid = channels.create()
- channels.send_buffer(cid, buf)
+ channels.send_buffer(cid, buf, blocking=False)
obj = channels.recv(cid)
self.assertIsNot(obj, buf)
]
for obj in objects:
with self.subTest(obj):
- channels.send(cid, obj)
+ channels.send(cid, obj, blocking=False)
got = channels.recv(cid)
self.assertEqual(got, obj)
out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(cid.end)
- _channels.send(cid, b'spam')
+ _channels.send(cid, b'spam', blocking=False)
"""),
dict(cid=cid.send))
obj = channels.recv(cid)
out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(chan.id.end)
- _channels.send(chan.id, b'spam')
+ _channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
obj = channels.recv(cid)
def test_close_single_user(self):
cid = channels.create()
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
channels.recv(cid)
channels.close(cid)
id2 = interpreters.create()
interpreters.run_string(id1, dedent(f"""
import _xxinterpchannels as _channels
- _channels.send({cid}, b'spam')
+ _channels.send({cid}, b'spam', blocking=False)
"""))
interpreters.run_string(id2, dedent(f"""
import _xxinterpchannels as _channels
def test_close_multiple_times(self):
cid = channels.create()
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
channels.recv(cid)
channels.close(cid)
for send, recv in tests:
with self.subTest((send, recv)):
cid = channels.create()
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
channels.recv(cid)
channels.close(cid, send=send, recv=recv)
def test_close_defaults_with_unused_items(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'ham')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'ham', blocking=False)
with self.assertRaises(channels.ChannelNotEmptyError):
channels.close(cid)
channels.recv(cid)
- channels.send(cid, b'eggs')
+ channels.send(cid, b'eggs', blocking=False)
def test_close_recv_with_unused_items_unforced(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'ham')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'ham', blocking=False)
with self.assertRaises(channels.ChannelNotEmptyError):
channels.close(cid, recv=True)
channels.recv(cid)
- channels.send(cid, b'eggs')
+ channels.send(cid, b'eggs', blocking=False)
channels.recv(cid)
channels.recv(cid)
channels.close(cid, recv=True)
def test_close_send_with_unused_items_unforced(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'ham')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'ham', blocking=False)
channels.close(cid, send=True)
with self.assertRaises(channels.ChannelClosedError):
def test_close_both_with_unused_items_unforced(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'ham')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'ham', blocking=False)
with self.assertRaises(channels.ChannelNotEmptyError):
channels.close(cid, recv=True, send=True)
channels.recv(cid)
- channels.send(cid, b'eggs')
+ channels.send(cid, b'eggs', blocking=False)
channels.recv(cid)
channels.recv(cid)
channels.close(cid, recv=True)
def test_close_recv_with_unused_items_forced(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'ham')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'ham', blocking=False)
channels.close(cid, recv=True, force=True)
with self.assertRaises(channels.ChannelClosedError):
def test_close_send_with_unused_items_forced(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'ham')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'ham', blocking=False)
channels.close(cid, send=True, force=True)
with self.assertRaises(channels.ChannelClosedError):
def test_close_both_with_unused_items_forced(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'ham')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'ham', blocking=False)
channels.close(cid, send=True, recv=True, force=True)
with self.assertRaises(channels.ChannelClosedError):
def test_close_by_unassociated_interp(self):
cid = channels.create()
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
interp = interpreters.create()
interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
def test_close_used_multiple_times_by_single_user(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'spam')
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'spam', blocking=False)
channels.recv(cid)
channels.close(cid, force=True)
def test_single_user(self):
cid = channels.create()
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
channels.recv(cid)
channels.release(cid, send=True, recv=True)
id2 = interpreters.create()
interpreters.run_string(id1, dedent(f"""
import _xxinterpchannels as _channels
- _channels.send({cid}, b'spam')
+ _channels.send({cid}, b'spam', blocking=False)
"""))
out = _run_output(id2, dedent(f"""
import _xxinterpchannels as _channels
def test_no_kwargs(self):
cid = channels.create()
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
channels.recv(cid)
channels.release(cid)
def test_multiple_times(self):
cid = channels.create()
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
channels.recv(cid)
channels.release(cid, send=True, recv=True)
def test_with_unused_items(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'ham')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'ham', blocking=False)
channels.release(cid, send=True, recv=True)
with self.assertRaises(channels.ChannelClosedError):
def test_by_unassociated_interp(self):
cid = channels.create()
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
interp = interpreters.create()
interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
interp = interpreters.create()
interpreters.run_string(interp, dedent(f"""
import _xxinterpchannels as _channels
- obj = _channels.send({cid}, b'spam')
+ obj = _channels.send({cid}, b'spam', blocking=False)
_channels.release({cid})
"""))
def test_partially(self):
# XXX Is partial close too weird/confusing?
cid = channels.create()
- channels.send(cid, None)
+ channels.send(cid, None, blocking=False)
channels.recv(cid)
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
channels.release(cid, send=True)
obj = channels.recv(cid)
def test_used_multiple_times_by_single_user(self):
cid = channels.create()
- channels.send(cid, b'spam')
- channels.send(cid, b'spam')
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'spam', blocking=False)
+ channels.send(cid, b'spam', blocking=False)
channels.recv(cid)
channels.release(cid, send=True, recv=True)
cid = _xxsubchannels.create()
# We purposefully send back an int to avoid tying the
# channel to the other interpreter.
- _xxsubchannels.send({ch}, int(cid))
+ _xxsubchannels.send({ch}, int(cid), blocking=False)
del _xxsubinterpreters
""")
self._cid = channels.recv(ch)
{repr(fix.state)},
hideclosed={hideclosed},
)
- channels.send({_cid}, result.pending.to_bytes(1, 'little'))
- channels.send({_cid}, b'X' if result.closed else b'')
+ channels.send({_cid}, result.pending.to_bytes(1, 'little'), blocking=False)
+ channels.send({_cid}, b'X' if result.closed else b'', blocking=False)
""")
result = ChannelState(
pending=int.from_bytes(channels.recv(_cid), 'little'),
""")
run_interp(interp.id, """
with helpers.expect_channel_closed():
- channels.send(cid, b'spam')
+ channels.send(cid, b'spam', blocking=False)
""")
run_interp(interp.id, """
with helpers.expect_channel_closed():
return cls;
}
+static void
+wait_for_lock(PyThread_type_lock mutex)
+{
+ Py_BEGIN_ALLOW_THREADS
+ // XXX Handle eintr, etc.
+ PyThread_acquire_lock(mutex, WAIT_LOCK);
+ Py_END_ALLOW_THREADS
+
+ PyThread_release_lock(mutex);
+}
+
/* Cross-interpreter Buffer Views *******************************************/
typedef struct _channelitem {
_PyCrossInterpreterData *data;
+ PyThread_type_lock recv_mutex;
struct _channelitem *next;
} _channelitem;
}
static _PyCrossInterpreterData *
-_channelitem_popped(_channelitem *item)
+_channelitem_popped(_channelitem *item, PyThread_type_lock *recv_mutex)
{
_PyCrossInterpreterData *data = item->data;
item->data = NULL;
+ *recv_mutex = item->recv_mutex;
_channelitem_free(item);
return data;
}
}
static int
-_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
+_channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data,
+ PyThread_type_lock recv_mutex)
{
_channelitem *item = _channelitem_new();
if (item == NULL) {
return -1;
}
item->data = data;
+ item->recv_mutex = recv_mutex;
queue->count += 1;
if (queue->first == NULL) {
}
static _PyCrossInterpreterData *
-_channelqueue_get(_channelqueue *queue)
+_channelqueue_get(_channelqueue *queue, PyThread_type_lock *recv_mutex)
{
_channelitem *item = queue->first;
if (item == NULL) {
}
queue->count -= 1;
- return _channelitem_popped(item);
+ return _channelitem_popped(item, recv_mutex);
}
static void
static int
_channel_add(_PyChannelState *chan, int64_t interp,
- _PyCrossInterpreterData *data)
+ _PyCrossInterpreterData *data, PyThread_type_lock recv_mutex)
{
int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
goto done;
}
- if (_channelqueue_put(chan->queue, data) != 0) {
+ if (_channelqueue_put(chan->queue, data, recv_mutex) != 0) {
goto done;
}
goto done;
}
- _PyCrossInterpreterData *data = _channelqueue_get(chan->queue);
+ PyThread_type_lock recv_mutex = NULL;
+ _PyCrossInterpreterData *data = _channelqueue_get(chan->queue, &recv_mutex);
if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
chan->open = 0;
}
*res = data;
+ if (recv_mutex != NULL) {
+ PyThread_release_lock(recv_mutex);
+ }
+
done:
PyThread_release_lock(chan->mutex);
if (chan->queue->count == 0) {
}
static int
-_channel_send(_channels *channels, int64_t id, PyObject *obj)
+_channel_send(_channels *channels, int64_t id, PyObject *obj,
+ PyThread_type_lock recv_mutex)
{
PyInterpreterState *interp = _get_current_interp();
if (interp == NULL) {
}
// Add the data to the channel.
- int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
+ int res = _channel_add(chan, PyInterpreterState_GetID(interp), data,
+ recv_mutex);
PyThread_release_lock(mutex);
if (res != 0) {
// We may chain an exception here:
static PyObject *
channel_send(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", NULL};
+ // XXX Add a timeout arg.
+ static char *kwlist[] = {"cid", "obj", "blocking", NULL};
int64_t cid;
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
- channel_id_converter, &cid_data, &obj)) {
+ int blocking = 1;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$p:channel_send", kwlist,
+ channel_id_converter, &cid_data, &obj,
+ &blocking)) {
return NULL;
}
cid = cid_data.cid;
- int err = _channel_send(&_globals.channels, cid, obj);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
+ if (blocking) {
+ PyThread_type_lock mutex = PyThread_allocate_lock();
+ if (mutex == NULL) {
+ PyErr_NoMemory();
+ return NULL;
+ }
+ PyThread_acquire_lock(mutex, WAIT_LOCK);
+
+ /* Queue up the object. */
+ int err = _channel_send(&_globals.channels, cid, obj, mutex);
+ if (handle_channel_error(err, self, cid)) {
+ PyThread_release_lock(mutex);
+ return NULL;
+ }
+
+ /* Wait until the object is received. */
+ wait_for_lock(mutex);
+ }
+ else {
+ /* Queue up the object. */
+ int err = _channel_send(&_globals.channels, cid, obj, NULL);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
}
+
Py_RETURN_NONE;
}
PyDoc_STRVAR(channel_send_doc,
-"channel_send(cid, obj)\n\
+"channel_send(cid, obj, blocking=True)\n\
\n\
-Add the object's data to the channel's queue.");
+Add the object's data to the channel's queue.\n\
+By default this waits for the object to be received.");
static PyObject *
channel_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", NULL};
+ static char *kwlist[] = {"cid", "obj", "blocking", NULL};
int64_t cid;
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
+ int blocking = 1;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&O:channel_send_buffer", kwlist,
- channel_id_converter, &cid_data, &obj)) {
+ "O&O|$p:channel_send_buffer", kwlist,
+ channel_id_converter, &cid_data, &obj,
+ &blocking)) {
return NULL;
}
cid = cid_data.cid;
return NULL;
}
- int err = _channel_send(&_globals.channels, cid, tempobj);
- Py_DECREF(tempobj);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
+ if (blocking) {
+ PyThread_type_lock mutex = PyThread_allocate_lock();
+ if (mutex == NULL) {
+ Py_DECREF(tempobj);
+ PyErr_NoMemory();
+ return NULL;
+ }
+ PyThread_acquire_lock(mutex, WAIT_LOCK);
+
+ /* Queue up the buffer. */
+ int err = _channel_send(&_globals.channels, cid, tempobj, mutex);
+ Py_DECREF(tempobj);
+ if (handle_channel_error(err, self, cid)) {
+ PyThread_acquire_lock(mutex, WAIT_LOCK);
+ return NULL;
+ }
+
+ /* Wait until the buffer is received. */
+ wait_for_lock(mutex);
}
+ else {
+ /* Queue up the buffer. */
+ int err = _channel_send(&_globals.channels, cid, tempobj, NULL);
+ Py_DECREF(tempobj);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ }
+
Py_RETURN_NONE;
}
PyDoc_STRVAR(channel_send_buffer_doc,
-"channel_send_buffer(cid, obj)\n\
+"channel_send_buffer(cid, obj, blocking=True)\n\
\n\
-Add the object's buffer to the channel's queue.");
+Add the object's buffer to the channel's queue.\n\
+By default this waits for the object to be received.");
static PyObject *
channel_recv(PyObject *self, PyObject *args, PyObject *kwds)