/*
- * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
#include "squid.h"
#include "base/AsyncCbdataCalls.h"
#include "base/Packable.h"
+#include "base/Raw.h"
#include "comm.h"
#include "comm/Connection.h"
#include "comm/Read.h"
#include "comm/Write.h"
+#include "debug/Messages.h"
#include "fd.h"
#include "fde.h"
#include "format/Quoting.h"
#include "SquidConfig.h"
#include "SquidIpc.h"
#include "SquidMath.h"
-#include "SquidTime.h"
#include "Store.h"
#include "wordlist.h"
static IOCB helperHandleRead;
static IOCB helperStatefulHandleRead;
-static void helperServerFree(helper_server *srv);
-static void helperStatefulServerFree(helper_stateful_server *srv);
-static void Enqueue(helper * hlp, Helper::Xaction *);
-static helper_server *GetFirstAvailable(const helper * hlp);
-static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper * hlp);
-static void helperDispatch(helper_server * srv, Helper::Xaction * r);
+static void Enqueue(Helper::Client *, Helper::Xaction *);
+static Helper::Session *GetFirstAvailable(const Helper::Client::Pointer &);
+static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper::Pointer &);
+static void helperDispatch(Helper::Session *, Helper::Xaction *);
static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
-static void helperKickQueue(helper * hlp);
-static void helperStatefulKickQueue(statefulhelper * hlp);
+static void helperKickQueue(const Helper::Client::Pointer &);
+static void helperStatefulKickQueue(const statefulhelper::Pointer &);
static void helperStatefulServerDone(helper_stateful_server * srv);
static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
-CBDATA_CLASS_INIT(helper);
-CBDATA_CLASS_INIT(helper_server);
-CBDATA_CLASS_INIT(statefulhelper);
+CBDATA_NAMESPACED_CLASS_INIT(Helper, Session);
CBDATA_CLASS_INIT(helper_stateful_server);
-InstanceIdDefinitions(HelperServerBase, "Hlpr");
+InstanceIdDefinitions(Helper::SessionBase, "Hlpr");
void
-HelperServerBase::initStats()
+Helper::SessionBase::initStats()
{
stats.uses=0;
stats.replies=0;
}
void
-HelperServerBase::closePipesSafely(const char *id_name)
+Helper::SessionBase::closePipesSafely(const char * const id_name)
{
#if _SQUID_WINDOWS_
shutdown(writePipe->fd, SD_BOTH);
}
CloseHandle(hIpc);
}
+#else
+ (void)id_name;
#endif
}
void
-HelperServerBase::closeWritePipeSafely(const char *id_name)
+Helper::SessionBase::closeWritePipeSafely(const char * const id_name)
{
#if _SQUID_WINDOWS_
shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
}
CloseHandle(hIpc);
}
+#else
+ (void)id_name;
#endif
}
void
-helperOpenServers(helper * hlp)
+Helper::SessionBase::dropQueued()
+{
+ while (!requests.empty()) {
+ // XXX: re-schedule these on another helper?
+ const auto r = requests.front();
+ requests.pop_front();
+ void *cbdata;
+ if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+ r->reply.result = Helper::Unknown;
+ r->request.callback(cbdata, r->reply);
+ }
+
+ delete r;
+ }
+}
+
+Helper::SessionBase::~SessionBase()
+{
+ if (rbuf) {
+ memFreeBuf(rbuf_sz, rbuf);
+ rbuf = nullptr;
+ }
+}
+
+Helper::Session::~Session()
+{
+ wqueue->clean();
+ delete wqueue;
+
+ if (writebuf) {
+ writebuf->clean();
+ delete writebuf;
+ writebuf = nullptr;
+ }
+
+ if (Comm::IsConnOpen(writePipe))
+ closeWritePipeSafely(parent->id_name);
+
+ dlinkDelete(&link, &parent->servers);
+
+ assert(parent->childs.n_running > 0);
+ -- parent->childs.n_running;
+
+ assert(requests.empty());
+}
+
+void
+Helper::Session::dropQueued()
+{
+ SessionBase::dropQueued();
+ requestsIndex.clear();
+}
+
+helper_stateful_server::~helper_stateful_server()
+{
+ /* TODO: walk the local queue of requests and carry them all out */
+ if (Comm::IsConnOpen(writePipe))
+ closeWritePipeSafely(parent->id_name);
+
+ parent->cancelReservation(reservationId);
+
+ dlinkDelete(&link, &parent->servers);
+
+ assert(parent->childs.n_running > 0);
+ -- parent->childs.n_running;
+
+ assert(requests.empty());
+}
+
+void
+helperOpenServers(const Helper::Client::Pointer &hlp)
{
char *s;
char *progname;
char *procname;
const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
char fd_note_buf[FD_DESC_SZ];
- helper_server *srv;
int nargs = 0;
int k;
pid_t pid;
void * hIpc;
wordlist *w;
- if (hlp->cmdline == NULL)
+ if (hlp->cmdline == nullptr)
return;
progname = hlp->cmdline->key;
/* figure out how many new child are actually needed. */
int need_new = hlp->childs.needNew();
- debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
+ debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
if (need_new < 1) {
- debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
+ debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
}
procname = (char *)xmalloc(strlen(shortname) + 3);
++nargs;
}
- args[nargs] = NULL;
+ args[nargs] = nullptr;
++nargs;
assert(nargs <= HELPER_MAX_ARGS);
+ int successfullyStarted = 0;
+
for (k = 0; k < need_new; ++k) {
getCurrentTime();
rfd = wfd = -1;
continue;
}
+ ++successfullyStarted;
++ hlp->childs.n_running;
++ hlp->childs.n_active;
- srv = new helper_server;
+ const auto srv = new Helper::Session;
srv->hIpc = hIpc;
srv->pid = pid;
srv->initStats();
srv->wqueue = new MemBuf;
srv->roffset = 0;
srv->nextRequestId = 0;
- srv->replyXaction = NULL;
+ srv->replyXaction = nullptr;
srv->ignoreToEom = false;
- srv->parent = cbdataReference(hlp);
+ srv->parent = hlp;
dlinkAddTail(srv, &srv->link, &hlp->servers);
if (rfd == wfd) {
if (wfd != rfd)
commSetNonBlocking(wfd);
- AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
+ AsyncCall::Pointer closeCall = asyncCall(5,4, "Helper::Session::HelperServerClosed", cbdataDialer(Helper::Session::HelperServerClosed, srv));
comm_add_close_handler(rfd, closeCall);
if (hlp->timeout && hlp->childs.concurrency) {
- AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
- CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
+ AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
+ CommTimeoutCbPtrFun(Helper::Session::requestTimeout, srv));
commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
}
comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
}
+ // Call handleFewerServers() before hlp->last_restart is updated because
+ // that method uses last_restart to measure the delay since previous start.
+ // TODO: Refactor last_restart code to measure failure frequency rather than
+ // detecting a helper #X failure that is being close to the helper #Y start.
+ if (successfullyStarted < need_new)
+ hlp->handleFewerServers(false);
+
hlp->last_restart = squid_curtime;
safe_free(shortname);
safe_free(procname);
* helperStatefulOpenServers: create the stateful child helper processes
*/
void
-helperStatefulOpenServers(statefulhelper * hlp)
+helperStatefulOpenServers(const statefulhelper::Pointer &hlp)
{
char *shortname;
const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
char fd_note_buf[FD_DESC_SZ];
int nargs = 0;
- if (hlp->cmdline == NULL)
+ if (hlp->cmdline == nullptr)
return;
if (hlp->childs.concurrency)
++nargs;
}
- args[nargs] = NULL;
+ args[nargs] = nullptr;
++nargs;
assert(nargs <= HELPER_MAX_ARGS);
+ int successfullyStarted = 0;
+
for (int k = 0; k < need_new; ++k) {
getCurrentTime();
int rfd = -1;
continue;
}
+ ++successfullyStarted;
++ hlp->childs.n_running;
++ hlp->childs.n_active;
helper_stateful_server *srv = new helper_stateful_server;
srv->hIpc = hIpc;
srv->pid = pid;
- srv->flags.reserved = false;
srv->initStats();
srv->addr = hlp->addr;
srv->readPipe = new Comm::Connection;
srv->writePipe->fd = wfd;
srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
srv->roffset = 0;
- srv->parent = cbdataReference(hlp);
-
- if (hlp->datapool != NULL)
- srv->data = hlp->datapool->alloc();
+ srv->parent = hlp;
+ srv->reservationStart = 0;
dlinkAddTail(srv, &srv->link, &hlp->servers);
if (wfd != rfd)
commSetNonBlocking(wfd);
- AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
+ AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
comm_add_close_handler(rfd, closeCall);
AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
}
+ // Call handleFewerServers() before hlp->last_restart is updated because
+ // that method uses last_restart to measure the delay since previous start.
+ // TODO: Refactor last_restart code to measure failure frequency rather than
+ // detecting a helper #X failure that is being close to the helper #Y start.
+ if (successfullyStarted < need_new)
+ hlp->handleFewerServers(false);
+
hlp->last_restart = squid_curtime;
safe_free(shortname);
safe_free(procname);
}
void
-helper::submitRequest(Helper::Xaction *r)
+Helper::Client::submitRequest(Helper::Xaction * const r)
{
- helper_server *srv;
-
- if ((srv = GetFirstAvailable(this)))
+ if (const auto srv = GetFirstAvailable(this))
helperDispatch(srv, r);
else
Enqueue(this, r);
/// handles helperSubmit() and helperStatefulSubmit() failures
static void
-SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
+SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
{
auto result = Helper::Error;
if (!hlp) {
}
void
-helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
+helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
{
if (!hlp || !hlp->trySubmit(buf, callback, data))
SubmissionFailure(hlp, callback, data);
/// whether queuing an additional request would overload the helper
bool
-helper::queueFull() const {
+Helper::Client::queueFull() const {
return stats.queue_size >= static_cast<int>(childs.queue_size);
}
bool
-helper::overloaded() const {
+Helper::Client::overloaded() const {
return stats.queue_size > static_cast<int>(childs.queue_size);
}
/// synchronizes queue-dependent measurements with the current queue state
void
-helper::syncQueueStats()
+Helper::Client::syncQueueStats()
{
if (overloaded()) {
if (overloadStart) {
/// returns true if and only if the submission should proceed
/// may kill Squid if the helper remains overloaded for too long
bool
-helper::prepSubmit()
+Helper::Client::prepSubmit()
{
// re-sync for the configuration may have changed since the last submission
syncQueueStats();
if (squid_curtime - overloadStart <= 180)
return true; // also OK: overload has not persisted long enough to panic
- if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
+ if (childs.onPersistentOverload == ChildConfig::actDie)
fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
if (!droppedRequests) {
}
bool
-helper::trySubmit(const char *buf, HLPCB * callback, void *data)
+Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
{
if (!prepSubmit())
return false; // request was dropped
/// dispatches or enqueues a helper requests; does not enforce queue limits
void
-helper::submit(const char *buf, HLPCB * callback, void *data)
+Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
{
- Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+ const auto r = new Xaction(callback, data, buf);
submitRequest(r);
debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
}
-/// lastserver = "server last used as part of a reserved request sequence"
+/// Submit request or callback the caller with a Helper::Error error.
+/// If the reservation is not set then reserves a new helper.
void
-helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
+helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
{
- if (!hlp || !hlp->trySubmit(buf, callback, data, lastserver))
+ if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
SubmissionFailure(hlp, callback, data);
}
/// If possible, submit request. Otherwise, either kill Squid or return false.
bool
-statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver)
+statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
{
if (!prepSubmit())
return false; // request was dropped
- submit(buf, callback, data, lastserver); // will send or queue
+ submit(buf, callback, data, reservation); // will send or queue
return true; // request submitted or queued
}
-void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
+void
+statefulhelper::reserveServer(helper_stateful_server * srv)
{
- Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+ // clear any old reservation
+ if (srv->reserved()) {
+ reservations.erase(srv->reservationId);
+ srv->clearReservation();
+ }
+
+ srv->reserve();
+ reservations.insert(Reservations::value_type(srv->reservationId, srv));
+}
+
+void
+statefulhelper::cancelReservation(const Helper::ReservationId reservation)
+{
+ const auto it = reservations.find(reservation);
+ if (it == reservations.end())
+ return;
+
+ helper_stateful_server *srv = it->second;
+ reservations.erase(it);
+ srv->clearReservation();
+
+ // schedule a queue kick
+ AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
+ ScheduleCallHere(call);
+}
+
+helper_stateful_server *
+statefulhelper::findServer(const Helper::ReservationId & reservation)
+{
+ const auto it = reservations.find(reservation);
+ if (it == reservations.end())
+ return nullptr;
+ return it->second;
+}
+
+void
+helper_stateful_server::reserve()
+{
+ assert(!reservationId);
+ reservationStart = squid_curtime;
+ reservationId = Helper::ReservationId::Next();
+ debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
+}
+
+void
+helper_stateful_server::clearReservation()
+{
+ debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
+ if (!reservationId)
+ return;
+
+ ++stats.releases;
- if ((buf != NULL) && lastserver) {
- debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
- assert(lastserver->flags.reserved);
- assert(!lastserver->requests.size());
+ reservationId.clear();
+ reservationStart = 0;
+}
+
+void
+statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
+{
+ Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+ if (buf && reservation) {
+ debugs(84, 5, reservation);
+ helper_stateful_server *lastServer = findServer(reservation);
+ if (!lastServer) {
+ debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
+ r->reply.result = Helper::TimedOut;
+ r->request.callback(r->request.data, r->reply);
+ delete r;
+ return;
+ }
debugs(84, 5, "StatefulSubmit dispatching");
- helperStatefulDispatch(lastserver, r);
+ helperStatefulDispatch(lastServer, r);
} else {
helper_stateful_server *srv;
if ((srv = StatefulGetFirstAvailable(this))) {
+ reserveServer(srv);
helperStatefulDispatch(srv, r);
} else
StatefulEnqueue(this, r);
syncQueueStats();
}
-/**
- * DPW 2007-05-08
- *
- * helperStatefulReleaseServer tells the helper that whoever was
- * using it no longer needs its services.
- */
void
-helperStatefulReleaseServer(helper_stateful_server * srv)
-{
- debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
- if (!srv->flags.reserved)
- return;
-
- ++ srv->stats.releases;
-
- srv->flags.reserved = false;
-
- helperStatefulServerDone(srv);
-}
-
-/** return a pointer to the stateful routines data area */
-void *
-helperStatefulServerGetData(helper_stateful_server * srv)
-{
- return srv->data;
-}
-
-void
-helper::packStatsInto(Packable *p, const char *label) const
+Helper::Client::packStatsInto(Packable * const p, const char * const label) const
{
if (label)
p->appendf("%s:\n", label);
"Request");
for (dlink_node *link = servers.head; link; link = link->next) {
- HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
+ const auto srv = static_cast<SessionBase *>(link->data);
assert(srv);
- Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
+ const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
srv->index.value,
srv->stats.pending ? 'B' : ' ',
srv->flags.writing ? 'W' : ' ',
srv->flags.closing ? 'C' : ' ',
- srv->flags.reserved ? 'R' : ' ',
+ srv->reserved() ? 'R' : ' ',
srv->flags.shutdown ? 'S' : ' ',
xaction && xaction->request.placeholder ? 'P' : ' ',
tt < 0.0 ? 0.0 : tt,
}
bool
-helper::willOverload() const {
+Helper::Client::willOverload() const {
return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
}
+Helper::Client::Pointer
+Helper::Client::Make(const char * const name)
+{
+ return new Client(name);
+}
+
+statefulhelper::Pointer
+statefulhelper::Make(const char *name)
+{
+ return new statefulhelper(name);
+}
+
void
-helperShutdown(helper * hlp)
+helperShutdown(const Helper::Client::Pointer &hlp)
{
dlink_node *link = hlp->servers.head;
while (link) {
- helper_server *srv;
- srv = (helper_server *)link->data;
+ const auto srv = static_cast<Helper::Session *>(link->data);
link = link->next;
if (srv->flags.shutdown) {
}
void
-helperStatefulShutdown(statefulhelper * hlp)
+helperStatefulShutdown(const statefulhelper::Pointer &hlp)
{
dlink_node *link = hlp->servers.head;
helper_stateful_server *srv;
continue;
}
- if (srv->flags.reserved) {
+ if (srv->reserved()) {
if (shutting_down) {
debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
} else {
}
}
-helper::~helper()
+Helper::Client::~Client()
{
/* note, don't free id_name, it probably points to static memory */
debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
}
-/* ====================================================================== */
-/* LOCAL FUNCTIONS */
-/* ====================================================================== */
-
-static void
-helperServerFree(helper_server *srv)
+void
+Helper::Client::handleKilledServer(SessionBase * const srv, bool &needsNewServers)
{
- helper *hlp = srv->parent;
- int concurrency = hlp->childs.concurrency;
-
- if (!concurrency)
- concurrency = 1;
-
- if (srv->rbuf) {
- memFreeBuf(srv->rbuf_sz, srv->rbuf);
- srv->rbuf = NULL;
- }
-
- srv->wqueue->clean();
- delete srv->wqueue;
-
- if (srv->writebuf) {
- srv->writebuf->clean();
- delete srv->writebuf;
- srv->writebuf = NULL;
- }
-
- if (Comm::IsConnOpen(srv->writePipe))
- srv->closeWritePipeSafely(hlp->id_name);
-
- dlinkDelete(&srv->link, &hlp->servers);
-
- assert(hlp->childs.n_running > 0);
- -- hlp->childs.n_running;
-
+ needsNewServers = false;
if (!srv->flags.shutdown) {
- assert(hlp->childs.n_active > 0);
- -- hlp->childs.n_active;
- debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
-
- if (hlp->childs.needNew() > 0) {
- debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
+ assert(childs.n_active > 0);
+ --childs.n_active;
+ debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
- if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
- if (srv->stats.replies < 1)
- fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
- else
- debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
- }
+ handleFewerServers(srv->stats.replies >= 1);
- debugs(80, DBG_IMPORTANT, "Starting new helpers");
- helperOpenServers(hlp);
+ if (childs.needNew() > 0) {
+ srv->flags.shutdown = true;
+ needsNewServers = true;
}
}
+}
- while (!srv->requests.empty()) {
- // XXX: re-schedule these on another helper?
- Helper::Xaction *r = srv->requests.front();
- srv->requests.pop_front();
- void *cbdata;
+void
+Helper::Client::handleFewerServers(const bool madeProgress)
+{
+ const auto needNew = childs.needNew();
- if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
- r->reply.result = Helper::Unknown;
- r->request.callback(cbdata, r->reply);
- }
+ if (!needNew)
+ return; // some server(s) have died, but we still have enough
- delete r;
- }
- srv->requestsIndex.clear();
+ debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
+ Debug::Extra << "active processes: " << childs.n_active <<
+ Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
- cbdataReferenceDone(srv->parent);
- delete srv;
+ if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
+ if (madeProgress)
+ debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
+ else
+ fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
+ }
}
-static void
-helperStatefulServerFree(helper_stateful_server *srv)
+void
+Helper::Session::HelperServerClosed(Session * const srv)
{
- statefulhelper *hlp = srv->parent;
+ const auto hlp = srv->parent;
- if (srv->rbuf) {
- memFreeBuf(srv->rbuf_sz, srv->rbuf);
- srv->rbuf = NULL;
+ bool needsNewServers = false;
+ hlp->handleKilledServer(srv, needsNewServers);
+ if (needsNewServers) {
+ debugs(80, DBG_IMPORTANT, "Starting new helpers");
+ helperOpenServers(hlp);
}
-#if 0
- srv->wqueue->clean();
-
- delete srv->wqueue;
+ srv->dropQueued();
-#endif
-
- /* TODO: walk the local queue of requests and carry them all out */
- if (Comm::IsConnOpen(srv->writePipe))
- srv->closeWritePipeSafely(hlp->id_name);
-
- dlinkDelete(&srv->link, &hlp->servers);
-
- assert(hlp->childs.n_running > 0);
- -- hlp->childs.n_running;
-
- if (!srv->flags.shutdown) {
- assert( hlp->childs.n_active > 0);
- -- hlp->childs.n_active;
- debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
-
- if (hlp->childs.needNew() > 0) {
- debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
-
- if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
- if (srv->stats.replies < 1)
- fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
- else
- debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
- }
-
- debugs(80, DBG_IMPORTANT, "Starting new helpers");
- helperStatefulOpenServers(hlp);
- }
- }
-
- while (!srv->requests.empty()) {
- // XXX: re-schedule these on another helper?
- Helper::Xaction *r = srv->requests.front();
- srv->requests.pop_front();
- void *cbdata;
+ delete srv;
+}
- if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
- r->reply.result = Helper::Unknown;
- r->request.callback(cbdata, r->reply);
- }
+// XXX: Almost duplicates Helper::Session::HelperServerClosed() because helperOpenServers() is not a virtual method of the `Helper::Client` class
+// TODO: Fix the `Helper::Client` class hierarchy to use virtual functions.
+void
+helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
+{
+ const auto hlp = srv->parent;
- delete r;
+ bool needsNewServers = false;
+ hlp->handleKilledServer(srv, needsNewServers);
+ if (needsNewServers) {
+ debugs(80, DBG_IMPORTANT, "Starting new helpers");
+ helperStatefulOpenServers(hlp);
}
- if (srv->data != NULL)
- hlp->datapool->freeOne(srv->data);
-
- cbdataReferenceDone(srv->parent);
+ srv->dropQueued();
delete srv;
}
Helper::Xaction *
-helper_server::popRequest(int request_number)
+Helper::Session::popRequest(const int request_number)
{
- Helper::Xaction *r = nullptr;
- helper_server::RequestIndex::iterator it;
+ Xaction *r = nullptr;
if (parent->childs.concurrency) {
- // If concurency supported retrieve request from ID
- it = requestsIndex.find(request_number);
+ // If concurrency supported retrieve request from ID
+ const auto it = requestsIndex.find(request_number);
if (it != requestsIndex.end()) {
r = *(it->second);
requests.erase(it->second);
/// Calls back with a pointer to the buffer with the helper output
static void
-helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
+helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
{
if (Helper::Xaction *r = srv->replyXaction) {
const bool hasSpace = r->reply.accumulate(msg, msgSize);
if (!srv->flags.shutdown) {
helperKickQueue(hlp);
} else if (!srv->flags.closing && !srv->stats.pending) {
- srv->flags.closing=true;
- srv->writePipe->close();
+ srv->closeWritePipeSafely(srv->parent->id_name);
}
}
static void
helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
{
- helper_server *srv = (helper_server *)data;
- helper *hlp = srv->parent;
+ const auto srv = static_cast<Helper::Session *>(data);
+ const auto hlp = srv->parent;
assert(cbdataReferenceValid(data));
/* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
if (!srv->stats.pending && !srv->stats.timedout) {
/* someone spoke without being spoken to */
- debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
+ debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected read from " <<
hlp->id_name << " #" << srv->index << ", " << (int)len <<
" bytes '" << srv->rbuf << "'");
if (!srv->ignoreToEom && !srv->replyXaction) {
int i = 0;
if (hlp->childs.concurrency) {
- char *e = NULL;
+ char *e = nullptr;
i = strtol(msg, &e, 10);
// Do we need to check for e == msg? Means wrong response from helper.
- // Will be droped as "unexpected reply on channel 0"
+ // Will be dropped as "unexpected reply on channel 0"
needsMore = !(xisspace(*e) || (eom && e == eom));
if (!needsMore) {
msg = e;
if (srv->stats.timedout) {
debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
} else {
- debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
+ debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
i << " from " << hlp->id_name << " #" << srv->index <<
" '" << srv->rbuf << "'");
}
if (eom && srv->ignoreToEom)
srv->ignoreToEom = false;
} else
- assert(skip == 0 && eom == NULL);
+ assert(skip == 0 && eom == nullptr);
}
if (needsMore) {
static void
helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
{
- char *t = NULL;
+ char *t = nullptr;
helper_stateful_server *srv = (helper_stateful_server *)data;
- statefulhelper *hlp = srv->parent;
+ const auto hlp = srv->parent;
assert(cbdataReferenceValid(data));
/* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
Helper::Xaction *r = srv->requests.front();
debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
- if (r == NULL) {
+ if (r == nullptr) {
/* someone spoke without being spoken to */
- debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
+ debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulHandleRead: unexpected read from " <<
hlp->id_name << " #" << srv->index << ", " << (int)len <<
" bytes '" << srv->rbuf << "'");
if (r && cbdataReferenceValid(r->request.data)) {
r->reply.finalize();
- r->reply.whichServer = srv;
+ r->reply.reservationId = srv->reservationId;
r->request.callback(r->request.data, r->reply);
} else {
debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
if (called)
helperStatefulServerDone(srv);
else
- helperStatefulReleaseServer(srv);
+ hlp->cancelReservation(srv->reservationId);
}
if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
/// Handles a request when all running helpers, if any, are busy.
static void
-Enqueue(helper * hlp, Helper::Xaction * r)
+Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
{
hlp->queue.push(r);
++ hlp->stats.queue_size;
}
Helper::Xaction *
-helper::nextRequest()
+Helper::Client::nextRequest()
{
if (queue.empty())
return nullptr;
return r;
}
-static helper_server *
-GetFirstAvailable(const helper * hlp)
+static Helper::Session *
+GetFirstAvailable(const Helper::Client::Pointer &hlp)
{
dlink_node *n;
- helper_server *srv;
- helper_server *selected = NULL;
+ Helper::Session *selected = nullptr;
debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
if (hlp->childs.n_running == 0)
- return NULL;
+ return nullptr;
/* Find "least" loaded helper (approx) */
- for (n = hlp->servers.head; n != NULL; n = n->next) {
- srv = (helper_server *)n->data;
+ for (n = hlp->servers.head; n != nullptr; n = n->next) {
+ const auto srv = static_cast<Helper::Session *>(n->data);
if (selected && selected->stats.pending <= srv->stats.pending)
continue;
if (!selected) {
debugs(84, 5, "GetFirstAvailable: None available.");
- return NULL;
+ return nullptr;
}
if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
- return NULL;
+ return nullptr;
}
debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
}
static helper_stateful_server *
-StatefulGetFirstAvailable(const statefulhelper * hlp)
+StatefulGetFirstAvailable(const statefulhelper::Pointer &hlp)
{
dlink_node *n;
- helper_stateful_server *srv = NULL;
+ helper_stateful_server *srv = nullptr;
+ helper_stateful_server *oldestReservedServer = nullptr;
debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
if (hlp->childs.n_running == 0)
- return NULL;
+ return nullptr;
- for (n = hlp->servers.head; n != NULL; n = n->next) {
+ for (n = hlp->servers.head; n != nullptr; n = n->next) {
srv = (helper_stateful_server *)n->data;
if (srv->stats.pending)
continue;
- if (srv->flags.reserved)
+ if (srv->reserved()) {
+ if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
+ if (!oldestReservedServer)
+ oldestReservedServer = srv;
+ else if (oldestReservedServer->reservationStart < srv->reservationStart)
+ oldestReservedServer = srv;
+ debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
+ }
continue;
+ }
if (srv->flags.shutdown)
continue;
return srv;
}
+ if (oldestReservedServer) {
+ debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
+ return oldestReservedServer;
+ }
+
debugs(84, 5, "StatefulGetFirstAvailable: None available.");
- return NULL;
+ return nullptr;
}
static void
helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
{
- helper_server *srv = (helper_server *)data;
+ const auto srv = static_cast<Helper::Session *>(data);
srv->writebuf->clean();
delete srv->writebuf;
- srv->writebuf = NULL;
+ srv->writebuf = nullptr;
srv->flags.writing = false;
if (flag != Comm::OK) {
srv->flags.writing = true;
AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
CommIoCbPtrFun(helperDispatchWriteDone, srv));
- Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
+ Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
}
}
static void
-helperDispatch(helper_server * srv, Helper::Xaction * r)
+helperDispatch(Helper::Session * const srv, Helper::Xaction * const r)
{
- helper *hlp = srv->parent;
+ const auto hlp = srv->parent;
const uint64_t reqId = ++srv->nextRequestId;
if (!cbdataReferenceValid(r->request.data)) {
- debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
+ debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
delete r;
return;
}
r->request.Id = reqId;
- helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
+ const auto it = srv->requests.insert(srv->requests.end(), r);
r->request.dispatch_time = current_time;
if (srv->wqueue->isNull())
srv->wqueue->init();
if (hlp->childs.concurrency) {
- srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
+ srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
assert(srv->requestsIndex.size() == srv->requests.size());
srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
} else
srv->wqueue->append(r->request.buf, strlen(r->request.buf));
if (!srv->flags.writing) {
- assert(NULL == srv->writebuf);
+ assert(nullptr == srv->writebuf);
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
srv->flags.writing = true;
AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
CommIoCbPtrFun(helperDispatchWriteDone, srv));
- Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
+ Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
}
debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
static void
helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
{
- statefulhelper *hlp = srv->parent;
+ const auto hlp = srv->parent;
if (!cbdataReferenceValid(r->request.data)) {
- debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
+ debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
delete r;
- helperStatefulReleaseServer(srv);
+ hlp->cancelReservation(srv->reservationId);
return;
}
debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
+ assert(srv->reservationId);
+ r->reply.reservationId = srv->reservationId;
+
if (r->request.placeholder == 1) {
/* a callback is needed before this request can _use_ a helper. */
/* we don't care about releasing this helper. The request NEVER
* gets to the helper. So we throw away the return code */
r->reply.result = Helper::Unknown;
- r->reply.whichServer = srv;
r->request.callback(r->request.data, r->reply);
/* throw away the placeholder */
delete r;
return;
}
- srv->flags.reserved = true;
srv->requests.push_back(r);
srv->dispatch_time = current_time;
AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
- CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
- Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
+ CommIoCbPtrFun(helperStatefulDispatchWriteDone, srv));
+ Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
hlp->id_name << " #" << srv->index << ", " <<
(int) strlen(r->request.buf) << " bytes");
}
static void
-helperKickQueue(helper * hlp)
+helperKickQueue(const Helper::Client::Pointer &hlp)
{
- Helper::Xaction *r;
- helper_server *srv;
+ Helper::Xaction *r = nullptr;
+ Helper::Session *srv = nullptr;
while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
helperDispatch(srv, r);
}
static void
-helperStatefulKickQueue(statefulhelper * hlp)
+helperStatefulKickQueue(const statefulhelper::Pointer &hlp)
{
Helper::Xaction *r;
helper_stateful_server *srv;
-
- while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
+ while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
+ debugs(84, 5, "found srv-" << srv->index);
+ hlp->reserveServer(srv);
helperStatefulDispatch(srv, r);
+ }
}
static void
{
if (!srv->flags.shutdown) {
helperStatefulKickQueue(srv->parent);
- } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
+ } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
srv->closeWritePipeSafely(srv->parent->id_name);
return;
}
}
void
-helper_server::checkForTimedOutRequests(bool const retry)
+Helper::Session::checkForTimedOutRequests(bool const retry)
{
assert(parent->childs.concurrency);
while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
- Helper::Xaction *r = requests.front();
+ const auto r = requests.front();
RequestIndex::iterator it;
it = requestsIndex.find(r->request.Id);
assert(it != requestsIndex.end());
}
void
-helper_server::requestTimeout(const CommTimeoutCbParams &io)
+Helper::Session::requestTimeout(const CommTimeoutCbParams &io)
{
- debugs(26, 3, HERE << io.conn);
- helper_server *srv = static_cast<helper_server *>(io.data);
-
- if (!cbdataReferenceValid(srv))
- return;
+ debugs(26, 3, io.conn);
+ const auto srv = static_cast<Session *>(io.data);
srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
- debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
- AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
- CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
+ debugs(84, 3, io.conn << " establish a new timeout");
+ AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
+ CommTimeoutCbPtrFun(Session::requestTimeout, srv));
const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));