received_ec = ec;
received_feed = feed;
// Stop the IO service so as we can continue.
- io_service->stop();
+ io_service->stopWork();
}, ClientConnection::Timeout(CONNECTION_TIMEOUT));
io_service->run();
///
/// @param response Stub response to be sent from the server socket to the
/// client.
- void bindServerSocket(const std::string& response) {
+ /// @param use_thread Indicates if the IO service will be ran in thread.
+ void bindServerSocket(const std::string& response,
+ const bool use_thread = false) {
server_socket_.reset(new test::TestServerUnixSocket(*getIOService(),
unixSocketFilePath(),
response));
server_socket_->startTimer(TEST_TIMEOUT);
- server_socket_->bindServerSocket();
+ server_socket_->bindServerSocket(use_thread);
}
/// @brief Creates command with no arguments.
// Configure client side socket.
configureControlSocket(server_type);
// Create server side socket.
- bindServerSocket(server_response);
+ bindServerSocket(server_response, true);
// The client side communication is synchronous. To be able to respond
// to this we need to run the server side socket at the same time.
// Running IO service in a thread guarantees that the server responds
// as soon as it receives the control command.
- isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
+ isc::util::thread::Thread th(boost::bind(&IOService::run,
+ getIOService().get()));
+
+
+ // Wait for the IO service in thread to actually run.
+ server_socket_->waitForRunning();
ConstElementPtr command = createCommand("foo", service);
ConstElementPtr answer = mgr_.handleCommand("foo", ConstElementPtr(),
command);
- getIOService()->stop();
+ server_socket_->stopServer();
+ getIOService()->stopWork();
th.wait();
+ EXPECT_EQ(expected_responses, server_socket_->getResponseNum());
+
checkAnswer(answer, expected_result0, expected_result1, expected_result2);
}
// Configure client side socket.
configureControlSocket(CtrlAgentCfgContext::TYPE_DHCP4);
// Create server side socket.
- bindServerSocket("{ \"result\" : 3 }");
+ bindServerSocket("{ \"result\" : 3 }", true);
// The client side communication is synchronous. To be able to respond
// to this we need to run the server side socket at the same time.
// as soon as it receives the control command.
isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
+ // Wait for the IO service in thread to actually run.
+ server_socket_->waitForRunning();
+
ConstElementPtr command = createCommand("list-commands", "dhcp4");
ConstElementPtr answer = mgr_.handleCommand("list-commands", ConstElementPtr(),
command);
- getIOService()->stop();
+ server_socket_->stopServer();
+ getIOService()->stopWork();
th.wait();
-// Copyright (C) 2011-2016 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
#include <unistd.h> // for some IPC/network system calls
#include <netinet/in.h>
+#include <boost/shared_ptr.hpp>
#include <sys/socket.h>
namespace isc {
/// \brief The constructor
IOServiceImpl() :
io_service_(),
- work_(io_service_)
+ work_(new boost::asio::io_service::work(io_service_))
{};
/// \brief The destructor.
~IOServiceImpl() {};
/// This will return the control to the caller of the \c run() method.
void stop() { io_service_.stop();} ;
+ /// \brief Removes IO service work object to let it finish running
+ /// when all handlers have been invoked.
+ void stopWork() {
+ work_.reset();
+ }
+
/// \brief Return the native \c io_service object used in this wrapper.
///
/// This is a short term work around to support other Kea modules
}
private:
boost::asio::io_service io_service_;
- boost::asio::io_service::work work_;
+ boost::shared_ptr<boost::asio::io_service::work> work_;
};
IOService::IOService() {
io_impl_->stop();
}
+void
+IOService::stopWork() {
+ io_impl_->stopWork();
+}
+
boost::asio::io_service&
IOService::get_io_service() {
return (io_impl_->get_io_service());
-// Copyright (C) 2011-2015 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
/// This will return the control to the caller of the \c run() method.
void stop();
+ /// \brief Removes IO service work object to let it finish running
+ /// when all handlers have been invoked.
+ void stopWork();
+
/// \brief Return the native \c io_service object used in this wrapper.
///
/// This is a short term work around to support other Kea modules
sent_response_callback_(sent_response_callback) {
}
+ /// @brief Starts asynchronous read from the socket.
void start() {
socket_->async_read_some(boost::asio::buffer(&raw_buf_[0], raw_buf_.size()),
boost::bind(&Connection::readHandler, shared_from_this(),
boost::asio::placeholders::bytes_transferred));
}
+ /// @brief Closes the socket.
+ void stop() {
+ socket_->close();
+ }
+
/// @brief Handler invoked when data have been received over the socket.
///
/// This is the handler invoked when the data have been received over the
///
/// @param io_service Reference to the IO service.
ConnectionPool(IOService& io_service)
- : io_service_(io_service), next_socket_(),
+ : io_service_(io_service), connections_(), next_socket_(),
response_num_(0) {
}
/// @brief Destructor.
~ConnectionPool() {
+ stopAll();
}
/// @brief Creates new unix domain socket and returns it.
}));
conn->start();
+ connections_.insert(conn);
next_socket_.reset();
}
+ /// @brief Stops the given connection.
+ ///
+ /// @param conn Pointer to the connection to be stopped.
+ void stop(const ConnectionPtr& conn) {
+ conn->stop();
+ connections_.erase(conn);
+ }
+
+ /// @brief Stops all connections.
+ void stopAll() {
+ for (auto conn = connections_.begin(); conn != connections_.end();
+ ++conn) {
+ (*conn)->stop();
+ }
+ connections_.clear();
+ }
+
/// @brief Returns number of responses sent so far.
size_t getResponseNum() const {
return (response_num_);
/// @brief Reference to the IO service.
IOService& io_service_;
+ /// @brief Container holding established connections.
+ std::set<ConnectionPtr> connections_;
+
/// @brief Holds pointer to the generated socket.
///
/// This socket will be used by the next connection.
test_timer_(io_service_),
custom_response_(custom_response),
connection_pool_(new ConnectionPool(io_service)),
- stopped_(false) {
+ stopped_(false),
+ running_(false) {
}
TestServerUnixSocket::~TestServerUnixSocket() {
+ server_acceptor_.close();
}
void
}
void
-TestServerUnixSocket::bindServerSocket() {
+TestServerUnixSocket::stopServer() {
+ test_timer_.cancel();
+ server_acceptor_.cancel();
+ connection_pool_->stopAll();
+}
+
+void
+TestServerUnixSocket::bindServerSocket(const bool use_thread) {
server_acceptor_.open();
server_acceptor_.bind(server_endpoint_);
server_acceptor_.listen();
accept();
+
+ if (use_thread) {
+ io_service_.post(boost::bind(&TestServerUnixSocket::signalRunning,
+ this));
+ }
}
void
boost::asio::placeholders::error));
}
+void
+TestServerUnixSocket::signalRunning() {
+ {
+ isc::util::thread::Mutex::Locker lock(mutex_);
+ running_ = true;
+ }
+ condvar_.signal();
+}
+
+void
+TestServerUnixSocket::waitForRunning() {
+ isc::util::thread::Mutex::Locker lock(mutex_);
+ while (!running_) {
+ condvar_.wait(mutex_);
+ }
+}
+
void
TestServerUnixSocket::timeoutHandler() {
ADD_FAILURE() << "Timeout occurred while running the test!";
#include <config.h>
#include <asiolink/interval_timer.h>
#include <asiolink/io_service.h>
+#include <util/threads/thread.h>
+#include <util/threads/sync.h>
#include <boost/shared_ptr.hpp>
#include <gtest/gtest.h>
#include <list>
/// @param test_timeout Test timeout in milliseconds.
void startTimer(const long test_timeout);
+ /// @brief Cancels all asynchronous operations.
+ void stopServer();
+
/// @brief Generates response of a given length.
///
/// @param response_size Desired response size.
void generateCustomResponse(const uint64_t response_size);
/// @brief Creates and binds server socket.
- void bindServerSocket();
+ ///
+ /// @param use_thread Boolean value indicating if the IO service
+ /// is running in thread.
+ void bindServerSocket(const bool use_thread = false);
/// @brief Server acceptor handler.
///
/// @brief Return number of responses sent so far to the clients.
size_t getResponseNum() const;
- /// @brief Checks if IO service has been stopped as a result of the
- /// timeout.
- bool isStopped() const {
+ /// @brief Indicates if the server has been stopped.
+ bool isStopped() {
return (stopped_);
}
+ /// @brief Waits for the server signal that it is running.
+ ///
+ /// When the caller starts the service he indicates whether
+ /// IO service will be running in thread or not. If threads
+ /// are used the caller has to wait for the IO service to
+ /// actually run. In such case this function should be invoked
+ /// which waits for a posted callback to be executed. When this
+ /// happens it means that IO service is running and the main
+ /// thread can move forward.
+ void waitForRunning();
+
private:
/// @brief Asynchronously accept new connections.
void accept();
+ /// @brief Handler invoked to signal that server is running.
+ ///
+ /// This is used only when thread is used to run IO service.
+ void signalRunning();
+
/// @brief IO service used by the tests.
IOService& io_service_;
/// @brief Indicates if IO service has been stopped as a result of
/// a timeout.
bool stopped_;
+
+ /// @brief Indicates if the server in a thread is running.
+ bool running_;
+
+ /// @brief Mutex used by the server.
+ isc::util::thread::Mutex mutex_;
+
+ /// @brief Conditional variable used by the server.
+ isc::util::thread::CondVar condvar_;
};
/// @brief Pointer to the @ref TestServerUnixSocket.
const ClientConnection::Timeout& timeout) {
// Start the timer protecting against timeouts.
timer_.setup(boost::bind(&ClientConnectionImpl::timeoutCallback,
- shared_from_this(), handler),
+ this, handler),
timeout.timeout_, IntervalTimer::ONE_SHOT);
// Store the command in the class member to make sure it is valid
ClientConnectionImpl::terminate(const boost::system::error_code& ec,
ClientConnection::Handler handler) {
try {
+ timer_.cancel();
current_command_.clear();
handler(ec, feed_);