From: Marcin Siodelski Date: Mon, 26 Jun 2017 13:39:57 +0000 (+0200) Subject: [5318] PoC for extending maximum request/response over 64k in Ctrl channel. X-Git-Tag: trac5227_base~8^2~1^2~10 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ab11e2394efa8c62e4c26c9a7c192ec1a8ab8e4d;p=thirdparty%2Fkea.git [5318] PoC for extending maximum request/response over 64k in Ctrl channel. --- diff --git a/src/bin/dhcp4/tests/ctrl_dhcp4_srv_unittest.cc b/src/bin/dhcp4/tests/ctrl_dhcp4_srv_unittest.cc index cc3d3da9b9..8d4111897c 100644 --- a/src/bin/dhcp4/tests/ctrl_dhcp4_srv_unittest.cc +++ b/src/bin/dhcp4/tests/ctrl_dhcp4_srv_unittest.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -85,6 +86,7 @@ public: StatsMgr::instance().removeAll(); CommandMgr::instance().closeCommandSocket(); + CommandMgr::instance().deregisterAll(); server_.reset(); }; @@ -296,6 +298,18 @@ public: ADD_FAILURE() << "Invalid expected status: " << exp_status; } } + + /// @brief Command handler which generates long response + static ConstElementPtr longResponseHandler(const std::string&, + const ConstElementPtr&) { + ElementPtr arguments = Element::createList(); + std::string arg = "responseresponseresponseresponseresponseresponse" + "response"; + for (unsigned i = 0; i < 8000; ++i) { + arguments->add(Element::create(arg)); + } + return (createAnswer(0, arguments)); + } }; TEST_F(CtrlChannelDhcpv4SrvTest, commands) { @@ -431,7 +445,8 @@ TEST_F(CtrlChannelDhcpv4SrvTest, controlChannelNegative) { sendUnixCommand("utter nonsense", response); EXPECT_EQ("{ \"result\": 1, " - "\"text\": \"error: unexpected character u in :1:2\" }", + "\"text\": \"invalid first character u : " + "current state: [ 12 RECEIVE_START_ST ] next event: [ 1 START_EVT ]\" }", response); } @@ -710,7 +725,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configSet) { // Should fail with a syntax error EXPECT_EQ("{ \"result\": 1, " "\"text\": \"subnet configuration failed: mandatory 'subnet' " - "parameter is missing for a subnet being configured (:20:17)\" }", + "parameter is missing for a subnet being configured (:19:17)\" }", response); // Check that the config was not lost @@ -909,7 +924,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configTest) { // Should fail with a syntax error EXPECT_EQ("{ \"result\": 1, " "\"text\": \"subnet configuration failed: mandatory 'subnet' " - "parameter is missing for a subnet being configured (:20:17)\" }", + "parameter is missing for a subnet being configured (:19:17)\" }", response); // Check that the config was not lost @@ -950,7 +965,7 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configTest) { // Clean up after the test. CfgMgr::instance().clear(); } - + // Tests if config-write can be called without any parameters. TEST_F(CtrlChannelDhcpv4SrvTest, writeConfigNoFilename) { createUnixChannelServer(); @@ -1108,4 +1123,93 @@ TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) { ASSERT_NO_THROW(getIOService()->poll()); } +TEST_F(CtrlChannelDhcpv4SrvTest, longCommand) { + createUnixChannelServer(); + + boost::scoped_ptr client(new UnixControlClient()); + ASSERT_TRUE(client); + + ASSERT_TRUE(client->connectToServer(socket_path_)); + getIOService()->run_one(); + getIOService()->poll(); + + size_t bytes_transferred = 0; + const size_t payload_size = 1024 * 1000; + bool first_payload = true; + while (bytes_transferred < payload_size) { + if (bytes_transferred == 0) { + std::string preamble = "{ \"command\": \"foo\", \"arguments\": [ "; + ASSERT_TRUE(client->sendCommand(preamble)); + bytes_transferred += preamble.size(); + + } else { + std::ostringstream payload; + if (!first_payload) { + payload << ", "; + } + first_payload = false; + payload << "\"blablablablablablablablablablablablablablablabla\""; + + if (bytes_transferred + payload.tellp() > payload_size) { + payload << "] }"; + } + ASSERT_TRUE(client->sendCommand(payload.str())); + + bytes_transferred += payload.tellp(); + } + + getIOService()->run_one(); + getIOService()->poll(); + } + + std::string response; + ASSERT_TRUE(client->getResponse(response)); + + EXPECT_EQ("{ \"result\": 2, \"text\": \"'foo' command not supported.\" }", + response); + + client->disconnectFromServer(); +} + +TEST_F(CtrlChannelDhcpv4SrvTest, longResponse) { + ASSERT_NO_THROW( + CommandMgr::instance().registerCommand("foo", + boost::bind(&CtrlChannelDhcpv4SrvTest::longResponseHandler, _1, _2)); + ); + + createUnixChannelServer(); + + std::string reference_response = longResponseHandler("foo", ConstElementPtr())->str(); + std::ostringstream response; + std::thread th([this, &response, reference_response]() { + + size_t long_response_size = reference_response.size(); + + boost::scoped_ptr client(new UnixControlClient()); + ASSERT_TRUE(client); + + ASSERT_TRUE(client->connectToServer(socket_path_)); + + std::string command = "{ \"command\": \"foo\", \"arguments\": { } }"; + ASSERT_TRUE(client->sendCommand(command)); + + while (response.tellp() < long_response_size) { + std::string partial; + client->getResponse(partial); + response << partial; + } + + client->disconnectFromServer(); + + getIOService()->stop(); + }); + + getIOService()->run(); + + th.join(); + + EXPECT_EQ(reference_response, response.str()); +} + + } // End of anonymous namespace diff --git a/src/bin/dhcp6/tests/ctrl_dhcp6_srv_unittest.cc b/src/bin/dhcp6/tests/ctrl_dhcp6_srv_unittest.cc index a477fdefdb..4d809a920d 100644 --- a/src/bin/dhcp6/tests/ctrl_dhcp6_srv_unittest.cc +++ b/src/bin/dhcp6/tests/ctrl_dhcp6_srv_unittest.cc @@ -484,7 +484,7 @@ TEST_F(CtrlChannelDhcpv6SrvTest, configSet) { // Should fail with a syntax error EXPECT_EQ("{ \"result\": 1, " - "\"text\": \"subnet configuration failed: mandatory 'subnet' parameter is missing for a subnet being configured (:21:17)\" }", + "\"text\": \"subnet configuration failed: mandatory 'subnet' parameter is missing for a subnet being configured (:20:17)\" }", response); // Check that the config was not lost @@ -630,7 +630,7 @@ TEST_F(CtrlChannelDhcpv6SrvTest, configTest) { // Should fail with a syntax error EXPECT_EQ("{ \"result\": 1, " "\"text\": \"subnet configuration failed: mandatory 'subnet' parameter " - "is missing for a subnet being configured (:21:17)\" }", + "is missing for a subnet being configured (:20:17)\" }", response); // Check that the config was not lost @@ -737,7 +737,8 @@ TEST_F(CtrlChannelDhcpv6SrvTest, controlChannelNegative) { sendUnixCommand("utter nonsense", response); EXPECT_EQ("{ \"result\": 1, " - "\"text\": \"error: unexpected character u in :1:2\" }", + "\"text\": \"invalid first character u : " + "current state: [ 12 RECEIVE_START_ST ] next event: [ 1 START_EVT ]\" }", response); } diff --git a/src/lib/config/command_mgr.cc b/src/lib/config/command_mgr.cc index 2be8124e94..2d6c249972 100644 --- a/src/lib/config/command_mgr.cc +++ b/src/lib/config/command_mgr.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -42,24 +43,13 @@ public: /// a transmission over the control socket is received. Connection(const boost::shared_ptr& socket, ConnectionPool& connection_pool) - : socket_(socket), connection_pool_(connection_pool), - response_in_progress_(false) { + : socket_(socket), buf_(), response_(), connection_pool_(connection_pool), + feed_(), response_in_progress_(false) { // Callback value of 0 is used to indicate that callback function is // not installed. isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0); - } - - /// @brief Start asynchronous read over the unix domain socket. - /// - /// This method doesn't block. Once the transmission is received over the - /// socket, the @c Connection::receiveHandler callback is invoked to - /// process received data. - void start() { - socket_->asyncReceive(&buf_[0], sizeof(buf_), - boost::bind(&Connection::receiveHandler, - shared_from_this(), _1, _2)); - - + // Initialize state model for receiving and preparsing commands. + feed_.initModel(); } /// @brief Close current connection. @@ -78,6 +68,29 @@ public: } } + /// @brief Start asynchronous read over the unix domain socket. + /// + /// This method doesn't block. Once the transmission is received over the + /// socket, the @c Connection::receiveHandler callback is invoked to + /// process received data. + void doReceive() { + socket_->asyncReceive(&buf_[0], sizeof(buf_), + boost::bind(&Connection::receiveHandler, + shared_from_this(), _1, _2)); + + + } + + /// @brief Starts asynchronous send over the unix domain socket. + /// + /// This method doesn't block. Once the send operation is completed, the + /// @c Connection::sendHandler cllback is invoked. + void doSend() { + size_t chunk_size = response_.size() < 8192 ? response_.size() : 8192; + socket_->asyncSend(&response_[0], chunk_size, + boost::bind(&Connection::sendHandler, shared_from_this(), _1, _2)); + } + /// @brief Handler invoked when the data is received over the control /// socket. /// @@ -86,6 +99,13 @@ public: void receiveHandler(const boost::system::error_code& ec, size_t bytes_transferred); + + /// @brief Handler invoked when the data is sent over the control socket. + /// + /// @param ec Error code. + /// @param bytes_transferred Number of bytes sent. + void sendHandler(const boost::system::error_code& ec, + size_t bytes_trasferred); private: /// @brief Pointer to the socket used for transmission. @@ -94,9 +114,16 @@ private: /// @brief Buffer used for received data. std::array buf_; + /// @brief Response created by the server. + std::string response_; + /// @brief Reference to the pool of connections. ConnectionPool& connection_pool_; + /// @brief State model used to receive data over the connection and detect + /// when the command ends. + JSONFeed feed_; + /// @brief Boolean flag indicating if the request to stop connection is a /// result of server reconfiguration. bool response_in_progress_; @@ -114,7 +141,7 @@ public: /// /// @param connection Pointer to the new connection object. void start(const ConnectionPtr& connection) { - connection->start(); + connection->doReceive(); connections_.insert(connection); } @@ -180,17 +207,25 @@ Connection::receiveHandler(const boost::system::error_code& ec, try { - // Try to interpret it as JSON. - std::string sbuf(&buf_[0], bytes_transferred); - cmd = Element::fromJSON(sbuf, true); + feed_.postBuffer(&buf_[0], bytes_transferred); + feed_.poll(); + if (feed_.needData()) { + doReceive(); + return; + } - response_in_progress_ = true; + if (feed_.feedOk()) { + cmd = feed_.toElement(); + response_in_progress_ = true; - // If successful, then process it as a command. - rsp = CommandMgr::instance().processCommand(cmd); + // If successful, then process it as a command. + rsp = CommandMgr::instance().processCommand(cmd); - response_in_progress_ = false; + response_in_progress_ = false; + } else { + isc_throw(BadValue, feed_.getErrorMessage()); + } } catch (const Exception& ex) { LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what()); @@ -203,30 +238,52 @@ Connection::receiveHandler(const boost::system::error_code& ec, rsp = createAnswer(CONTROL_RESULT_ERROR, "internal server error: no response generated"); - } + } else { - // Let's convert JSON response to text. Note that at this stage - // the rsp pointer is always set. - std::string txt = rsp->str(); - size_t len = txt.length(); - if (len > 65535) { - // Hmm, our response is too large. Let's send the first - // 64KB and hope for the best. - LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len); + // Let's convert JSON response to text. Note that at this stage + // the rsp pointer is always set. + response_ = rsp->str(); - len = 65535; + doSend(); + return; + +/* size_t len = response_.length(); + if (len > 65535) { + // Hmm, our response is too large. Let's send the first + // 64KB and hope for the best. + LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len); + + len = 65535; + } + + try { + // Send the data back over socket. + socket_->write(response_.c_str(), len); + + } catch (const std::exception& ex) { + // Response transmission failed. Since the response failed, it doesn't + // make sense to send any status codes. Let's log it and be done with + // it. + LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL) + .arg(len).arg(socket_->getNative()).arg(ex.what()); + } */ } - try { - // Send the data back over socket. - socket_->write(txt.c_str(), len); + connection_pool_.stop(shared_from_this()); +} - } catch (const std::exception& ex) { - // Response transmission failed. Since the response failed, it doesn't - // make sense to send any status codes. Let's log it and be done with - // it. +void +Connection::sendHandler(const boost::system::error_code& ec, + size_t bytes_transferred) { + if (ec && ec.value() != boost::asio::error::operation_aborted) { LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL) - .arg(len).arg(socket_->getNative()).arg(ex.what()); + .arg(socket_->getNative()).arg(ec.message()); + + } else { + response_.erase(0, bytes_transferred); + if (!response_.empty()) { + doSend(); + return; } connection_pool_.stop(shared_from_this()); diff --git a/src/lib/config/config_messages.mes b/src/lib/config/config_messages.mes index 891248013f..40370f7a63 100644 --- a/src/lib/config/config_messages.mes +++ b/src/lib/config/config_messages.mes @@ -88,13 +88,6 @@ over command socket identified by specified file descriptor. This error message indicates that an error was encountered while reading from command socket. -% COMMAND_SOCKET_RESPONSE_TOOLARGE Server's response was larger (%1) than supported 64KB -This error message indicates that the server received a command and generated -an answer for it, but that response was larger than supported 64KB. Server -will attempt to send the first 64KB of the response. Depending on the nature -of this response, this may indicate a software or configuration error. Future -Kea versions are expected to have better support for large responses. - % COMMAND_SOCKET_UNIX_CLOSE Command socket closed: UNIX, fd=%1, path=%2 This informational message indicates that the daemon closed a command processing socket. This was a UNIX socket. It was opened with the file @@ -109,6 +102,6 @@ descriptor and path specified. This debug message indicates that the specified number of bytes was sent over command socket identifier by the specified file descriptor. -% COMMAND_SOCKET_WRITE_FAIL Error while writing %1 bytes to command socket %2 : %3 +% COMMAND_SOCKET_WRITE_FAIL Error while writing to command socket %1 : %2 This error message indicates that an error was encountered while attempting to send a response to the command socket. diff --git a/src/lib/testutils/unix_control_client.cc b/src/lib/testutils/unix_control_client.cc index 8057258b6b..cb8820bd5a 100644 --- a/src/lib/testutils/unix_control_client.cc +++ b/src/lib/testutils/unix_control_client.cc @@ -95,7 +95,6 @@ bool UnixControlClient::getResponse(std::string& response) { return (false); } case 0: - ADD_FAILURE() << "No response data sent"; return (false); default: