EXPECT_EQ("received foo", response);
}
+// This test verifies that the client can send the data over the unix
+// domain socket and receive a response asynchronously.
+TEST_F(UnixDomainSocketTest, asyncSendReceive) {
+ // Start the server.
+ test_socket_->bindServerSocket();
+
+ // Setup client side.
+ UnixDomainSocket socket(io_service_);
+
+ // We're going to asynchronously connect to the server. The boolean value
+ // below will be modified by the connect handler function (lambda) invoked
+ // when the connection is established or if an error occurs.
+ bool connect_handler_invoked = false;
+ ASSERT_NO_THROW(socket.asyncConnect(unixSocketFilePath(),
+ [this, &connect_handler_invoked](const boost::system::error_code& ec) {
+ // Indicate that the handler has been called so as the loop below gets
+ // interrupted.
+ connect_handler_invoked = true;
+ // Operation aborted indicates that IO service has been stopped. This
+ // shouldn't happen here.
+ if (ec && ec.value() != boost::asio::error::operation_aborted) {
+ ADD_FAILURE() << "error occurred while asynchronously connecting"
+ " via unix domain socket: " << ec.message();
+ }
+ }
+ ));
+ // Run IO service until connect handler is invoked.
+ while (!connect_handler_invoked) {
+ io_service_.run_one();
+ }
+
+ // We are going to asynchronously send the 'foo' over the unix socket.
+ const std::string outbound_data = "foo";
+ size_t sent_size = 0;
+ ASSERT_NO_THROW(socket.asyncSend(outbound_data.c_str(), outbound_data.size(),
+ [this, &sent_size](const boost::system::error_code& ec, size_t length) {
+ // If we have been successful sending the data, record the number of
+ // bytes we have sent.
+ if (!ec) {
+ sent_size = length;
+
+ } else if (ec.value() != boost::asio::error::operation_aborted) {
+ ADD_FAILURE() << "error occurred while asynchronously sending the"
+ " data over unix domain socket: " << ec.message();
+ }
+ }
+ ));
+
+ // Run IO service to generate server's response.
+ while (test_socket_->getResponseNum() < 1) {
+ io_service_.run_one();
+ }
+
+ // There is no guarantee that all data have been sent so we only check that
+ // some data have been sent.
+ ASSERT_GT(sent_size, 0);
+
+ // Receive response from the socket.
+ std::array<char, 1024> read_buf;
+ size_t bytes_read = 0;
+ bool receive_handler_invoked = false;
+ ASSERT_NO_THROW(socket.asyncReceive(&read_buf[0], read_buf.size(),
+ [this, &receive_handler_invoked, &bytes_read]
+ (const boost::system::error_code& ec, size_t length) mutable {
+ // Indicate that the handler has been called to interrupt the
+ // loop below.
+ receive_handler_invoked = true;
+
+ // If we have been successful receiving the data, record the number of
+ // bytes received.
+ if (!ec) {
+ bytes_read = length;
+
+ } else if (ec.value() != boost::asio::error::operation_aborted) {
+ ADD_FAILURE() << "error occurred while asynchronously receiving"
+ " data via unix domain socket: " << ec.message();
+ }
+
+ }));
+ // Run IO service until we get some response from the server.
+ while (!receive_handler_invoked) {
+ io_service_.run_one();
+ }
+
+ // Make sure we have received something.
+ ASSERT_GT(bytes_read, 0);
+
+ std::string response(&read_buf[0], bytes_read);
+
+ // What we have received should be a substring of the sent data prepended
+ // with 'received'. For such short chunks of data it is usually 'received foo'
+ // that we receive but there is no guarantee.
+ EXPECT_EQ(0, std::string("received foo").find(response));
+}
+
// This test verifies that UnixDomainSocketError exception is thrown
// on attempt to connect, write or receive when the server socket
// is not available.
UnixDomainSocketError);
}
+// This test verifies that an error is returned on attempt to asynchronously
+// connect, write or receive when the server socket is not available.
+TEST_F(UnixDomainSocketTest, asyncClientErrors) {
+ UnixDomainSocket socket(io_service_);
+
+ // Connect
+ bool connect_handler_invoked = false;
+ socket.asyncConnect(unixSocketFilePath(),
+ [this, &connect_handler_invoked](const boost::system::error_code& ec) {
+ connect_handler_invoked = true;
+ EXPECT_TRUE(ec);
+ });
+ while (!connect_handler_invoked) {
+ io_service_.run_one();
+ }
+
+ // Send
+ const std::string outbound_data = "foo";
+ bool send_handler_invoked = false;
+ socket.asyncSend(outbound_data.c_str(), outbound_data.size(),
+ [this, &send_handler_invoked]
+ (const boost::system::error_code& ec, size_t length) {
+ send_handler_invoked = true;
+ EXPECT_TRUE(ec);
+ });
+ while (!send_handler_invoked) {
+ io_service_.run_one();
+ }
+
+ // Receive
+ bool receive_handler_invoked = false;
+ std::array<char, 1024> read_buf;
+ socket.asyncReceive(&read_buf[0], read_buf.size(),
+ [this, &receive_handler_invoked]
+ (const boost::system::error_code& ec, size_t length) {
+ receive_handler_invoked = true;
+ EXPECT_TRUE(ec);
+ });
+ while (!receive_handler_invoked) {
+ io_service_.run_one();
+ }
+}
+
// Check that native socket descriptor is returned correctly when
// the socket is connected.
TEST_F(UnixDomainSocketTest, getNative) {
#include <asiolink/asio_wrapper.h>
#include <asiolink/unix_domain_socket.h>
+#include <boost/enable_shared_from_this.hpp>
#include <iostream>
using namespace boost::asio::local;
namespace asiolink {
/// @brief Implementation of the unix domain socket.
-class UnixDomainSocketImpl {
+class UnixDomainSocketImpl : public boost::enable_shared_from_this<UnixDomainSocketImpl> {
public:
/// @brief Constructor.
close();
}
+ /// @brief Asynchronously connects to an endpoint.
+ ///
+ /// This method schedules asynchronous connect and installs the
+ /// @ref UnixDomainSocketImpl::connectHandler as a callback.
+ ///
+ /// @param endpoint Reference to an endpoint to connect to.
+ /// @param handler User supplied handler to be invoked when the connection
+ /// is established or when error is signalled.
+ void asyncConnect(const stream_protocol::endpoint& endpoint,
+ const UnixDomainSocket::ConnectHandler& handler);
+
+ /// @brief Local handler invoked as a result of asynchronous connection.
+ ///
+ /// This is a wrapper around the user supplied callback. It ignores
+ /// EINPROGRESS errors which are observed on some operating systems as
+ /// a result of trying to connect asynchronously. This error code doesn't
+ /// necessarily indicate a problem and the subsequent attempts to read
+ /// and write to the socket will succeed. Therefore, the handler simply
+ /// overrides this error code with success status. The user supplied
+ /// handler don't need to deal with the EINPROGRESS error codes.
+ ///
+ /// @param remote_handler User supplied callback.
+ /// @param ec Error code returned as a result of connection.
+ void connectHandler(const UnixDomainSocket::ConnectHandler& remote_handler,
+ const boost::system::error_code& ec);
+
+ /// @brief Asynchronously sends data over the socket.
+ ///
+ /// This method schedules an asynchronous send and installs the
+ /// @ref UnixDomainSocketImpl::sendHandler as a callback.
+ ///
+ /// @param data Pointer to data to be sent.
+ /// @param length Number of bytes to be sent.
+ /// @param handler Callback to be invoked when data have been sent or an
+ /// sending error is signalled.
+ void asyncSend(const void* data, const size_t length,
+ const UnixDomainSocket::Handler& handler);
+
+ /// @brief Asynchronously sends the data over the socket.
+ ///
+ /// This method is called by the @ref asyncSend and the @ref sendHandler
+ /// if the asynchronous send has to be repeated as a result of receiving
+ /// EAGAIN or EWOULDBLOCK.
+ ///
+ /// @param buffer Buffers holding the data to be sent.
+ /// @param handler User supplied callback to be invoked when data have
+ /// been sent or sending error is signalled.
+ void doSend(const boost::asio::const_buffers_1& buffer,
+ const UnixDomainSocket::Handler& handler);
+
+
+ /// @brief Local handler invoked as a result of asynchronous send.
+ ///
+ /// This handler is invoked as a result of asynchronous send. It is a
+ /// wrapper callback around the user supplied callback. It handles
+ /// EWOULDBLOCK and EAGAIN errors by retrying an asynchronous send.
+ /// These errors are often returned on some operating systems, even
+ /// though one would expect that asynchronous operation would not
+ /// return such errors. Because these errors are handled by the
+ /// wrapper callback, the user supplied callback never receives
+ /// these errors.
+ ///
+ /// @param remote_handler User supplied callback.
+ /// @param buffer Buffers holding the data to be sent.
+ /// @param ec Error code returned as a result of sending the data.
+ /// @param length Length of the data sent.
+ void sendHandler(const UnixDomainSocket::Handler& remote_handler,
+ const boost::asio::const_buffers_1& buffer,
+ const boost::system::error_code& ec,
+ size_t length);
+
+ /// @brief Asynchronously receive data over the socket.
+ ///
+ /// This method schedules asynchronous receive and installs the
+ /// @ref UnixDomainSocket::receiveHandler is a callback.
+ ///
+ /// @param data Pointer to a buffer into which the data should be read.
+ /// @param length Length of the buffer.
+ /// @param handler User supplied callback invoked when data have been
+ /// received or an error is signalled.
+ void asyncReceive(void* data, const size_t length,
+ const UnixDomainSocket::Handler& handler);
+
+ /// @brief Asynchronously receives the data over the socket.
+ ///
+ /// This method is called @ref asyncReceive and @ref receiveHandler when
+ /// EWOULDBLOCK or EAGAIN is returned.
+ ///
+ /// @param buffer A buffer into which the data should be received.
+ /// @param handler User supplied callback invoked when data have been
+ /// received on an error is signalled.
+ void doReceive(const boost::asio::mutable_buffers_1& buffer,
+ const UnixDomainSocket::Handler& handler);
+
+ /// @brief Local handler invoked as a result of asynchronous receive.
+ ///
+ /// This handler is invoked as a result of asynchronous receive. It is a
+ /// wrapper callback around the user supplied callback. It handles
+ /// EWOULDBLOCK and EAGAIN by retrying to asynchronously receive the
+ /// data. These errors are often returned on some operating systems, even
+ /// though one would expect that asynchronous operation would not
+ /// return such errors. Because these errors are handled by the
+ /// wrapper callback, the user supplied callback never receives
+ /// these errors.
+ ///
+ /// @param remote_handler User supplied callback.
+ /// @param buffer Buffer into which the data are received.
+ /// @param ec Error code returned as a result of asynchronous receive.
+ /// @param length Size of the received data.
+ void receiveHandler(const UnixDomainSocket::Handler& remote_handler,
+ const boost::asio::mutable_buffers_1& buffer,
+ const boost::system::error_code& ec,
+ size_t length);
+
/// @brief Closes the socket.
void close();
stream_protocol::socket socket_;
};
+void
+UnixDomainSocketImpl::asyncConnect(const stream_protocol::endpoint& endpoint,
+ const UnixDomainSocket::ConnectHandler& handler) {
+ using namespace std::placeholders;
+
+ UnixDomainSocket::ConnectHandler local_handler =
+ std::bind(&UnixDomainSocketImpl::connectHandler, shared_from_this(),
+ handler, _1);
+ socket_.async_connect(endpoint, local_handler);
+}
+
+void
+UnixDomainSocketImpl::connectHandler(const UnixDomainSocket::ConnectHandler& remote_handler,
+ const boost::system::error_code& ec) {
+ // It was observed on Debian and Fedora that asynchronous connect may result
+ // in EINPROGRESS error. This doesn't really indicate a problem with a
+ // connection. If we continue transmitting data over the socket it will
+ // succeed. So we suppress this error and return 'success' to the user's
+ // handler.
+ if (ec.value() == boost::asio::error::in_progress) {
+ remote_handler(boost::system::error_code());
+ } else {
+ remote_handler(ec);
+ }
+}
+
+void
+UnixDomainSocketImpl::asyncSend(const void* data, const size_t length,
+ const UnixDomainSocket::Handler& handler) {
+ doSend(boost::asio::buffer(data, length), handler);
+}
+
+void
+UnixDomainSocketImpl::doSend(const boost::asio::const_buffers_1& buffer,
+ const UnixDomainSocket::Handler& handler) {
+ using namespace std::placeholders;
+
+ UnixDomainSocket::Handler local_handler =
+ std::bind(&UnixDomainSocketImpl::sendHandler, shared_from_this(),
+ handler, buffer, _1, _2);
+ socket_.async_send(buffer, local_handler);
+}
+
+void
+UnixDomainSocketImpl::sendHandler(const UnixDomainSocket::Handler& remote_handler,
+ const boost::asio::const_buffers_1& buffer,
+ const boost::system::error_code& ec,
+ size_t length) {
+ // The asynchronous send may return EWOULDBLOCK or EAGAIN on some
+ // operating systems. In this case, we simply retry hoping that it
+ // will succeed next time. The user's callback never sees these
+ // errors.
+ if ((ec.value() == boost::asio::error::would_block) ||
+ (ec.value() == boost::asio::error::try_again)) {
+ doSend(buffer, remote_handler);
+ }
+ remote_handler(ec, length);
+}
+
+void
+UnixDomainSocketImpl::asyncReceive(void* data, const size_t length,
+ const UnixDomainSocket::Handler& handler) {
+ doReceive(boost::asio::buffer(data, length), handler);
+}
+
+void
+UnixDomainSocketImpl::doReceive(const boost::asio::mutable_buffers_1& buffer,
+ const UnixDomainSocket::Handler& handler) {
+ using namespace std::placeholders;
+
+ UnixDomainSocket::Handler local_handler =
+ std::bind(&UnixDomainSocketImpl::receiveHandler, shared_from_this(),
+ handler, buffer, _1, _2);
+ socket_.async_receive(buffer, 0, local_handler);
+}
+
+void
+UnixDomainSocketImpl::receiveHandler(const UnixDomainSocket::Handler& remote_handler,
+ const boost::asio::mutable_buffers_1& buffer,
+ const boost::system::error_code& ec,
+ size_t length) {
+ // The asynchronous receive may return EWOULDBLOCK or EAGAIN on some
+ // operating systems. In this case, we simply retry hoping that it
+ // will succeed next time. The user's callback never sees these
+ // errors.
+ if ((ec.value() == boost::asio::error::would_block) ||
+ (ec.value() == boost::asio::error::try_again)) {
+ doReceive(buffer, remote_handler);
+ }
+ remote_handler(ec, length);
+}
+
void
UnixDomainSocketImpl::close() {
static_cast<void>(socket_.close());
}
}
+void
+UnixDomainSocket::asyncConnect(const std::string& path, const ConnectHandler& handler) {
+ impl_->asyncConnect(stream_protocol::endpoint(path.c_str()), handler);
+}
+
size_t
UnixDomainSocket::write(const void* data, size_t length) {
boost::system::error_code ec;
return (res);
}
+void
+UnixDomainSocket::asyncSend(const void* data, const size_t length,
+ const Handler& handler) {
+ impl_->asyncSend(data, length, handler);
+}
+
size_t
UnixDomainSocket::receive(void* data, size_t length) {
boost::system::error_code ec;
return (res);
}
+void
+UnixDomainSocket::asyncReceive(void* data, const size_t length,
+ const Handler& handler) {
+ impl_->asyncReceive(data, length, handler);
+}
+
void
UnixDomainSocket::close() {
impl_->close();
}
-}
-}
+} // end of namespace asiolink
+} // end of namespace isc
#include <asiolink/io_service.h>
#include <asiolink/io_socket.h>
#include <boost/shared_ptr.hpp>
+#include <functional>
#include <string>
namespace isc {
class UnixDomainSocket : public IOSocket {
public:
+ /// @brief Callback type used in call to @ref UnixDomainSocket::asyncConnect.
+ typedef std::function<void(const boost::system::error_code&)> ConnectHandler;
+
+ /// @brief Callback type used in calls to @ref UnixDomainSocket::asyncSend
+ /// and @ref UnixDomainSocket::asyncReceive.
+ typedef std::function<void(const boost::system::error_code&, size_t)> Handler;
+
/// @brief Constructor.
///
/// @param io_service Reference to IOService to be used by this
/// @throw UnixDomainSocketError if error occurs.
void connect(const std::string& path);
+ /// @brief Asynchronously connects the socket to the specified endpoint.
+ ///
+ /// Always returns immediatelly.
+ ///
+ /// @param path Path to the unix socket to which we should connect.
+ /// @param handler Callback to be invoked when connection is established or
+ /// a connection error is signalled.
+ void asyncConnect(const std::string& path, const ConnectHandler& handler);
+
/// @brief Writes specified amount of data to a socket.
///
/// @param data Pointer to data to be written.
/// @throw UnixDomainSocketError if error occurs.
size_t write(const void* data, size_t length);
+ /// @brief Asynchronously sends data over the socket.
+ ///
+ /// Always returns immediatelly.
+ ///
+ /// @param data Pointer to data to be sent.
+ /// @param length Number of bytes to be sent.
+ /// @param handler Callback to be invoked when data have been sent or an
+ /// sending error is signalled.
+ void asyncSend(const void* data, const size_t length, const Handler& handler);
+
/// @brief Receives data from a socket.
///
/// @param [out] data Pointer to a location into which the read data should
/// @throw UnixDomainSocketError if error occurs.
size_t receive(void* data, size_t length);
+ /// @brief Asynchronously receives data over the socket.
+ ///
+ /// Always returns immediatelly.
+ /// @param [out] data Pointer to a location into which the read data should
+ /// be stored.
+ /// @param length Length of the buffer.
+ /// @param handler Callback to be invoked when data have been received or an
+ /// error is signalled.
+ void asyncReceive(void* data, const size_t length, const Handler& handler);
+
/// @brief Closes the socket.
void close();