]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1239] make http client ConnectionPool thread safe
authorRazvan Becheriu <razvan@isc.org>
Mon, 18 May 2020 13:01:51 +0000 (16:01 +0300)
committerRazvan Becheriu <razvan@isc.org>
Mon, 18 May 2020 17:59:29 +0000 (20:59 +0300)
src/lib/http/client.cc

index cc7d6b7fc76decbb795b5c6f13e74268f9dc18ce..fcbcb44c58ca4fa37cd81a9b848f66ad3eeb61c8 100644 (file)
 #include <http/http_messages.h>
 #include <http/response_json.h>
 #include <http/response_parser.h>
+#include <util/multi_threading_mgr.h>
+
 #include <boost/bind.hpp>
 #include <boost/enable_shared_from_this.hpp>
 #include <boost/weak_ptr.hpp>
+
 #include <array>
+#include <iostream>
 #include <map>
+#include <mutex>
 #include <queue>
 
-#include <iostream>
-
 using namespace isc;
 using namespace isc::asiolink;
 using namespace http;
@@ -309,7 +312,7 @@ public:
     /// @param io_service Reference to the IO service to be used by the
     /// connections.
     explicit ConnectionPool(IOService& io_service)
-        : io_service_(io_service), conns_(), queue_() {
+        : io_service_(io_service), conns_(), queue_(), mutex_() {
     }
 
     /// @brief Destructor.
@@ -341,6 +344,125 @@ public:
                         HttpClient::RequestHandler& callback,
                         HttpClient::ConnectHandler& connect_callback,
                         HttpClient::CloseHandler& close_callback) {
+        if (MultiThreadingMgr::instance().getMode()) {
+            std::lock_guard<std::mutex> lk(mutex_);
+            return (getNextRequestInternal(url, request, response,
+                                           request_timeout, callback,
+                                           connect_callback, close_callback));
+        } else {
+            return (getNextRequestInternal(url, request, response,
+                                           request_timeout, callback,
+                                           connect_callback, close_callback));
+        }
+    }
+
+    /// @brief Queue next request for sending to the server.
+    ///
+    /// A new transaction is started immediately, if there is no other request
+    /// in progress for the given URL. Otherwise, the request is queued.
+    ///
+    /// @param url Destination where the request should be sent.
+    /// @param request Pointer to the request to be sent to the server.
+    /// @param response Pointer to the object into which the response should be
+    /// stored.
+    /// @param request_timeout Requested timeout for the transaction in
+    /// milliseconds.
+    /// @param request_callback Pointer to the user callback to be invoked when the
+    /// transaction ends.
+    /// @param connect_callback Pointer to the user callback to be invoked when the
+    /// client connects to the server.
+    /// @param close_callback Pointer to the user callback to be invoked when the
+    /// client closes the connection to the server.
+    void queueRequest(const Url& url,
+                      const HttpRequestPtr& request,
+                      const HttpResponsePtr& response,
+                      const long request_timeout,
+                      const HttpClient::RequestHandler& request_callback,
+                      const HttpClient::ConnectHandler& connect_callback,
+                      const HttpClient::CloseHandler& close_callback) {
+        if (MultiThreadingMgr::instance().getMode()) {
+            std::lock_guard<std::mutex> lk(mutex_);
+            return (queueRequestInternal(url, request, response,
+                                         request_timeout, request_callback,
+                                          connect_callback, close_callback));
+        } else {
+            return (queueRequestInternal(url, request, response,
+                                         request_timeout, request_callback,
+                                         connect_callback, close_callback));
+        }
+    }
+
+    /// @brief Closes connection and removes associated information from the
+    /// connection pool.
+    ///
+    /// @param url URL for which connection should be closed.
+    void closeConnection(const Url& url) {
+        if (MultiThreadingMgr::instance().getMode()) {
+            std::lock_guard<std::mutex> lk(mutex_);
+            closeConnectionInternal(url);
+        } else {
+            closeConnectionInternal(url);
+        }
+    }
+
+    /// @brief Closes all connections and removes associated information from
+    /// the connection pool.
+    void closeAll() {
+        if (MultiThreadingMgr::instance().getMode()) {
+            std::lock_guard<std::mutex> lk(mutex_);
+            closeAllInternal();
+        } else {
+            closeAllInternal();
+        }
+    }
+
+    /// @brief Closes a connection if it has an out-of-bandwidth socket event
+    ///
+    /// If the pool contains a connection using the given socket and that
+    /// connection is currently in a transaction the method returns as this
+    /// indicates a normal ready event.  If the connection is not in an
+    /// ongoing transaction, then the connection is closed.
+    ///
+    /// This is method is intended to be used to detect and clean up then
+    /// sockets that are marked ready outside of transactions. The most common
+    /// case is the other end of the socket being closed.
+    ///
+    /// @param socket_fd socket descriptor to check
+    void closeIfOutOfBandwidth(int socket_fd) {
+        if (MultiThreadingMgr::instance().getMode()) {
+            std::lock_guard<std::mutex> lk(mutex_);
+            closeIfOutOfBandwidth(fd);
+        } else {
+            closeIfOutOfBandwidth(fd);
+        }
+    }
+
+private:
+
+    /// @brief Returns next queued request for the given URL.
+    ///
+    /// This method should be called in a thread safe context.
+    ///
+    /// @param url URL for which next queued request should be retrieved.
+    /// @param [out] request Pointer to the queued request.
+    /// @param [out] response Pointer to the object into which response should
+    /// be stored.
+    /// @param request_timeout Requested timeout for the transaction.
+    /// @param callback Pointer to the user callback for this request.
+    /// @param connect_callback Pointer to the user callback invoked when
+    /// the client connects to the server.
+    /// @param close_callback Pointer to the user callback invoked when
+    /// the client closes the connection to the server.
+    ///
+    /// @return true if the request for the given URL has been retrieved,
+    /// false if there are no more requests queued for this URL.
+    bool getNextRequestInternal(const Url& url,
+                                HttpRequestPtr& request,
+                                HttpResponsePtr& response,
+                                long& request_timeout,
+                                HttpClient::RequestHandler& callback,
+                                HttpClient::ConnectHandler& connect_callback,
+                                HttpClient::CloseHandler& close_callback) {
         // Check if there is a queue for this URL. If there is no queue, there
         // is no request queued either.
         auto it = queue_.find(url);
@@ -364,9 +486,11 @@ public:
 
     /// @brief Queue next request for sending to the server.
     ///
-    /// A new transaction is started immediatelly, if there is no other request
+    /// A new transaction is started immediately, if there is no other request
     /// in progress for the given URL. Otherwise, the request is queued.
     ///
+    /// This method should be called in a thread safe context.
+    ///
     /// @param url Destination where the request should be sent.
     /// @param request Pointer to the request to be sent to the server.
     /// @param response Pointer to the object into which the response should be
@@ -379,13 +503,13 @@ public:
     /// client connects to the server.
     /// @param close_callback Pointer to the user callback to be invoked when the
     /// client closes the connection to the server.
-    void queueRequest(const Url& url,
-                      const HttpRequestPtr& request,
-                      const HttpResponsePtr& response,
-                      const long request_timeout,
-                      const HttpClient::RequestHandler& request_callback,
-                      const HttpClient::ConnectHandler& connect_callback,
-                      const HttpClient::CloseHandler& close_callback) {
+    void queueRequestInternal(const Url& url,
+                              const HttpRequestPtr& request,
+                              const HttpResponsePtr& response,
+                              const long request_timeout,
+                              const HttpClient::RequestHandler& request_callback,
+                              const HttpClient::ConnectHandler& connect_callback,
+                              const HttpClient::CloseHandler& close_callback) {
         auto it = conns_.find(url);
         if (it != conns_.end()) {
             ConnectionPtr conn = it->second;
@@ -397,13 +521,12 @@ public:
                                                    request_callback,
                                                    connect_callback,
                                                    close_callback));
-
             } else {
                 // Connection is idle, so we can start the transaction.
                 conn->doTransaction(request, response, request_timeout,
-                                    request_callback, connect_callback, close_callback);
+                                    request_callback, connect_callback,
+                                    close_callback);
             }
-
         } else {
             // There is no connection with this destination yet. Let's create
             // it and start the transaction.
@@ -418,8 +541,10 @@ public:
     /// @brief Closes connection and removes associated information from the
     /// connection pool.
     ///
-    /// @param url URL for which connection shuld be closed.
-    void closeConnection(const Url& url) {
+    /// This method should be called in a thread safe context.
+    ///
+    /// @param url URL for which connection should be closed.
+    void closeConnectionInternal(const Url& url) {
         // Close connection for the specified URL.
         auto conns_it = conns_.find(url);
         if (conns_it != conns_.end()) {
@@ -436,7 +561,9 @@ public:
 
     /// @brief Closes all connections and removes associated information from
     /// the connection pool.
-    void closeAll() {
+    ///
+    /// This method should be called in a thread safe context.
+    void closeAllInternal() {
         for (auto conns_it = conns_.begin(); conns_it != conns_.end();
              ++conns_it) {
             conns_it->second->close();
@@ -454,11 +581,13 @@ public:
     /// ongoing transaction, then the connection is closed.
     ///
     /// This is method is intended to be used to detect and clean up then
-    /// sockets that are marked ready outside of transactions. The most comman
+    /// sockets that are marked ready outside of transactions. The most common
     /// case is the other end of the socket being closed.
     ///
+    /// This method should be called in a thread safe context.
+    ///
     /// @param socket_fd socket descriptor to check
-    void closeIfOutOfBandwidth(int socket_fd) {
+    void closeIfOutOfBandwidthInternal(int socket_fd) {
         // First we look for a connection with the socket.
         for (auto conns_it = conns_.begin(); conns_it != conns_.end();
              ++conns_it) {
@@ -484,8 +613,6 @@ public:
         }
     }
 
-private:
-
     /// @brief Holds reference to the IO service.
     IOService& io_service_;
 
@@ -521,12 +648,16 @@ private:
 
         /// @brief Holds pointer to the request.
         HttpRequestPtr request_;
+
         /// @brief Holds pointer to the response.
         HttpResponsePtr response_;
+
         /// @brief Holds requested timeout value.
         long request_timeout_;
+
         /// @brief Holds pointer to the user callback.
         HttpClient::RequestHandler callback_;
+
         /// @brief Holds pointer to the user callback for connect.
         HttpClient::ConnectHandler connect_callback_;
 
@@ -536,6 +667,9 @@ private:
 
     /// @brief Holds the queue of requests for different URLs.
     std::map<Url, std::queue<RequestDescriptor> > queue_;
+
+    /// @brief Mutex to protect the internal state.
+    std::mutex mutex_;
 };
 
 Connection::Connection(IOService& io_service,