#include <asiolink/unix_domain_socket.h>
#include <cc/command_interpreter.h>
#include <cc/data.h>
+#include <cc/json_feed.h>
+#include <config/client_connection.h>
#include <boost/pointer_cast.hpp>
#include <iterator>
#include <string>
using namespace isc::hooks;
using namespace isc::process;
+namespace {
+
+/// @brief Client side connection timeout.
+const long CONNECTION_TIMEOUT = 5000;
+
+}
+
namespace isc {
namespace agent {
std::string socket_name = socket_info->get("socket-name")->stringValue();
// Forward command and receive reply.
- IOService io_service;
- UnixDomainSocket unix_socket(io_service);
- size_t receive_len;
- try {
- unix_socket.connect(socket_name);
- std::string wire_command = command->toWire();
- unix_socket.write(&wire_command[0], wire_command.size());
- receive_len = unix_socket.receive(&receive_buf_[0], receive_buf_.size());
-
- } catch (const std::exception& ex) {
+ IOServicePtr io_service(new IOService());;
+ ClientConnection conn(*io_service);
+ boost::system::error_code received_ec;
+ ConstJSONFeedPtr received_feed;
+ conn.start(ClientConnection::SocketPath(socket_name),
+ ClientConnection::ControlCommand(command->toWire()),
+ [&io_service, &received_ec, &received_feed]
+ (const boost::system::error_code& ec, ConstJSONFeedPtr feed) {
+ // Capture error code and parsed data.
+ received_ec = ec;
+ received_feed = feed;
+ // Stop the IO service so as we can continue.
+ io_service->stop();
+ }, ClientConnection::Timeout(CONNECTION_TIMEOUT));
+ io_service->run();
+
+ if (received_ec) {
isc_throw(CommandForwardingError, "unable to forward command to the "
- << service << " service: " << ex.what() << ". The server "
- "is likely to be offline");
+ << service << " service: " << received_ec.message()
+ << ". The server is likely to be offline");
}
- // This is really not possible right now, but when we migrate to the
- // solution using timeouts it is possible that the response is not
- // received.
- if (receive_len == 0) {
- isc_throw(CommandForwardingError, "internal server error: no answer"
- " received from the server to the forwarded message");
+ // This shouldn't happen because the fact that there was no time out indicates
+ // that the whole response has been read and it should be stored within the
+ // feed. But, let's check to prevent assertions.
+ if (!received_feed) {
+ isc_throw(CommandForwardingError, "internal server error: empty response"
+ " received from the unix domain socket");
}
- std::string reply(&receive_buf_[0], receive_len);
-
ConstElementPtr answer;
try {
- answer = Element::fromJSON(reply);
+ answer = received_feed->toElement();
LOG_INFO(agent_logger, CTRL_AGENT_COMMAND_FORWARDED)
.arg(cmd_name).arg(service);
// to this we need to run the server side socket at the same time.
// Running IO service in a thread guarantees that the server responds
// as soon as it receives the control command.
- isc::util::thread::Thread(boost::bind(&CtrlAgentCommandMgrTest::runIO,
- getIOService(), server_socket_,
- expected_responses));
+ isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
ConstElementPtr command = createCommand("foo", service);
ConstElementPtr answer = mgr_.handleCommand("foo", ConstElementPtr(),
command);
- checkAnswer(answer, expected_result0, expected_result1, expected_result2);
- }
+ getIOService()->stop();
- /// @brief Runs IO service until number of sent responses is lower than
- /// expected.
- ///
- /// @param server_socket Pointer to the server socket.
- /// @param expected_responses Number of expected responses.
- static void runIO(IOServicePtr& io_service,
- const test::TestServerUnixSocketPtr& server_socket,
- const size_t expected_responses) {
- while (server_socket->getResponseNum() < expected_responses) {
- io_service->run_one();
- }
+ th.wait();
+
+ checkAnswer(answer, expected_result0, expected_result1, expected_result2);
}
/// @brief a convenience reference to control agent command manager
TEST_F(CtrlAgentCommandMgrTest, invalidAnswer) {
testForward(CtrlAgentCfgContext::TYPE_DHCP6, "dhcp6",
isc::config::CONTROL_RESULT_ERROR, -1, -1, 1,
- "{ \"result\": 0");
+ "{ \"result\": }");
}
/// Check that error is returned to the client if the forwarding socket is
// to this we need to run the server side socket at the same time.
// Running IO service in a thread guarantees that the server responds
// as soon as it receives the control command.
- isc::util::thread::Thread(boost::bind(&CtrlAgentCommandMgrTest::runIO,
- getIOService(), server_socket_, 1));
+ isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
ConstElementPtr command = createCommand("list-commands", "dhcp4");
ConstElementPtr answer = mgr_.handleCommand("list-commands", ConstElementPtr(),
command);
+ getIOService()->stop();
+
+ th.wait();
+
// Answer of 3 is specific to the stub response we send when the
// command is forwarded. So having this value returned means that
// the command was forwarded as expected.
#include <asiolink/asio_wrapper.h>
#include <asiolink/testutils/test_server_unix_socket.h>
#include <boost/bind.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <functional>
#include <set>
/// @brief Connection to the server over unix domain socket.
///
/// It reads the data over the socket, sends responses and closes a socket.
-class Connection {
+class Connection : public boost::enable_shared_from_this<Connection> {
public:
/// @brief Constructor.
/// server sends a response.
Connection(const UnixSocketPtr& unix_socket,
const std::string custom_response,
- const SentResponseCallback& sent_response_callback)
+ SentResponseCallback sent_response_callback)
: socket_(unix_socket), custom_response_(custom_response),
sent_response_callback_(sent_response_callback) {
+ }
+
+ void start() {
socket_->async_read_some(boost::asio::buffer(&raw_buf_[0], raw_buf_.size()),
- boost::bind(&Connection::readHandler, this, _1, _2));
+ boost::bind(&Connection::readHandler, shared_from_this(),
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
/// @brief Handler invoked when data have been received over the socket.
boost::asio::buffer(response.c_str(), response.size()));
}
+ start();
+
// Invoke callback function to notify that the response has been sent.
sent_response_callback_();
}
- /// @brief Closes the socket.
- void stop() {
- socket_->close();
- }
-
private:
/// @brief Pointer to the unix domain socket.
///
/// @param io_service Reference to the IO service.
ConnectionPool(IOService& io_service)
- : io_service_(io_service), connections_(), next_socket_(),
+ : io_service_(io_service), next_socket_(),
response_num_(0) {
}
/// @brief Destructor.
~ConnectionPool() {
- stopAll();
}
/// @brief Creates new unix domain socket and returns it.
ConnectionPtr conn(new Connection(next_socket_, custom_response, [this] {
++response_num_;
}));
+ conn->start();
- connections_.insert(conn);
next_socket_.reset();
}
- /// @brief Stops the given connection.
- ///
- /// @param conn Pointer to the connection to be stopped.
- void stop(const ConnectionPtr& conn) {
- conn->stop();
- connections_.erase(conn);
- }
-
- /// @brief Stops all connections.
- void stopAll() {
- for (auto conn = connections_.begin(); conn != connections_.end();
- ++conn) {
- (*conn)->stop();
- }
- connections_.clear();
- }
-
/// @brief Returns number of responses sent so far.
size_t getResponseNum() const {
return (response_num_);
/// @brief Reference to the IO service.
IOService& io_service_;
- /// @brief Container holding established connections.
- std::set<ConnectionPtr> connections_;
-
/// @brief Holds pointer to the generated socket.
///
/// This socket will be used by the next connection.
}
TestServerUnixSocket::~TestServerUnixSocket() {
- connection_pool_->stopAll();
}
void
void
TestServerUnixSocket::startTimer(const long test_timeout) {
- test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler,
- shared_from_this()),
+ test_timer_.setup(boost::bind(&TestServerUnixSocket::timeoutHandler, this),
test_timeout, IntervalTimer::ONE_SHOT);
}
}
void
-TestServerUnixSocket::acceptHandler(const boost::system::error_code&) {
+TestServerUnixSocket::acceptHandler(const boost::system::error_code& ec) {
+ if (ec) {
+ return;
+ }
connection_pool_->start(custom_response_);
accept();
}
void
TestServerUnixSocket::accept() {
server_acceptor_.async_accept(*(connection_pool_->getSocket()),
- boost::bind(&TestServerUnixSocket::acceptHandler, shared_from_this(), _1));
+ boost::bind(&TestServerUnixSocket::acceptHandler, this,
+ boost::asio::placeholders::error));
}
void
return (connection_pool_->getResponseNum());
}
-
} // end of namespace isc::asiolink::test
} // end of namespace isc::asiolink
} // end of namespace isc
#include <config.h>
#include <asiolink/interval_timer.h>
#include <asiolink/io_service.h>
-#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <gtest/gtest.h>
#include <list>
/// This class uses @c shared_from_this() to pass its instance to the
/// @c boost::bind function, thus the caller must store shared pointer
/// to this object.
-class TestServerUnixSocket
- : public boost::enable_shared_from_this<TestServerUnixSocket> {
+class TestServerUnixSocket {
public:
/// @brief Constructor.
/// @param test_timeout Test timeout in milliseconds.
void startTimer(const long test_timeout);
- /// @brief Starts timer for detecting test timeout.
- ///
- /// @param test_timeout Test timeout in milliseconds.
- void startTimer(const long test_timeout);
-
/// @brief Generates response of a given length.
///
/// @param response_size Desired response size.
/// @param timeout Connection timeout in milliseconds.
void start(const ClientConnection::SocketPath& socket_path,
const ClientConnection::ControlCommand& command,
- const ClientConnection::Handler& handler,
+ ClientConnection::Handler handler,
const ClientConnection::Timeout& timeout);
/// @brief Closes the socket.
/// @param length Length of the data in the input buffer.
/// @param handler User supplied callback.
void doSend(const void* buffer, const size_t length,
- const ClientConnection::Handler& handler);
+ ClientConnection::Handler handler);
/// @brief Starts asynchronous receive from the server.
///
/// @ref JSONFeed is returned.
///
/// @param handler User supplied callback.
- void doReceive(const ClientConnection::Handler& handler);
+ void doReceive(ClientConnection::Handler handler);
/// @brief Terminates the connection and invokes a user callback indicating
/// an error.
/// @param ec Error code.
/// @param handler User callback.
void terminate(const boost::system::error_code& ec,
- const ClientConnection::Handler& handler);
+ ClientConnection::Handler handler);
/// @brief Callback invoked when the timeout occurs.
///
/// It calls @ref terminate with the @c boost::asio::error::timed_out.
- void timeoutCallback(const ClientConnection::Handler& handler);
+ void timeoutCallback(ClientConnection::Handler handler);
private:
void
ClientConnectionImpl::start(const ClientConnection::SocketPath& socket_path,
const ClientConnection::ControlCommand& command,
- const ClientConnection::Handler& handler,
+ ClientConnection::Handler handler,
const ClientConnection::Timeout& timeout) {
// Start the timer protecting against timeouts.
timer_.setup(boost::bind(&ClientConnectionImpl::timeoutCallback,
});
}
-void
-ClientConnectionImpl::stop() {
- try {
- socket_.close();
-
- } catch (...) {
- // Suppress errors related to closing a socket. We can't really help
- // if an error occurred.
- }
-}
-
void
ClientConnectionImpl::doSend(const void* buffer, const size_t length,
- const ClientConnection::Handler& handler) {
+ ClientConnection::Handler handler) {
// Pass self to lambda to make sure that the instance of this class
// lives as long as the lambda is held for async send.
auto self(shared_from_this());
}
void
-ClientConnectionImpl::doReceive(const ClientConnection::Handler& handler) {
+ClientConnectionImpl::doReceive(ClientConnection::Handler handler) {
// Pass self to lambda to make sure that the instance of this class
// lives as long as the lambda is held for async receive.
auto self(shared_from_this());
terminate(ec, handler);
} else {
+ std::string x(&read_buf_[0], length);
// Lazy initialization of the JSONFeed. The feed will be "parsing"
// received JSON stream and will detect when the whole response
// has been received.
void
ClientConnectionImpl::terminate(const boost::system::error_code& ec,
- const ClientConnection::Handler& handler) {
+ ClientConnection::Handler handler) {
try {
- stop();
current_command_.clear();
handler(ec, feed_);
}
void
-ClientConnectionImpl::timeoutCallback(const ClientConnection::Handler& handler) {
+ClientConnectionImpl::timeoutCallback(ClientConnection::Handler handler) {
// Timeout has occurred. The remote server didn't provide the entire
// response within the given time frame. Let's close the connection
// and signal the timeout.
: impl_(new ClientConnectionImpl(io_service)) {
}
-ClientConnection::~ClientConnection() {
- stop();
-}
-
void
ClientConnection::start(const ClientConnection::SocketPath& socket_path,
const ClientConnection::ControlCommand& command,
- const Handler& handler,
+ ClientConnection::Handler handler,
const ClientConnection::Timeout& timeout) {
impl_->start(socket_path, command, handler, timeout);
}
-void
-ClientConnection::stop() {
- impl_->stop();
-}
-
} // end of namespace config
} // end of namespace isc
/// @param io_service Reference to the IO service.
explicit ClientConnection(asiolink::IOService& io_service);
- /// @brief Destructor.
- ///
- /// Closes current connection.
- ~ClientConnection();
-
/// @brief Starts asynchronous transaction with a remote endpoint.
///
/// Starts asynchronous connection with the remote endpoint. If the
/// occurred during the transaction.
/// @param timeout Connection timeout in milliseconds.
void start(const SocketPath& socket_path, const ControlCommand& command,
- const Handler& handler, const Timeout& timeout = Timeout(10000));
-
- /// @brief Closes the connection.
- void stop();
+ Handler handler, const Timeout& timeout = Timeout(10000));
private: