*/
srv->closePipesSafely(hlp->id_name);
}
+
+ Assure(!hlp->childs.n_active);
+ hlp->dropQueued();
}
void
{
/* note, don't free id_name, it probably points to static memory */
- // TODO: if the queue is not empty it will leak Helper::Request's
- if (!queue.empty())
- debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
+ // A non-empty queue would leak Helper::Xaction objects, stalling any
+ // pending (and even future collapsed) transactions. To avoid stalling
+ // transactions, we must dropQueued(). We ought to do that when we
+ // discover that no progress is possible rather than here because
+ // reference counting may keep this object alive for a long time.
+ assert(queue.empty());
}
void
-Helper::Client::handleKilledServer(SessionBase * const srv, bool &needsNewServers)
+Helper::Client::handleKilledServer(SessionBase * const srv)
{
- needsNewServers = false;
if (!srv->flags.shutdown) {
assert(childs.n_active > 0);
--childs.n_active;
if (childs.needNew() > 0) {
srv->flags.shutdown = true;
- needsNewServers = true;
+ openSessions();
}
}
+
+ if (!childs.n_active)
+ dropQueued();
+}
+
+void
+Helper::Client::dropQueued()
+{
+ if (queue.empty())
+ return;
+
+ Assure(!childs.n_active);
+ Assure(!GetFirstAvailable(this));
+
+ // no helper servers means nobody can advance our queued transactions
+
+ debugs(80, DBG_CRITICAL, "ERROR: Dropping " << queue.size() << ' ' <<
+ id_name << " helper requests due to lack of helper processes");
+ // similar to SessionBase::dropQueued()
+ while (const auto r = nextRequest()) {
+ r->reply.result = Helper::Unknown;
+ callBack(*r);
+ delete r;
+ }
}
void
}
}
-void
-Helper::Client::sessionClosed(SessionBase &session)
-{
- bool needsNewServers = false;
- handleKilledServer(&session, needsNewServers);
- if (needsNewServers) {
- debugs(80, DBG_IMPORTANT, "Starting new helpers");
- openSessions();
- }
-}
-
void
Helper::Session::HelperServerClosed(Session * const srv)
{
- srv->parent->sessionClosed(*srv);
+ srv->parent->handleKilledServer(srv);
srv->dropQueued(*srv->parent);
delete srv;
}
void
helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
{
- srv->parent->sessionClosed(*srv);
+ srv->parent->handleKilledServer(srv);
srv->dropQueued(*srv->parent);
delete srv;
}
while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
helperDispatch(srv, r);
+
+ if (!hlp->childs.n_active)
+ hlp->dropQueued();
}
static void
hlp->reserveServer(srv);
helperStatefulDispatch(srv, r);
}
+
+ if (!hlp->childs.n_active)
+ hlp->dropQueued();
}
static void
/// Updates internal statistics and starts new helper processes after
/// an unexpected server exit
- /// \param needsNewServers true if new helper(s) must be started, false otherwise
- void handleKilledServer(SessionBase *, bool &needsNewServers);
+ void handleKilledServer(SessionBase *);
/// Reacts to unexpected helper process death(s), including a failure to start helper(s)
/// and an unexpected exit of a previously started helper. \sa handleKilledServer()
/// \param madeProgress whether the died helper(s) responded to any requests
void handleFewerServers(bool madeProgress);
+ /// satisfies all queued requests with a Helper::Unknown answer
+ /// \prec no existing servers will be able to process queued requests
+ /// \sa SessionBase::dropQueued()
+ void dropQueued();
+
/// sends transaction response to the transaction initiator
void callBack(Xaction &);
/// The caller is responsible for checking that new processes are needed.
virtual void openSessions();
- /// handles exited helper process
- void sessionClosed(SessionBase &);
-
public:
wordlist *cmdline = nullptr;
dlink_list servers;