From: Razvan Becheriu Date: Thu, 4 Jun 2020 19:54:03 +0000 (+0300) Subject: [#1239] fixed possible dead lock X-Git-Tag: Kea-1.7.9~63 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8f4b2dea9470ea5d939aac9824c175784894e221;p=thirdparty%2Fkea.git [#1239] fixed possible dead lock --- diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index 88e1080295..dd2e73c6f9 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -144,7 +145,7 @@ public: /// @brief Checks if a transaction has been initiated over this connection. /// /// @return true if transaction has been initiated, false otherwise. - bool isTransactionOngoing(); + bool isTransactionOngoing() const; /// @brief Checks if a socket descriptor belongs to this connection. /// @@ -202,13 +203,6 @@ private: /// Should be called in a thread safe context. void closeInternal(); - /// @brief Checks if a transaction has been initiated over this connection. - /// - /// Should be called in a thread safe context. - /// - /// @return true if transaction has been initiated, false otherwise. - bool isTransactionOngoingInternal() const; - /// @brief Checks and logs if premature transaction timeout is suspected. /// /// Should be called in a thread safe context. @@ -387,6 +381,9 @@ private: /// @brief User supplied callback. HttpClient::CloseHandler close_callback_; + /// @brief Flag to indicate that a transaction is running. + std::atomic started_; + /// @brief Mutex to protect the internal state. std::mutex mutex_; }; @@ -434,22 +431,12 @@ public: /// /// @return true if the request for the given URL has been retrieved, /// false if there are no more requests queued for this URL. - bool getNextRequest(const Url& url, - HttpRequestPtr& request, - HttpResponsePtr& response, - long& request_timeout, - HttpClient::RequestHandler& callback, - HttpClient::ConnectHandler& connect_callback, - HttpClient::CloseHandler& close_callback) { + bool processNextRequest(const Url& url) { if (MultiThreadingMgr::instance().getMode()) { std::lock_guard lk(mutex_); - return (getNextRequestInternal(url, request, response, - request_timeout, callback, - connect_callback, close_callback)); + return (processNextRequestInternal(url)); } else { - return (getNextRequestInternal(url, request, response, - request_timeout, callback, - connect_callback, close_callback)); + return (processNextRequestInternal(url)); } } @@ -553,13 +540,7 @@ private: /// /// @return true if the request for the given URL has been retrieved, /// false if there are no more requests queued for this URL. - bool getNextRequestInternal(const Url& url, - HttpRequestPtr& request, - HttpResponsePtr& response, - long& request_timeout, - HttpClient::RequestHandler& callback, - HttpClient::ConnectHandler& connect_callback, - HttpClient::CloseHandler& close_callback) { + bool processNextRequestInternal(const Url& url) { // Check if there is a queue for this URL. If there is no queue, there // is no request queued either. auto it = queue_.find(url); @@ -568,12 +549,10 @@ private: if (!it->second.empty()) { RequestDescriptor desc = it->second.front(); it->second.pop(); - request = desc.request_; - response = desc.response_; - request_timeout = desc.request_timeout_, - callback = desc.callback_; - connect_callback = desc.connect_callback_; - close_callback = desc.close_callback_; + desc.conn_->doTransaction(desc.request_, desc.response_, + desc.request_timeout_, desc.callback_, + desc.connect_callback_, + desc.close_callback_); return (true); } } @@ -613,7 +592,7 @@ private: // There is a connection for this URL already. Check if it is idle. if (conn->isTransactionOngoing()) { // Connection is busy, so let's queue the request. - queue_[url].push(RequestDescriptor(request, response, + queue_[url].push(RequestDescriptor(conn, request, response, request_timeout, request_callback, connect_callback, @@ -730,19 +709,22 @@ private: /// when the client connects to the server. /// @param close_callback pointer to the user callback to be invoked /// when the client closes the connection to the server. - RequestDescriptor(const HttpRequestPtr& request, + RequestDescriptor(const ConnectionPtr& conn, + const HttpRequestPtr& request, const HttpResponsePtr& response, - const long request_timeout, + const long& request_timeout, const HttpClient::RequestHandler& callback, const HttpClient::ConnectHandler& connect_callback, const HttpClient::CloseHandler& close_callback) - : request_(request), response_(response), - request_timeout_(request_timeout), - callback_(callback), + : conn_(conn), request_(request), response_(response), + request_timeout_(request_timeout), callback_(callback), connect_callback_(connect_callback), close_callback_(close_callback) { } + /// @brief Holds the connection. + ConnectionPtr conn_; + /// @brief Holds pointer to the request. HttpRequestPtr request_; @@ -774,7 +756,8 @@ Connection::Connection(IOService& io_service, const Url& url) : conn_pool_(conn_pool), url_(url), socket_(io_service), timer_(io_service), current_request_(), current_response_(), parser_(), current_callback_(), - buf_(), input_buf_(), current_transid_(0), close_callback_() { + buf_(), input_buf_(), current_transid_(0), close_callback_(), + started_(false) { } Connection::~Connection() { @@ -783,6 +766,7 @@ Connection::~Connection() { void Connection::resetState() { + started_ = false; current_request_.reset(); current_response_.reset(); parser_.reset(); @@ -830,6 +814,7 @@ Connection::doTransactionInternal(const HttpRequestPtr& request, const HttpClient::ConnectHandler& connect_callback, const HttpClient::CloseHandler& close_callback) { try { + started_ = true; current_request_ = request; current_response_ = response; parser_.reset(new HttpResponseParser(*current_response_)); @@ -906,18 +891,8 @@ Connection::closeInternal() { } bool -Connection::isTransactionOngoing() { - if (MultiThreadingMgr::instance().getMode()) { - std::lock_guard lk(mutex_); - return (isTransactionOngoingInternal()); - } else { - return (isTransactionOngoingInternal()); - } -} - -bool -Connection::isTransactionOngoingInternal() const { - return (static_cast(current_request_)); +Connection::isTransactionOngoing() const { + return (started_); } bool @@ -942,7 +917,7 @@ Connection::checkPrematureTimeoutInternal(const uint64_t transid) { // Also, if there is a transaction in progress but the ID of that // transaction doesn't match the one associated with the handler it, // also means that the transaction timed out prematurely. - if (!isTransactionOngoingInternal() || (transid != current_transid_)) { + if (!isTransactionOngoing() || (transid != current_transid_)) { LOG_WARN(http_logger, HTTP_PREMATURE_CONNECTION_TIMEOUT_OCCURRED); return (true); } @@ -965,7 +940,7 @@ Connection::terminateInternal(const boost::system::error_code& ec, const std::string& parsing_error) { HttpResponsePtr response; - if (isTransactionOngoingInternal()) { + if (isTransactionOngoing()) { timer_.cancel(); socket_.cancel(); @@ -980,9 +955,9 @@ Connection::terminateInternal(const boost::system::error_code& ec, LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA, HTTP_SERVER_RESPONSE_RECEIVED_DETAILS) .arg(url_.toText()) - .arg((parser_ ? - parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) : - "[HttpResponseParser is null]")); + .arg(parser_ ? + parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) : + "[HttpResponseParser is null]"); } else { std::string err = parsing_error.empty() ? ec.message() : @@ -999,15 +974,12 @@ Connection::terminateInternal(const boost::system::error_code& ec, LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC_DATA, HTTP_BAD_SERVER_RESPONSE_RECEIVED_DETAILS) .arg(url_.toText()) - .arg((parser_ ? parser_->getBufferAsString() : - "[HttpResponseParser is null]")); + .arg(parser_ ? + parser_->getBufferAsString(MAX_LOGGED_MESSAGE_SIZE) : + "[HttpResponseParser is null]"); } } - // unlock mutex so that callback can call any locking function. - if (MultiThreadingMgr::instance().getMode()) { - mutex_.unlock(); - } try { // The callback should take care of its own exceptions but one // never knows. @@ -1015,10 +987,6 @@ Connection::terminateInternal(const boost::system::error_code& ec, } catch (...) { } - // lock mutex so that we can continue processing. - if (MultiThreadingMgr::instance().getMode()) { - mutex_.lock(); - } // If we're not requesting connection persistence, we should close the socket. // We're going to reconnect for the next transaction. @@ -1029,20 +997,21 @@ Connection::terminateInternal(const boost::system::error_code& ec, resetState(); } + // unlock mutex so that the next request can be safely processed. + if (MultiThreadingMgr::instance().getMode()) { + mutex_.unlock(); + } + // Check if there are any requests queued for this connection and start // another transaction if there is at least one. - HttpRequestPtr request; - long request_timeout; - HttpClient::RequestHandler callback; - HttpClient::ConnectHandler connect_callback; - HttpClient::CloseHandler close_callback; ConnectionPoolPtr conn_pool = conn_pool_.lock(); - if (conn_pool && conn_pool->getNextRequest(url_, request, response, - request_timeout, callback, - connect_callback, - close_callback)) { - doTransactionInternal(request, response, request_timeout, callback, - connect_callback, close_callback); + if (conn_pool) { + conn_pool->processNextRequest(url_); + } + + // lock mutex so that processing can continue. + if (MultiThreadingMgr::instance().getMode()) { + mutex_.lock(); } }