#include <atomic>
#include <array>
+#include <unordered_set>
#include <functional>
#include <iostream>
#include <map>
/// @brief Shared pointer to the connection.
typedef boost::shared_ptr<Connection> ConnectionPtr;
-/// @brief Container of Connections for a given URL
-class ConnectionList {
-public:
- /// @brief Constructor
- ///
- /// @param url URL associated with this connection list.
- /// @param max_connections maximum number of connections
- /// allowed for in the list URL
- ConnectionList(Url url, size_t max_connections)
- : url_(url), max_connections_(max_connections) {
- }
-
- /// @brief Add a new connection to the list
- ///
- /// @param connection the connection to add
- ///
- /// @throw BadValue if the maximum number of connections already
- /// exist.
- void addConnection(ConnectionPtr connection) {
- if (full()) {
- isc_throw(BadValue, "URL: " << url_.toText()
- << ", already at maximum connections: "
- << max_connections_);
- }
-
- connections_.push_back(connection);
- }
-
- /// @brief Closes a connection and removes it from the list.
- ///
- /// @param connection the connection to remove
- void closeConnection(ConnectionPtr connection) {
- for (auto it = connections_.begin(); it != connections_.end(); ++it) {
- if (*it == connection) {
- (*it)->close();
- connections_.erase(it);
- break;
- }
- }
- }
-
- /// @brief Closes all connections and clears the list.
- void closeAllConnections() {
- for (auto connection : connections_) {
- connection->close();
- }
-
- connections_.clear();
- }
-
- /// @brief Find the first idle connection.
- ///
- /// Iterates over the existing connections and returns the
- /// first connection which is not currently in a transaction.
- ///
- /// @return The first idle connection or an empty pointer if
- /// all connections are busy.
- ConnectionPtr getIdleConnection() {
- for (auto connection : connections_) {
- if (!connection->isTransactionOngoing()) {
- return(connection);
- }
- }
-
- return(ConnectionPtr());
- }
-
- /// @brief Find a connection by its socket descriptor.
- ///
- /// @param socket_fd socket descriptor to find
- ///
- /// @return The connection or an empty pointer if no matching
- /// connection exists.
- ConnectionPtr findBySocketFd(int socket_fd) {
- for (auto connection : connections_) {
- if (connection->isMySocket(socket_fd)) {
- return(connection);
- }
- }
-
- return(ConnectionPtr());
- }
-
- /// @brief Indicates if there are no connections in the list.
- ///
- /// @return true if the list is empty.
- bool empty() {
- return(connections_.empty());
- }
-
- /// @brief Indicates if list contains the maximum number.
- ///
- /// @return true if the list is full.
- bool full() {
- return(connections_.size() >= max_connections_);
- }
-
- /// @brief Fetches the number of connections in the list.
- ///
- /// @return the number of connections in the list.
- size_t size() {
- return connections_.size();
- }
-
- /// @brief Fetches the maximum number of connections.
- ///
- /// @return the maxim number of connections.
- size_t max_connections() const {
- return max_connections_;
- }
-
- /// @brief Fetches the URL.
- ///
- /// @return the URL.
- const Url& getUrl() const {
- return url_;
- }
-
-private:
- /// @brief URL supported by the list.
- Url url_;
-
- /// @brief Maximum number of concurrent connections allowed in the list.
- size_t max_connections_;
-
- /// @brief List of concurrent connections.
- std::vector<ConnectionPtr> connections_;
-};
-
-/// @brief Defines a pointer to a ConnectionList instance.
-typedef boost::shared_ptr<ConnectionList> ConnectionListPtr;
-
/// @brief Connection pool for managing multiple connections.
///
-/// Connection pool creates and destroys connections. It holds pointers
-/// to all created connections and can verify whether the particular
-/// connection is currently busy or idle. If the connection is idle, it
-/// uses this connection for new requests. If the connection is busy, it
-/// queues new requests until the connection becomes available.
+/// Connection pool creates and destroys URL destinations. t manages
+/// connections to and requests for URLs. Each time a request is
+/// submitted for a URL, it assigns it to an available idle connection,
+/// or if no idle connections are available, pushes the request on the queue
+/// for that URL.
class ConnectionPool : public boost::enable_shared_from_this<ConnectionPool> {
public:
/// @param max_url_connections maximum number of concurrent
/// connections allowed per URL.
explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
- : io_service_(io_service), conns_(), queue_(), mutex_(),
+ : io_service_(io_service), destinations_(), mutex_(),
max_url_connections_(max_url_connections) {
}
}
}
- /// @brief Closes a URL's connections and removes associated information
- /// from the connection pool.
- ///
- /// @param url URL for which connection should be closed.
- void closeUrlConnections(const Url& url) {
- if (MultiThreadingMgr::instance().getMode()) {
- std::lock_guard<std::mutex> lk(mutex_);
- closeUrlConnectionsInternal(url);
- } else {
- closeUrlConnectionsInternal(url);
- }
- }
-
/// @brief Closes all URLS and removes associated information from
/// the connection pool.
void closeAll() {
void processNextRequestInternal(const Url& url) {
// Check if there is a queue for this URL. If there is no queue, there
// is no request queued either.
- auto queue_it = queue_.find(url);
- if (queue_it != queue_.end()) {
- if (!queue_it->second.empty()) {
+ DestinationPtr destination = findDestination(url);
+ if (destination) {
+ if (!destination->queueEmpty()) {
// We have at least one queued request. Do we have an
// idle connection?
- auto conns_it = conns_.find(url);
- if (conns_it == conns_.end()) {
- isc_throw(Unexpected, "no connection list for :" << url.toText());
- }
-
- // Now, look for an idle connection.
- ConnectionPtr connection = conns_it->second->getIdleConnection();
+ ConnectionPtr connection = destination->getIdleConnection();
if (!connection) {
TOMS_TRACE_LOG("*** No idle connections, don't dequeue?");
// @todo Resolve this, throw or just return, possibly log and return
// Dequeue the oldest request and start a transaction for it using
// the idle connection.
- RequestDescriptor desc = queue_it->second.front();
- queue_it->second.pop();
- desc.conn_ = connection;
+ RequestDescriptor desc = destination->popNextRequest();
connection->doTransaction(desc.request_, desc.response_,
desc.request_timeout_, desc.callback_,
desc.connect_callback_,
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::HandshakeHandler& handshake_callback,
const HttpClient::CloseHandler& close_callback) {
- // Find the connection list for the url
- ConnectionListPtr url_connections;
- auto it = conns_.find(url);
- if (it != conns_.end()) {
- url_connections = it->second;
+ ConnectionPtr connection;
+ // Find the destination for the requested URL.
+ DestinationPtr destination = findDestination(url);
+ if (destination) {
+ // Found it, look for an idle connection.
+ connection = destination->getIdleConnection();
} else {
- // Doesn't exist yet, so create it.
- url_connections.reset(new ConnectionList(url, max_url_connections_));
- conns_[url] = url_connections;
+ // Doesn't exist yet so it's a new destination/
+ destination = addDestination(url);
}
- // Now, look for an idle connection.
- ConnectionPtr connection = url_connections->getIdleConnection();
if (!connection) {
- if (url_connections->full()) {
+ if (destination->connectionsFull()) {
TOMS_TRACE_LOG("no idle connections, queue request");
// All connections busy, queue it.
- queue_[url].push(RequestDescriptor(ConnectionPtr(), request, response,
- request_timeout,
- request_callback,
- connect_callback,
- handshake_callback,
- close_callback));
+ destination->pushRequest(RequestDescriptor(ConnectionPtr(), request, response,
+ request_timeout,
+ request_callback,
+ connect_callback,
+ handshake_callback,
+ close_callback));
return;
}
// Room to make another connection with this destination, so make one.
TOMS_TRACE_LOG("creating a new connection");
- connection.reset(new Connection(io_service_, tls_context,
+ connection.reset(new Connection(io_service_, tls_context,
shared_from_this(), url));
- url_connections->addConnection(connection);
+ destination->addConnection(connection);
}
// Use the connection to start the transaction.
connect_callback, handshake_callback, close_callback);
}
- /// @brief Closes a URL's connections and clears its queue
- ///
- /// This method should be called in a thread safe context.
- ///
- /// @param url URL for which connection should be closed.
- void closeUrlConnectionsInternal(const Url& url) {
- // Remove requests from the queue.
- auto queue_it = queue_.find(url);
- if (queue_it != queue_.end()) {
- queue_.erase(queue_it);
- }
-
- // Close connections for the URL.
- auto it = conns_.find(url);
- if (it != conns_.end()) {
- it->second->closeAllConnections();
- }
- }
-
/// @brief Closes all connections for all URLs and removes associated
/// information from the connection pool.
///
/// This method should be called in a thread safe context.
void closeAllInternal() {
- // Remove all requests from the queue.
- queue_.clear();
-
- // Close connections for each URL.
- for (auto it = conns_.begin(); it != conns_.end(); ++it) {
- it->second->closeAllConnections();
+ for (auto const& destination : destinations_) {
+ destination.second->closeAllConnections();
}
- // Remove all of the connections.
- conns_.clear();
+ destinations_.clear();
}
/// @brief Closes a connection if it has an out-of-band socket event
///
/// @param socket_fd socket descriptor to check
void closeIfOutOfBandInternal(int socket_fd) {
- // First we look for a connection with the socket.
- for (auto conns_it = conns_.begin(); conns_it != conns_.end();
- ++conns_it) {
-
- ConnectionListPtr url_connections = conns_it->second;
- ConnectionPtr connection = url_connections->findBySocketFd(socket_fd);
- if (connection && connection->isTransactionOngoing()) {
- // Matches but is in a transaction, all is well.
+ for (auto const& destination : destinations_) {
+ // First we look for a connection with the socket.
+ ConnectionPtr connection = destination.second->findBySocketFd(socket_fd);
+ if (connection) {
+ if (!connection->isTransactionOngoing()) {
+ // Socket has no transaction, so any ready event is
+ // out-of-band (other end probably closed), so
+ // let's close it. Note we do not remove any queued
+ // requests, as this might somehow be occurring in
+ // between them.
+ destination.second->closeConnection(connection);
+ }
+
return;
}
-
- // Socket has no transaction, so any ready event is
- // out-of-band (other end probably closed), so
- // let's close it. Note we do not remove any queued
- // requests, as this might somehow be occurring in
- // between them.
- url_connections->closeConnection(connection);
- break;
}
}
HttpClient::CloseHandler close_callback_;
};
+ /// @brief Encapsulates connections and requests for a given URL
+ class Destination {
+ public:
+ /// @brief Constructor
+ ///
+ /// @param url server URL of this destination
+ /// @param max_connections maximum number of concurrent connections
+ /// allowed for in the list URL
+ Destination(Url url, size_t max_connections)
+ : url_(url), max_connections_(max_connections), connections_(), queue_() { }
+
+ /// @brief Destructor
+ ~Destination() {
+ closeAllConnections();
+ }
+
+ /// @brief Add a new connection
+ ///
+ /// @param connection the connection to add
+ ///
+ /// @throw BadValue if the maximum number of connections already
+ /// exist.
+ void addConnection(ConnectionPtr connection) {
+ if (connectionsFull()) {
+ isc_throw(BadValue, "URL: " << url_.toText()
+ << ", already at maximum connections: "
+ << max_connections_);
+ }
+
+ connections_.push_back(connection);
+ }
+
+ /// @brief Closes a connection and removes it from the list.
+ ///
+ /// @param connection the connection to remove
+ void closeConnection(ConnectionPtr connection) {
+ for (auto it = connections_.begin(); it != connections_.end(); ++it) {
+ if (*it == connection) {
+ (*it)->close();
+ connections_.erase(it);
+ break;
+ }
+ }
+ }
+
+ /// @brief Closes all connections and clears the list.
+ void closeAllConnections() {
+ for (auto connection : connections_) {
+ connection->close();
+ }
+
+ connections_.clear();
+ }
+
+ /// @brief Find the first idle connection.
+ ///
+ /// Iterates over the existing connections and returns the
+ /// first connection which is not currently in a transaction.
+ ///
+ /// @return The first idle connection or an empty pointer if
+ /// all connections are busy.
+ ConnectionPtr getIdleConnection() {
+ for (auto connection : connections_) {
+ if (!connection->isTransactionOngoing()) {
+ return(connection);
+ }
+ }
+
+ return(ConnectionPtr());
+ }
+
+ /// @brief Find a connection by its socket descriptor.
+ ///
+ /// @param socket_fd socket descriptor to find
+ ///
+ /// @return The connection or an empty pointer if no matching
+ /// connection exists.
+ ConnectionPtr findBySocketFd(int socket_fd) {
+ for (auto connection : connections_) {
+ if (connection->isMySocket(socket_fd)) {
+ return(connection);
+ }
+ }
+
+ return(ConnectionPtr());
+ }
+
+ /// @brief Indicates if there are no connections in the list.
+ ///
+ /// @return true if the list is empty.
+ bool connectionsEmpty() {
+ return(connections_.empty());
+ }
+
+ /// @brief Indicates if list contains the maximum number.
+ ///
+ /// @return true if the list is full.
+ bool connectionsFull() {
+ return (connections_.size() >= max_connections_);
+ }
+
+ /// @brief Fetches the number of connections in the list.
+ ///
+ /// @return the number of connections in the list.
+ size_t connectionCount() {
+ return (connections_.size());
+ }
+
+ /// @brief Fetches the maximum number of connections.
+ ///
+ /// @return the maxim number of connections.
+ size_t getMaxConnections() const {
+ return (max_connections_);
+ }
+
+ /// @brief Fetches the URL.
+ ///
+ /// @return the URL.
+ const Url& getUrl() const {
+ return (url_);
+ }
+
+ /// @brief Indicates if request queue is empty.
+ ///
+ /// @return true if there are no requests queued.
+ bool queueEmpty() const {
+ return (queue_.empty());
+
+ }
+
+ /// @brief Adds a request to the end of the request queue.
+ ///
+ /// @param desc RequestDescriptor to queue.
+ void pushRequest(RequestDescriptor desc) {
+ queue_.push(desc);
+ }
+
+ /// @brief Adds a request to the end of the request queue.
+ ///
+ /// @return desc RequestDescriptor to queue.
+ RequestDescriptor popNextRequest() {
+ if (queue_.empty()) {
+ isc_throw(InvalidOperation, "cannot pop, queue is empty");
+ }
+
+ RequestDescriptor desc = queue_.front();
+ queue_.pop();
+ return(desc);
+ }
+
+ private:
+ /// @brief URL supported by the list.
+ Url url_;
+
+ /// @brief Maximum number of concurrent connections for this destination.
+ size_t max_connections_;
+
+ /// @brief List of concurrent connections.
+ std::vector<ConnectionPtr> connections_;
+
+ /// @brief Holds the queue of request for this destination.
+ std::queue<RequestDescriptor> queue_;
+ };
+
+ /// @brief Pointer to a Destination.
+ typedef boost::shared_ptr<Destination> DestinationPtr;
+
+ /// @brief Creates a new destination for the given URL.
+ ///
+ /// @param url URL of the new destination.
+ ///
+ /// @return Pointer to the newly created destination.
+ /// @note Must be called from within a thread-safe context.
+ DestinationPtr addDestination(const Url& url) {
+ DestinationPtr destination(new Destination(url, max_url_connections_));
+ destinations_[url] = destination;
+ return(destination);
+ }
+
+ /// @brief Fetches a destination by URL
+ ///
+ /// @param url Url of the destination desired.
+ ///
+ /// @return pointer the desired destination, empty pointer
+ /// if the destination does not exist.
+ DestinationPtr findDestination(const Url& url) const {
+ auto it = destinations_.find(url);
+ if (it != destinations_.end()) {
+ return (it->second);
+ }
+
+ return (DestinationPtr());
+ }
+
+ /// @brief Removes a destination by URL
+ ///
+ /// @param url Url of the destination to be removed.
+ /// @note Must be called from within a thread-safe context.
+ void removeDestination(const Url& url) {
+ // Remove requests from the queue.
+ auto it = destinations_.find(url);
+ if (it != destinations_.end()) {
+ it->second->closeAllConnections();
+ destinations_.erase(it);
+ }
+ }
+
/// @brief A reference to the IOService that drives socket IO.
IOService& io_service_;
- /// @brief Holds lists of connections for each URL.
- std::map<Url, ConnectionListPtr> conns_;
-
- /// @brief Holds the queue of requests for each URL.
- std::map<Url, std::queue<RequestDescriptor> > queue_;
+ /// @brief Map of Destinations by URL.
+ std::map<Url, DestinationPtr> destinations_;
/// @brief Mutex to protect the internal state.
std::mutex mutex_;
void
Connection::terminateInternal(const boost::system::error_code& ec,
const std::string& parsing_error) {
- TOMS_TRACE_LOG("terminate on: " << getSocketFd()
+ TOMS_TRACE_LOG("terminate on: " << getSocketFd()
<< ", isTransactionOngoing? " << isTransactionOngoing());
HttpResponsePtr response;
// another transaction if there is at least one.
ConnectionPoolPtr conn_pool = conn_pool_.lock();
if (conn_pool) {
- TOMS_TRACE_LOG(" more work on? " << getSocketFd()
+ TOMS_TRACE_LOG(" more work on? " << getSocketFd()
<< ", isTransactionOngoing? " << isTransactionOngoing());
if (MultiThreadingMgr::instance().getMode()) {
UnlockGuard<std::mutex> lock(mutex_);