From: Thomas Markwalder Date: Mon, 7 Nov 2022 19:45:14 +0000 (-0500) Subject: [#2583] Cleaned up tcp_stream and UT X-Git-Tag: Kea-2.3.3~95 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2dada52252939a7118a695ba462c19c909ad2eed;p=thirdparty%2Fkea.git [#2583] Cleaned up tcp_stream and UT src/lib/tcp/tcp_connection.cc Removed couts src/lib/tcp/tcp_stream.h Added doxygen src/lib/tcp/tests/tcp_listener_unittests.cc Removed HTTPs Added testing of mulitple messages per client src/lib/tcp/tests/tcp_test_client.h Clients now use TcpStreamRequest to receive and can send/receive multiple requests/responses --- diff --git a/src/lib/tcp/tcp_connection.cc b/src/lib/tcp/tcp_connection.cc index 64255779c5..17463fc9d3 100644 --- a/src/lib/tcp/tcp_connection.cc +++ b/src/lib/tcp/tcp_connection.cc @@ -457,7 +457,6 @@ TcpConnection::socketWriteCallback(TcpResponsePtr response, void TcpConnection::setupIdleTimer() { - std::cout << "idle timeout: " << idle_timeout_ << std::endl; idle_timer_.setup(std::bind(&TcpConnection::idleTimeoutCallback, this), idle_timeout_, IntervalTimer::ONE_SHOT); } diff --git a/src/lib/tcp/tcp_connection.h b/src/lib/tcp/tcp_connection.h index 2bfbea038e..2ba0661e99 100644 --- a/src/lib/tcp/tcp_connection.h +++ b/src/lib/tcp/tcp_connection.h @@ -25,7 +25,7 @@ namespace isc { namespace tcp { /// @todo Take this out, it's just for dev coding -#if 1 +#if 0 #define HERE(a) std::cout << __FILE__ << ":" << __FUNCTION__ << ":" << __LINE__ << " " << a << std::endl << std::flush; #else #define HERE(a) diff --git a/src/lib/tcp/tcp_stream.h b/src/lib/tcp/tcp_stream.h index 3d4e651de8..51171437c4 100644 --- a/src/lib/tcp/tcp_stream.h +++ b/src/lib/tcp/tcp_stream.h @@ -17,6 +17,11 @@ namespace isc { namespace tcp { +/// @brief Implement a simple length:data input stream message. +/// +/// This class can be used to receive a single message from a TCP +/// stream where the message consists of a 16-bit unsigned length (in +/// network order), followed by that number of bytes of data. class TcpStreamRequest : public TcpRequest { public: /// @brief Constructor. @@ -40,18 +45,21 @@ public: /// @brief Returns request contents formatted for log output /// - /// @param limit Maximum length of the buffer to be output. If the limit + /// @param limit Maximum length of the buffer to be output. If the limit /// is 0, the length of the output is unlimited. /// @return Textual representation of the input buffer. virtual std::string logFormatRequest(const size_t limit = 0) const; + /// @brief Unpacks the wire data into a string request. virtual void unpack(); + /// @brief Fetches the unpacked string request. std::string getRequest() const { return(request_); }; protected: + /// @brief Unpacked request string. std::string request_; private: @@ -59,8 +67,14 @@ private: size_t expected_size_; }; +/// @brief Pointer to a TcpStreamRequest. typedef boost::shared_ptr TcpStreamRequestPtr; +/// @brief Implements a simple length:data output stream message. +/// +/// This class can be used to send a single message on a TCP +/// stream where the message consists of a 16-bit unsigned length (in +/// network order), followed by that number of bytes of data. class TcpStreamResponse : public TcpResponse { public: /// @brief Constructor. @@ -83,9 +97,11 @@ public: virtual void pack(); private: + /// @brief Unpacked response data to send. std::string response_; }; +/// @brief Pointer to a TcpStreamResponse. typedef boost::shared_ptr TcpStreamResponsePtr; } // end of namespace isc::tcp diff --git a/src/lib/tcp/tests/tcp_listener_unittests.cc b/src/lib/tcp/tests/tcp_listener_unittests.cc index 9a1e4fe72d..d16bb4557a 100644 --- a/src/lib/tcp/tests/tcp_listener_unittests.cc +++ b/src/lib/tcp/tests/tcp_listener_unittests.cc @@ -37,13 +37,13 @@ namespace ph = std::placeholders; namespace { -/// @brief IP address to which HTTP service is bound. +/// @brief IP address to which service is bound. const std::string SERVER_ADDRESS = "127.0.0.1"; -/// @brief IPv6 address to whch HTTP service is bound. +/// @brief IPv6 address to whch service is bound. const std::string IPV6_SERVER_ADDRESS = "::1"; -/// @brief Port number to which HTTP service is bound. +/// @brief Port number to which service is bound. const unsigned short SERVER_PORT = 18123; /// @brief Request Timeout used in most of the tests (ms). @@ -87,7 +87,6 @@ public: req->unpack(); auto request_str = req->getRequest(); - std::cout << "request_str: [" << request_str << "]" << std::endl; std::ostringstream os; if (request_str == "I am done") { os << "good bye"; @@ -95,7 +94,6 @@ public: os << "echo " << request_str; } - std::cout << "send client this: [" << os.str() << "]" << std::endl; TcpStreamResponsePtr resp(new TcpStreamResponse()); resp->setResponseData(os.str()); resp->pack(); @@ -159,7 +157,7 @@ public: /// @brief Destructor. /// - /// Removes active HTTP clients. + /// Removes active clients. virtual ~TcpListenerTest() { for (auto client = clients_.begin(); client != clients_.end(); ++client) { @@ -185,7 +183,7 @@ public: /// This method creates TcpTestClient instance and retains it in the clients_ /// list. /// - /// @param request String containing the HTTP request to be sent. + /// @param request String containing the request to be sent. void startRequest(const std::string& request) { TcpTestClientPtr client(new TcpTestClient(io_service_, std::bind(&TcpListenerTest::clientDone, this))); @@ -193,6 +191,13 @@ public: clients_.back()->startRequest(request); } + void startRequests(const std::list& requests) { + TcpTestClientPtr client(new TcpTestClient(io_service_, + std::bind(&TcpListenerTest::clientDone, this))); + clients_.push_back(client); + clients_.back()->startRequests(requests); + } + /// @brief Callback function invoke upon test timeout. /// /// It stops the IO service and reports test timeout. @@ -354,5 +359,32 @@ TEST_F(TcpListenerTest, multipleClientsListen) { io_service_.poll(); } +TEST_F(TcpListenerTest, multipleRequetsPerClients) { + std::listrequests{ "one", "two", "three", "I am done"}; + + TcpTestListener listener(io_service_, IOAddress(SERVER_ADDRESS), SERVER_PORT, + TlsContextPtr(), TcpListener::IdleTimeout(IDLE_TIMEOUT)); + + ASSERT_NO_THROW(listener.start()); + ASSERT_EQ(SERVER_ADDRESS, listener.getLocalAddress().toText()); + ASSERT_EQ(SERVER_PORT, listener.getLocalPort()); + size_t num_clients = 5; + for ( auto i = 0; i < num_clients; ++i ) { + ASSERT_NO_THROW(startRequests(requests)); + } + + ASSERT_NO_THROW(runIOService()); + ASSERT_EQ(num_clients, clients_.size()); + + std::listexpected_responses{ "echo one", "echo two", "echo three", "good bye"}; + for (auto client = clients_.begin(); client != clients_.end(); ++client) { + EXPECT_TRUE((*client)->receiveDone()); + EXPECT_FALSE((*client)->expectedEof()); + EXPECT_EQ(expected_responses, (*client)->getResponses()); + } + + listener.stop(); + io_service_.poll(); +} } diff --git a/src/lib/tcp/tests/tcp_test_client.h b/src/lib/tcp/tests/tcp_test_client.h index bb3749d876..ac9e77f76e 100644 --- a/src/lib/tcp/tests/tcp_test_client.h +++ b/src/lib/tcp/tests/tcp_test_client.h @@ -48,7 +48,7 @@ public: /// @brief Connect to the listener. /// - /// @param request HTTP request in the textual format. + /// @param request request string to send. void connect() { tcp::endpoint endpoint(address::from_string(server_address_), server_port_); socket_.async_connect(endpoint, @@ -73,9 +73,9 @@ public: }); } - /// @brief Send HTTP request specified in textual format. + /// @brief Send request specified in textual format. /// - /// @param request HTTP request in the textual format. + /// @param request request in the textual format. void startRequest(const std::string& request) { tcp::endpoint endpoint(address::from_string(server_address_), server_port_); socket_.async_connect(endpoint, @@ -106,6 +106,47 @@ public: }); } + /// @brief Send request specified in textual format. + /// + /// @param request request in the textual format. + void startRequests(const std::list& requests) { + requests_to_send_ = requests; + + tcp::endpoint endpoint(address::from_string(server_address_), server_port_); + socket_.async_connect(endpoint, + [this](const boost::system::error_code& ec) { + receive_done_ = false; + expected_eof_ = false; + if (ec) { + // One would expect that async_connect wouldn't return + // EINPROGRESS error code, but simply wait for the connection + // to get established before the handler is invoked. It turns out, + // however, that on some OSes the connect handler may receive this + // error code which doesn't necessarily indicate a problem. + // Making an attempt to write and read from this socket will + // typically succeed. So, we ignore this error. + if (ec.value() != boost::asio::error::in_progress) { + ADD_FAILURE() << "error occurred while connecting: " + << ec.message(); + done_callback_(); + return; + } + } + + sendNextRequest(); + }); + } + + /// @brief Sends the next request from the list of requests to send. + void sendNextRequest() { + // If there are any requests left to send, send them. + if (!requests_to_send_.empty()) { + std::string request = requests_to_send_.front(); + requests_to_send_.pop_front(); + sendRequest(request); + } + } + /// @brief Send a stream request. /// /// @param request request data to send textual format. @@ -126,6 +167,7 @@ public: /// @brief Wait for a server to close the connection. void waitForEof() { + stream_response_.reset(new TcpStreamRequest()); receivePartialResponse(true); } @@ -174,7 +216,6 @@ public: sendPartialRequest(wire_request); } else { // Request has been sent. Start receiving response. - response_.clear(); receivePartialResponse(); } }); @@ -185,11 +226,14 @@ public: socket_.async_read_some(boost::asio::buffer(buf_.data(), buf_.size()), [this, expect_eof](const boost::system::error_code& ec, std::size_t bytes_transferred) { + if (!stream_response_) { + stream_response_.reset(new TcpStreamRequest()); + } + if (ec) { // IO service stopped so simply return. if (ec.value() == boost::asio::error::operation_aborted) { return; - } else if ((ec.value() == boost::asio::error::try_again) || (ec.value() == boost::asio::error::would_block)) { // If we should try again, make sure that there is no garbage @@ -206,22 +250,39 @@ public: } } + // Post received data to the current response. if (bytes_transferred > 0) { - response_.insert(response_.end(), buf_.data(), - buf_.data() + bytes_transferred); + stream_response_->postBuffer(buf_.data(), bytes_transferred); } - // Two consecutive new lines end the part of the response we're - // expecting. - if (response_.find("good bye", 0) != std::string::npos) { - receive_done_ = true; - done_callback_(); - } else { + if (stream_response_->needData()) { + // Response is incomplete, keep reading. receivePartialResponse(); + } else { + // Response is complete, process it. + responseReceived(); } }); } + /// @brief Process a completed response received from the server. + void responseReceived() { + /// Unpack wire data into a string. + ASSERT_NO_THROW(stream_response_->unpack()); + std::string response = stream_response_->getRequest(); + responses_received_.push_back(response); + + // Quit if server tells us "good bye". + if (response.find("good bye", 0) != std::string::npos) { + receive_done_ = true; + done_callback_(); + } + + // Clear out for the next one. + stream_response_.reset(); + sendNextRequest(); + } + /// @brief Checks if the TCP connection is still open. /// /// Tests the TCP connection by trying to read from the socket. @@ -287,13 +348,6 @@ public: socket_.close(); } - /// @brief Returns the HTTP response string. - /// - /// @return string containing the response. - std::string getResponse() const { - return (response_); - } - /// @brief Returns true if the receive completed without error. /// /// @return True if the receive completed successfully, false @@ -309,6 +363,13 @@ public: return (expected_eof_); } + /// @brief Returns the list of received responses. + /// + /// @return list of string responses. + const std::list& getResponses() { + return (responses_received_); + } + private: /// @brief Holds reference to the IO service. @@ -333,7 +394,6 @@ private: /// @brief IP port of the server. uint16_t server_port_; - /// @brief Set to true when the receive has completed successfully. bool receive_done_; @@ -342,6 +402,14 @@ private: /// expected it to do. bool expected_eof_; + /// @brief Pointer to the server response currently being received. + TcpStreamRequestPtr stream_response_; + + /// @brief List of string requests to send. + std::list requests_to_send_; + + /// @brief List of string responses received. + std::list responses_received_; }; /// @brief Pointer to the TcpTestClient.