#include <http/http_messages.h>
#include <http/response_json.h>
#include <http/response_parser.h>
+#include <util/multi_threading_mgr.h>
+
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/weak_ptr.hpp>
+
#include <array>
+#include <iostream>
#include <map>
+#include <mutex>
#include <queue>
-#include <iostream>
-
using namespace isc;
using namespace isc::asiolink;
using namespace http;
/// @param io_service Reference to the IO service to be used by the
/// connections.
explicit ConnectionPool(IOService& io_service)
- : io_service_(io_service), conns_(), queue_() {
+ : io_service_(io_service), conns_(), queue_(), mutex_() {
}
/// @brief Destructor.
HttpClient::RequestHandler& callback,
HttpClient::ConnectHandler& connect_callback,
HttpClient::CloseHandler& close_callback) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ return (getNextRequestInternal(url, request, response,
+ request_timeout, callback,
+ connect_callback, close_callback));
+ } else {
+ return (getNextRequestInternal(url, request, response,
+ request_timeout, callback,
+ connect_callback, close_callback));
+ }
+ }
+
+ /// @brief Queue next request for sending to the server.
+ ///
+ /// A new transaction is started immediately, if there is no other request
+ /// in progress for the given URL. Otherwise, the request is queued.
+ ///
+ /// @param url Destination where the request should be sent.
+ /// @param request Pointer to the request to be sent to the server.
+ /// @param response Pointer to the object into which the response should be
+ /// stored.
+ /// @param request_timeout Requested timeout for the transaction in
+ /// milliseconds.
+ /// @param request_callback Pointer to the user callback to be invoked when the
+ /// transaction ends.
+ /// @param connect_callback Pointer to the user callback to be invoked when the
+ /// client connects to the server.
+ /// @param close_callback Pointer to the user callback to be invoked when the
+ /// client closes the connection to the server.
+ void queueRequest(const Url& url,
+ const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long request_timeout,
+ const HttpClient::RequestHandler& request_callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::CloseHandler& close_callback) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ return (queueRequestInternal(url, request, response,
+ request_timeout, request_callback,
+ connect_callback, close_callback));
+ } else {
+ return (queueRequestInternal(url, request, response,
+ request_timeout, request_callback,
+ connect_callback, close_callback));
+ }
+ }
+
+ /// @brief Closes connection and removes associated information from the
+ /// connection pool.
+ ///
+ /// @param url URL for which connection should be closed.
+ void closeConnection(const Url& url) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ closeConnectionInternal(url);
+ } else {
+ closeConnectionInternal(url);
+ }
+ }
+
+ /// @brief Closes all connections and removes associated information from
+ /// the connection pool.
+ void closeAll() {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ closeAllInternal();
+ } else {
+ closeAllInternal();
+ }
+ }
+
+ /// @brief Closes a connection if it has an out-of-bandwidth socket event
+ ///
+ /// If the pool contains a connection using the given socket and that
+ /// connection is currently in a transaction the method returns as this
+ /// indicates a normal ready event. If the connection is not in an
+ /// ongoing transaction, then the connection is closed.
+ ///
+ /// This is method is intended to be used to detect and clean up then
+ /// sockets that are marked ready outside of transactions. The most common
+ /// case is the other end of the socket being closed.
+ ///
+ /// @param socket_fd socket descriptor to check
+ void closeIfOutOfBandwidth(int socket_fd) {
+ if (MultiThreadingMgr::instance().getMode()) {
+ std::lock_guard<std::mutex> lk(mutex_);
+ closeIfOutOfBandwidth(fd);
+ } else {
+ closeIfOutOfBandwidth(fd);
+ }
+ }
+
+private:
+
+ /// @brief Returns next queued request for the given URL.
+ ///
+ /// This method should be called in a thread safe context.
+ ///
+ /// @param url URL for which next queued request should be retrieved.
+ /// @param [out] request Pointer to the queued request.
+ /// @param [out] response Pointer to the object into which response should
+ /// be stored.
+ /// @param request_timeout Requested timeout for the transaction.
+ /// @param callback Pointer to the user callback for this request.
+ /// @param connect_callback Pointer to the user callback invoked when
+ /// the client connects to the server.
+ /// @param close_callback Pointer to the user callback invoked when
+ /// the client closes the connection to the server.
+ ///
+ /// @return true if the request for the given URL has been retrieved,
+ /// false if there are no more requests queued for this URL.
+ bool getNextRequestInternal(const Url& url,
+ HttpRequestPtr& request,
+ HttpResponsePtr& response,
+ long& request_timeout,
+ HttpClient::RequestHandler& callback,
+ HttpClient::ConnectHandler& connect_callback,
+ HttpClient::CloseHandler& close_callback) {
// Check if there is a queue for this URL. If there is no queue, there
// is no request queued either.
auto it = queue_.find(url);
/// @brief Queue next request for sending to the server.
///
- /// A new transaction is started immediatelly, if there is no other request
+ /// A new transaction is started immediately, if there is no other request
/// in progress for the given URL. Otherwise, the request is queued.
///
+ /// This method should be called in a thread safe context.
+ ///
/// @param url Destination where the request should be sent.
/// @param request Pointer to the request to be sent to the server.
/// @param response Pointer to the object into which the response should be
/// client connects to the server.
/// @param close_callback Pointer to the user callback to be invoked when the
/// client closes the connection to the server.
- void queueRequest(const Url& url,
- const HttpRequestPtr& request,
- const HttpResponsePtr& response,
- const long request_timeout,
- const HttpClient::RequestHandler& request_callback,
- const HttpClient::ConnectHandler& connect_callback,
- const HttpClient::CloseHandler& close_callback) {
+ void queueRequestInternal(const Url& url,
+ const HttpRequestPtr& request,
+ const HttpResponsePtr& response,
+ const long request_timeout,
+ const HttpClient::RequestHandler& request_callback,
+ const HttpClient::ConnectHandler& connect_callback,
+ const HttpClient::CloseHandler& close_callback) {
auto it = conns_.find(url);
if (it != conns_.end()) {
ConnectionPtr conn = it->second;
request_callback,
connect_callback,
close_callback));
-
} else {
// Connection is idle, so we can start the transaction.
conn->doTransaction(request, response, request_timeout,
- request_callback, connect_callback, close_callback);
+ request_callback, connect_callback,
+ close_callback);
}
-
} else {
// There is no connection with this destination yet. Let's create
// it and start the transaction.
/// @brief Closes connection and removes associated information from the
/// connection pool.
///
- /// @param url URL for which connection shuld be closed.
- void closeConnection(const Url& url) {
+ /// This method should be called in a thread safe context.
+ ///
+ /// @param url URL for which connection should be closed.
+ void closeConnectionInternal(const Url& url) {
// Close connection for the specified URL.
auto conns_it = conns_.find(url);
if (conns_it != conns_.end()) {
/// @brief Closes all connections and removes associated information from
/// the connection pool.
- void closeAll() {
+ ///
+ /// This method should be called in a thread safe context.
+ void closeAllInternal() {
for (auto conns_it = conns_.begin(); conns_it != conns_.end();
++conns_it) {
conns_it->second->close();
/// ongoing transaction, then the connection is closed.
///
/// This is method is intended to be used to detect and clean up then
- /// sockets that are marked ready outside of transactions. The most comman
+ /// sockets that are marked ready outside of transactions. The most common
/// case is the other end of the socket being closed.
///
+ /// This method should be called in a thread safe context.
+ ///
/// @param socket_fd socket descriptor to check
- void closeIfOutOfBandwidth(int socket_fd) {
+ void closeIfOutOfBandwidthInternal(int socket_fd) {
// First we look for a connection with the socket.
for (auto conns_it = conns_.begin(); conns_it != conns_.end();
++conns_it) {
}
}
-private:
-
/// @brief Holds reference to the IO service.
IOService& io_service_;
/// @brief Holds pointer to the request.
HttpRequestPtr request_;
+
/// @brief Holds pointer to the response.
HttpResponsePtr response_;
+
/// @brief Holds requested timeout value.
long request_timeout_;
+
/// @brief Holds pointer to the user callback.
HttpClient::RequestHandler callback_;
+
/// @brief Holds pointer to the user callback for connect.
HttpClient::ConnectHandler connect_callback_;
/// @brief Holds the queue of requests for different URLs.
std::map<Url, std::queue<RequestDescriptor> > queue_;
+
+ /// @brief Mutex to protect the internal state.
+ std::mutex mutex_;
};
Connection::Connection(IOService& io_service,