#include <fstream>
#include <iostream>
#include <sstream>
+#include <thread>
#include <arpa/inet.h>
#include <unistd.h>
StatsMgr::instance().removeAll();
CommandMgr::instance().closeCommandSocket();
+ CommandMgr::instance().deregisterAll();
server_.reset();
};
ADD_FAILURE() << "Invalid expected status: " << exp_status;
}
}
+
+ /// @brief Command handler which generates long response
+ static ConstElementPtr longResponseHandler(const std::string&,
+ const ConstElementPtr&) {
+ ElementPtr arguments = Element::createList();
+ std::string arg = "responseresponseresponseresponseresponseresponse"
+ "response";
+ for (unsigned i = 0; i < 8000; ++i) {
+ arguments->add(Element::create(arg));
+ }
+ return (createAnswer(0, arguments));
+ }
};
TEST_F(CtrlChannelDhcpv4SrvTest, commands) {
sendUnixCommand("utter nonsense", response);
EXPECT_EQ("{ \"result\": 1, "
- "\"text\": \"error: unexpected character u in <string>:1:2\" }",
+ "\"text\": \"invalid first character u : "
+ "current state: [ 12 RECEIVE_START_ST ] next event: [ 1 START_EVT ]\" }",
response);
}
// Should fail with a syntax error
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"subnet configuration failed: mandatory 'subnet' "
- "parameter is missing for a subnet being configured (<string>:20:17)\" }",
+ "parameter is missing for a subnet being configured (<wire>:19:17)\" }",
response);
// Check that the config was not lost
// Should fail with a syntax error
EXPECT_EQ("{ \"result\": 1, "
"\"text\": \"subnet configuration failed: mandatory 'subnet' "
- "parameter is missing for a subnet being configured (<string>:20:17)\" }",
+ "parameter is missing for a subnet being configured (<wire>:19:17)\" }",
response);
// Check that the config was not lost
// Clean up after the test.
CfgMgr::instance().clear();
}
-
+
// Tests if config-write can be called without any parameters.
TEST_F(CtrlChannelDhcpv4SrvTest, writeConfigNoFilename) {
createUnixChannelServer();
ASSERT_NO_THROW(getIOService()->poll());
}
+TEST_F(CtrlChannelDhcpv4SrvTest, longCommand) {
+ createUnixChannelServer();
+
+ boost::scoped_ptr<UnixControlClient> client(new UnixControlClient());
+ ASSERT_TRUE(client);
+
+ ASSERT_TRUE(client->connectToServer(socket_path_));
+ getIOService()->run_one();
+ getIOService()->poll();
+
+ size_t bytes_transferred = 0;
+ const size_t payload_size = 1024 * 1000;
+ bool first_payload = true;
+ while (bytes_transferred < payload_size) {
+ if (bytes_transferred == 0) {
+ std::string preamble = "{ \"command\": \"foo\", \"arguments\": [ ";
+ ASSERT_TRUE(client->sendCommand(preamble));
+ bytes_transferred += preamble.size();
+
+ } else {
+ std::ostringstream payload;
+ if (!first_payload) {
+ payload << ", ";
+ }
+ first_payload = false;
+ payload << "\"blablablablablablablablablablablablablablablabla\"";
+
+ if (bytes_transferred + payload.tellp() > payload_size) {
+ payload << "] }";
+ }
+ ASSERT_TRUE(client->sendCommand(payload.str()));
+
+ bytes_transferred += payload.tellp();
+ }
+
+ getIOService()->run_one();
+ getIOService()->poll();
+ }
+
+ std::string response;
+ ASSERT_TRUE(client->getResponse(response));
+
+ EXPECT_EQ("{ \"result\": 2, \"text\": \"'foo' command not supported.\" }",
+ response);
+
+ client->disconnectFromServer();
+}
+
+TEST_F(CtrlChannelDhcpv4SrvTest, longResponse) {
+ ASSERT_NO_THROW(
+ CommandMgr::instance().registerCommand("foo",
+ boost::bind(&CtrlChannelDhcpv4SrvTest::longResponseHandler, _1, _2));
+ );
+
+ createUnixChannelServer();
+
+ std::string reference_response = longResponseHandler("foo", ConstElementPtr())->str();
+ std::ostringstream response;
+ std::thread th([this, &response, reference_response]() {
+
+ size_t long_response_size = reference_response.size();
+
+ boost::scoped_ptr<UnixControlClient> client(new UnixControlClient());
+ ASSERT_TRUE(client);
+
+ ASSERT_TRUE(client->connectToServer(socket_path_));
+
+ std::string command = "{ \"command\": \"foo\", \"arguments\": { } }";
+ ASSERT_TRUE(client->sendCommand(command));
+
+ while (response.tellp() < long_response_size) {
+ std::string partial;
+ client->getResponse(partial);
+ response << partial;
+ }
+
+ client->disconnectFromServer();
+
+ getIOService()->stop();
+ });
+
+ getIOService()->run();
+
+ th.join();
+
+ EXPECT_EQ(reference_response, response.str());
+}
+
+
} // End of anonymous namespace
#include <config/command_mgr.h>
#include <cc/data.h>
#include <cc/command_interpreter.h>
+#include <cc/json_feed.h>
#include <dhcp/iface_mgr.h>
#include <config/config_log.h>
#include <boost/bind.hpp>
/// a transmission over the control socket is received.
Connection(const boost::shared_ptr<UnixDomainSocket>& socket,
ConnectionPool& connection_pool)
- : socket_(socket), connection_pool_(connection_pool),
- response_in_progress_(false) {
+ : socket_(socket), buf_(), response_(), connection_pool_(connection_pool),
+ feed_(), response_in_progress_(false) {
// Callback value of 0 is used to indicate that callback function is
// not installed.
isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0);
- }
-
- /// @brief Start asynchronous read over the unix domain socket.
- ///
- /// This method doesn't block. Once the transmission is received over the
- /// socket, the @c Connection::receiveHandler callback is invoked to
- /// process received data.
- void start() {
- socket_->asyncReceive(&buf_[0], sizeof(buf_),
- boost::bind(&Connection::receiveHandler,
- shared_from_this(), _1, _2));
-
-
+ // Initialize state model for receiving and preparsing commands.
+ feed_.initModel();
}
/// @brief Close current connection.
}
}
+ /// @brief Start asynchronous read over the unix domain socket.
+ ///
+ /// This method doesn't block. Once the transmission is received over the
+ /// socket, the @c Connection::receiveHandler callback is invoked to
+ /// process received data.
+ void doReceive() {
+ socket_->asyncReceive(&buf_[0], sizeof(buf_),
+ boost::bind(&Connection::receiveHandler,
+ shared_from_this(), _1, _2));
+
+
+ }
+
+ /// @brief Starts asynchronous send over the unix domain socket.
+ ///
+ /// This method doesn't block. Once the send operation is completed, the
+ /// @c Connection::sendHandler cllback is invoked.
+ void doSend() {
+ size_t chunk_size = response_.size() < 8192 ? response_.size() : 8192;
+ socket_->asyncSend(&response_[0], chunk_size,
+ boost::bind(&Connection::sendHandler, shared_from_this(), _1, _2));
+ }
+
/// @brief Handler invoked when the data is received over the control
/// socket.
///
void receiveHandler(const boost::system::error_code& ec,
size_t bytes_transferred);
+
+ /// @brief Handler invoked when the data is sent over the control socket.
+ ///
+ /// @param ec Error code.
+ /// @param bytes_transferred Number of bytes sent.
+ void sendHandler(const boost::system::error_code& ec,
+ size_t bytes_trasferred);
private:
/// @brief Pointer to the socket used for transmission.
/// @brief Buffer used for received data.
std::array<char, 65535> buf_;
+ /// @brief Response created by the server.
+ std::string response_;
+
/// @brief Reference to the pool of connections.
ConnectionPool& connection_pool_;
+ /// @brief State model used to receive data over the connection and detect
+ /// when the command ends.
+ JSONFeed feed_;
+
/// @brief Boolean flag indicating if the request to stop connection is a
/// result of server reconfiguration.
bool response_in_progress_;
///
/// @param connection Pointer to the new connection object.
void start(const ConnectionPtr& connection) {
- connection->start();
+ connection->doReceive();
connections_.insert(connection);
}
try {
- // Try to interpret it as JSON.
- std::string sbuf(&buf_[0], bytes_transferred);
- cmd = Element::fromJSON(sbuf, true);
+ feed_.postBuffer(&buf_[0], bytes_transferred);
+ feed_.poll();
+ if (feed_.needData()) {
+ doReceive();
+ return;
+ }
- response_in_progress_ = true;
+ if (feed_.feedOk()) {
+ cmd = feed_.toElement();
+ response_in_progress_ = true;
- // If successful, then process it as a command.
- rsp = CommandMgr::instance().processCommand(cmd);
+ // If successful, then process it as a command.
+ rsp = CommandMgr::instance().processCommand(cmd);
- response_in_progress_ = false;
+ response_in_progress_ = false;
+ } else {
+ isc_throw(BadValue, feed_.getErrorMessage());
+ }
} catch (const Exception& ex) {
LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what());
rsp = createAnswer(CONTROL_RESULT_ERROR,
"internal server error: no response generated");
- }
+ } else {
- // Let's convert JSON response to text. Note that at this stage
- // the rsp pointer is always set.
- std::string txt = rsp->str();
- size_t len = txt.length();
- if (len > 65535) {
- // Hmm, our response is too large. Let's send the first
- // 64KB and hope for the best.
- LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
+ // Let's convert JSON response to text. Note that at this stage
+ // the rsp pointer is always set.
+ response_ = rsp->str();
- len = 65535;
+ doSend();
+ return;
+
+/* size_t len = response_.length();
+ if (len > 65535) {
+ // Hmm, our response is too large. Let's send the first
+ // 64KB and hope for the best.
+ LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len);
+
+ len = 65535;
+ }
+
+ try {
+ // Send the data back over socket.
+ socket_->write(response_.c_str(), len);
+
+ } catch (const std::exception& ex) {
+ // Response transmission failed. Since the response failed, it doesn't
+ // make sense to send any status codes. Let's log it and be done with
+ // it.
+ LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
+ .arg(len).arg(socket_->getNative()).arg(ex.what());
+ } */
}
- try {
- // Send the data back over socket.
- socket_->write(txt.c_str(), len);
+ connection_pool_.stop(shared_from_this());
+}
- } catch (const std::exception& ex) {
- // Response transmission failed. Since the response failed, it doesn't
- // make sense to send any status codes. Let's log it and be done with
- // it.
+void
+Connection::sendHandler(const boost::system::error_code& ec,
+ size_t bytes_transferred) {
+ if (ec && ec.value() != boost::asio::error::operation_aborted) {
LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL)
- .arg(len).arg(socket_->getNative()).arg(ex.what());
+ .arg(socket_->getNative()).arg(ec.message());
+
+ } else {
+ response_.erase(0, bytes_transferred);
+ if (!response_.empty()) {
+ doSend();
+ return;
}
connection_pool_.stop(shared_from_this());