]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#691,!395] Add Connection socket exposure and close_callback handler
authorThomas Markwalder <tmark@isc.org>
Sat, 22 Jun 2019 15:38:55 +0000 (11:38 -0400)
committerThomas Markwalder <tmark@isc.org>
Thu, 27 Jun 2019 11:49:10 +0000 (07:49 -0400)
Addes close_callback and exposes Connectin's TCP socket to it
and connect_callback.

src/lib/http/client.h b/src/lib/http/client.h
    HttpClient:
        Added second parameter, socket FD, to ConnectHandler
        Added CloseHandler typedef

        asyncSendRequest() - added close_callback parameter

src/lib/http/client.cc
    Connection - added close_callback parameter to all
    methods that accept connect_callback parameter

    Added invocation of close_callback wherever the connection's
    socket is closed.

src/lib/http/tests/server_client_unittests.cc
    TEST_F(HttpClientTest, connectCloseCallbacks) - new test that
    verifies connect and close callback operations

configure.ac
src/lib/http/client.cc
src/lib/http/client.h
src/lib/http/tests/server_client_unittests.cc

index 737bc64f6d4ec8effa1423f782a329757d793116..455a9fa3e8d0bba014c1e9c78b771ac3b4f3a1ac 100644 (file)
@@ -1327,7 +1327,7 @@ fi
 
 #
 # Doesn't seem to be required?
-CPPFLAGS="$CPPFLAGS -DBOOST_ASIO_HEADER_ONLY"
+#CPPFLAGS="$CPPFLAGS -DBOOST_ASIO_HEADER_ONLY"
 #
 # Disable threads: they seems to break things on some systems
 # As now we use threads in boost ASIO this is commented out...
index 971e493e11a3238d9323b8a1ca492f10d01b73ac..db6473115ea67e9dc2b8598ca141fbda1cc653c9 100644 (file)
@@ -124,9 +124,12 @@ public:
     /// transaction completes.
     /// @param connect_callback Pointer to the callback function to be invoked when
     /// the client connects to the server.
+    /// @param close_callback Pointer to the callback function to be invoked when
+    /// the client closes the socket to the server.
     void doTransaction(const HttpRequestPtr& request, const HttpResponsePtr& response,
                        const long request_timeout, const HttpClient::RequestHandler& callback,
-                       const HttpClient::ConnectHandler& connect_callback);
+                       const HttpClient::ConnectHandler& connect_callback,
+                       const HttpClient::CloseHandler& close_callback);
 
     /// @brief Closes the socket and cancels the request timer.
     void close();
@@ -265,6 +268,9 @@ private:
 
     /// @brief Identifier of the current transaction.
     uint64_t current_transid_;
+
+    /// @brief User supplied callback.
+    HttpClient::CloseHandler close_callback_;
 };
 
 /// @brief Shared pointer to the connection.
@@ -305,6 +311,8 @@ public:
     /// @param callback Pointer to the user callback for this request.
     /// @param connect_callback Pointer to the user callback invoked when
     /// the client connects to the server.
+    /// @param close_callback Pointer to the user callback invoked when
+    /// the client closes the connection to the server.
     ///
     /// @return true if the request for the given URL has been retrieved,
     /// false if there are no more requests queued for this URL.
@@ -313,7 +321,8 @@ public:
                         HttpResponsePtr& response,
                         long& request_timeout,
                         HttpClient::RequestHandler& callback,
-                        HttpClient::ConnectHandler& connect_callback) {
+                        HttpClient::ConnectHandler& connect_callback,
+                        HttpClient::CloseHandler& close_callback) {
         // Check if there is a queue for this URL. If there is no queue, there
         // is no request queued either.
         auto it = queue_.find(url);
@@ -327,6 +336,7 @@ public:
                 request_timeout = desc.request_timeout_,
                 callback = desc.callback_;
                 connect_callback = desc.connect_callback_;
+                close_callback = desc.close_callback_;
                 return (true);
             }
         }
@@ -349,12 +359,15 @@ public:
     /// transaction ends.
     /// @param connect_callback Pointer to the user callback to be invoked when the
     /// client connects to the server.
+    /// @param close_callback Pointer to the user callback to be invoked when the
+    /// client closes the connection to the server.
     void queueRequest(const Url& url,
                       const HttpRequestPtr& request,
                       const HttpResponsePtr& response,
                       const long request_timeout,
                       const HttpClient::RequestHandler& request_callback,
-                      const HttpClient::ConnectHandler& connect_callback) {
+                      const HttpClient::ConnectHandler& connect_callback,
+                      const HttpClient::CloseHandler& close_callback) {
         auto it = conns_.find(url);
         if (it != conns_.end()) {
             ConnectionPtr conn = it->second;
@@ -364,12 +377,13 @@ public:
                 queue_[url].push(RequestDescriptor(request, response,
                                                    request_timeout,
                                                    request_callback,
-                                                   connect_callback));
+                                                   connect_callback,
+                                                   close_callback));
 
             } else {
                 // Connection is idle, so we can start the transaction.
                 conn->doTransaction(request, response, request_timeout,
-                                    request_callback, connect_callback);
+                                    request_callback, connect_callback, close_callback);
             }
 
         } else {
@@ -378,7 +392,7 @@ public:
             ConnectionPtr conn(new Connection(io_service_, shared_from_this(),
                                               url));
             conn->doTransaction(request, response, request_timeout, request_callback,
-                                connect_callback);
+                                connect_callback, close_callback);
             conns_[url] = conn;
         }
     }
@@ -434,15 +448,19 @@ private:
         /// @param callback Pointer to the user callback.
         /// @param connect_callback pointer to the user callback to be invoked
         /// when the client connects to the server.
+        /// @param close_callback pointer to the user callback to be invoked
+        /// when the client closes the connection to the server.
         RequestDescriptor(const HttpRequestPtr& request,
                           const HttpResponsePtr& response,
                           const long request_timeout,
                           const HttpClient::RequestHandler& callback,
-                          const HttpClient::ConnectHandler& connect_callback)
+                          const HttpClient::ConnectHandler& connect_callback,
+                          const HttpClient::CloseHandler& close_callback)
             : request_(request), response_(response),
               request_timeout_(request_timeout),
               callback_(callback),
-              connect_callback_(connect_callback) {
+              connect_callback_(connect_callback),
+              close_callback_(close_callback) {
         }
 
         /// @brief Holds pointer to the request.
@@ -455,6 +473,9 @@ private:
         HttpClient::RequestHandler callback_;
         /// @brief Holds pointer to the user callback for connect.
         HttpClient::ConnectHandler connect_callback_;
+
+        /// @brief Holds pointer to the user callback for close.
+        HttpClient::CloseHandler close_callback_;
     };
 
     /// @brief Holds the queue of requests for different URLs.
@@ -466,7 +487,7 @@ Connection::Connection(IOService& io_service,
                        const Url& url)
     : conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service),
       current_request_(), current_response_(), parser_(), current_callback_(),
-      buf_(), input_buf_(), current_transid_(0) {
+      buf_(), input_buf_(), current_transid_(0), close_callback_() {
 }
 
 Connection::~Connection() {
@@ -486,13 +507,15 @@ Connection::doTransaction(const HttpRequestPtr& request,
                           const HttpResponsePtr& response,
                           const long request_timeout,
                           const HttpClient::RequestHandler& callback,
-                          const HttpClient::ConnectHandler& connect_callback) {
+                          const HttpClient::ConnectHandler& connect_callback,
+                          const HttpClient::CloseHandler& close_callback) {
     try {
         current_request_ = request;
         current_response_ = response;
         parser_.reset(new HttpResponseParser(*current_response_));
         parser_->initModel();
         current_callback_ = callback;
+        close_callback_ = close_callback;
 
         // Starting new transaction. Generate new transaction id.
         ++current_transid_;
@@ -506,6 +529,9 @@ Connection::doTransaction(const HttpRequestPtr& request,
         // data over this socket, when the peer may close the connection. In this
         // case we'll need to re-transmit but we don't handle it here.
         if (socket_.getASIOSocket().is_open() && !socket_.isUsable()) {
+            if (close_callback) {
+                close_callback(socket_.getNative());
+            }
             socket_.close();
         }
 
@@ -542,6 +568,10 @@ Connection::doTransaction(const HttpRequestPtr& request,
 
 void
 Connection::close() {
+    if (close_callback_) {
+        close_callback_(socket_.getNative());
+    }
+    
     timer_.cancel();
     socket_.close();
     resetState();
@@ -632,10 +662,12 @@ Connection::terminate(const boost::system::error_code& ec,
     long request_timeout;
     HttpClient::RequestHandler callback;
     HttpClient::ConnectHandler connect_callback;
+    HttpClient::CloseHandler close_callback;
     ConnectionPoolPtr conn_pool = conn_pool_.lock();
     if (conn_pool && conn_pool->getNextRequest(url_, request, response, request_timeout,
-                                               callback, connect_callback)) {
-        doTransaction(request, response, request_timeout, callback, connect_callback);
+                                               callback, connect_callback, close_callback)) {
+        doTransaction(request, response, request_timeout, callback, 
+                      connect_callback, close_callback);
     }
 }
 
@@ -685,7 +717,7 @@ Connection::connectCallback(HttpClient::ConnectHandler connect_callback,
     if (connect_callback) {
         // If the user defined callback indicates that the connection
         // should not be continued.
-        if (!connect_callback(ec)) {
+        if (!connect_callback(ec, socket_.getNative())) {
             return;
         }
     }
@@ -844,7 +876,8 @@ HttpClient::asyncSendRequest(const Url& url, const HttpRequestPtr& request,
                              const HttpResponsePtr& response,
                              const HttpClient::RequestHandler& request_callback,
                              const HttpClient::RequestTimeout& request_timeout,
-                             const HttpClient::ConnectHandler& connect_callback) {
+                             const HttpClient::ConnectHandler& connect_callback,
+                             const HttpClient::CloseHandler& close_callback) {
     if (!url.isValid()) {
         isc_throw(HttpClientError, "invalid URL specified for the HTTP client");
     }
@@ -862,7 +895,7 @@ HttpClient::asyncSendRequest(const Url& url, const HttpRequestPtr& request,
     }
 
     impl_->conn_pool_->queueRequest(url, request, response, request_timeout.value_,
-                                    request_callback, connect_callback);
+                                    request_callback, connect_callback, close_callback);
 }
 
 void
index ac33c9f9a3fffed2d33e88a071ea5b3ce3c6c9ef..46ec5c86159e6af97a57fe04f47d5f803d293b88 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2018 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2018-2019 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -86,9 +86,18 @@ public:
     ///
     /// Returned boolean value indicates whether the client should continue
     /// connecting to the server (if true) or not (false).
+    /// It is passed the IO error code along with the  native socket handle of
+    /// the connection's TCP socket.  This always the socket's event readiness
+    /// to be monitored via select() or epoll.
+    ///
     /// @note Beware that the IO error code can be set to "in progress"
     /// so a not null error code does not always mean the connect failed.
-    typedef std::function<bool(const boost::system::error_code&)> ConnectHandler;
+    typedef std::function<bool(const boost::system::error_code&, const int)> ConnectHandler;
+
+    /// @brief Optional handler invoked when client closes the connection to the server.
+    ///
+    /// It is passed the native socket handler of the connection's TCP socket.
+    typedef std::function<void(const int)> CloseHandler;
 
     /// @brief Constructor.
     ///
@@ -167,7 +176,9 @@ public:
                           const RequestTimeout& request_timeout =
                           RequestTimeout(10000),
                           const ConnectHandler& connect_callback =
-                          ConnectHandler());
+                          ConnectHandler(),
+                          const CloseHandler& close_callback =
+                          CloseHandler());
 
     /// @brief Closes all connections.
     void stop();
index a0bdbb20bfcf6c936c368724ad15a669c78d7e3a..d31de574a723220135878ac004be393d3bee7440 100644 (file)
@@ -724,7 +724,7 @@ public:
             "Content-Length: 3\r\n\r\n"
             "{ }";
 
-        // Use custom listener and the specialized connection object. 
+        // Use custom listener and the specialized connection object.
         HttpListenerCustom<HttpConnectionType>
             listener(io_service_, IOAddress(SERVER_ADDRESS), SERVER_PORT,
                      factory_, HttpListener::RequestTimeout(REQUEST_TIMEOUT),
@@ -1375,6 +1375,123 @@ public:
         ASSERT_NO_THROW(runIOService());
     }
 
+    /// @brief Tests that underlying TCP socket can be registered and
+    /// unregsitered via connection and close callbacks.
+    ///
+    /// It conducts to consequetive requests over the same client.
+    ///
+    /// @param version HTTP version to be used.
+    void testConnectCloseCallbacks(const HttpVersion& version) {
+        // Start the server.
+        ASSERT_NO_THROW(listener_.start());
+
+        // Create a client and specify the URL on which the server can be reached.
+        HttpClient client(io_service_);
+        Url url("http://127.0.0.1:18123");
+
+        // Initiate request to the server.
+        PostHttpRequestJsonPtr request1 = createRequest("sequence", 1, version);
+        HttpResponseJsonPtr response1(new HttpResponseJson());
+        unsigned resp_num = 0;
+        ExternalMonitor monitor;
+        ASSERT_NO_THROW(client.asyncSendRequest(url, request1, response1,
+            [this, &resp_num](const boost::system::error_code& ec,
+                              const HttpResponsePtr&,
+                              const std::string&) {
+            if (++resp_num > 1) {
+                io_service_.stop();
+            }
+            EXPECT_FALSE(ec);
+        },
+            HttpClient::RequestTimeout(10000),
+            boost::bind(&ExternalMonitor::connectHandler, &monitor, _1, _2),
+            boost::bind(&ExternalMonitor::closeHandler, &monitor, _1)
+        ));
+
+        // Initiate another request to the destination.
+        PostHttpRequestJsonPtr request2 = createRequest("sequence", 2, version);
+        HttpResponseJsonPtr response2(new HttpResponseJson());
+        ASSERT_NO_THROW(client.asyncSendRequest(url, request2, response2,
+            [this, &resp_num](const boost::system::error_code& ec,
+                              const HttpResponsePtr&,
+                              const std::string&) {
+            if (++resp_num > 1) {
+                io_service_.stop();
+            }
+            EXPECT_FALSE(ec);
+        },
+            HttpClient::RequestTimeout(10000),
+            boost::bind(&ExternalMonitor::connectHandler, &monitor, _1, _2),
+            boost::bind(&ExternalMonitor::closeHandler, &monitor, _1)
+        ));
+
+        // Actually trigger the requests. The requests should be handlded by the
+        // server one after another. While the first request is being processed
+        // the server should queue another one.
+        ASSERT_NO_THROW(runIOService());
+
+        // Make sure that the received responses are different. We check that by
+        // comparing value of the sequence parameters.
+        ASSERT_TRUE(response1);
+        ConstElementPtr sequence1 = response1->getJsonElement("sequence");
+        ASSERT_TRUE(sequence1);
+
+        ASSERT_TRUE(response2);
+        ConstElementPtr sequence2 = response2->getJsonElement("sequence");
+        ASSERT_TRUE(sequence2);
+
+        EXPECT_NE(sequence1->intValue(), sequence2->intValue());
+    }
+
+    /// @brief Simulates external registery of Connection TCP sockets
+    ///
+    /// Provides methods compatible with Connection callbacks for connnect
+    /// and close operations.
+    class ExternalMonitor {
+    public:
+        /// @breif Constructor
+        ExternalMonitor() : registered_fd_(-1) {};
+
+        /// @brief Connect callback handler
+        /// @param ec Error status of the ASIO connect
+        /// @param tcp_native_fd socket descriptor to register
+        bool connectHandler(const boost::system::error_code& ec, int tcp_native_fd) {
+            if (!ec || ec.value() == boost::asio::error::in_progress) {
+                if (tcp_native_fd >= 0) {
+                    registered_fd_ = tcp_native_fd;
+                    return (true);
+                }
+
+                // Invalid fd?, this really should not be possible.  EXPECT makes
+                // sure we log it.
+                EXPECT_TRUE (tcp_native_fd >= 0) << "no ec error but invalid fd?";
+                return (false);
+
+            } else if (ec.value() == boost::asio::error::already_connected) {
+                if (registered_fd_ != tcp_native_fd) {
+                    return (false);
+                }
+            }
+
+            // ec indicates an error, return true, so that error can be handled
+            // by Connection logic.
+            return (true);
+        }
+
+        /// @brief Close callback handler
+        ///
+        /// @param tcp_native_fd socket descriptor to register
+        void closeHandler(int tcp_native_fd) {
+            EXPECT_EQ(tcp_native_fd, registered_fd_) << "closeHandler fd mismatch";
+            if (tcp_native_fd >= 0) {
+                registered_fd_ = -1;
+            }
+        }
+
+        /// @brief Keeps track of socket currently "registered" for external monitoring.
+        int registered_fd_;
+    };
+
     /// @brief Instance of the listener used in the tests.
     HttpListener listener_;
 
@@ -1384,6 +1501,7 @@ public:
     /// @brief Instance of the third listener used in the tests (with short idle
     /// timeout).
     HttpListener listener3_;
+
 };
 
 // Test that two conscutive requests can be sent over the same (persistent)
@@ -1671,7 +1789,7 @@ TEST_F(HttpClientTest, clientConnectTimeout) {
        // try to send a request to the server. This simulates the
        // case of connect() taking very long and should eventually
        // cause the transaction to time out.
-       [](const boost::system::error_code& /*ec*/) {
+       [](const boost::system::error_code& /*ec*/, int) {
            return (false);
     }));
 
@@ -1688,5 +1806,9 @@ TEST_F(HttpClientTest, clientConnectTimeout) {
     ASSERT_NO_THROW(runIOService());
 }
 
+/// Tests that connect and close callbacks work correctly.
+TEST_F(HttpClientTest, connectCloseCallbacks) {
+    ASSERT_NO_FATAL_FAILURE(testConnectCloseCallbacks(HttpVersion(1, 1)));
+}
 
 }