]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/helper.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / helper.cc
index fe52569d5b9c70d87f08800808f1ea30c2eeca87..640afe734fb8f63cc207aca0a8e9d891762c122d 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
@@ -42,10 +42,8 @@ const size_t ReadBufSize(32*1024);
 
 static IOCB helperHandleRead;
 static IOCB helperStatefulHandleRead;
-static void helperServerFree(helper_server *srv);
-static void helperStatefulServerFree(helper_stateful_server *srv);
 static void Enqueue(helper * hlp, Helper::Xaction *);
-static helper_server *GetFirstAvailable(helper * hlp);
+static helper_server *GetFirstAvailable(const helper * hlp);
 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
@@ -121,6 +119,79 @@ HelperServerBase::closeWritePipeSafely(const char *id_name)
 #endif
 }
 
+void
+HelperServerBase::dropQueued()
+{
+    while (!requests.empty()) {
+        // XXX: re-schedule these on another helper?
+        Helper::Xaction *r = requests.front();
+        requests.pop_front();
+        void *cbdata;
+        if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+            r->reply.result = Helper::Unknown;
+            r->request.callback(cbdata, r->reply);
+        }
+
+        delete r;
+    }
+}
+
+HelperServerBase::~HelperServerBase()
+{
+    if (rbuf) {
+        memFreeBuf(rbuf_sz, rbuf);
+        rbuf = NULL;
+    }
+}
+
+helper_server::~helper_server()
+{
+    wqueue->clean();
+    delete wqueue;
+
+    if (writebuf) {
+        writebuf->clean();
+        delete writebuf;
+        writebuf = NULL;
+    }
+
+    if (Comm::IsConnOpen(writePipe))
+        closeWritePipeSafely(parent->id_name);
+
+    dlinkDelete(&link, &parent->servers);
+
+    assert(parent->childs.n_running > 0);
+    -- parent->childs.n_running;
+
+    assert(requests.empty());
+    cbdataReferenceDone(parent);
+}
+
+void
+helper_server::dropQueued()
+{
+    HelperServerBase::dropQueued();
+    requestsIndex.clear();
+}
+
+helper_stateful_server::~helper_stateful_server()
+{
+    /* TODO: walk the local queue of requests and carry them all out */
+    if (Comm::IsConnOpen(writePipe))
+        closeWritePipeSafely(parent->id_name);
+
+    parent->cancelReservation(reservationId);
+
+    dlinkDelete(&link, &parent->servers);
+
+    assert(parent->childs.n_running > 0);
+    -- parent->childs.n_running;
+
+    assert(requests.empty());
+
+    cbdataReferenceDone(parent);
+}
+
 void
 helperOpenServers(helper * hlp)
 {
@@ -227,7 +298,7 @@ helperOpenServers(helper * hlp)
         if (wfd != rfd)
             commSetNonBlocking(wfd);
 
-        AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
+        AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
         comm_add_close_handler(rfd, closeCall);
 
         if (hlp->timeout && hlp->childs.concurrency) {
@@ -324,7 +395,6 @@ helperStatefulOpenServers(statefulhelper * hlp)
         helper_stateful_server *srv = new helper_stateful_server;
         srv->hIpc = hIpc;
         srv->pid = pid;
-        srv->flags.reserved = false;
         srv->initStats();
         srv->addr = hlp->addr;
         srv->readPipe = new Comm::Connection;
@@ -334,9 +404,7 @@ helperStatefulOpenServers(statefulhelper * hlp)
         srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
         srv->roffset = 0;
         srv->parent = cbdataReference(hlp);
-
-        if (hlp->datapool != NULL)
-            srv->data = hlp->datapool->alloc();
+        srv->reservationStart = 0;
 
         dlinkAddTail(srv, &srv->link, &hlp->servers);
 
@@ -355,7 +423,7 @@ helperStatefulOpenServers(statefulhelper * hlp)
         if (wfd != rfd)
             commSetNonBlocking(wfd);
 
-        AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
+        AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
         comm_add_close_handler(rfd, closeCall);
 
         AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
@@ -379,54 +447,101 @@ helper::submitRequest(Helper::Xaction *r)
     else
         Enqueue(this, r);
 
-    if (!queueFull()) {
-        full_time = 0;
-    } else if (!full_time) {
-        debugs(84, 3, id_name << " queue became full");
-        full_time = squid_curtime;
+    syncQueueStats();
+}
+
+/// handles helperSubmit() and helperStatefulSubmit() failures
+static void
+SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
+{
+    auto result = Helper::Error;
+    if (!hlp) {
+        debugs(84, 3, "no helper");
+        result = Helper::Unknown;
     }
+    // else pretend the helper has responded with ERR
+
+    callback(data, Helper::Reply(result));
 }
 
 void
 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
 {
-    if (hlp == NULL) {
-        debugs(84, 3, "helperSubmit: hlp == NULL");
-        Helper::Reply const nilReply(Helper::Unknown);
-        callback(data, nilReply);
-        return;
-    }
-    hlp->prepSubmit();
-    hlp->submit(buf, callback, data);
+    if (!hlp || !hlp->trySubmit(buf, callback, data))
+        SubmissionFailure(hlp, callback, data);
 }
 
+/// whether queuing an additional request would overload the helper
 bool
 helper::queueFull() const {
+    return stats.queue_size >= static_cast<int>(childs.queue_size);
+}
+
+bool
+helper::overloaded() const {
     return stats.queue_size > static_cast<int>(childs.queue_size);
 }
 
-/// prepares the helper for request submission via trySubmit() or helperSubmit()
-/// currently maintains full_time and kills Squid if the helper remains full for too long
+/// synchronizes queue-dependent measurements with the current queue state
 void
-helper::prepSubmit()
+helper::syncQueueStats()
 {
-    if (!queueFull())
-        full_time = 0;
-    else if (!full_time) // may happen here if reconfigure decreases capacity
-        full_time = squid_curtime;
-    else if (squid_curtime - full_time > 180)
-        fatalf("Too many queued %s requests", id_name);
+    if (overloaded()) {
+        if (overloadStart) {
+            debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
+        } else {
+            overloadStart = squid_curtime;
+            debugs(84, 3, id_name << " became overloaded");
+        }
+    } else {
+        if (overloadStart) {
+            debugs(84, 5, id_name << " is no longer overloaded");
+            if (droppedRequests) {
+                debugs(84, DBG_IMPORTANT, "helper " << id_name <<
+                       " is no longer overloaded after dropping " << droppedRequests <<
+                       " requests in " << (squid_curtime - overloadStart) << " seconds");
+                droppedRequests = 0;
+            }
+            overloadStart = 0;
+        }
+    }
 }
 
+/// prepares the helper for request submission
+/// returns true if and only if the submission should proceed
+/// may kill Squid if the helper remains overloaded for too long
 bool
-helper::trySubmit(const char *buf, HLPCB * callback, void *data)
+helper::prepSubmit()
 {
-    prepSubmit();
+    // re-sync for the configuration may have changed since the last submission
+    syncQueueStats();
+
+    // Nothing special to do if the new request does not overload (i.e., the
+    // queue is not even full yet) or only _starts_ overloading this helper
+    // (i.e., the queue is currently at its limit).
+    if (!overloaded())
+        return true;
 
-    if (queueFull()) {
-        debugs(84, DBG_IMPORTANT, id_name << " drops request due to a full queue");
-        return false; // request was ignored
+    if (squid_curtime - overloadStart <= 180)
+        return true; // also OK: overload has not persisted long enough to panic
+
+    if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
+        fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
+
+    if (!droppedRequests) {
+        debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
+               id_name << " helper configured with on-persistent-overload=err");
     }
+    ++droppedRequests;
+    debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
+    return false;
+}
+
+bool
+helper::trySubmit(const char *buf, HLPCB * callback, void *data)
+{
+    if (!prepSubmit())
+        return false; // request was dropped
 
     submit(buf, callback, data); // will send or queue
     return true; // request submitted or queued
@@ -441,75 +556,116 @@ helper::submit(const char *buf, HLPCB * callback, void *data)
     debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
 }
 
-/// lastserver = "server last used as part of a reserved request sequence"
+/// Submit request or callback the caller with a Helper::Error error.
+/// If the reservation is not set then reserves a new helper.
 void
-helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
+helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
 {
-    if (hlp == NULL) {
-        debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
-        Helper::Reply const nilReply(Helper::Unknown);
-        callback(data, nilReply);
-        return;
-    }
-    hlp->prepSubmit();
-    hlp->submit(buf, callback, data, lastserver);
+    if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
+        SubmissionFailure(hlp, callback, data);
 }
 
-void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
+/// If possible, submit request. Otherwise, either kill Squid or return false.
+bool
+statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
 {
-    Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+    if (!prepSubmit())
+        return false; // request was dropped
 
-    if ((buf != NULL) && lastserver) {
-        debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
-        assert(lastserver->flags.reserved);
-        assert(!lastserver->requests.size());
+    submit(buf, callback, data, reservation); // will send or queue
+    return true; // request submitted or queued
+}
 
-        debugs(84, 5, "StatefulSubmit dispatching");
-        helperStatefulDispatch(lastserver, r);
-    } else {
-        helper_stateful_server *srv;
-        if ((srv = StatefulGetFirstAvailable(this))) {
-            helperStatefulDispatch(srv, r);
-        } else
-            StatefulEnqueue(this, r);
+void
+statefulhelper::reserveServer(helper_stateful_server * srv)
+{
+    // clear any old reservation
+    if (srv->reserved()) {
+        reservations.erase(srv->reservationId);
+        srv->clearReservation();
     }
 
-    debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
-           "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
-
-    if (!queueFull()) {
-        full_time = 0;
-    } else if (!full_time) {
-        debugs(84, 3, id_name << " queue became full");
-        full_time = squid_curtime;
-    }
+    srv->reserve();
+    reservations.insert(Reservations::value_type(srv->reservationId, srv));
 }
 
-/**
- * DPW 2007-05-08
- *
- * helperStatefulReleaseServer tells the helper that whoever was
- * using it no longer needs its services.
- */
 void
-helperStatefulReleaseServer(helper_stateful_server * srv)
+statefulhelper::cancelReservation(const Helper::ReservationId reservation)
 {
-    debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
-    if (!srv->flags.reserved)
+    const auto it = reservations.find(reservation);
+    if (it == reservations.end())
         return;
 
-    ++ srv->stats.releases;
+    helper_stateful_server *srv = it->second;
+    reservations.erase(it);
+    srv->clearReservation();
+
+    // schedule a queue kick
+    AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
+    ScheduleCallHere(call);
+}
+
+helper_stateful_server *
+statefulhelper::findServer(const Helper::ReservationId & reservation)
+{
+    const auto it = reservations.find(reservation);
+    if (it == reservations.end())
+        return nullptr;
+    return it->second;
+}
+
+void
+helper_stateful_server::reserve()
+{
+    assert(!reservationId);
+    reservationStart = squid_curtime;
+    reservationId = Helper::ReservationId::Next();
+    debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
+}
+
+void
+helper_stateful_server::clearReservation()
+{
+    debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
+    if (!reservationId)
+        return;
 
-    srv->flags.reserved = false;
+    ++stats.releases;
 
-    helperStatefulServerDone(srv);
+    reservationId.clear();
+    reservationStart = 0;
 }
 
-/** return a pointer to the stateful routines data area */
-void *
-helperStatefulServerGetData(helper_stateful_server * srv)
+void
+statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
 {
-    return srv->data;
+    Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+
+    if (buf && reservation) {
+        debugs(84, 5, reservation);
+        helper_stateful_server *lastServer = findServer(reservation);
+        if (!lastServer) {
+            debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
+            r->reply.result = Helper::TimedOut;
+            r->request.callback(r->request.data, r->reply);
+            delete r;
+            return;
+        }
+        debugs(84, 5, "StatefulSubmit dispatching");
+        helperStatefulDispatch(lastServer, r);
+    } else {
+        helper_stateful_server *srv;
+        if ((srv = StatefulGetFirstAvailable(this))) {
+            reserveServer(srv);
+            helperStatefulDispatch(srv, r);
+        } else
+            StatefulEnqueue(this, r);
+    }
+
+    debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
+           "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
+
+    syncQueueStats();
 }
 
 void
@@ -553,7 +709,7 @@ helper::packStatsInto(Packable *p, const char *label) const
                    srv->stats.pending ? 'B' : ' ',
                    srv->flags.writing ? 'W' : ' ',
                    srv->flags.closing ? 'C' : ' ',
-                   srv->flags.reserved ? 'R' : ' ',
+                   srv->reserved() ? 'R' : ' ',
                    srv->flags.shutdown ? 'S' : ' ',
                    xaction && xaction->request.placeholder ? 'P' : ' ',
                    tt < 0.0 ? 0.0 : tt,
@@ -570,6 +726,11 @@ helper::packStatsInto(Packable *p, const char *label) const
               "   P\tPLACEHOLDER\n", 101);
 }
 
+bool
+helper::willOverload() const {
+    return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
+}
+
 void
 helperShutdown(helper * hlp)
 {
@@ -636,7 +797,7 @@ helperStatefulShutdown(statefulhelper * hlp)
             continue;
         }
 
-        if (srv->flags.reserved) {
+        if (srv->reserved()) {
             if (shutting_down) {
                 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
             } else {
@@ -663,144 +824,62 @@ helper::~helper()
         debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
 }
 
-/* ====================================================================== */
-/* LOCAL FUNCTIONS */
-/* ====================================================================== */
-
-static void
-helperServerFree(helper_server *srv)
+void
+helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
 {
-    helper *hlp = srv->parent;
-    int concurrency = hlp->childs.concurrency;
-
-    if (!concurrency)
-        concurrency = 1;
-
-    if (srv->rbuf) {
-        memFreeBuf(srv->rbuf_sz, srv->rbuf);
-        srv->rbuf = NULL;
-    }
-
-    srv->wqueue->clean();
-    delete srv->wqueue;
-
-    if (srv->writebuf) {
-        srv->writebuf->clean();
-        delete srv->writebuf;
-        srv->writebuf = NULL;
-    }
-
-    if (Comm::IsConnOpen(srv->writePipe))
-        srv->closeWritePipeSafely(hlp->id_name);
-
-    dlinkDelete(&srv->link, &hlp->servers);
-
-    assert(hlp->childs.n_running > 0);
-    -- hlp->childs.n_running;
-
+    needsNewServers = false;
     if (!srv->flags.shutdown) {
-        assert(hlp->childs.n_active > 0);
-        -- hlp->childs.n_active;
-        debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
+        assert(childs.n_active > 0);
+        --childs.n_active;
+        debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
 
-        if (hlp->childs.needNew() > 0) {
-            debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
+        if (childs.needNew() > 0) {
+            debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")");
 
-            if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
+            if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
                 if (srv->stats.replies < 1)
-                    fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
+                    fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name);
                 else
-                    debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
+                    debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
             }
-
-            debugs(80, DBG_IMPORTANT, "Starting new helpers");
-            helperOpenServers(hlp);
+            srv->flags.shutdown = true;
+            needsNewServers = true;
         }
     }
-
-    while (!srv->requests.empty()) {
-        // XXX: re-schedule these on another helper?
-        Helper::Xaction *r = srv->requests.front();
-        srv->requests.pop_front();
-        void *cbdata;
-
-        if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
-            r->reply.result = Helper::Unknown;
-            r->request.callback(cbdata, r->reply);
-        }
-
-        delete r;
-    }
-    srv->requestsIndex.clear();
-
-    cbdataReferenceDone(srv->parent);
-    delete srv;
 }
 
-static void
-helperStatefulServerFree(helper_stateful_server *srv)
+void
+helper_server::HelperServerClosed(helper_server *srv)
 {
-    statefulhelper *hlp = srv->parent;
+    helper *hlp = srv->getParent();
 
-    if (srv->rbuf) {
-        memFreeBuf(srv->rbuf_sz, srv->rbuf);
-        srv->rbuf = NULL;
+    bool needsNewServers = false;
+    hlp->handleKilledServer(srv, needsNewServers);
+    if (needsNewServers) {
+        debugs(80, DBG_IMPORTANT, "Starting new helpers");
+        helperOpenServers(hlp);
     }
 
-#if 0
-    srv->wqueue->clean();
-
-    delete srv->wqueue;
-
-#endif
-
-    /* TODO: walk the local queue of requests and carry them all out */
-    if (Comm::IsConnOpen(srv->writePipe))
-        srv->closeWritePipeSafely(hlp->id_name);
-
-    dlinkDelete(&srv->link, &hlp->servers);
+    srv->dropQueued();
 
-    assert(hlp->childs.n_running > 0);
-    -- hlp->childs.n_running;
-
-    if (!srv->flags.shutdown) {
-        assert( hlp->childs.n_active > 0);
-        -- hlp->childs.n_active;
-        debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
-
-        if (hlp->childs.needNew() > 0) {
-            debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
-
-            if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
-                if (srv->stats.replies < 1)
-                    fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
-                else
-                    debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
-            }
-
-            debugs(80, DBG_IMPORTANT, "Starting new helpers");
-            helperStatefulOpenServers(hlp);
-        }
-    }
-
-    while (!srv->requests.empty()) {
-        // XXX: re-schedule these on another helper?
-        Helper::Xaction *r = srv->requests.front();
-        srv->requests.pop_front();
-        void *cbdata;
+    delete srv;
+}
 
-        if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
-            r->reply.result = Helper::Unknown;
-            r->request.callback(cbdata, r->reply);
-        }
+// XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
+// TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
+void
+helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
+{
+    statefulhelper *hlp = static_cast<statefulhelper *>(srv->getParent());
 
-        delete r;
+    bool needsNewServers = false;
+    hlp->handleKilledServer(srv, needsNewServers);
+    if (needsNewServers) {
+        debugs(80, DBG_IMPORTANT, "Starting new helpers");
+        helperStatefulOpenServers(hlp);
     }
 
-    if (srv->data != NULL)
-        hlp->datapool->freeOne(srv->data);
-
-    cbdataReferenceDone(srv->parent);
+    srv->dropQueued();
 
     delete srv;
 }
@@ -844,18 +923,19 @@ helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize
         if (!msgEnd)
             return; // We are waiting for more data.
 
-        HLPCB *callback = r->request.callback;
-        r->request.callback = nullptr;
-
         bool retry = false;
-        void *cbdata = nullptr;
-        if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+        if (cbdataReferenceValid(r->request.data)) {
             r->reply.finalize();
             if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
                 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
                 retry = true;
-            } else
-                callback(cbdata, r->reply);
+            } else {
+                HLPCB *callback = r->request.callback;
+                r->request.callback = nullptr;
+                void *cbdata = nullptr;
+                if (cbdataReferenceValidDone(r->request.data, &cbdata))
+                    callback(cbdata, r->reply);
+            }
         }
 
         -- srv->stats.pending;
@@ -1080,7 +1160,7 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len
 
         if (r && cbdataReferenceValid(r->request.data)) {
             r->reply.finalize();
-            r->reply.whichServer = srv;
+            r->reply.reservationId = srv->reservationId;
             r->request.callback(r->request.data, r->reply);
         } else {
             debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
@@ -1102,7 +1182,7 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len
         if (called)
             helperStatefulServerDone(srv);
         else
-            helperStatefulReleaseServer(srv);
+            hlp->cancelReservation(srv->reservationId);
     }
 
     if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
@@ -1186,7 +1266,7 @@ helper::nextRequest()
 }
 
 static helper_server *
-GetFirstAvailable(helper * hlp)
+GetFirstAvailable(const helper * hlp)
 {
     dlink_node *n;
     helper_server *srv;
@@ -1217,14 +1297,13 @@ GetFirstAvailable(helper * hlp)
         selected = srv;
     }
 
-    /* Check for overload */
     if (!selected) {
         debugs(84, 5, "GetFirstAvailable: None available.");
         return NULL;
     }
 
     if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
-        debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
+        debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
         return NULL;
     }
 
@@ -1237,6 +1316,7 @@ StatefulGetFirstAvailable(statefulhelper * hlp)
 {
     dlink_node *n;
     helper_stateful_server *srv = NULL;
+    helper_stateful_server *oldestReservedServer = nullptr;
     debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
 
     if (hlp->childs.n_running == 0)
@@ -1248,8 +1328,16 @@ StatefulGetFirstAvailable(statefulhelper * hlp)
         if (srv->stats.pending)
             continue;
 
-        if (srv->flags.reserved)
+        if (srv->reserved()) {
+            if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
+                if (!oldestReservedServer)
+                    oldestReservedServer = srv;
+                else if (oldestReservedServer->reservationStart < srv->reservationStart)
+                    oldestReservedServer = srv;
+                debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
+            }
             continue;
+        }
 
         if (srv->flags.shutdown)
             continue;
@@ -1258,8 +1346,13 @@ StatefulGetFirstAvailable(statefulhelper * hlp)
         return srv;
     }
 
+    if (oldestReservedServer) {
+        debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
+        return oldestReservedServer;
+    }
+
     debugs(84, 5, "StatefulGetFirstAvailable: None available.");
-    return NULL;
+    return nullptr;
 }
 
 static void
@@ -1343,18 +1436,20 @@ helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
     if (!cbdataReferenceValid(r->request.data)) {
         debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
         delete r;
-        helperStatefulReleaseServer(srv);
+        hlp->cancelReservation(srv->reservationId);
         return;
     }
 
     debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
 
+    assert(srv->reservationId);
+    r->reply.reservationId = srv->reservationId;
+
     if (r->request.placeholder == 1) {
         /* a callback is needed before this request can _use_ a helper. */
         /* we don't care about releasing this helper. The request NEVER
          * gets to the helper. So we throw away the return code */
         r->reply.result = Helper::Unknown;
-        r->reply.whichServer = srv;
         r->request.callback(r->request.data, r->reply);
         /* throw away the placeholder */
         delete r;
@@ -1367,7 +1462,6 @@ helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
         return;
     }
 
-    srv->flags.reserved = true;
     srv->requests.push_back(r);
     srv->dispatch_time = current_time;
     AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
@@ -1397,9 +1491,11 @@ helperStatefulKickQueue(statefulhelper * hlp)
 {
     Helper::Xaction *r;
     helper_stateful_server *srv;
-
-    while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
+    while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
+        debugs(84, 5, "found srv-" << srv->index);
+        hlp->reserveServer(srv);
         helperStatefulDispatch(srv, r);
+    }
 }
 
 static void
@@ -1407,7 +1503,7 @@ helperStatefulServerDone(helper_stateful_server * srv)
 {
     if (!srv->flags.shutdown) {
         helperStatefulKickQueue(srv->parent);
-    } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
+    } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
         srv->closeWritePipeSafely(srv->parent->id_name);
         return;
     }
@@ -1458,9 +1554,6 @@ helper_server::requestTimeout(const CommTimeoutCbParams &io)
     debugs(26, 3, HERE << io.conn);
     helper_server *srv = static_cast<helper_server *>(io.data);
 
-    if (!cbdataReferenceValid(srv))
-        return;
-
     srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
 
     debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");