]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Author: Alex Rousskov <rousskov@measurement-factory.com>
authorChristos Tsantilas <chtsanti@users.sourceforge.net>
Fri, 13 May 2011 10:38:28 +0000 (13:38 +0300)
committerChristos Tsantilas <chtsanti@users.sourceforge.net>
Fri, 13 May 2011 10:38:28 +0000 (13:38 +0300)
Author: Alexey Veselovsky <alexey.veselovsky@eykontech.com>
Author: Christos Tsantilas <chtsanti@users.sourceforge.net>
Bug 2055: Honor ICAP Max-Connections

This patch implements the phase 1 of the ICAP Max-Connections feature as it is
described in squid wiki:
  http://wiki.squid-cache.org/Features/ServiceOverload

The behaviour of the patch  can be configured using on_overload and max_conn
options of the icap_service configuration parameter. Squid can be configured
to do one of the following:
  - Block: send and HTTP error response to the subscriber
  - Bypass: ignore the "over-connected" ICAP service
  - Wait: wait (in a FIFO queue) for an ICAP connection slot
  - Force: proceed, ignoring the Max-Connections limit

Squid warns the first time the service become overloaded

For more information please visit the feature wiki page given above.

Technical informations:

The patch starts count a connections to the ICAP server as active when the
ModXact class receives an FD even if the fd is not really connected to the
server yet, and decrease the active connections to the server when the ModXact
object releases its fd connection.

If the Max-Connection limit is reached squid puts the request to a waiters list.
When one or more connections released squid schedules one or more waiters for
execution and remove them from waiters list.
To handle cases where a waiter gone/canceled before its execution the custom
dialer ConnWaiterDialer used.

The Options connections counted as active connections but are not limited by
the Max-Connections limit. An Option request will be executed even if the
maximum connections number is reached.

This is a Measurement Factory project

14 files changed:
src/adaptation/Elements.h
src/adaptation/ServiceConfig.cc
src/adaptation/ServiceConfig.h
src/adaptation/icap/ModXact.cc
src/adaptation/icap/ModXact.h
src/adaptation/icap/Options.cc
src/adaptation/icap/ServiceRep.cc
src/adaptation/icap/ServiceRep.h
src/adaptation/icap/Xaction.cc
src/adaptation/icap/Xaction.h
src/cf.data.pre
src/comm.cc
src/pconn.cc
src/pconn.h

index 1dbd089d8d46c7529ca4b6c5ff00fce3b4533a78..8bd39549bc24ce34c0653dd257fa5d9729640cb9 100644 (file)
@@ -8,6 +8,7 @@ namespace Adaptation
 
 typedef enum { methodNone, methodReqmod, methodRespmod, methodOptions } Method;
 typedef enum { pointNone, pointPreCache, pointPostCache } VectPoint;
+typedef enum { srvBlock, srvBypass, srvWait, srvForce} SrvBehaviour;
 
 extern const char *crlf;
 extern const char *methodStr(Method); // TODO: make into a stream operator?
index 44f1a8b43618c286207bc80b95cc620be4c4922d..6d0b8d49abc48a15bf9f14ffd72820a41a160ce8 100644 (file)
@@ -9,7 +9,8 @@
 
 Adaptation::ServiceConfig::ServiceConfig():
         port(-1), method(methodNone), point(pointNone),
-        bypass(false), routing(false), ipv6(false)
+        bypass(false), maxConn(-1), onOverload(srvWait),
+        routing(false), ipv6(false)
 {}
 
 const char *
@@ -70,6 +71,7 @@ Adaptation::ServiceConfig::parse()
     // handle optional service name=value parameters
     const char *lastOption = NULL;
     bool grokkedUri = false;
+    bool onOverloadSet = false;
     while (char *option = strtok(NULL, w_space)) {
         if (strcmp(option, "0") == 0) { // backward compatibility
             bypass = false;
@@ -91,9 +93,9 @@ Adaptation::ServiceConfig::parse()
 
         // TODO: warn if option is set twice?
         bool grokked = false;
-        if (strcmp(name, "bypass") == 0)
+        if (strcmp(name, "bypass") == 0) {
             grokked = grokBool(bypass, name, value);
-        else if (strcmp(name, "routing") == 0)
+        else if (strcmp(name, "routing") == 0)
             grokked = grokBool(routing, name, value);
         else if (strcmp(name, "uri") == 0)
             grokked = grokkedUri = grokUri(value);
@@ -101,6 +103,11 @@ Adaptation::ServiceConfig::parse()
             grokked = grokBool(ipv6, name, value);
             if (grokked && ipv6 && !Ip::EnableIpv6)
                 debugs(3, DBG_IMPORTANT, "WARNING: IPv6 is disabled. ICAP service option ignored.");
+        } else if (strcmp(name, "max-conn") == 0)
+            grokked = grokLong(maxConn, name, value);
+        else if (strcmp(name, "on-overload") == 0) {
+            grokked = grokOnOverload(onOverload, value);
+            onOverloadSet = true;
         } else
             grokked = grokExtension(name, value);
 
@@ -108,6 +115,10 @@ Adaptation::ServiceConfig::parse()
             return false;
     }
 
+    // set default on-overload value if needed
+    if (!onOverloadSet)
+        onOverload = bypass ? srvBypass : srvWait;
+
     // what is left must be the service URI
     if (!grokkedUri && !grokUri(lastOption))
         return false;
@@ -246,6 +257,41 @@ Adaptation::ServiceConfig::grokBool(bool &var, const char *name, const char *val
     return true;
 }
 
+bool
+Adaptation::ServiceConfig::grokLong(long &var, const char *name, const char *value)
+{
+    char *bad = NULL;
+    const long p = strtol(value, &bad, 0);
+    if (p < 0 || bad == value) {
+        debugs(3, DBG_CRITICAL, "ERROR: " << cfg_filename << ':' <<
+               config_lineno << ": " << "wrong value for " << name << "; " <<
+               "a non-negative integer expected but got: " << value);
+        return false;
+    }
+    var = p;
+    return true;
+}
+
+bool
+Adaptation::ServiceConfig::grokOnOverload(SrvBehaviour &var, const char *value)
+{
+    if (strcmp(value, "block") == 0)
+        var = srvBlock;
+    else if (strcmp(value, "bypass") == 0)
+        var = srvBypass;
+    else if (strcmp(value, "wait") == 0)
+        var = srvWait;
+    else if (strcmp(value, "force") == 0)
+        var = srvForce;
+    else {
+        debugs(3, DBG_CRITICAL, "ERROR: " << cfg_filename << ':' <<
+               config_lineno << ": " << "wrong value for on-overload; " <<
+               "'block', 'bypass', 'wait' or 'force' expected but got: " << value);
+        return false;
+    }
+    return true;
+}
+
 bool
 Adaptation::ServiceConfig::grokExtension(const char *name, const char *value)
 {
index 9adbf37d0fb67bc9e5ef27db75b6eb5dd89b7cbf..dc73a59d12c3ce56ceccfe33bc326daf3ddd363d 100644 (file)
@@ -32,6 +32,10 @@ public:
     Method method;   // what is being adapted (REQMOD vs RESPMOD)
     VectPoint point; // where the adaptation happens (pre- or post-cache)
     bool bypass;
+
+    // options
+    long maxConn; ///< maximum number of concurrent service transactions
+    SrvBehaviour onOverload; ///< how to handle Max-Connections feature
     bool routing; ///< whether this service may determine the next service(s)
     bool ipv6;    ///< whether this service uses IPv6 transport (default IPv4)
 
@@ -42,6 +46,9 @@ protected:
     /// interpret parsed values
     bool grokBool(bool &var, const char *name, const char *value);
     bool grokUri(const char *value);
+    bool grokLong(long &var, const char *name, const char *value);
+    /// handle on-overload configuration option
+    bool grokOnOverload(SrvBehaviour &var, const char *value);
     /// handle name=value configuration option with name unknown to Squid
     virtual bool grokExtension(const char *name, const char *value);
 };
index cbed78962c77826630f2002c58cb75b2bec46ed1..c7e55cd9d14a41876fcdd18548bfdb7f158cdcf4 100644 (file)
@@ -86,8 +86,8 @@ void Adaptation::Icap::ModXact::start()
     canStartBypass = service().cfg().bypass;
 
     // it is an ICAP violation to send request to a service w/o known OPTIONS
-
-    if (service().up())
+    // and the service may is too busy for us: honor Max-Connections and such
+    if (service().up() && service().availableForNew())
         startWriting();
     else
         waitForService();
@@ -95,13 +95,44 @@ void Adaptation::Icap::ModXact::start()
 
 void Adaptation::Icap::ModXact::waitForService()
 {
+    const char *comment;
     Must(!state.serviceWaiting);
-    debugs(93, 7, HERE << "will wait for the ICAP service" << status());
-    typedef NullaryMemFunT<ModXact> Dialer;
-    AsyncCall::Pointer call = JobCallback(93,5,
-                                          Dialer, this, Adaptation::Icap::ModXact::noteServiceReady);
-    service().callWhenReady(call);
+
+    if (!service().up()) {
+        AsyncCall::Pointer call = JobCallback(93,5,
+                                              ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceReady);
+
+        service().callWhenReady(call);
+        comment = "to be up";
+    } else {
+        //The service is unavailable because of max-connection or other reason
+
+        if (service().cfg().onOverload != srvWait) {
+            // The service is overloaded, but waiting to be available prohibited by
+            // user configuration (onOverload is set to "block" or "bypass")
+            if (service().cfg().onOverload == srvBlock)
+                disableBypass("not available", true);
+            else //if (service().cfg().onOverload == srvBypass)
+                canStartBypass = true;
+
+            disableRetries();
+            disableRepeats("ICAP service is not available");
+
+            debugs(93, 7, HERE << "will not wait for the service to be available" <<
+                   status());
+
+            throw TexcHere("ICAP service is not available");
+        }
+
+        AsyncCall::Pointer call = JobCallback(93,5,
+                                              ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceAvailable);
+        service().callWhenAvailable(call, state.waitedForService);
+        comment = "to be available";
+    }
+
+    debugs(93, 7, HERE << "will wait for the service " << comment <<  status());
     state.serviceWaiting = true; // after callWhenReady() which may throw
+    state.waitedForService = true;
 }
 
 void Adaptation::Icap::ModXact::noteServiceReady()
@@ -109,13 +140,27 @@ void Adaptation::Icap::ModXact::noteServiceReady()
     Must(state.serviceWaiting);
     state.serviceWaiting = false;
 
-    if (service().up()) {
-        startWriting();
-    } else {
+    if (!service().up()) {
         disableRetries();
         disableRepeats("ICAP service is unusable");
         throw TexcHere("ICAP service is unusable");
     }
+
+    if (service().availableForOld())
+        startWriting();
+    else
+        waitForService();
+}
+
+void Adaptation::Icap::ModXact::noteServiceAvailable()
+{
+    Must(state.serviceWaiting);
+    state.serviceWaiting = false;
+
+    if (service().up() && service().availableForOld())
+        startWriting();
+    else
+        waitForService();
 }
 
 void Adaptation::Icap::ModXact::startWriting()
index f9117c98a965a5518cd9112563628b4e571e950e..8fb9423c2c6ad6a965e02901ac6c2b2974eba5a6 100644 (file)
@@ -157,6 +157,7 @@ public:
 
     // service waiting
     void noteServiceReady();
+    void noteServiceAvailable();
 
 public:
     InOut virgin;
@@ -303,6 +304,7 @@ private:
         bool allowedPostview206; // must handle 206 Partial Content outside preview
         bool allowedPreview206; // must handle 206 Partial Content inside preview
         bool readyForUob; ///< got a 206 response and expect a use-origin-body
+        bool waitedForService; ///< true if was queued at least once
 
         // will not write anything [else] to the ICAP server connection
         bool doneWriting() const { return writing == writingReallyDone; }
index a0392a6d22cbe83058a818472653d9562ea17f66..6a626e84ff7abe432b45a5be9d400ec2726b78ef 100644 (file)
@@ -98,6 +98,8 @@ void Adaptation::Icap::Options::configure(const HttpReply *reply)
     }
 
     cfgIntHeader(h, "Max-Connections", max_connections);
+    if (max_connections == 0)
+        debugs(93, DBG_IMPORTANT, "WARNING: Max-Connections is set to zero! ");
 
     cfgIntHeader(h, "Options-TTL", theTTL);
 
index 9d2f90f7434f535356f379420f1c4103bd0b7978..b5d17addb49754dc7d9f9e57252f0084d919f508 100644 (file)
 #include "adaptation/icap/ServiceRep.h"
 #include "base/TextException.h"
 #include "ConfigParser.h"
+#include "ip/tools.h"
 #include "HttpReply.h"
 #include "SquidTime.h"
+#include "fde.h"
 
 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, ServiceRep);
 
 Adaptation::Icap::ServiceRep::ServiceRep(const ServiceConfigPointer &svcCfg):
         AsyncJob("Adaptation::Icap::ServiceRep"), Adaptation::Service(svcCfg),
         theOptions(NULL), theOptionsFetcher(0), theLastUpdate(0),
+        theBusyConns(0),
+        theAllWaiters(0),
+        connOverloadReported(false),
+        theIdleConns("ICAP Service"),
         isSuspended(0), notifying(false),
         updateScheduled(false),
         wasAnnouncedUp(true), // do not announce an "up" service at startup
         isDetached(false)
-{}
+{
+    setMaxConnections();
+}
 
 Adaptation::Icap::ServiceRep::~ServiceRep()
 {
@@ -72,6 +80,157 @@ void Adaptation::Icap::ServiceRep::noteFailure()
     // should be configurable.
 }
 
+// returns a persistent or brand new connection; negative int on failures
+int Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
+{
+    Ip::Address client_addr;
+
+    int connection = theIdleConns.pop(cfg().host.termedBuf(), cfg().port, NULL, client_addr,
+                                      retriableXact);
+
+    reused = connection >= 0; // reused a persistent connection
+
+    if (!reused) { // need a new connection
+        Ip::Address outgoing;  // default: IP6_ANY_ADDR
+        if (!Ip::EnableIpv6)
+            outgoing.SetIPv4();
+        else if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK &&  !cfg().ipv6) {
+            /* split-stack for now requires default IPv4-only socket */
+            outgoing.SetIPv4();
+        }
+        connection = comm_open(SOCK_STREAM, 0, outgoing, COMM_NONBLOCKING, cfg().uri.termedBuf());
+    }
+
+    if (connection >= 0)
+        ++theBusyConns;
+
+    return connection;
+}
+
+// pools connection if it is reusable or closes it
+void Adaptation::Icap::ServiceRep::putConnection(int fd, bool isReusable, const char *comment)
+{
+    Must(fd >= 0);
+    // do not pool an idle connection if we owe connections
+    if (isReusable && excessConnections() == 0) {
+        debugs(93, 3, HERE << "pushing pconn" << comment);
+        commSetTimeout(fd, -1, NULL, NULL);
+        Ip::Address anyAddr;
+        theIdleConns.push(fd, cfg().host.termedBuf(), cfg().port, NULL, anyAddr);
+    } else {
+        debugs(93, 3, HERE << "closing pconn" << comment);
+        // comm_close will clear timeout
+        comm_close(fd);
+    }
+
+    Must(theBusyConns > 0);
+    --theBusyConns;
+    // a connection slot released. Check if there are waiters....
+    busyCheckpoint();
+}
+
+// a wrapper to avoid exposing theIdleConns
+void Adaptation::Icap::ServiceRep::noteConnectionUse(int fd)
+{
+    Must(fd >= 0);
+    fd_table[fd].noteUse(&theIdleConns);
+}
+
+void Adaptation::Icap::ServiceRep::setMaxConnections()
+{
+    if (cfg().maxConn >= 0)
+        theMaxConnections = cfg().maxConn;
+    else if (theOptions && theOptions->max_connections >= 0)
+        theMaxConnections = theOptions->max_connections;
+    else {
+        theMaxConnections = -1;
+        return;
+    }
+
+    if (::Config.workers > 1 )
+        theMaxConnections /= ::Config.workers;
+}
+
+int Adaptation::Icap::ServiceRep::availableConnections() const
+{
+    if (theMaxConnections < 0)
+        return -1;
+
+    // we are available if we can open or reuse connections
+    // in other words, if we will not create debt
+    int available = max(0, theMaxConnections - theBusyConns);
+
+    if (!available && !connOverloadReported) {
+        debugs(93, DBG_IMPORTANT, "WARNING: ICAP Max-Connections limit " <<
+               "exceeded for service " << cfg().uri << ". Open connections now: " <<
+               theBusyConns + theIdleConns.count() << ", including " <<
+               theIdleConns.count() << " idle persistent connections.");
+        connOverloadReported = true;
+    }
+
+    if (cfg().onOverload == srvForce)
+        return -1;
+
+    return available;
+}
+
+// The number of connections which excess the Max-Connections limit
+int Adaptation::Icap::ServiceRep::excessConnections() const
+{
+    if (theMaxConnections < 0)
+        return 0;
+
+    // Waiters affect the number of needed connections but a needed
+    // connection may still be excessive from Max-Connections p.o.v.
+    // so we should not account for waiting transaction needs here.
+    const int debt =  theBusyConns + theIdleConns.count() - theMaxConnections;
+    if (debt > 0)
+        return debt;
+    else
+        return 0;
+}
+
+void Adaptation::Icap::ServiceRep::noteGoneWaiter()
+{
+    theAllWaiters--;
+
+    // in case the notified transaction did not take the connection slot
+    busyCheckpoint();
+}
+
+// called when a connection slot may become available
+void Adaptation::Icap::ServiceRep::busyCheckpoint()
+{
+    if (theNotificationWaiters.empty()) // nobody is waiting for a slot
+        return;
+
+    int freed = 0;
+    int available = availableConnections();
+
+    if (available < 0) {
+        // It is possible to have waiters when no limit on connections exist in
+        // case of reconfigure or because new Options received.
+        // In this case, notify all waiting transactions.
+        freed  = theNotificationWaiters.size();
+    } else {
+        // avoid notifying more waiters than there will be available slots
+        const int notifiedWaiters = theAllWaiters - theNotificationWaiters.size();
+        freed = available - notifiedWaiters;
+    }
+
+    debugs(93,7, HERE << "Available connections: " << available <<
+           " freed slots: " << freed <<
+           " waiting in queue: " << theNotificationWaiters.size());
+
+    while (freed > 0 && !theNotificationWaiters.empty()) {
+        Client i = theNotificationWaiters.front();
+        theNotificationWaiters.pop_front();
+        ScheduleCallHere(i.callback);
+        i.callback = NULL;
+        --freed;
+    }
+}
+
 void Adaptation::Icap::ServiceRep::suspend(const char *reason)
 {
     if (isSuspended) {
@@ -99,6 +258,25 @@ bool Adaptation::Icap::ServiceRep::up() const
     return !isSuspended && hasOptions();
 }
 
+bool Adaptation::Icap::ServiceRep::availableForNew() const
+{
+    Must(up());
+    int available = availableConnections();
+    if (available < 0)
+        return true;
+    else
+        return (available - theAllWaiters > 0);
+}
+
+bool Adaptation::Icap::ServiceRep::availableForOld() const
+{
+    Must(up());
+
+    int available = availableConnections();
+    return (available != 0); // it is -1 (no limit) or has available slots
+}
+
+
 bool Adaptation::Icap::ServiceRep::wantsUrl(const String &urlPath) const
 {
     Must(hasOptions());
@@ -187,6 +365,24 @@ void Adaptation::Icap::ServiceRep::noteTimeToNotify()
     notifying = false;
 }
 
+void Adaptation::Icap::ServiceRep::callWhenAvailable(AsyncCall::Pointer &cb, bool priority)
+{
+    debugs(93,8, "ICAPServiceRep::callWhenAvailable");
+    Must(cb!=NULL);
+    Must(up());
+    Must(!theIdleConns.count()); // or we should not be waiting
+
+    Client i;
+    i.service = Pointer(this);
+    i.callback = cb;
+    if (priority)
+        theNotificationWaiters.push_front(i);
+    else
+        theNotificationWaiters.push_back(i);
+
+    busyCheckpoint();
+}
+
 void Adaptation::Icap::ServiceRep::callWhenReady(AsyncCall::Pointer &cb)
 {
     Must(cb!=NULL);
@@ -351,6 +547,17 @@ void Adaptation::Icap::ServiceRep::handleNewOptions(Adaptation::Icap::Options *n
     debugs(93,3, HERE << "got new options and is now " << status());
 
     scheduleUpdate(optionsFetchTime());
+
+    setMaxConnections();
+    const int excess = excessConnections();
+    // if we owe connections and have idle pconns, close the latter
+    if (excess && theIdleConns.count() > 0) {
+        const int n = min(excess, theIdleConns.count());
+        debugs(93,5, HERE << "closing " << n << " pconns to relief debt");
+        Ip::Address anyAddr;
+        theIdleConns.closeN(n, cfg().host.termedBuf(), cfg().port, NULL, anyAddr);
+    }
+
     scheduleNotification();
 }
 
@@ -486,3 +693,22 @@ bool Adaptation::Icap::ServiceRep::detached() const
 {
     return isDetached;
 }
+
+Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const CbcPointer<ModXact> &xact,
+        Adaptation::Icap::ConnWaiterDialer::Parent::Method aHandler):
+        Parent(xact, aHandler)
+{
+    theService = &xact->service();
+    theService->noteNewWaiter();
+}
+
+Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter): Parent(aConnWaiter)
+{
+    theService = aConnWaiter.theService;
+    theService->noteNewWaiter();
+}
+
+Adaptation::Icap::ConnWaiterDialer::~ConnWaiterDialer()
+{
+    theService->noteGoneWaiter();
+}
index bbf0f9178d01c847bb664193ccb22b1c10b45ff2..27caf2f24bd77002b9eea8c1984a598905608ed4 100644 (file)
 #include "adaptation/forward.h"
 #include "adaptation/Initiator.h"
 #include "adaptation/icap/Elements.h"
-
+#include "base/AsyncJobCalls.h"
+#include "comm.h"
+#include "pconn.h"
+#include <deque>
 
 namespace Adaptation
 {
@@ -94,9 +97,12 @@ public:
 
     virtual bool probed() const; // see comments above
     virtual bool up() const; // see comments above
+    bool availableForNew() const; ///< a new transaction may start communicating with the service
+    bool availableForOld() const; ///< a transaction notified about connection slot availability may start communicating with the service
 
     virtual Initiate *makeXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause);
 
+    void callWhenAvailable(AsyncCall::Pointer &cb, bool priority = false);
     void callWhenReady(AsyncCall::Pointer &cb);
 
     // the methods below can only be called on an up() service
@@ -104,9 +110,16 @@ public:
     bool wantsPreview(const String &urlPath, size_t &wantedSize) const;
     bool allows204() const;
     bool allows206() const;
+    int getConnection(bool isRetriable, bool &isReused);
+    void putConnection(int fd, bool isReusable, const char *comment);
+    void noteConnectionUse(int fd);
 
     void noteFailure(); // called by transactions to report service failure
 
+    void noteNewWaiter() {theAllWaiters++;} ///< New xaction waiting for service to be up or available
+    void noteGoneWaiter(); ///< An xaction is not waiting any more for service to be available
+    bool existWaiters() const {return (theAllWaiters > 0);} ///< if there are xactions waiting for the service to be available
+
     //AsyncJob virtual methods
     virtual bool doneAll() const { return Adaptation::Initiator::doneAll() && false;}
     virtual void callException(const std::exception &e);
@@ -130,12 +143,25 @@ private:
     };
 
     typedef Vector<Client> Clients;
+    // TODO: rename to theUpWaiters
     Clients theClients; // all clients waiting for a call back
 
     Options *theOptions;
     CbcPointer<Adaptation::Initiate> theOptionsFetcher; // pending ICAP OPTIONS transaction
     time_t theLastUpdate; // time the options were last updated
 
+    /// FIFO queue of xactions waiting for a connection slot and not yet notified
+    /// about it; xaction is removed when notification is scheduled
+    std::deque<Client> theNotificationWaiters;
+    int theBusyConns;   ///< number of connections given to active transactions
+    /// number of xactions waiting for a connection slot (notified and not)
+    /// the number is decreased after the xaction receives notification
+    int theAllWaiters;
+    int theMaxConnections; ///< the maximum allowed connections to the service
+    // TODO: use a better type like the FadingCounter for connOverloadReported
+    mutable bool connOverloadReported; ///< whether we reported exceeding theMaxConnections
+    PconnPool theIdleConns; ///< idle persistent connection pool
+
     FadingCounter theSessionFailures;
     const char *isSuspended; // also stores suspension reason for debugging
 
@@ -162,6 +188,21 @@ private:
 
     void announceStatusChange(const char *downPhrase, bool important) const;
 
+    /// Set the maximum allowed connections for the service
+    void setMaxConnections();
+    /// The number of connections which excess the Max-Connections limit
+    int excessConnections() const;
+    /**
+     * The available connections slots to the ICAP server
+     \return the available slots, or -1 if there is no limit on allowed connections
+     */
+    int availableConnections() const;
+    /**
+     * If there are xactions waiting for the service to be available, notify
+     * as many xactions as the available connections slots.
+     */
+    void busyCheckpoint();
+
     const char *status() const;
 
     mutable bool wasAnnouncedUp; // prevent sequential same-state announcements
@@ -169,6 +210,18 @@ private:
     CBDATA_CLASS2(ServiceRep);
 };
 
+class ModXact;
+/// Custom dialer to call Service::noteNewWaiter and noteGoneWaiter
+/// to maintain Service idea of waiting and being-notified transactions.
+class ConnWaiterDialer: public NullaryMemFunT<ModXact>
+{
+public:
+    typedef NullaryMemFunT<ModXact> Parent;
+    ServiceRep::Pointer theService;
+    ConnWaiterDialer(const CbcPointer<ModXact> &xact, Parent::Method aHandler);
+    ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter);
+    ~ConnWaiterDialer();
+};
 
 } // namespace Icap
 } // namespace Adaptation
index 7bd9b31f6b5c10541682548d239b772deb48535f..4c39db104e0b2d57e75cc7658aa3ecbe40b57f85 100644 (file)
@@ -21,8 +21,6 @@
 #include "SquidTime.h"
 #include "err_detail_type.h"
 
-static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
-
 
 //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
 
@@ -90,19 +88,24 @@ void Adaptation::Icap::Xaction::start()
 // TODO: obey service-specific, OPTIONS-reported connection limit
 void Adaptation::Icap::Xaction::openConnection()
 {
-    Ip::Address client_addr;
-
     Must(connection < 0);
 
-    const Adaptation::Service &s = service();
+    Adaptation::Icap::ServiceRep &s = service();
 
     if (!TheConfig.reuse_connections)
         disableRetries(); // this will also safely drain pconn pool
 
-    // TODO: check whether NULL domain is appropriate here
-    connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable);
-    if (connection >= 0) {
-        debugs(93,3, HERE << "reused pconn FD " << connection);
+    bool wasReused = false;
+    connection = s.getConnection(isRetriable, wasReused);
+    if (connection < 0)
+        dieOnConnectionFailure(); // throws
+
+    if (wasReused) {
+        // Set comm Close handler
+        typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+        closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+                            CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+        comm_add_close_handler(connection, closer);
 
         // fake the connect callback
         // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
@@ -119,39 +122,25 @@ void Adaptation::Icap::Xaction::openConnection()
 
     disableRetries(); // we only retry pconn failures
 
-    Ip::Address outgoing;
-    if (!Ip::EnableIpv6 && !outgoing.SetIPv4()) {
-        debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoing << " is not an IPv4 address.");
-        dieOnConnectionFailure(); // throws
-    }
-    /* split-stack for now requires default IPv4-only socket */
-    if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoing.IsAnyAddr() && !s.cfg().ipv6) {
-        outgoing.SetIPv4();
-    }
-
-    connection = comm_open(SOCK_STREAM, 0, outgoing,
-                           COMM_NONBLOCKING, s.cfg().uri.termedBuf());
 
-    if (connection < 0)
-        dieOnConnectionFailure(); // throws
-
-    debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port);
+    debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port);
 
     // TODO: service bypass status may differ from that of a transaction
     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
-    AsyncCall::Pointer timeoutCall = JobCallback(93, 5,
-                                     TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
+    AsyncCall::Pointer timeoutCall =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
+                                      TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
+
     commSetTimeout(connection, TheConfig.connect_timeout(
                        service().cfg().bypass), timeoutCall);
 
     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
-    closer = JobCallback(93, 5,
-                         CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed);
+    closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+                        CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
     comm_add_close_handler(connection, closer);
 
     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
-    connector = JobCallback(93,3,
-                            ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+    connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
+                          ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected));
     commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
 }
 
@@ -186,21 +175,11 @@ void Adaptation::Icap::Xaction::closeConnection()
             reuseConnection = false;
         }
 
-        if (reuseConnection) {
-            Ip::Address client_addr;
-            //status() adds leading spaces.
-            debugs(93,3, HERE << "pushing pconn" << status());
-            AsyncCall::Pointer call = NULL;
-            commSetTimeout(connection, -1, call);
-            icapPconnPool->push(connection, theService->cfg().host.termedBuf(),
-                                theService->cfg().port, NULL, client_addr);
+        if (reuseConnection)
             disableRetries();
-        } else {
-            //status() adds leading spaces.
-            debugs(93,3, HERE << "closing pconn" << status());
-            // comm_close will clear timeout
-            comm_close(connection);
-        }
+
+        Adaptation::Icap::ServiceRep &s = service();
+        s.putConnection(connection, reuseConnection, status());
 
         writer = NULL;
         reader = NULL;
@@ -218,7 +197,7 @@ void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
     if (io.flag != COMM_OK)
         dieOnConnectionFailure(); // throws
 
-    fd_table[connection].noteUse(icapPconnPool);
+    service().noteConnectionUse(connection);
 
     handleCommConnected();
 }
index b22cbbe4b8219b10c36af8ca0a03ddd744545a5d..c912891a63ba5eb7d92a9e56ff5e3cacc044b555 100644 (file)
@@ -138,6 +138,7 @@ protected:
     void setOutcome(const XactOutcome &xo);
     virtual void finalizeLogInfo();
 
+public:
     ServiceRep &service();
 
 private:
index 919379ad837467d34a665aafcecca30042efe0de..72351a644833d69272ef1eee9f13ac5ec98495ac 100644 (file)
@@ -6546,6 +6546,26 @@ DOC_START
                is to use IPv4-only connections. When set to 'on' this option will
                make Squid use IPv6-only connections to contact this ICAP service.
 
+       on-overload=block|bypass|wait|force
+               If the service Max-Connections limit has been reached, do
+               one of the following for each new ICAP transaction:
+                 * block:  send an HTTP error response to the client
+                 * bypass: ignore the "over-connected" ICAP service
+                 * wait:   wait (in a FIFO queue) for an ICAP connection slot
+                 * force:  proceed, ignoring the Max-Connections limit 
+
+               In SMP mode with N workers, each worker assumes the service
+               connection limit is Max-Connections/N, even though not all
+               workers may use a given service.
+
+               The default value is "bypass" if service is bypassable,
+               otherwise it is set to "wait".
+               
+
+       max-conn=number
+               Use the given number as the Max-Connections limit, regardless
+               of the Max-Connections value given by the service, if any.
+
        Older icap_service format without optional named parameters is
        deprecated but supported for backward compatibility.
 
index b92dc8e92f1808174cafc85d1594641af30d124d..d8227b479a18502c77af602f7ab68dee97ef914d 100644 (file)
@@ -1490,7 +1490,7 @@ _comm_close(int fd, char const *file, int line)
     commCallCloseHandlers(fd);
 
     if (F->pconn.uses)
-        F->pconn.pool->count(F->pconn.uses);
+        F->pconn.pool->noteUses(F->pconn.uses);
 
     comm_empty_os_read_buffers(fd);
 
index 99bf06a9e9a9147b6cee6ef12665afee69245735..be9fe30c3c0b846ebbbbd209871224b22dbd8c16 100644 (file)
@@ -94,6 +94,9 @@ IdleConnList::removeFD(int fd)
     for (; index < nfds - 1; index++)
         fds[index] = fds[index + 1];
 
+    if (parent)
+        parent->noteConnectionRemoved();
+
     if (--nfds == 0) {
         debugs(48, 3, "IdleConnList::removeFD: deleting " << hashKeyStr(&hash));
         delete this;
@@ -123,6 +126,9 @@ IdleConnList::push(int fd)
             xfree(old);
     }
 
+    if (parent)
+        parent->noteConnectionAdded();
+
     fds[nfds++] = fd;
     comm_read(fd, fakeReadBuf, sizeof(fakeReadBuf), IdleConnList::read, this);
     commSetTimeout(fd, Config.Timeout.pconn, IdleConnList::timeout, this);
@@ -230,7 +236,8 @@ PconnPool::dumpHash(StoreEntry *e)
 
 /* ========== PconnPool PUBLIC FUNCTIONS ============================================ */
 
-PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr)
+PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr),
+        theCount(0)
 {
     int i;
     table = hash_create((HASHCMP *) strcmp, 229, hash_string);
@@ -291,6 +298,7 @@ PconnPool::push(int fd, const char *host, u_short port, const char *domain, Ip::
  * We close available persistent connection if the caller transaction is not
  * retriable to avoid having a growing number of open connections when many
  * transactions create persistent connections but are not retriable.
+ * PconnPool::closeN() relies on that behavior as well.
  */
 int
 PconnPool::pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool isRetriable)
@@ -321,13 +329,23 @@ PconnPool::pop(const char *host, u_short port, const char *domain, Ip::Address &
 }
 
 void
-PconnPool::unlinkList(IdleConnList *list) const
+PconnPool::closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address)
+{
+    // TODO: optimize: we can probably do hash_lookup just once
+    for (int i = 0; i < n; ++i)
+        pop(host, port, domain, client_address, false); // may fail!
+}
+
+void
+PconnPool::unlinkList(IdleConnList *list)
 {
+    theCount -= list->count();
+    assert(theCount >= 0);
     hash_remove_link(table, &list->hash);
 }
 
 void
-PconnPool::count(int uses)
+PconnPool::noteUses(int uses)
 {
     if (uses >= PCONN_HIST_SZ)
         uses = PCONN_HIST_SZ - 1;
index e874e22c7a93cbeff4cea075cf31fb0582976987..6eada4fdbd0e686774cbe123a1bff7273080b974 100644 (file)
@@ -32,7 +32,6 @@ class IdleConnList
 public:
     IdleConnList(const char *key, PconnPool *parent);
     ~IdleConnList();
-    int numIdle() { return nfds; }
 
     int findFDIndex(int fd); ///< search from the end of array
     void removeFD(int fd);
@@ -40,6 +39,8 @@ public:
     int findUseableFD();     ///< find first from the end not pending read fd.
     void clearHandlers(int fd);
 
+    int count() const { return nfds; }
+
 private:
     static IOCB read;
     static PF timeout;
@@ -82,10 +83,14 @@ public:
     void moduleInit();
     void push(int fd, const char *host, u_short port, const char *domain, Ip::Address &client_address);
     int pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool retriable);
-    void count(int uses);
+    void noteUses(int uses);
     void dumpHist(StoreEntry *e);
     void dumpHash(StoreEntry *e);
-    void unlinkList(IdleConnList *list) const;
+    void unlinkList(IdleConnList *list);
+    void closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address);
+    int count() const { return theCount; }
+    void noteConnectionAdded() { ++theCount; }
+    void noteConnectionRemoved() { assert(theCount > 0); --theCount; }
 
 private:
 
@@ -94,7 +99,7 @@ private:
     int hist[PCONN_HIST_SZ];
     hash_table *table;
     const char *descr;
-
+    int theCount; ///< the number of pooled connections
 };