This patch adds the url_rewrite_timeout directive.
When configured, Squid keeps track of active requests and treats timed out
requests to redirector as failed requests.
url_rewrite_timeout format:
url_rewrite_timeout timeout time-units on_timeout=<fail|bypass|retry|use_configured_response> [response=<quoted-string>]
The url_rewrite_timeout directive can accept the on_timeout argument to allow
user configure the action when the helper request times out.
The available actions are:
fail: squid return a ERR_GATEWAY_FAILURE error page
bypass: the url is not rewritten.
retry: retry the request to helper
use_configured_response: use a response which can be configured using the
the response= option
Example usage:
url_rewrite_timeout 30 seconds \
on_timeout=use_configured_response \
response="OK url=http://example.com/support"
This is a Measurement Factory project
<p>The most important of these new features are:
<itemize>
+ <item>Helper concurrency channels changes
<item>Configurable helper queue size
<item>SSLv2 support removal
</itemize>
<p>The new queue-size=N option to helpers configuration, allows users
to configure the maximum number of queued requests to busy helpers.
+<sect1>Helper concurrency channels changes
+<p> helper-mux.pl we have been distributing for the past few years to
+ encourage use of concurrency is no longer compatible with Squid. If
+ used it will spawn up to 2^64 helpers and DoS the Squid server.
+
+<p> Helpers utilizing arrays to handle fixed amounts of concurrency
+ channels MUST be re-written to use queues and capable of handling a
+ 64-bit int as index or they will be vulnerable to buffer overrun and
+ arbitrary memory accesses.
+
+<p> 32-bit helpers need re-writing to handle the concurrency channel ID
+ as a 64-bit integer value. If not updated they will cause proxies to
+ return unexpected results or timeout once crossing the 32-bit wrap
+ boundary. Leading to undefined behaviour in the client HTTP traffic.
<sect1>SSLv2 support removal
<p>Details in <url url="https://tools.ietf.org/html/rfc6176" name="RFC 6176">
<sect1>New tags<label id="newtags">
<p>
<descrip>
-
+ <tag> url_rewrite_timeout </tag>
+ <p> Squid times active requests to redirector. This option sets
+ the timeout value and the Squid reaction to a timed out
+ request. </p>
</descrip>
<sect1>Changes to existing tags<label id="modifiedtags">
int redirect_state;
int store_id_state;
- /**
- * URL-rewrite/redirect helper may return BH for internal errors.
- * We attempt to recover by trying the lookup again, but limit the
- * number of retries to prevent lag and lockups.
- * This tracks the number of previous failures for the current context.
- */
- uint8_t redirect_fail_count;
- uint8_t store_id_fail_count;
-
bool host_header_verify_done;
bool http_access_done;
bool adapted_http_access_done;
std::queue<std::string> ConfigParser::Undo_;
bool ConfigParser::AllowMacros_ = false;
bool ConfigParser::ParseQuotedOrToEol_ = false;
+bool ConfigParser::ParseKvPair_ = false;
+ConfigParser::ParsingStates ConfigParser::KvPairState_ = ConfigParser::atParseKey;
bool ConfigParser::RecognizeQuotedPair_ = false;
bool ConfigParser::PreviewMode_ = false;
const char *tokenStart = nextToken;
const char *sep;
- if (ConfigParser::ParseQuotedOrToEol_)
+ if (ConfigParser::ParseKvPair_) {
+ if (ConfigParser::KvPairState_ == ConfigParser::atParseKey)
+ sep = "=";
+ else
+ sep = w_space;
+ } else if (ConfigParser::ParseQuotedOrToEol_)
sep = "\n";
else if (!ConfigParser::RecognizeQuotedValues || *nextToken == '(')
sep = w_space;
return token;
}
+bool
+ConfigParser::NextKvPair(char * &key, char * &value)
+{
+ key = value = NULL;
+ ParseKvPair_ = true;
+ KvPairState_ = ConfigParser::atParseKey;
+ if ((key = NextToken()) != NULL) {
+ KvPairState_ = ConfigParser::atParseValue;
+ value = NextQuotedToken();
+ }
+ ParseKvPair_ = false;
+
+ if (!key)
+ return false;
+ if (!value) {
+ debugs(3, DBG_CRITICAL, "Error while parsing key=value token. Value missing after: " << key);
+ return false;
+ }
+
+ return true;
+}
+
char *
ConfigParser::RegexStrtokFile()
{
*/
static char *NextQuotedOrToEol();
+ /**
+ * the next key value pair which must be separated by "="
+ * \return true on success, false otherwise
+ */
+ static bool NextKvPair(char * &key, char * &value);
+
/**
* Preview the next token. The next NextToken() and strtokFile() call
* will return the same token.
static bool ParseQuotedOrToEol_; ///< The next tokens will be handled as quoted or to_eol token
static bool RecognizeQuotedPair_; ///< The next tokens may contain quoted-pair (\-escaped) characters
static bool PreviewMode_; ///< The next token will not poped from cfg files, will just previewd.
+ static bool ParseKvPair_; ///<The next token will be handled as kv-pair token
+ static enum ParsingStates {atParseKey, atParseValue} KvPairState_; ///< Parsing state while parsing kv-pair tokens
};
int parseConfigFile(const char *file_name);
int mcast_icp_query; /* msec */
time_msec_t idns_retransmit;
time_msec_t idns_query;
+ time_t urlRewrite;
} Timeout;
size_t maxRequestHeaderSize;
int64_t maxRequestBodySize;
char *redirector_extras;
+ struct {
+ int action;
+ char *response;
+ } onUrlRewriteTimeout;
+
char *storeId_extras;
struct {
debugs(29, DBG_IMPORTANT, "ERROR: Digest auth does not support the result code received. Using the wrong helper program? received: " << reply);
// fall through to next case. Handle this as an ERR response.
+ case Helper::TimedOut:
case Helper::BrokenHelper:
// TODO retry the broken lookup on another helper?
// fall through to next case for now. Handle this as an ERR response silently.
-
case Helper::Error: {
/* allow this because the digest_request pointer is purely local */
Auth::Digest::UserRequest *digest_request = dynamic_cast<Auth::Digest::UserRequest *>(auth_user_request.getRaw());
debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication Helper '" << reply.whichServer << "' crashed!.");
/* continue to the next case */
+ case Helper::TimedOut:
case Helper::BrokenHelper: {
/* TODO kick off a refresh process. This can occur after a YR or after
* a KK. If after a YR release the helper and resubmit the request via
debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication Helper '" << reply.whichServer << "' crashed!.");
/* continue to the next case */
+ case Helper::TimedOut:
case Helper::BrokenHelper: {
/* TODO kick off a refresh process. This can occur after a YR or after
* a KK. If after a YR release the helper and resubmit the request via
#include "NeighborTypeDomainList.h"
#include "Parsing.h"
#include "pconn.h"
+#include "redirect.h"
#include "PeerDigest.h"
#include "PeerPoolMgr.h"
#include "RefreshPattern.h"
static void configDoConfigure(void);
static void parse_refreshpattern(RefreshPattern **);
static uint64_t parseTimeUnits(const char *unit, bool allowMsec);
-static void parseTimeLine(time_msec_t * tptr, const char *units, bool allowMsec);
+static void parseTimeLine(time_msec_t * tptr, const char *units, bool allowMsec, bool expectMoreArguments);
static void parse_u_short(unsigned short * var);
static void parse_string(char **);
static void default_all(void);
static void dump_CpuAffinityMap(StoreEntry *const entry, const char *const name, const CpuAffinityMap *const cpuAffinityMap);
static void free_CpuAffinityMap(CpuAffinityMap **const cpuAffinityMap);
+static void parse_url_rewrite_timeout(SquidConfig *);
+static void dump_url_rewrite_timeout(StoreEntry *, const char *, SquidConfig &);
+static void free_url_rewrite_timeout(SquidConfig *);
+
static int parseOneConfigFile(const char *file_name, unsigned int depth);
static void parse_configuration_includes_quoted_values(bool *recognizeQuotedValues);
/* Parse a time specification from the config file. Store the
* result in 'tptr', after converting it to 'units' */
static void
-parseTimeLine(time_msec_t * tptr, const char *units, bool allowMsec)
+parseTimeLine(time_msec_t * tptr, const char *units, bool allowMsec, bool expectMoreArguments = false)
{
char *token;
double d;
m = u; /* default to 'units' if none specified */
+ bool hasUnits = false;
if (0 == d)
(void) 0;
- else if ((token = ConfigParser::NextToken()) == NULL)
+ else if ((token = ConfigParser::PeekAtToken()) == NULL)
+ (void) 0;
+ else if ((m = parseTimeUnits(token, allowMsec)) == 0) {
+ if (!expectMoreArguments)
+ self_destruct();
+ } else { //pop the token
+ (void)ConfigParser::NextToken();
+ hasUnits = true;
+ }
+ if (!hasUnits)
debugs(3, DBG_CRITICAL, "WARNING: No units on '" <<
config_input_line << "', assuming " <<
d << " " << units );
- else if ((m = parseTimeUnits(token, allowMsec)) == 0)
- self_destruct();
*tptr = static_cast<time_msec_t>(m * d);
FtpEspvDeprecated = false;
}
+static void
+parse_url_rewrite_timeout(SquidConfig *config)
+{
+ time_msec_t tval;
+ parseTimeLine(&tval, T_SECOND_STR, false, true);
+ Config.Timeout.urlRewrite = static_cast<time_t>(tval/1000);
+
+ char *key, *value;
+ while(ConfigParser::NextKvPair(key, value)) {
+ if (strcasecmp(key, "on_timeout") == 0) {
+ if (strcasecmp(value, "bypass") == 0)
+ Config.onUrlRewriteTimeout.action = toutActBypass;
+ else if (strcasecmp(value, "fail") == 0)
+ Config.onUrlRewriteTimeout.action = toutActFail;
+ else if (strcasecmp(value, "retry") == 0)
+ Config.onUrlRewriteTimeout.action = toutActRetry;
+ else if (strcasecmp(value, "use_configured_response") == 0) {
+ Config.onUrlRewriteTimeout.action = toutActUseConfiguredResponse;
+ } else {
+ debugs(3, DBG_CRITICAL, "FATAL: unsuported \"on_timeout\" action:" << value);
+ self_destruct();
+ }
+ } else if (strcasecmp(key, "response") == 0) {
+ Config.onUrlRewriteTimeout.response = xstrdup(value);
+ } else {
+ debugs(3, DBG_CRITICAL, "FATAL: unsuported option " << key);
+ self_destruct();
+ }
+ }
+
+ if (Config.onUrlRewriteTimeout.action == toutActUseConfiguredResponse && !Config.onUrlRewriteTimeout.response) {
+ debugs(3, DBG_CRITICAL, "FATAL: Expected 'response=' option after 'on_timeout=use_configured_response' option");
+ self_destruct();
+ }
+
+ if (Config.onUrlRewriteTimeout.action != toutActUseConfiguredResponse && Config.onUrlRewriteTimeout.response) {
+ debugs(3, DBG_CRITICAL, "FATAL: 'response=' option is valid only when used with the 'on_timeout=use_configured_response' option");
+ self_destruct();
+ }
+}
+
+static void
+dump_url_rewrite_timeout(StoreEntry *entry, const char *name, SquidConfig &config)
+{
+ const char *onTimedOutActions[] = {"bypass", "fail", "retry", "use_configured_response"};
+ assert(Config.onUrlRewriteTimeout.action >= 0 && Config.onUrlRewriteTimeout.action <= toutActUseConfiguredResponse);
+
+ dump_time_t(entry, name, Config.Timeout.urlRewrite);
+ storeAppendPrintf(entry, " on_timeout=%s", onTimedOutActions[Config.onUrlRewriteTimeout.action]);
+
+ if (Config.onUrlRewriteTimeout.response)
+ storeAppendPrintf(entry, " response=\"%s\"", Config.onUrlRewriteTimeout.response);
+
+ storeAppendPrintf(entry, "\n");
+}
+
+static void
+free_url_rewrite_timeout(SquidConfig *config)
+{
+ Config.Timeout.urlRewrite = 0;
+ Config.onUrlRewriteTimeout.action = 0;
+ xfree(Config.onUrlRewriteTimeout.response);
+ Config.onUrlRewriteTimeout.response = NULL;
+}
+
static void
parse_configuration_includes_quoted_values(bool *recognizeQuotedValues)
{
time_t
tristate
uri_whitespace
+url_rewrite_timeout
u_short
wccp2_method
wccp2_amethod
sent before the required macro information is available to Squid.
DOC_END
+NAME: url_rewrite_timeout
+TYPE: url_rewrite_timeout
+LOC: Config
+DEFAULT: none
+DEFAULT_DOC: Squid waits for the helper response forever
+DOC_START
+ Squid times active requests to redirector. The timeout value and Squid
+ reaction to a timed out request are configurable using the following
+ format:
+
+ url_rewrite_timeout timeout time-units on_timeout=<action> [response=<quoted-response>]
+
+ supported timeout actions:
+ fail Squid return a ERR_GATEWAY_FAILURE error page
+
+ bypass Do not re-write the URL
+
+ retry Send the lookup to the helper again
+
+ use_configured_response Use the <quoted-response> as
+ helper response
+DOC_END
+
COMMENT_START
OPTIONS FOR STORE ID
-----------------------------------------------------------------------------
{
http_access_done = false;
redirect_done = false;
- redirect_fail_count = 0;
store_id_done = false;
- store_id_fail_count = 0;
no_cache_done = false;
interpreted_req_hdrs = false;
#if USE_OPENSSL
UpdateRequestNotes(http->getConn(), *old_request, reply.notes);
switch (reply.result) {
+ case Helper::TimedOut:
+ if (Config.onUrlRewriteTimeout.action != toutActBypass) {
+ http->calloutsError(ERR_GATEWAY_FAILURE, ERR_DETAIL_REDIRECTOR_TIMEDOUT);
+ debugs(85, DBG_IMPORTANT, "ERROR: URL rewrite helper: Timedout");
+ }
+ break;
+
case Helper::Unknown:
case Helper::TT:
// Handler in redirect.cc should have already mapped Unknown
break;
case Helper::BrokenHelper:
- debugs(85, DBG_IMPORTANT, "ERROR: URL rewrite helper: " << reply << ", attempt #" << (redirect_fail_count+1) << " of 2");
- if (redirect_fail_count < 2) { // XXX: make this configurable ?
- ++redirect_fail_count;
- // reset state flag to try redirector again from scratch.
- redirect_done = false;
- }
+ debugs(85, DBG_IMPORTANT, "ERROR: URL rewrite helper: " << reply);
break;
case Helper::Error:
debugs(85, DBG_IMPORTANT, "ERROR: storeID helper returned invalid result code. Wrong helper? " << reply);
break;
+ case Helper::TimedOut:
+ // Timeouts for storeID are not implemented
case Helper::BrokenHelper:
- debugs(85, DBG_IMPORTANT, "ERROR: storeID helper: " << reply << ", attempt #" << (store_id_fail_count+1) << " of 2");
- if (store_id_fail_count < 2) { // XXX: make this configurable ?
- ++store_id_fail_count;
- // reset state flag to try StoreID again from scratch.
- store_id_done = false;
- }
+ debugs(85, DBG_IMPORTANT, "ERROR: storeID helper: " << reply);
break;
case Helper::Error:
clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
assert(repContext);
+ calloutsError(ERR_ICAP_FAILURE, errDetail);
+
+ if (calloutContext)
+ doCallouts();
+}
+
+// XXX: modify and use with ClientRequestContext::clientAccessCheckDone too.
+void
+ClientHttpRequest::calloutsError(const err_type error, const int errDetail)
+{
// The original author of the code also wanted to pass an errno to
// setReplyToError, but it seems unlikely that the errno reflects the
// true cause of the error at this point, so I did not pass it.
Ip::Address noAddr;
noAddr.setNoAddr();
ConnStateData * c = getConn();
- calloutContext->error = clientBuildError(ERR_ICAP_FAILURE, Http::scInternalServerError,
+ calloutContext->error = clientBuildError(error, Http::scInternalServerError,
NULL,
c != NULL ? c->clientConnection->remote : noAddr,
request
calloutContext->readNextRequest = true;
if (c != NULL)
c->expectNoForwarding();
- doCallouts();
}
//else if(calloutContext == NULL) is it possible?
}
ClientRequestContext *calloutContext;
void doCallouts();
+ /// Build an error reply. For use with the callouts.
+ void calloutsError(const err_type error, const int errDetail);
+
#if USE_ADAPTATION
// AsyncJob virtual methods
virtual bool doneAll() const {
public:
void startAdaptation(const Adaptation::ServiceGroupPointer &g);
- // private but exposed for ClientRequestContext
+private:
+ /// Handles an adaptation client request failure.
+ /// Bypasses the error if possible, or build an error reply.
void handleAdaptationFailure(int errDetail, bool bypassable = false);
-private:
// Adaptation::Initiator API
virtual void noteAdaptationAnswer(const Adaptation::Answer &answer);
void handleAdaptedHeader(HttpMsg *msg);
typedef enum {
ERR_DETAIL_NONE,
ERR_DETAIL_START = 100000, // to avoid clashes with most OS error numbers
+ ERR_DETAIL_REDIRECTOR_TIMEDOUT, // External redirector request timed-out
ERR_DETAIL_CLT_REQMOD_ABORT = ERR_DETAIL_START, // client-facing code detected transaction abort
ERR_DETAIL_CLT_REQMOD_REQ_BODY, // client-facing code detected REQMOD request body adaptation failure
ERR_DETAIL_CLT_REQMOD_RESP_BODY, // client-facing code detected REQMOD satisfaction reply body failure
#include "helper/Reply.h"
#include "helper/Request.h"
#include "MemBuf.h"
+#include "SquidConfig.h"
#include "SquidIpc.h"
#include "SquidMath.h"
#include "SquidTime.h"
#define HELPER_MAX_ARGS 64
+/// The maximum allowed request retries.
+#define MAX_RETRIES 2
+
/** Initial Squid input buffer size. Helper responses may exceed this, and
* Squid will grow the input buffer as needed, up to ReadBufMaxSize.
*/
stats.replies=0;
stats.pending=0;
stats.releases=0;
+ stats.timedout = 0;
}
void
srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
srv->wqueue = new MemBuf;
srv->roffset = 0;
- srv->requests = (Helper::Request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
+ srv->nextRequestId = 0;
srv->parent = cbdataReference(hlp);
dlinkAddTail(srv, &srv->link, &hlp->servers);
AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
comm_add_close_handler(rfd, closeCall);
+ if (hlp->timeout && hlp->childs.concurrency){
+ AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
+ CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
+ commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
+ }
+
+
AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
CommIoCbPtrFun(helperHandleRead, srv));
comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
helperStatefulKickQueue(hlp);
}
+void
+helper::submitRequest(Helper::Request *r)
+{
+ helper_server *srv;
+
+ if ((srv = GetFirstAvailable(this)))
+ helperDispatch(srv, r);
+ else
+ Enqueue(this, r);
+
+ if (!queueFull()) {
+ full_time = 0;
+ } else if (!full_time) {
+ debugs(84, 3, id_name << " queue became full");
+ full_time = squid_curtime;
+ }
+}
+
void
helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
{
helper::submit(const char *buf, HLPCB * callback, void *data)
{
Helper::Request *r = new Helper::Request(callback, data, buf);
- helper_server *srv;
-
- if ((srv = GetFirstAvailable(this)))
- helperDispatch(srv, r);
- else
- Enqueue(this, r);
-
+ submitRequest(r);
debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
-
- if (!queueFull()) {
- full_time = 0;
- } else if (!full_time) {
- debugs(84, 3, id_name << " queue became full");
- full_time = squid_curtime;
- }
}
/// lastserver = "server last used as part of a reserved request sequence"
hlp->stats.requests);
storeAppendPrintf(sentry, "replies received: %d\n",
hlp->stats.replies);
+ storeAppendPrintf(sentry, "requests timedout: %d\n",
+ hlp->stats.timedout);
storeAppendPrintf(sentry, "queue length: %d\n",
hlp->stats.queue_size);
storeAppendPrintf(sentry, "avg service time: %d msec\n",
hlp->stats.avg_svc_time);
storeAppendPrintf(sentry, "\n");
- storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
+ storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
"ID #",
"FD",
"PID",
"# Requests",
"# Replies",
+ "# Timed-out",
"Flags",
"Time",
"Offset",
for (dlink_node *link = hlp->servers.head; link; link = link->next) {
helper_server *srv = (helper_server*)link->data;
- double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
- storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
+ Helper::Request *request = srv->requests.empty() ? NULL : srv->requests.front();
+ double tt = 0.001 * (request ? tvSubMsec(request->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
+ storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
srv->index.value,
srv->readPipe->fd,
srv->pid,
srv->stats.uses,
srv->stats.replies,
+ srv->stats.timedout,
srv->stats.pending ? 'B' : ' ',
srv->flags.writing ? 'W' : ' ',
srv->flags.closing ? 'C' : ' ',
srv->flags.shutdown ? 'S' : ' ',
tt < 0.0 ? 0.0 : tt,
(int) srv->roffset,
- srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
+ request ? Format::QuoteMimeBlob(request->buf) : "(none)");
}
storeAppendPrintf(sentry, "\nFlags key:\n\n");
{
helper *hlp = srv->parent;
Helper::Request *r;
- int i, concurrency = hlp->childs.concurrency;
+ int concurrency = hlp->childs.concurrency;
if (!concurrency)
concurrency = 1;
}
}
- for (i = 0; i < concurrency; ++i) {
+ while (!srv->requests.empty()) {
// XXX: re-schedule these on another helper?
- if ((r = srv->requests[i])) {
- void *cbdata;
-
- if (cbdataReferenceValidDone(r->data, &cbdata)) {
- Helper::Reply nilReply;
- r->callback(cbdata, nilReply);
- }
-
- delete r;
+ r = srv->requests.front();
+ srv->requests.pop_front();
+ void *cbdata;
- srv->requests[i] = NULL;
+ if (cbdataReferenceValidDone(r->data, &cbdata)) {
+ Helper::Reply nilReply;
+ r->callback(cbdata, nilReply);
}
+
+ delete r;
}
- safe_free(srv->requests);
+ srv->requestsIndex.clear();
cbdataReferenceDone(srv->parent);
delete srv;
static void
helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
{
- Helper::Request *r = srv->requests[request_number];
+ Helper::Request *r = NULL;
+ helper_server::RequestIndex::iterator it;
+ if (hlp->childs.concurrency) {
+ // If concurency supported retrieve request from ID
+ it = srv->requestsIndex.find(request_number);
+ if (it != srv->requestsIndex.end()) {
+ r = *(it->second);
+ srv->requests.erase(it->second);
+ srv->requestsIndex.erase(it);
+ }
+ } else if(!srv->requests.empty()) {
+ // Else get the first request from queue, if any
+ r = srv->requests.front();
+ srv->requests.pop_front();
+ }
+
if (r) {
HLPCB *callback = r->callback;
-
- srv->requests[request_number] = NULL;
-
r->callback = NULL;
void *cbdata = NULL;
+ bool retry = false;
if (cbdataReferenceValidDone(r->data, &cbdata)) {
Helper::Reply response(msg, (msg_end-msg));
- callback(cbdata, response);
+ if (response.result == Helper::BrokenHelper && r->retries < MAX_RETRIES) {
+ debugs(84, DBG_IMPORTANT, "ERROR: helper: " << response << ", attempt #" << (r->retries + 1) << " of 2");
+ retry = true;
+ } else
+ callback(cbdata, response);
}
-- srv->stats.pending;
tvSubMsec(r->dispatch_time, current_time),
hlp->stats.replies, REDIRECT_AV_FACTOR);
- delete r;
+ if (retry) {
+ ++r->retries;
+ hlp->submitRequest(r);
+ } else
+ delete r;
+ } else if (srv->stats.timedout){
+ debugs(84, 3, "Timedout reply received for request-ID: " << request_number << " , ignore");
} else {
debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
request_number << " from " << hlp->id_name << " #" << srv->index <<
" '" << srv->rbuf << "'");
}
+ if (hlp->timeout && hlp->childs.concurrency)
+ srv->checkForTimedOutRequests(hlp->retryTimedOut);
+
if (!srv->flags.shutdown) {
helperKickQueue(hlp);
} else if (!srv->flags.closing && !srv->stats.pending) {
srv->rbuf[srv->roffset] = '\0';
debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
- if (!srv->stats.pending) {
+ if (!srv->stats.pending && !srv->stats.timedout) {
/* someone spoke without being spoken to */
debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
hlp->id_name << " #" << srv->index << ", " << (int)len <<
helperDispatch(helper_server * srv, Helper::Request * r)
{
helper *hlp = srv->parent;
- Helper::Request **ptr = NULL;
- unsigned int slot;
+ const uint64_t reqId = ++srv->nextRequestId;
if (!cbdataReferenceValid(r->data)) {
debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
return;
}
- for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
- if (!srv->requests[slot]) {
- ptr = &srv->requests[slot];
- break;
- }
- }
-
- assert(ptr);
- *ptr = r;
+ r->Id = reqId;
+ helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
r->dispatch_time = current_time;
if (srv->wqueue->isNull())
srv->wqueue->init();
- if (hlp->childs.concurrency)
- srv->wqueue->Printf("%d %s", slot, r->buf);
- else
+ if (hlp->childs.concurrency) {
+ srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
+ assert(srv->requestsIndex.size() == srv->requests.size());
+ srv->wqueue->Printf("%" PRIu64 " %s", reqId, r->buf);
+ } else
srv->wqueue->append(r->buf, strlen(r->buf));
if (!srv->flags.writing) {
return true;
}
+
+void
+helper_server::checkForTimedOutRequests(bool const retry)
+{
+ assert(parent->childs.concurrency);
+ while(!requests.empty() && requests.front()->timedOut(parent->timeout)) {
+ Helper::Request *r = requests.front();
+ RequestIndex::iterator it;
+ it = requestsIndex.find(r->Id);
+ assert(it != requestsIndex.end());
+ requestsIndex.erase(it);
+ requests.pop_front();
+ debugs(84, 2, "Request " << r->Id << " timed-out, remove it from queue");
+ void *cbdata;
+ bool retried = false;
+ if (retry && r->retries < MAX_RETRIES && cbdataReferenceValid(r->data)) {
+ debugs(84, 2, "Retry request " << r->Id);
+ ++r->retries;
+ parent->submitRequest(r);
+ retried = true;
+ } else if (cbdataReferenceValidDone(r->data, &cbdata)) {
+ if (!parent->onTimedOutResponse.isEmpty()) {
+ // Helper::Reply needs a non const buffer
+ char *replyMsg = xstrdup(parent->onTimedOutResponse.c_str());
+ r->callback(cbdata, Helper::Reply(replyMsg, strlen(replyMsg)));
+ xfree(replyMsg);
+ } else
+ r->callback(cbdata, Helper::Reply(Helper::TimedOut));
+ }
+ --stats.pending;
+ ++stats.timedout;
+ ++parent->stats.timedout;
+ if (!retried)
+ delete r;
+ }
+}
+
+void
+helper_server::requestTimeout(const CommTimeoutCbParams &io)
+{
+ debugs(26, 3, HERE << io.conn);
+ helper_server *srv = static_cast<helper_server *>(io.data);
+
+ if (!cbdataReferenceValid(srv))
+ return;
+
+ srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
+
+ debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
+ AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
+ CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
+
+ const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->dispatch_time.tv_sec);
+ const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
+
+ commSetConnTimeout(io.conn, timeLeft, timeoutCall);
+}
#include "helper/ChildConfig.h"
#include "helper/forward.h"
#include "ip/Address.h"
+#include "SBuf.h"
+
+#include <list>
+#include <map>
/**
* Managers a set of individual helper processes with a common queue of requests.
full_time(0),
last_queue_warn(0),
last_restart(0),
+ timeout(0),
+ retryTimedOut(false),
+ retryBrokenHelper(false),
eom('\n') {
memset(&stats, 0, sizeof(stats));
}
///< If not full, submit request. Otherwise, either kill Squid or return false.
bool trySubmit(const char *buf, HLPCB * callback, void *data);
+ /// Submits a request to the helper or add it to the queue if none of
+ /// the servers is available.
+ void submitRequest(Helper::Request *r);
public:
wordlist *cmdline;
dlink_list servers;
time_t full_time; ///< when a full helper became full (zero for non-full helpers)
time_t last_queue_warn;
time_t last_restart;
+ time_t timeout; ///< Requests timeout
+ bool retryTimedOut; ///< Whether the timed-out requests must retried
+ bool retryBrokenHelper; ///< Whether the requests must retried on BH replies
+ SBuf onTimedOutResponse; ///< The response to use when helper response timedout
char eom; ///< The char which marks the end of (response) message, normally '\n'
struct _stats {
int requests;
int replies;
+ int timedout;
int queue_size;
int avg_svc_time;
} stats;
uint64_t replies; //< replies received from this helper
uint64_t pending; //< queued lookups waiting to be sent to this helper
uint64_t releases; //< times release() has been called on this helper (if stateful)
+ uint64_t timedout; //< requests which timed-out
} stats;
void initStats();
};
class MemBuf;
+class CommTimeoutCbParams;
class helper_server : public HelperServerBase
{
CBDATA_CLASS(helper_server);
public:
+ uint64_t nextRequestId;
+
MemBuf *wqueue;
MemBuf *writebuf;
helper *parent;
- Helper::Request **requests;
+
+ typedef std::list<Helper::Request *> Requests;
+ Requests requests; ///< requests in order of submission/expiration
+
+ // STL says storing std::list iterators is safe when changing the list
+ typedef std::map<uint64_t, Requests::iterator> RequestIndex;
+ RequestIndex requestsIndex; ///< maps request IDs to requests
+
+ /// Run over the active requests lists and forces a retry, or timedout reply
+ /// or the configured "on timeout response" for timedout requests.
+ void checkForTimedOutRequests(bool const retry);
+
+ /// Read timeout handler
+ static void requestTimeout(const CommTimeoutCbParams &io);
};
class helper_stateful_server : public HelperServerBase
case Helper::TT:
os << "TT";
break;
+ case Helper::TimedOut:
+ os << "Timeout";
+ break;
case Helper::Unknown:
os << "Unknown";
break;
Reply &operator =(const Helper::Reply &r);
public:
- Reply() : result(Helper::Unknown), notes(), whichServer(NULL) {
+ explicit Reply(Helper::ResultCode res = Helper::Unknown) : result(res), notes(), whichServer(NULL) {
other_.init(1,1);
other_.terminate();
}
#define _SQUID_SRC_HELPER_REQUEST_H
#include "helper/forward.h"
+#include "SquidTime.h"
namespace Helper
{
buf(b ? xstrdup(b) : NULL),
callback(c),
data(cbdataReference(d)),
- placeholder(b == NULL)
+ placeholder(b == NULL),
+ Id(0),
+ retries(0)
{
memset(&dispatch_time, 0, sizeof(dispatch_time));
}
int placeholder; /* if 1, this is a dummy request waiting for a stateful helper to become available */
struct timeval dispatch_time;
+ uint64_t Id;
+ /**
+ * A helper may configured to retry timed out requests or on BH replies.
+ * We attempt to recover by trying the lookup again, but limit the
+ * number of retries to prevent lag and lockups.
+ * This tracks the number of previous failures for the request.
+ */
+ int retries;
+ bool timedOut(time_t timeout) {return (squid_curtime - dispatch_time.tv_sec) > timeout;}
};
} // namespace Helper
Okay, // "OK" indicating success/positive result
Error, // "ERR" indicating success/negative result
BrokenHelper, // "BH" indicating failure due to helper internal problems.
+ TimedOut, // Request timedout
// result codes for backward compatibility with NTLM/Negotiate
// TODO: migrate to a variant of the above results with kv-pair parameters
#include "squid.h"
#include "acl/Checklist.h"
+#include "cache_cf.h"
#include "client_side.h"
#include "client_side_reply.h"
#include "client_side_request.h"
redirectors->ipc_type = IPC_STREAM;
+ redirectors->timeout = Config.Timeout.urlRewrite;
+
+ redirectors->retryTimedOut = (Config.onUrlRewriteTimeout.action == toutActRetry);
+ redirectors->retryBrokenHelper = true; // XXX: make this configurable ?
+ redirectors->onTimedOutResponse.clear();
+ if (Config.onUrlRewriteTimeout.action == toutActUseConfiguredResponse)
+ redirectors->onTimedOutResponse.assign(Config.onUrlRewriteTimeout.response);
+
helperOpenServers(redirectors);
}
storeIds->ipc_type = IPC_STREAM;
+ storeIds->retryBrokenHelper = true; // XXX: make this configurable ?
+
helperOpenServers(storeIds);
}
#include "helper.h"
+enum TimeoutAction {toutActBypass, toutActFail, toutActRetry, toutActUseConfiguredResponse};
+
class ClientHttpRequest;
void redirectInit(void);
# variables initialization
my %helpers=();
+my $helpers_running = 0;
my $rvec='';
vec($rvec,0,1)=1; #stdin
my $nfound;
print STDERR "nothing read from stdin\n";
exit 0;
}
- foreach $req (split("\n",$_)) {
- dispatch_request($_);
+ foreach $req (split("\n",$_ )) {
+ dispatch_request($req);
}
}
# find out if any filedesc was closed
#print STDERR "dispatching request $_";
$line =~ /^(\d+) (.*)$/;
- my $slot=$1;
+ my $reqId=$1;
my $req=$2;
- if (!exists($helpers{$slot})) {
- $helpers{$slot}=init_subprocess();
+ undef $h;
+ # Find a free helper
+ foreach $slot ( 1 .. ($helpers_running)) {
+ if (!defined($helpers{$slot}->{lastcmd})) {
+ $h = $helpers{$slot};
+ last;
+ }
+ }
+ # If none create one
+ if (!defined($h)) {
+ $helpers_running = $helpers_running + 1;
+ $helpers{$helpers_running}=init_subprocess();
+ $h = $helpers{$helpers_running};
+ # print STDERR "Now $helpers_running helpers running\n";
}
- $h=$helpers{$slot};
+
$wh=$h->{wh};
$rh=$h->{rh};
$h->{lastcmd}=$req;
+ $h->{reqId}=$reqId;
print $wh "$req\n";
}
my ($nread,$resp);
$nread=sysread($helpers{$h}->{rh},$resp,40960);
#print STDERR "got $resp from slot $h\n";
- print $h, " ", $resp;
+ print $helpers{$h}->{reqId}, " ", $resp;
delete $helpers{$h}->{lastcmd};
}