from test.support import import_helper
-from test.test__xxsubinterpreters import (
+from test.test__interpreters import (
_interpreters,
_run_output,
clean_up_interpreters,
)
-channels = import_helper.import_module('_xxinterpchannels')
+_channels = import_helper.import_module('_interpchannels')
# Additional tests are found in Lib/test/test_interpreters/test_channels.py.
def recv_wait(cid):
while True:
try:
- return channels.recv(cid)
- except channels.ChannelEmptyError:
+ return _channels.recv(cid)
+ except _channels.ChannelEmptyError:
time.sleep(0.1)
#@contextmanager
def expect_channel_closed():
try:
yield
- except channels.ChannelClosedError:
+ except _channels.ChannelClosedError:
pass
else:
assert False, 'channel not closed'
try:
result = _run_action(cid, action, end, state)
- except channels.ChannelClosedError:
+ except _channels.ChannelClosedError:
if not hideclosed and not expectfail:
raise
result = state.close()
def _run_action(cid, action, end, state):
if action == 'use':
if end == 'send':
- channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'spam', blocking=False)
return state.incr()
elif end == 'recv':
if not state.pending:
try:
- channels.recv(cid)
- except channels.ChannelEmptyError:
+ _channels.recv(cid)
+ except _channels.ChannelEmptyError:
return state
else:
raise Exception('expected ChannelEmptyError')
else:
- channels.recv(cid)
+ _channels.recv(cid)
return state.decr()
else:
raise ValueError(end)
kwargs = {}
if end in ('recv', 'send'):
kwargs[end] = True
- channels.close(cid, **kwargs)
+ _channels.close(cid, **kwargs)
return state.close()
elif action == 'force-close':
kwargs = {
}
if end in ('recv', 'send'):
kwargs[end] = True
- channels.close(cid, **kwargs)
+ _channels.close(cid, **kwargs)
return state.close(force=True)
else:
raise ValueError(action)
def clean_up_channels():
- for cid in channels.list_all():
+ for cid in _channels.list_all():
try:
- channels.destroy(cid)
- except channels.ChannelNotFoundError:
+ _channels.destroy(cid)
+ except _channels.ChannelNotFoundError:
pass # already destroyed
class ChannelIDTests(TestBase):
def test_default_kwargs(self):
- cid = channels._channel_id(10, force=True)
+ cid = _channels._channel_id(10, force=True)
self.assertEqual(int(cid), 10)
self.assertEqual(cid.end, 'both')
def test_with_kwargs(self):
- cid = channels._channel_id(10, send=True, force=True)
+ cid = _channels._channel_id(10, send=True, force=True)
self.assertEqual(cid.end, 'send')
- cid = channels._channel_id(10, send=True, recv=False, force=True)
+ cid = _channels._channel_id(10, send=True, recv=False, force=True)
self.assertEqual(cid.end, 'send')
- cid = channels._channel_id(10, recv=True, force=True)
+ cid = _channels._channel_id(10, recv=True, force=True)
self.assertEqual(cid.end, 'recv')
- cid = channels._channel_id(10, recv=True, send=False, force=True)
+ cid = _channels._channel_id(10, recv=True, send=False, force=True)
self.assertEqual(cid.end, 'recv')
- cid = channels._channel_id(10, send=True, recv=True, force=True)
+ cid = _channels._channel_id(10, send=True, recv=True, force=True)
self.assertEqual(cid.end, 'both')
def test_coerce_id(self):
def __index__(self):
return 10
- cid = channels._channel_id(Int(), force=True)
+ cid = _channels._channel_id(Int(), force=True)
self.assertEqual(int(cid), 10)
def test_bad_id(self):
- self.assertRaises(TypeError, channels._channel_id, object())
- self.assertRaises(TypeError, channels._channel_id, 10.0)
- self.assertRaises(TypeError, channels._channel_id, '10')
- self.assertRaises(TypeError, channels._channel_id, b'10')
- self.assertRaises(ValueError, channels._channel_id, -1)
- self.assertRaises(OverflowError, channels._channel_id, 2**64)
+ self.assertRaises(TypeError, _channels._channel_id, object())
+ self.assertRaises(TypeError, _channels._channel_id, 10.0)
+ self.assertRaises(TypeError, _channels._channel_id, '10')
+ self.assertRaises(TypeError, _channels._channel_id, b'10')
+ self.assertRaises(ValueError, _channels._channel_id, -1)
+ self.assertRaises(OverflowError, _channels._channel_id, 2**64)
def test_bad_kwargs(self):
with self.assertRaises(ValueError):
- channels._channel_id(10, send=False, recv=False)
+ _channels._channel_id(10, send=False, recv=False)
def test_does_not_exist(self):
- cid = channels.create()
- with self.assertRaises(channels.ChannelNotFoundError):
- channels._channel_id(int(cid) + 1) # unforced
+ cid = _channels.create()
+ with self.assertRaises(_channels.ChannelNotFoundError):
+ _channels._channel_id(int(cid) + 1) # unforced
def test_str(self):
- cid = channels._channel_id(10, force=True)
+ cid = _channels._channel_id(10, force=True)
self.assertEqual(str(cid), '10')
def test_repr(self):
- cid = channels._channel_id(10, force=True)
+ cid = _channels._channel_id(10, force=True)
self.assertEqual(repr(cid), 'ChannelID(10)')
- cid = channels._channel_id(10, send=True, force=True)
+ cid = _channels._channel_id(10, send=True, force=True)
self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
- cid = channels._channel_id(10, recv=True, force=True)
+ cid = _channels._channel_id(10, recv=True, force=True)
self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
- cid = channels._channel_id(10, send=True, recv=True, force=True)
+ cid = _channels._channel_id(10, send=True, recv=True, force=True)
self.assertEqual(repr(cid), 'ChannelID(10)')
def test_equality(self):
- cid1 = channels.create()
- cid2 = channels._channel_id(int(cid1))
- cid3 = channels.create()
+ cid1 = _channels.create()
+ cid2 = _channels._channel_id(int(cid1))
+ cid3 = _channels.create()
self.assertTrue(cid1 == cid1)
self.assertTrue(cid1 == cid2)
self.assertTrue(cid1 != cid3)
def test_shareable(self):
- chan = channels.create()
+ chan = _channels.create()
- obj = channels.create()
- channels.send(chan, obj, blocking=False)
- got = channels.recv(chan)
+ obj = _channels.create()
+ _channels.send(chan, obj, blocking=False)
+ got = _channels.recv(chan)
self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
class ChannelTests(TestBase):
def test_create_cid(self):
- cid = channels.create()
- self.assertIsInstance(cid, channels.ChannelID)
+ cid = _channels.create()
+ self.assertIsInstance(cid, _channels.ChannelID)
def test_sequential_ids(self):
- before = channels.list_all()
- id1 = channels.create()
- id2 = channels.create()
- id3 = channels.create()
- after = channels.list_all()
+ before = _channels.list_all()
+ id1 = _channels.create()
+ id2 = _channels.create()
+ id3 = _channels.create()
+ after = _channels.list_all()
self.assertEqual(id2, int(id1) + 1)
self.assertEqual(id3, int(id2) + 1)
def test_ids_global(self):
id1 = _interpreters.create()
out = _run_output(id1, dedent("""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
cid = _channels.create()
print(cid)
"""))
id2 = _interpreters.create()
out = _run_output(id2, dedent("""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
cid = _channels.create()
print(cid)
"""))
def test_channel_list_interpreters_none(self):
"""Test listing interpreters for a channel with no associations."""
# Test for channel with no associated _interpreters.
- cid = channels.create()
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ cid = _channels.create()
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(send_interps, [])
self.assertEqual(recv_interps, [])
def test_channel_list_interpreters_basic(self):
"""Test basic listing channel _interpreters."""
interp0, *_ = _interpreters.get_main()
- cid = channels.create()
- channels.send(cid, "send", blocking=False)
+ cid = _channels.create()
+ _channels.send(cid, "send", blocking=False)
# Test for a channel that has one end associated to an interpreter.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [])
interp1 = _interpreters.create()
_run_output(interp1, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
obj = _channels.recv({cid})
"""))
# Test for channel that has both ends associated to an interpreter.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [interp1])
interp1 = _interpreters.create()
interp2 = _interpreters.create()
interp3 = _interpreters.create()
- cid = channels.create()
+ cid = _channels.create()
- channels.send(cid, "send", blocking=False)
+ _channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.send({cid}, "send", blocking=False)
"""))
_run_output(interp2, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
obj = _channels.recv({cid})
"""))
_run_output(interp3, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
obj = _channels.recv({cid})
"""))
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(set(send_interps), {interp0, interp1})
self.assertEqual(set(recv_interps), {interp2, interp3})
"""Test listing channel interpreters with a destroyed interpreter."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = channels.create()
- channels.send(cid, "send", blocking=False)
+ cid = _channels.create()
+ _channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
obj = _channels.recv({cid})
"""))
# Should be one interpreter associated with each end.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [interp1])
_interpreters.destroy(interp1)
# Destroyed interpreter should not be listed.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(send_interps, [interp0])
self.assertEqual(recv_interps, [])
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
interp2 = _interpreters.create()
- cid = channels.create()
- channels.send(cid, "data", blocking=False)
+ cid = _channels.create()
+ _channels.send(cid, "data", blocking=False)
_run_output(interp1, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
obj = _channels.recv({cid})
"""))
- channels.send(cid, "data", blocking=False)
+ _channels.send(cid, "data", blocking=False)
_run_output(interp2, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
obj = _channels.recv({cid})
"""))
# Check the setup.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 2)
# Release the main interpreter from the send end.
- channels.release(cid, send=True)
+ _channels.release(cid, send=True)
# Send end should have no associated _interpreters.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 0)
self.assertEqual(len(recv_interps), 2)
# Release one of the subinterpreters from the receive end.
_run_output(interp2, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.release({cid})
"""))
# Receive end should have the released interpreter removed.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 0)
self.assertEqual(recv_interps, [interp1])
"""Test listing channel interpreters with a closed channel."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = channels.create()
+ cid = _channels.create()
# Put something in the channel so that it's not empty.
- channels.send(cid, "send", blocking=False)
+ _channels.send(cid, "send", blocking=False)
# Check initial state.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 0)
# Force close the channel.
- channels.close(cid, force=True)
+ _channels.close(cid, force=True)
# Both ends should raise an error.
- with self.assertRaises(channels.ChannelClosedError):
- channels.list_interpreters(cid, send=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.list_interpreters(cid, send=False)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.list_interpreters(cid, send=True)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.list_interpreters(cid, send=False)
def test_channel_list_interpreters_closed_send_end(self):
"""Test listing channel interpreters with a channel's send end closed."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = channels.create()
+ cid = _channels.create()
# Put something in the channel so that it's not empty.
- channels.send(cid, "send", blocking=False)
+ _channels.send(cid, "send", blocking=False)
# Check initial state.
- send_interps = channels.list_interpreters(cid, send=True)
- recv_interps = channels.list_interpreters(cid, send=False)
+ send_interps = _channels.list_interpreters(cid, send=True)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(len(send_interps), 1)
self.assertEqual(len(recv_interps), 0)
# Close the send end of the channel.
- channels.close(cid, send=True)
+ _channels.close(cid, send=True)
# Send end should raise an error.
- with self.assertRaises(channels.ChannelClosedError):
- channels.list_interpreters(cid, send=True)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.list_interpreters(cid, send=True)
# Receive end should not be closed (since channel is not empty).
- recv_interps = channels.list_interpreters(cid, send=False)
+ recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(len(recv_interps), 0)
# Close the receive end of the channel from a subinterpreter.
_run_output(interp1, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.close({cid}, force=True)
"""))
return
# Both ends should raise an error.
- with self.assertRaises(channels.ChannelClosedError):
- channels.list_interpreters(cid, send=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.list_interpreters(cid, send=False)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.list_interpreters(cid, send=True)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.list_interpreters(cid, send=False)
def test_allowed_types(self):
- cid = channels.create()
+ cid = _channels.create()
objects = [
None,
'spam',
]
for obj in objects:
with self.subTest(obj):
- channels.send(cid, obj, blocking=False)
- got = channels.recv(cid)
+ _channels.send(cid, obj, blocking=False)
+ got = _channels.recv(cid)
self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
# XXX What about between interpreters?
def test_run_string_arg_unresolved(self):
- cid = channels.create()
+ cid = _channels.create()
interp = _interpreters.create()
_interpreters.set___main___attrs(interp, dict(cid=cid.send))
out = _run_output(interp, dedent("""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""))
- obj = channels.recv(cid)
+ obj = _channels.recv(cid)
self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
- cid = channels.create()
- cid = channels._channel_id(cid, _resolve=True)
+ cid = _channels.create()
+ cid = _channels._channel_id(cid, _resolve=True)
interp = _interpreters.create()
out = _run_output(interp, dedent("""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
print(chan.id.end)
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
- obj = channels.recv(cid)
+ obj = _channels.recv(cid)
self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
# send/recv
def test_send_recv_main(self):
- cid = channels.create()
+ cid = _channels.create()
orig = b'spam'
- channels.send(cid, orig, blocking=False)
- obj = channels.recv(cid)
+ _channels.send(cid, orig, blocking=False)
+ obj = _channels.recv(cid)
self.assertEqual(obj, orig)
self.assertIsNot(obj, orig)
def test_send_recv_same_interpreter(self):
id1 = _interpreters.create()
out = _run_output(id1, dedent("""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
cid = _channels.create()
orig = b'spam'
_channels.send(cid, orig, blocking=False)
"""))
def test_send_recv_different_interpreters(self):
- cid = channels.create()
+ cid = _channels.create()
id1 = _interpreters.create()
out = _run_output(id1, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.send({cid}, b'spam', blocking=False)
"""))
- obj = channels.recv(cid)
+ obj = _channels.recv(cid)
self.assertEqual(obj, b'spam')
def test_send_recv_different_threads(self):
- cid = channels.create()
+ cid = _channels.create()
def f():
obj = recv_wait(cid)
- channels.send(cid, obj)
+ _channels.send(cid, obj)
t = threading.Thread(target=f)
t.start()
- channels.send(cid, b'spam')
+ _channels.send(cid, b'spam')
obj = recv_wait(cid)
t.join()
self.assertEqual(obj, b'spam')
def test_send_recv_different_interpreters_and_threads(self):
- cid = channels.create()
+ cid = _channels.create()
id1 = _interpreters.create()
out = None
nonlocal out
out = _run_output(id1, dedent(f"""
import time
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
while True:
try:
obj = _channels.recv({cid})
t = threading.Thread(target=f)
t.start()
- channels.send(cid, b'spam')
+ _channels.send(cid, b'spam')
obj = recv_wait(cid)
t.join()
self.assertEqual(obj, b'eggs')
def test_send_not_found(self):
- with self.assertRaises(channels.ChannelNotFoundError):
- channels.send(10, b'spam')
+ with self.assertRaises(_channels.ChannelNotFoundError):
+ _channels.send(10, b'spam')
def test_recv_not_found(self):
- with self.assertRaises(channels.ChannelNotFoundError):
- channels.recv(10)
+ with self.assertRaises(_channels.ChannelNotFoundError):
+ _channels.recv(10)
def test_recv_empty(self):
- cid = channels.create()
- with self.assertRaises(channels.ChannelEmptyError):
- channels.recv(cid)
+ cid = _channels.create()
+ with self.assertRaises(_channels.ChannelEmptyError):
+ _channels.recv(cid)
def test_recv_default(self):
default = object()
- cid = channels.create()
- obj1 = channels.recv(cid, default)
- channels.send(cid, None, blocking=False)
- channels.send(cid, 1, blocking=False)
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'eggs', blocking=False)
- obj2 = channels.recv(cid, default)
- obj3 = channels.recv(cid, default)
- obj4 = channels.recv(cid)
- obj5 = channels.recv(cid, default)
- obj6 = channels.recv(cid, default)
+ cid = _channels.create()
+ obj1 = _channels.recv(cid, default)
+ _channels.send(cid, None, blocking=False)
+ _channels.send(cid, 1, blocking=False)
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'eggs', blocking=False)
+ obj2 = _channels.recv(cid, default)
+ obj3 = _channels.recv(cid, default)
+ obj4 = _channels.recv(cid)
+ obj5 = _channels.recv(cid, default)
+ obj6 = _channels.recv(cid, default)
self.assertIs(obj1, default)
self.assertIs(obj2, None)
def test_recv_sending_interp_destroyed(self):
with self.subTest('closed'):
- cid1 = channels.create()
+ cid1 = _channels.create()
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.send({cid1}, b'spam', blocking=False)
"""))
_interpreters.destroy(interp)
with self.assertRaisesRegex(RuntimeError,
f'channel {cid1} is closed'):
- channels.recv(cid1)
+ _channels.recv(cid1)
del cid1
with self.subTest('still open'):
- cid2 = channels.create()
+ cid2 = _channels.create()
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.send({cid2}, b'spam', blocking=False)
"""))
- channels.send(cid2, b'eggs', blocking=False)
+ _channels.send(cid2, b'eggs', blocking=False)
_interpreters.destroy(interp)
- channels.recv(cid2)
+ _channels.recv(cid2)
with self.assertRaisesRegex(RuntimeError,
f'channel {cid2} is empty'):
- channels.recv(cid2)
+ _channels.recv(cid2)
del cid2
#-------------------
def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
- cid = channels.create()
- channels.send_buffer(cid, buf, blocking=False)
- obj = channels.recv(cid)
+ cid = _channels.create()
+ _channels.send_buffer(cid, buf, blocking=False)
+ obj = _channels.recv(cid)
self.assertIsNot(obj, buf)
self.assertIsInstance(obj, memoryview)
# We want a long enough sleep that send() actually has to wait.
if buffer:
- send = channels.send_buffer
+ send = _channels.send_buffer
else:
- send = channels.send
+ send = _channels.send
- cid = channels.create()
+ cid = _channels.create()
try:
started = time.monotonic()
send(cid, obj, blocking=False)
stopped = time.monotonic()
- channels.recv(cid)
+ _channels.recv(cid)
finally:
- channels.destroy(cid)
+ _channels.destroy(cid)
delay = stopped - started # seconds
delay *= 3
received = None
obj = b'spam'
wait = self.build_send_waiter(obj)
- cid = channels.create()
+ cid = _channels.create()
def f():
nonlocal received
wait()
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
- channels.send(cid, obj, blocking=True)
+ _channels.send(cid, obj, blocking=True)
t.join()
self.assertEqual(received, obj)
received = None
obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
- cid = channels.create()
+ cid = _channels.create()
def f():
nonlocal received
wait()
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
- channels.send_buffer(cid, obj, blocking=True)
+ _channels.send_buffer(cid, obj, blocking=True)
t.join()
self.assertEqual(received, obj)
def test_send_blocking_no_wait(self):
received = None
obj = b'spam'
- cid = channels.create()
+ cid = _channels.create()
def f():
nonlocal received
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
- channels.send(cid, obj, blocking=True)
+ _channels.send(cid, obj, blocking=True)
t.join()
self.assertEqual(received, obj)
def test_send_buffer_blocking_no_wait(self):
received = None
obj = bytearray(b'spam')
- cid = channels.create()
+ cid = _channels.create()
def f():
nonlocal received
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
- channels.send_buffer(cid, obj, blocking=True)
+ _channels.send_buffer(cid, obj, blocking=True)
t.join()
self.assertEqual(received, obj)
obj = b'spam'
with self.subTest('non-blocking with timeout'):
- cid = channels.create()
+ cid = _channels.create()
with self.assertRaises(ValueError):
- channels.send(cid, obj, blocking=False, timeout=0.1)
+ _channels.send(cid, obj, blocking=False, timeout=0.1)
with self.subTest('timeout hit'):
- cid = channels.create()
+ cid = _channels.create()
with self.assertRaises(TimeoutError):
- channels.send(cid, obj, blocking=True, timeout=0.1)
- with self.assertRaises(channels.ChannelEmptyError):
- received = channels.recv(cid)
+ _channels.send(cid, obj, blocking=True, timeout=0.1)
+ with self.assertRaises(_channels.ChannelEmptyError):
+ received = _channels.recv(cid)
print(repr(received))
with self.subTest('timeout not hit'):
- cid = channels.create()
+ cid = _channels.create()
def f():
recv_wait(cid)
t = threading.Thread(target=f)
t.start()
- channels.send(cid, obj, blocking=True, timeout=10)
+ _channels.send(cid, obj, blocking=True, timeout=10)
t.join()
def test_send_buffer_timeout(self):
obj = bytearray(b'spam')
with self.subTest('non-blocking with timeout'):
- cid = channels.create()
+ cid = _channels.create()
with self.assertRaises(ValueError):
- channels.send_buffer(cid, obj, blocking=False, timeout=0.1)
+ _channels.send_buffer(cid, obj, blocking=False, timeout=0.1)
with self.subTest('timeout hit'):
- cid = channels.create()
+ cid = _channels.create()
with self.assertRaises(TimeoutError):
- channels.send_buffer(cid, obj, blocking=True, timeout=0.1)
- with self.assertRaises(channels.ChannelEmptyError):
- received = channels.recv(cid)
+ _channels.send_buffer(cid, obj, blocking=True, timeout=0.1)
+ with self.assertRaises(_channels.ChannelEmptyError):
+ received = _channels.recv(cid)
print(repr(received))
with self.subTest('timeout not hit'):
- cid = channels.create()
+ cid = _channels.create()
def f():
recv_wait(cid)
t = threading.Thread(target=f)
t.start()
- channels.send_buffer(cid, obj, blocking=True, timeout=10)
+ _channels.send_buffer(cid, obj, blocking=True, timeout=10)
t.join()
def test_send_closed_while_waiting(self):
wait = self.build_send_waiter(obj)
with self.subTest('without timeout'):
- cid = channels.create()
+ cid = _channels.create()
def f():
wait()
- channels.close(cid, force=True)
+ _channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, obj, blocking=True)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, obj, blocking=True)
t.join()
with self.subTest('with timeout'):
- cid = channels.create()
+ cid = _channels.create()
def f():
wait()
- channels.close(cid, force=True)
+ _channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, obj, blocking=True, timeout=30)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, obj, blocking=True, timeout=30)
t.join()
def test_send_buffer_closed_while_waiting(self):
wait = self.build_send_waiter(obj, buffer=True)
with self.subTest('without timeout'):
- cid = channels.create()
+ cid = _channels.create()
def f():
wait()
- channels.close(cid, force=True)
+ _channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
- with self.assertRaises(channels.ChannelClosedError):
- channels.send_buffer(cid, obj, blocking=True)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send_buffer(cid, obj, blocking=True)
t.join()
with self.subTest('with timeout'):
- cid = channels.create()
+ cid = _channels.create()
def f():
wait()
- channels.close(cid, force=True)
+ _channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
- with self.assertRaises(channels.ChannelClosedError):
- channels.send_buffer(cid, obj, blocking=True, timeout=30)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send_buffer(cid, obj, blocking=True, timeout=30)
t.join()
#-------------------
# close
def test_close_single_user(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.recv(cid)
- channels.close(cid)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.recv(cid)
+ _channels.close(cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_close_multiple_users(self):
- cid = channels.create()
+ cid = _channels.create()
id1 = _interpreters.create()
id2 = _interpreters.create()
_interpreters.run_string(id1, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.send({cid}, b'spam', blocking=False)
"""))
_interpreters.run_string(id2, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.recv({cid})
"""))
- channels.close(cid)
+ _channels.close(cid)
excsnap = _interpreters.run_string(id1, dedent(f"""
_channels.send({cid}, b'spam')
self.assertEqual(excsnap.type.__name__, 'ChannelClosedError')
def test_close_multiple_times(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.recv(cid)
- channels.close(cid)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.recv(cid)
+ _channels.close(cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.close(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.close(cid)
def test_close_empty(self):
tests = [
]
for send, recv in tests:
with self.subTest((send, recv)):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.recv(cid)
- channels.close(cid, send=send, recv=recv)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.recv(cid)
+ _channels.close(cid, send=send, recv=recv)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_close_defaults_with_unused_items(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'ham', blocking=False)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'ham', blocking=False)
- with self.assertRaises(channels.ChannelNotEmptyError):
- channels.close(cid)
- channels.recv(cid)
- channels.send(cid, b'eggs', blocking=False)
+ with self.assertRaises(_channels.ChannelNotEmptyError):
+ _channels.close(cid)
+ _channels.recv(cid)
+ _channels.send(cid, b'eggs', blocking=False)
def test_close_recv_with_unused_items_unforced(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'ham', blocking=False)
-
- with self.assertRaises(channels.ChannelNotEmptyError):
- channels.close(cid, recv=True)
- channels.recv(cid)
- channels.send(cid, b'eggs', blocking=False)
- channels.recv(cid)
- channels.recv(cid)
- channels.close(cid, recv=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'ham', blocking=False)
+
+ with self.assertRaises(_channels.ChannelNotEmptyError):
+ _channels.close(cid, recv=True)
+ _channels.recv(cid)
+ _channels.send(cid, b'eggs', blocking=False)
+ _channels.recv(cid)
+ _channels.recv(cid)
+ _channels.close(cid, recv=True)
def test_close_send_with_unused_items_unforced(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'ham', blocking=False)
- channels.close(cid, send=True)
-
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- channels.recv(cid)
- channels.recv(cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'ham', blocking=False)
+ _channels.close(cid, send=True)
+
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ _channels.recv(cid)
+ _channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_close_both_with_unused_items_unforced(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'ham', blocking=False)
-
- with self.assertRaises(channels.ChannelNotEmptyError):
- channels.close(cid, recv=True, send=True)
- channels.recv(cid)
- channels.send(cid, b'eggs', blocking=False)
- channels.recv(cid)
- channels.recv(cid)
- channels.close(cid, recv=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'ham', blocking=False)
+
+ with self.assertRaises(_channels.ChannelNotEmptyError):
+ _channels.close(cid, recv=True, send=True)
+ _channels.recv(cid)
+ _channels.send(cid, b'eggs', blocking=False)
+ _channels.recv(cid)
+ _channels.recv(cid)
+ _channels.close(cid, recv=True)
def test_close_recv_with_unused_items_forced(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'ham', blocking=False)
- channels.close(cid, recv=True, force=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'ham', blocking=False)
+ _channels.close(cid, recv=True, force=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_close_send_with_unused_items_forced(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'ham', blocking=False)
- channels.close(cid, send=True, force=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'ham', blocking=False)
+ _channels.close(cid, send=True, force=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_close_both_with_unused_items_forced(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'ham', blocking=False)
- channels.close(cid, send=True, recv=True, force=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'ham', blocking=False)
+ _channels.close(cid, send=True, recv=True, force=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_close_never_used(self):
- cid = channels.create()
- channels.close(cid)
+ cid = _channels.create()
+ _channels.close(cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'spam')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'spam')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_close_by_unassociated_interp(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.close({cid}, force=True)
"""))
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.close(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.close(cid)
def test_close_used_multiple_times_by_single_user(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'spam', blocking=False)
- channels.recv(cid)
- channels.close(cid, force=True)
-
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.recv(cid)
+ _channels.close(cid, force=True)
+
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_channel_list_interpreters_invalid_channel(self):
- cid = channels.create()
+ cid = _channels.create()
# Test for invalid channel ID.
- with self.assertRaises(channels.ChannelNotFoundError):
- channels.list_interpreters(1000, send=True)
+ with self.assertRaises(_channels.ChannelNotFoundError):
+ _channels.list_interpreters(1000, send=True)
- channels.close(cid)
+ _channels.close(cid)
# Test for a channel that has been closed.
- with self.assertRaises(channels.ChannelClosedError):
- channels.list_interpreters(cid, send=True)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.list_interpreters(cid, send=True)
def test_channel_list_interpreters_invalid_args(self):
# Tests for invalid arguments passed to the API.
- cid = channels.create()
+ cid = _channels.create()
with self.assertRaises(TypeError):
- channels.list_interpreters(cid)
+ _channels.list_interpreters(cid)
class ChannelReleaseTests(TestBase):
"""
def test_single_user(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.recv(cid)
- channels.release(cid, send=True, recv=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.recv(cid)
+ _channels.release(cid, send=True, recv=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_multiple_users(self):
- cid = channels.create()
+ cid = _channels.create()
id1 = _interpreters.create()
id2 = _interpreters.create()
_interpreters.run_string(id1, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.send({cid}, b'spam', blocking=False)
"""))
out = _run_output(id2, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
obj = _channels.recv({cid})
_channels.release({cid})
print(repr(obj))
self.assertEqual(out.strip(), "b'spam'")
def test_no_kwargs(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.recv(cid)
- channels.release(cid)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.recv(cid)
+ _channels.release(cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_multiple_times(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.recv(cid)
- channels.release(cid, send=True, recv=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.recv(cid)
+ _channels.release(cid, send=True, recv=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.release(cid, send=True, recv=True)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.release(cid, send=True, recv=True)
def test_with_unused_items(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'ham', blocking=False)
- channels.release(cid, send=True, recv=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'ham', blocking=False)
+ _channels.release(cid, send=True, recv=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_never_used(self):
- cid = channels.create()
- channels.release(cid)
+ cid = _channels.create()
+ _channels.release(cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'spam')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'spam')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_by_unassociated_interp(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
_channels.release({cid})
"""))
- obj = channels.recv(cid)
- channels.release(cid)
+ obj = _channels.recv(cid)
+ _channels.release(cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
self.assertEqual(obj, b'spam')
def test_close_if_unassociated(self):
# XXX Something's not right with this test...
- cid = channels.create()
+ cid = _channels.create()
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
- import _xxinterpchannels as _channels
+ import _interpchannels as _channels
obj = _channels.send({cid}, b'spam', blocking=False)
_channels.release({cid})
"""))
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
def test_partially(self):
# XXX Is partial close too weird/confusing?
- cid = channels.create()
- channels.send(cid, None, blocking=False)
- channels.recv(cid)
- channels.send(cid, b'spam', blocking=False)
- channels.release(cid, send=True)
- obj = channels.recv(cid)
+ cid = _channels.create()
+ _channels.send(cid, None, blocking=False)
+ _channels.recv(cid)
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.release(cid, send=True)
+ obj = _channels.recv(cid)
self.assertEqual(obj, b'spam')
def test_used_multiple_times_by_single_user(self):
- cid = channels.create()
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'spam', blocking=False)
- channels.send(cid, b'spam', blocking=False)
- channels.recv(cid)
- channels.release(cid, send=True, recv=True)
+ cid = _channels.create()
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'spam', blocking=False)
+ _channels.recv(cid)
+ _channels.release(cid, send=True, recv=True)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(cid, b'eggs')
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(cid, b'eggs')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(cid)
class ChannelCloseFixture(namedtuple('ChannelCloseFixture',
def _new_channel(self, creator):
if creator.name == 'main':
- return channels.create()
+ return _channels.create()
else:
- ch = channels.create()
+ ch = _channels.create()
run_interp(creator.id, f"""
- import _xxsubinterpreters
+ import _interpreters
cid = _xxsubchannels.create()
# We purposefully send back an int to avoid tying the
# channel to the other interpreter.
_xxsubchannels.send({ch}, int(cid), blocking=False)
- del _xxsubinterpreters
+ del _interpreters
""")
- self._cid = channels.recv(ch)
+ self._cid = _channels.recv(ch)
return self._cid
def _get_interpreter(self, interp):
if interp.name == 'main':
return
run_interp(interp.id, f"""
- import _xxinterpchannels as channels
- import test.test__xxinterpchannels as helpers
+ import _interpchannels as channels
+ import test.test__interpchannels as helpers
ChannelState = helpers.ChannelState
try:
cid
except NameError:
- cid = channels._channel_id({self.cid})
+ cid = _channels._channel_id({self.cid})
""")
)
fix.record_action(action, result)
else:
- _cid = channels.create()
+ _cid = _channels.create()
run_interp(interp.id, f"""
result = helpers.run_action(
{fix.cid},
{repr(fix.state)},
hideclosed={hideclosed},
)
- channels.send({_cid}, result.pending.to_bytes(1, 'little'), blocking=False)
- channels.send({_cid}, b'X' if result.closed else b'', blocking=False)
+ _channels.send({_cid}, result.pending.to_bytes(1, 'little'), blocking=False)
+ _channels.send({_cid}, b'X' if result.closed else b'', blocking=False)
""")
result = ChannelState(
- pending=int.from_bytes(channels.recv(_cid), 'little'),
- closed=bool(channels.recv(_cid)),
+ pending=int.from_bytes(_channels.recv(_cid), 'little'),
+ closed=bool(_channels.recv(_cid)),
)
fix.record_action(action, result)
if not fix.expect_closed_error():
self.run_action(fix, close, hideclosed=False)
else:
- with self.assertRaises(channels.ChannelClosedError):
+ with self.assertRaises(_channels.ChannelClosedError):
self.run_action(fix, close, hideclosed=False)
def _assert_closed_in_interp(self, fix, interp=None):
if interp is None or interp.name == 'main':
- with self.assertRaises(channels.ChannelClosedError):
- channels.recv(fix.cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.send(fix.cid, b'spam')
- with self.assertRaises(channels.ChannelClosedError):
- channels.close(fix.cid)
- with self.assertRaises(channels.ChannelClosedError):
- channels.close(fix.cid, force=True)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.recv(fix.cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.send(fix.cid, b'spam')
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.close(fix.cid)
+ with self.assertRaises(_channels.ChannelClosedError):
+ _channels.close(fix.cid, force=True)
else:
run_interp(interp.id, """
with helpers.expect_channel_closed():
- channels.recv(cid)
+ _channels.recv(cid)
""")
run_interp(interp.id, """
with helpers.expect_channel_closed():
- channels.send(cid, b'spam', blocking=False)
+ _channels.send(cid, b'spam', blocking=False)
""")
run_interp(interp.id, """
with helpers.expect_channel_closed():
- channels.close(cid)
+ _channels.close(cid)
""")
run_interp(interp.id, """
with helpers.expect_channel_closed():
- channels.close(cid, force=True)
+ _channels.close(cid, force=True)
""")
def _assert_closed(self, fix):
self.assertTrue(fix.state.closed)
for _ in range(fix.state.pending):
- channels.recv(fix.cid)
+ _channels.recv(fix.cid)
self._assert_closed_in_interp(fix)
for interp in ('same', 'other'):