]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/helper.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / helper.cc
index c07787e0c1450123bed0012fe9622eb181af625e..f69e384faf69af2325cf9406bea96a42376abf4c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
@@ -10,6 +10,7 @@
 
 #include "squid.h"
 #include "base/AsyncCbdataCalls.h"
+#include "base/Packable.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/Read.h"
 #include "fde.h"
 #include "format/Quoting.h"
 #include "helper.h"
-#include "Mem.h"
+#include "helper/Reply.h"
+#include "helper/Request.h"
 #include "MemBuf.h"
+#include "SquidConfig.h"
 #include "SquidIpc.h"
 #include "SquidMath.h"
 #include "SquidTime.h"
 #include "Store.h"
 #include "wordlist.h"
 
+// helper_stateful_server::data uses explicit alloc()/freeOne() */
+#include "mem/Pool.h"
+
 #define HELPER_MAX_ARGS 64
 
-/** Initial Squid input buffer size. Helper responses may exceed this, and
- * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
- */
-const size_t ReadBufMinSize(4*1024);
+/// The maximum allowed request retries.
+#define MAX_RETRIES 2
 
-/** Maximum safe size of a helper-to-Squid response message plus one.
- * Squid will warn and close the stream if a helper sends a too-big response.
- * ssl_crtd helper is known to produce responses of at least 10KB in size.
- * Some undocumented helpers are known to produce responses exceeding 8KB.
- */
-const size_t ReadBufMaxSize(32*1024);
+/// Helpers input buffer size.
+const size_t ReadBufSize(32*1024);
 
 static IOCB helperHandleRead;
 static IOCB helperStatefulHandleRead;
-static void helperServerFree(helper_server *srv);
-static void helperStatefulServerFree(helper_stateful_server *srv);
-static void Enqueue(helper * hlp, helper_request *);
-static helper_request *Dequeue(helper * hlp);
-static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
-static helper_server *GetFirstAvailable(helper * hlp);
+static void Enqueue(helper * hlp, Helper::Xaction *);
+static helper_server *GetFirstAvailable(const helper * hlp);
 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
-static void helperDispatch(helper_server * srv, helper_request * r);
-static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
+static void helperDispatch(helper_server * srv, Helper::Xaction * r);
+static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
 static void helperKickQueue(helper * hlp);
 static void helperStatefulKickQueue(statefulhelper * hlp);
 static void helperStatefulServerDone(helper_stateful_server * srv);
-static void helperRequestFree(helper_request * r);
-static void helperStatefulRequestFree(helper_stateful_request * r);
-static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
-static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
+static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
 
 CBDATA_CLASS_INIT(helper);
 CBDATA_CLASS_INIT(helper_server);
@@ -73,6 +66,7 @@ HelperServerBase::initStats()
     stats.replies=0;
     stats.pending=0;
     stats.releases=0;
+    stats.timedout = 0;
 }
 
 void
@@ -125,6 +119,79 @@ HelperServerBase::closeWritePipeSafely(const char *id_name)
 #endif
 }
 
+void
+HelperServerBase::dropQueued()
+{
+    while (!requests.empty()) {
+        // XXX: re-schedule these on another helper?
+        Helper::Xaction *r = requests.front();
+        requests.pop_front();
+        void *cbdata;
+        if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+            r->reply.result = Helper::Unknown;
+            r->request.callback(cbdata, r->reply);
+        }
+
+        delete r;
+    }
+}
+
+HelperServerBase::~HelperServerBase()
+{
+    if (rbuf) {
+        memFreeBuf(rbuf_sz, rbuf);
+        rbuf = NULL;
+    }
+}
+
+helper_server::~helper_server()
+{
+    wqueue->clean();
+    delete wqueue;
+
+    if (writebuf) {
+        writebuf->clean();
+        delete writebuf;
+        writebuf = NULL;
+    }
+
+    if (Comm::IsConnOpen(writePipe))
+        closeWritePipeSafely(parent->id_name);
+
+    dlinkDelete(&link, &parent->servers);
+
+    assert(parent->childs.n_running > 0);
+    -- parent->childs.n_running;
+
+    assert(requests.empty());
+    cbdataReferenceDone(parent);
+}
+
+void
+helper_server::dropQueued()
+{
+    HelperServerBase::dropQueued();
+    requestsIndex.clear();
+}
+
+helper_stateful_server::~helper_stateful_server()
+{
+    /* TODO: walk the local queue of requests and carry them all out */
+    if (Comm::IsConnOpen(writePipe))
+        closeWritePipeSafely(parent->id_name);
+
+    parent->cancelReservation(reservationId);
+
+    dlinkDelete(&link, &parent->servers);
+
+    assert(parent->childs.n_running > 0);
+    -- parent->childs.n_running;
+
+    assert(requests.empty());
+
+    cbdataReferenceDone(parent);
+}
+
 void
 helperOpenServers(helper * hlp)
 {
@@ -207,10 +274,12 @@ helperOpenServers(helper * hlp)
         srv->readPipe->fd = rfd;
         srv->writePipe = new Comm::Connection;
         srv->writePipe->fd = wfd;
-        srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
+        srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
         srv->wqueue = new MemBuf;
         srv->roffset = 0;
-        srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
+        srv->nextRequestId = 0;
+        srv->replyXaction = NULL;
+        srv->ignoreToEom = false;
         srv->parent = cbdataReference(hlp);
         dlinkAddTail(srv, &srv->link, &hlp->servers);
 
@@ -229,9 +298,15 @@ helperOpenServers(helper * hlp)
         if (wfd != rfd)
             commSetNonBlocking(wfd);
 
-        AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
+        AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
         comm_add_close_handler(rfd, closeCall);
 
+        if (hlp->timeout && hlp->childs.concurrency) {
+            AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
+                                             CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
+            commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
+        }
+
         AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
                                              CommIoCbPtrFun(helperHandleRead, srv));
         comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
@@ -320,19 +395,16 @@ helperStatefulOpenServers(statefulhelper * hlp)
         helper_stateful_server *srv = new helper_stateful_server;
         srv->hIpc = hIpc;
         srv->pid = pid;
-        srv->flags.reserved = false;
         srv->initStats();
         srv->addr = hlp->addr;
         srv->readPipe = new Comm::Connection;
         srv->readPipe->fd = rfd;
         srv->writePipe = new Comm::Connection;
         srv->writePipe->fd = wfd;
-        srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
+        srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
         srv->roffset = 0;
         srv->parent = cbdataReference(hlp);
-
-        if (hlp->datapool != NULL)
-            srv->data = hlp->datapool->alloc();
+        srv->reservationStart = 0;
 
         dlinkAddTail(srv, &srv->link, &hlp->servers);
 
@@ -351,7 +423,7 @@ helperStatefulOpenServers(statefulhelper * hlp)
         if (wfd != rfd)
             commSetNonBlocking(wfd);
 
-        AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
+        AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
         comm_add_close_handler(rfd, closeCall);
 
         AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
@@ -366,215 +438,297 @@ helperStatefulOpenServers(statefulhelper * hlp)
 }
 
 void
-helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
+helper::submitRequest(Helper::Xaction *r)
 {
-    if (hlp == NULL) {
-        debugs(84, 3, "helperSubmit: hlp == NULL");
-        HelperReply nilReply;
-        callback(data, nilReply);
-        return;
-    }
-
-    helper_request *r = new helper_request;
     helper_server *srv;
 
-    r->callback = callback;
-    r->data = cbdataReference(data);
-    r->buf = xstrdup(buf);
-
-    if ((srv = GetFirstAvailable(hlp)))
+    if ((srv = GetFirstAvailable(this)))
         helperDispatch(srv, r);
     else
-        Enqueue(hlp, r);
+        Enqueue(this, r);
 
-    debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
+    syncQueueStats();
 }
 
-/// lastserver = "server last used as part of a reserved request sequence"
-void
-helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
+/// handles helperSubmit() and helperStatefulSubmit() failures
+static void
+SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
 {
-    if (hlp == NULL) {
-        debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
-        HelperReply nilReply;
-        callback(data, nilReply);
-        return;
+    auto result = Helper::Error;
+    if (!hlp) {
+        debugs(84, 3, "no helper");
+        result = Helper::Unknown;
     }
+    // else pretend the helper has responded with ERR
 
-    helper_stateful_request *r = new helper_stateful_request;
+    callback(data, Helper::Reply(result));
+}
 
-    r->callback = callback;
-    r->data = cbdataReference(data);
+void
+helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
+{
+    if (!hlp || !hlp->trySubmit(buf, callback, data))
+        SubmissionFailure(hlp, callback, data);
+}
 
-    if (buf != NULL) {
-        r->buf = xstrdup(buf);
-        r->placeholder = 0;
+/// whether queuing an additional request would overload the helper
+bool
+helper::queueFull() const {
+    return stats.queue_size >= static_cast<int>(childs.queue_size);
+}
+
+bool
+helper::overloaded() const {
+    return stats.queue_size > static_cast<int>(childs.queue_size);
+}
+
+/// synchronizes queue-dependent measurements with the current queue state
+void
+helper::syncQueueStats()
+{
+    if (overloaded()) {
+        if (overloadStart) {
+            debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
+        } else {
+            overloadStart = squid_curtime;
+            debugs(84, 3, id_name << " became overloaded");
+        }
     } else {
-        r->buf = NULL;
-        r->placeholder = 1;
+        if (overloadStart) {
+            debugs(84, 5, id_name << " is no longer overloaded");
+            if (droppedRequests) {
+                debugs(84, DBG_IMPORTANT, "helper " << id_name <<
+                       " is no longer overloaded after dropping " << droppedRequests <<
+                       " requests in " << (squid_curtime - overloadStart) << " seconds");
+                droppedRequests = 0;
+            }
+            overloadStart = 0;
+        }
     }
+}
 
-    if ((buf != NULL) && lastserver) {
-        debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
-        assert(lastserver->flags.reserved);
-        assert(!(lastserver->request));
+/// prepares the helper for request submission
+/// returns true if and only if the submission should proceed
+/// may kill Squid if the helper remains overloaded for too long
+bool
+helper::prepSubmit()
+{
+    // re-sync for the configuration may have changed since the last submission
+    syncQueueStats();
 
-        debugs(84, 5, "StatefulSubmit dispatching");
-        helperStatefulDispatch(lastserver, r);
-    } else {
-        helper_stateful_server *srv;
-        if ((srv = StatefulGetFirstAvailable(hlp))) {
-            helperStatefulDispatch(srv, r);
-        } else
-            StatefulEnqueue(hlp, r);
+    // Nothing special to do if the new request does not overload (i.e., the
+    // queue is not even full yet) or only _starts_ overloading this helper
+    // (i.e., the queue is currently at its limit).
+    if (!overloaded())
+        return true;
+
+    if (squid_curtime - overloadStart <= 180)
+        return true; // also OK: overload has not persisted long enough to panic
+
+    if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
+        fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
+
+    if (!droppedRequests) {
+        debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
+               id_name << " helper configured with on-persistent-overload=err");
     }
+    ++droppedRequests;
+    debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
+    return false;
+}
+
+bool
+helper::trySubmit(const char *buf, HLPCB * callback, void *data)
+{
+    if (!prepSubmit())
+        return false; // request was dropped
 
-    debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
-           "', " << Raw("buf", buf, strlen(buf)));
+    submit(buf, callback, data); // will send or queue
+    return true; // request submitted or queued
 }
 
-/**
- * DPW 2007-05-08
- *
- * helperStatefulReleaseServer tells the helper that whoever was
- * using it no longer needs its services.
- */
+/// dispatches or enqueues a helper requests; does not enforce queue limits
 void
-helperStatefulReleaseServer(helper_stateful_server * srv)
+helper::submit(const char *buf, HLPCB * callback, void *data)
 {
-    debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
-    if (!srv->flags.reserved)
-        return;
+    Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+    submitRequest(r);
+    debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
+}
 
-    ++ srv->stats.releases;
+/// Submit request or callback the caller with a Helper::Error error.
+/// If the reservation is not set then reserves a new helper.
+void
+helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
+{
+    if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
+        SubmissionFailure(hlp, callback, data);
+}
 
-    srv->flags.reserved = false;
-    if (srv->parent->OnEmptyQueue != NULL && srv->data)
-        srv->parent->OnEmptyQueue(srv->data);
+/// If possible, submit request. Otherwise, either kill Squid or return false.
+bool
+statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
+{
+    if (!prepSubmit())
+        return false; // request was dropped
 
-    helperStatefulServerDone(srv);
+    submit(buf, callback, data, reservation); // will send or queue
+    return true; // request submitted or queued
 }
 
-/** return a pointer to the stateful routines data area */
-void *
-helperStatefulServerGetData(helper_stateful_server * srv)
+void
+statefulhelper::reserveServer(helper_stateful_server * srv)
 {
-    return srv->data;
+    // clear any old reservation
+    if (srv->reserved()) {
+        reservations.erase(srv->reservationId);
+        srv->clearReservation();
+    }
+
+    srv->reserve();
+    reservations.insert(Reservations::value_type(srv->reservationId, srv));
 }
 
-/**
- * Dump some stats about the helper states to a StoreEntry
- */
 void
-helperStats(StoreEntry * sentry, helper * hlp, const char *label)
+statefulhelper::cancelReservation(const Helper::ReservationId reservation)
 {
-    if (!helperStartStats(sentry, hlp, label))
+    const auto it = reservations.find(reservation);
+    if (it == reservations.end())
         return;
 
-    storeAppendPrintf(sentry, "program: %s\n",
-                      hlp->cmdline->key);
-    storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
-                      hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
-    storeAppendPrintf(sentry, "requests sent: %d\n",
-                      hlp->stats.requests);
-    storeAppendPrintf(sentry, "replies received: %d\n",
-                      hlp->stats.replies);
-    storeAppendPrintf(sentry, "queue length: %d\n",
-                      hlp->stats.queue_size);
-    storeAppendPrintf(sentry, "avg service time: %d msec\n",
-                      hlp->stats.avg_svc_time);
-    storeAppendPrintf(sentry, "\n");
-    storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
-                      "ID #",
-                      "FD",
-                      "PID",
-                      "# Requests",
-                      "# Replies",
-                      "Flags",
-                      "Time",
-                      "Offset",
-                      "Request");
-
-    for (dlink_node *link = hlp->servers.head; link; link = link->next) {
-        helper_server *srv = (helper_server*)link->data;
-        double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
-        storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
-                          srv->index.value,
-                          srv->readPipe->fd,
-                          srv->pid,
-                          srv->stats.uses,
-                          srv->stats.replies,
-                          srv->stats.pending ? 'B' : ' ',
-                          srv->flags.writing ? 'W' : ' ',
-                          srv->flags.closing ? 'C' : ' ',
-                          srv->flags.shutdown ? 'S' : ' ',
-                          tt < 0.0 ? 0.0 : tt,
-                          (int) srv->roffset,
-                          srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
-    }
+    helper_stateful_server *srv = it->second;
+    reservations.erase(it);
+    srv->clearReservation();
+
+    // schedule a queue kick
+    AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
+    ScheduleCallHere(call);
+}
 
-    storeAppendPrintf(sentry, "\nFlags key:\n\n");
-    storeAppendPrintf(sentry, "   B = BUSY\n");
-    storeAppendPrintf(sentry, "   W = WRITING\n");
-    storeAppendPrintf(sentry, "   C = CLOSING\n");
-    storeAppendPrintf(sentry, "   S = SHUTDOWN PENDING\n");
+helper_stateful_server *
+statefulhelper::findServer(const Helper::ReservationId & reservation)
+{
+    const auto it = reservations.find(reservation);
+    if (it == reservations.end())
+        return nullptr;
+    return it->second;
+}
+
+void
+helper_stateful_server::reserve()
+{
+    assert(!reservationId);
+    reservationStart = squid_curtime;
+    reservationId = Helper::ReservationId::Next();
+    debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
 }
 
 void
-helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
+helper_stateful_server::clearReservation()
 {
-    if (!helperStartStats(sentry, hlp, label))
+    debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
+    if (!reservationId)
         return;
 
-    storeAppendPrintf(sentry, "program: %s\n",
-                      hlp->cmdline->key);
-    storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
-                      hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
-    storeAppendPrintf(sentry, "requests sent: %d\n",
-                      hlp->stats.requests);
-    storeAppendPrintf(sentry, "replies received: %d\n",
-                      hlp->stats.replies);
-    storeAppendPrintf(sentry, "queue length: %d\n",
-                      hlp->stats.queue_size);
-    storeAppendPrintf(sentry, "avg service time: %d msec\n",
-                      hlp->stats.avg_svc_time);
-    storeAppendPrintf(sentry, "\n");
-    storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
-                      "ID #",
-                      "FD",
-                      "PID",
-                      "# Requests",
-                      "# Replies",
-                      "Flags",
-                      "Time",
-                      "Offset",
-                      "Request");
-
-    for (dlink_node *link = hlp->servers.head; link; link = link->next) {
-        helper_stateful_server *srv = (helper_stateful_server *)link->data;
-        double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
-        storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
-                          srv->index.value,
-                          srv->readPipe->fd,
-                          srv->pid,
-                          srv->stats.uses,
-                          srv->stats.replies,
-                          srv->flags.busy ? 'B' : ' ',
-                          srv->flags.closing ? 'C' : ' ',
-                          srv->flags.reserved ? 'R' : ' ',
-                          srv->flags.shutdown ? 'S' : ' ',
-                          srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
-                                  tt < 0.0 ? 0.0 : tt,
-                                  (int) srv->roffset,
-                                  srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
+    ++stats.releases;
+
+    reservationId.clear();
+    reservationStart = 0;
+}
+
+void
+statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
+{
+    Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
+
+    if (buf && reservation) {
+        debugs(84, 5, reservation);
+        helper_stateful_server *lastServer = findServer(reservation);
+        if (!lastServer) {
+            debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
+            r->reply.result = Helper::TimedOut;
+            r->request.callback(r->request.data, r->reply);
+            delete r;
+            return;
+        }
+        debugs(84, 5, "StatefulSubmit dispatching");
+        helperStatefulDispatch(lastServer, r);
+    } else {
+        helper_stateful_server *srv;
+        if ((srv = StatefulGetFirstAvailable(this))) {
+            reserveServer(srv);
+            helperStatefulDispatch(srv, r);
+        } else
+            StatefulEnqueue(this, r);
+    }
+
+    debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
+           "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
+
+    syncQueueStats();
+}
+
+void
+helper::packStatsInto(Packable *p, const char *label) const
+{
+    if (label)
+        p->appendf("%s:\n", label);
+
+    p->appendf("  program: %s\n", cmdline->key);
+    p->appendf("  number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
+    p->appendf("  requests sent: %d\n", stats.requests);
+    p->appendf("  replies received: %d\n", stats.replies);
+    p->appendf("  requests timedout: %d\n", stats.timedout);
+    p->appendf("  queue length: %d\n", stats.queue_size);
+    p->appendf("  avg service time: %d msec\n", stats.avg_svc_time);
+    p->append("\n",1);
+    p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
+               "ID #",
+               "FD",
+               "PID",
+               "# Requests",
+               "# Replies",
+               "# Timed-out",
+               "Flags",
+               "Time",
+               "Offset",
+               "Request");
+
+    for (dlink_node *link = servers.head; link; link = link->next) {
+        HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
+        assert(srv);
+        Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
+        double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
+        p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
+                   srv->index.value,
+                   srv->readPipe->fd,
+                   srv->pid,
+                   srv->stats.uses,
+                   srv->stats.replies,
+                   srv->stats.timedout,
+                   srv->stats.pending ? 'B' : ' ',
+                   srv->flags.writing ? 'W' : ' ',
+                   srv->flags.closing ? 'C' : ' ',
+                   srv->reserved() ? 'R' : ' ',
+                   srv->flags.shutdown ? 'S' : ' ',
+                   xaction && xaction->request.placeholder ? 'P' : ' ',
+                   tt < 0.0 ? 0.0 : tt,
+                   (int) srv->roffset,
+                   xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
     }
 
-    storeAppendPrintf(sentry, "\nFlags key:\n\n");
-    storeAppendPrintf(sentry, "   B = BUSY\n");
-    storeAppendPrintf(sentry, "   C = CLOSING\n");
-    storeAppendPrintf(sentry, "   R = RESERVED\n");
-    storeAppendPrintf(sentry, "   S = SHUTDOWN PENDING\n");
-    storeAppendPrintf(sentry, "   P = PLACEHOLDER\n");
+    p->append("\nFlags key:\n"
+              "   B\tBUSY\n"
+              "   W\tWRITING\n"
+              "   C\tCLOSING\n"
+              "   R\tRESERVED\n"
+              "   S\tSHUTDOWN PENDING\n"
+              "   P\tPLACEHOLDER\n", 101);
+}
+
+bool
+helper::willOverload() const {
+    return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
 }
 
 void
@@ -594,7 +748,7 @@ helperShutdown(helper * hlp)
 
         assert(hlp->childs.n_active > 0);
         -- hlp->childs.n_active;
-        srv->flags.shutdown = true;    /* request it to shut itself down */
+        srv->flags.shutdown = true; /* request it to shut itself down */
 
         if (srv->flags.closing) {
             debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
@@ -631,9 +785,9 @@ helperStatefulShutdown(statefulhelper * hlp)
 
         assert(hlp->childs.n_active > 0);
         -- hlp->childs.n_active;
-        srv->flags.shutdown = true;    /* request it to shut itself down */
+        srv->flags.shutdown = true; /* request it to shut itself down */
 
-        if (srv->flags.busy) {
+        if (srv->stats.pending) {
             debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
             continue;
         }
@@ -643,7 +797,7 @@ helperStatefulShutdown(statefulhelper * hlp)
             continue;
         }
 
-        if (srv->flags.reserved) {
+        if (srv->reserved()) {
             if (shutting_down) {
                 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
             } else {
@@ -665,172 +819,123 @@ helper::~helper()
 {
     /* note, don't free id_name, it probably points to static memory */
 
-    if (queue.head)
+    // TODO: if the queue is not empty it will leak Helper::Request's
+    if (!queue.empty())
         debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
 }
 
-/* ====================================================================== */
-/* LOCAL FUNCTIONS */
-/* ====================================================================== */
-
-static void
-helperServerFree(helper_server *srv)
+void
+helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
 {
-    helper *hlp = srv->parent;
-    helper_request *r;
-    int i, concurrency = hlp->childs.concurrency;
-
-    if (!concurrency)
-        concurrency = 1;
-
-    if (srv->rbuf) {
-        memFreeBuf(srv->rbuf_sz, srv->rbuf);
-        srv->rbuf = NULL;
-    }
-
-    srv->wqueue->clean();
-    delete srv->wqueue;
-
-    if (srv->writebuf) {
-        srv->writebuf->clean();
-        delete srv->writebuf;
-        srv->writebuf = NULL;
-    }
-
-    if (Comm::IsConnOpen(srv->writePipe))
-        srv->closeWritePipeSafely(hlp->id_name);
-
-    dlinkDelete(&srv->link, &hlp->servers);
-
-    assert(hlp->childs.n_running > 0);
-    -- hlp->childs.n_running;
-
+    needsNewServers = false;
     if (!srv->flags.shutdown) {
-        assert(hlp->childs.n_active > 0);
-        -- hlp->childs.n_active;
-        debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
+        assert(childs.n_active > 0);
+        --childs.n_active;
+        debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
 
-        if (hlp->childs.needNew() > 0) {
-            debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
+        if (childs.needNew() > 0) {
+            debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")");
 
-            if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
+            if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
                 if (srv->stats.replies < 1)
-                    fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
+                    fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name);
                 else
-                    debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
+                    debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
             }
-
-            debugs(80, DBG_IMPORTANT, "Starting new helpers");
-            helperOpenServers(hlp);
+            srv->flags.shutdown = true;
+            needsNewServers = true;
         }
     }
+}
 
-    for (i = 0; i < concurrency; ++i) {
-        // XXX: re-schedule these on another helper?
-        if ((r = srv->requests[i])) {
-            void *cbdata;
-
-            if (cbdataReferenceValidDone(r->data, &cbdata)) {
-                HelperReply nilReply;
-                r->callback(cbdata, nilReply);
-            }
-
-            helperRequestFree(r);
+void
+helper_server::HelperServerClosed(helper_server *srv)
+{
+    helper *hlp = srv->getParent();
 
-            srv->requests[i] = NULL;
-        }
+    bool needsNewServers = false;
+    hlp->handleKilledServer(srv, needsNewServers);
+    if (needsNewServers) {
+        debugs(80, DBG_IMPORTANT, "Starting new helpers");
+        helperOpenServers(hlp);
     }
-    safe_free(srv->requests);
 
-    cbdataReferenceDone(srv->parent);
+    srv->dropQueued();
+
     delete srv;
 }
 
-static void
-helperStatefulServerFree(helper_stateful_server *srv)
+// XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
+// TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
+void
+helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
 {
-    statefulhelper *hlp = srv->parent;
-    helper_stateful_request *r;
+    statefulhelper *hlp = static_cast<statefulhelper *>(srv->getParent());
 
-    if (srv->rbuf) {
-        memFreeBuf(srv->rbuf_sz, srv->rbuf);
-        srv->rbuf = NULL;
+    bool needsNewServers = false;
+    hlp->handleKilledServer(srv, needsNewServers);
+    if (needsNewServers) {
+        debugs(80, DBG_IMPORTANT, "Starting new helpers");
+        helperStatefulOpenServers(hlp);
     }
 
-#if 0
-    srv->wqueue->clean();
-
-    delete srv->wqueue;
-
-#endif
-
-    /* TODO: walk the local queue of requests and carry them all out */
-    if (Comm::IsConnOpen(srv->writePipe))
-        srv->closeWritePipeSafely(hlp->id_name);
-
-    dlinkDelete(&srv->link, &hlp->servers);
-
-    assert(hlp->childs.n_running > 0);
-    -- hlp->childs.n_running;
-
-    if (!srv->flags.shutdown) {
-        assert( hlp->childs.n_active > 0);
-        -- hlp->childs.n_active;
-        debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
-
-        if (hlp->childs.needNew() > 0) {
-            debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
-
-            if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
-                if (srv->stats.replies < 1)
-                    fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
-                else
-                    debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
-            }
-
-            debugs(80, DBG_IMPORTANT, "Starting new helpers");
-            helperStatefulOpenServers(hlp);
-        }
-    }
+    srv->dropQueued();
 
-    if ((r = srv->request)) {
-        void *cbdata;
+    delete srv;
+}
 
-        if (cbdataReferenceValidDone(r->data, &cbdata)) {
-            HelperReply nilReply;
-            nilReply.whichServer = srv;
-            r->callback(cbdata, nilReply);
+Helper::Xaction *
+helper_server::popRequest(int request_number)
+{
+    Helper::Xaction *r = nullptr;
+    helper_server::RequestIndex::iterator it;
+    if (parent->childs.concurrency) {
+        // If concurrency supported retrieve request from ID
+        it = requestsIndex.find(request_number);
+        if (it != requestsIndex.end()) {
+            r = *(it->second);
+            requests.erase(it->second);
+            requestsIndex.erase(it);
         }
-
-        helperStatefulRequestFree(r);
-
-        srv->request = NULL;
+    } else if(!requests.empty()) {
+        // Else get the first request from queue, if any
+        r = requests.front();
+        requests.pop_front();
     }
 
-    if (srv->data != NULL)
-        hlp->datapool->freeOne(srv->data);
-
-    cbdataReferenceDone(srv->parent);
-
-    delete srv;
+    return r;
 }
 
 /// Calls back with a pointer to the buffer with the helper output
 static void
-helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
+helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
 {
-    helper_request *r = srv->requests[request_number];
-    if (r) {
-        HLPCB *callback = r->callback;
-
-        srv->requests[request_number] = NULL;
+    if (Helper::Xaction *r = srv->replyXaction) {
+        const bool hasSpace = r->reply.accumulate(msg, msgSize);
+        if (!hasSpace) {
+            debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
+                   "helper that overflowed " << srv->rbuf_sz << "-byte " <<
+                   "Squid input buffer: " << hlp->id_name << " #" << srv->index);
+            srv->closePipesSafely(hlp->id_name);
+            return;
+        }
 
-        r->callback = NULL;
+        if (!msgEnd)
+            return; // We are waiting for more data.
 
-        void *cbdata = NULL;
-        if (cbdataReferenceValidDone(r->data, &cbdata)) {
-            HelperReply response(msg, (msg_end-msg));
-            callback(cbdata, response);
+        bool retry = false;
+        if (cbdataReferenceValid(r->request.data)) {
+            r->reply.finalize();
+            if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
+                debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
+                retry = true;
+            } else {
+                HLPCB *callback = r->request.callback;
+                r->request.callback = nullptr;
+                void *cbdata = nullptr;
+                if (cbdataReferenceValidDone(r->request.data, &cbdata))
+                    callback(cbdata, r->reply);
+            }
         }
 
         -- srv->stats.pending;
@@ -840,20 +945,25 @@ helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char *
 
         srv->answer_time = current_time;
 
-        srv->dispatch_time = r->dispatch_time;
+        srv->dispatch_time = r->request.dispatch_time;
 
         hlp->stats.avg_svc_time =
             Math::intAverage(hlp->stats.avg_svc_time,
-                             tvSubMsec(r->dispatch_time, current_time),
+                             tvSubMsec(r->request.dispatch_time, current_time),
                              hlp->stats.replies, REDIRECT_AV_FACTOR);
 
-        helperRequestFree(r);
-    } else {
-        debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
-               request_number << " from " << hlp->id_name << " #" << srv->index <<
-               " '" << srv->rbuf << "'");
+        // release or re-submit parsedRequestXaction object
+        srv->replyXaction = nullptr;
+        if (retry) {
+            ++r->request.retries;
+            hlp->submitRequest(r);
+        } else
+            delete r;
     }
 
+    if (hlp->timeout && hlp->childs.concurrency)
+        srv->checkForTimedOutRequests(hlp->retryTimedOut);
+
     if (!srv->flags.shutdown) {
         helperKickQueue(hlp);
     } else if (!srv->flags.closing && !srv->stats.pending) {
@@ -863,9 +973,8 @@ helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char *
 }
 
 static void
-helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
+helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
 {
-    char *t = NULL;
     helper_server *srv = (helper_server *)data;
     helper *hlp = srv->parent;
     assert(cbdataReferenceValid(data));
@@ -889,7 +998,7 @@ helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Com
     srv->rbuf[srv->roffset] = '\0';
     debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
 
-    if (!srv->stats.pending) {
+    if (!srv->stats.pending && !srv->stats.timedout) {
         /* someone spoke without being spoken to */
         debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
                hlp->id_name << " #" << srv->index << ", " << (int)len <<
@@ -899,57 +1008,80 @@ helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Com
         srv->rbuf[0] = '\0';
     }
 
-    while ((t = strchr(srv->rbuf, hlp->eom))) {
-        /* end of reply found */
-        char *msg = srv->rbuf;
-        int i = 0;
-        int skip = 1;
-        debugs(84, 3, "helperHandleRead: end of reply found");
-
-        if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
-            *t = '\0';
-            // rewind to the \r octet which is the real terminal now
-            // and remember that we have to skip forward 2 places now.
-            skip = 2;
-            --t;
+    bool needsMore = false;
+    char *msg = srv->rbuf;
+    while (*msg && !needsMore) {
+        int skip = 0;
+        char *eom = strchr(msg, hlp->eom);
+        if (eom) {
+            skip = 1;
+            debugs(84, 3, "helperHandleRead: end of reply found");
+            if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
+                *eom = '\0';
+                // rewind to the \r octet which is the real terminal now
+                // and remember that we have to skip forward 2 places now.
+                skip = 2;
+                --eom;
+            }
+            *eom = '\0';
         }
 
-        *t = '\0';
-
-        if (hlp->childs.concurrency) {
-            i = strtol(msg, &msg, 10);
-
-            while (*msg && xisspace(*msg))
-                ++msg;
-        }
+        if (!srv->ignoreToEom && !srv->replyXaction) {
+            int i = 0;
+            if (hlp->childs.concurrency) {
+                char *e = NULL;
+                i = strtol(msg, &e, 10);
+                // Do we need to check for e == msg? Means wrong response from helper.
+                // Will be dropped as "unexpected reply on channel 0"
+                needsMore = !(xisspace(*e) || (eom && e == eom));
+                if (!needsMore) {
+                    msg = e;
+                    while (*msg && xisspace(*msg))
+                        ++msg;
+                } // else not enough data to compute request number
+            }
+            if (!(srv->replyXaction = srv->popRequest(i))) {
+                if (srv->stats.timedout) {
+                    debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
+                } else {
+                    debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
+                           i << " from " << hlp->id_name << " #" << srv->index <<
+                           " '" << srv->rbuf << "'");
+                }
+                srv->ignoreToEom = true;
+            }
+        } // else we need to just append reply data to the current Xaction
+
+        if (!needsMore) {
+            size_t msgSize  = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
+            assert(msgSize <= srv->rbuf_sz);
+            helperReturnBuffer(srv, hlp, msg, msgSize, eom);
+            msg += msgSize + skip;
+            assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
+
+            // The next message should not ignored.
+            if (eom && srv->ignoreToEom)
+                srv->ignoreToEom = false;
+        } else
+            assert(skip == 0 && eom == NULL);
+    }
 
-        helperReturnBuffer(i, srv, hlp, msg, t);
-        srv->roffset -= (t - srv->rbuf) + skip;
-        memmove(srv->rbuf, t + skip, srv->roffset);
+    if (needsMore) {
+        size_t msgSize = (srv->roffset - (msg - srv->rbuf));
+        assert(msgSize <= srv->rbuf_sz);
+        memmove(srv->rbuf, msg, msgSize);
+        srv->roffset = msgSize;
         srv->rbuf[srv->roffset] = '\0';
+    } else {
+        // All of the responses parsed and msg points at the end of read data
+        assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
+        srv->roffset = 0;
     }
 
     if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
         int spaceSize = srv->rbuf_sz - srv->roffset - 1;
         assert(spaceSize >= 0);
 
-        // grow the input buffer if needed and possible
-        if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
-            srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
-            debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
-            spaceSize = srv->rbuf_sz - srv->roffset - 1;
-            assert(spaceSize >= 0);
-        }
-
-        // quit reading if there is no space left
-        if (!spaceSize) {
-            debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
-                   "helper that overflowed " << srv->rbuf_sz << "-byte " <<
-                   "Squid input buffer: " << hlp->id_name << " #" << srv->index);
-            srv->closePipesSafely(hlp->id_name);
-            return;
-        }
-
         AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
                                              CommIoCbPtrFun(helperHandleRead, srv));
         comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
@@ -957,11 +1089,10 @@ helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Com
 }
 
 static void
-helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
+helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
 {
     char *t = NULL;
     helper_stateful_server *srv = (helper_stateful_server *)data;
-    helper_stateful_request *r;
     statefulhelper *hlp = srv->parent;
     assert(cbdataReferenceValid(data));
 
@@ -983,7 +1114,7 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t
 
     srv->roffset += len;
     srv->rbuf[srv->roffset] = '\0';
-    r = srv->request;
+    Helper::Xaction *r = srv->requests.front();
     debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
 
     if (r == NULL) {
@@ -996,42 +1127,47 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t
     }
 
     if ((t = strchr(srv->rbuf, hlp->eom))) {
-        /* end of reply found */
-        int called = 1;
-        int skip = 1;
         debugs(84, 3, "helperStatefulHandleRead: end of reply found");
 
         if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
             *t = '\0';
             // rewind to the \r octet which is the real terminal now
-            // and remember that we have to skip forward 2 places now.
-            skip = 2;
             --t;
         }
 
         *t = '\0';
+    }
+
+    if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
+        debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
+               "helper that overflowed " << srv->rbuf_sz << "-byte " <<
+               "Squid input buffer: " << hlp->id_name << " #" << srv->index);
+        srv->closePipesSafely(hlp->id_name);
+        return;
+    }
+    /**
+     * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
+     *      Doing this prohibits concurrency support with multiple replies per read().
+     * TODO: check that read() setup on these buffers pays attention to roffest!=0
+     * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
+     */
+    srv->roffset = 0;
+
+    if (t) {
+        /* end of reply found */
+        srv->requests.pop_front(); // we already have it in 'r'
+        int called = 1;
 
-        if (r && cbdataReferenceValid(r->data)) {
-            HelperReply res(srv->rbuf, (t - srv->rbuf));
-            res.whichServer = srv;
-            r->callback(r->data, res);
+        if (r && cbdataReferenceValid(r->request.data)) {
+            r->reply.finalize();
+            r->reply.reservationId = srv->reservationId;
+            r->request.callback(r->request.data, r->reply);
         } else {
             debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
             called = 0;
         }
-        // only skip off the \0's _after_ passing its location in HelperReply above
-        t += skip;
-
-        srv->flags.busy = false;
-        /**
-         * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
-         *      Doing this prohibits concurrency support with multiple replies per read().
-         * TODO: check that read() setup on these buffers pays attention to roffest!=0
-         * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
-         */
-        srv->roffset = 0;
-        helperStatefulRequestFree(r);
-        srv->request = NULL;
+
+        delete r;
 
         -- srv->stats.pending;
         ++ srv->stats.replies;
@@ -1046,41 +1182,23 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t
         if (called)
             helperStatefulServerDone(srv);
         else
-            helperStatefulReleaseServer(srv);
+            hlp->cancelReservation(srv->reservationId);
     }
 
     if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
-        int spaceSize = srv->rbuf_sz - srv->roffset - 1;
-        assert(spaceSize >= 0);
-
-        // grow the input buffer if needed and possible
-        if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
-            srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
-            debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
-            spaceSize = srv->rbuf_sz - srv->roffset - 1;
-            assert(spaceSize >= 0);
-        }
-
-        // quit reading if there is no space left
-        if (!spaceSize) {
-            debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
-                   "helper that overflowed " << srv->rbuf_sz << "-byte " <<
-                   "Squid input buffer: " << hlp->id_name << " #" << srv->index);
-            srv->closePipesSafely(hlp->id_name);
-            return;
-        }
+        int spaceSize = srv->rbuf_sz - 1;
 
         AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
                                              CommIoCbPtrFun(helperStatefulHandleRead, srv));
-        comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
+        comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
     }
 }
 
+/// Handles a request when all running helpers, if any, are busy.
 static void
-Enqueue(helper * hlp, helper_request * r)
+Enqueue(helper * hlp, Helper::Xaction * r)
 {
-    dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
-    dlinkAddTail(r, link, &hlp->queue);
+    hlp->queue.push(r);
     ++ hlp->stats.queue_size;
 
     /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
@@ -1090,7 +1208,7 @@ Enqueue(helper * hlp, helper_request * r)
         return;
     }
 
-    if (hlp->stats.queue_size < (int)hlp->childs.n_running)
+    if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
         return;
 
     if (squid_curtime - hlp->last_queue_warn < 600)
@@ -1104,16 +1222,12 @@ Enqueue(helper * hlp, helper_request * r)
     debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
     debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
     debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
-
-    if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
-        fatalf("Too many queued %s requests", hlp->id_name);
 }
 
 static void
-StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
+StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
 {
-    dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
-    dlinkAddTail(r, link, &hlp->queue);
+    hlp->queue.push(r);
     ++ hlp->stats.queue_size;
 
     /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
@@ -1123,12 +1237,9 @@ StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
         return;
     }
 
-    if (hlp->stats.queue_size < (int)hlp->childs.n_running)
+    if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
         return;
 
-    if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
-        fatalf("Too many queued %s requests", hlp->id_name);
-
     if (squid_curtime - hlp->last_queue_warn < 600)
         return;
 
@@ -1142,40 +1253,20 @@ StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
     debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
 }
 
-static helper_request *
-Dequeue(helper * hlp)
+Helper::Xaction *
+helper::nextRequest()
 {
-    dlink_node *link;
-    helper_request *r = NULL;
-
-    if ((link = hlp->queue.head)) {
-        r = (helper_request *)link->data;
-        dlinkDelete(link, &hlp->queue);
-        memFree(link, MEM_DLINK_NODE);
-        -- hlp->stats.queue_size;
-    }
-
-    return r;
-}
-
-static helper_stateful_request *
-StatefulDequeue(statefulhelper * hlp)
-{
-    dlink_node *link;
-    helper_stateful_request *r = NULL;
-
-    if ((link = hlp->queue.head)) {
-        r = (helper_stateful_request *)link->data;
-        dlinkDelete(link, &hlp->queue);
-        memFree(link, MEM_DLINK_NODE);
-        -- hlp->stats.queue_size;
-    }
+    if (queue.empty())
+        return nullptr;
 
+    auto *r = queue.front();
+    queue.pop();
+    --stats.queue_size;
     return r;
 }
 
 static helper_server *
-GetFirstAvailable(helper * hlp)
+GetFirstAvailable(const helper * hlp)
 {
     dlink_node *n;
     helper_server *srv;
@@ -1206,14 +1297,13 @@ GetFirstAvailable(helper * hlp)
         selected = srv;
     }
 
-    /* Check for overload */
     if (!selected) {
         debugs(84, 5, "GetFirstAvailable: None available.");
         return NULL;
     }
 
     if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
-        debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
+        debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
         return NULL;
     }
 
@@ -1226,6 +1316,7 @@ StatefulGetFirstAvailable(statefulhelper * hlp)
 {
     dlink_node *n;
     helper_stateful_server *srv = NULL;
+    helper_stateful_server *oldestReservedServer = nullptr;
     debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
 
     if (hlp->childs.n_running == 0)
@@ -1234,28 +1325,38 @@ StatefulGetFirstAvailable(statefulhelper * hlp)
     for (n = hlp->servers.head; n != NULL; n = n->next) {
         srv = (helper_stateful_server *)n->data;
 
-        if (srv->flags.busy)
+        if (srv->stats.pending)
             continue;
 
-        if (srv->flags.reserved)
+        if (srv->reserved()) {
+            if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
+                if (!oldestReservedServer)
+                    oldestReservedServer = srv;
+                else if (oldestReservedServer->reservationStart < srv->reservationStart)
+                    oldestReservedServer = srv;
+                debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
+            }
             continue;
+        }
 
         if (srv->flags.shutdown)
             continue;
 
-        if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
-            continue;
-
         debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
         return srv;
     }
 
+    if (oldestReservedServer) {
+        debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
+        return oldestReservedServer;
+    }
+
     debugs(84, 5, "StatefulGetFirstAvailable: None available.");
-    return NULL;
+    return nullptr;
 }
 
 static void
-helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
+helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
 {
     helper_server *srv = (helper_server *)data;
 
@@ -1281,36 +1382,30 @@ helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t l
 }
 
 static void
-helperDispatch(helper_server * srv, helper_request * r)
+helperDispatch(helper_server * srv, Helper::Xaction * r)
 {
     helper *hlp = srv->parent;
-    helper_request **ptr = NULL;
-    unsigned int slot;
+    const uint64_t reqId = ++srv->nextRequestId;
 
-    if (!cbdataReferenceValid(r->data)) {
+    if (!cbdataReferenceValid(r->request.data)) {
         debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
-        helperRequestFree(r);
+        delete r;
         return;
     }
 
-    for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
-        if (!srv->requests[slot]) {
-            ptr = &srv->requests[slot];
-            break;
-        }
-    }
-
-    assert(ptr);
-    *ptr = r;
-    r->dispatch_time = current_time;
+    r->request.Id = reqId;
+    helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
+    r->request.dispatch_time = current_time;
 
     if (srv->wqueue->isNull())
         srv->wqueue->init();
 
-    if (hlp->childs.concurrency)
-        srv->wqueue->Printf("%d %s", slot, r->buf);
-    else
-        srv->wqueue->append(r->buf, strlen(r->buf));
+    if (hlp->childs.concurrency) {
+        srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
+        assert(srv->requestsIndex.size() == srv->requests.size());
+        srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
+    } else
+        srv->wqueue->append(r->request.buf, strlen(r->request.buf));
 
     if (!srv->flags.writing) {
         assert(NULL == srv->writebuf);
@@ -1322,7 +1417,7 @@ helperDispatch(helper_server * srv, helper_request * r)
         Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
     }
 
-    debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->buf) << " bytes");
+    debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
 
     ++ srv->stats.uses;
     ++ srv->stats.pending;
@@ -1330,54 +1425,51 @@ helperDispatch(helper_server * srv, helper_request * r)
 }
 
 static void
-helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag,
-                                int xerrno, void *data)
-{
-    /* nothing! */
-}
+helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
+{}
 
 static void
-helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
+helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
 {
     statefulhelper *hlp = srv->parent;
 
-    if (!cbdataReferenceValid(r->data)) {
+    if (!cbdataReferenceValid(r->request.data)) {
         debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
-        helperStatefulRequestFree(r);
-        helperStatefulReleaseServer(srv);
+        delete r;
+        hlp->cancelReservation(srv->reservationId);
         return;
     }
 
     debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
 
-    if (r->placeholder == 1) {
+    assert(srv->reservationId);
+    r->reply.reservationId = srv->reservationId;
+
+    if (r->request.placeholder == 1) {
         /* a callback is needed before this request can _use_ a helper. */
         /* we don't care about releasing this helper. The request NEVER
          * gets to the helper. So we throw away the return code */
-        HelperReply nilReply;
-        nilReply.whichServer = srv;
-        r->callback(r->data, nilReply);
+        r->reply.result = Helper::Unknown;
+        r->request.callback(r->request.data, r->reply);
         /* throw away the placeholder */
-        helperStatefulRequestFree(r);
+        delete r;
         /* and push the queue. Note that the callback may have submitted a new
          * request to the helper which is why we test for the request */
 
-        if (srv->request == NULL)
+        if (!srv->requests.size())
             helperStatefulServerDone(srv);
 
         return;
     }
 
-    srv->flags.busy = true;
-    srv->flags.reserved = true;
-    srv->request = r;
+    srv->requests.push_back(r);
     srv->dispatch_time = current_time;
     AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
                                          CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
-    Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
+    Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
     debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
            hlp->id_name << " #" << srv->index << ", " <<
-           (int) strlen(r->buf) << " bytes");
+           (int) strlen(r->request.buf) << " bytes");
 
     ++ srv->stats.uses;
     ++ srv->stats.pending;
@@ -1387,21 +1479,23 @@ helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r
 static void
 helperKickQueue(helper * hlp)
 {
-    helper_request *r;
+    Helper::Xaction *r;
     helper_server *srv;
 
-    while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
+    while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
         helperDispatch(srv, r);
 }
 
 static void
 helperStatefulKickQueue(statefulhelper * hlp)
 {
-    helper_stateful_request *r;
+    Helper::Xaction *r;
     helper_stateful_server *srv;
-
-    while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
+    while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
+        debugs(84, 5, "found srv-" << srv->index);
+        hlp->reserveServer(srv);
         helperStatefulDispatch(srv, r);
+    }
 }
 
 static void
@@ -1409,42 +1503,66 @@ helperStatefulServerDone(helper_stateful_server * srv)
 {
     if (!srv->flags.shutdown) {
         helperStatefulKickQueue(srv->parent);
-    } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
+    } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
         srv->closeWritePipeSafely(srv->parent->id_name);
         return;
     }
 }
 
-static void
-helperRequestFree(helper_request * r)
-{
-    cbdataReferenceDone(r->data);
-    xfree(r->buf);
-    delete r;
-}
-
-static void
-helperStatefulRequestFree(helper_stateful_request * r)
+void
+helper_server::checkForTimedOutRequests(bool const retry)
 {
-    if (r) {
-        cbdataReferenceDone(r->data);
-        xfree(r->buf);
-        delete r;
+    assert(parent->childs.concurrency);
+    while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
+        Helper::Xaction *r = requests.front();
+        RequestIndex::iterator it;
+        it = requestsIndex.find(r->request.Id);
+        assert(it != requestsIndex.end());
+        requestsIndex.erase(it);
+        requests.pop_front();
+        debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
+        void *cbdata;
+        bool retried = false;
+        if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
+            debugs(84, 2, "Retry request " << r->request.Id);
+            ++r->request.retries;
+            parent->submitRequest(r);
+            retried = true;
+        } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+            if (!parent->onTimedOutResponse.isEmpty()) {
+                if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
+                    r->reply.finalize();
+                else
+                    r->reply.result = Helper::TimedOut;
+                r->request.callback(cbdata, r->reply);
+            } else {
+                r->reply.result = Helper::TimedOut;
+                r->request.callback(cbdata, r->reply);
+            }
+        }
+        --stats.pending;
+        ++stats.timedout;
+        ++parent->stats.timedout;
+        if (!retried)
+            delete r;
     }
 }
 
-// TODO: should helper_ and helper_stateful_ have a common parent?
-static bool
-helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
+void
+helper_server::requestTimeout(const CommTimeoutCbParams &io)
 {
-    if (!hlp) {
-        if (label)
-            storeAppendPrintf(sentry, "%s: unavailable\n", label);
-        return false;
-    }
+    debugs(26, 3, HERE << io.conn);
+    helper_server *srv = static_cast<helper_server *>(io.data);
 
-    if (label)
-        storeAppendPrintf(sentry, "%s:\n", label);
+    srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
+
+    debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
+    AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
+                                     CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
 
-    return true;
+    const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
+    const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
+
+    commSetConnTimeout(io.conn, timeLeft, timeoutCall);
 }
+