/*
- * DEBUG: section 84 Helper process maintenance
- * AUTHOR: Harvest Derived?
- *
- * SQUID Web Proxy Cache http://www.squid-cache.org/
- * ----------------------------------------------------------
- *
- * Squid is the result of efforts by numerous individuals from
- * the Internet community; see the CONTRIBUTORS file for full
- * details. Many organizations have provided support for Squid's
- * development; see the SPONSORS file for full details. Squid is
- * Copyrighted (C) 2001 by the Regents of the University of
- * California; see the COPYRIGHT file for full details. Squid
- * incorporates software developed and/or copyrighted by other
- * sources; see the CREDITS file for full details.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
*
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
+/* DEBUG: section 84 Helper process maintenance */
+
#include "squid.h"
#include "base/AsyncCbdataCalls.h"
+#include "base/Packable.h"
#include "comm.h"
#include "comm/Connection.h"
+#include "comm/Read.h"
#include "comm/Write.h"
#include "fd.h"
+#include "fde.h"
#include "format/Quoting.h"
#include "helper.h"
-#include "Mem.h"
+#include "helper/Reply.h"
+#include "helper/Request.h"
#include "MemBuf.h"
+#include "SquidConfig.h"
#include "SquidIpc.h"
#include "SquidMath.h"
#include "SquidTime.h"
#include "Store.h"
#include "wordlist.h"
+// helper_stateful_server::data uses explicit alloc()/freeOne() */
+#include "mem/Pool.h"
+
#define HELPER_MAX_ARGS 64
-/** Initial Squid input buffer size. Helper responses may exceed this, and
- * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
- */
-const size_t ReadBufMinSize(4*1024);
+/// The maximum allowed request retries.
+#define MAX_RETRIES 2
-/** Maximum safe size of a helper-to-Squid response message plus one.
- * Squid will warn and close the stream if a helper sends a too-big response.
- * ssl_crtd helper is known to produce responses of at least 10KB in size.
- * Some undocumented helpers are known to produce responses exceeding 8KB.
- */
-const size_t ReadBufMaxSize(32*1024);
+/// Helpers input buffer size.
+const size_t ReadBufSize(32*1024);
static IOCB helperHandleRead;
static IOCB helperStatefulHandleRead;
-static void helperServerFree(helper_server *srv);
-static void helperStatefulServerFree(helper_stateful_server *srv);
-static void Enqueue(helper * hlp, helper_request *);
-static helper_request *Dequeue(helper * hlp);
-static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
-static helper_server *GetFirstAvailable(helper * hlp);
+static void Enqueue(helper * hlp, Helper::Xaction *);
+static helper_server *GetFirstAvailable(const helper * hlp);
static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
-static void helperDispatch(helper_server * srv, helper_request * r);
-static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
+static void helperDispatch(helper_server * srv, Helper::Xaction * r);
+static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
static void helperKickQueue(helper * hlp);
static void helperStatefulKickQueue(statefulhelper * hlp);
static void helperStatefulServerDone(helper_stateful_server * srv);
-static void helperRequestFree(helper_request * r);
-static void helperStatefulRequestFree(helper_stateful_request * r);
-static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
-static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
+static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
CBDATA_CLASS_INIT(helper);
-CBDATA_TYPE(helper_server);
+CBDATA_CLASS_INIT(helper_server);
CBDATA_CLASS_INIT(statefulhelper);
-CBDATA_TYPE(helper_stateful_server);
+CBDATA_CLASS_INIT(helper_stateful_server);
+
+InstanceIdDefinitions(HelperServerBase, "Hlpr");
void
HelperServerBase::initStats()
stats.replies=0;
stats.pending=0;
stats.releases=0;
+ stats.timedout = 0;
}
void
-HelperServerBase::closePipesSafely()
+HelperServerBase::closePipesSafely(const char *id_name)
{
#if _SQUID_WINDOWS_
- int no = index + 1;
-
shutdown(writePipe->fd, SD_BOTH);
#endif
- flags.closing = 1;
+ flags.closing = true;
if (readPipe->fd == writePipe->fd)
readPipe->fd = -1;
else
if (hIpc) {
if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
getCurrentTime();
- debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
- " #" << no << " (" << hlp->cmdline->key << "," <<
- (long int)pid << ") didn't exit in 5 seconds");
+ debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
+ " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
}
CloseHandle(hIpc);
}
}
void
-HelperServerBase::closeWritePipeSafely()
+HelperServerBase::closeWritePipeSafely(const char *id_name)
{
#if _SQUID_WINDOWS_
- int no = index + 1;
-
shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
#endif
- flags.closing = 1;
+ flags.closing = true;
if (readPipe->fd == writePipe->fd)
readPipe->fd = -1;
writePipe->close();
if (hIpc) {
if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
getCurrentTime();
- debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
- " #" << no << " (" << hlp->cmdline->key << "," <<
- (long int)pid << ") didn't exit in 5 seconds");
+ debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
+ " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
}
CloseHandle(hIpc);
}
#endif
}
+void
+HelperServerBase::dropQueued()
+{
+ while (!requests.empty()) {
+ // XXX: re-schedule these on another helper?
+ Helper::Xaction *r = requests.front();
+ requests.pop_front();
+ void *cbdata;
+ if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+ r->reply.result = Helper::Unknown;
+ r->request.callback(cbdata, r->reply);
+ }
+
+ delete r;
+ }
+}
+
+HelperServerBase::~HelperServerBase()
+{
+ if (rbuf) {
+ memFreeBuf(rbuf_sz, rbuf);
+ rbuf = NULL;
+ }
+}
+
+helper_server::~helper_server()
+{
+ wqueue->clean();
+ delete wqueue;
+
+ if (writebuf) {
+ writebuf->clean();
+ delete writebuf;
+ writebuf = NULL;
+ }
+
+ if (Comm::IsConnOpen(writePipe))
+ closeWritePipeSafely(parent->id_name);
+
+ dlinkDelete(&link, &parent->servers);
+
+ assert(parent->childs.n_running > 0);
+ -- parent->childs.n_running;
+
+ assert(requests.empty());
+ cbdataReferenceDone(parent);
+}
+
+void
+helper_server::dropQueued()
+{
+ HelperServerBase::dropQueued();
+ requestsIndex.clear();
+}
+
+helper_stateful_server::~helper_stateful_server()
+{
+ /* TODO: walk the local queue of requests and carry them all out */
+ if (Comm::IsConnOpen(writePipe))
+ closeWritePipeSafely(parent->id_name);
+
+ parent->cancelReservation(reservationId);
+
+ dlinkDelete(&link, &parent->servers);
+
+ assert(parent->childs.n_running > 0);
+ -- parent->childs.n_running;
+
+ assert(requests.empty());
+
+ cbdataReferenceDone(parent);
+}
+
void
helperOpenServers(helper * hlp)
{
++ hlp->childs.n_running;
++ hlp->childs.n_active;
- CBDATA_INIT_TYPE(helper_server);
- srv = cbdataAlloc(helper_server);
+ srv = new helper_server;
srv->hIpc = hIpc;
srv->pid = pid;
srv->initStats();
- srv->index = k;
srv->addr = hlp->addr;
srv->readPipe = new Comm::Connection;
srv->readPipe->fd = rfd;
srv->writePipe = new Comm::Connection;
srv->writePipe->fd = wfd;
- srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
+ srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
srv->wqueue = new MemBuf;
srv->roffset = 0;
- srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
+ srv->nextRequestId = 0;
+ srv->replyXaction = NULL;
+ srv->ignoreToEom = false;
srv->parent = cbdataReference(hlp);
dlinkAddTail(srv, &srv->link, &hlp->servers);
if (wfd != rfd)
commSetNonBlocking(wfd);
- AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
+ AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
comm_add_close_handler(rfd, closeCall);
+ if (hlp->timeout && hlp->childs.concurrency) {
+ AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
+ CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
+ commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
+ }
+
AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
CommIoCbPtrFun(helperHandleRead, srv));
comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
++ hlp->childs.n_running;
++ hlp->childs.n_active;
- CBDATA_INIT_TYPE(helper_stateful_server);
- helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
+ helper_stateful_server *srv = new helper_stateful_server;
srv->hIpc = hIpc;
srv->pid = pid;
- srv->flags.reserved = 0;
srv->initStats();
- srv->index = k;
srv->addr = hlp->addr;
srv->readPipe = new Comm::Connection;
srv->readPipe->fd = rfd;
srv->writePipe = new Comm::Connection;
srv->writePipe->fd = wfd;
- srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
+ srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
srv->roffset = 0;
srv->parent = cbdataReference(hlp);
-
- if (hlp->datapool != NULL)
- srv->data = hlp->datapool->alloc();
+ srv->reservationStart = 0;
dlinkAddTail(srv, &srv->link, &hlp->servers);
if (wfd != rfd)
commSetNonBlocking(wfd);
- AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
+ AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
comm_add_close_handler(rfd, closeCall);
AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
}
void
-helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
+helper::submitRequest(Helper::Xaction *r)
{
- if (hlp == NULL) {
- debugs(84, 3, "helperSubmit: hlp == NULL");
- HelperReply nilReply;
- callback(data, nilReply);
- return;
- }
-
- helper_request *r = new helper_request;
helper_server *srv;
- r->callback = callback;
- r->data = cbdataReference(data);
- r->buf = xstrdup(buf);
-
- if ((srv = GetFirstAvailable(hlp)))
+ if ((srv = GetFirstAvailable(this)))
helperDispatch(srv, r);
else
- Enqueue(hlp, r);
+ Enqueue(this, r);
- debugs(84, 9, "helperSubmit: " << buf);
+ syncQueueStats();
}
-/// lastserver = "server last used as part of a reserved request sequence"
-void
-helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
+/// handles helperSubmit() and helperStatefulSubmit() failures
+static void
+SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
{
- if (hlp == NULL) {
- debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
- HelperReply nilReply;
- callback(data, nilReply);
- return;
+ auto result = Helper::Error;
+ if (!hlp) {
+ debugs(84, 3, "no helper");
+ result = Helper::Unknown;
}
+ // else pretend the helper has responded with ERR
+
+ callback(data, Helper::Reply(result));
+}
+
+void
+helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
+{
+ if (!hlp || !hlp->trySubmit(buf, callback, data))
+ SubmissionFailure(hlp, callback, data);
+}
- helper_stateful_request *r = new helper_stateful_request;
+/// whether queuing an additional request would overload the helper
+bool
+helper::queueFull() const {
+ return stats.queue_size >= static_cast<int>(childs.queue_size);
+}
- r->callback = callback;
- r->data = cbdataReference(data);
+bool
+helper::overloaded() const {
+ return stats.queue_size > static_cast<int>(childs.queue_size);
+}
- if (buf != NULL) {
- r->buf = xstrdup(buf);
- r->placeholder = 0;
+/// synchronizes queue-dependent measurements with the current queue state
+void
+helper::syncQueueStats()
+{
+ if (overloaded()) {
+ if (overloadStart) {
+ debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
+ } else {
+ overloadStart = squid_curtime;
+ debugs(84, 3, id_name << " became overloaded");
+ }
} else {
- r->buf = NULL;
- r->placeholder = 1;
+ if (overloadStart) {
+ debugs(84, 5, id_name << " is no longer overloaded");
+ if (droppedRequests) {
+ debugs(84, DBG_IMPORTANT, "helper " << id_name <<
+ " is no longer overloaded after dropping " << droppedRequests <<
+ " requests in " << (squid_curtime - overloadStart) << " seconds");
+ droppedRequests = 0;
+ }
+ overloadStart = 0;
+ }
}
+}
- if ((buf != NULL) && lastserver) {
- debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
- assert(lastserver->flags.reserved);
- assert(!(lastserver->request));
+/// prepares the helper for request submission
+/// returns true if and only if the submission should proceed
+/// may kill Squid if the helper remains overloaded for too long
+bool
+helper::prepSubmit()
+{
+ // re-sync for the configuration may have changed since the last submission
+ syncQueueStats();
- debugs(84, 5, "StatefulSubmit dispatching");
- helperStatefulDispatch(lastserver, r);
- } else {
- helper_stateful_server *srv;
- if ((srv = StatefulGetFirstAvailable(hlp))) {
- helperStatefulDispatch(srv, r);
- } else
- StatefulEnqueue(hlp, r);
+ // Nothing special to do if the new request does not overload (i.e., the
+ // queue is not even full yet) or only _starts_ overloading this helper
+ // (i.e., the queue is currently at its limit).
+ if (!overloaded())
+ return true;
+
+ if (squid_curtime - overloadStart <= 180)
+ return true; // also OK: overload has not persisted long enough to panic
+
+ if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
+ fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
+
+ if (!droppedRequests) {
+ debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
+ id_name << " helper configured with on-persistent-overload=err");
}
+ ++droppedRequests;
+ debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
+ return false;
+}
- debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
+bool
+helper::trySubmit(const char *buf, HLPCB * callback, void *data)
+{
+ if (!prepSubmit())
+ return false; // request was dropped
+
+ submit(buf, callback, data); // will send or queue
+ return true; // request submitted or queued
}
-/**
- * DPW 2007-05-08
- *
- * helperStatefulReleaseServer tells the helper that whoever was
- * using it no longer needs its services.
- */
+/// dispatches or enqueues a helper requests; does not enforce queue limits
void
-helperStatefulReleaseServer(helper_stateful_server * srv)
+helper::submit(const char *buf, HLPCB * callback, void *data)
{
- debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
- if (!srv->flags.reserved)
- return;
+ Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+ submitRequest(r);
+ debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
+}
- ++ srv->stats.releases;
+/// Submit request or callback the caller with a Helper::Error error.
+/// If the reservation is not set then reserves a new helper.
+void
+helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
+{
+ if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
+ SubmissionFailure(hlp, callback, data);
+}
- srv->flags.reserved = 0;
- if (srv->parent->OnEmptyQueue != NULL && srv->data)
- srv->parent->OnEmptyQueue(srv->data);
+/// If possible, submit request. Otherwise, either kill Squid or return false.
+bool
+statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
+{
+ if (!prepSubmit())
+ return false; // request was dropped
- helperStatefulServerDone(srv);
+ submit(buf, callback, data, reservation); // will send or queue
+ return true; // request submitted or queued
}
-/** return a pointer to the stateful routines data area */
-void *
-helperStatefulServerGetData(helper_stateful_server * srv)
+void
+statefulhelper::reserveServer(helper_stateful_server * srv)
{
- return srv->data;
+ // clear any old reservation
+ if (srv->reserved()) {
+ reservations.erase(srv->reservationId);
+ srv->clearReservation();
+ }
+
+ srv->reserve();
+ reservations.insert(Reservations::value_type(srv->reservationId, srv));
}
-/**
- * Dump some stats about the helper states to a StoreEntry
- */
void
-helperStats(StoreEntry * sentry, helper * hlp, const char *label)
+statefulhelper::cancelReservation(const Helper::ReservationId reservation)
{
- if (!helperStartStats(sentry, hlp, label))
+ const auto it = reservations.find(reservation);
+ if (it == reservations.end())
return;
- storeAppendPrintf(sentry, "program: %s\n",
- hlp->cmdline->key);
- storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
- hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
- storeAppendPrintf(sentry, "requests sent: %d\n",
- hlp->stats.requests);
- storeAppendPrintf(sentry, "replies received: %d\n",
- hlp->stats.replies);
- storeAppendPrintf(sentry, "queue length: %d\n",
- hlp->stats.queue_size);
- storeAppendPrintf(sentry, "avg service time: %d msec\n",
- hlp->stats.avg_svc_time);
- storeAppendPrintf(sentry, "\n");
- storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
- "#",
- "FD",
- "PID",
- "# Requests",
- "# Replies",
- "Flags",
- "Time",
- "Offset",
- "Request");
-
- for (dlink_node *link = hlp->servers.head; link; link = link->next) {
- helper_server *srv = (helper_server*)link->data;
- double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
- storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "%c%c%c%c\t%7.3f\t%7d\t%s\n",
- srv->index + 1,
- srv->readPipe->fd,
- srv->pid,
- srv->stats.uses,
- srv->stats.replies,
- srv->stats.pending ? 'B' : ' ',
- srv->flags.writing ? 'W' : ' ',
- srv->flags.closing ? 'C' : ' ',
- srv->flags.shutdown ? 'S' : ' ',
- tt < 0.0 ? 0.0 : tt,
- (int) srv->roffset,
- srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
- }
+ helper_stateful_server *srv = it->second;
+ reservations.erase(it);
+ srv->clearReservation();
- storeAppendPrintf(sentry, "\nFlags key:\n\n");
- storeAppendPrintf(sentry, " B = BUSY\n");
- storeAppendPrintf(sentry, " W = WRITING\n");
- storeAppendPrintf(sentry, " C = CLOSING\n");
- storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
+ // schedule a queue kick
+ AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
+ ScheduleCallHere(call);
+}
+
+helper_stateful_server *
+statefulhelper::findServer(const Helper::ReservationId & reservation)
+{
+ const auto it = reservations.find(reservation);
+ if (it == reservations.end())
+ return nullptr;
+ return it->second;
+}
+
+void
+helper_stateful_server::reserve()
+{
+ assert(!reservationId);
+ reservationStart = squid_curtime;
+ reservationId = Helper::ReservationId::Next();
+ debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
}
void
-helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
+helper_stateful_server::clearReservation()
{
- if (!helperStartStats(sentry, hlp, label))
+ debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
+ if (!reservationId)
return;
- storeAppendPrintf(sentry, "program: %s\n",
- hlp->cmdline->key);
- storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
- hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
- storeAppendPrintf(sentry, "requests sent: %d\n",
- hlp->stats.requests);
- storeAppendPrintf(sentry, "replies received: %d\n",
- hlp->stats.replies);
- storeAppendPrintf(sentry, "queue length: %d\n",
- hlp->stats.queue_size);
- storeAppendPrintf(sentry, "avg service time: %d msec\n",
- hlp->stats.avg_svc_time);
- storeAppendPrintf(sentry, "\n");
- storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
- "#",
- "FD",
- "PID",
- "# Requests",
- "# Replies",
- "Flags",
- "Time",
- "Offset",
- "Request");
-
- for (dlink_node *link = hlp->servers.head; link; link = link->next) {
- helper_stateful_server *srv = (helper_stateful_server *)link->data;
- double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
- storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
- srv->index + 1,
- srv->readPipe->fd,
- srv->pid,
- srv->stats.uses,
- srv->stats.replies,
- srv->flags.busy ? 'B' : ' ',
- srv->flags.closing ? 'C' : ' ',
- srv->flags.reserved ? 'R' : ' ',
- srv->flags.shutdown ? 'S' : ' ',
- srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
- tt < 0.0 ? 0.0 : tt,
- (int) srv->roffset,
- srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
+ ++stats.releases;
+
+ reservationId.clear();
+ reservationStart = 0;
+}
+
+void
+statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
+{
+ Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+
+ if (buf && reservation) {
+ debugs(84, 5, reservation);
+ helper_stateful_server *lastServer = findServer(reservation);
+ if (!lastServer) {
+ debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
+ r->reply.result = Helper::TimedOut;
+ r->request.callback(r->request.data, r->reply);
+ delete r;
+ return;
+ }
+ debugs(84, 5, "StatefulSubmit dispatching");
+ helperStatefulDispatch(lastServer, r);
+ } else {
+ helper_stateful_server *srv;
+ if ((srv = StatefulGetFirstAvailable(this))) {
+ reserveServer(srv);
+ helperStatefulDispatch(srv, r);
+ } else
+ StatefulEnqueue(this, r);
+ }
+
+ debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
+ "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
+
+ syncQueueStats();
+}
+
+void
+helper::packStatsInto(Packable *p, const char *label) const
+{
+ if (label)
+ p->appendf("%s:\n", label);
+
+ p->appendf(" program: %s\n", cmdline->key);
+ p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
+ p->appendf(" requests sent: %d\n", stats.requests);
+ p->appendf(" replies received: %d\n", stats.replies);
+ p->appendf(" requests timedout: %d\n", stats.timedout);
+ p->appendf(" queue length: %d\n", stats.queue_size);
+ p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
+ p->append("\n",1);
+ p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
+ "ID #",
+ "FD",
+ "PID",
+ "# Requests",
+ "# Replies",
+ "# Timed-out",
+ "Flags",
+ "Time",
+ "Offset",
+ "Request");
+
+ for (dlink_node *link = servers.head; link; link = link->next) {
+ HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
+ assert(srv);
+ Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
+ double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
+ p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
+ srv->index.value,
+ srv->readPipe->fd,
+ srv->pid,
+ srv->stats.uses,
+ srv->stats.replies,
+ srv->stats.timedout,
+ srv->stats.pending ? 'B' : ' ',
+ srv->flags.writing ? 'W' : ' ',
+ srv->flags.closing ? 'C' : ' ',
+ srv->reserved() ? 'R' : ' ',
+ srv->flags.shutdown ? 'S' : ' ',
+ xaction && xaction->request.placeholder ? 'P' : ' ',
+ tt < 0.0 ? 0.0 : tt,
+ (int) srv->roffset,
+ xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
}
- storeAppendPrintf(sentry, "\nFlags key:\n\n");
- storeAppendPrintf(sentry, " B = BUSY\n");
- storeAppendPrintf(sentry, " C = CLOSING\n");
- storeAppendPrintf(sentry, " R = RESERVED\n");
- storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
- storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
+ p->append("\nFlags key:\n"
+ " B\tBUSY\n"
+ " W\tWRITING\n"
+ " C\tCLOSING\n"
+ " R\tRESERVED\n"
+ " S\tSHUTDOWN PENDING\n"
+ " P\tPLACEHOLDER\n", 101);
+}
+
+bool
+helper::willOverload() const {
+ return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
}
void
link = link->next;
if (srv->flags.shutdown) {
- debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
+ debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
continue;
}
assert(hlp->childs.n_active > 0);
-- hlp->childs.n_active;
- srv->flags.shutdown = 1; /* request it to shut itself down */
+ srv->flags.shutdown = true; /* request it to shut itself down */
if (srv->flags.closing) {
- debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
+ debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
continue;
}
if (srv->stats.pending) {
- debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
+ debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
continue;
}
- debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
+ debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
/* the rest of the details is dealt with in the helperServerFree
* close handler
*/
- srv->closePipesSafely();
+ srv->closePipesSafely(hlp->id_name);
}
}
link = link->next;
if (srv->flags.shutdown) {
- debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
+ debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
continue;
}
assert(hlp->childs.n_active > 0);
-- hlp->childs.n_active;
- srv->flags.shutdown = 1; /* request it to shut itself down */
+ srv->flags.shutdown = true; /* request it to shut itself down */
- if (srv->flags.busy) {
- debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
+ if (srv->stats.pending) {
+ debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
continue;
}
if (srv->flags.closing) {
- debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
+ debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
continue;
}
- if (srv->flags.reserved) {
+ if (srv->reserved()) {
if (shutting_down) {
- debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
+ debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
} else {
- debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
+ debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
continue;
}
}
- debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
+ debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
/* the rest of the details is dealt with in the helperStatefulServerFree
* close handler
*/
- srv->closePipesSafely();
+ srv->closePipesSafely(hlp->id_name);
}
}
{
/* note, don't free id_name, it probably points to static memory */
- if (queue.head)
+ // TODO: if the queue is not empty it will leak Helper::Request's
+ if (!queue.empty())
debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
}
-/* ====================================================================== */
-/* LOCAL FUNCTIONS */
-/* ====================================================================== */
-
-static void
-helperServerFree(helper_server *srv)
+void
+helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
{
- helper *hlp = srv->parent;
- helper_request *r;
- int i, concurrency = hlp->childs.concurrency;
-
- if (!concurrency)
- concurrency = 1;
-
- if (srv->rbuf) {
- memFreeBuf(srv->rbuf_sz, srv->rbuf);
- srv->rbuf = NULL;
- }
-
- srv->wqueue->clean();
- delete srv->wqueue;
-
- if (srv->writebuf) {
- srv->writebuf->clean();
- delete srv->writebuf;
- srv->writebuf = NULL;
- }
-
- if (Comm::IsConnOpen(srv->writePipe))
- srv->closeWritePipeSafely();
-
- dlinkDelete(&srv->link, &hlp->servers);
-
- assert(hlp->childs.n_running > 0);
- -- hlp->childs.n_running;
-
+ needsNewServers = false;
if (!srv->flags.shutdown) {
- assert(hlp->childs.n_active > 0);
- -- hlp->childs.n_active;
- debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
+ assert(childs.n_active > 0);
+ --childs.n_active;
+ debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
- if (hlp->childs.needNew() > 0) {
- debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
+ if (childs.needNew() > 0) {
+ debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")");
- if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
+ if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
if (srv->stats.replies < 1)
- fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
+ fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name);
else
- debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
+ debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
}
-
- debugs(80, DBG_IMPORTANT, "Starting new helpers");
- helperOpenServers(hlp);
+ srv->flags.shutdown = true;
+ needsNewServers = true;
}
}
-
- for (i = 0; i < concurrency; ++i) {
- // XXX: re-schedule these on another helper?
- if ((r = srv->requests[i])) {
- void *cbdata;
-
- if (cbdataReferenceValidDone(r->data, &cbdata)) {
- HelperReply nilReply;
- r->callback(cbdata, nilReply);
- }
-
- helperRequestFree(r);
-
- srv->requests[i] = NULL;
- }
- }
- safe_free(srv->requests);
-
- cbdataReferenceDone(srv->parent);
- cbdataFree(srv);
}
-static void
-helperStatefulServerFree(helper_stateful_server *srv)
+void
+helper_server::HelperServerClosed(helper_server *srv)
{
- statefulhelper *hlp = srv->parent;
- helper_stateful_request *r;
+ helper *hlp = srv->getParent();
- if (srv->rbuf) {
- memFreeBuf(srv->rbuf_sz, srv->rbuf);
- srv->rbuf = NULL;
+ bool needsNewServers = false;
+ hlp->handleKilledServer(srv, needsNewServers);
+ if (needsNewServers) {
+ debugs(80, DBG_IMPORTANT, "Starting new helpers");
+ helperOpenServers(hlp);
}
-#if 0
- srv->wqueue->clean();
+ srv->dropQueued();
- delete srv->wqueue;
-
-#endif
-
- /* TODO: walk the local queue of requests and carry them all out */
- if (Comm::IsConnOpen(srv->writePipe))
- srv->closeWritePipeSafely();
-
- dlinkDelete(&srv->link, &hlp->servers);
-
- assert(hlp->childs.n_running > 0);
- -- hlp->childs.n_running;
-
- if (!srv->flags.shutdown) {
- assert( hlp->childs.n_active > 0);
- -- hlp->childs.n_active;
- debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
-
- if (hlp->childs.needNew() > 0) {
- debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
+ delete srv;
+}
- if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
- if (srv->stats.replies < 1)
- fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
- else
- debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
- }
+// XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
+// TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
+void
+helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
+{
+ statefulhelper *hlp = static_cast<statefulhelper *>(srv->getParent());
- debugs(80, DBG_IMPORTANT, "Starting new helpers");
- helperStatefulOpenServers(hlp);
- }
+ bool needsNewServers = false;
+ hlp->handleKilledServer(srv, needsNewServers);
+ if (needsNewServers) {
+ debugs(80, DBG_IMPORTANT, "Starting new helpers");
+ helperStatefulOpenServers(hlp);
}
- if ((r = srv->request)) {
- void *cbdata;
-
- if (cbdataReferenceValidDone(r->data, &cbdata)) {
- HelperReply nilReply;
- nilReply.whichServer = srv;
- r->callback(cbdata, nilReply);
- }
+ srv->dropQueued();
- helperStatefulRequestFree(r);
+ delete srv;
+}
- srv->request = NULL;
+Helper::Xaction *
+helper_server::popRequest(int request_number)
+{
+ Helper::Xaction *r = nullptr;
+ helper_server::RequestIndex::iterator it;
+ if (parent->childs.concurrency) {
+ // If concurency supported retrieve request from ID
+ it = requestsIndex.find(request_number);
+ if (it != requestsIndex.end()) {
+ r = *(it->second);
+ requests.erase(it->second);
+ requestsIndex.erase(it);
+ }
+ } else if(!requests.empty()) {
+ // Else get the first request from queue, if any
+ r = requests.front();
+ requests.pop_front();
}
- if (srv->data != NULL)
- hlp->datapool->freeOne(srv->data);
-
- cbdataReferenceDone(srv->parent);
-
- cbdataFree(srv);
+ return r;
}
/// Calls back with a pointer to the buffer with the helper output
static void
-helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
+helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
{
- helper_request *r = srv->requests[request_number];
- if (r) {
-// TODO: parse the reply into new helper reply object
-// pass that to the callback instead of msg
-
- HLPCB *callback = r->callback;
-
- srv->requests[request_number] = NULL;
+ if (Helper::Xaction *r = srv->replyXaction) {
+ const bool hasSpace = r->reply.accumulate(msg, msgSize);
+ if (!hasSpace) {
+ debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
+ "helper that overflowed " << srv->rbuf_sz << "-byte " <<
+ "Squid input buffer: " << hlp->id_name << " #" << srv->index);
+ srv->closePipesSafely(hlp->id_name);
+ return;
+ }
- r->callback = NULL;
+ if (!msgEnd)
+ return; // We are waiting for more data.
- void *cbdata = NULL;
- if (cbdataReferenceValidDone(r->data, &cbdata)) {
- HelperReply response(msg, (msg_end-msg));
- callback(cbdata, response);
+ bool retry = false;
+ if (cbdataReferenceValid(r->request.data)) {
+ r->reply.finalize();
+ if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
+ debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
+ retry = true;
+ } else {
+ HLPCB *callback = r->request.callback;
+ r->request.callback = nullptr;
+ void *cbdata = nullptr;
+ if (cbdataReferenceValidDone(r->request.data, &cbdata))
+ callback(cbdata, r->reply);
+ }
}
-- srv->stats.pending;
srv->answer_time = current_time;
- srv->dispatch_time = r->dispatch_time;
+ srv->dispatch_time = r->request.dispatch_time;
hlp->stats.avg_svc_time =
Math::intAverage(hlp->stats.avg_svc_time,
- tvSubMsec(r->dispatch_time, current_time),
+ tvSubMsec(r->request.dispatch_time, current_time),
hlp->stats.replies, REDIRECT_AV_FACTOR);
- helperRequestFree(r);
- } else {
- debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
- request_number << " from " << hlp->id_name << " #" << srv->index + 1 <<
- " '" << srv->rbuf << "'");
+ // release or re-submit parsedRequestXaction object
+ srv->replyXaction = nullptr;
+ if (retry) {
+ ++r->request.retries;
+ hlp->submitRequest(r);
+ } else
+ delete r;
}
- srv->roffset -= (msg_end - srv->rbuf);
- memmove(srv->rbuf, msg_end, srv->roffset + 1);
+
+ if (hlp->timeout && hlp->childs.concurrency)
+ srv->checkForTimedOutRequests(hlp->retryTimedOut);
if (!srv->flags.shutdown) {
helperKickQueue(hlp);
} else if (!srv->flags.closing && !srv->stats.pending) {
- srv->flags.closing=1;
+ srv->flags.closing=true;
srv->writePipe->close();
- return;
}
}
static void
-helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
{
- char *t = NULL;
helper_server *srv = (helper_server *)data;
helper *hlp = srv->parent;
assert(cbdataReferenceValid(data));
- /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
- if (flag == COMM_ERR_CLOSING) {
+ if (flag == Comm::ERR_CLOSING) {
return;
}
assert(conn->fd == srv->readPipe->fd);
- debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
+ debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
- if (flag != COMM_OK || len == 0) {
- srv->closePipesSafely();
+ if (flag != Comm::OK || len == 0) {
+ srv->closePipesSafely(hlp->id_name);
return;
}
srv->roffset += len;
srv->rbuf[srv->roffset] = '\0';
- debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
+ debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
- if (!srv->stats.pending) {
+ if (!srv->stats.pending && !srv->stats.timedout) {
/* someone spoke without being spoken to */
debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
- hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
+ hlp->id_name << " #" << srv->index << ", " << (int)len <<
" bytes '" << srv->rbuf << "'");
srv->roffset = 0;
srv->rbuf[0] = '\0';
}
- while ((t = strchr(srv->rbuf, hlp->eom))) {
- /* end of reply found */
- char *msg = srv->rbuf;
- int i = 0;
- debugs(84, 3, "helperHandleRead: end of reply found");
-
- if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n')
- t[-1] = '\0';
-
- *t = '\0';
- ++t;
-
- if (hlp->childs.concurrency) {
- i = strtol(msg, &msg, 10);
-
- while (*msg && xisspace(*msg))
- ++msg;
+ bool needsMore = false;
+ char *msg = srv->rbuf;
+ while (*msg && !needsMore) {
+ int skip = 0;
+ char *eom = strchr(msg, hlp->eom);
+ if (eom) {
+ skip = 1;
+ debugs(84, 3, "helperHandleRead: end of reply found");
+ if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
+ *eom = '\0';
+ // rewind to the \r octet which is the real terminal now
+ // and remember that we have to skip forward 2 places now.
+ skip = 2;
+ --eom;
+ }
+ *eom = '\0';
}
- helperReturnBuffer(i, srv, hlp, msg, t);
+ if (!srv->ignoreToEom && !srv->replyXaction) {
+ int i = 0;
+ if (hlp->childs.concurrency) {
+ char *e = NULL;
+ i = strtol(msg, &e, 10);
+ // Do we need to check for e == msg? Means wrong response from helper.
+ // Will be droped as "unexpected reply on channel 0"
+ needsMore = !(xisspace(*e) || (eom && e == eom));
+ if (!needsMore) {
+ msg = e;
+ while (*msg && xisspace(*msg))
+ ++msg;
+ } // else not enough data to compute request number
+ }
+ if (!(srv->replyXaction = srv->popRequest(i))) {
+ if (srv->stats.timedout) {
+ debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
+ } else {
+ debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
+ i << " from " << hlp->id_name << " #" << srv->index <<
+ " '" << srv->rbuf << "'");
+ }
+ srv->ignoreToEom = true;
+ }
+ } // else we need to just append reply data to the current Xaction
+
+ if (!needsMore) {
+ size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
+ assert(msgSize <= srv->rbuf_sz);
+ helperReturnBuffer(srv, hlp, msg, msgSize, eom);
+ msg += msgSize + skip;
+ assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
+
+ // The next message should not ignored.
+ if (eom && srv->ignoreToEom)
+ srv->ignoreToEom = false;
+ } else
+ assert(skip == 0 && eom == NULL);
+ }
+
+ if (needsMore) {
+ size_t msgSize = (srv->roffset - (msg - srv->rbuf));
+ assert(msgSize <= srv->rbuf_sz);
+ memmove(srv->rbuf, msg, msgSize);
+ srv->roffset = msgSize;
+ srv->rbuf[srv->roffset] = '\0';
+ } else {
+ // All of the responses parsed and msg points at the end of read data
+ assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
+ srv->roffset = 0;
}
- if (Comm::IsConnOpen(srv->readPipe)) {
+ if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
int spaceSize = srv->rbuf_sz - srv->roffset - 1;
assert(spaceSize >= 0);
- // grow the input buffer if needed and possible
- if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
- srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
- debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
- spaceSize = srv->rbuf_sz - srv->roffset - 1;
- assert(spaceSize >= 0);
- }
-
- // quit reading if there is no space left
- if (!spaceSize) {
- debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
- "helper that overflowed " << srv->rbuf_sz << "-byte " <<
- "Squid input buffer: " << hlp->id_name << " #" <<
- (srv->index + 1));
- srv->closePipesSafely();
- return;
- }
-
AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
CommIoCbPtrFun(helperHandleRead, srv));
comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
}
static void
-helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
{
char *t = NULL;
helper_stateful_server *srv = (helper_stateful_server *)data;
- helper_stateful_request *r;
statefulhelper *hlp = srv->parent;
assert(cbdataReferenceValid(data));
- /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
- if (flag == COMM_ERR_CLOSING) {
+ if (flag == Comm::ERR_CLOSING) {
return;
}
assert(conn->fd == srv->readPipe->fd);
debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
- hlp->id_name << " #" << srv->index + 1);
+ hlp->id_name << " #" << srv->index);
- if (flag != COMM_OK || len == 0) {
- srv->closePipesSafely();
+ if (flag != Comm::OK || len == 0) {
+ srv->closePipesSafely(hlp->id_name);
return;
}
srv->roffset += len;
srv->rbuf[srv->roffset] = '\0';
- r = srv->request;
+ Helper::Xaction *r = srv->requests.front();
+ debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
if (r == NULL) {
/* someone spoke without being spoken to */
debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
- hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
+ hlp->id_name << " #" << srv->index << ", " << (int)len <<
" bytes '" << srv->rbuf << "'");
srv->roffset = 0;
}
if ((t = strchr(srv->rbuf, hlp->eom))) {
- /* end of reply found */
- int called = 1;
debugs(84, 3, "helperStatefulHandleRead: end of reply found");
- if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n')
- t[-1] = '\0';
+ if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
+ *t = '\0';
+ // rewind to the \r octet which is the real terminal now
+ --t;
+ }
*t = '\0';
+ }
- if (r && cbdataReferenceValid(r->data)) {
- HelperReply res(srv->rbuf, (t - srv->rbuf));
- res.whichServer = srv;
- r->callback(r->data, res);
+ if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
+ debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
+ "helper that overflowed " << srv->rbuf_sz << "-byte " <<
+ "Squid input buffer: " << hlp->id_name << " #" << srv->index);
+ srv->closePipesSafely(hlp->id_name);
+ return;
+ }
+ /**
+ * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
+ * Doing this prohibits concurrency support with multiple replies per read().
+ * TODO: check that read() setup on these buffers pays attention to roffest!=0
+ * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
+ */
+ srv->roffset = 0;
+
+ if (t) {
+ /* end of reply found */
+ srv->requests.pop_front(); // we already have it in 'r'
+ int called = 1;
+
+ if (r && cbdataReferenceValid(r->request.data)) {
+ r->reply.finalize();
+ r->reply.reservationId = srv->reservationId;
+ r->request.callback(r->request.data, r->reply);
} else {
debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
called = 0;
}
- srv->flags.busy = 0;
- srv->roffset = 0;
- helperStatefulRequestFree(r);
- srv->request = NULL;
+ delete r;
-- srv->stats.pending;
++ srv->stats.replies;
if (called)
helperStatefulServerDone(srv);
else
- helperStatefulReleaseServer(srv);
+ hlp->cancelReservation(srv->reservationId);
}
- if (Comm::IsConnOpen(srv->readPipe)) {
- int spaceSize = srv->rbuf_sz - srv->roffset - 1;
- assert(spaceSize >= 0);
-
- // grow the input buffer if needed and possible
- if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
- srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
- debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
- spaceSize = srv->rbuf_sz - srv->roffset - 1;
- assert(spaceSize >= 0);
- }
-
- // quit reading if there is no space left
- if (!spaceSize) {
- debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
- "helper that overflowed " << srv->rbuf_sz << "-byte " <<
- "Squid input buffer: " << hlp->id_name << " #" <<
- (srv->index + 1));
- srv->closePipesSafely();
- return;
- }
+ if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
+ int spaceSize = srv->rbuf_sz - 1;
AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
CommIoCbPtrFun(helperStatefulHandleRead, srv));
- comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
+ comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
}
}
+/// Handles a request when all running helpers, if any, are busy.
static void
-Enqueue(helper * hlp, helper_request * r)
+Enqueue(helper * hlp, Helper::Xaction * r)
{
- dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
- dlinkAddTail(r, link, &hlp->queue);
+ hlp->queue.push(r);
++ hlp->stats.queue_size;
/* do this first so idle=N has a chance to grow the child pool before it hits critical. */
return;
}
- if (hlp->stats.queue_size < (int)hlp->childs.n_running)
+ if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
return;
if (squid_curtime - hlp->last_queue_warn < 600)
debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
-
- if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
- fatalf("Too many queued %s requests", hlp->id_name);
}
static void
-StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
+StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
{
- dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
- dlinkAddTail(r, link, &hlp->queue);
+ hlp->queue.push(r);
++ hlp->stats.queue_size;
/* do this first so idle=N has a chance to grow the child pool before it hits critical. */
return;
}
- if (hlp->stats.queue_size < (int)hlp->childs.n_running)
+ if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
return;
- if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
- fatalf("Too many queued %s requests", hlp->id_name);
-
if (squid_curtime - hlp->last_queue_warn < 600)
return;
debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
}
-static helper_request *
-Dequeue(helper * hlp)
-{
- dlink_node *link;
- helper_request *r = NULL;
-
- if ((link = hlp->queue.head)) {
- r = (helper_request *)link->data;
- dlinkDelete(link, &hlp->queue);
- memFree(link, MEM_DLINK_NODE);
- -- hlp->stats.queue_size;
- }
-
- return r;
-}
-
-static helper_stateful_request *
-StatefulDequeue(statefulhelper * hlp)
+Helper::Xaction *
+helper::nextRequest()
{
- dlink_node *link;
- helper_stateful_request *r = NULL;
-
- if ((link = hlp->queue.head)) {
- r = (helper_stateful_request *)link->data;
- dlinkDelete(link, &hlp->queue);
- memFree(link, MEM_DLINK_NODE);
- -- hlp->stats.queue_size;
- }
+ if (queue.empty())
+ return nullptr;
+ auto *r = queue.front();
+ queue.pop();
+ --stats.queue_size;
return r;
}
static helper_server *
-GetFirstAvailable(helper * hlp)
+GetFirstAvailable(const helper * hlp)
{
dlink_node *n;
helper_server *srv;
selected = srv;
}
- /* Check for overload */
if (!selected) {
debugs(84, 5, "GetFirstAvailable: None available.");
return NULL;
}
if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
- debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
+ debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
return NULL;
}
{
dlink_node *n;
helper_stateful_server *srv = NULL;
+ helper_stateful_server *oldestReservedServer = nullptr;
debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
if (hlp->childs.n_running == 0)
for (n = hlp->servers.head; n != NULL; n = n->next) {
srv = (helper_stateful_server *)n->data;
- if (srv->flags.busy)
+ if (srv->stats.pending)
continue;
- if (srv->flags.reserved)
+ if (srv->reserved()) {
+ if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
+ if (!oldestReservedServer)
+ oldestReservedServer = srv;
+ else if (oldestReservedServer->reservationStart < srv->reservationStart)
+ oldestReservedServer = srv;
+ debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
+ }
continue;
+ }
if (srv->flags.shutdown)
continue;
- if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
- continue;
-
debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
return srv;
}
+ if (oldestReservedServer) {
+ debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
+ return oldestReservedServer;
+ }
+
debugs(84, 5, "StatefulGetFirstAvailable: None available.");
- return NULL;
+ return nullptr;
}
static void
-helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
{
helper_server *srv = (helper_server *)data;
srv->writebuf->clean();
delete srv->writebuf;
srv->writebuf = NULL;
- srv->flags.writing = 0;
+ srv->flags.writing = false;
- if (flag != COMM_OK) {
+ if (flag != Comm::OK) {
/* Helper server has crashed */
- debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
+ debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
return;
}
if (!srv->wqueue->isNull()) {
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
- srv->flags.writing = 1;
+ srv->flags.writing = true;
AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
CommIoCbPtrFun(helperDispatchWriteDone, srv));
Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
}
static void
-helperDispatch(helper_server * srv, helper_request * r)
+helperDispatch(helper_server * srv, Helper::Xaction * r)
{
helper *hlp = srv->parent;
- helper_request **ptr = NULL;
- unsigned int slot;
+ const uint64_t reqId = ++srv->nextRequestId;
- if (!cbdataReferenceValid(r->data)) {
+ if (!cbdataReferenceValid(r->request.data)) {
debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
- helperRequestFree(r);
+ delete r;
return;
}
- for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
- if (!srv->requests[slot]) {
- ptr = &srv->requests[slot];
- break;
- }
- }
-
- assert(ptr);
- *ptr = r;
- r->dispatch_time = current_time;
+ r->request.Id = reqId;
+ helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
+ r->request.dispatch_time = current_time;
if (srv->wqueue->isNull())
srv->wqueue->init();
- if (hlp->childs.concurrency)
- srv->wqueue->Printf("%d %s", slot, r->buf);
- else
- srv->wqueue->append(r->buf, strlen(r->buf));
+ if (hlp->childs.concurrency) {
+ srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
+ assert(srv->requestsIndex.size() == srv->requests.size());
+ srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
+ } else
+ srv->wqueue->append(r->request.buf, strlen(r->request.buf));
if (!srv->flags.writing) {
assert(NULL == srv->writebuf);
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
- srv->flags.writing = 1;
+ srv->flags.writing = true;
AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
CommIoCbPtrFun(helperDispatchWriteDone, srv));
Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
}
- debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
+ debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
++ srv->stats.uses;
++ srv->stats.pending;
}
static void
-helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag,
- int xerrno, void *data)
-{
- /* nothing! */
-}
+helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
+{}
static void
-helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
+helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
{
statefulhelper *hlp = srv->parent;
- if (!cbdataReferenceValid(r->data)) {
+ if (!cbdataReferenceValid(r->request.data)) {
debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
- helperStatefulRequestFree(r);
- helperStatefulReleaseServer(srv);
+ delete r;
+ hlp->cancelReservation(srv->reservationId);
return;
}
- debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
+ debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
+
+ assert(srv->reservationId);
+ r->reply.reservationId = srv->reservationId;
- if (r->placeholder == 1) {
+ if (r->request.placeholder == 1) {
/* a callback is needed before this request can _use_ a helper. */
/* we don't care about releasing this helper. The request NEVER
* gets to the helper. So we throw away the return code */
- HelperReply nilReply;
- nilReply.whichServer = srv;
- r->callback(r->data, nilReply);
+ r->reply.result = Helper::Unknown;
+ r->request.callback(r->request.data, r->reply);
/* throw away the placeholder */
- helperStatefulRequestFree(r);
+ delete r;
/* and push the queue. Note that the callback may have submitted a new
* request to the helper which is why we test for the request */
- if (srv->request == NULL)
+ if (!srv->requests.size())
helperStatefulServerDone(srv);
return;
}
- srv->flags.busy = 1;
- srv->flags.reserved = 1;
- srv->request = r;
+ srv->requests.push_back(r);
srv->dispatch_time = current_time;
AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
- Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
+ Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
- hlp->id_name << " #" << srv->index + 1 << ", " <<
- (int) strlen(r->buf) << " bytes");
+ hlp->id_name << " #" << srv->index << ", " <<
+ (int) strlen(r->request.buf) << " bytes");
++ srv->stats.uses;
++ srv->stats.pending;
static void
helperKickQueue(helper * hlp)
{
- helper_request *r;
+ Helper::Xaction *r;
helper_server *srv;
- while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
+ while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
helperDispatch(srv, r);
}
static void
helperStatefulKickQueue(statefulhelper * hlp)
{
- helper_stateful_request *r;
+ Helper::Xaction *r;
helper_stateful_server *srv;
-
- while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
+ while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
+ debugs(84, 5, "found srv-" << srv->index);
+ hlp->reserveServer(srv);
helperStatefulDispatch(srv, r);
+ }
}
static void
{
if (!srv->flags.shutdown) {
helperStatefulKickQueue(srv->parent);
- } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
- srv->closeWritePipeSafely();
+ } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
+ srv->closeWritePipeSafely(srv->parent->id_name);
return;
}
}
-static void
-helperRequestFree(helper_request * r)
-{
- cbdataReferenceDone(r->data);
- xfree(r->buf);
- delete r;
-}
-
-static void
-helperStatefulRequestFree(helper_stateful_request * r)
+void
+helper_server::checkForTimedOutRequests(bool const retry)
{
- if (r) {
- cbdataReferenceDone(r->data);
- xfree(r->buf);
- delete r;
+ assert(parent->childs.concurrency);
+ while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
+ Helper::Xaction *r = requests.front();
+ RequestIndex::iterator it;
+ it = requestsIndex.find(r->request.Id);
+ assert(it != requestsIndex.end());
+ requestsIndex.erase(it);
+ requests.pop_front();
+ debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
+ void *cbdata;
+ bool retried = false;
+ if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
+ debugs(84, 2, "Retry request " << r->request.Id);
+ ++r->request.retries;
+ parent->submitRequest(r);
+ retried = true;
+ } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+ if (!parent->onTimedOutResponse.isEmpty()) {
+ if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
+ r->reply.finalize();
+ else
+ r->reply.result = Helper::TimedOut;
+ r->request.callback(cbdata, r->reply);
+ } else {
+ r->reply.result = Helper::TimedOut;
+ r->request.callback(cbdata, r->reply);
+ }
+ }
+ --stats.pending;
+ ++stats.timedout;
+ ++parent->stats.timedout;
+ if (!retried)
+ delete r;
}
}
-// TODO: should helper_ and helper_stateful_ have a common parent?
-static bool
-helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
+void
+helper_server::requestTimeout(const CommTimeoutCbParams &io)
{
- if (!hlp) {
- if (label)
- storeAppendPrintf(sentry, "%s: unavailable\n", label);
- return false;
- }
+ debugs(26, 3, HERE << io.conn);
+ helper_server *srv = static_cast<helper_server *>(io.data);
- if (label)
- storeAppendPrintf(sentry, "%s:\n", label);
+ srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
+
+ debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
+ AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
+ CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
- return true;
+ const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
+ const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
+
+ commSetConnTimeout(io.conn, timeLeft, timeoutCall);
}
+