]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1239] fixed possible dead lock
authorRazvan Becheriu <razvan@isc.org>
Thu, 4 Jun 2020 19:54:03 +0000 (22:54 +0300)
committerRazvan Becheriu <razvan@isc.org>
Tue, 16 Jun 2020 09:02:52 +0000 (09:02 +0000)
src/lib/http/client.cc

index 88e1080295ff6b7e307e5d305e63ccb6e8a85b83..dd2e73c6f9b7ca65571c35f794e7abc34e1bac8e 100644 (file)
@@ -20,6 +20,7 @@
 #include <boost/enable_shared_from_this.hpp>
 #include <boost/weak_ptr.hpp>
 
+#include <atomic>
 #include <array>
 #include <iostream>
 #include <map>
@@ -144,7 +145,7 @@ public:
     /// @brief Checks if a transaction has been initiated over this connection.
     ///
     /// @return true if transaction has been initiated, false otherwise.
-    bool isTransactionOngoing();
+    bool isTransactionOngoing() const;
 
     /// @brief Checks if a socket descriptor belongs to this connection.
     ///
@@ -202,13 +203,6 @@ private:
     /// Should be called in a thread safe context.
     void closeInternal();
 
-    /// @brief Checks if a transaction has been initiated over this connection.
-    ///
-    /// Should be called in a thread safe context.
-    ///
-    /// @return true if transaction has been initiated, false otherwise.
-    bool isTransactionOngoingInternal() const;
-
     /// @brief Checks and logs if premature transaction timeout is suspected.
     ///
     /// Should be called in a thread safe context.
@@ -387,6 +381,9 @@ private:
     /// @brief User supplied callback.
     HttpClient::CloseHandler close_callback_;
 
+    /// @brief Flag to indicate that a transaction is running.
+    std::atomic<bool> started_;
+
     /// @brief Mutex to protect the internal state.
     std::mutex mutex_;
 };
@@ -434,22 +431,12 @@ public:
     ///
     /// @return true if the request for the given URL has been retrieved,
     /// false if there are no more requests queued for this URL.
-    bool getNextRequest(const Url& url,
-                        HttpRequestPtr& request,
-                        HttpResponsePtr& response,
-                        long& request_timeout,
-                        HttpClient::RequestHandler& callback,
-                        HttpClient::ConnectHandler& connect_callback,
-                        HttpClient::CloseHandler& close_callback) {
+    bool processNextRequest(const Url& url) {
         if (MultiThreadingMgr::instance().getMode()) {
             std::lock_guard<std::mutex> lk(mutex_);
-            return (getNextRequestInternal(url, request, response,
-                                           request_timeout, callback,
-                                           connect_callback, close_callback));
+            return (processNextRequestInternal(url));
         } else {
-            return (getNextRequestInternal(url, request, response,
-                                           request_timeout, callback,
-                                           connect_callback, close_callback));
+            return (processNextRequestInternal(url));
         }
     }
 
@@ -553,13 +540,7 @@ private:
     ///
     /// @return true if the request for the given URL has been retrieved,
     /// false if there are no more requests queued for this URL.
-    bool getNextRequestInternal(const Url& url,
-                                HttpRequestPtr& request,
-                                HttpResponsePtr& response,
-                                long& request_timeout,
-                                HttpClient::RequestHandler& callback,
-                                HttpClient::ConnectHandler& connect_callback,
-                                HttpClient::CloseHandler& close_callback) {
+    bool processNextRequestInternal(const Url& url) {
         // Check if there is a queue for this URL. If there is no queue, there
         // is no request queued either.
         auto it = queue_.find(url);
@@ -568,12 +549,10 @@ private:
             if (!it->second.empty()) {
                 RequestDescriptor desc = it->second.front();
                 it->second.pop();
-                request = desc.request_;
-                response = desc.response_;
-                request_timeout = desc.request_timeout_,
-                callback = desc.callback_;
-                connect_callback = desc.connect_callback_;
-                close_callback = desc.close_callback_;
+                desc.conn_->doTransaction(desc.request_, desc.response_,
+                                          desc.request_timeout_, desc.callback_,
+                                          desc.connect_callback_,
+                                          desc.close_callback_);
                 return (true);
             }
         }
@@ -613,7 +592,7 @@ private:
             // There is a connection for this URL already. Check if it is idle.
             if (conn->isTransactionOngoing()) {
                 // Connection is busy, so let's queue the request.
-                queue_[url].push(RequestDescriptor(request, response,
+                queue_[url].push(RequestDescriptor(conn, request, response,
                                                    request_timeout,
                                                    request_callback,
                                                    connect_callback,
@@ -730,19 +709,22 @@ private:
         /// when the client connects to the server.
         /// @param close_callback pointer to the user callback to be invoked
         /// when the client closes the connection to the server.
-        RequestDescriptor(const HttpRequestPtr& request,
+        RequestDescriptor(const ConnectionPtr& conn,
+                          const HttpRequestPtr& request,
                           const HttpResponsePtr& response,
-                          const long request_timeout,
+                          const long& request_timeout,
                           const HttpClient::RequestHandler& callback,
                           const HttpClient::ConnectHandler& connect_callback,
                           const HttpClient::CloseHandler& close_callback)
-            : request_(request), response_(response),
-              request_timeout_(request_timeout),
-              callback_(callback),
+            : conn_(conn), request_(request), response_(response),
+              request_timeout_(request_timeout), callback_(callback),
               connect_callback_(connect_callback),
               close_callback_(close_callback) {
         }
 
+        /// @brief Holds the connection.
+        ConnectionPtr conn_;
+
         /// @brief Holds pointer to the request.
         HttpRequestPtr request_;
 
@@ -774,7 +756,8 @@ Connection::Connection(IOService& io_service,
                        const Url& url)
     : conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service),
       current_request_(), current_response_(), parser_(), current_callback_(),
-      buf_(), input_buf_(), current_transid_(0), close_callback_() {
+      buf_(), input_buf_(), current_transid_(0), close_callback_(),
+      started_(false) {
 }
 
 Connection::~Connection() {
@@ -783,6 +766,7 @@ Connection::~Connection() {
 
 void
 Connection::resetState() {
+    started_ = false;
     current_request_.reset();
     current_response_.reset();
     parser_.reset();
@@ -830,6 +814,7 @@ Connection::doTransactionInternal(const HttpRequestPtr& request,
                                   const HttpClient::ConnectHandler& connect_callback,
                                   const HttpClient::CloseHandler& close_callback) {
     try {
+        started_ = true;
         current_request_ = request;
         current_response_ = response;
         parser_.reset(new HttpResponseParser(*current_response_));
@@ -906,18 +891,8 @@ Connection::closeInternal() {
 }
 
 bool
-Connection::isTransactionOngoing() {
-    if (MultiThreadingMgr::instance().getMode()) {
-        std::lock_guard<std::mutex> lk(mutex_);
-        return (isTransactionOngoingInternal());
-    } else {
-        return (isTransactionOngoingInternal());
-    }
-}
-
-bool
-Connection::isTransactionOngoingInternal() const {
-    return (static_cast<bool>(current_request_));
+Connection::isTransactionOngoing() const {
+    return (started_);
 }
 
 bool
@@ -942,7 +917,7 @@ Connection::checkPrematureTimeoutInternal(const uint64_t transid) {
     // Also, if there is a transaction in progress but the ID of that
     // transaction doesn't match the one associated with the handler it,
     // also means that the transaction timed out prematurely.
-    if (!isTransactionOngoingInternal() || (transid != current_transid_)) {
+    if (!isTransactionOngoing() || (transid != current_transid_)) {
         LOG_WARN(http_logger, HTTP_PREMATURE_CONNECTION_TIMEOUT_OCCURRED);
         return (true);
     }
@@ -965,7 +940,7 @@ Connection::terminateInternal(const boost::system::error_code& ec,
                               const std::string& parsing_error) {
     HttpResponsePtr response;
 
-    if (isTransactionOngoingInternal()) {
+    if (isTransactionOngoing()) {
 
         timer_.cancel();
         socket_.cancel();
@@ -980,9 +955,9 @@ Connection::terminateInternal(const boost::system::error_code& ec,
             LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA,
                       HTTP_SERVER_RESPONSE_RECEIVED_DETAILS)
                 .arg(url_.toText())
-                .arg((parser_ ?
-                      parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
-                      "[HttpResponseParser is null]"));
+                .arg(parser_ ?
+                     parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
+                     "[HttpResponseParser is null]");
 
         } else {
             std::string err = parsing_error.empty() ? ec.message() :
@@ -999,15 +974,12 @@ Connection::terminateInternal(const boost::system::error_code& ec,
                 LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA,
                           HTTP_BAD_SERVER_RESPONSE_RECEIVED_DETAILS)
                     .arg(url_.toText())
-                    .arg((parser_ ? parser_->getBufferAsString() :
-                                    "[HttpResponseParser is null]"));
+                    .arg(parser_ ?
+                         parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) :
+                         "[HttpResponseParser is null]");
             }
         }
 
-        // unlock mutex so that callback can call any locking function.
-        if (MultiThreadingMgr::instance().getMode()) {
-            mutex_.unlock();
-        }
         try {
             // The callback should take care of its own exceptions but one
             // never knows.
@@ -1015,10 +987,6 @@ Connection::terminateInternal(const boost::system::error_code& ec,
 
         } catch (...) {
         }
-        // lock mutex so that we can continue processing.
-        if (MultiThreadingMgr::instance().getMode()) {
-            mutex_.lock();
-        }
 
         // If we're not requesting connection persistence, we should close the socket.
         // We're going to reconnect for the next transaction.
@@ -1029,20 +997,21 @@ Connection::terminateInternal(const boost::system::error_code& ec,
         resetState();
     }
 
+    // unlock mutex so that the next request can be safely processed.
+    if (MultiThreadingMgr::instance().getMode()) {
+        mutex_.unlock();
+    }
+
     // Check if there are any requests queued for this connection and start
     // another transaction if there is at least one.
-    HttpRequestPtr request;
-    long request_timeout;
-    HttpClient::RequestHandler callback;
-    HttpClient::ConnectHandler connect_callback;
-    HttpClient::CloseHandler close_callback;
     ConnectionPoolPtr conn_pool = conn_pool_.lock();
-    if (conn_pool && conn_pool->getNextRequest(url_, request, response,
-                                               request_timeout, callback,
-                                               connect_callback,
-                                               close_callback)) {
-        doTransactionInternal(request, response, request_timeout, callback,
-                              connect_callback, close_callback);
+    if (conn_pool) {
+        conn_pool->processNextRequest(url_);
+    }
+
+    // lock mutex so that processing can continue.
+    if (MultiThreadingMgr::instance().getMode()) {
+        mutex_.lock();
     }
 }