From a61bff09a61a891a17c1914664e0f1a1d8279eeb Mon Sep 17 00:00:00 2001 From: Marcin Siodelski Date: Wed, 19 Apr 2017 17:20:09 +0200 Subject: [PATCH] [5189] Added asynchronous operations to the UnixDomainSocket class. --- .../tests/unix_domain_socket_unittest.cc | 138 +++++++++++ src/lib/asiolink/unix_domain_socket.cc | 230 +++++++++++++++++- src/lib/asiolink/unix_domain_socket.h | 37 +++ 3 files changed, 402 insertions(+), 3 deletions(-) diff --git a/src/lib/asiolink/tests/unix_domain_socket_unittest.cc b/src/lib/asiolink/tests/unix_domain_socket_unittest.cc index c4f25564e5..55423cba04 100644 --- a/src/lib/asiolink/tests/unix_domain_socket_unittest.cc +++ b/src/lib/asiolink/tests/unix_domain_socket_unittest.cc @@ -109,6 +109,101 @@ TEST_F(UnixDomainSocketTest, sendReceive) { EXPECT_EQ("received foo", response); } +// This test verifies that the client can send the data over the unix +// domain socket and receive a response asynchronously. +TEST_F(UnixDomainSocketTest, asyncSendReceive) { + // Start the server. + test_socket_->bindServerSocket(); + + // Setup client side. + UnixDomainSocket socket(io_service_); + + // We're going to asynchronously connect to the server. The boolean value + // below will be modified by the connect handler function (lambda) invoked + // when the connection is established or if an error occurs. + bool connect_handler_invoked = false; + ASSERT_NO_THROW(socket.asyncConnect(unixSocketFilePath(), + [this, &connect_handler_invoked](const boost::system::error_code& ec) { + // Indicate that the handler has been called so as the loop below gets + // interrupted. + connect_handler_invoked = true; + // Operation aborted indicates that IO service has been stopped. This + // shouldn't happen here. + if (ec && ec.value() != boost::asio::error::operation_aborted) { + ADD_FAILURE() << "error occurred while asynchronously connecting" + " via unix domain socket: " << ec.message(); + } + } + )); + // Run IO service until connect handler is invoked. + while (!connect_handler_invoked) { + io_service_.run_one(); + } + + // We are going to asynchronously send the 'foo' over the unix socket. + const std::string outbound_data = "foo"; + size_t sent_size = 0; + ASSERT_NO_THROW(socket.asyncSend(outbound_data.c_str(), outbound_data.size(), + [this, &sent_size](const boost::system::error_code& ec, size_t length) { + // If we have been successful sending the data, record the number of + // bytes we have sent. + if (!ec) { + sent_size = length; + + } else if (ec.value() != boost::asio::error::operation_aborted) { + ADD_FAILURE() << "error occurred while asynchronously sending the" + " data over unix domain socket: " << ec.message(); + } + } + )); + + // Run IO service to generate server's response. + while (test_socket_->getResponseNum() < 1) { + io_service_.run_one(); + } + + // There is no guarantee that all data have been sent so we only check that + // some data have been sent. + ASSERT_GT(sent_size, 0); + + // Receive response from the socket. + std::array read_buf; + size_t bytes_read = 0; + bool receive_handler_invoked = false; + ASSERT_NO_THROW(socket.asyncReceive(&read_buf[0], read_buf.size(), + [this, &receive_handler_invoked, &bytes_read] + (const boost::system::error_code& ec, size_t length) mutable { + // Indicate that the handler has been called to interrupt the + // loop below. + receive_handler_invoked = true; + + // If we have been successful receiving the data, record the number of + // bytes received. + if (!ec) { + bytes_read = length; + + } else if (ec.value() != boost::asio::error::operation_aborted) { + ADD_FAILURE() << "error occurred while asynchronously receiving" + " data via unix domain socket: " << ec.message(); + } + + })); + // Run IO service until we get some response from the server. + while (!receive_handler_invoked) { + io_service_.run_one(); + } + + // Make sure we have received something. + ASSERT_GT(bytes_read, 0); + + std::string response(&read_buf[0], bytes_read); + + // What we have received should be a substring of the sent data prepended + // with 'received'. For such short chunks of data it is usually 'received foo' + // that we receive but there is no guarantee. + EXPECT_EQ(0, std::string("received foo").find(response)); +} + // This test verifies that UnixDomainSocketError exception is thrown // on attempt to connect, write or receive when the server socket // is not available. @@ -123,6 +218,49 @@ TEST_F(UnixDomainSocketTest, clientErrors) { UnixDomainSocketError); } +// This test verifies that an error is returned on attempt to asynchronously +// connect, write or receive when the server socket is not available. +TEST_F(UnixDomainSocketTest, asyncClientErrors) { + UnixDomainSocket socket(io_service_); + + // Connect + bool connect_handler_invoked = false; + socket.asyncConnect(unixSocketFilePath(), + [this, &connect_handler_invoked](const boost::system::error_code& ec) { + connect_handler_invoked = true; + EXPECT_TRUE(ec); + }); + while (!connect_handler_invoked) { + io_service_.run_one(); + } + + // Send + const std::string outbound_data = "foo"; + bool send_handler_invoked = false; + socket.asyncSend(outbound_data.c_str(), outbound_data.size(), + [this, &send_handler_invoked] + (const boost::system::error_code& ec, size_t length) { + send_handler_invoked = true; + EXPECT_TRUE(ec); + }); + while (!send_handler_invoked) { + io_service_.run_one(); + } + + // Receive + bool receive_handler_invoked = false; + std::array read_buf; + socket.asyncReceive(&read_buf[0], read_buf.size(), + [this, &receive_handler_invoked] + (const boost::system::error_code& ec, size_t length) { + receive_handler_invoked = true; + EXPECT_TRUE(ec); + }); + while (!receive_handler_invoked) { + io_service_.run_one(); + } +} + // Check that native socket descriptor is returned correctly when // the socket is connected. TEST_F(UnixDomainSocketTest, getNative) { diff --git a/src/lib/asiolink/unix_domain_socket.cc b/src/lib/asiolink/unix_domain_socket.cc index 7c40247073..c6436a8a66 100644 --- a/src/lib/asiolink/unix_domain_socket.cc +++ b/src/lib/asiolink/unix_domain_socket.cc @@ -6,6 +6,7 @@ #include #include +#include #include using namespace boost::asio::local; @@ -13,7 +14,7 @@ namespace isc { namespace asiolink { /// @brief Implementation of the unix domain socket. -class UnixDomainSocketImpl { +class UnixDomainSocketImpl : public boost::enable_shared_from_this { public: /// @brief Constructor. @@ -30,6 +31,120 @@ public: close(); } + /// @brief Asynchronously connects to an endpoint. + /// + /// This method schedules asynchronous connect and installs the + /// @ref UnixDomainSocketImpl::connectHandler as a callback. + /// + /// @param endpoint Reference to an endpoint to connect to. + /// @param handler User supplied handler to be invoked when the connection + /// is established or when error is signalled. + void asyncConnect(const stream_protocol::endpoint& endpoint, + const UnixDomainSocket::ConnectHandler& handler); + + /// @brief Local handler invoked as a result of asynchronous connection. + /// + /// This is a wrapper around the user supplied callback. It ignores + /// EINPROGRESS errors which are observed on some operating systems as + /// a result of trying to connect asynchronously. This error code doesn't + /// necessarily indicate a problem and the subsequent attempts to read + /// and write to the socket will succeed. Therefore, the handler simply + /// overrides this error code with success status. The user supplied + /// handler don't need to deal with the EINPROGRESS error codes. + /// + /// @param remote_handler User supplied callback. + /// @param ec Error code returned as a result of connection. + void connectHandler(const UnixDomainSocket::ConnectHandler& remote_handler, + const boost::system::error_code& ec); + + /// @brief Asynchronously sends data over the socket. + /// + /// This method schedules an asynchronous send and installs the + /// @ref UnixDomainSocketImpl::sendHandler as a callback. + /// + /// @param data Pointer to data to be sent. + /// @param length Number of bytes to be sent. + /// @param handler Callback to be invoked when data have been sent or an + /// sending error is signalled. + void asyncSend(const void* data, const size_t length, + const UnixDomainSocket::Handler& handler); + + /// @brief Asynchronously sends the data over the socket. + /// + /// This method is called by the @ref asyncSend and the @ref sendHandler + /// if the asynchronous send has to be repeated as a result of receiving + /// EAGAIN or EWOULDBLOCK. + /// + /// @param buffer Buffers holding the data to be sent. + /// @param handler User supplied callback to be invoked when data have + /// been sent or sending error is signalled. + void doSend(const boost::asio::const_buffers_1& buffer, + const UnixDomainSocket::Handler& handler); + + + /// @brief Local handler invoked as a result of asynchronous send. + /// + /// This handler is invoked as a result of asynchronous send. It is a + /// wrapper callback around the user supplied callback. It handles + /// EWOULDBLOCK and EAGAIN errors by retrying an asynchronous send. + /// These errors are often returned on some operating systems, even + /// though one would expect that asynchronous operation would not + /// return such errors. Because these errors are handled by the + /// wrapper callback, the user supplied callback never receives + /// these errors. + /// + /// @param remote_handler User supplied callback. + /// @param buffer Buffers holding the data to be sent. + /// @param ec Error code returned as a result of sending the data. + /// @param length Length of the data sent. + void sendHandler(const UnixDomainSocket::Handler& remote_handler, + const boost::asio::const_buffers_1& buffer, + const boost::system::error_code& ec, + size_t length); + + /// @brief Asynchronously receive data over the socket. + /// + /// This method schedules asynchronous receive and installs the + /// @ref UnixDomainSocket::receiveHandler is a callback. + /// + /// @param data Pointer to a buffer into which the data should be read. + /// @param length Length of the buffer. + /// @param handler User supplied callback invoked when data have been + /// received or an error is signalled. + void asyncReceive(void* data, const size_t length, + const UnixDomainSocket::Handler& handler); + + /// @brief Asynchronously receives the data over the socket. + /// + /// This method is called @ref asyncReceive and @ref receiveHandler when + /// EWOULDBLOCK or EAGAIN is returned. + /// + /// @param buffer A buffer into which the data should be received. + /// @param handler User supplied callback invoked when data have been + /// received on an error is signalled. + void doReceive(const boost::asio::mutable_buffers_1& buffer, + const UnixDomainSocket::Handler& handler); + + /// @brief Local handler invoked as a result of asynchronous receive. + /// + /// This handler is invoked as a result of asynchronous receive. It is a + /// wrapper callback around the user supplied callback. It handles + /// EWOULDBLOCK and EAGAIN by retrying to asynchronously receive the + /// data. These errors are often returned on some operating systems, even + /// though one would expect that asynchronous operation would not + /// return such errors. Because these errors are handled by the + /// wrapper callback, the user supplied callback never receives + /// these errors. + /// + /// @param remote_handler User supplied callback. + /// @param buffer Buffer into which the data are received. + /// @param ec Error code returned as a result of asynchronous receive. + /// @param length Size of the received data. + void receiveHandler(const UnixDomainSocket::Handler& remote_handler, + const boost::asio::mutable_buffers_1& buffer, + const boost::system::error_code& ec, + size_t length); + /// @brief Closes the socket. void close(); @@ -37,6 +152,98 @@ public: stream_protocol::socket socket_; }; +void +UnixDomainSocketImpl::asyncConnect(const stream_protocol::endpoint& endpoint, + const UnixDomainSocket::ConnectHandler& handler) { + using namespace std::placeholders; + + UnixDomainSocket::ConnectHandler local_handler = + std::bind(&UnixDomainSocketImpl::connectHandler, shared_from_this(), + handler, _1); + socket_.async_connect(endpoint, local_handler); +} + +void +UnixDomainSocketImpl::connectHandler(const UnixDomainSocket::ConnectHandler& remote_handler, + const boost::system::error_code& ec) { + // It was observed on Debian and Fedora that asynchronous connect may result + // in EINPROGRESS error. This doesn't really indicate a problem with a + // connection. If we continue transmitting data over the socket it will + // succeed. So we suppress this error and return 'success' to the user's + // handler. + if (ec.value() == boost::asio::error::in_progress) { + remote_handler(boost::system::error_code()); + } else { + remote_handler(ec); + } +} + +void +UnixDomainSocketImpl::asyncSend(const void* data, const size_t length, + const UnixDomainSocket::Handler& handler) { + doSend(boost::asio::buffer(data, length), handler); +} + +void +UnixDomainSocketImpl::doSend(const boost::asio::const_buffers_1& buffer, + const UnixDomainSocket::Handler& handler) { + using namespace std::placeholders; + + UnixDomainSocket::Handler local_handler = + std::bind(&UnixDomainSocketImpl::sendHandler, shared_from_this(), + handler, buffer, _1, _2); + socket_.async_send(buffer, local_handler); +} + +void +UnixDomainSocketImpl::sendHandler(const UnixDomainSocket::Handler& remote_handler, + const boost::asio::const_buffers_1& buffer, + const boost::system::error_code& ec, + size_t length) { + // The asynchronous send may return EWOULDBLOCK or EAGAIN on some + // operating systems. In this case, we simply retry hoping that it + // will succeed next time. The user's callback never sees these + // errors. + if ((ec.value() == boost::asio::error::would_block) || + (ec.value() == boost::asio::error::try_again)) { + doSend(buffer, remote_handler); + } + remote_handler(ec, length); +} + +void +UnixDomainSocketImpl::asyncReceive(void* data, const size_t length, + const UnixDomainSocket::Handler& handler) { + doReceive(boost::asio::buffer(data, length), handler); +} + +void +UnixDomainSocketImpl::doReceive(const boost::asio::mutable_buffers_1& buffer, + const UnixDomainSocket::Handler& handler) { + using namespace std::placeholders; + + UnixDomainSocket::Handler local_handler = + std::bind(&UnixDomainSocketImpl::receiveHandler, shared_from_this(), + handler, buffer, _1, _2); + socket_.async_receive(buffer, 0, local_handler); +} + +void +UnixDomainSocketImpl::receiveHandler(const UnixDomainSocket::Handler& remote_handler, + const boost::asio::mutable_buffers_1& buffer, + const boost::system::error_code& ec, + size_t length) { + // The asynchronous receive may return EWOULDBLOCK or EAGAIN on some + // operating systems. In this case, we simply retry hoping that it + // will succeed next time. The user's callback never sees these + // errors. + if ((ec.value() == boost::asio::error::would_block) || + (ec.value() == boost::asio::error::try_again)) { + doReceive(buffer, remote_handler); + } + remote_handler(ec, length); +} + void UnixDomainSocketImpl::close() { static_cast(socket_.close()); @@ -65,6 +272,11 @@ UnixDomainSocket::connect(const std::string& path) { } } +void +UnixDomainSocket::asyncConnect(const std::string& path, const ConnectHandler& handler) { + impl_->asyncConnect(stream_protocol::endpoint(path.c_str()), handler); +} + size_t UnixDomainSocket::write(const void* data, size_t length) { boost::system::error_code ec; @@ -78,6 +290,12 @@ UnixDomainSocket::write(const void* data, size_t length) { return (res); } +void +UnixDomainSocket::asyncSend(const void* data, const size_t length, + const Handler& handler) { + impl_->asyncSend(data, length, handler); +} + size_t UnixDomainSocket::receive(void* data, size_t length) { boost::system::error_code ec; @@ -88,10 +306,16 @@ UnixDomainSocket::receive(void* data, size_t length) { return (res); } +void +UnixDomainSocket::asyncReceive(void* data, const size_t length, + const Handler& handler) { + impl_->asyncReceive(data, length, handler); +} + void UnixDomainSocket::close() { impl_->close(); } -} -} +} // end of namespace asiolink +} // end of namespace isc diff --git a/src/lib/asiolink/unix_domain_socket.h b/src/lib/asiolink/unix_domain_socket.h index 7c669d7eda..db5ca294a9 100644 --- a/src/lib/asiolink/unix_domain_socket.h +++ b/src/lib/asiolink/unix_domain_socket.h @@ -10,6 +10,7 @@ #include #include #include +#include #include namespace isc { @@ -29,6 +30,13 @@ class UnixDomainSocketImpl; class UnixDomainSocket : public IOSocket { public: + /// @brief Callback type used in call to @ref UnixDomainSocket::asyncConnect. + typedef std::function ConnectHandler; + + /// @brief Callback type used in calls to @ref UnixDomainSocket::asyncSend + /// and @ref UnixDomainSocket::asyncReceive. + typedef std::function Handler; + /// @brief Constructor. /// /// @param io_service Reference to IOService to be used by this @@ -48,6 +56,15 @@ public: /// @throw UnixDomainSocketError if error occurs. void connect(const std::string& path); + /// @brief Asynchronously connects the socket to the specified endpoint. + /// + /// Always returns immediatelly. + /// + /// @param path Path to the unix socket to which we should connect. + /// @param handler Callback to be invoked when connection is established or + /// a connection error is signalled. + void asyncConnect(const std::string& path, const ConnectHandler& handler); + /// @brief Writes specified amount of data to a socket. /// /// @param data Pointer to data to be written. @@ -57,6 +74,16 @@ public: /// @throw UnixDomainSocketError if error occurs. size_t write(const void* data, size_t length); + /// @brief Asynchronously sends data over the socket. + /// + /// Always returns immediatelly. + /// + /// @param data Pointer to data to be sent. + /// @param length Number of bytes to be sent. + /// @param handler Callback to be invoked when data have been sent or an + /// sending error is signalled. + void asyncSend(const void* data, const size_t length, const Handler& handler); + /// @brief Receives data from a socket. /// /// @param [out] data Pointer to a location into which the read data should @@ -67,6 +94,16 @@ public: /// @throw UnixDomainSocketError if error occurs. size_t receive(void* data, size_t length); + /// @brief Asynchronously receives data over the socket. + /// + /// Always returns immediatelly. + /// @param [out] data Pointer to a location into which the read data should + /// be stored. + /// @param length Length of the buffer. + /// @param handler Callback to be invoked when data have been received or an + /// error is signalled. + void asyncReceive(void* data, const size_t length, const Handler& handler); + /// @brief Closes the socket. void close(); -- 2.47.3