write-only transport like write pipe
"""
+ __slots__ = ()
+
def connection_made(self, transport):
"""Called when a connection is made.
* CL: connection_lost()
"""
+ __slots__ = ()
+
def data_received(self, data):
"""Called when some data is received.
* CL: connection_lost()
"""
+ __slots__ = ()
+
def get_buffer(self, sizehint):
"""Called to allocate a new receive buffer.
class DatagramProtocol(BaseProtocol):
"""Interface for datagram protocol."""
+ __slots__ = ()
+
def datagram_received(self, data, addr):
"""Called when some datagram is received."""
class SubprocessProtocol(BaseProtocol):
"""Interface for protocol for subprocess calls."""
+ __slots__ = ()
+
def pipe_data_received(self, fd, data):
"""Called when the subprocess writes data into stdout/stderr pipe.
loop.close()
-class ProtocolsAbsTests(unittest.TestCase):
-
- def test_empty(self):
- f = mock.Mock()
- p = asyncio.Protocol()
- self.assertIsNone(p.connection_made(f))
- self.assertIsNone(p.connection_lost(f))
- self.assertIsNone(p.data_received(f))
- self.assertIsNone(p.eof_received())
-
- dp = asyncio.DatagramProtocol()
- self.assertIsNone(dp.connection_made(f))
- self.assertIsNone(dp.connection_lost(f))
- self.assertIsNone(dp.error_received(f))
- self.assertIsNone(dp.datagram_received(f, f))
-
- sp = asyncio.SubprocessProtocol()
- self.assertIsNone(sp.connection_made(f))
- self.assertIsNone(sp.connection_lost(f))
- self.assertIsNone(sp.pipe_data_received(1, f))
- self.assertIsNone(sp.pipe_connection_lost(1, f))
- self.assertIsNone(sp.process_exited())
-
-
class PolicyTests(unittest.TestCase):
def test_event_loop_policy(self):
--- /dev/null
+import unittest
+from unittest import mock
+
+import asyncio
+
+
+class ProtocolsAbsTests(unittest.TestCase):
+
+ def test_base_protocol(self):
+ f = mock.Mock()
+ p = asyncio.BaseProtocol()
+ self.assertIsNone(p.connection_made(f))
+ self.assertIsNone(p.connection_lost(f))
+ self.assertIsNone(p.pause_writing())
+ self.assertIsNone(p.resume_writing())
+ self.assertFalse(hasattr(p, '__dict__'))
+
+ def test_protocol(self):
+ f = mock.Mock()
+ p = asyncio.Protocol()
+ self.assertIsNone(p.connection_made(f))
+ self.assertIsNone(p.connection_lost(f))
+ self.assertIsNone(p.data_received(f))
+ self.assertIsNone(p.eof_received())
+ self.assertIsNone(p.pause_writing())
+ self.assertIsNone(p.resume_writing())
+ self.assertFalse(hasattr(p, '__dict__'))
+
+ def test_buffered_protocol(self):
+ f = mock.Mock()
+ p = asyncio.BufferedProtocol()
+ self.assertIsNone(p.connection_made(f))
+ self.assertIsNone(p.connection_lost(f))
+ self.assertIsNone(p.get_buffer(100))
+ self.assertIsNone(p.buffer_updated(150))
+ self.assertIsNone(p.pause_writing())
+ self.assertIsNone(p.resume_writing())
+ self.assertFalse(hasattr(p, '__dict__'))
+
+ def test_datagram_protocol(self):
+ f = mock.Mock()
+ dp = asyncio.DatagramProtocol()
+ self.assertIsNone(dp.connection_made(f))
+ self.assertIsNone(dp.connection_lost(f))
+ self.assertIsNone(dp.error_received(f))
+ self.assertIsNone(dp.datagram_received(f, f))
+ self.assertFalse(hasattr(dp, '__dict__'))
+
+ def test_subprocess_protocol(self):
+ f = mock.Mock()
+ sp = asyncio.SubprocessProtocol()
+ self.assertIsNone(sp.connection_made(f))
+ self.assertIsNone(sp.connection_lost(f))
+ self.assertIsNone(sp.pipe_data_received(1, f))
+ self.assertIsNone(sp.pipe_connection_lost(1, f))
+ self.assertIsNone(sp.process_exited())
+ self.assertFalse(hasattr(sp, '__dict__'))