]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
author: Alex Rousskov <rousskov@measurement-factory.com>
authorChristos Tsantilas <chtsanti@users.sourceforge.net>
Wed, 30 Apr 2014 10:50:09 +0000 (13:50 +0300)
committerChristos Tsantilas <chtsanti@users.sourceforge.net>
Wed, 30 Apr 2014 10:50:09 +0000 (13:50 +0300)
cache_peer standby=N implementation.

The feature focus is to instantly provide a ready-to-use connection to a
cooperating cache peer, virtually at all times. This is useful when connection
establishment is "too slow" and/or when infrequent peer use prevents Squid from
combating slow connection establishment with the regular idle connection pool.

The feature is similar to Squid2 idle=N feature, but there are key differences:

* Standby connections are available virtually at all times, while Squid2 unused
  "idle" connections are available only for a short time after a peer request.

* All N standby connections are not opened at once, reducing the chance of
  the feature being mistaken for a DoS attack on a peer.

* More consistent support for peers with multiple IP addresses (peer IPs are
  cycled through, just like during regular Squid request forwarding).

Besides, "idle" is a poor choice of adjective for an unused connection pool
name because the same term is used for used persistent connections, which have
somewhat different properties, are stored in a different pool, may need
distinct set of tuning options, etc. It is better to use a dedicated term for
the new feature.

The relationship between the max-conn limit and standby/idle connections is a
complex one. After several rewrites and tests, Squid now obeys max-conn limit
when opening new standby connections and accounts for standby connections when
checking whether to allow peer use. This often works OK, but leads to standby
guarantee violations when non-standby connections approach the limit. The
alternative design where standby code ignores max-conn works better, but is
really difficult to explain and advocate because an admin expects max-conn to
cover all connections and because of the idle connections accounting and
maintenance bugs. We may come back to this when the idle connections code is
fixed.

Fixed max-conn documentation and XXXed a peerHTTPOkay() bug (now in
peerHasConnAvailable()) that results in max-conn limit preventing the use of a
peer with idle persistent connections.

Decided to use standby connections for non-retriable requests. Avoiding
standby connections for POSTs and such would violate the main purpose of the
feature: providing an instant ready-to-use connection. A user does not care
whether it is waiting too long for a GET or POST request. Actually, a user may
care more when their POST requests are delayed (because canceling and
retrying them is often scary from the user point of view). The idea behind
standby connections is that the admin is responsible for avoiding race
conditions by properly configuring the peering Squids. If such proper
configuration is not possible or the consequences of rare races (e.g., due to
peer shutdown) are more severe than the consequences of slow requests, the
admin should not use standby=N. This choice may become configurable in the
future.

TODO: Teach peer probing code to push successful probing connections into the
standby pool (when enabled). Should be done as a followup project because of
the differences in standby and probe connection opening code, especially when
SSL peers are supported. Will require some discussion.

A standby pool is using a full-blown PconnPool object for storage instead of
the smaller IdleConnList, like the ICAP code does. The primary reasons for
this design were:

* A peer may have multiple addresses and those addresses may change. PconnPool
has code to deal with multiple addresses while IdleConnList does not. I do not
think this difference is really used in this implementation, but I did not
want to face an unknown limitation. Note that ICAP does not support multiple
ICAP server addresses.

* PconnPool has reporting (and cache manager integration) code that we should
eventually improve and report standby-specific stats. When this happens,
PconnPool will probably become abstract and spawn two kids, one for pconn and
one for standby pools.

Seemingly unrelated changes triggered by standby=N addition:

* Removed PconnPool from fde.h. We used to create immortal PconnPool objects.
Now, standby pools are destroyed when their peer is destroyed. Sharing raw
pointers to such pools is too dangerous. We could use smart pointers, but
PconnPools do not really belong to such a low-level object like fde IMO.

* Added FwdState::closeServerConnection() to encapsulate server connection
closing code, including the new noteUses() maintenance. Also updated
FwdState::serverClosed() to do the same maintenance.

* Close all connections in IdleConnList upon deletion. The old code did
not care because we never deleted PconnPools (although I am not sure
there were no bugs related to ICAP service pools which use IdleConnList
directly and do get destroyed).

* Fixed PconnPool::dumpHash(). It was listing the first entry twice because
the code misused misnamed hash_next().

* Removed unnecessary hard-coded limit on the number of PconnPools. Use
std::set for their storage.

* Fixed very stale PconnPool::pop() documentation and polished its code.

* Added RegisteredRunner::sync() method to use during Squid reconfiguration:

The existing run() method and destructor are great for the initial
configuration and final shutdown, but do not work well for reconfiguration
when you do not want to completely destroy and then recreate the state.
The sync() method (called via SyncRegistered) can be used for that.

Eventually, the reconfiguration API should present the old "saved" config
and the new "current" config to RegisteredRunners so that they can update
their modules/features intelligently. For now, they just see the new config.

This is a Measurement Factory project

20 files changed:
src/CachePeer.h
src/FwdState.cc
src/FwdState.h
src/Makefile.am
src/PeerPoolMgr.cc [new file with mode: 0644]
src/PeerPoolMgr.h [new file with mode: 0644]
src/adaptation/icap/ServiceRep.cc
src/cache_cf.cc
src/cf.data.pre
src/comm.cc
src/comm/Connection.cc
src/fde.cc
src/fde.h
src/hier_code.h
src/neighbors.cc
src/neighbors.h
src/pconn.cc
src/pconn.h
src/tests/stub_neighbors.cc [new file with mode: 0644]
src/tests/stub_pconn.cc

index 13c2b5558d37fd09e5c049d2cf29d93e179693c2..9ad8c52df81320447f48a7f421738116acb319e8 100644 (file)
@@ -30,6 +30,7 @@
  */
 
 #include "acl/forward.h"
+#include "base/CbcPointer.h"
 #include "enums.h"
 #include "icp_opcode.h"
 #include "ip/Address.h"
@@ -43,7 +44,9 @@
 
 class CachePeerDomainList;
 class NeighborTypeDomainList;
+class PconnPool;
 class PeerDigest;
+class PeerPoolMgr;
 
 // currently a POD
 class CachePeer
@@ -186,6 +189,12 @@ public:
     time_t connect_timeout;
     int connect_fail_limit;
     int max_conn;
+    struct {
+        PconnPool *pool; ///< idle connection pool for this peer
+        CbcPointer<PeerPoolMgr> mgr; ///< pool manager
+        int limit; ///< the limit itself
+        bool waitingForClose; ///< a conn must close before we open a standby conn
+    } standby; ///< optional "cache_peer standby=limit" feature
     char *domain;       /* Forced domain */
 #if USE_OPENSSL
 
index 3e492e37f68fa3832584f7e9c4115847f50d3115..3b07d463376b5394920068cb7e2c3abf3430307d 100644 (file)
@@ -65,6 +65,7 @@
 #include "neighbors.h"
 #include "pconn.h"
 #include "PeerSelectState.h"
+#include "PeerPoolMgr.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
 #include "Store.h"
@@ -93,7 +94,7 @@ static OBJH fwdStats;
 #define MAX_FWD_STATS_IDX 9
 static int FwdReplyCodes[MAX_FWD_STATS_IDX + 1][Http::scInvalidHeader + 1];
 
-static PconnPool *fwdPconnPool = new PconnPool("server-side");
+static PconnPool *fwdPconnPool = new PconnPool("server-side", NULL);
 CBDATA_CLASS_INIT(FwdState);
 
 #if USE_OPENSSL
@@ -129,10 +130,7 @@ FwdState::abort(void* d)
     Pointer tmp = fwd; // Grab a temporary pointer to keep the object alive during our scope.
 
     if (Comm::IsConnOpen(fwd->serverConnection())) {
-        comm_remove_close_handler(fwd->serverConnection()->fd, fwdServerClosedWrapper, fwd);
-        debugs(17, 3, HERE << "store entry aborted; closing " <<
-               fwd->serverConnection());
-        fwd->serverConnection()->close();
+        fwd->closeServerConnection("store entry aborted");
     } else {
         debugs(17, 7, HERE << "store entry aborted; no connection to close");
     }
@@ -140,6 +138,15 @@ FwdState::abort(void* d)
     fwd->self = NULL;
 }
 
+void
+FwdState::closeServerConnection(const char *reason)
+{
+    debugs(17, 3, "because " << reason << "; " << serverConn);
+    comm_remove_close_handler(serverConn->fd, fwdServerClosedWrapper, this);
+    fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
+    serverConn->close();
+}
+
 /**** PUBLIC INTERFACE ********************************************************/
 
 FwdState::FwdState(const Comm::ConnectionPointer &client, StoreEntry * e, HttpRequest * r, const AccessLogEntryPointer &alp):
@@ -295,11 +302,8 @@ FwdState::~FwdState()
         calls.connector = NULL;
     }
 
-    if (Comm::IsConnOpen(serverConn)) {
-        comm_remove_close_handler(serverConnection()->fd, fwdServerClosedWrapper, this);
-        debugs(17, 3, HERE << "closing FD " << serverConnection()->fd);
-        serverConn->close();
-    }
+    if (Comm::IsConnOpen(serverConn))
+        closeServerConnection("~FwdState");
 
     serverDestinations.clear();
 
@@ -615,7 +619,10 @@ FwdState::checkRetriable()
 void
 FwdState::serverClosed(int fd)
 {
-    debugs(17, 2, HERE << "FD " << fd << " " << entry->url());
+    debugs(17, 2, "FD " << fd << " " << entry->url() << " after " <<
+           fd_table[fd].pconn.uses << " requests");
+    if (serverConnection()->fd == fd) // should be, but not critical to assert
+        fwdPconnPool->noteUses(fd_table[fd].pconn.uses);
     retryOrBail();
 }
 
@@ -833,7 +840,7 @@ FwdState::connectStart()
     // This does not increase the total number of connections because we just
     // closed the connection that failed the race. And re-pinning assumes this.
     if (pconnRace != raceHappened)
-        temp = fwdPconnPool->pop(serverDestinations[0], host, checkRetriable());
+        temp = pconnPop(serverDestinations[0], host);
 
     const bool openedPconn = Comm::IsConnOpen(temp);
     pconnRace = openedPconn ? racePossible : raceImpossible;
@@ -895,7 +902,7 @@ FwdState::dispatch()
 
     fd_note(serverConnection()->fd, entry->url());
 
-    fd_table[serverConnection()->fd].noteUse(fwdPconnPool);
+    fd_table[serverConnection()->fd].noteUse();
 
     /*assert(!EBIT_TEST(entry->flags, ENTRY_DISPATCHED)); */
     assert(entry->ping_status != PING_WAITING);
@@ -1128,6 +1135,22 @@ FwdState::pconnPush(Comm::ConnectionPointer &conn, const char *domain)
     }
 }
 
+Comm::ConnectionPointer
+FwdState::pconnPop(const Comm::ConnectionPointer &dest, const char *domain)
+{
+    // always call shared pool first because we need to close an idle
+    // connection there if we have to use a standby connection.
+    Comm::ConnectionPointer conn = fwdPconnPool->pop(dest, domain, checkRetriable());
+    if (!Comm::IsConnOpen(conn)) {
+        // either there was no pconn to pop or this is not a retriable xaction
+        if (CachePeer *peer = dest->getPeer()) {
+            if (peer->standby.pool)
+                conn = peer->standby.pool->pop(dest, domain, true);
+        }
+    }
+    return conn; // open, closed, or nil
+}
+
 void
 FwdState::initModule()
 {
index 7673f9a2d3405bf22c36b1e3e99208213a153b1b..36680b9ba554eb3d01b98b601c7983a67d06f1cf 100644 (file)
@@ -16,6 +16,8 @@
 
 class AccessLogEntry;
 typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
+class PconnPool;
+typedef RefCount<PconnPool> PconnPoolPointer;
 class ErrorState;
 class HttpRequest;
 
@@ -76,6 +78,9 @@ public:
     bool checkRetry();
     bool checkRetriable();
     void dispatch();
+    /// Pops a connection from connection pool if available. If not
+    /// checks the peer stand-by connection pool for available connection.
+    Comm::ConnectionPointer pconnPop(const Comm::ConnectionPointer &dest, const char *domain);
     void pconnPush(Comm::ConnectionPointer & conn, const char *domain);
 
     bool dontRetry() { return flags.dont_retry; }
@@ -103,6 +108,9 @@ private:
 #endif
     static void RegisterWithCacheManager(void);
 
+    /// stops monitoring server connection for closure and updates pconn stats
+    void closeServerConnection(const char *reason);
+
 public:
     StoreEntry *entry;
     HttpRequest *request;
index 2d88ff027ef993740fd4c987c5c9cabb6603fb81..5480bba3afc2ae240955a33e9b4b4c22f712c8da 100644 (file)
@@ -472,6 +472,8 @@ squid_SOURCES = \
        peer_sourcehash.cc \
        peer_userhash.h \
        peer_userhash.cc \
+       PeerPoolMgr.h \
+       PeerPoolMgr.cc \
        PeerSelectState.h \
        PingData.h \
        protos.h \
@@ -1518,6 +1520,8 @@ tests_testCacheManager_SOURCES = \
        peer_sourcehash.cc \
        peer_userhash.h \
        peer_userhash.cc \
+       PeerPoolMgr.h \
+       PeerPoolMgr.cc \
        redirect.h \
        tests/stub_redirect.cc \
        refresh.h \
@@ -1744,6 +1748,7 @@ tests_testDiskIO_SOURCES = \
        tests/stub_MemStore.cc \
        mime.h \
        tests/stub_mime.cc \
+       tests/stub_neighbors.cc \
        tests/stub_pconn.cc \
        tests/stub_Port.cc \
        tests/stub_stat.cc \
@@ -1899,6 +1904,8 @@ tests_testEvent_SOURCES = \
        HttpParser.cc \
        HttpParser.h \
        HttpReply.cc \
+       PeerPoolMgr.h \
+       PeerPoolMgr.cc \
        RequestFlags.h \
        RequestFlags.cc \
        HttpRequest.cc \
@@ -2147,6 +2154,8 @@ tests_testEventLoop_SOURCES = \
        HttpParser.cc \
        HttpParser.h \
        HttpReply.cc \
+       PeerPoolMgr.h \
+       PeerPoolMgr.cc \
        RequestFlags.h \
        RequestFlags.cc \
        HttpRequest.cc \
@@ -2391,6 +2400,8 @@ tests_test_http_range_SOURCES = \
        HttpParser.cc \
        HttpParser.h \
        HttpReply.cc \
+       PeerPoolMgr.h \
+       PeerPoolMgr.cc \
        RequestFlags.h \
        RequestFlags.cc \
        HttpRequest.cc \
@@ -2731,6 +2742,8 @@ tests_testHttpRequest_SOURCES = \
        peer_sourcehash.cc \
        peer_userhash.h \
        peer_userhash.cc \
+       PeerPoolMgr.h \
+       PeerPoolMgr.cc \
        redirect.h \
        tests/stub_libauth_acls.cc \
        tests/stub_redirect.cc \
@@ -3081,6 +3094,7 @@ tests_testUfs_SOURCES = \
        tests/stub_libeui.cc \
        tests/stub_libicmp.cc \
        tests/stub_MemStore.cc \
+       tests/stub_neighbors.cc \
        tests/stub_pconn.cc \
        tests/stub_Port.cc \
        tests/stub_UdsOp.cc \
@@ -3373,6 +3387,7 @@ tests_testRock_SOURCES = \
        tests/stub_MemStore.cc \
        mime.h \
        tests/stub_mime.cc \
+       tests/stub_neighbors.cc \
        tests/stub_Port.cc \
        tests/stub_pconn.cc \
        tests/stub_store_client.cc \
@@ -3513,6 +3528,8 @@ tests_testURL_SOURCES = \
        HttpParser.cc \
        HttpParser.h \
        HttpReply.cc \
+       PeerPoolMgr.h \
+       PeerPoolMgr.cc \
        RequestFlags.h \
        RequestFlags.cc \
        HttpRequest.cc \
diff --git a/src/PeerPoolMgr.cc b/src/PeerPoolMgr.cc
new file mode 100644 (file)
index 0000000..27e62d0
--- /dev/null
@@ -0,0 +1,283 @@
+#include "squid.h"
+#include "base/AsyncJobCalls.h"
+#include "base/RunnersRegistry.h"
+#include "CachePeer.h"
+#include "comm/Connection.h"
+#include "comm/ConnOpener.h"
+#include "Debug.h"
+#include "fd.h"
+#include "FwdState.h"
+#include "globals.h"
+#include "HttpRequest.h"
+#include "neighbors.h"
+#include "pconn.h"
+#include "PeerPoolMgr.h"
+#include "SquidConfig.h"
+#if USE_OPENSSL
+#include "ssl/PeerConnector.h"
+#endif
+
+CBDATA_CLASS_INIT(PeerPoolMgr);
+
+#if USE_OPENSSL
+/// Gives Ssl::PeerConnector access to Answer in the PeerPoolMgr callback dialer.
+class MyAnswerDialer: public UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>,
+        public Ssl::PeerConnector::CbDialer
+{
+public:
+    MyAnswerDialer(const JobPointer &aJob, Method aMethod):
+                   UnaryMemFunT<PeerPoolMgr, Ssl::PeerConnectorAnswer, Ssl::PeerConnectorAnswer&>(aJob, aMethod, Ssl::PeerConnectorAnswer()) {}
+
+    /* Ssl::PeerConnector::CbDialer API */
+    virtual Ssl::PeerConnectorAnswer &answer() { return arg1; }
+};
+#endif
+
+PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
+        peer(cbdataReference(aPeer)),
+        request(),
+        opener(),
+        securer(),
+        closer(),
+        addrUsed(0)
+{
+}
+
+PeerPoolMgr::~PeerPoolMgr()
+{
+    cbdataReferenceDone(peer);
+}
+
+void
+PeerPoolMgr::start()
+{
+    AsyncJob::start();
+
+    // ErrorState, getOutgoingAddress(), and other APIs may require a request.
+    // We fake one. TODO: Optionally send this request to peers?
+    request = new HttpRequest(Http::METHOD_OPTIONS, AnyP::PROTO_HTTP, "*");
+    request->SetHost(peer->host);
+
+    checkpoint("peer initialized");
+}
+
+void
+PeerPoolMgr::swanSong()
+{
+    AsyncJob::swanSong();
+}
+
+bool
+PeerPoolMgr::validPeer() const {
+    return peer && cbdataReferenceValid(peer) && peer->standby.pool;
+}
+
+bool
+PeerPoolMgr::doneAll() const
+{
+    return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
+}
+
+void
+PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
+{
+    opener = NULL;
+
+    if (!validPeer()) {
+        debugs(48, 3, "peer gone");
+        if (params.conn != NULL)
+            params.conn->close();
+        return;
+    }
+
+    if (params.flag != COMM_OK) {
+        /* it might have been a timeout with a partially open link */
+        if (params.conn != NULL)
+            params.conn->close();
+        peerConnectFailed(peer);
+        checkpoint("conn opening failure"); // may retry
+        return;
+    }
+
+    Must(params.conn != NULL);
+
+#if USE_OPENSSL
+    // Handle SSL peers.
+    if (peer->use_ssl) {
+        typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
+        closer = JobCallback(48, 3, CloserDialer, this,
+                             PeerPoolMgr::handleSecureClosure);
+        comm_add_close_handler(params.conn->fd, closer);
+
+        securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
+                            MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
+        Ssl::PeerConnector *connector =
+            new Ssl::PeerConnector(request, params.conn, securer);
+        AsyncJob::Start(connector); // will call our callback
+        return;
+    }
+#endif
+
+    pushNewConnection(params.conn);
+}
+
+void
+PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
+{
+    Must(validPeer());
+    Must(Comm::IsConnOpen(conn));
+    peer->standby.pool->push(conn, NULL /* domain */);
+    // push() will trigger a checkpoint()
+}
+
+#if USE_OPENSSL
+void
+PeerPoolMgr::handleSecuredPeer(Ssl::PeerConnectorAnswer &answer)
+{
+    Must(securer != NULL);
+    securer = NULL;
+
+    if (closer != NULL) {
+        if (answer.conn != NULL)
+            comm_remove_close_handler(answer.conn->fd, closer);
+        else
+            closer->cancel("securing completed");
+        closer = NULL;
+    }
+
+    if (!validPeer()) {
+        debugs(48, 3, "peer gone");
+        if (answer.conn != NULL)
+            answer.conn->close();
+        return;
+    }
+
+    if (answer.error.get()) {
+        if (answer.conn != NULL)
+            answer.conn->close();
+        // PeerConnector calls peerConnectFailed() for us;
+        checkpoint("conn securing failure"); // may retry
+        return;
+    }
+
+    pushNewConnection(answer.conn);
+}
+
+void
+PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
+{
+    Must(closer != NULL);
+    Must(securer != NULL);
+    securer->cancel("conn closed by a 3rd party");
+    securer = NULL;
+    closer = NULL;
+    // allow the closing connection to fully close before we check again
+    Checkpoint(this, "conn closure while securing");
+}
+#endif
+
+void
+PeerPoolMgr::openNewConnection()
+{
+    // KISS: Do nothing else when we are already doing something.
+    if (opener != NULL || securer != NULL || shutting_down) {
+        debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
+        return; // there will be another checkpoint when we are done opening/securing
+    }
+
+    // Do not talk to a peer until it is ready.
+    if (!neighborUp(peer)) // provides debugging
+        return; // there will be another checkpoint when peer is up
+
+    // Do not violate peer limits.
+    if (!peerCanOpenMore(peer)) { // provides debugging
+        peer->standby.waitingForClose = true; // may already be true
+        return; // there will be another checkpoint when a peer conn closes
+    }
+
+    // Do not violate global restrictions.
+    if (fdUsageHigh()) {
+        debugs(48, 7, "overwhelmed");
+        peer->standby.waitingForClose = true; // may already be true
+        // There will be another checkpoint when a peer conn closes OR when
+        // a future pop() fails due to an empty pool. See PconnPool::pop().
+        return;
+    }
+
+    peer->standby.waitingForClose = false;
+
+    Comm::ConnectionPointer conn = new Comm::Connection;
+    Must(peer->n_addresses); // guaranteed by neighborUp() above
+    // cycle through all available IP addresses
+    conn->remote = peer->addresses[addrUsed++ % peer->n_addresses];
+    conn->remote.port(peer->http_port);
+    conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
+    conn->setPeer(peer);
+    getOutgoingAddress(request.getRaw(), conn);
+    GetMarkingsToServer(request.getRaw(), *conn);
+
+    const int ctimeout = peer->connect_timeout > 0 ?
+                         peer->connect_timeout : Config.Timeout.peer_connect;
+    typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
+    opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
+    Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
+    AsyncJob::Start(cs);
+}
+
+void
+PeerPoolMgr::closeOldConnections(const int howMany)
+{
+    debugs(48, 8, howMany);
+    peer->standby.pool->closeN(howMany);
+}
+
+void
+PeerPoolMgr::checkpoint(const char *reason)
+{
+    if (!validPeer()) {
+        debugs(48, 3, reason << " and peer gone");
+        return; // nothing to do after our owner dies; the job will quit
+    }
+
+    const int count = peer->standby.pool->count();
+    const int limit = peer->standby.limit;
+    debugs(48, 7, reason << " with " << count << " ? " << limit);
+
+    if (count < limit)
+        openNewConnection();
+    else if (count > limit)
+        closeOldConnections(count - limit);
+}
+
+void
+PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
+{
+    CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
+}
+
+/// launches PeerPoolMgrs for peers configured with standby.limit
+class PeerPoolMgrsRr: public RegisteredRunner
+{
+public:
+    /* RegisteredRunner API */
+    virtual void useConfig() { syncConfig(); }
+    virtual void syncConfig();
+};
+
+RunnerRegistrationEntry(PeerPoolMgrsRr);
+
+void
+PeerPoolMgrsRr::syncConfig()
+{
+    for (CachePeer *p = Config.peers; p; p = p->next) {
+        // On reconfigure, Squid deletes the old config (and old peers in it),
+        // so should always be dealing with a brand new configuration.
+        assert(!p->standby.mgr);
+        assert(!p->standby.pool);
+        if (p->standby.limit) {
+            p->standby.mgr = new PeerPoolMgr(p);
+            p->standby.pool = new PconnPool(p->name, p->standby.mgr);
+            AsyncJob::Start(p->standby.mgr.get());
+        }
+    }
+}
diff --git a/src/PeerPoolMgr.h b/src/PeerPoolMgr.h
new file mode 100644 (file)
index 0000000..e792fb0
--- /dev/null
@@ -0,0 +1,68 @@
+#ifndef SQUID_PEERPOOLMGR_H
+#define SQUID_PEERPOOLMGR_H
+
+#include "base/AsyncJob.h"
+#include "comm/forward.h"
+
+class HttpRequest;
+class CachePeer;
+class CommConnectCbParams;
+
+#if USE_OPENSSL
+namespace Ssl {
+    class PeerConnectorAnswer;
+}
+#endif
+
+/// Maintains an fixed-size "standby" PconnPool for a single CachePeer.
+class PeerPoolMgr: public AsyncJob
+{
+public:
+    typedef CbcPointer<PeerPoolMgr> Pointer;
+
+    // syncs mgr state whenever connection-related peer or pool state changes
+    static void Checkpoint(const Pointer &mgr, const char *reason);
+
+    explicit PeerPoolMgr(CachePeer *aPeer);
+    virtual ~PeerPoolMgr();
+
+protected:
+    /* AsyncJob API */
+    virtual void start();
+    virtual void swanSong();
+    virtual bool doneAll() const;
+
+    /// whether the peer is still out there and in a valid state we can safely use
+    bool validPeer() const;
+
+    /// Starts new connection, or closes the excess connections
+    /// according pool configuration
+    void checkpoint(const char *reason);
+    /// starts the process of opening a new standby connection (if possible)
+    void openNewConnection();
+    /// closes 'howMany' standby connections
+    void closeOldConnections(const int howMany);
+
+    /// Comm::ConnOpener calls this when done opening a connection for us
+    void handleOpenedConnection(const CommConnectCbParams &params);
+#if USE_OPENSSL
+    /// Ssl::PeerConnector callback
+    void handleSecuredPeer(Ssl::PeerConnectorAnswer &answer);
+    /// called when the connection we are trying to secure is closed by a 3rd party
+    void handleSecureClosure(const CommCloseCbParams &params);
+#endif
+    /// the final step in connection opening (and, optionally, securing) sequence
+    void pushNewConnection(const Comm::ConnectionPointer &conn);
+
+private:
+    CachePeer *peer; ///< the owner of the pool we manage
+    RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code
+    AsyncCall::Pointer opener; ///< whether we are opening a connection
+    AsyncCall::Pointer securer; ///< whether we are securing a connection
+    AsyncCall::Pointer closer; ///< monitors conn while we are securing it
+    unsigned int addrUsed; ///< counter for cycling through peer addresses
+
+    CBDATA_CLASS2(PeerPoolMgr);
+};
+
+#endif /* SQUID_PEERPOOLMGR_H */
index 4c4136c04c9826ce5da7fcec5d9c81ea9bbf27c1..8274551add9a9bc3ab5e45527e79fc1537ae0429 100644 (file)
@@ -147,7 +147,7 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer &
 void Adaptation::Icap::ServiceRep::noteConnectionUse(const Comm::ConnectionPointer &conn)
 {
     Must(Comm::IsConnOpen(conn));
-    fd_table[conn->fd].noteUse(NULL); // pconn re-use but not via PconnPool API
+    fd_table[conn->fd].noteUse(); // pconn re-use, albeit not via PconnPool API
 }
 
 void Adaptation::Icap::ServiceRep::noteConnectionFailed(const char *comment)
index fd19be776d4d62766d6fa7ec393e6d1217e08c4b..c81c387f4e1775e87e33e5b1f56da3fbc3cb707d 100644 (file)
@@ -69,7 +69,9 @@
 #include "neighbors.h"
 #include "NeighborTypeDomainList.h"
 #include "Parsing.h"
+#include "pconn.h"
 #include "PeerDigest.h"
+#include "PeerPoolMgr.h"
 #include "RefreshPattern.h"
 #include "rfc1738.h"
 #include "SBufList.h"
@@ -2267,6 +2269,8 @@ parse_peer(CachePeer ** head)
             p->options.allow_miss = true;
         } else if (!strncmp(token, "max-conn=", 9)) {
             p->max_conn = xatoi(token + 9);
+        } else if (!strncmp(token, "standby=", 8)) {
+            p->standby.limit = xatoi(token + 8);
         } else if (!strcmp(token, "originserver")) {
             p->options.originserver = true;
         } else if (!strncmp(token, "name=", 5)) {
@@ -2340,6 +2344,9 @@ parse_peer(CachePeer ** head)
     if (peerFindByName(p->name))
         fatalf("ERROR: cache_peer %s specified twice\n", p->name);
 
+    if (p->max_conn > 0 && p->max_conn < p->standby.limit)
+        fatalf("ERROR: cache_peer %s max-conn=%d is lower than its standby=%d\n", p->host, p->max_conn, p->standby.limit);
+
     if (p->weight < 1)
         p->weight = 1;
 
@@ -2384,6 +2391,9 @@ free_peer(CachePeer ** P)
         cbdataReferenceDone(p->digest);
 #endif
 
+        // the mgr job will notice that its owner is gone and stop
+        PeerPoolMgr::Checkpoint(p->standby.mgr, "peer gone");
+        delete p->standby.pool;
         cbdataFree(p);
     }
 
index 41e68db95081bbea85644bc239feb541fc2beb5e..43c3902edd45d26d8cb43905dbad71e6f60d6bb4 100644 (file)
@@ -3086,7 +3086,8 @@ DOC_START
        
        connect-fail-limit=N
                        How many times connecting to a peer must fail before
-                       it is marked as down. Default is 10.
+                       it is marked as down. Standby connection failures
+                       count towards this limit. Default is 10.
        
        allow-miss      Disable Squid's use of only-if-cached when forwarding
                        requests to siblings. This is primarily useful when
@@ -3096,8 +3097,50 @@ DOC_START
                        For example to deny peer usage on requests from peer
                        by denying cache_peer_access if the source is a peer.
        
-       max-conn=N      Limit the amount of connections Squid may open to this
-                       peer. see also 
+       max-conn=N      Limit the number of concurrent connections the Squid
+                       may open to this peer, including already opened idle
+                       and standby connections. There is no peer-specific
+                       connection limit by default.
+       
+                       A peer exceeding the limit is not used for new
+                       requests unless a standby connection is available.
+       
+                       max-conn currently works poorly with idle persistent
+                       connections: When a peer reaches its max-conn limit,
+                       and there are idle persistent connections to the peer,
+                       the peer may not be selected because the limiting code
+                       does not know whether Squid can reuse those idle
+                       connections.
+       
+       standby=N       Maintain a pool of N "hot standby" connections to an
+                       UP peer, available for requests when no idle
+                       persistent connection is available (or safe) to use.
+                       By default and with zero N, no such pool is maintained.
+                       N must not exceed the max-conn limit (if any).
+       
+                       At start or after reconfiguration, Squid opens new TCP
+                       standby connections until there are N connections
+                       available and then replenishes the standby pool as
+                       opened connections are used up for requests. A used
+                       connection never goes back to the standby pool, but
+                       may go to the regular idle persistent connection pool
+                       shared by all peers and origin servers.
+       
+                       Squid never opens multiple new standby connections
+                       concurrently.  This one-at-a-time approach minimizes
+                       flooding-like effect on peers. Furthermore, just a few
+                       standby connections should be sufficient in most cases
+                       to supply most new requests with a ready-to-use
+                       connection.
+       
+                       Standby connections obey server_idle_pconn_timeout.
+                       For the feature to work as intended, the peer must be
+                       configured to accept and keep them open longer than
+                       the idle timeout at the connecting Squid, to minimize
+                       race conditions typical to idle used persistent
+                       connections. Default request_timeout and
+                       server_idle_pconn_timeout values ensure such a
+                       configuration.
        
        name=xxx        Unique name for the peer.
                        Required if you have multiple peers on the same host
index ae1498bccdb084bd6358db978cb9de078c6ba293..2548680d0556a425d0aa15eaa58c7139b764ad48 100644 (file)
@@ -1180,9 +1180,6 @@ _comm_close(int fd, char const *file, int line)
 
     commCallCloseHandlers(fd);
 
-    if (F->pconn.uses && F->pconn.pool)
-        F->pconn.pool->noteUses(F->pconn.uses);
-
     comm_empty_os_read_buffers(fd);
 
     AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete",
index 5cd9e3d71cf2c213a01330fc94cca1abef0aab24..79f3e35b866b2782df25edc82c11d0280ebbd738 100644 (file)
@@ -4,6 +4,7 @@
 #include "comm.h"
 #include "comm/Connection.h"
 #include "fde.h"
+#include "neighbors.h"
 #include "SquidTime.h"
 
 class CachePeer;
@@ -66,7 +67,7 @@ Comm::Connection::close()
         comm_close(fd);
         fd = -1;
         if (CachePeer *p=getPeer())
-            -- p->stats.conn_open;
+            peerConnClosed(p);
     }
 }
 
index fc82c0a63da328dd50154d2236eafa286cabd9f3..6128efd59c52e862dbccec1ee547492a04624c07 100644 (file)
@@ -122,8 +122,7 @@ fde::remoteAddr() const
 }
 
 void
-fde::noteUse(PconnPool *pool)
+fde::noteUse()
 {
     ++ pconn.uses;
-    pconn.pool = pool;
 }
index 08ad8604f82b0bf9513475af6b67d7bd6a9e93b1..8b8058fb6c031e6d23cdc75d23f62b257745fcc6 100644 (file)
--- a/src/fde.h
+++ b/src/fde.h
@@ -42,7 +42,6 @@
 class ClientInfo;
 #endif
 
-class PconnPool;
 class dwrite_q;
 class _fde_disk
 {
@@ -70,7 +69,7 @@ public:
     char const *remoteAddr() const;
     void dumpStats (StoreEntry &, int);
     bool readPending(int);
-    void noteUse(PconnPool *);
+    void noteUse();
 
 public:
 
@@ -110,7 +109,6 @@ public:
 
     struct {
         int uses;                   /* ie # req's over persistent conn */
-        PconnPool *pool;
     } pconn;
 
 #if USE_DELAY_POOLS
@@ -167,7 +165,6 @@ private:
         bytes_read = 0;
         bytes_written = 0;
         pconn.uses = 0;
-        pconn.pool = NULL;
 #if USE_DELAY_POOLS
         clientInfo = NULL;
 #endif
index e8b57d69a55b5dfe65ce427152bfb65529fbfdf2..19307ecc3f1e45937f069e5bc2b19df599f0b154 100644 (file)
@@ -26,6 +26,7 @@ typedef enum {
     SOURCEHASH_PARENT,
     PINNED,
     ORIGINAL_DST,
+    STANDBY_POOL,
     HIER_MAX
 } hier_code;
 
index eb2dd52ea6a9e155fcb69425b98d4158c77d840a..0e02bd791449598aa3c8fe7b4f3cec403561883f 100644 (file)
@@ -54,7 +54,9 @@
 #include "multicast.h"
 #include "neighbors.h"
 #include "NeighborTypeDomainList.h"
+#include "pconn.h"
 #include "PeerDigest.h"
+#include "PeerPoolMgr.h"
 #include "PeerSelectState.h"
 #include "RequestFlags.h"
 #include "SquidConfig.h"
@@ -246,13 +248,46 @@ peerWouldBePinged(const CachePeer * p, HttpRequest * request)
     return 1;
 }
 
+bool
+peerCanOpenMore(const CachePeer *p)
+{
+    const int effectiveLimit = p->max_conn <= 0 ? Squid_MaxFD : p->max_conn;
+    const int remaining = effectiveLimit - p->stats.conn_open;
+    debugs(15, 7, remaining << '=' << effectiveLimit << '-' << p->stats.conn_open);
+    return remaining > 0;
+}
+
+bool
+peerHasConnAvailable(const CachePeer *p)
+{
+    // Standby connections can be used without opening new connections.
+    const int standbys = p->standby.pool ? p->standby.pool->count() : 0;
+
+    // XXX: Some idle pconns can be used without opening new connections.
+    // Complication: Idle pconns cannot be reused for some requests.
+    const int usableIdles = 0;
+
+    const int available = standbys + usableIdles;
+    debugs(15, 7, available << '=' << standbys << '+' << usableIdles);
+    return available > 0;
+}
+
+void
+peerConnClosed(CachePeer *p)
+{
+    --p->stats.conn_open;
+    if (p->standby.waitingForClose && peerCanOpenMore(p)) {
+        p->standby.waitingForClose = false;
+        PeerPoolMgr::Checkpoint(p->standby.mgr, "conn closed");
+    }
+}
+
 /* Return TRUE if it is okay to send an HTTP request to this CachePeer. */
 int
 peerHTTPOkay(const CachePeer * p, HttpRequest * request)
 {
-    if (p->max_conn)
-        if (p->stats.conn_open >= p->max_conn)
-            return 0;
+    if (!peerCanOpenMore(p) && !peerHasConnAvailable(p))
+        return 0;
 
     if (!peerAllowedToUse(p, request))
         return 0;
@@ -446,6 +481,8 @@ peerAlive(CachePeer *p)
         debugs(15, DBG_IMPORTANT, "Detected REVIVED " << neighborTypeStr(p) << ": " << p->name);
         p->stats.logged_state = PEER_ALIVE;
         peerClearRR();
+        if (p->standby.mgr.valid())
+            PeerPoolMgr::Checkpoint(p->standby.mgr, "revived peer");
     }
 
     p->stats.last_reply = squid_curtime;
@@ -1190,6 +1227,9 @@ peerNoteDigestGone(CachePeer * p)
 static void
 peerDNSConfigure(const ipcache_addrs *ia, const DnsLookupDetails &, void *data)
 {
+    // TODO: connections to no-longer valid IP addresses should be
+    // closed when we can detect such IP addresses.
+
     CachePeer *p = (CachePeer *)data;
 
     int j;
@@ -1234,6 +1274,8 @@ peerDNSConfigure(const ipcache_addrs *ia, const DnsLookupDetails &, void *data)
             eventAddIsh("netdbExchangeStart", netdbExchangeStart, p, 30.0, 1);
 #endif
 
+    if (p->standby.mgr.valid())
+        PeerPoolMgr::Checkpoint(p->standby.mgr, "resolved peer");
 }
 
 static void
@@ -1565,6 +1607,8 @@ dump_peer_options(StoreEntry * sentry, CachePeer * p)
 
     if (p->max_conn > 0)
         storeAppendPrintf(sentry, " max-conn=%d", p->max_conn);
+    if (p->standby.limit > 0)
+        storeAppendPrintf(sentry, " standby=%d", p->standby.limit);
 
     if (p->options.originserver)
         storeAppendPrintf(sentry, " originserver");
index 78c95f54edda0365af03690fed19ddb8e2fa3492..75ae56ac0eb8e4c87c37c548776a1c0650ac8a0d 100644 (file)
@@ -81,6 +81,13 @@ void peerConnectSucceded(CachePeer *);
 void dump_peer_options(StoreEntry *, CachePeer *);
 int peerHTTPOkay(const CachePeer *, HttpRequest *);
 
+/// Whether we can open new connections to the peer (e.g., despite max-conn)
+bool peerCanOpenMore(const CachePeer *p);
+/// Whether the peer has idle or standby connections that can be used now
+bool peerHasConnAvailable(const CachePeer *p);
+/// Notifies peer of an associated connection closure.
+void peerConnClosed(CachePeer *p);
+
 CachePeer *whichPeer(const Ip::Address &from);
 
 #endif /* SQUID_NEIGHBORS_H_ */
index e6b497f711c17f30b807e7dd0fd85cb377e4d0cd..0ae66630f46cf1eb471f5039da65ec44d01234c1 100644 (file)
  */
 
 #include "squid.h"
+#include "CachePeer.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "fd.h"
 #include "fde.h"
 #include "globals.h"
 #include "mgr/Registration.h"
+#include "neighbors.h"
 #include "pconn.h"
+#include "PeerPoolMgr.h"
 #include "SquidConfig.h"
 #include "Store.h"
 
@@ -64,6 +67,11 @@ IdleConnList::~IdleConnList()
     if (parent_)
         parent_->unlinkList(this);
 
+    if (size_) {
+        parent_ = NULL; // prevent reentrant notifications and deletions
+        closeN(size_);
+    }
+
     delete[] theList_;
 
     xfree(hash.key);
@@ -292,6 +300,8 @@ IdleConnList::findAndClose(const Comm::ConnectionPointer &conn)
 {
     const int index = findIndexOf(conn);
     if (index >= 0) {
+        if (parent_)
+            parent_->notifyManager("idle conn closure");
         /* might delete this */
         removeAt(index);
         clearHandlers(conn);
@@ -366,7 +376,7 @@ PconnPool::dumpHash(StoreEntry *e) const
     hash_first(hid);
 
     int i = 0;
-    for (hash_link *walker = hid->next; walker; walker = hash_next(hid)) {
+    for (hash_link *walker = hash_next(hid); walker; walker = hash_next(hid)) {
         storeAppendPrintf(e, "\t item %d:\t%s\n", i, (char *)(walker->key));
         ++i;
     }
@@ -374,7 +384,9 @@ PconnPool::dumpHash(StoreEntry *e) const
 
 /* ========== PconnPool PUBLIC FUNCTIONS ============================================ */
 
-PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr),
+PconnPool::PconnPool(const char *aDescr, const CbcPointer<PeerPoolMgr> &aMgr):
+        table(NULL), descr(aDescr),
+        mgr(aMgr),
         theCount(0)
 {
     int i;
@@ -386,10 +398,18 @@ PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr),
     PconnModule::GetInstance()->add(this);
 }
 
+static void
+DeleteIdleConnList(void *hashItem)
+{
+    delete reinterpret_cast<IdleConnList*>(hashItem);
+}
+
 PconnPool::~PconnPool()
 {
-    descr = NULL;
+    PconnModule::GetInstance()->remove(this);
+    hashFreeItems(table, &DeleteIdleConnList);
     hashFreeMemory(table);
+    descr = NULL;
 }
 
 void
@@ -404,6 +424,7 @@ PconnPool::push(const Comm::ConnectionPointer &conn, const char *domain)
         debugs(48, 3, HERE << "Squid is shutting down. Refusing to do anything");
         return;
     }
+    // TODO: also close used pconns if we exceed peer max-conn limit
 
     const char *aKey = key(conn, domain);
     IdleConnList *list = (IdleConnList *) hash_lookup(table, aKey);
@@ -423,35 +444,64 @@ PconnPool::push(const Comm::ConnectionPointer &conn, const char *domain)
     snprintf(desc, FD_DESC_SZ, "Idle server: %s", aKey);
     fd_note(conn->fd, desc);
     debugs(48, 3, HERE << "pushed " << conn << " for " << aKey);
+
+    // successful push notifications resume multi-connection opening sequence
+    notifyManager("push");
 }
 
 Comm::ConnectionPointer
-PconnPool::pop(const Comm::ConnectionPointer &destLink, const char *domain, bool isRetriable)
+PconnPool::pop(const Comm::ConnectionPointer &dest, const char *domain, bool keepOpen)
 {
-    const char * aKey = key(destLink, domain);
+
+    const char * aKey = key(dest, domain);
 
     IdleConnList *list = (IdleConnList *)hash_lookup(table, aKey);
     if (list == NULL) {
         debugs(48, 3, HERE << "lookup for key {" << aKey << "} failed.");
+        // failure notifications resume standby conn creation after fdUsageHigh
+        notifyManager("pop failure");
         return Comm::ConnectionPointer();
     } else {
-        debugs(48, 3, HERE << "found " << hashKeyStr(&list->hash) << (isRetriable?"(to use)":"(to kill)") );
+        debugs(48, 3, HERE << "found " << hashKeyStr(&list->hash) <<
+               (keepOpen ? " to use" : " to kill"));
     }
 
     /* may delete list */
-    Comm::ConnectionPointer temp = list->findUseable(destLink);
-    if (!isRetriable && Comm::IsConnOpen(temp))
-        temp->close();
+    Comm::ConnectionPointer popped = list->findUseable(dest);
+    if (!keepOpen && Comm::IsConnOpen(popped))
+        popped->close();
+
+    // successful pop notifications replenish standby connections pool
+    notifyManager("pop");
+    return popped;
+}
 
-    return temp;
+void
+PconnPool::notifyManager(const char *reason)
+{
+    if (mgr.valid())
+        PeerPoolMgr::Checkpoint(mgr, reason);
 }
 
 void
-PconnPool::closeN(int n, const Comm::ConnectionPointer &destLink, const char *domain)
+PconnPool::closeN(int n)
 {
-    // TODO: optimize: we can probably do hash_lookup just once
-    for (int i = 0; i < n; ++i)
-        pop(destLink, domain, false); // may fail!
+    hash_table *hid = table;
+    hash_first(hid);
+
+    // close N connections, one per list, to treat all lists "fairly"
+    for (int i = 0; i < n && count(); ++i) {
+
+        hash_link *current = hash_next(hid);
+        if (!current) {
+            hash_first(hid);
+            current = hash_next(hid);
+            Must(current); // must have one because the count() was positive
+        }
+
+        // may delete current
+        reinterpret_cast<IdleConnList*>(current)->closeN(1);
+    }
 }
 
 void
@@ -477,11 +527,8 @@ PconnPool::noteUses(int uses)
  * This simple class exists only for the cache manager
  */
 
-PconnModule::PconnModule() : pools(NULL), poolCount(0)
+PconnModule::PconnModule(): pools()
 {
-    pools = (PconnPool **) xcalloc(MAX_NUM_PCONN_POOLS, sizeof(*pools));
-//TODO: re-link to MemPools. WAS:    pconn_fds_pool = memPoolCreate("pconn_fds", PCONN_FDS_SZ * sizeof(int));
-    debugs(48, DBG_CRITICAL, "persistent connection module initialized");
     registerWithCacheManager();
 }
 
@@ -505,21 +552,26 @@ PconnModule::registerWithCacheManager(void)
 void
 PconnModule::add(PconnPool *aPool)
 {
-    assert(poolCount < MAX_NUM_PCONN_POOLS);
-    *(pools+poolCount) = aPool;
-    ++poolCount;
+    pools.insert(aPool);
 }
 
 void
-PconnModule::dump(StoreEntry *e)
+PconnModule::remove(PconnPool *aPool)
 {
-    int i;
+    pools.erase(aPool);
+}
 
-    for (i = 0; i < poolCount; ++i) {
+void
+PconnModule::dump(StoreEntry *e)
+{
+    typedef Pools::const_iterator PCI;
+    int i = 0; // TODO: Why number pools if they all have names?
+    for (PCI p = pools.begin(); p != pools.end(); ++p, ++i) {
+        // TODO: Let each pool dump itself the way it wants to.
         storeAppendPrintf(e, "\n Pool %d Stats\n", i);
-        (*(pools+i))->dumpHist(e);
+        (*p)->dumpHist(e);
         storeAppendPrintf(e, "\n Pool %d Hash Table\n",i);
-        (*(pools+i))->dumpHash(e);
+        (*p)->dumpHash(e);
     }
 }
 
index ac474518e269337249e8ce6a8347fbbd8d26e22c..0301862f319cb0c2f96b1f7221ae1719fbdba544 100644 (file)
@@ -1,6 +1,9 @@
 #ifndef SQUID_PCONN_H
 #define SQUID_PCONN_H
 
+#include "base/CbcPointer.h"
+#include <set>
+
 /**
  \defgroup PConnAPI Persistent Connection API
  \ingroup Component
@@ -9,6 +12,7 @@
  */
 
 class PconnPool;
+class PeerPoolMgr;
 
 /* for CBDATA_CLASS2() macros */
 #include "cbdata.h"
@@ -17,9 +21,6 @@ class PconnPool;
 /* for IOCB */
 #include "comm.h"
 
-/// \ingroup PConnAPI
-#define MAX_NUM_PCONN_POOLS 10
-
 /// \ingroup PConnAPI
 #define PCONN_HIST_SZ (1<<16)
 
@@ -106,31 +107,38 @@ class PconnPool
 {
 
 public:
-    PconnPool(const char *);
+    PconnPool(const char *aDescription, const CbcPointer<PeerPoolMgr> &aMgr);
     ~PconnPool();
 
     void moduleInit();
     void push(const Comm::ConnectionPointer &serverConn, const char *domain);
 
     /**
-     * Updates destLink to point at an existing open connection if available and retriable.
-     * Otherwise, return false.
+     * Returns either a pointer to a popped connection to dest or nil.
+     * Closes the connection before returning its pointer unless keepOpen.
      *
-     * We close available persistent connection if the caller transaction is not
-     * retriable to avoid having a growing number of open connections when many
-     * transactions create persistent connections but are not retriable.
+     * A caller with a non-retriable transaction should set keepOpen to false
+     * and call pop() anyway, even though the caller does not want a pconn.
+     * This forces us to close an available persistent connection, avoiding
+     * creating a growing number of open connections when many transactions 
+     * create (and push) persistent connections but are not retriable and,
+     * hence, do not need to pop a connection.
      */
-    Comm::ConnectionPointer pop(const Comm::ConnectionPointer &destLink, const char *domain, bool retriable);
+    Comm::ConnectionPointer pop(const Comm::ConnectionPointer &dest, const char *domain, bool keepOpen);
     void count(int uses);
     void dumpHist(StoreEntry *e) const;
     void dumpHash(StoreEntry *e) const;
     void unlinkList(IdleConnList *list);
     void noteUses(int uses);
-    void closeN(int n, const Comm::ConnectionPointer &destLink, const char *domain);
+    /// closes any n connections, regardless of their destination
+    void closeN(int n);
     int count() const { return theCount; }
     void noteConnectionAdded() { ++theCount; }
     void noteConnectionRemoved() { assert(theCount > 0); --theCount; }
 
+    // sends an async message to the pool manager, if any
+    void notifyManager(const char *reason);
+
 private:
 
     static const char *key(const Comm::ConnectionPointer &destLink, const char *domain);
@@ -138,6 +146,7 @@ private:
     int hist[PCONN_HIST_SZ];
     hash_table *table;
     const char *descr;
+    CbcPointer<PeerPoolMgr> mgr; ///< optional pool manager (for notifications)
     int theCount; ///< the number of pooled connections
 };
 
@@ -162,15 +171,15 @@ public:
     void registerWithCacheManager(void);
 
     void add(PconnPool *);
+    void remove(PconnPool *); ///< unregister and forget about this pool object
 
     OBJH dump;
 
 private:
-    PconnPool **pools;
+    typedef std::set<PconnPool*> Pools; ///< unordered PconnPool collection
+    Pools pools; ///< all live pools
 
     static PconnModule * instance;
-
-    int poolCount;
 };
 
 #endif /* SQUID_PCONN_H */
diff --git a/src/tests/stub_neighbors.cc b/src/tests/stub_neighbors.cc
new file mode 100644 (file)
index 0000000..1fc471f
--- /dev/null
@@ -0,0 +1,9 @@
+#include "squid.h"
+
+#define STUB_API "neighbors.cc"
+#include "tests/STUB.h"
+
+#include "neighbors.h"
+
+void
+peerConnClosed(CachePeer *p) STUB
index 80e3db46fc4854a5323aa6f3b0c125f317edf705..53ba213a5565a571ca80b22e5484e94bd070a137 100644 (file)
@@ -13,7 +13,7 @@ IdleConnList::~IdleConnList() STUB
 void IdleConnList::push(const Comm::ConnectionPointer &conn) STUB
 Comm::ConnectionPointer IdleConnList::findUseable(const Comm::ConnectionPointer &key) STUB_RETVAL(Comm::ConnectionPointer())
 void IdleConnList::clearHandlers(const Comm::ConnectionPointer &conn) STUB
-PconnPool::PconnPool(const char *) STUB
+PconnPool::PconnPool(const char *, const CbcPointer<PeerPoolMgr>&) STUB
 PconnPool::~PconnPool() STUB
 void PconnPool::moduleInit() STUB
 void PconnPool::push(const Comm::ConnectionPointer &serverConn, const char *domain) STUB