From: Razvan Becheriu Date: Mon, 18 May 2020 13:01:51 +0000 (+0300) Subject: [#1239] make http client ConnectionPool thread safe X-Git-Tag: Kea-1.7.8~73 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c1d3395ec5df3a8df57c0247857b18d856ee3a11;p=thirdparty%2Fkea.git [#1239] make http client ConnectionPool thread safe --- diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index cc7d6b7fc7..fcbcb44c58 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -14,15 +14,18 @@ #include #include #include +#include + #include #include #include + #include +#include #include +#include #include -#include - using namespace isc; using namespace isc::asiolink; using namespace http; @@ -309,7 +312,7 @@ public: /// @param io_service Reference to the IO service to be used by the /// connections. explicit ConnectionPool(IOService& io_service) - : io_service_(io_service), conns_(), queue_() { + : io_service_(io_service), conns_(), queue_(), mutex_() { } /// @brief Destructor. @@ -341,6 +344,125 @@ public: HttpClient::RequestHandler& callback, HttpClient::ConnectHandler& connect_callback, HttpClient::CloseHandler& close_callback) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(mutex_); + return (getNextRequestInternal(url, request, response, + request_timeout, callback, + connect_callback, close_callback)); + } else { + return (getNextRequestInternal(url, request, response, + request_timeout, callback, + connect_callback, close_callback)); + } + } + + /// @brief Queue next request for sending to the server. + /// + /// A new transaction is started immediately, if there is no other request + /// in progress for the given URL. Otherwise, the request is queued. + /// + /// @param url Destination where the request should be sent. + /// @param request Pointer to the request to be sent to the server. + /// @param response Pointer to the object into which the response should be + /// stored. + /// @param request_timeout Requested timeout for the transaction in + /// milliseconds. + /// @param request_callback Pointer to the user callback to be invoked when the + /// transaction ends. + /// @param connect_callback Pointer to the user callback to be invoked when the + /// client connects to the server. + /// @param close_callback Pointer to the user callback to be invoked when the + /// client closes the connection to the server. + void queueRequest(const Url& url, + const HttpRequestPtr& request, + const HttpResponsePtr& response, + const long request_timeout, + const HttpClient::RequestHandler& request_callback, + const HttpClient::ConnectHandler& connect_callback, + const HttpClient::CloseHandler& close_callback) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(mutex_); + return (queueRequestInternal(url, request, response, + request_timeout, request_callback, + connect_callback, close_callback)); + } else { + return (queueRequestInternal(url, request, response, + request_timeout, request_callback, + connect_callback, close_callback)); + } + } + + /// @brief Closes connection and removes associated information from the + /// connection pool. + /// + /// @param url URL for which connection should be closed. + void closeConnection(const Url& url) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(mutex_); + closeConnectionInternal(url); + } else { + closeConnectionInternal(url); + } + } + + /// @brief Closes all connections and removes associated information from + /// the connection pool. + void closeAll() { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(mutex_); + closeAllInternal(); + } else { + closeAllInternal(); + } + } + + /// @brief Closes a connection if it has an out-of-bandwidth socket event + /// + /// If the pool contains a connection using the given socket and that + /// connection is currently in a transaction the method returns as this + /// indicates a normal ready event. If the connection is not in an + /// ongoing transaction, then the connection is closed. + /// + /// This is method is intended to be used to detect and clean up then + /// sockets that are marked ready outside of transactions. The most common + /// case is the other end of the socket being closed. + /// + /// @param socket_fd socket descriptor to check + void closeIfOutOfBandwidth(int socket_fd) { + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(mutex_); + closeIfOutOfBandwidth(fd); + } else { + closeIfOutOfBandwidth(fd); + } + } + +private: + + /// @brief Returns next queued request for the given URL. + /// + /// This method should be called in a thread safe context. + /// + /// @param url URL for which next queued request should be retrieved. + /// @param [out] request Pointer to the queued request. + /// @param [out] response Pointer to the object into which response should + /// be stored. + /// @param request_timeout Requested timeout for the transaction. + /// @param callback Pointer to the user callback for this request. + /// @param connect_callback Pointer to the user callback invoked when + /// the client connects to the server. + /// @param close_callback Pointer to the user callback invoked when + /// the client closes the connection to the server. + /// + /// @return true if the request for the given URL has been retrieved, + /// false if there are no more requests queued for this URL. + bool getNextRequestInternal(const Url& url, + HttpRequestPtr& request, + HttpResponsePtr& response, + long& request_timeout, + HttpClient::RequestHandler& callback, + HttpClient::ConnectHandler& connect_callback, + HttpClient::CloseHandler& close_callback) { // Check if there is a queue for this URL. If there is no queue, there // is no request queued either. auto it = queue_.find(url); @@ -364,9 +486,11 @@ public: /// @brief Queue next request for sending to the server. /// - /// A new transaction is started immediatelly, if there is no other request + /// A new transaction is started immediately, if there is no other request /// in progress for the given URL. Otherwise, the request is queued. /// + /// This method should be called in a thread safe context. + /// /// @param url Destination where the request should be sent. /// @param request Pointer to the request to be sent to the server. /// @param response Pointer to the object into which the response should be @@ -379,13 +503,13 @@ public: /// client connects to the server. /// @param close_callback Pointer to the user callback to be invoked when the /// client closes the connection to the server. - void queueRequest(const Url& url, - const HttpRequestPtr& request, - const HttpResponsePtr& response, - const long request_timeout, - const HttpClient::RequestHandler& request_callback, - const HttpClient::ConnectHandler& connect_callback, - const HttpClient::CloseHandler& close_callback) { + void queueRequestInternal(const Url& url, + const HttpRequestPtr& request, + const HttpResponsePtr& response, + const long request_timeout, + const HttpClient::RequestHandler& request_callback, + const HttpClient::ConnectHandler& connect_callback, + const HttpClient::CloseHandler& close_callback) { auto it = conns_.find(url); if (it != conns_.end()) { ConnectionPtr conn = it->second; @@ -397,13 +521,12 @@ public: request_callback, connect_callback, close_callback)); - } else { // Connection is idle, so we can start the transaction. conn->doTransaction(request, response, request_timeout, - request_callback, connect_callback, close_callback); + request_callback, connect_callback, + close_callback); } - } else { // There is no connection with this destination yet. Let's create // it and start the transaction. @@ -418,8 +541,10 @@ public: /// @brief Closes connection and removes associated information from the /// connection pool. /// - /// @param url URL for which connection shuld be closed. - void closeConnection(const Url& url) { + /// This method should be called in a thread safe context. + /// + /// @param url URL for which connection should be closed. + void closeConnectionInternal(const Url& url) { // Close connection for the specified URL. auto conns_it = conns_.find(url); if (conns_it != conns_.end()) { @@ -436,7 +561,9 @@ public: /// @brief Closes all connections and removes associated information from /// the connection pool. - void closeAll() { + /// + /// This method should be called in a thread safe context. + void closeAllInternal() { for (auto conns_it = conns_.begin(); conns_it != conns_.end(); ++conns_it) { conns_it->second->close(); @@ -454,11 +581,13 @@ public: /// ongoing transaction, then the connection is closed. /// /// This is method is intended to be used to detect and clean up then - /// sockets that are marked ready outside of transactions. The most comman + /// sockets that are marked ready outside of transactions. The most common /// case is the other end of the socket being closed. /// + /// This method should be called in a thread safe context. + /// /// @param socket_fd socket descriptor to check - void closeIfOutOfBandwidth(int socket_fd) { + void closeIfOutOfBandwidthInternal(int socket_fd) { // First we look for a connection with the socket. for (auto conns_it = conns_.begin(); conns_it != conns_.end(); ++conns_it) { @@ -484,8 +613,6 @@ public: } } -private: - /// @brief Holds reference to the IO service. IOService& io_service_; @@ -521,12 +648,16 @@ private: /// @brief Holds pointer to the request. HttpRequestPtr request_; + /// @brief Holds pointer to the response. HttpResponsePtr response_; + /// @brief Holds requested timeout value. long request_timeout_; + /// @brief Holds pointer to the user callback. HttpClient::RequestHandler callback_; + /// @brief Holds pointer to the user callback for connect. HttpClient::ConnectHandler connect_callback_; @@ -536,6 +667,9 @@ private: /// @brief Holds the queue of requests for different URLs. std::map > queue_; + + /// @brief Mutex to protect the internal state. + std::mutex mutex_; }; Connection::Connection(IOService& io_service,