/// The maximum allowed request retries.
#define MAX_RETRIES 2
-/** Initial Squid input buffer size. Helper responses may exceed this, and
- * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
- */
-const size_t ReadBufMinSize(4*1024);
+/// Helpers input buffer size.
+const size_t ReadBufSize(32*1024);
-/** Maximum safe size of a helper-to-Squid response message plus one.
- * Squid will warn and close the stream if a helper sends a too-big response.
- * ssl_crtd helper is known to produce responses of at least 10KB in size.
- * Some undocumented helpers are known to produce responses exceeding 8KB.
- */
-const size_t ReadBufMaxSize(32*1024);
static IOCB helperHandleRead;
static IOCB helperStatefulHandleRead;
static void helperServerFree(helper_server *srv);
static void helperStatefulServerFree(helper_stateful_server *srv);
-static void Enqueue(helper * hlp, Helper::Request *);
+static void Enqueue(helper * hlp, Helper::Xaction *);
static helper_server *GetFirstAvailable(helper * hlp);
static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
-static void helperDispatch(helper_server * srv, Helper::Request * r);
-static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r);
+static void helperDispatch(helper_server * srv, Helper::Xaction * r);
+static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
static void helperKickQueue(helper * hlp);
static void helperStatefulKickQueue(statefulhelper * hlp);
static void helperStatefulServerDone(helper_stateful_server * srv);
-static void StatefulEnqueue(statefulhelper * hlp, Helper::Request * r);
+static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
CBDATA_CLASS_INIT(helper);
CBDATA_CLASS_INIT(helper_server);
srv->readPipe->fd = rfd;
srv->writePipe = new Comm::Connection;
srv->writePipe->fd = wfd;
- srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
+ srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
srv->wqueue = new MemBuf;
srv->roffset = 0;
srv->nextRequestId = 0;
+ srv->replyXaction = NULL;
srv->parent = cbdataReference(hlp);
dlinkAddTail(srv, &srv->link, &hlp->servers);
srv->readPipe->fd = rfd;
srv->writePipe = new Comm::Connection;
srv->writePipe->fd = wfd;
- srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
+ srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
srv->roffset = 0;
srv->parent = cbdataReference(hlp);
}
void
-helper::submitRequest(Helper::Request *r)
+helper::submitRequest(Helper::Xaction *r)
{
helper_server *srv;
{
if (hlp == NULL) {
debugs(84, 3, "helperSubmit: hlp == NULL");
- Helper::Reply nilReply;
+ Helper::Reply const nilReply(Helper::Unknown);
callback(data, nilReply);
return;
}
void
helper::submit(const char *buf, HLPCB * callback, void *data)
{
- Helper::Request *r = new Helper::Request(callback, data, buf);
+ Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
submitRequest(r);
debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
}
{
if (hlp == NULL) {
debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
- Helper::Reply nilReply;
+ Helper::Reply const nilReply(Helper::Unknown);
callback(data, nilReply);
return;
}
void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
{
- Helper::Request *r = new Helper::Request(callback, data, buf);
+ Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
if ((buf != NULL) && lastserver) {
debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
StatefulEnqueue(this, r);
}
- debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
+ debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
"', " << Raw("buf", buf, (!buf?0:strlen(buf))));
if (!queueFull()) {
for (dlink_node *link = servers.head; link; link = link->next) {
HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
assert(srv);
- Helper::Request *request = srv->requests.empty() ? NULL : srv->requests.front();
- double tt = 0.001 * (request ? tvSubMsec(request->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
+ Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
+ double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
srv->index.value,
srv->readPipe->fd,
srv->flags.closing ? 'C' : ' ',
srv->flags.reserved ? 'R' : ' ',
srv->flags.shutdown ? 'S' : ' ',
- request && request->placeholder ? 'P' : ' ',
+ xaction && xaction->request.placeholder ? 'P' : ' ',
tt < 0.0 ? 0.0 : tt,
(int) srv->roffset,
- request ? Format::QuoteMimeBlob(request->buf) : "(none)");
+ xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
}
p->append("\nFlags key:\n"
while (!srv->requests.empty()) {
// XXX: re-schedule these on another helper?
- Helper::Request *r = srv->requests.front();
+ Helper::Xaction *r = srv->requests.front();
srv->requests.pop_front();
void *cbdata;
- if (cbdataReferenceValidDone(r->data, &cbdata)) {
- Helper::Reply nilReply;
- r->callback(cbdata, nilReply);
+ if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+ r->reply.result = Helper::Unknown;
+ r->request.callback(cbdata, r->reply);
}
delete r;
while (!srv->requests.empty()) {
// XXX: re-schedule these on another helper?
- Helper::Request *r = srv->requests.front();
+ Helper::Xaction *r = srv->requests.front();
srv->requests.pop_front();
void *cbdata;
- if (cbdataReferenceValidDone(r->data, &cbdata)) {
- Helper::Reply nilReply;
- r->callback(cbdata, nilReply);
+ if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+ r->reply.result = Helper::Unknown;
+ r->request.callback(cbdata, r->reply);
}
delete r;
delete srv;
}
-/// Calls back with a pointer to the buffer with the helper output
-static void
-helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
+Helper::Xaction *
+helper_server::popRequest(int request_number)
{
- Helper::Request *r = NULL;
+ Helper::Xaction *r = nullptr;
helper_server::RequestIndex::iterator it;
- if (hlp->childs.concurrency) {
+ if (parent->childs.concurrency) {
// If concurency supported retrieve request from ID
- it = srv->requestsIndex.find(request_number);
- if (it != srv->requestsIndex.end()) {
+ it = requestsIndex.find(request_number);
+ if (it != requestsIndex.end()) {
r = *(it->second);
- srv->requests.erase(it->second);
- srv->requestsIndex.erase(it);
+ requests.erase(it->second);
+ requestsIndex.erase(it);
}
- } else if(!srv->requests.empty()) {
+ } else if(!requests.empty()) {
// Else get the first request from queue, if any
- r = srv->requests.front();
- srv->requests.pop_front();
+ r = requests.front();
+ requests.pop_front();
}
- if (r) {
- HLPCB *callback = r->callback;
- r->callback = NULL;
+ return r;
+}
+
+/// Calls back with a pointer to the buffer with the helper output
+static void
+helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
+{
+ if (Helper::Xaction *r = srv->replyXaction) {
+ const bool hasSpace = r->reply.accumulate(msg, msgSize);
+ if (!hasSpace) {
+ debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
+ "helper that overflowed " << srv->rbuf_sz << "-byte " <<
+ "Squid input buffer: " << hlp->id_name << " #" << srv->index);
+ srv->closePipesSafely(hlp->id_name);
+ return;
+ }
+
+ if (!msgEnd)
+ return; // We are waiting for more data.
+
+ HLPCB *callback = r->request.callback;
+ r->request.callback = nullptr;
- void *cbdata = NULL;
bool retry = false;
- if (cbdataReferenceValidDone(r->data, &cbdata)) {
- Helper::Reply response(msg, (msg_end-msg));
- if (response.result == Helper::BrokenHelper && r->retries < MAX_RETRIES) {
- debugs(84, DBG_IMPORTANT, "ERROR: helper: " << response << ", attempt #" << (r->retries + 1) << " of 2");
+ void *cbdata = nullptr;
+ if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+ r->reply.finalize();
+ if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
+ debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
retry = true;
} else
- callback(cbdata, response);
+ callback(cbdata, r->reply);
}
-- srv->stats.pending;
srv->answer_time = current_time;
- srv->dispatch_time = r->dispatch_time;
+ srv->dispatch_time = r->request.dispatch_time;
hlp->stats.avg_svc_time =
Math::intAverage(hlp->stats.avg_svc_time,
- tvSubMsec(r->dispatch_time, current_time),
+ tvSubMsec(r->request.dispatch_time, current_time),
hlp->stats.replies, REDIRECT_AV_FACTOR);
+ // release or re-submit parsedRequestXaction object
+ srv->replyXaction = nullptr;
if (retry) {
- ++r->retries;
+ ++r->request.retries;
hlp->submitRequest(r);
} else
delete r;
- } else if (srv->stats.timedout) {
- debugs(84, 3, "Timedout reply received for request-ID: " << request_number << " , ignore");
- } else {
- debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
- request_number << " from " << hlp->id_name << " #" << srv->index <<
- " '" << srv->rbuf << "'");
}
if (hlp->timeout && hlp->childs.concurrency)
static void
helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
{
- char *t = NULL;
helper_server *srv = (helper_server *)data;
helper *hlp = srv->parent;
assert(cbdataReferenceValid(data));
srv->rbuf[0] = '\0';
}
- while ((t = strchr(srv->rbuf, hlp->eom))) {
- /* end of reply found */
- char *msg = srv->rbuf;
- int i = 0;
- int skip = 1;
- debugs(84, 3, "helperHandleRead: end of reply found");
-
- if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
- *t = '\0';
- // rewind to the \r octet which is the real terminal now
- // and remember that we have to skip forward 2 places now.
- skip = 2;
- --t;
+ bool needsMore = false;
+ char *msg = srv->rbuf;
+ while (*msg && !needsMore) {
+ int skip = 0;
+ char *eom = strchr(msg, hlp->eom);
+ if (eom) {
+ skip = 1;
+ debugs(84, 3, "helperHandleRead: end of reply found");
+ if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
+ *eom = '\0';
+ // rewind to the \r octet which is the real terminal now
+ // and remember that we have to skip forward 2 places now.
+ skip = 2;
+ --eom;
+ }
+ *eom = '\0';
}
- *t = '\0';
-
- if (hlp->childs.concurrency) {
- i = strtol(msg, &msg, 10);
-
- while (*msg && xisspace(*msg))
- ++msg;
- }
+ if (!srv->replyXaction) {
+ int i = 0;
+ if (hlp->childs.concurrency) {
+ char *e = NULL;
+ i = strtol(msg, &e, 10);
+ // Do we need to check for e == msg? Means wrong response from helper.
+ // Will be droped as "unexpected reply on channel 0"
+ needsMore = !(xisspace(*e) || (eom && e == eom));
+ if (!needsMore) {
+ msg = e;
+ while (*msg && xisspace(*msg))
+ ++msg;
+ } // else not enough data to compute request number
+ }
+ if (!(srv->replyXaction = srv->popRequest(i))) {
+ if (srv->stats.timedout) {
+ debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
+ } else {
+ debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
+ i << " from " << hlp->id_name << " #" << srv->index <<
+ " '" << srv->rbuf << "'");
+ }
+ }
+ } // else we need to just append reply data to the current Xaction
+
+ if (!needsMore) {
+ size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
+ assert(msgSize <= srv->rbuf_sz);
+ helperReturnBuffer(srv, hlp, msg, msgSize, eom);
+ msg += msgSize + skip;
+ assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
+ } else
+ assert(skip == 0 && eom == NULL);
+ }
- helperReturnBuffer(i, srv, hlp, msg, t);
- srv->roffset -= (t - srv->rbuf) + skip;
- memmove(srv->rbuf, t + skip, srv->roffset);
+ if (needsMore) {
+ size_t msgSize = (srv->roffset - (msg - srv->rbuf));
+ assert(msgSize <= srv->rbuf_sz);
+ memmove(srv->rbuf, msg, msgSize);
+ srv->roffset = msgSize;
srv->rbuf[srv->roffset] = '\0';
+ } else {
+ // All of the responses parsed and msg points at the end of read data
+ assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
+ srv->roffset = 0;
}
if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
int spaceSize = srv->rbuf_sz - srv->roffset - 1;
assert(spaceSize >= 0);
- // grow the input buffer if needed and possible
- if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
- srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
- debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
- spaceSize = srv->rbuf_sz - srv->roffset - 1;
- assert(spaceSize >= 0);
- }
-
- // quit reading if there is no space left
- if (!spaceSize) {
- debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
- "helper that overflowed " << srv->rbuf_sz << "-byte " <<
- "Squid input buffer: " << hlp->id_name << " #" << srv->index);
- srv->closePipesSafely(hlp->id_name);
- return;
- }
-
AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
CommIoCbPtrFun(helperHandleRead, srv));
comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
srv->roffset += len;
srv->rbuf[srv->roffset] = '\0';
- Helper::Request *r = srv->requests.front();
+ Helper::Xaction *r = srv->requests.front();
debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
if (r == NULL) {
}
if ((t = strchr(srv->rbuf, hlp->eom))) {
- /* end of reply found */
- srv->requests.pop_front(); // we already have it in 'r'
- int called = 1;
- int skip = 1;
debugs(84, 3, "helperStatefulHandleRead: end of reply found");
if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
*t = '\0';
// rewind to the \r octet which is the real terminal now
- // and remember that we have to skip forward 2 places now.
- skip = 2;
--t;
}
*t = '\0';
+ }
+
+ if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
+ debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
+ "helper that overflowed " << srv->rbuf_sz << "-byte " <<
+ "Squid input buffer: " << hlp->id_name << " #" << srv->index);
+ srv->closePipesSafely(hlp->id_name);
+ return;
+ }
+ /**
+ * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
+ * Doing this prohibits concurrency support with multiple replies per read().
+ * TODO: check that read() setup on these buffers pays attention to roffest!=0
+ * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
+ */
+ srv->roffset = 0;
+
+ if (t) {
+ /* end of reply found */
+ srv->requests.pop_front(); // we already have it in 'r'
+ int called = 1;
- if (r && cbdataReferenceValid(r->data)) {
- Helper::Reply res(srv->rbuf, (t - srv->rbuf));
- res.whichServer = srv;
- r->callback(r->data, res);
+ if (r && cbdataReferenceValid(r->request.data)) {
+ r->reply.finalize();
+ r->reply.whichServer = srv;
+ r->request.callback(r->request.data, r->reply);
} else {
debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
called = 0;
}
- // only skip off the \0's _after_ passing its location in Helper::Reply above
- t += skip;
-
- /**
- * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
- * Doing this prohibits concurrency support with multiple replies per read().
- * TODO: check that read() setup on these buffers pays attention to roffest!=0
- * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
- */
- srv->roffset = 0;
+
delete r;
-- srv->stats.pending;
}
if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
- int spaceSize = srv->rbuf_sz - srv->roffset - 1;
- assert(spaceSize >= 0);
-
- // grow the input buffer if needed and possible
- if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
- srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
- debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
- spaceSize = srv->rbuf_sz - srv->roffset - 1;
- assert(spaceSize >= 0);
- }
-
- // quit reading if there is no space left
- if (!spaceSize) {
- debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
- "helper that overflowed " << srv->rbuf_sz << "-byte " <<
- "Squid input buffer: " << hlp->id_name << " #" << srv->index);
- srv->closePipesSafely(hlp->id_name);
- return;
- }
+ int spaceSize = srv->rbuf_sz - 1;
AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
CommIoCbPtrFun(helperStatefulHandleRead, srv));
- comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
+ comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
}
}
/// Handles a request when all running helpers, if any, are busy.
static void
-Enqueue(helper * hlp, Helper::Request * r)
+Enqueue(helper * hlp, Helper::Xaction * r)
{
hlp->queue.push(r);
++ hlp->stats.queue_size;
}
static void
-StatefulEnqueue(statefulhelper * hlp, Helper::Request * r)
+StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
{
hlp->queue.push(r);
++ hlp->stats.queue_size;
debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
}
-Helper::Request *
+Helper::Xaction *
helper::nextRequest()
{
if (queue.empty())
}
static void
-helperDispatch(helper_server * srv, Helper::Request * r)
+helperDispatch(helper_server * srv, Helper::Xaction * r)
{
helper *hlp = srv->parent;
const uint64_t reqId = ++srv->nextRequestId;
- if (!cbdataReferenceValid(r->data)) {
+ if (!cbdataReferenceValid(r->request.data)) {
debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
delete r;
return;
}
- r->Id = reqId;
+ r->request.Id = reqId;
helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
- r->dispatch_time = current_time;
+ r->request.dispatch_time = current_time;
if (srv->wqueue->isNull())
srv->wqueue->init();
if (hlp->childs.concurrency) {
srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
assert(srv->requestsIndex.size() == srv->requests.size());
- srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->buf);
+ srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
} else
- srv->wqueue->append(r->buf, strlen(r->buf));
+ srv->wqueue->append(r->request.buf, strlen(r->request.buf));
if (!srv->flags.writing) {
assert(NULL == srv->writebuf);
Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
}
- debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->buf) << " bytes");
+ debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
++ srv->stats.uses;
++ srv->stats.pending;
{}
static void
-helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r)
+helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
{
statefulhelper *hlp = srv->parent;
- if (!cbdataReferenceValid(r->data)) {
+ if (!cbdataReferenceValid(r->request.data)) {
debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
delete r;
helperStatefulReleaseServer(srv);
debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
- if (r->placeholder == 1) {
+ if (r->request.placeholder == 1) {
/* a callback is needed before this request can _use_ a helper. */
/* we don't care about releasing this helper. The request NEVER
* gets to the helper. So we throw away the return code */
- Helper::Reply nilReply;
- nilReply.whichServer = srv;
- r->callback(r->data, nilReply);
+ r->reply.result = Helper::Unknown;
+ r->reply.whichServer = srv;
+ r->request.callback(r->request.data, r->reply);
/* throw away the placeholder */
delete r;
/* and push the queue. Note that the callback may have submitted a new
srv->dispatch_time = current_time;
AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
- Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
+ Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
hlp->id_name << " #" << srv->index << ", " <<
- (int) strlen(r->buf) << " bytes");
+ (int) strlen(r->request.buf) << " bytes");
++ srv->stats.uses;
++ srv->stats.pending;
static void
helperKickQueue(helper * hlp)
{
- Helper::Request *r;
+ Helper::Xaction *r;
helper_server *srv;
while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
static void
helperStatefulKickQueue(statefulhelper * hlp)
{
- Helper::Request *r;
+ Helper::Xaction *r;
helper_stateful_server *srv;
while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
helper_server::checkForTimedOutRequests(bool const retry)
{
assert(parent->childs.concurrency);
- while(!requests.empty() && requests.front()->timedOut(parent->timeout)) {
- Helper::Request *r = requests.front();
+ while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
+ Helper::Xaction *r = requests.front();
RequestIndex::iterator it;
- it = requestsIndex.find(r->Id);
+ it = requestsIndex.find(r->request.Id);
assert(it != requestsIndex.end());
requestsIndex.erase(it);
requests.pop_front();
- debugs(84, 2, "Request " << r->Id << " timed-out, remove it from queue");
+ debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
void *cbdata;
bool retried = false;
- if (retry && r->retries < MAX_RETRIES && cbdataReferenceValid(r->data)) {
- debugs(84, 2, "Retry request " << r->Id);
- ++r->retries;
+ if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
+ debugs(84, 2, "Retry request " << r->request.Id);
+ ++r->request.retries;
parent->submitRequest(r);
retried = true;
- } else if (cbdataReferenceValidDone(r->data, &cbdata)) {
+ } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
if (!parent->onTimedOutResponse.isEmpty()) {
- // Helper::Reply needs a non const buffer
- char *replyMsg = xstrdup(parent->onTimedOutResponse.c_str());
- r->callback(cbdata, Helper::Reply(replyMsg, strlen(replyMsg)));
- xfree(replyMsg);
- } else
- r->callback(cbdata, Helper::Reply(Helper::TimedOut));
+ if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
+ r->reply.finalize();
+ else
+ r->reply.result = Helper::TimedOut;
+ r->request.callback(cbdata, r->reply);
+ } else {
+ r->reply.result = Helper::TimedOut;
+ r->request.callback(cbdata, r->reply);
+ }
}
--stats.pending;
++stats.timedout;
AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
- const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->dispatch_time.tv_sec);
+ const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
commSetConnTimeout(io.conn, timeLeft, timeoutCall);