typedef enum { methodNone, methodReqmod, methodRespmod, methodOptions } Method;
typedef enum { pointNone, pointPreCache, pointPostCache } VectPoint;
+typedef enum { srvBlock, srvBypass, srvWait, srvForce} SrvBehaviour;
extern const char *crlf;
extern const char *methodStr(Method); // TODO: make into a stream operator?
Adaptation::ServiceConfig::ServiceConfig():
port(-1), method(methodNone), point(pointNone),
- bypass(false), routing(false), ipv6(false)
+ bypass(false), maxConn(-1), onOverload(srvWait),
+ routing(false), ipv6(false)
{}
const char *
// handle optional service name=value parameters
const char *lastOption = NULL;
bool grokkedUri = false;
+ bool onOverloadSet = false;
while (char *option = strtok(NULL, w_space)) {
if (strcmp(option, "0") == 0) { // backward compatibility
bypass = false;
// TODO: warn if option is set twice?
bool grokked = false;
- if (strcmp(name, "bypass") == 0)
+ if (strcmp(name, "bypass") == 0) {
grokked = grokBool(bypass, name, value);
- else if (strcmp(name, "routing") == 0)
+ } else if (strcmp(name, "routing") == 0)
grokked = grokBool(routing, name, value);
else if (strcmp(name, "uri") == 0)
grokked = grokkedUri = grokUri(value);
grokked = grokBool(ipv6, name, value);
if (grokked && ipv6 && !Ip::EnableIpv6)
debugs(3, DBG_IMPORTANT, "WARNING: IPv6 is disabled. ICAP service option ignored.");
+ } else if (strcmp(name, "max-conn") == 0)
+ grokked = grokLong(maxConn, name, value);
+ else if (strcmp(name, "on-overload") == 0) {
+ grokked = grokOnOverload(onOverload, value);
+ onOverloadSet = true;
} else
grokked = grokExtension(name, value);
return false;
}
+ // set default on-overload value if needed
+ if (!onOverloadSet)
+ onOverload = bypass ? srvBypass : srvWait;
+
// what is left must be the service URI
if (!grokkedUri && !grokUri(lastOption))
return false;
return true;
}
+bool
+Adaptation::ServiceConfig::grokLong(long &var, const char *name, const char *value)
+{
+ char *bad = NULL;
+ const long p = strtol(value, &bad, 0);
+ if (p < 0 || bad == value) {
+ debugs(3, DBG_CRITICAL, "ERROR: " << cfg_filename << ':' <<
+ config_lineno << ": " << "wrong value for " << name << "; " <<
+ "a non-negative integer expected but got: " << value);
+ return false;
+ }
+ var = p;
+ return true;
+}
+
+bool
+Adaptation::ServiceConfig::grokOnOverload(SrvBehaviour &var, const char *value)
+{
+ if (strcmp(value, "block") == 0)
+ var = srvBlock;
+ else if (strcmp(value, "bypass") == 0)
+ var = srvBypass;
+ else if (strcmp(value, "wait") == 0)
+ var = srvWait;
+ else if (strcmp(value, "force") == 0)
+ var = srvForce;
+ else {
+ debugs(3, DBG_CRITICAL, "ERROR: " << cfg_filename << ':' <<
+ config_lineno << ": " << "wrong value for on-overload; " <<
+ "'block', 'bypass', 'wait' or 'force' expected but got: " << value);
+ return false;
+ }
+ return true;
+}
+
bool
Adaptation::ServiceConfig::grokExtension(const char *name, const char *value)
{
Method method; // what is being adapted (REQMOD vs RESPMOD)
VectPoint point; // where the adaptation happens (pre- or post-cache)
bool bypass;
+
+ // options
+ long maxConn; ///< maximum number of concurrent service transactions
+ SrvBehaviour onOverload; ///< how to handle Max-Connections feature
bool routing; ///< whether this service may determine the next service(s)
bool ipv6; ///< whether this service uses IPv6 transport (default IPv4)
/// interpret parsed values
bool grokBool(bool &var, const char *name, const char *value);
bool grokUri(const char *value);
+ bool grokLong(long &var, const char *name, const char *value);
+ /// handle on-overload configuration option
+ bool grokOnOverload(SrvBehaviour &var, const char *value);
/// handle name=value configuration option with name unknown to Squid
virtual bool grokExtension(const char *name, const char *value);
};
canStartBypass = service().cfg().bypass;
// it is an ICAP violation to send request to a service w/o known OPTIONS
-
- if (service().up())
+ // and the service may is too busy for us: honor Max-Connections and such
+ if (service().up() && service().availableForNew())
startWriting();
else
waitForService();
void Adaptation::Icap::ModXact::waitForService()
{
+ const char *comment;
Must(!state.serviceWaiting);
- debugs(93, 7, HERE << "will wait for the ICAP service" << status());
- typedef NullaryMemFunT<ModXact> Dialer;
- AsyncCall::Pointer call = JobCallback(93,5,
- Dialer, this, Adaptation::Icap::ModXact::noteServiceReady);
- service().callWhenReady(call);
+
+ if (!service().up()) {
+ AsyncCall::Pointer call = JobCallback(93,5,
+ ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceReady);
+
+ service().callWhenReady(call);
+ comment = "to be up";
+ } else {
+ //The service is unavailable because of max-connection or other reason
+
+ if (service().cfg().onOverload != srvWait) {
+ // The service is overloaded, but waiting to be available prohibited by
+ // user configuration (onOverload is set to "block" or "bypass")
+ if (service().cfg().onOverload == srvBlock)
+ disableBypass("not available", true);
+ else //if (service().cfg().onOverload == srvBypass)
+ canStartBypass = true;
+
+ disableRetries();
+ disableRepeats("ICAP service is not available");
+
+ debugs(93, 7, HERE << "will not wait for the service to be available" <<
+ status());
+
+ throw TexcHere("ICAP service is not available");
+ }
+
+ AsyncCall::Pointer call = JobCallback(93,5,
+ ConnWaiterDialer, this, Adaptation::Icap::ModXact::noteServiceAvailable);
+ service().callWhenAvailable(call, state.waitedForService);
+ comment = "to be available";
+ }
+
+ debugs(93, 7, HERE << "will wait for the service " << comment << status());
state.serviceWaiting = true; // after callWhenReady() which may throw
+ state.waitedForService = true;
}
void Adaptation::Icap::ModXact::noteServiceReady()
Must(state.serviceWaiting);
state.serviceWaiting = false;
- if (service().up()) {
- startWriting();
- } else {
+ if (!service().up()) {
disableRetries();
disableRepeats("ICAP service is unusable");
throw TexcHere("ICAP service is unusable");
}
+
+ if (service().availableForOld())
+ startWriting();
+ else
+ waitForService();
+}
+
+void Adaptation::Icap::ModXact::noteServiceAvailable()
+{
+ Must(state.serviceWaiting);
+ state.serviceWaiting = false;
+
+ if (service().up() && service().availableForOld())
+ startWriting();
+ else
+ waitForService();
}
void Adaptation::Icap::ModXact::startWriting()
// service waiting
void noteServiceReady();
+ void noteServiceAvailable();
public:
InOut virgin;
bool allowedPostview206; // must handle 206 Partial Content outside preview
bool allowedPreview206; // must handle 206 Partial Content inside preview
bool readyForUob; ///< got a 206 response and expect a use-origin-body
+ bool waitedForService; ///< true if was queued at least once
// will not write anything [else] to the ICAP server connection
bool doneWriting() const { return writing == writingReallyDone; }
}
cfgIntHeader(h, "Max-Connections", max_connections);
+ if (max_connections == 0)
+ debugs(93, DBG_IMPORTANT, "WARNING: Max-Connections is set to zero! ");
cfgIntHeader(h, "Options-TTL", theTTL);
#include "adaptation/icap/ServiceRep.h"
#include "base/TextException.h"
#include "ConfigParser.h"
+#include "ip/tools.h"
#include "HttpReply.h"
#include "SquidTime.h"
+#include "fde.h"
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, ServiceRep);
Adaptation::Icap::ServiceRep::ServiceRep(const ServiceConfigPointer &svcCfg):
AsyncJob("Adaptation::Icap::ServiceRep"), Adaptation::Service(svcCfg),
theOptions(NULL), theOptionsFetcher(0), theLastUpdate(0),
+ theBusyConns(0),
+ theAllWaiters(0),
+ connOverloadReported(false),
+ theIdleConns("ICAP Service"),
isSuspended(0), notifying(false),
updateScheduled(false),
wasAnnouncedUp(true), // do not announce an "up" service at startup
isDetached(false)
-{}
+{
+ setMaxConnections();
+}
Adaptation::Icap::ServiceRep::~ServiceRep()
{
// should be configurable.
}
+// returns a persistent or brand new connection; negative int on failures
+int Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
+{
+ Ip::Address client_addr;
+
+ int connection = theIdleConns.pop(cfg().host.termedBuf(), cfg().port, NULL, client_addr,
+ retriableXact);
+
+ reused = connection >= 0; // reused a persistent connection
+
+ if (!reused) { // need a new connection
+ Ip::Address outgoing; // default: IP6_ANY_ADDR
+ if (!Ip::EnableIpv6)
+ outgoing.SetIPv4();
+ else if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && !cfg().ipv6) {
+ /* split-stack for now requires default IPv4-only socket */
+ outgoing.SetIPv4();
+ }
+ connection = comm_open(SOCK_STREAM, 0, outgoing, COMM_NONBLOCKING, cfg().uri.termedBuf());
+ }
+
+ if (connection >= 0)
+ ++theBusyConns;
+
+ return connection;
+}
+
+// pools connection if it is reusable or closes it
+void Adaptation::Icap::ServiceRep::putConnection(int fd, bool isReusable, const char *comment)
+{
+ Must(fd >= 0);
+ // do not pool an idle connection if we owe connections
+ if (isReusable && excessConnections() == 0) {
+ debugs(93, 3, HERE << "pushing pconn" << comment);
+ commSetTimeout(fd, -1, NULL, NULL);
+ Ip::Address anyAddr;
+ theIdleConns.push(fd, cfg().host.termedBuf(), cfg().port, NULL, anyAddr);
+ } else {
+ debugs(93, 3, HERE << "closing pconn" << comment);
+ // comm_close will clear timeout
+ comm_close(fd);
+ }
+
+ Must(theBusyConns > 0);
+ --theBusyConns;
+ // a connection slot released. Check if there are waiters....
+ busyCheckpoint();
+}
+
+// a wrapper to avoid exposing theIdleConns
+void Adaptation::Icap::ServiceRep::noteConnectionUse(int fd)
+{
+ Must(fd >= 0);
+ fd_table[fd].noteUse(&theIdleConns);
+}
+
+void Adaptation::Icap::ServiceRep::setMaxConnections()
+{
+ if (cfg().maxConn >= 0)
+ theMaxConnections = cfg().maxConn;
+ else if (theOptions && theOptions->max_connections >= 0)
+ theMaxConnections = theOptions->max_connections;
+ else {
+ theMaxConnections = -1;
+ return;
+ }
+
+ if (::Config.workers > 1 )
+ theMaxConnections /= ::Config.workers;
+}
+
+int Adaptation::Icap::ServiceRep::availableConnections() const
+{
+ if (theMaxConnections < 0)
+ return -1;
+
+ // we are available if we can open or reuse connections
+ // in other words, if we will not create debt
+ int available = max(0, theMaxConnections - theBusyConns);
+
+ if (!available && !connOverloadReported) {
+ debugs(93, DBG_IMPORTANT, "WARNING: ICAP Max-Connections limit " <<
+ "exceeded for service " << cfg().uri << ". Open connections now: " <<
+ theBusyConns + theIdleConns.count() << ", including " <<
+ theIdleConns.count() << " idle persistent connections.");
+ connOverloadReported = true;
+ }
+
+ if (cfg().onOverload == srvForce)
+ return -1;
+
+ return available;
+}
+
+// The number of connections which excess the Max-Connections limit
+int Adaptation::Icap::ServiceRep::excessConnections() const
+{
+ if (theMaxConnections < 0)
+ return 0;
+
+ // Waiters affect the number of needed connections but a needed
+ // connection may still be excessive from Max-Connections p.o.v.
+ // so we should not account for waiting transaction needs here.
+ const int debt = theBusyConns + theIdleConns.count() - theMaxConnections;
+ if (debt > 0)
+ return debt;
+ else
+ return 0;
+}
+
+void Adaptation::Icap::ServiceRep::noteGoneWaiter()
+{
+ theAllWaiters--;
+
+ // in case the notified transaction did not take the connection slot
+ busyCheckpoint();
+}
+
+// called when a connection slot may become available
+void Adaptation::Icap::ServiceRep::busyCheckpoint()
+{
+ if (theNotificationWaiters.empty()) // nobody is waiting for a slot
+ return;
+
+ int freed = 0;
+ int available = availableConnections();
+
+ if (available < 0) {
+ // It is possible to have waiters when no limit on connections exist in
+ // case of reconfigure or because new Options received.
+ // In this case, notify all waiting transactions.
+ freed = theNotificationWaiters.size();
+ } else {
+ // avoid notifying more waiters than there will be available slots
+ const int notifiedWaiters = theAllWaiters - theNotificationWaiters.size();
+ freed = available - notifiedWaiters;
+ }
+
+ debugs(93,7, HERE << "Available connections: " << available <<
+ " freed slots: " << freed <<
+ " waiting in queue: " << theNotificationWaiters.size());
+
+ while (freed > 0 && !theNotificationWaiters.empty()) {
+ Client i = theNotificationWaiters.front();
+ theNotificationWaiters.pop_front();
+ ScheduleCallHere(i.callback);
+ i.callback = NULL;
+ --freed;
+ }
+}
+
void Adaptation::Icap::ServiceRep::suspend(const char *reason)
{
if (isSuspended) {
return !isSuspended && hasOptions();
}
+bool Adaptation::Icap::ServiceRep::availableForNew() const
+{
+ Must(up());
+ int available = availableConnections();
+ if (available < 0)
+ return true;
+ else
+ return (available - theAllWaiters > 0);
+}
+
+bool Adaptation::Icap::ServiceRep::availableForOld() const
+{
+ Must(up());
+
+ int available = availableConnections();
+ return (available != 0); // it is -1 (no limit) or has available slots
+}
+
+
bool Adaptation::Icap::ServiceRep::wantsUrl(const String &urlPath) const
{
Must(hasOptions());
notifying = false;
}
+void Adaptation::Icap::ServiceRep::callWhenAvailable(AsyncCall::Pointer &cb, bool priority)
+{
+ debugs(93,8, "ICAPServiceRep::callWhenAvailable");
+ Must(cb!=NULL);
+ Must(up());
+ Must(!theIdleConns.count()); // or we should not be waiting
+
+ Client i;
+ i.service = Pointer(this);
+ i.callback = cb;
+ if (priority)
+ theNotificationWaiters.push_front(i);
+ else
+ theNotificationWaiters.push_back(i);
+
+ busyCheckpoint();
+}
+
void Adaptation::Icap::ServiceRep::callWhenReady(AsyncCall::Pointer &cb)
{
Must(cb!=NULL);
debugs(93,3, HERE << "got new options and is now " << status());
scheduleUpdate(optionsFetchTime());
+
+ setMaxConnections();
+ const int excess = excessConnections();
+ // if we owe connections and have idle pconns, close the latter
+ if (excess && theIdleConns.count() > 0) {
+ const int n = min(excess, theIdleConns.count());
+ debugs(93,5, HERE << "closing " << n << " pconns to relief debt");
+ Ip::Address anyAddr;
+ theIdleConns.closeN(n, cfg().host.termedBuf(), cfg().port, NULL, anyAddr);
+ }
+
scheduleNotification();
}
{
return isDetached;
}
+
+Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const CbcPointer<ModXact> &xact,
+ Adaptation::Icap::ConnWaiterDialer::Parent::Method aHandler):
+ Parent(xact, aHandler)
+{
+ theService = &xact->service();
+ theService->noteNewWaiter();
+}
+
+Adaptation::Icap::ConnWaiterDialer::ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter): Parent(aConnWaiter)
+{
+ theService = aConnWaiter.theService;
+ theService->noteNewWaiter();
+}
+
+Adaptation::Icap::ConnWaiterDialer::~ConnWaiterDialer()
+{
+ theService->noteGoneWaiter();
+}
#include "adaptation/forward.h"
#include "adaptation/Initiator.h"
#include "adaptation/icap/Elements.h"
-
+#include "base/AsyncJobCalls.h"
+#include "comm.h"
+#include "pconn.h"
+#include <deque>
namespace Adaptation
{
virtual bool probed() const; // see comments above
virtual bool up() const; // see comments above
+ bool availableForNew() const; ///< a new transaction may start communicating with the service
+ bool availableForOld() const; ///< a transaction notified about connection slot availability may start communicating with the service
virtual Initiate *makeXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause);
+ void callWhenAvailable(AsyncCall::Pointer &cb, bool priority = false);
void callWhenReady(AsyncCall::Pointer &cb);
// the methods below can only be called on an up() service
bool wantsPreview(const String &urlPath, size_t &wantedSize) const;
bool allows204() const;
bool allows206() const;
+ int getConnection(bool isRetriable, bool &isReused);
+ void putConnection(int fd, bool isReusable, const char *comment);
+ void noteConnectionUse(int fd);
void noteFailure(); // called by transactions to report service failure
+ void noteNewWaiter() {theAllWaiters++;} ///< New xaction waiting for service to be up or available
+ void noteGoneWaiter(); ///< An xaction is not waiting any more for service to be available
+ bool existWaiters() const {return (theAllWaiters > 0);} ///< if there are xactions waiting for the service to be available
+
//AsyncJob virtual methods
virtual bool doneAll() const { return Adaptation::Initiator::doneAll() && false;}
virtual void callException(const std::exception &e);
};
typedef Vector<Client> Clients;
+ // TODO: rename to theUpWaiters
Clients theClients; // all clients waiting for a call back
Options *theOptions;
CbcPointer<Adaptation::Initiate> theOptionsFetcher; // pending ICAP OPTIONS transaction
time_t theLastUpdate; // time the options were last updated
+ /// FIFO queue of xactions waiting for a connection slot and not yet notified
+ /// about it; xaction is removed when notification is scheduled
+ std::deque<Client> theNotificationWaiters;
+ int theBusyConns; ///< number of connections given to active transactions
+ /// number of xactions waiting for a connection slot (notified and not)
+ /// the number is decreased after the xaction receives notification
+ int theAllWaiters;
+ int theMaxConnections; ///< the maximum allowed connections to the service
+ // TODO: use a better type like the FadingCounter for connOverloadReported
+ mutable bool connOverloadReported; ///< whether we reported exceeding theMaxConnections
+ PconnPool theIdleConns; ///< idle persistent connection pool
+
FadingCounter theSessionFailures;
const char *isSuspended; // also stores suspension reason for debugging
void announceStatusChange(const char *downPhrase, bool important) const;
+ /// Set the maximum allowed connections for the service
+ void setMaxConnections();
+ /// The number of connections which excess the Max-Connections limit
+ int excessConnections() const;
+ /**
+ * The available connections slots to the ICAP server
+ \return the available slots, or -1 if there is no limit on allowed connections
+ */
+ int availableConnections() const;
+ /**
+ * If there are xactions waiting for the service to be available, notify
+ * as many xactions as the available connections slots.
+ */
+ void busyCheckpoint();
+
const char *status() const;
mutable bool wasAnnouncedUp; // prevent sequential same-state announcements
CBDATA_CLASS2(ServiceRep);
};
+class ModXact;
+/// Custom dialer to call Service::noteNewWaiter and noteGoneWaiter
+/// to maintain Service idea of waiting and being-notified transactions.
+class ConnWaiterDialer: public NullaryMemFunT<ModXact>
+{
+public:
+ typedef NullaryMemFunT<ModXact> Parent;
+ ServiceRep::Pointer theService;
+ ConnWaiterDialer(const CbcPointer<ModXact> &xact, Parent::Method aHandler);
+ ConnWaiterDialer(const Adaptation::Icap::ConnWaiterDialer &aConnWaiter);
+ ~ConnWaiterDialer();
+};
} // namespace Icap
} // namespace Adaptation
#include "SquidTime.h"
#include "err_detail_type.h"
-static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
-
//CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
// TODO: obey service-specific, OPTIONS-reported connection limit
void Adaptation::Icap::Xaction::openConnection()
{
- Ip::Address client_addr;
-
Must(connection < 0);
- const Adaptation::Service &s = service();
+ Adaptation::Icap::ServiceRep &s = service();
if (!TheConfig.reuse_connections)
disableRetries(); // this will also safely drain pconn pool
- // TODO: check whether NULL domain is appropriate here
- connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable);
- if (connection >= 0) {
- debugs(93,3, HERE << "reused pconn FD " << connection);
+ bool wasReused = false;
+ connection = s.getConnection(isRetriable, wasReused);
+ if (connection < 0)
+ dieOnConnectionFailure(); // throws
+
+ if (wasReused) {
+ // Set comm Close handler
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+ comm_add_close_handler(connection, closer);
// fake the connect callback
// TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
disableRetries(); // we only retry pconn failures
- Ip::Address outgoing;
- if (!Ip::EnableIpv6 && !outgoing.SetIPv4()) {
- debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoing << " is not an IPv4 address.");
- dieOnConnectionFailure(); // throws
- }
- /* split-stack for now requires default IPv4-only socket */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoing.IsAnyAddr() && !s.cfg().ipv6) {
- outgoing.SetIPv4();
- }
-
- connection = comm_open(SOCK_STREAM, 0, outgoing,
- COMM_NONBLOCKING, s.cfg().uri.termedBuf());
- if (connection < 0)
- dieOnConnectionFailure(); // throws
-
- debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port);
+ debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port);
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(93, 5,
- TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
+ AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
+ TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
+
commSetTimeout(connection, TheConfig.connect_timeout(
service().cfg().bypass), timeoutCall);
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = JobCallback(93, 5,
- CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed);
+ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
comm_add_close_handler(connection, closer);
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
- connector = JobCallback(93,3,
- ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+ connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
+ ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected));
commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
}
reuseConnection = false;
}
- if (reuseConnection) {
- Ip::Address client_addr;
- //status() adds leading spaces.
- debugs(93,3, HERE << "pushing pconn" << status());
- AsyncCall::Pointer call = NULL;
- commSetTimeout(connection, -1, call);
- icapPconnPool->push(connection, theService->cfg().host.termedBuf(),
- theService->cfg().port, NULL, client_addr);
+ if (reuseConnection)
disableRetries();
- } else {
- //status() adds leading spaces.
- debugs(93,3, HERE << "closing pconn" << status());
- // comm_close will clear timeout
- comm_close(connection);
- }
+
+ Adaptation::Icap::ServiceRep &s = service();
+ s.putConnection(connection, reuseConnection, status());
writer = NULL;
reader = NULL;
if (io.flag != COMM_OK)
dieOnConnectionFailure(); // throws
- fd_table[connection].noteUse(icapPconnPool);
+ service().noteConnectionUse(connection);
handleCommConnected();
}
void setOutcome(const XactOutcome &xo);
virtual void finalizeLogInfo();
+public:
ServiceRep &service();
private:
is to use IPv4-only connections. When set to 'on' this option will
make Squid use IPv6-only connections to contact this ICAP service.
+ on-overload=block|bypass|wait|force
+ If the service Max-Connections limit has been reached, do
+ one of the following for each new ICAP transaction:
+ * block: send an HTTP error response to the client
+ * bypass: ignore the "over-connected" ICAP service
+ * wait: wait (in a FIFO queue) for an ICAP connection slot
+ * force: proceed, ignoring the Max-Connections limit
+
+ In SMP mode with N workers, each worker assumes the service
+ connection limit is Max-Connections/N, even though not all
+ workers may use a given service.
+
+ The default value is "bypass" if service is bypassable,
+ otherwise it is set to "wait".
+
+
+ max-conn=number
+ Use the given number as the Max-Connections limit, regardless
+ of the Max-Connections value given by the service, if any.
+
Older icap_service format without optional named parameters is
deprecated but supported for backward compatibility.
commCallCloseHandlers(fd);
if (F->pconn.uses)
- F->pconn.pool->count(F->pconn.uses);
+ F->pconn.pool->noteUses(F->pconn.uses);
comm_empty_os_read_buffers(fd);
for (; index < nfds - 1; index++)
fds[index] = fds[index + 1];
+ if (parent)
+ parent->noteConnectionRemoved();
+
if (--nfds == 0) {
debugs(48, 3, "IdleConnList::removeFD: deleting " << hashKeyStr(&hash));
delete this;
xfree(old);
}
+ if (parent)
+ parent->noteConnectionAdded();
+
fds[nfds++] = fd;
comm_read(fd, fakeReadBuf, sizeof(fakeReadBuf), IdleConnList::read, this);
commSetTimeout(fd, Config.Timeout.pconn, IdleConnList::timeout, this);
/* ========== PconnPool PUBLIC FUNCTIONS ============================================ */
-PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr)
+PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr),
+ theCount(0)
{
int i;
table = hash_create((HASHCMP *) strcmp, 229, hash_string);
* We close available persistent connection if the caller transaction is not
* retriable to avoid having a growing number of open connections when many
* transactions create persistent connections but are not retriable.
+ * PconnPool::closeN() relies on that behavior as well.
*/
int
PconnPool::pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool isRetriable)
}
void
-PconnPool::unlinkList(IdleConnList *list) const
+PconnPool::closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address)
+{
+ // TODO: optimize: we can probably do hash_lookup just once
+ for (int i = 0; i < n; ++i)
+ pop(host, port, domain, client_address, false); // may fail!
+}
+
+void
+PconnPool::unlinkList(IdleConnList *list)
{
+ theCount -= list->count();
+ assert(theCount >= 0);
hash_remove_link(table, &list->hash);
}
void
-PconnPool::count(int uses)
+PconnPool::noteUses(int uses)
{
if (uses >= PCONN_HIST_SZ)
uses = PCONN_HIST_SZ - 1;
public:
IdleConnList(const char *key, PconnPool *parent);
~IdleConnList();
- int numIdle() { return nfds; }
int findFDIndex(int fd); ///< search from the end of array
void removeFD(int fd);
int findUseableFD(); ///< find first from the end not pending read fd.
void clearHandlers(int fd);
+ int count() const { return nfds; }
+
private:
static IOCB read;
static PF timeout;
void moduleInit();
void push(int fd, const char *host, u_short port, const char *domain, Ip::Address &client_address);
int pop(const char *host, u_short port, const char *domain, Ip::Address &client_address, bool retriable);
- void count(int uses);
+ void noteUses(int uses);
void dumpHist(StoreEntry *e);
void dumpHash(StoreEntry *e);
- void unlinkList(IdleConnList *list) const;
+ void unlinkList(IdleConnList *list);
+ void closeN(int n, const char *host, u_short port, const char *domain, Ip::Address &client_address);
+ int count() const { return theCount; }
+ void noteConnectionAdded() { ++theCount; }
+ void noteConnectionRemoved() { assert(theCount > 0); --theCount; }
private:
int hist[PCONN_HIST_SZ];
hash_table *table;
const char *descr;
-
+ int theCount; ///< the number of pooled connections
};