]> git.ipfire.org Git - thirdparty/kea.git/commitdiff
[#1732] Refactored internal HttpClient classes
authorThomas Markwalder <tmark@isc.org>
Fri, 26 Mar 2021 20:43:37 +0000 (16:43 -0400)
committerThomas Markwalder <tmark@isc.org>
Thu, 8 Apr 2021 12:59:48 +0000 (08:59 -0400)
src/lib/http/client.cc
    Refactored, connections and request queue are now managed together
    as part of a URL destination.

src/lib/http/url.cc
    Url::operator<(const Url& url) - compares original
    unparsed string rather then reconstructing a new string for
    both operands every time

src/lib/http/client.cc
src/lib/http/url.cc
src/lib/http/url.h

index b0baf631ebac89a45b6c3011d63ac0ad4795e180..b8411bf8179a65bb8a76ad9c5249751cdacb2dc4 100644 (file)
@@ -22,6 +22,7 @@
 
 #include <atomic>
 #include <array>
+#include <unordered_set>
 #include <functional>
 #include <iostream>
 #include <map>
@@ -452,145 +453,13 @@ private:
 /// @brief Shared pointer to the connection.
 typedef boost::shared_ptr<Connection> ConnectionPtr;
 
-/// @brief Container of Connections for a given URL
-class ConnectionList {
-public:
-    /// @brief Constructor
-    ///
-    /// @param url URL associated with this connection list.
-    /// @param max_connections maximum number of connections
-    /// allowed for in the list URL
-    ConnectionList(Url url, size_t max_connections)
-            : url_(url), max_connections_(max_connections) {
-    }
-
-    /// @brief Add a new connection to the list
-    ///
-    /// @param connection the connection to add
-    ///
-    /// @throw BadValue if the maximum number of connections already
-    /// exist.
-    void addConnection(ConnectionPtr connection) {
-        if (full()) {
-            isc_throw(BadValue, "URL: " << url_.toText()
-                      << ", already at maximum connections: "
-                      << max_connections_);
-        }
-
-        connections_.push_back(connection);
-    }
-
-    /// @brief Closes a connection and removes it from the list. 
-    ///
-    /// @param connection the connection to remove
-    void closeConnection(ConnectionPtr connection) {
-        for (auto it = connections_.begin(); it != connections_.end(); ++it) {
-            if (*it == connection) {
-                (*it)->close();
-                connections_.erase(it);
-                break;
-            }
-        }
-    }
-
-    /// @brief Closes all connections and clears the list.
-    void closeAllConnections() {
-        for (auto connection : connections_) {
-            connection->close();
-        }
-
-        connections_.clear();
-    }
-
-    /// @brief Find the first idle connection.
-    ///
-    /// Iterates over the existing connections and returns the
-    /// first connection which is not currently in a transaction.
-    ///
-    /// @return The first idle connection or an empty pointer if
-    /// all connections are busy.
-    ConnectionPtr getIdleConnection() {
-        for (auto connection : connections_) {
-            if (!connection->isTransactionOngoing()) {
-                return(connection);
-            }
-        }
-
-        return(ConnectionPtr());
-    }
-
-    /// @brief Find a connection by its socket descriptor.
-    ///
-    /// @param socket_fd socket descriptor to find
-    ///
-    /// @return The connection or an empty pointer if no matching
-    /// connection exists.
-    ConnectionPtr findBySocketFd(int socket_fd) {
-        for (auto connection : connections_) {
-            if (connection->isMySocket(socket_fd)) {
-                return(connection);
-            }
-        }
-
-        return(ConnectionPtr());
-    }
-
-    /// @brief Indicates if there are no connections in the list.
-    ///
-    /// @return true if the list is empty.
-    bool empty() {
-        return(connections_.empty());
-    }
-
-    /// @brief Indicates if list contains the maximum number.
-    ///
-    /// @return true if the list is full.
-    bool full() {
-        return(connections_.size() >= max_connections_);
-    }
-
-    /// @brief Fetches the number of connections in the list.
-    ///
-    /// @return the number of connections in the list.
-    size_t size() {
-        return connections_.size();
-    }
-
-    /// @brief Fetches the maximum number of connections.
-    ///
-    /// @return the maxim number of connections.
-    size_t max_connections() const {
-        return max_connections_;
-    }
-
-    /// @brief Fetches the URL.
-    ///
-    /// @return the URL.
-    const Url& getUrl() const {
-        return url_;
-    }
-
-private:
-    /// @brief URL supported by the list.
-    Url url_;
-
-    /// @brief Maximum number of concurrent connections allowed in the list.
-    size_t max_connections_;
-
-    /// @brief List of concurrent connections.
-    std::vector<ConnectionPtr> connections_;
-};
-
-/// @brief Defines a pointer to a ConnectionList instance.
-typedef boost::shared_ptr<ConnectionList> ConnectionListPtr;
-
 /// @brief Connection pool for managing multiple connections.
 ///
-/// Connection pool creates and destroys connections. It holds pointers
-/// to all created connections and can verify whether the particular
-/// connection is currently busy or idle. If the connection is idle, it
-/// uses this connection for new requests. If the connection is busy, it
-/// queues new requests until the connection becomes available.
+/// Connection pool creates and destroys URL destinations.  t manages
+/// connections to and requests for URLs.  Each time a request is
+/// submitted for a URL, it assigns it to an available idle connection,
+/// or if no idle connections are available, pushes the request on the queue
+/// for that URL.
 class ConnectionPool : public boost::enable_shared_from_this<ConnectionPool> {
 public:
 
@@ -601,7 +470,7 @@ public:
     /// @param max_url_connections maximum number of concurrent
     /// connections allowed per URL.
     explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
-        : io_service_(io_service), conns_(), queue_(), mutex_(),
+        : io_service_(io_service), destinations_(), mutex_(),
         max_url_connections_(max_url_connections) {
     }
 
@@ -667,19 +536,6 @@ public:
         }
     }
 
-    /// @brief Closes a URL's connections and removes associated information
-    /// from the connection pool.
-    ///
-    /// @param url URL for which connection should be closed.
-    void closeUrlConnections(const Url& url) {
-        if (MultiThreadingMgr::instance().getMode()) {
-            std::lock_guard<std::mutex> lk(mutex_);
-            closeUrlConnectionsInternal(url);
-        } else {
-            closeUrlConnectionsInternal(url);
-        }
-    }
-
     /// @brief Closes all URLS and removes associated information from
     /// the connection pool.
     void closeAll() {
@@ -722,18 +578,12 @@ private:
     void processNextRequestInternal(const Url& url) {
         // Check if there is a queue for this URL. If there is no queue, there
         // is no request queued either.
-        auto queue_it = queue_.find(url);
-        if (queue_it != queue_.end()) {
-            if (!queue_it->second.empty()) {
+        DestinationPtr destination = findDestination(url);
+        if (destination) {
+            if (!destination->queueEmpty()) {
                 // We have at least one queued request. Do we have an
                 // idle connection?
-                auto conns_it = conns_.find(url);
-                if (conns_it == conns_.end()) {
-                    isc_throw(Unexpected, "no connection list for :" << url.toText());
-                }
-
-                // Now, look for an idle connection.
-                ConnectionPtr connection = conns_it->second->getIdleConnection();
+                ConnectionPtr connection = destination->getIdleConnection();
                 if (!connection) {
                     TOMS_TRACE_LOG("*** No idle connections, don't dequeue?");
                     // @todo Resolve this,  throw or just return, possibly log and return
@@ -750,9 +600,7 @@ private:
 
                 // Dequeue the oldest request and start a transaction for it using
                 // the idle connection.
-                RequestDescriptor desc = queue_it->second.front();
-                queue_it->second.pop();
-                desc.conn_ = connection;
+                RequestDescriptor desc = destination->popNextRequest();
                 connection->doTransaction(desc.request_, desc.response_,
                                           desc.request_timeout_, desc.callback_,
                                           desc.connect_callback_,
@@ -793,37 +641,35 @@ private:
                               const HttpClient::ConnectHandler& connect_callback,
                               const HttpClient::HandshakeHandler& handshake_callback,
                               const HttpClient::CloseHandler& close_callback) {
-        // Find the connection list for the url
-        ConnectionListPtr url_connections;
-        auto it = conns_.find(url);
-        if (it != conns_.end()) {
-            url_connections = it->second;
+        ConnectionPtr connection;
+        // Find the destination for the requested URL.
+        DestinationPtr destination = findDestination(url);
+        if (destination) {
+            // Found it, look for an idle connection.
+            connection = destination->getIdleConnection();
         } else {
-            // Doesn't exist yet, so create it.
-            url_connections.reset(new ConnectionList(url, max_url_connections_));
-            conns_[url] = url_connections;
+            // Doesn't exist yet so it's a new destination/
+            destination = addDestination(url);
         }
 
-        // Now, look for an idle connection.
-        ConnectionPtr connection = url_connections->getIdleConnection();
         if (!connection) {
-            if (url_connections->full()) {
+            if (destination->connectionsFull()) {
                 TOMS_TRACE_LOG("no idle connections, queue request");
                 // All connections busy, queue it.
-                queue_[url].push(RequestDescriptor(ConnectionPtr(), request, response,
-                                                   request_timeout,
-                                                   request_callback,
-                                                   connect_callback,
-                                                   handshake_callback,
-                                                   close_callback));
+                destination->pushRequest(RequestDescriptor(ConnectionPtr(), request, response,
+                                                           request_timeout,
+                                                           request_callback,
+                                                           connect_callback,
+                                                           handshake_callback,
+                                                           close_callback));
                 return;
             }
 
             // Room to make another connection with this destination, so make one.
             TOMS_TRACE_LOG("creating a new connection");
-            connection.reset(new Connection(io_service_, tls_context, 
+            connection.reset(new Connection(io_service_, tls_context,
                                             shared_from_this(), url));
-            url_connections->addConnection(connection);
+            destination->addConnection(connection);
         }
 
         // Use the connection to start the transaction.
@@ -832,40 +678,16 @@ private:
                                   connect_callback, handshake_callback, close_callback);
     }
 
-    /// @brief Closes a URL's connections and clears its queue
-    ///
-    /// This method should be called in a thread safe context.
-    ///
-    /// @param url URL for which connection should be closed.
-    void closeUrlConnectionsInternal(const Url& url) {
-        // Remove requests from the queue.
-        auto queue_it = queue_.find(url);
-        if (queue_it != queue_.end()) {
-            queue_.erase(queue_it);
-        }
-
-        // Close connections for the URL.
-        auto it = conns_.find(url);
-        if (it != conns_.end()) {
-            it->second->closeAllConnections();
-        }
-    }
-
     /// @brief Closes all connections for all URLs and removes associated
     /// information from the connection pool.
     ///
     /// This method should be called in a thread safe context.
     void closeAllInternal() {
-        // Remove all requests from the queue.
-        queue_.clear();
-
-        // Close connections for each URL.
-        for (auto it = conns_.begin(); it != conns_.end(); ++it) {
-            it->second->closeAllConnections();
+        for (auto const& destination : destinations_) {
+            destination.second->closeAllConnections();
         }
 
-        // Remove all of the connections.
-        conns_.clear();
+        destinations_.clear();
     }
 
     /// @brief Closes a connection if it has an out-of-band socket event
@@ -883,24 +705,21 @@ private:
     ///
     /// @param socket_fd socket descriptor to check
     void closeIfOutOfBandInternal(int socket_fd) {
-        // First we look for a connection with the socket.
-        for (auto conns_it = conns_.begin(); conns_it != conns_.end();
-             ++conns_it) {
-
-            ConnectionListPtr url_connections = conns_it->second;
-            ConnectionPtr connection = url_connections->findBySocketFd(socket_fd);
-            if (connection && connection->isTransactionOngoing()) {
-                // Matches but is in a transaction, all is well.
+        for (auto const& destination : destinations_) {
+            // First we look for a connection with the socket.
+            ConnectionPtr connection = destination.second->findBySocketFd(socket_fd);
+            if (connection) {
+                if (!connection->isTransactionOngoing()) {
+                    // Socket has no transaction, so any ready event is
+                    // out-of-band (other end probably closed), so
+                    // let's close it.  Note we do not remove any queued
+                    // requests, as this might somehow be occurring in
+                    // between them.
+                    destination.second->closeConnection(connection);
+                }
+
                 return;
             }
-
-            // Socket has no transaction, so any ready event is
-            // out-of-band (other end probably closed), so
-            // let's close it.  Note we do not remove any queued
-            // requests, as this might somehow be occurring in
-            // between them.
-            url_connections->closeConnection(connection);
-            break;
         }
     }
 
@@ -961,14 +780,218 @@ private:
         HttpClient::CloseHandler close_callback_;
     };
 
+    /// @brief Encapsulates connections and requests for a given URL
+    class Destination {
+    public:
+        /// @brief Constructor
+        ///
+        /// @param url server URL of this destination
+        /// @param max_connections maximum number of concurrent connections
+        /// allowed for in the list URL
+        Destination(Url url, size_t max_connections)
+        : url_(url), max_connections_(max_connections), connections_(), queue_() { }
+
+        /// @brief Destructor
+        ~Destination() {
+            closeAllConnections();
+        }
+
+        /// @brief Add a new connection
+        ///
+        /// @param connection the connection to add
+        ///
+        /// @throw BadValue if the maximum number of connections already
+        /// exist.
+        void addConnection(ConnectionPtr connection) {
+            if (connectionsFull()) {
+                isc_throw(BadValue, "URL: " << url_.toText()
+                      << ", already at maximum connections: "
+                      << max_connections_);
+            }
+
+            connections_.push_back(connection);
+        }
+
+        /// @brief Closes a connection and removes it from the list.
+        ///
+        /// @param connection the connection to remove
+        void closeConnection(ConnectionPtr connection) {
+            for (auto it = connections_.begin(); it != connections_.end(); ++it) {
+                if (*it == connection) {
+                    (*it)->close();
+                    connections_.erase(it);
+                    break;
+                }
+            }
+        }
+
+        /// @brief Closes all connections and clears the list.
+        void closeAllConnections() {
+            for (auto connection : connections_) {
+                connection->close();
+            }
+
+            connections_.clear();
+        }
+
+        /// @brief Find the first idle connection.
+        ///
+        /// Iterates over the existing connections and returns the
+        /// first connection which is not currently in a transaction.
+        ///
+        /// @return The first idle connection or an empty pointer if
+        /// all connections are busy.
+        ConnectionPtr getIdleConnection() {
+            for (auto connection : connections_) {
+                if (!connection->isTransactionOngoing()) {
+                    return(connection);
+                }
+            }
+
+            return(ConnectionPtr());
+        }
+
+        /// @brief Find a connection by its socket descriptor.
+        ///
+        /// @param socket_fd socket descriptor to find
+        ///
+        /// @return The connection or an empty pointer if no matching
+        /// connection exists.
+        ConnectionPtr findBySocketFd(int socket_fd) {
+            for (auto connection : connections_) {
+                if (connection->isMySocket(socket_fd)) {
+                    return(connection);
+                }
+            }
+
+            return(ConnectionPtr());
+        }
+
+        /// @brief Indicates if there are no connections in the list.
+        ///
+        /// @return true if the list is empty.
+        bool connectionsEmpty() {
+            return(connections_.empty());
+        }
+
+        /// @brief Indicates if list contains the maximum number.
+        ///
+        /// @return true if the list is full.
+        bool connectionsFull() {
+            return (connections_.size() >= max_connections_);
+        }
+
+        /// @brief Fetches the number of connections in the list.
+        ///
+        /// @return the number of connections in the list.
+        size_t connectionCount() {
+            return (connections_.size());
+        }
+
+        /// @brief Fetches the maximum number of connections.
+        ///
+        /// @return the maxim number of connections.
+        size_t getMaxConnections() const {
+            return (max_connections_);
+        }
+
+        /// @brief Fetches the URL.
+        ///
+        /// @return the URL.
+        const Url& getUrl() const {
+            return (url_);
+        }
+
+        /// @brief Indicates if request queue is empty.
+        ///
+        /// @return true if there are no requests queued.
+        bool queueEmpty() const {
+            return (queue_.empty());
+
+        }
+
+        /// @brief Adds a request to the end of the request queue.
+        ///
+        /// @param desc RequestDescriptor to queue.
+        void pushRequest(RequestDescriptor desc) {
+            queue_.push(desc);
+        }
+
+        /// @brief Adds a request to the end of the request queue.
+        ///
+        /// @return desc RequestDescriptor to queue.
+        RequestDescriptor popNextRequest() {
+            if (queue_.empty()) {
+                isc_throw(InvalidOperation, "cannot pop, queue is empty");
+            }
+
+            RequestDescriptor desc = queue_.front();
+            queue_.pop();
+            return(desc);
+        }
+
+    private:
+        /// @brief URL supported by the list.
+        Url url_;
+
+        /// @brief Maximum number of concurrent connections for this destination.
+        size_t max_connections_;
+
+        /// @brief List of concurrent connections.
+        std::vector<ConnectionPtr> connections_;
+
+        /// @brief Holds the queue of request for this destination.
+        std::queue<RequestDescriptor> queue_;
+    };
+
+    /// @brief Pointer to a Destination.
+    typedef boost::shared_ptr<Destination> DestinationPtr;
+
+    /// @brief Creates a new destination for the given URL.
+    ///
+    /// @param url URL of the new destination.
+    ///
+    /// @return Pointer to the newly created destination.
+    /// @note Must be called from within a thread-safe context.
+    DestinationPtr addDestination(const Url& url) {
+        DestinationPtr destination(new Destination(url, max_url_connections_));
+        destinations_[url] = destination;
+        return(destination);
+    }
+
+    /// @brief Fetches a destination by URL
+    ///
+    /// @param url Url of the destination desired.
+    ///
+    /// @return pointer the desired destination, empty pointer
+    /// if the destination does not exist.
+    DestinationPtr findDestination(const Url& url) const {
+        auto it = destinations_.find(url);
+        if (it != destinations_.end()) {
+            return (it->second);
+        }
+
+        return (DestinationPtr());
+    }
+
+    /// @brief Removes a destination by URL
+    ///
+    /// @param url Url of the destination to be removed.
+    /// @note Must be called from within a thread-safe context.
+    void removeDestination(const Url& url) {
+        // Remove requests from the queue.
+        auto it = destinations_.find(url);
+        if (it != destinations_.end()) {
+            it->second->closeAllConnections();
+            destinations_.erase(it);
+        }
+    }
+
     /// @brief A reference to the IOService that drives socket IO.
     IOService& io_service_;
 
-    /// @brief Holds lists of connections for each URL.
-    std::map<Url, ConnectionListPtr> conns_;
-
-    /// @brief Holds the queue of requests for each URL.
-    std::map<Url, std::queue<RequestDescriptor> > queue_;
+    /// @brief Map of Destinations by URL.
+    std::map<Url, DestinationPtr> destinations_;
 
     /// @brief Mutex to protect the internal state.
     std::mutex mutex_;
@@ -1222,7 +1245,7 @@ Connection::terminate(const boost::system::error_code& ec,
 void
 Connection::terminateInternal(const boost::system::error_code& ec,
                               const std::string& parsing_error) {
-    TOMS_TRACE_LOG("terminate on: " << getSocketFd() 
+    TOMS_TRACE_LOG("terminate on: " << getSocketFd()
                     << ", isTransactionOngoing? " << isTransactionOngoing());
     HttpResponsePtr response;
 
@@ -1296,7 +1319,7 @@ Connection::terminateInternal(const boost::system::error_code& ec,
     // another transaction if there is at least one.
     ConnectionPoolPtr conn_pool = conn_pool_.lock();
     if (conn_pool) {
-        TOMS_TRACE_LOG(" more work on? " << getSocketFd() 
+        TOMS_TRACE_LOG(" more work on? " << getSocketFd()
                     << ", isTransactionOngoing? " << isTransactionOngoing());
         if (MultiThreadingMgr::instance().getMode()) {
             UnlockGuard<std::mutex> lock(mutex_);
index d7b161d28c6e71f922ad290f12bc242997a6ac8a..427b11afc7ad3484a509ac08642cd007671a5047 100644 (file)
@@ -1,4 +1,4 @@
-// Copyright (C) 2017-2018 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2017-2021 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -24,7 +24,7 @@ Url::Url(const std::string& url)
 
 bool
 Url::operator<(const Url& url) const {
-    return (toText() < url.toText());
+    return (url_ < url.rawUrl());
 }
 
 Url::Scheme
index c96641b120786c33031f1c17d6afa95dcf3c85fc..a9e80694bb46e2793a3358190747868a76080fca 100644 (file)
@@ -83,6 +83,10 @@ public:
     /// @brief Returns textual representation of the URL.
     std::string toText() const;
 
+    const std::string& rawUrl() const {
+        return (url_);
+    }
+
 private:
 
     /// @brief Returns boolean value indicating if the URL is valid.