///
/// @param response Stub response to be sent from the server socket to the
/// client.
- /// @param stop_after_count Number of received messages received over the
- /// server socket after which the IO service should be stopped.
- void bindServerSocket(const std::string& response,
- const unsigned int stop_after_count = 1) {
+ void bindServerSocket(const std::string& response) {
server_socket_.reset(new test::TestServerUnixSocket(*getIOService(),
unixSocketFilePath(),
TEST_TIMEOUT,
response));
- server_socket_->bindServerSocket(stop_after_count);
+ server_socket_->bindServerSocket();
}
/// @brief Creates command with no arguments.
/// @param expected_result0 Expected first result in response from the server.
/// @param expected_result1 Expected second result in response from the server.
/// @param expected_result2 Expected third result in response from the server.
- /// @param stop_after_count Number of received messages received over the
/// server socket after which the IO service should be stopped.
+ /// @param expected_responses Number of responses after which the test finishes.
/// @param server_response Stub response to be sent by the server.
void testForward(const CtrlAgentCfgContext::ServerType& server_type,
const std::string& service,
const int expected_result0,
const int expected_result1 = -1,
const int expected_result2 = -1,
- const unsigned stop_after_count = 1,
+ const size_t expected_responses = 1,
const std::string& server_response = "{ \"result\": 0 }") {
// Configure client side socket.
configureControlSocket(server_type);
// Create server side socket.
- bindServerSocket(server_response, stop_after_count);
+ bindServerSocket(server_response);
// The client side communication is synchronous. To be able to respond
// to this we need to run the server side socket at the same time.
// Running IO service in a thread guarantees that the server responds
// as soon as it receives the control command.
- isc::util::thread::Thread(boost::bind(&IOService::run,
- getIOService().get()));
+ isc::util::thread::Thread(boost::bind(&CtrlAgentCommandMgrTest::runIO,
+ getIOService(), server_socket_,
+ expected_responses));
ConstElementPtr command = createCommand("foo", service);
ConstElementPtr answer = mgr_.handleCommand("foo", ConstElementPtr(),
checkAnswer(answer, expected_result0, expected_result1, expected_result2);
}
+ /// @brief Runs IO service until number of sent responses is lower than
+ /// expected.
+ ///
+ /// @param server_socket Pointer to the server socket.
+ /// @param expected_responses Number of expected responses.
+ static void runIO(IOServicePtr& io_service,
+ const test::TestServerUnixSocketPtr& server_socket,
+ const size_t expected_responses) {
+ while (server_socket->getResponseNum() < expected_responses) {
+ io_service->run_one();
+ }
+ }
+
+
+ CtrlAgentCommandMgrTest* getTestSelf() {
+ return (this);
+ }
+
/// @brief a convenience reference to control agent command manager
CtrlAgentCommandMgr& mgr_;
// to this we need to run the server side socket at the same time.
// Running IO service in a thread guarantees that the server responds
// as soon as it receives the control command.
- isc::util::thread::Thread(boost::bind(&IOService::run,
- getIOService().get()));
+ isc::util::thread::Thread(boost::bind(&CtrlAgentCommandMgrTest::runIO,
+ getIOService(), server_socket_, 1));
ConstElementPtr command = createCommand("list-commands", "dhcp4");
ConstElementPtr answer = mgr_.handleCommand("list-commands", ConstElementPtr(),
ASSERT_EQ(outbound_data.size(), sent_size);
// Run IO service to generate server's response.
- io_service_.run();
+ while (test_socket_.getResponseNum() < 1) {
+ io_service_.run_one();
+ }
// Receive response from the socket.
std::array<char, 1024> read_buf;
#include <asiolink/asio_wrapper.h>
#include <asiolink/testutils/test_server_unix_socket.h>
#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <functional>
+#include <set>
+
+using namespace boost::asio::local;
namespace isc {
namespace asiolink {
namespace test {
+typedef stream_protocol::socket UnixSocket;
+typedef boost::shared_ptr<UnixSocket> UnixSocketPtr;
+
+typedef std::function<void()> SentResponseCallback;
+
+class Connection {
+public:
+
+ Connection(const UnixSocketPtr& unix_socket,
+ const std::string custom_response,
+ const SentResponseCallback& sent_response_callback)
+ : socket_(unix_socket), custom_response_(custom_response),
+ sent_response_callback_(sent_response_callback) {
+ socket_->async_read_some(boost::asio::buffer(&raw_buf_[0], raw_buf_.size()),
+ boost::bind(&Connection::readHandler, this, _1, _2));
+ }
+
+ void
+ readHandler(const boost::system::error_code&, size_t bytes_transferred) {
+ if (!custom_response_.empty()) {
+ boost::asio::write(*socket_,
+ boost::asio::buffer(custom_response_.c_str(), custom_response_.size()));
+
+ } else {
+ std::string received(&raw_buf_[0], bytes_transferred);
+ std::string response("received " + received);
+ boost::asio::write(*socket_,
+ boost::asio::buffer(response.c_str(), response.size()));
+ }
+
+ sent_response_callback_();
+ }
+
+ void stop() {
+ socket_->close();
+ }
+
+private:
+
+ UnixSocketPtr socket_;
+
+ std::string custom_response_;
+
+ /// @brief Receive buffer.
+ std::array<char, 1024> raw_buf_;
+
+ SentResponseCallback sent_response_callback_;
+
+};
+
+typedef boost::shared_ptr<Connection> ConnectionPtr;
+
+class ConnectionPool {
+public:
+
+ ConnectionPool(IOService& io_service)
+ : io_service_(io_service), connections_(), next_socket_(),
+ response_num_(0) {
+ }
+
+ UnixSocketPtr getSocket() {
+ if (!next_socket_) {
+ next_socket_.reset(new UnixSocket(io_service_.get_io_service()));
+ }
+ return (next_socket_);
+ }
+
+ void start(const std::string& custom_response) {
+ ConnectionPtr conn(new Connection(next_socket_, custom_response, [this] {
+ ++response_num_;
+ }));
+
+ connections_.insert(conn);
+ next_socket_.reset();
+ }
+
+ void stop(const ConnectionPtr& conn) {
+ conn->stop();
+ connections_.erase(conn);
+ }
+
+ void stopAll() {
+ for (auto conn = connections_.begin(); conn != connections_.end();
+ ++conn) {
+ (*conn)->stop();
+ }
+ connections_.clear();
+ }
+
+ size_t getResponseNum() const {
+ return (response_num_);
+ }
+
+private:
+
+ IOService& io_service_;
+
+ std::set<ConnectionPtr> connections_;
+
+ UnixSocketPtr next_socket_;
+
+ size_t response_num_;
+};
+
+
TestServerUnixSocket::TestServerUnixSocket(IOService& io_service,
const std::string& socket_file_path,
const long test_timeout,
: io_service_(io_service),
server_endpoint_(socket_file_path),
server_acceptor_(io_service_.get_io_service()),
- server_sockets_(),
test_timer_(io_service_),
custom_response_(custom_response),
- stop_after_count_(1),
- read_count_(0) {
+ connection_pool_(new ConnectionPool(io_service)) {
test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler, this),
test_timeout, IntervalTimer::ONE_SHOT);
}
TestServerUnixSocket::~TestServerUnixSocket() {
- for (auto sock = server_sockets_.begin(); sock != server_sockets_.end(); ++sock) {
- sock->close();
- }
+ connection_pool_->stopAll();
}
void
-TestServerUnixSocket::bindServerSocket(const unsigned int stop_after_count) {
+TestServerUnixSocket::bindServerSocket() {
server_acceptor_.open();
server_acceptor_.bind(server_endpoint_);
server_acceptor_.listen();
accept();
-
- stop_after_count_ = stop_after_count;
}
void
TestServerUnixSocket::acceptHandler(const boost::system::error_code&) {
- server_sockets_.back().async_read_some(boost::asio::buffer(&raw_buf_[0],
- raw_buf_.size()),
- boost::bind(&TestServerUnixSocket::readHandler, this, _1, _2));
+ connection_pool_->start(custom_response_);
+ accept();
}
void
TestServerUnixSocket::accept() {
- server_sockets_.push_back(boost::asio::local::stream_protocol::
- socket(io_service_.get_io_service()));
- server_acceptor_.async_accept(server_sockets_.back(),
- boost::bind(&TestServerUnixSocket::
- acceptHandler, this, _1));
-}
-
-
-void
-TestServerUnixSocket::readHandler(const boost::system::error_code&,
- size_t bytes_transferred) {
- if (!custom_response_.empty()) {
- boost::asio::write(server_sockets_.back(),
- boost::asio::buffer(custom_response_.c_str(),
- custom_response_.size()));
-
- } else {
- std::string received(&raw_buf_[0], bytes_transferred);
- std::string response("received " + received);
- boost::asio::write(server_sockets_.back(),
- boost::asio::buffer(response.c_str(), response.size()));
- }
-
- // Stop IO service if we have reached the maximum number of read messages.
- if (++read_count_ >= stop_after_count_) {
- io_service_.stop();
-
- } else {
- // Previous connection is done, so let's accept another connection.
- accept();
- }
+ server_acceptor_.async_accept(*(connection_pool_->getSocket()),
+ boost::bind(&TestServerUnixSocket::acceptHandler, this, _1));
}
void
io_service_.stop();
}
+size_t
+TestServerUnixSocket::getResponseNum() const {
+ return (connection_pool_->getResponseNum());
+}
+
+
} // end of namespace isc::asiolink::test
} // end of namespace isc::asiolink
} // end of namespace isc
#include <asiolink/io_service.h>
#include <boost/shared_ptr.hpp>
#include <gtest/gtest.h>
-#include <array>
#include <list>
#include <string>
namespace asiolink {
namespace test {
+class ConnectionPool;
+
/// @brief Provides unix domain socket functionality for unit tests.
class TestServerUnixSocket {
public:
~TestServerUnixSocket();
/// @brief Creates and binds server socket.
- ///
- /// @param stop_after_count Number of received messages after which the
- /// IO service should be stopped.
- void bindServerSocket(const unsigned int stop_after_count = 1);
+ void bindServerSocket();
/// @brief Server acceptor handler.
///
/// @param ec Error code.
void acceptHandler(const boost::system::error_code& ec);
- /// @brief Server read handler.
- ///
- /// @param ec Error code.
- /// @param bytes_transferred Number of bytes read.
- void readHandler(const boost::system::error_code& ec,
- size_t bytes_transferred);
-
/// @brief Callback function invoke upon test timeout.
///
/// It stops the IO service and reports test timeout.
void timeoutHandler();
+ /// @brief Return number of responses sent so far to the clients.
+ size_t getResponseNum() const;
+
private:
/// @brief Asynchronously accept new connections.
/// @brief Server acceptor.
boost::asio::local::stream_protocol::acceptor server_acceptor_;
- /// @brief Server side unix domain sockets.
- std::list<boost::asio::local::stream_protocol::socket> server_sockets_;
-
- /// @brief Receive buffer.
- std::array<char, 1024> raw_buf_;
-
/// @brief Asynchronous timer service to detect timeouts.
IntervalTimer test_timer_;
/// @brief Holds custom response to be sent to the client.
std::string custom_response_;
- /// @brief Number of messages received after which IO service gets stopped.
- unsigned int stop_after_count_;
-
- /// @brief Number of messages received so far.
- unsigned int read_count_;
+ /// @brief Pool of connections.
+ boost::shared_ptr<ConnectionPool> connection_pool_;
};
/// @brief Pointer to the @ref TestServerUnixSocket.