From: Marcin Siodelski Date: Thu, 22 Jun 2017 16:35:05 +0000 (+0200) Subject: [5317] Cleanup of the CommandMgr's code. X-Git-Tag: trac5227_base~26^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d913e9a739ca7cf27aa23e466cf35784a70eb895;p=thirdparty%2Fkea.git [5317] Cleanup of the CommandMgr's code. --- diff --git a/src/bin/dhcp4/tests/ctrl_dhcp4_srv_unittest.cc b/src/bin/dhcp4/tests/ctrl_dhcp4_srv_unittest.cc index a7c03e214e..918455df2b 100644 --- a/src/bin/dhcp4/tests/ctrl_dhcp4_srv_unittest.cc +++ b/src/bin/dhcp4/tests/ctrl_dhcp4_srv_unittest.cc @@ -1066,7 +1066,13 @@ TEST_F(CtrlChannelDhcpv4SrvTest, configReloadValid) { ::remove("test8.json"); } -// Verify that server returns an error if more than one connection is established. +/// Verify that concurrent connections over the control channel can be +/// established. +/// @todo Future Kea 1.3 tickets will modify the behavior of the CommandMgr +/// such that the server will be able to send response in multiple chunks. +/// This test will need to be extended. For now, the receive and write +/// operations are atomic and there is no conflict between concurrent +/// connections. TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) { createUnixChannelServer(); @@ -1088,28 +1094,15 @@ TEST_F(CtrlChannelDhcpv4SrvTest, concurrentConnections) { ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }")); ASSERT_NO_THROW(getIOService()->poll()); - // The server should not allow for concurrent connections and should send - // out an error message. std::string response; - ASSERT_TRUE(client2->getResponse(response)); - EXPECT_EQ("{ \"result\": 1, \"text\": \"exceeded maximum number of concurrent" - " connections\" }", response); - - // Now disconnect the first server and retry. - client1->disconnectFromServer(); - ASSERT_NO_THROW(getIOService()->poll()); - - ASSERT_TRUE(client2->connectToServer(socket_path_)); - ASSERT_NO_THROW(getIOService()->poll()); - - ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }")); - ASSERT_NO_THROW(getIOService()->poll()); - - // The server should now respond ok. + // The server should respond ok. ASSERT_TRUE(client2->getResponse(response)); EXPECT_TRUE(response.find("\"result\": 0") != std::string::npos); + // Disconnect the servers. + client1->disconnectFromServer(); client2->disconnectFromServer(); + ASSERT_NO_THROW(getIOService()->poll()); } } // End of anonymous namespace diff --git a/src/bin/dhcp6/tests/ctrl_dhcp6_srv_unittest.cc b/src/bin/dhcp6/tests/ctrl_dhcp6_srv_unittest.cc index f0ea2a5b1a..eae45a4a30 100644 --- a/src/bin/dhcp6/tests/ctrl_dhcp6_srv_unittest.cc +++ b/src/bin/dhcp6/tests/ctrl_dhcp6_srv_unittest.cc @@ -1088,7 +1088,13 @@ TEST_F(CtrlChannelDhcpv6SrvTest, configReloadValid) { ::remove("test8.json"); } -// Verify that server returns an error if more than one connection is established. +/// Verify that concurrent connections over the control channel can be +/// established. +/// @todo Future Kea 1.3 tickets will modify the behavior of the CommandMgr +/// such that the server will be able to send response in multiple chunks. +/// This test will need to be extended. For now, the receive and write +/// operations are atomic and there is no conflict between concurrent +/// connections. TEST_F(CtrlChannelDhcpv6SrvTest, concurrentConnections) { createUnixChannelServer(); @@ -1110,28 +1116,15 @@ TEST_F(CtrlChannelDhcpv6SrvTest, concurrentConnections) { ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }")); ASSERT_NO_THROW(getIOService()->poll()); - // The server should not allow for concurrent connections and should send - // out an error message. std::string response; - ASSERT_TRUE(client2->getResponse(response)); - EXPECT_EQ("{ \"result\": 1, \"text\": \"exceeded maximum number of concurrent" - " connections\" }", response); - - // Now disconnect the first server and retry. - client1->disconnectFromServer(); - ASSERT_NO_THROW(getIOService()->poll()); - - ASSERT_TRUE(client2->connectToServer(socket_path_)); - ASSERT_NO_THROW(getIOService()->poll()); - - ASSERT_TRUE(client2->sendCommand("{ \"command\": \"list-commands\" }")); - ASSERT_NO_THROW(getIOService()->poll()); - - // The server should now respond ok. + // The server should respond ok. ASSERT_TRUE(client2->getResponse(response)); EXPECT_TRUE(response.find("\"result\": 0") != std::string::npos); + // Disconnect the servers. + client1->disconnectFromServer(); client2->disconnectFromServer(); + ASSERT_NO_THROW(getIOService()->poll()); } diff --git a/src/lib/config/command_mgr.cc b/src/lib/config/command_mgr.cc index 916415bd6a..f04a88470f 100644 --- a/src/lib/config/command_mgr.cc +++ b/src/lib/config/command_mgr.cc @@ -28,18 +28,32 @@ namespace { class ConnectionPool; +/// @brief Represents a single connection over control socket. +/// +/// An instance of this object is created when the @c CommandMgr acceptor +/// receives new connection from a controlling client. class Connection : public boost::enable_shared_from_this { public: + /// @brief Constructor. + /// + /// This constructor registers a socket of this connection in the Interface + /// Manager to cause the blocking call to @c select() to return as soon as + /// a transmission over the control socket is received. Connection(const boost::shared_ptr& socket, ConnectionPool& connection_pool) : socket_(socket), connection_pool_(connection_pool), response_in_progress_(false) { - isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), - 0); - + // Callback value of 0 is used to indicate that callback function is + // not installed. + isc::dhcp::IfaceMgr::instance().addExternalSocket(socket_->getNative(), 0); } + /// @brief Start asynchronous read over the unix domain socket. + /// + /// This method doesn't block. Once the transmission is received over the + /// socket, the @c Connection::receiveHandler callback is invoked to + /// process received data. void start() { socket_->asyncReceive(&buf_[0], sizeof(buf_), boost::bind(&Connection::receiveHandler, @@ -48,6 +62,12 @@ public: } + /// @brief Close current connection. + /// + /// Connection is not closed if the invocation of this method is a result of + /// server reconfiguration. The connection will be closed once a response is + /// sent to the client. Closing a socket during processing a request would + /// cause the server to not send a response to the client. void stop() { if (!response_in_progress_) { LOG_INFO(command_logger, COMMAND_SOCKET_CONNECTION_CLOSED) @@ -58,37 +78,55 @@ public: } } + /// @brief Handler invoked when the data is received over the control + /// socket. + /// + /// @param ec Error code. + /// @param bytes_transferred Number of bytes received. void receiveHandler(const boost::system::error_code& ec, size_t bytes_transferred); private: + /// @brief Pointer to the socket used for transmission. boost::shared_ptr socket_; + /// @brief Buffer used for received data. std::array buf_; + /// @brief Reference to the pool of connections. ConnectionPool& connection_pool_; + /// @brief Boolean flag indicating if the request to stop connection is a + /// result of server reconfiguration. bool response_in_progress_; }; - +/// @brief Pointer to the @c Connection. typedef boost::shared_ptr ConnectionPtr; +/// @brief Holds all open connections. class ConnectionPool { public: + /// @brief Starts new connection. + /// + /// @param connection Pointer to the new connection object. void start(const ConnectionPtr& connection) { connection->start(); connections_.insert(connection); } + /// @brief Stops running connection. + /// + /// @param connection Pointer to the new connection object. void stop(const ConnectionPtr& connection) { connection->stop(); connections_.erase(connection); } + /// @brief Stops all connections which are allowed to stop. void stopAll() { for (auto conn = connections_.begin(); conn != connections_.end(); ++conn) { @@ -103,6 +141,7 @@ public: private: + /// @brief Pool of connections. std::set connections_; }; @@ -112,7 +151,6 @@ void Connection::receiveHandler(const boost::system::error_code& ec, size_t bytes_transferred) { if (ec) { - if (ec.value() == boost::asio::error::eof) { // Foreign host has closed the connection. We should remove it from the // connection pool. @@ -130,71 +168,67 @@ Connection::receiveHandler(const boost::system::error_code& ec, return; } else if (bytes_transferred == 0) { + // Nothing received. Close the connection. connection_pool_.stop(shared_from_this()); return; } - ConstElementPtr cmd, rsp; - LOG_DEBUG(command_logger, DBG_COMMAND, COMMAND_SOCKET_READ) .arg(bytes_transferred).arg(socket_->getNative()); + ConstElementPtr cmd, rsp; try { - response_in_progress_ = true; - // Try to interpret it as JSON. std::string sbuf(&buf_[0], bytes_transferred); cmd = Element::fromJSON(sbuf, true); - if (connection_pool_.getConnectionsNum() > 1) { - rsp = createAnswer(CONTROL_RESULT_ERROR, "exceeded maximum number of concurrent" - " connections"); - } else { - // If successful, then process it as a command. - rsp = CommandMgr::instance().processCommand(cmd); - } + response_in_progress_ = true; + + // If successful, then process it as a command. + rsp = CommandMgr::instance().processCommand(cmd); + + response_in_progress_ = false; + } catch (const Exception& ex) { LOG_WARN(command_logger, COMMAND_PROCESS_ERROR1).arg(ex.what()); rsp = createAnswer(CONTROL_RESULT_ERROR, std::string(ex.what())); } + // No response generated. Connection will be closed. if (!rsp) { - response_in_progress_ = false; LOG_WARN(command_logger, COMMAND_RESPONSE_ERROR); - return; - } - // Let's convert JSON response to text. Note that at this stage - // the rsp pointer is always set. - std::string txt = rsp->str(); - size_t len = txt.length(); - if (len > 65535) { - // Hmm, our response is too large. Let's send the first - // 64KB and hope for the best. - LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len); + } else { - len = 65535; - } + // Let's convert JSON response to text. Note that at this stage + // the rsp pointer is always set. + std::string txt = rsp->str(); + size_t len = txt.length(); + if (len > 65535) { + // Hmm, our response is too large. Let's send the first + // 64KB and hope for the best. + LOG_ERROR(command_logger, COMMAND_SOCKET_RESPONSE_TOOLARGE).arg(len); - try { - // Send the data back over socket. - socket_->write(txt.c_str(), len); + len = 65535; + } - } catch (const std::exception& ex) { - // Response transmission failed. Since the response failed, it doesn't - // make sense to send any status codes. Let's log it and be done with - // it. - LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL) - .arg(len).arg(socket_->getNative()).arg(ex.what()); - } + try { + // Send the data back over socket. + socket_->write(txt.c_str(), len); - response_in_progress_ = false; + } catch (const std::exception& ex) { + // Response transmission failed. Since the response failed, it doesn't + // make sense to send any status codes. Let's log it and be done with + // it. + LOG_ERROR(command_logger, COMMAND_SOCKET_WRITE_FAIL) + .arg(len).arg(socket_->getNative()).arg(ex.what()); + } + } connection_pool_.stop(shared_from_this()); - } } @@ -202,26 +236,42 @@ Connection::receiveHandler(const boost::system::error_code& ec, namespace isc { namespace config { +/// @brief Implementation of the @c CommandMgr. class CommandMgrImpl { public: + /// @brief Constructor. CommandMgrImpl() : io_service_(), acceptor_(), socket_(), socket_name_(), connection_pool_() { } + /// @brief Opens acceptor service allowing the control clients to connect. + /// + /// @param socket_info Configuration information for the control socket. + /// @throw BadSocketInfo When socket configuration is invalid. + /// @throw SocketError When socket operation fails. void openCommandSocket(const isc::data::ConstElementPtr& socket_info); + /// @brief Asynchronously accepts next connection. void doAccept(); + /// @brief Pointer to the IO service used by the server process for running + /// asynchronous tasks. IOServicePtr io_service_; + /// @brief Pointer to the acceptor service. boost::shared_ptr acceptor_; + /// @brief Pointer to the socket into which the new connection is accepted. boost::shared_ptr socket_; + /// @brief Path to the unix domain socket descriptor. + /// + /// This is used to remove the socket file once the connection terminates. std::string socket_name_; + /// @brief Pool of connections. ConnectionPool connection_pool_; }; @@ -238,6 +288,7 @@ CommandMgrImpl::openCommandSocket(const isc::data::ConstElementPtr& socket_info) isc_throw(BadSocketInfo, "Mandatory 'socket-type' parameter missing"); } + // Only supporting unix sockets right now. if (type->stringValue() != "unix") { isc_throw(BadSocketInfo, "Invalid 'socket-type' parameter value " << type->stringValue()); diff --git a/src/lib/config/command_mgr.h b/src/lib/config/command_mgr.h index 75e2b548b6..95055ac03d 100644 --- a/src/lib/config/command_mgr.h +++ b/src/lib/config/command_mgr.h @@ -49,6 +49,8 @@ public: /// @brief Sets IO service to be used by the command manager. /// + /// The server should use this method to provide the Command Manager with the + /// common IO service used by the server. /// @param io_service Pointer to the IO service. void setIOService(const asiolink::IOServicePtr& io_service); @@ -57,13 +59,10 @@ public: /// Currently supported types are: /// - unix (required parameters: socket-type: unix, socket-name:/unix/path) /// - /// This method will close previously open command socket (if exists). + /// @throw BadSocketInfo When socket configuration is invalid. + /// @throw SocketError When socket operation fails. /// - /// @throw CommandSocketError if socket creation fails. - /// @throw SocketError if command socket is already open. - /// - /// @param socket_info describes control socket parameters - /// @return object representing a socket + /// @param socket_info Configuration information for the control socket. void openCommandSocket(const isc::data::ConstElementPtr& socket_info);