}
void
-Helper::SessionBase::dropQueued()
+Helper::SessionBase::dropQueued(Client &client)
{
while (!requests.empty()) {
// XXX: re-schedule these on another helper?
const auto r = requests.front();
requests.pop_front();
- void *cbdata;
- if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
- r->reply.result = Helper::Unknown;
- r->request.callback(cbdata, r->reply);
- }
-
+ r->reply.result = Helper::Unknown;
+ client.callBack(*r);
delete r;
}
}
}
void
-Helper::Session::dropQueued()
+Helper::Session::dropQueued(Client &client)
{
- SessionBase::dropQueued();
+ SessionBase::dropQueued(client);
requestsIndex.clear();
}
debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
}
+void
+Helper::Client::callBack(Xaction &r)
+{
+ const auto callback = r.request.callback;
+ Assure(callback);
+
+ r.request.callback = nullptr;
+ void *cbdata = nullptr;
+ if (cbdataReferenceValidDone(r.request.data, &cbdata))
+ callback(cbdata, r.reply);
+}
+
/// Submit request or callback the caller with a Helper::Error error.
/// If the reservation is not set then reserves a new helper.
void
if (!lastServer) {
debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
r->reply.result = Helper::TimedOut;
- r->request.callback(r->request.data, r->reply);
+ callBack(*r);
delete r;
return;
}
Helper::Session::HelperServerClosed(Session * const srv)
{
srv->parent->sessionClosed(*srv);
- srv->dropQueued();
+ srv->dropQueued(*srv->parent);
delete srv;
}
helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
{
srv->parent->sessionClosed(*srv);
- srv->dropQueued();
+ srv->dropQueued(*srv->parent);
delete srv;
}
debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
retry = true;
} else {
- HLPCB *callback = r->request.callback;
- r->request.callback = nullptr;
- void *cbdata = nullptr;
- if (cbdataReferenceValidDone(r->request.data, &cbdata))
- callback(cbdata, r->reply);
+ hlp->callBack(*r);
}
}
if (cbdataReferenceValid(r->request.data)) {
r->reply.finalize();
r->reply.reservationId = srv->reservationId;
- r->request.callback(r->request.data, r->reply);
+ hlp->callBack(*r);
} else {
debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
called = 0;
/* we don't care about releasing this helper. The request NEVER
* gets to the helper. So we throw away the return code */
r->reply.result = Helper::Unknown;
- r->request.callback(r->request.data, r->reply);
+ hlp->callBack(*r);
/* throw away the placeholder */
delete r;
/* and push the queue. Note that the callback may have submitted a new
requestsIndex.erase(it);
requests.pop_front();
debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
- void *cbdata;
bool retried = false;
if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
debugs(84, 2, "Retry request " << r->request.Id);
++r->request.retries;
parent->submitRequest(r);
retried = true;
- } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
+ } else if (cbdataReferenceValid(r->request.data)) {
if (!parent->onTimedOutResponse.isEmpty()) {
if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
r->reply.finalize();
else
r->reply.result = Helper::TimedOut;
- r->request.callback(cbdata, r->reply);
+ parent->callBack(*r);
} else {
r->reply.result = Helper::TimedOut;
- r->request.callback(cbdata, r->reply);
+ parent->callBack(*r);
}
}
--stats.pending;
/// \param madeProgress whether the died helper(s) responded to any requests
void handleFewerServers(bool madeProgress);
+ /// sends transaction response to the transaction initiator
+ void callBack(Xaction &);
+
/// Starts required helper process(es).
/// The caller is responsible for checking that new processes are needed.
virtual void openSessions();
virtual bool reserved() = 0;
/// dequeues and sends an Unknown answer to all queued requests
- virtual void dropQueued();
+ virtual void dropQueued(Client &);
public:
/// Helper program identifier; does not change when contents do,
/* SessionBase API */
bool reserved() override {return false;}
- void dropQueued() override;
+ void dropQueued(Client &) override;
/// Read timeout handler
static void requestTimeout(const CommTimeoutCbParams &io);