::remove("test8.json");
}
-// Verify that server returns an error if more than one connection is established.
+/// Verify that concurrent connections over the control channel can be
+/// established.
+/// @todo Future Kea 1.3 tickets will modify the behavior of the CommandMgr
+/// such that the server will be able to send response in multiple chunks.
+/// This test will need to be extended. For now, the receive and write
+/// operations are atomic and there is no conflict between concurrent
+/// connections.
TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) {
createUnixChannelServer();
ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
ASSERT_NO_THROW(getIOService()->poll());
- // The server should not allow for concurrent connections and should send
- // out an error message.
std::string response;
- ASSERT_TRUE(client2->getResponse(response));
- EXPECT_EQ("{ \"result\": 1, \"text\": \"exceeded maximum number of concurrent"
- " connections\" }", response);
-
- // Now disconnect the first server and retry.
- client1->disconnectFromServer();
- ASSERT_NO_THROW(getIOService()->poll());
-
- ASSERT_TRUE(client2->connectToServer(socket_path_));
- ASSERT_NO_THROW(getIOService()->poll());
-
- ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
- ASSERT_NO_THROW(getIOService()->poll());
-
- // The server should now respond ok.
+ // The server should respond ok.
ASSERT_TRUE(client2->getResponse(response));
EXPECT_TRUE(response.find("\"result\": 0") != std::string::npos);
+ // Disconnect the servers.
+ client1->disconnectFromServer();
client2->disconnectFromServer();
+ ASSERT_NO_THROW(getIOService()->poll());
}
} // End of anonymous namespace
::remove("test8.json");
}
-// Verify that server returns an error if more than one connection is established.
+/// Verify that concurrent connections over the control channel can be
+/// established.
+/// @todo Future Kea 1.3 tickets will modify the behavior of the CommandMgr
+/// such that the server will be able to send response in multiple chunks.
+/// This test will need to be extended. For now, the receive and write
+/// operations are atomic and there is no conflict between concurrent
+/// connections.
TEST_F(CtrlChannelDhcpv6SrvTest, concurrentConnections) {
createUnixChannelServer();
ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
ASSERT_NO_THROW(getIOService()->poll());
- // The server should not allow for concurrent connections and should send
- // out an error message.
std::string response;
- ASSERT_TRUE(client2->getResponse(response));
- EXPECT_EQ("{ \"result\": 1, \"text\": \"exceeded maximum number of concurrent"
- " connections\" }", response);
-
- // Now disconnect the first server and retry.
- client1->disconnectFromServer();
- ASSERT_NO_THROW(getIOService()->poll());
-
- ASSERT_TRUE(client2->connectToServer(socket_path_));
- ASSERT_NO_THROW(getIOService()->poll());
-
- ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }"));
- ASSERT_NO_THROW(getIOService()->poll());
-
- // The server should now respond ok.
+ // The server should respond ok.
ASSERT_TRUE(client2->getResponse(response));
EXPECT_TRUE(response.find("\"result\": 0") != std::string::npos);
+ // Disconnect the servers.
+ client1->disconnectFromServer();
client2->disconnectFromServer();
+ ASSERT_NO_THROW(getIOService()->poll());
}
class ConnectionPool;
+/// @brief Represents a single connection over control socket.
+///
+/// An instance of this object is created when the @c CommandMgr acceptor
+/// receives new connection from a controlling client.
class Connection : public boost::enable_shared_from_this<Connection> {
public:
+ /// @brief Constructor.
+ ///
+ /// This constructor registers a socket of this connection in the Interface
+ /// Manager to cause the blocking call to @c select() to return as soon as
+ /// a transmission over the control socket is received.
Connection(const boost::shared_ptr<UnixDomainSocket>& socket,
ConnectionPool& connection_pool)
: socket_(socket), connection_pool_(connection_pool),
response_in_progress_(false) {
- isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(),
- 0);
-
+ // Callback value of 0 is used to indicate that callback function is
+ // not installed.
+ isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
}
+ /// @brief Start asynchronous read over the unix domain socket.
+ ///
+ /// This method doesn't block. Once the transmission is received over the
+ /// socket, the @c Connection::receiveHandler callback is invoked to
+ /// process received data.
void start() {
socket_->asyncReceive(&buf_[0], sizeof(buf_),
boost::bind(&Connection::receiveHandler,
}
+ /// @brief Close current connection.
+ ///
+ /// Connection is not closed if the invocation of this method is a result of
+ /// server reconfiguration. The connection will be closed once a response is
+ /// sent to the client. Closing a socket during processing a request would
+ /// cause the server to not send a response to the client.
void stop() {
if (!response_in_progress_) {
LOG_INFO(command_logger, COMMAND_SOCKET_CONNECTION_CLOSED)
}
}
+ /// @brief Handler invoked when the data is received over the control
+ /// socket.
+ ///
+ /// @param ec Error code.
+ /// @param bytes_transferred Number of bytes received.
void receiveHandler(const boost::system::error_code& ec,
size_t bytes_transferred);
private:
+ /// @brief Pointer to the socket used for transmission.
boost::shared_ptr<UnixDomainSocket> socket_;
+ /// @brief Buffer used for received data.
std::array<char, 65535> buf_;
+ /// @brief Reference to the pool of connections.
ConnectionPool& connection_pool_;
+ /// @brief Boolean flag indicating if the request to stop connection is a
+ /// result of server reconfiguration.
bool response_in_progress_;
};
-
+/// @brief Pointer to the @c Connection.
typedef boost::shared_ptr<Connection> ConnectionPtr;
+/// @brief Holds all open connections.
class ConnectionPool {
public:
+ /// @brief Starts new connection.
+ ///
+ /// @param connection Pointer to the new connection object.
void start(const ConnectionPtr& connection) {
connection->start();
connections_.insert(connection);
}
+ /// @brief Stops running connection.
+ ///
+ /// @param connection Pointer to the new connection object.
void stop(const ConnectionPtr& connection) {
connection->stop();
connections_.erase(connection);
}
+ /// @brief Stops all connections which are allowed to stop.
void stopAll() {
for (auto conn = connections_.begin(); conn != connections_.end();
++conn) {
private:
+ /// @brief Pool of connections.
std::set<ConnectionPtr> connections_;
};
Connection::receiveHandler(const boost::system::error_code& ec,
size_t bytes_transferred) {
if (ec) {
-
if (ec.value() == boost::asio::error::eof) {
// Foreign host has closed the connection. We should remove it from the
// connection pool.
return;
} else if (bytes_transferred == 0) {
+ // Nothing received. Close the connection.
connection_pool_.stop(shared_from_this());
return;
}
- ConstElementPtr cmd, rsp;
-
LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_READ)
.arg(bytes_transferred).arg(socket_->getNative());
+ ConstElementPtr cmd, rsp;
try {
- response_in_progress_ = true;
-
// Try to interpret it as JSON.
std::string sbuf(&buf_[0], bytes_transferred);
cmd = Element::fromJSON(sbuf, true);
- if (connection_pool_.getConnectionsNum() > 1) {
- rsp = createAnswer(CONTROL_RESULT_ERROR, "exceeded maximum number of concurrent"
- " connections");
- } else {
- // If successful, then process it as a command.
- rsp = CommandMgr::instance().processCommand(cmd);
- }
+ response_in_progress_ = true;
+
+ // If successful, then process it as a command.
+ rsp = CommandMgr::instance().processCommand(cmd);
+
+ response_in_progress_ = false;
+
} catch (const Exception& ex) {
LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what()));
}
+ // No response generated. Connection will be closed.
if (!rsp) {
- response_in_progress_ = false;
LOG_WARN(command_logger, COMMAND_RESPONSE_ERROR);
- return;
- }
- // Let's convert JSON response to text. Note that at this stage
- // the rsp pointer is always set.
- std::string txt = rsp->str();
- size_t len = txt.length();
- if (len > 65535) {
- // Hmm, our response is too large. Let's send the first
- // 64KB and hope for the best.
- LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
+ } else {
- len = 65535;
- }
+ // Let's convert JSON response to text. Note that at this stage
+ // the rsp pointer is always set.
+ std::string txt = rsp->str();
+ size_t len = txt.length();
+ if (len > 65535) {
+ // Hmm, our response is too large. Let's send the first
+ // 64KB and hope for the best.
+ LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
- try {
- // Send the data back over socket.
- socket_->write(txt.c_str(), len);
+ len = 65535;
+ }
- } catch (const std::exception& ex) {
- // Response transmission failed. Since the response failed, it doesn't
- // make sense to send any status codes. Let's log it and be done with
- // it.
- LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
- .arg(len).arg(socket_->getNative()).arg(ex.what());
- }
+ try {
+ // Send the data back over socket.
+ socket_->write(txt.c_str(), len);
- response_in_progress_ = false;
+ } catch (const std::exception& ex) {
+ // Response transmission failed. Since the response failed, it doesn't
+ // make sense to send any status codes. Let's log it and be done with
+ // it.
+ LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
+ .arg(len).arg(socket_->getNative()).arg(ex.what());
+ }
+ }
connection_pool_.stop(shared_from_this());
-
}
}
namespace isc {
namespace config {
+/// @brief Implementation of the @c CommandMgr.
class CommandMgrImpl {
public:
+ /// @brief Constructor.
CommandMgrImpl()
: io_service_(), acceptor_(), socket_(), socket_name_(),
connection_pool_() {
}
+ /// @brief Opens acceptor service allowing the control clients to connect.
+ ///
+ /// @param socket_info Configuration information for the control socket.
+ /// @throw BadSocketInfo When socket configuration is invalid.
+ /// @throw SocketError When socket operation fails.
void openCommandSocket(const isc::data::ConstElementPtr& socket_info);
+ /// @brief Asynchronously accepts next connection.
void doAccept();
+ /// @brief Pointer to the IO service used by the server process for running
+ /// asynchronous tasks.
IOServicePtr io_service_;
+ /// @brief Pointer to the acceptor service.
boost::shared_ptr<UnixDomainSocketAcceptor> acceptor_;
+ /// @brief Pointer to the socket into which the new connection is accepted.
boost::shared_ptr<UnixDomainSocket> socket_;
+ /// @brief Path to the unix domain socket descriptor.
+ ///
+ /// This is used to remove the socket file once the connection terminates.
std::string socket_name_;
+ /// @brief Pool of connections.
ConnectionPool connection_pool_;
};
isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing");
}
+ // Only supporting unix sockets right now.
if (type->stringValue() != "unix") {
isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value "
<< type->stringValue());