]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#2583] Cleaned up tcp_stream and UT
authorThomas Markwalder <tmark@isc.org>
Mon, 7 Nov 2022 19:45:14 +0000 (14:45 -0500)
committerThomas Markwalder <tmark@isc.org>
Thu, 10 Nov 2022 19:43:23 +0000 (14:43 -0500)
src/lib/tcp/tcp_connection.cc
    Removed couts

src/lib/tcp/tcp_stream.h
    Added doxygen

src/lib/tcp/tests/tcp_listener_unittests.cc
    Removed HTTPs
    Added testing of mulitple messages per client

src/lib/tcp/tests/tcp_test_client.h
    Clients now use TcpStreamRequest to receive and can
    send/receive multiple requests/responses

src/lib/tcp/tcp_connection.cc
src/lib/tcp/tcp_connection.h
src/lib/tcp/tcp_stream.h
src/lib/tcp/tests/tcp_listener_unittests.cc
src/lib/tcp/tests/tcp_test_client.h

index 64255779c53f15340d22e571008d69f5d74d16cf..17463fc9d368f18cc0e16bd4ed90e3fc7b261748 100644 (file)
@@ -457,7 +457,6 @@ TcpConnection::socketWriteCallback(TcpResponsePtr response,
 
 void
 TcpConnection::setupIdleTimer() {
-    std::cout << "idle timeout: " << idle_timeout_ << std::endl;
     idle_timer_.setup(std::bind(&TcpConnection::idleTimeoutCallback, this),
                       idle_timeout_, IntervalTimer::ONE_SHOT);
 }
index 2bfbea038e97ae96893769a83fd2e214903d427a..2ba0661e9964b359cb28d29f9cfd6959c6e6231b 100644 (file)
@@ -25,7 +25,7 @@ namespace isc {
 namespace tcp {
 
 /// @todo Take this out, it's just for dev coding
-#if 1
+#if 0
 #define HERE(a) std::cout << __FILE__ << ":" << __FUNCTION__ << ":" << __LINE__ << " " << a << std::endl << std::flush;
 #else
 #define HERE(a)
index 3d4e651de80f96e5dac2466a441208c4414d4bb1..51171437c47365782b651cb126463ea4ab8a8c01 100644 (file)
 namespace isc {
 namespace tcp {
 
+/// @brief Implement a simple length:data input stream message.
+///
+/// This class can be used to receive a single message from a TCP
+/// stream where the message consists of a 16-bit unsigned length (in
+/// network order), followed by that number of bytes of data.
 class TcpStreamRequest : public TcpRequest {
 public:
     /// @brief Constructor.
@@ -40,18 +45,21 @@ public:
 
     /// @brief Returns request contents formatted for log output
     ///
-    /// @param limit Maximum length of the buffer to be output. If the limit 
+    /// @param limit Maximum length of the buffer to be output. If the limit
     /// is 0, the length of the output is unlimited.
     /// @return Textual representation of the input buffer.
     virtual std::string logFormatRequest(const size_t limit = 0) const;
 
+    /// @brief Unpacks the wire data into a string request.
     virtual void unpack();
 
+    /// @brief Fetches the unpacked string request.
     std::string getRequest() const {
         return(request_);
     };
 
 protected:
+    /// @brief Unpacked request string.
     std::string request_;
 
 private:
@@ -59,8 +67,14 @@ private:
     size_t expected_size_;
 };
 
+/// @brief Pointer to a TcpStreamRequest.
 typedef boost::shared_ptr<TcpStreamRequest> TcpStreamRequestPtr;
 
+/// @brief Implements a simple length:data output stream message.
+///
+/// This class can be used to send a single message on a TCP
+/// stream where the message consists of a 16-bit unsigned length (in
+/// network order), followed by that number of bytes of data.
 class TcpStreamResponse : public TcpResponse {
 public:
     /// @brief Constructor.
@@ -83,9 +97,11 @@ public:
     virtual void pack();
 
 private:
+    /// @brief Unpacked response data to send.
     std::string response_;
 };
 
+/// @brief Pointer to a TcpStreamResponse.
 typedef boost::shared_ptr<TcpStreamResponse> TcpStreamResponsePtr;
 
 } // end of namespace isc::tcp
index 9a1e4fe72df5ab5df679ca2c8fa1b0cde7b822b2..d16bb4557aadff90d2aa8f173d025d1e7217396f 100644 (file)
@@ -37,13 +37,13 @@ namespace ph = std::placeholders;
 
 namespace {
 
-/// @brief IP address to which HTTP service is bound.
+/// @brief IP address to which service is bound.
 const std::string SERVER_ADDRESS = "127.0.0.1";
 
-/// @brief IPv6 address to whch HTTP service is bound.
+/// @brief IPv6 address to whch service is bound.
 const std::string IPV6_SERVER_ADDRESS = "::1";
 
-/// @brief Port number to which HTTP service is bound.
+/// @brief Port number to which service is bound.
 const unsigned short SERVER_PORT = 18123;
 
 /// @brief Request Timeout used in most of the tests (ms).
@@ -87,7 +87,6 @@ public:
 
         req->unpack();
         auto request_str = req->getRequest();
-        std::cout << "request_str: [" << request_str << "]" << std::endl;
         std::ostringstream os;
         if (request_str == "I am done") {
             os << "good bye";
@@ -95,7 +94,6 @@ public:
             os << "echo " << request_str;
         }
 
-        std::cout << "send client this: [" << os.str() << "]" << std::endl;
         TcpStreamResponsePtr resp(new TcpStreamResponse());
         resp->setResponseData(os.str());
         resp->pack();
@@ -159,7 +157,7 @@ public:
 
     /// @brief Destructor.
     ///
-    /// Removes active HTTP clients.
+    /// Removes active clients.
     virtual ~TcpListenerTest() {
         for (auto client = clients_.begin(); client != clients_.end();
              ++client) {
@@ -185,7 +183,7 @@ public:
     /// This method creates TcpTestClient instance and retains it in the clients_
     /// list.
     ///
-    /// @param request String containing the HTTP request to be sent.
+    /// @param request String containing the request to be sent.
     void startRequest(const std::string& request) {
         TcpTestClientPtr client(new TcpTestClient(io_service_,
                                     std::bind(&TcpListenerTest::clientDone, this)));
@@ -193,6 +191,13 @@ public:
         clients_.back()->startRequest(request);
     }
 
+    void startRequests(const std::list<std::string>& requests) {
+        TcpTestClientPtr client(new TcpTestClient(io_service_,
+                                    std::bind(&TcpListenerTest::clientDone, this)));
+        clients_.push_back(client);
+        clients_.back()->startRequests(requests);
+    }
+
     /// @brief Callback function invoke upon test timeout.
     ///
     /// It stops the IO service and reports test timeout.
@@ -354,5 +359,32 @@ TEST_F(TcpListenerTest, multipleClientsListen) {
     io_service_.poll();
 }
 
+TEST_F(TcpListenerTest, multipleRequetsPerClients) {
+    std::list<std::string>requests{ "one", "two", "three", "I am done"};
+
+    TcpTestListener listener(io_service_, IOAddress(SERVER_ADDRESS), SERVER_PORT,
+                             TlsContextPtr(), TcpListener::IdleTimeout(IDLE_TIMEOUT));
+
+    ASSERT_NO_THROW(listener.start());
+    ASSERT_EQ(SERVER_ADDRESS, listener.getLocalAddress().toText());
+    ASSERT_EQ(SERVER_PORT, listener.getLocalPort());
+    size_t num_clients = 5;
+    for ( auto i = 0; i < num_clients; ++i ) {
+        ASSERT_NO_THROW(startRequests(requests));
+    }
+
+    ASSERT_NO_THROW(runIOService());
+    ASSERT_EQ(num_clients, clients_.size());
+
+    std::list<std::string>expected_responses{ "echo one", "echo two", "echo three", "good bye"};
+    for (auto client = clients_.begin(); client != clients_.end(); ++client) {
+        EXPECT_TRUE((*client)->receiveDone());
+        EXPECT_FALSE((*client)->expectedEof());
+        EXPECT_EQ(expected_responses, (*client)->getResponses());
+    }
+
+    listener.stop();
+    io_service_.poll();
+}
 
 }
index bb3749d8768bfd7ebda81ab5193e9184c898f370..ac9e77f76e7791b57cc9c1143dce618b1ab5ede5 100644 (file)
@@ -48,7 +48,7 @@ public:
 
     /// @brief Connect to the listener.
     ///
-    /// @param request HTTP request in the textual format.
+    /// @param request request string to send.
     void connect() {
         tcp::endpoint endpoint(address::from_string(server_address_), server_port_);
         socket_.async_connect(endpoint,
@@ -73,9 +73,9 @@ public:
         });
     }
 
-    /// @brief Send HTTP request specified in textual format.
+    /// @brief Send request specified in textual format.
     ///
-    /// @param request HTTP request in the textual format.
+    /// @param request request in the textual format.
     void startRequest(const std::string& request) {
         tcp::endpoint endpoint(address::from_string(server_address_), server_port_);
         socket_.async_connect(endpoint,
@@ -106,6 +106,47 @@ public:
         });
     }
 
+    /// @brief Send request specified in textual format.
+    ///
+    /// @param request request in the textual format.
+    void startRequests(const std::list<std::string>& requests) {
+        requests_to_send_ = requests;
+
+        tcp::endpoint endpoint(address::from_string(server_address_), server_port_);
+        socket_.async_connect(endpoint,
+        [this](const boost::system::error_code& ec) {
+            receive_done_ = false;
+            expected_eof_ = false;
+            if (ec) {
+                // One would expect that async_connect wouldn't return
+                // EINPROGRESS error code, but simply wait for the connection
+                // to get established before the handler is invoked. It turns out,
+                // however, that on some OSes the connect handler may receive this
+                // error code which doesn't necessarily indicate a problem.
+                // Making an attempt to write and read from this socket will
+                // typically succeed. So, we ignore this error.
+                if (ec.value() != boost::asio::error::in_progress) {
+                    ADD_FAILURE() << "error occurred while connecting: "
+                                  << ec.message();
+                    done_callback_();
+                    return;
+                }
+            }
+
+            sendNextRequest();
+        });
+    }
+
+    /// @brief Sends the next request from the list of requests to send.
+    void sendNextRequest() {
+        // If there are any requests left to send, send them.
+        if (!requests_to_send_.empty()) {
+            std::string request = requests_to_send_.front();
+            requests_to_send_.pop_front();
+            sendRequest(request);
+        }
+    }
+
     /// @brief Send a stream request.
     ///
     /// @param request request data to send textual format.
@@ -126,6 +167,7 @@ public:
 
     /// @brief Wait for a server to close the connection.
     void waitForEof() {
+        stream_response_.reset(new TcpStreamRequest());
         receivePartialResponse(true);
     }
 
@@ -174,7 +216,6 @@ public:
                 sendPartialRequest(wire_request);
             } else {
                 // Request has been sent. Start receiving response.
-                response_.clear();
                 receivePartialResponse();
             }
        });
@@ -185,11 +226,14 @@ public:
         socket_.async_read_some(boost::asio::buffer(buf_.data(), buf_.size()),
                                 [this, expect_eof](const boost::system::error_code& ec,
                                        std::size_t bytes_transferred) {
+            if (!stream_response_) {
+                stream_response_.reset(new TcpStreamRequest());
+            }
+
             if (ec) {
                 // IO service stopped so simply return.
                 if (ec.value() == boost::asio::error::operation_aborted) {
                     return;
-
                 } else if ((ec.value() == boost::asio::error::try_again) ||
                            (ec.value() == boost::asio::error::would_block)) {
                     // If we should try again, make sure that there is no garbage
@@ -206,22 +250,39 @@ public:
                 }
             }
 
+            // Post received data to the current response.
             if (bytes_transferred > 0) {
-                response_.insert(response_.end(), buf_.data(),
-                                 buf_.data() + bytes_transferred);
+                stream_response_->postBuffer(buf_.data(), bytes_transferred);
             }
 
-            // Two consecutive new lines end the part of the response we're
-            // expecting.
-            if (response_.find("good bye", 0) != std::string::npos) {
-                receive_done_ = true;
-                done_callback_();
-            } else {
+            if (stream_response_->needData()) {
+                // Response is incomplete, keep reading.
                 receivePartialResponse();
+            } else {
+                // Response is complete, process it.
+                responseReceived();
             }
         });
     }
 
+    /// @brief Process a completed response received from the server.
+    void responseReceived() {
+        /// Unpack wire data into a string.
+        ASSERT_NO_THROW(stream_response_->unpack());
+        std::string response = stream_response_->getRequest();
+        responses_received_.push_back(response);
+
+        // Quit if server tells us "good bye".
+        if (response.find("good bye", 0) != std::string::npos) {
+            receive_done_ = true;
+            done_callback_();
+        }
+
+        // Clear out for the next one.
+        stream_response_.reset();
+        sendNextRequest();
+    }
+
     /// @brief Checks if the TCP connection is still open.
     ///
     /// Tests the TCP connection by trying to read from the socket.
@@ -287,13 +348,6 @@ public:
         socket_.close();
     }
 
-    /// @brief Returns the HTTP response string.
-    ///
-    /// @return string containing the response.
-    std::string getResponse() const {
-        return (response_);
-    }
-
     /// @brief Returns true if the receive completed without error.
     ///
     /// @return True if the receive completed successfully, false
@@ -309,6 +363,13 @@ public:
         return (expected_eof_);
     }
 
+    /// @brief Returns the list of received responses.
+    ///
+    /// @return list of string responses.
+    const std::list<std::string>& getResponses() {
+        return (responses_received_);
+    }
+
 private:
 
     /// @brief Holds reference to the IO service.
@@ -333,7 +394,6 @@ private:
     /// @brief IP port of the server.
     uint16_t server_port_;
 
-
     /// @brief Set to true when the receive has completed successfully.
     bool receive_done_;
 
@@ -342,6 +402,14 @@ private:
     /// expected it to do.
     bool expected_eof_;
 
+    /// @brief Pointer to the server response currently being received.
+    TcpStreamRequestPtr stream_response_;
+
+    /// @brief List of string requests to send.
+    std::list<std::string> requests_to_send_;
+
+    /// @brief List of string responses received.
+    std::list<std::string> responses_received_;
 };
 
 /// @brief Pointer to the TcpTestClient.