From: Christos Tsantilas Date: Tue, 12 Feb 2019 17:16:23 +0000 (+0000) Subject: Reuse reserved Negotiate and NTLM helpers after an idle timeout (#59) X-Git-Tag: M-staged-PR364~1 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a56fcf0;p=thirdparty%2Fsquid.git Reuse reserved Negotiate and NTLM helpers after an idle timeout (#59) Squid can be killed or maimed by enough clients that start multi-step connection authentication but never follow up with the second HTTP request while keeping their HTTP connection open. Affected helpers remain in the "reserved" state and cannot be reused for other clients. Observed helper exhaustion has happened without any malicious intent. To address the problem, we add a helper reservation timeout. Timed out reserved helpers may be reused by new clients/connections. To minimize problems with slow-to-resume-authentication clients, timed out reserved helpers are not reused until there are no unreserved running helpers left. The reservations are tracked using unique integer IDs. Also fixed Squid crashes caused by unexpected helper termination -- the raw UserRequest::authserver pointer could point to a deleted helper. This is a Measurement Factory project. 
--- diff --git a/src/auth/negotiate/UserRequest.cc b/src/auth/negotiate/UserRequest.cc index 07006acce9..168d8cacd0 100644 --- a/src/auth/negotiate/UserRequest.cc +++ b/src/auth/negotiate/UserRequest.cc @@ -28,7 +28,6 @@ #include "SquidTime.h" Auth::Negotiate::UserRequest::UserRequest() : - authserver(nullptr), server_blob(nullptr), client_blob(nullptr), waiting(0), @@ -164,7 +163,7 @@ Auth::Negotiate::UserRequest::startHelperLookup(HttpRequest *, AccessLogEntry::P safe_free(client_blob); helperStatefulSubmit(negotiateauthenticators, buf, Auth::Negotiate::UserRequest::HandleReply, - new Auth::StateData(this, handler, data), authserver); + new Auth::StateData(this, handler, data), reservationId); } /** @@ -174,10 +173,10 @@ Auth::Negotiate::UserRequest::startHelperLookup(HttpRequest *, AccessLogEntry::P void Auth::Negotiate::UserRequest::releaseAuthServer() { - if (authserver) { - debugs(29, 6, HERE << "releasing Negotiate auth server '" << authserver << "'"); - helperStatefulReleaseServer(authserver); - authserver = NULL; + if (reservationId) { + debugs(29, 6, reservationId); + negotiateauthenticators->cancelReservation(reservationId); + reservationId.clear(); } else debugs(29, 6, HERE << "No Negotiate auth server to release."); } @@ -266,10 +265,10 @@ Auth::Negotiate::UserRequest::HandleReply(void *data, const Helper::Reply &reply { Auth::StateData *r = static_cast(data); - debugs(29, 8, HERE << "helper: '" << reply.whichServer << "' sent us reply=" << reply); + debugs(29, 8, reply.reservationId << " got reply=" << reply); if (!cbdataReferenceValid(r->data)) { - debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication invalid callback data. 
helper '" << reply.whichServer << "'."); + debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication invalid callback data (" << reply.reservationId << ")"); delete r; return; } @@ -293,10 +292,10 @@ Auth::Negotiate::UserRequest::HandleReply(void *data, const Helper::Reply &reply assert(auth_user_request->user() != NULL); assert(auth_user_request->user()->auth_type == Auth::AUTH_NEGOTIATE); - if (lm_request->authserver == NULL) - lm_request->authserver = reply.whichServer.get(); // XXX: no locking? + if (!lm_request->reservationId) + lm_request->reservationId = reply.reservationId; else - assert(reply.whichServer == lm_request->authserver); + assert(reply.reservationId == lm_request->reservationId); switch (reply.result) { case Helper::TT: @@ -368,7 +367,7 @@ Auth::Negotiate::UserRequest::HandleReply(void *data, const Helper::Reply &reply break; case Helper::Unknown: - debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication Helper '" << reply.whichServer << "' crashed!."); + debugs(29, DBG_IMPORTANT, "ERROR: Negotiate Authentication Helper crashed (" << reply.reservationId << ")"); /* continue to the next case */ case Helper::TimedOut: diff --git a/src/auth/negotiate/UserRequest.h b/src/auth/negotiate/UserRequest.h index a21cec70da..66d9040335 100644 --- a/src/auth/negotiate/UserRequest.h +++ b/src/auth/negotiate/UserRequest.h @@ -13,6 +13,7 @@ #include "auth/UserRequest.h" #include "helper/forward.h" +#include "helper/ReservationId.h" class ConnStateData; class HttpReply; @@ -38,8 +39,6 @@ public: virtual const char * connLastHeader(); - /* we need to store the helper server between requests */ - helper_stateful_server *authserver; void releaseAuthServer(void); ///< Release the authserver helper server properly. 
/* what connection is this associated with */ @@ -56,6 +55,9 @@ public: /* need access to the request flags to mess around on pconn failure */ HttpRequest *request; + /// a helper-issued reservation locking the helper state between + /// HTTP requests + Helper::ReservationId reservationId; private: static HLPCB HandleReply; }; diff --git a/src/auth/ntlm/UserRequest.cc b/src/auth/ntlm/UserRequest.cc index f2b42586b5..2edbeb59b7 100644 --- a/src/auth/ntlm/UserRequest.cc +++ b/src/auth/ntlm/UserRequest.cc @@ -26,7 +26,6 @@ #include "SquidTime.h" Auth::Ntlm::UserRequest::UserRequest() : - authserver(nullptr), server_blob(nullptr), client_blob(nullptr), waiting(0), @@ -157,7 +156,7 @@ Auth::Ntlm::UserRequest::startHelperLookup(HttpRequest *, AccessLogEntry::Pointe safe_free(client_blob); helperStatefulSubmit(ntlmauthenticators, buf, Auth::Ntlm::UserRequest::HandleReply, - new Auth::StateData(this, handler, data), authserver); + new Auth::StateData(this, handler, data), reservationId); } /** @@ -167,10 +166,10 @@ Auth::Ntlm::UserRequest::startHelperLookup(HttpRequest *, AccessLogEntry::Pointe void Auth::Ntlm::UserRequest::releaseAuthServer() { - if (authserver) { - debugs(29, 6, HERE << "releasing NTLM auth server '" << authserver << "'"); - helperStatefulReleaseServer(authserver); - authserver = NULL; + if (reservationId) { + debugs(29, 6, reservationId); + ntlmauthenticators->cancelReservation(reservationId); + reservationId.clear(); } else debugs(29, 6, HERE << "No NTLM auth server to release."); } @@ -260,10 +259,10 @@ Auth::Ntlm::UserRequest::HandleReply(void *data, const Helper::Reply &reply) { Auth::StateData *r = static_cast(data); - debugs(29, 8, HERE << "helper: '" << reply.whichServer << "' sent us reply=" << reply); + debugs(29, 8, reply.reservationId << " got reply=" << reply); if (!cbdataReferenceValid(r->data)) { - debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication invalid callback data. 
helper '" << reply.whichServer << "'."); + debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication invalid callback data(" << reply.reservationId <<")"); delete r; return; } @@ -287,10 +286,10 @@ Auth::Ntlm::UserRequest::HandleReply(void *data, const Helper::Reply &reply) assert(auth_user_request->user() != NULL); assert(auth_user_request->user()->auth_type == Auth::AUTH_NTLM); - if (lm_request->authserver == NULL) - lm_request->authserver = reply.whichServer.get(); // XXX: no locking? + if (!lm_request->reservationId) + lm_request->reservationId = reply.reservationId; else - assert(reply.whichServer == lm_request->authserver); + assert(lm_request->reservationId == reply.reservationId); switch (reply.result) { case Helper::TT: @@ -360,7 +359,7 @@ Auth::Ntlm::UserRequest::HandleReply(void *data, const Helper::Reply &reply) break; case Helper::Unknown: - debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication Helper '" << reply.whichServer << "' crashed!."); + debugs(29, DBG_IMPORTANT, "ERROR: NTLM Authentication Helper crashed (" << reply.reservationId << ")"); /* continue to the next case */ case Helper::TimedOut: diff --git a/src/auth/ntlm/UserRequest.h b/src/auth/ntlm/UserRequest.h index 02d61b9513..170dbf05ee 100644 --- a/src/auth/ntlm/UserRequest.h +++ b/src/auth/ntlm/UserRequest.h @@ -13,6 +13,7 @@ #include "auth/UserRequest.h" #include "helper/forward.h" +#include "helper/ReservationId.h" class ConnStateData; class HttpReply; @@ -38,8 +39,6 @@ public: virtual const char * connLastHeader(); - /* we need to store the helper server between requests */ - helper_stateful_server *authserver; virtual void releaseAuthServer(); ///< Release authserver NTLM helpers properly when finished or abandoning. 
/* our current blob to pass to the client */ @@ -54,6 +53,9 @@ public: /* need access to the request flags to mess around on pconn failure */ HttpRequest *request; + /// a helper-issued reservation locking the helper state between + /// HTTP requests + Helper::ReservationId reservationId; private: static HLPCB HandleReply; }; diff --git a/src/cf.data.pre b/src/cf.data.pre index f67d1a438a..ed1ced4bab 100644 --- a/src/cf.data.pre +++ b/src/cf.data.pre @@ -594,6 +594,7 @@ DOC_START "children" numberofchildren [startup=N] [idle=N] [concurrency=N] [queue-size=N] [on-persistent-overload=action] + [reservation-timeout=seconds] The maximum number of authenticator processes to spawn. If you start too few Squid will have to wait for them to process @@ -646,6 +647,22 @@ DOC_START NOTE: NTLM and Negotiate schemes do not support concurrency in the Squid code module even though some helpers can. + The reservation-timeout=seconds option allows NTLM and Negotiate + helpers to forget about clients that abandon their in-progress + connection authentication without closing the connection. The + timeout is measured since the last helper response received by + Squid for the client. Fractional seconds are not supported. + + After the timeout, the helper may be reused for other clients if + there are no unreserved helpers available. If the helper is reused, + the old client's attempt to resume authentication will not be + forwarded to the helper (and the client should open a new HTTP + connection and retry authentication from scratch). + + By default, reservations do not expire and clients that keep + their connections open without completing authentication may + exhaust all NTLM and Negotiate helpers. 
+ "keep_alive" on|off If you experience problems with PUT/POST requests when using the NTLM or Negotiate schemes then you can try setting this diff --git a/src/helper.cc b/src/helper.cc index 0fd27c4b02..61f7081037 100644 --- a/src/helper.cc +++ b/src/helper.cc @@ -42,11 +42,9 @@ const size_t ReadBufSize(32*1024); static IOCB helperHandleRead; static IOCB helperStatefulHandleRead; -static void helperServerFree(helper_server *srv); -static void helperStatefulServerFree(helper_stateful_server *srv); static void Enqueue(helper * hlp, Helper::Xaction *); static helper_server *GetFirstAvailable(const helper * hlp); -static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper * hlp); +static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp); static void helperDispatch(helper_server * srv, Helper::Xaction * r); static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r); static void helperKickQueue(helper * hlp); @@ -121,6 +119,79 @@ HelperServerBase::closeWritePipeSafely(const char *id_name) #endif } +void +HelperServerBase::dropQueued() +{ + while (!requests.empty()) { + // XXX: re-schedule these on another helper? 
+ Helper::Xaction *r = requests.front(); + requests.pop_front(); + void *cbdata; + if (cbdataReferenceValidDone(r->request.data, &cbdata)) { + r->reply.result = Helper::Unknown; + r->request.callback(cbdata, r->reply); + } + + delete r; + } +} + +HelperServerBase::~HelperServerBase() +{ + if (rbuf) { + memFreeBuf(rbuf_sz, rbuf); + rbuf = NULL; + } +} + +helper_server::~helper_server() +{ + wqueue->clean(); + delete wqueue; + + if (writebuf) { + writebuf->clean(); + delete writebuf; + writebuf = NULL; + } + + if (Comm::IsConnOpen(writePipe)) + closeWritePipeSafely(parent->id_name); + + dlinkDelete(&link, &parent->servers); + + assert(parent->childs.n_running > 0); + -- parent->childs.n_running; + + assert(requests.empty()); + cbdataReferenceDone(parent); +} + +void +helper_server::dropQueued() +{ + HelperServerBase::dropQueued(); + requestsIndex.clear(); +} + +helper_stateful_server::~helper_stateful_server() +{ + /* TODO: walk the local queue of requests and carry them all out */ + if (Comm::IsConnOpen(writePipe)) + closeWritePipeSafely(parent->id_name); + + parent->cancelReservation(reservationId); + + dlinkDelete(&link, &parent->servers); + + assert(parent->childs.n_running > 0); + -- parent->childs.n_running; + + assert(requests.empty()); + + cbdataReferenceDone(parent); +} + void helperOpenServers(helper * hlp) { @@ -227,7 +298,7 @@ helperOpenServers(helper * hlp) if (wfd != rfd) commSetNonBlocking(wfd); - AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv)); + AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv)); comm_add_close_handler(rfd, closeCall); if (hlp->timeout && hlp->childs.concurrency) { @@ -324,7 +395,6 @@ helperStatefulOpenServers(statefulhelper * hlp) helper_stateful_server *srv = new helper_stateful_server; srv->hIpc = hIpc; srv->pid = pid; - srv->flags.reserved = false; srv->initStats(); srv->addr = hlp->addr; 
srv->readPipe = new Comm::Connection; @@ -334,6 +404,7 @@ helperStatefulOpenServers(statefulhelper * hlp) srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz); srv->roffset = 0; srv->parent = cbdataReference(hlp); + srv->reservationStart = 0; dlinkAddTail(srv, &srv->link, &hlp->servers); @@ -352,7 +423,7 @@ helperStatefulOpenServers(statefulhelper * hlp) if (wfd != rfd) commSetNonBlocking(wfd); - AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv)); + AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv)); comm_add_close_handler(rfd, closeCall); AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead", @@ -485,39 +556,107 @@ helper::submit(const char *buf, HLPCB * callback, void *data) debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf))); } -/// lastserver = "server last used as part of a reserved request sequence" +/// Submit request or callback the caller with a Helper::Error error. +/// If the reservation is not set then reserves a new helper. void -helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver) +helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation) { - if (!hlp || !hlp->trySubmit(buf, callback, data, lastserver)) + if (!hlp || !hlp->trySubmit(buf, callback, data, reservation)) SubmissionFailure(hlp, callback, data); } /// If possible, submit request. Otherwise, either kill Squid or return false. 
bool -statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver) +statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation) { if (!prepSubmit()) return false; // request was dropped - submit(buf, callback, data, lastserver); // will send or queue + submit(buf, callback, data, reservation); // will send or queue return true; // request submitted or queued } -void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver) +void +statefulhelper::reserveServer(helper_stateful_server * srv) { - Helper::Xaction *r = new Helper::Xaction(callback, data, buf); + // clear any old reservation + if (srv->reserved()) { + reservations.erase(srv->reservationId); + srv->clearReservation(); + } + + srv->reserve(); + reservations.insert(Reservations::value_type(srv->reservationId, srv)); +} + +void +statefulhelper::cancelReservation(const Helper::ReservationId reservation) +{ + const auto it = reservations.find(reservation); + if (it == reservations.end()) + return; + + helper_stateful_server *srv = it->second; + reservations.erase(it); + srv->clearReservation(); + + // schedule a queue kick + AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv)); + ScheduleCallHere(call); +} + +helper_stateful_server * +statefulhelper::findServer(const Helper::ReservationId & reservation) +{ + const auto it = reservations.find(reservation); + if (it == reservations.end()) + return nullptr; + return it->second; +} + +void +helper_stateful_server::reserve() +{ + assert(!reservationId); + reservationStart = squid_curtime; + reservationId = Helper::ReservationId::Next(); + debugs(84, 3, "srv-" << index << " reservation id = " << reservationId); +} + +void +helper_stateful_server::clearReservation() +{ + debugs(84, 3, "srv-" << index << " reservation id = " << reservationId); + if 
(!reservationId) + return; - if ((buf != NULL) && lastserver) { - debugs(84, 5, "StatefulSubmit with lastserver " << lastserver); - assert(lastserver->flags.reserved); - assert(!lastserver->requests.size()); + ++stats.releases; + + reservationId.clear(); + reservationStart = 0; +} + +void +statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation) +{ + Helper::Xaction *r = new Helper::Xaction(callback, data, buf); + if (buf && reservation) { + debugs(84, 5, reservation); + helper_stateful_server *lastServer = findServer(reservation); + if (!lastServer) { + debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")"); + r->reply.result = Helper::TimedOut; + r->request.callback(r->request.data, r->reply); + delete r; + return; + } debugs(84, 5, "StatefulSubmit dispatching"); - helperStatefulDispatch(lastserver, r); + helperStatefulDispatch(lastServer, r); } else { helper_stateful_server *srv; if ((srv = StatefulGetFirstAvailable(this))) { + reserveServer(srv); helperStatefulDispatch(srv, r); } else StatefulEnqueue(this, r); @@ -529,26 +668,6 @@ void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helpe syncQueueStats(); } -/** - * DPW 2007-05-08 - * - * helperStatefulReleaseServer tells the helper that whoever was - * using it no longer needs its services. - */ -void -helperStatefulReleaseServer(helper_stateful_server * srv) -{ - debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved); - if (!srv->flags.reserved) - return; - - ++ srv->stats.releases; - - srv->flags.reserved = false; - - helperStatefulServerDone(srv); -} - void helper::packStatsInto(Packable *p, const char *label) const { @@ -590,7 +709,7 @@ helper::packStatsInto(Packable *p, const char *label) const srv->stats.pending ? 'B' : ' ', srv->flags.writing ? 'W' : ' ', srv->flags.closing ? 'C' : ' ', - srv->flags.reserved ? 'R' : ' ', + srv->reserved() ? 
'R' : ' ', srv->flags.shutdown ? 'S' : ' ', xaction && xaction->request.placeholder ? 'P' : ' ', tt < 0.0 ? 0.0 : tt, @@ -678,7 +797,7 @@ helperStatefulShutdown(statefulhelper * hlp) continue; } - if (srv->flags.reserved) { + if (srv->reserved()) { if (shutting_down) { debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway."); } else { @@ -705,141 +824,62 @@ helper::~helper() debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued"); } -/* ====================================================================== */ -/* LOCAL FUNCTIONS */ -/* ====================================================================== */ - -static void -helperServerFree(helper_server *srv) +void +helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers) { - helper *hlp = srv->parent; - int concurrency = hlp->childs.concurrency; - - if (!concurrency) - concurrency = 1; - - if (srv->rbuf) { - memFreeBuf(srv->rbuf_sz, srv->rbuf); - srv->rbuf = NULL; - } - - srv->wqueue->clean(); - delete srv->wqueue; - - if (srv->writebuf) { - srv->writebuf->clean(); - delete srv->writebuf; - srv->writebuf = NULL; - } - - if (Comm::IsConnOpen(srv->writePipe)) - srv->closeWritePipeSafely(hlp->id_name); - - dlinkDelete(&srv->link, &hlp->servers); - - assert(hlp->childs.n_running > 0); - -- hlp->childs.n_running; - + needsNewServers = false; if (!srv->flags.shutdown) { - assert(hlp->childs.n_active > 0); - -- hlp->childs.n_active; - debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited"); + assert(childs.n_active > 0); + --childs.n_active; + debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited"); - if (hlp->childs.needNew() > 0) { - debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")"); + if (childs.needNew() > 0) { + debugs(80, 
DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << childs.needNew() << "/" << childs.n_max << ")"); - if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) { + if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) { if (srv->stats.replies < 1) - fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name); + fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name); else - debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!"); + debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!"); } - - debugs(80, DBG_IMPORTANT, "Starting new helpers"); - helperOpenServers(hlp); - } - } - - while (!srv->requests.empty()) { - // XXX: re-schedule these on another helper? - Helper::Xaction *r = srv->requests.front(); - srv->requests.pop_front(); - void *cbdata; - - if (cbdataReferenceValidDone(r->request.data, &cbdata)) { - r->reply.result = Helper::Unknown; - r->request.callback(cbdata, r->reply); + srv->flags.shutdown = true; + needsNewServers = true; } - - delete r; } - srv->requestsIndex.clear(); - - cbdataReferenceDone(srv->parent); - delete srv; } -static void -helperStatefulServerFree(helper_stateful_server *srv) +void +helper_server::HelperServerClosed(helper_server *srv) { - statefulhelper *hlp = srv->parent; + helper *hlp = srv->getParent(); - if (srv->rbuf) { - memFreeBuf(srv->rbuf_sz, srv->rbuf); - srv->rbuf = NULL; + bool needsNewServers = false; + hlp->handleKilledServer(srv, needsNewServers); + if (needsNewServers) { + debugs(80, DBG_IMPORTANT, "Starting new helpers"); + helperOpenServers(hlp); } -#if 0 - srv->wqueue->clean(); - - delete srv->wqueue; - -#endif - - /* TODO: walk the local queue of requests and carry them all out */ - if (Comm::IsConnOpen(srv->writePipe)) - srv->closeWritePipeSafely(hlp->id_name); - - dlinkDelete(&srv->link, 
&hlp->servers); - - assert(hlp->childs.n_running > 0); - -- hlp->childs.n_running; - - if (!srv->flags.shutdown) { - assert( hlp->childs.n_active > 0); - -- hlp->childs.n_active; - debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited"); - - if (hlp->childs.needNew() > 0) { - debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")"); - - if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) { - if (srv->stats.replies < 1) - fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name); - else - debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!"); - } - - debugs(80, DBG_IMPORTANT, "Starting new helpers"); - helperStatefulOpenServers(hlp); - } - } + srv->dropQueued(); - while (!srv->requests.empty()) { - // XXX: re-schedule these on another helper? - Helper::Xaction *r = srv->requests.front(); - srv->requests.pop_front(); - void *cbdata; + delete srv; +} - if (cbdataReferenceValidDone(r->request.data, &cbdata)) { - r->reply.result = Helper::Unknown; - r->request.callback(cbdata, r->reply); - } +// XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class +// TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions. 
+void +helper_stateful_server::HelperServerClosed(helper_stateful_server *srv) +{ + statefulhelper *hlp = static_cast(srv->getParent()); - delete r; + bool needsNewServers = false; + hlp->handleKilledServer(srv, needsNewServers); + if (needsNewServers) { + debugs(80, DBG_IMPORTANT, "Starting new helpers"); + helperStatefulOpenServers(hlp); } - cbdataReferenceDone(srv->parent); + srv->dropQueued(); delete srv; } @@ -1120,7 +1160,7 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len if (r && cbdataReferenceValid(r->request.data)) { r->reply.finalize(); - r->reply.whichServer = srv; + r->reply.reservationId = srv->reservationId; r->request.callback(r->request.data, r->reply); } else { debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered"); @@ -1142,7 +1182,7 @@ helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len if (called) helperStatefulServerDone(srv); else - helperStatefulReleaseServer(srv); + hlp->cancelReservation(srv->reservationId); } if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) { @@ -1272,10 +1312,11 @@ GetFirstAvailable(const helper * hlp) } static helper_stateful_server * -StatefulGetFirstAvailable(const statefulhelper * hlp) +StatefulGetFirstAvailable(statefulhelper * hlp) { dlink_node *n; helper_stateful_server *srv = NULL; + helper_stateful_server *oldestReservedServer = nullptr; debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running); if (hlp->childs.n_running == 0) @@ -1287,8 +1328,16 @@ StatefulGetFirstAvailable(const statefulhelper * hlp) if (srv->stats.pending) continue; - if (srv->flags.reserved) + if (srv->reserved()) { + if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) { + if (!oldestReservedServer) + oldestReservedServer = srv; + else if (oldestReservedServer->reservationStart < srv->reservationStart) + oldestReservedServer = srv; + debugs(84, 5, "the earlier reserved 
server is the srv-" << oldestReservedServer->index); + } continue; + } if (srv->flags.shutdown) continue; @@ -1297,8 +1346,13 @@ StatefulGetFirstAvailable(const statefulhelper * hlp) return srv; } + if (oldestReservedServer) { + debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index); + return oldestReservedServer; + } + debugs(84, 5, "StatefulGetFirstAvailable: None available."); - return NULL; + return nullptr; } static void @@ -1382,18 +1436,20 @@ helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r) if (!cbdataReferenceValid(r->request.data)) { debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data"); delete r; - helperStatefulReleaseServer(srv); + hlp->cancelReservation(srv->reservationId); return; } debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index); + assert(srv->reservationId); + r->reply.reservationId = srv->reservationId; + if (r->request.placeholder == 1) { /* a callback is needed before this request can _use_ a helper. */ /* we don't care about releasing this helper. The request NEVER * gets to the helper. 
So we throw away the return code */ r->reply.result = Helper::Unknown; - r->reply.whichServer = srv; r->request.callback(r->request.data, r->reply); /* throw away the placeholder */ delete r; @@ -1406,7 +1462,6 @@ helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r) return; } - srv->flags.reserved = true; srv->requests.push_back(r); srv->dispatch_time = current_time; AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone", @@ -1436,9 +1491,11 @@ helperStatefulKickQueue(statefulhelper * hlp) { Helper::Xaction *r; helper_stateful_server *srv; - - while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) + while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) { + debugs(84, 5, "found srv-" << srv->index); + hlp->reserveServer(srv); helperStatefulDispatch(srv, r); + } } static void @@ -1446,7 +1503,7 @@ helperStatefulServerDone(helper_stateful_server * srv) { if (!srv->flags.shutdown) { helperStatefulKickQueue(srv->parent); - } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) { + } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) { srv->closeWritePipeSafely(srv->parent->id_name); return; } @@ -1497,9 +1554,6 @@ helper_server::requestTimeout(const CommTimeoutCbParams &io) debugs(26, 3, HERE << io.conn); helper_server *srv = static_cast(io.data); - if (!cbdataReferenceValid(srv)) - return; - srv->checkForTimedOutRequests(srv->parent->retryTimedOut); debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout"); diff --git a/src/helper.h b/src/helper.h index d70612b271..3512b35cd8 100644 --- a/src/helper.h +++ b/src/helper.h @@ -20,11 +20,12 @@ #include "helper/forward.h" #include "helper/Reply.h" #include "helper/Request.h" +#include "helper/ReservationId.h" #include "ip/Address.h" #include "sbuf/SBuf.h" #include -#include +#include #include class Packable; @@ -42,6 +43,7 @@ public: }; } +class HelperServerBase; /** * 
Managers a set of individual helper processes with a common queue of requests. * @@ -94,6 +96,11 @@ public: /// already overloaded helpers return true bool willOverload() const; + /// Updates internal statistics and decides whether new helper server + /// processes must be started after an unexpected server exit + /// \param needsNewServers set to true if new servers must be started, false otherwise + void handleKilledServer(HelperServerBase *srv, bool &needsNewServers); + public: wordlist *cmdline; dlink_list servers; @@ -134,21 +141,36 @@ class statefulhelper : public helper CBDATA_CLASS(statefulhelper); public: + typedef std::unordered_map Reservations; + inline statefulhelper(const char *name) : helper(name) {} inline ~statefulhelper() {} +public: + /// reserve the given server + void reserveServer(helper_stateful_server * srv); + + /// undo reserveServer(), clear the reservation and kick the queue + void cancelReservation(const Helper::ReservationId reservation); + private: - friend void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver); - void submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver); - bool trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver); + friend void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation); + + /// \return the previously reserved server (if the reservation is still valid) or nil + helper_stateful_server *findServer(const Helper::ReservationId & reservation); + + void submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation); + bool trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation); + + /// reserved servers indexed by reservation IDs + Reservations reservations; }; -/** - * Fields shared between stateless and stateful helper servers. 
- */ -class HelperServerBase +/// represents a single helper process abstraction +class HelperServerBase: public CbdataParent { public: + virtual ~HelperServerBase(); /** Closes pipes to the helper safely. * Handles the case where the read and write pipes are the same FD. * @@ -164,6 +186,16 @@ public: */ void closeWritePipeSafely(const char *name); + // TODO: Teach each child to report its child-specific state instead. + /// whether the server is locked for exclusive use by a client + virtual bool reserved() = 0; + + /// dequeues and sends a Helper::Unknown answer to all queued requests + virtual void dropQueued(); + + /// the helper object that created this server + virtual helper *getParent() const = 0; + public: /// Helper program identifier; does not change when contents do, /// including during assignment @@ -187,7 +219,6 @@ public: bool writing; bool closing; bool shutdown; - bool reserved; } flags; typedef std::list Requests; @@ -206,9 +237,11 @@ public: class MemBuf; class CommTimeoutCbParams; +// TODO: Rename to StatelessHelperServer and rename HelperServerBase to HelperServer. +/// represents a single "stateless helper" process class helper_server : public HelperServerBase { - CBDATA_CLASS(helper_server); + CBDATA_CHILD(helper_server); public: uint64_t nextRequestId; @@ -231,6 +264,7 @@ public: typedef std::map RequestIndex; RequestIndex requestsIndex; ///< maps request IDs to requests + virtual ~helper_server(); /// Search in queue for the request with requestId, return the related /// Xaction object and remove it from queue. /// If concurrency is disabled then the requestId is ignored and the @@ -241,26 +275,53 @@ public: /// or the configured "on timeout response" for timedout requests. 
void checkForTimedOutRequests(bool const retry); + /* HelperServerBase API */ + virtual bool reserved() override {return false;} + virtual void dropQueued() override; + virtual helper *getParent() const override {return parent;} + /// Read timeout handler static void requestTimeout(const CommTimeoutCbParams &io); + + /// close handler to handle exited server processes + static void HelperServerClosed(helper_server *srv); }; +// TODO: Rename to StatefulHelperServer and rename HelperServerBase to HelperServer. +/// represents a single "stateful helper" process class helper_stateful_server : public HelperServerBase { - CBDATA_CLASS(helper_stateful_server); + CBDATA_CHILD(helper_stateful_server); public: + virtual ~helper_stateful_server(); + void reserve(); + void clearReservation(); + + /* HelperServerBase API */ + virtual bool reserved() override {return reservationId.reserved();} + virtual helper *getParent() const override {return parent;} + + /// close handler to handle exited server processes + static void HelperServerClosed(helper_stateful_server *srv); + statefulhelper *parent; + + // Reservations temporarily lock the server for an exclusive "client" use. The + // client keeps the reservation ID as a proof of her reservation. If a + // reservation expires, and the server is reserved for another client, then + // the reservation ID presented by the late client will not match ours. 
+ Helper::ReservationId reservationId; ///< "confirmation ID" of the last + time_t reservationStart; ///< when the last `reservation` was made }; /* helper.c */ void helperOpenServers(helper * hlp); void helperStatefulOpenServers(statefulhelper * hlp); void helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data); -void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver); +void helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, uint64_t reservation); void helperShutdown(helper * hlp); void helperStatefulShutdown(statefulhelper * hlp); -void helperStatefulReleaseServer(helper_stateful_server * srv); #endif /* SQUID_HELPER_H */ diff --git a/src/helper/ChildConfig.cc b/src/helper/ChildConfig.cc index 97aeeede06..42a8509462 100644 --- a/src/helper/ChildConfig.cc +++ b/src/helper/ChildConfig.cc @@ -113,7 +113,9 @@ Helper::ChildConfig::parseConfig() self_destruct(); return; } - } else { + } else if (strncmp(token, "reservation-timeout=", 20) == 0) + reservationTimeout = xatoui(token + 20); + else { debugs(0, DBG_PARSE_NOTE(DBG_IMPORTANT), "ERROR: Undefined option: " << token << "."); self_destruct(); return; diff --git a/src/helper/ChildConfig.h b/src/helper/ChildConfig.h index 94b6c8e2d0..65254f1d83 100644 --- a/src/helper/ChildConfig.h +++ b/src/helper/ChildConfig.h @@ -104,6 +104,9 @@ public: * special configurations, for example when redirector_bypass is used. 
*/ bool defaultQueueSize; + + /// older stateful helper server reservations may be forgotten + time_t reservationTimeout = 64; // reservation-timeout }; } // namespace Helper diff --git a/src/helper/Makefile.am b/src/helper/Makefile.am index ad2d7711df..226d8c8c48 100644 --- a/src/helper/Makefile.am +++ b/src/helper/Makefile.am @@ -17,6 +17,8 @@ libhelper_la_SOURCES = \ Reply.cc \ Reply.h \ Request.h \ - ResultCode.h + ResultCode.h \ + ReservationId.h \ + ReservationId.cc EXTRA_DIST= protocol_defines.h diff --git a/src/helper/Reply.cc b/src/helper/Reply.cc index 5282234253..2311789941 100644 --- a/src/helper/Reply.cc +++ b/src/helper/Reply.cc @@ -17,8 +17,7 @@ #include "SquidString.h" Helper::Reply::Reply() : - result(Helper::Unknown), - whichServer(NULL) + result(Helper::Unknown) { } diff --git a/src/helper/Reply.h b/src/helper/Reply.h index bddaf8b310..14a56f21a8 100644 --- a/src/helper/Reply.h +++ b/src/helper/Reply.h @@ -11,6 +11,7 @@ #include "base/CbcPointer.h" #include "helper/forward.h" +#include "helper/ReservationId.h" #include "helper/ResultCode.h" #include "MemBuf.h" #include "Notes.h" @@ -33,7 +34,7 @@ private: Reply &operator =(const Helper::Reply &r); public: - explicit Reply(Helper::ResultCode res) : result(res), notes(), whichServer(NULL) {} + explicit Reply(Helper::ResultCode res) : result(res), notes() {} /// Creates a NULL reply Reply(); @@ -60,9 +61,8 @@ public: // list of key=value pairs the helper produced NotePairs notes; - /// for stateful replies the responding helper 'server' needs to be preserved across callbacks - CbcPointer whichServer; - + /// The stateful replies should include the reservation ID + Helper::ReservationId reservationId; private: void parseResponseKeys(); diff --git a/src/helper/ReservationId.cc b/src/helper/ReservationId.cc new file mode 100644 index 0000000000..b3888b5617 --- /dev/null +++ b/src/helper/ReservationId.cc @@ -0,0 +1,16 @@ +#include "helper/ReservationId.h" + +Helper::ReservationId 
+Helper::ReservationId::Next() +{ + static uint64_t Ids = 0; + Helper::ReservationId reservation; + reservation.id = ++Ids; + return reservation; +} + +std::ostream & +Helper::ReservationId::print(std::ostream &os) const +{ + return os << "hlpRes" << id; +} diff --git a/src/helper/ReservationId.h b/src/helper/ReservationId.h new file mode 100644 index 0000000000..d2d19af018 --- /dev/null +++ b/src/helper/ReservationId.h @@ -0,0 +1,63 @@ + +/* + * Copyright (C) 1996-2017 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef _SQUID_SRC_HELPER_RESERVATIONID_H +#define _SQUID_SRC_HELPER_RESERVATIONID_H + +#include <ostream> + +namespace Helper +{ +/// a (temporary) lock on a (stateful) helper channel +class ReservationId +{ +public: + static ReservationId Next(); + + bool reserved() const { return id > 0; } + + explicit operator bool() const { return reserved(); } + bool operator !() const { return !reserved(); } + bool operator ==(const Helper::ReservationId &other) const { return id == other.id; } + bool operator !=(const Helper::ReservationId &other) const { return !(*this == other); } + + void clear() { id = 0; } + uint64_t value() const {return id;} + + /// dumps the reservation info for debugging + std::ostream &print(std::ostream &os) const; + +private: + uint64_t id = 0; ///< uniquely identifies this reservation +}; + +}; // namespace Helper + +inline std::ostream & +operator <<(std::ostream &os, const Helper::ReservationId &id) +{ + return id.print(os); +} + +namespace std { +/// default hash functor to support std::unordered_map +template <> +struct hash<Helper::ReservationId> +{ + typedef Helper::ReservationId argument_type; + typedef std::size_t result_type; + result_type operator()(const argument_type &reservation) const noexcept + { + std::hash<uint64_t> aHash; + return 
aHash(reservation.value()); + } +}; +} + +#endif