2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 84 Helper process maintenance */
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
16 #include "comm/Connection.h"
17 #include "comm/Read.h"
18 #include "comm/Write.h"
19 #include "debug/Messages.h"
22 #include "format/Quoting.h"
24 #include "helper/Reply.h"
25 #include "helper/Request.h"
27 #include "SquidConfig.h"
29 #include "SquidMath.h"
33 // helper_stateful_server::data uses explicit alloc()/freeOne()
36 #define HELPER_MAX_ARGS 64
38 /// The maximum allowed request retries.
41 /// Helpers input buffer size.
42 const size_t ReadBufSize(32*1024);
44 static IOCB helperHandleRead
;
45 static IOCB helperStatefulHandleRead
;
46 static void Enqueue(Helper::Client
*, Helper::Xaction
*);
47 static Helper::Session
*GetFirstAvailable(const Helper::Client::Pointer
&);
48 static helper_stateful_server
*StatefulGetFirstAvailable(const statefulhelper::Pointer
&);
49 static void helperDispatch(Helper::Session
*, Helper::Xaction
*);
50 static void helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Xaction
* r
);
51 static void helperKickQueue(const Helper::Client::Pointer
&);
52 static void helperStatefulKickQueue(const statefulhelper::Pointer
&);
53 static void helperStatefulServerDone(helper_stateful_server
* srv
);
54 static void StatefulEnqueue(statefulhelper
* hlp
, Helper::Xaction
* r
);
56 CBDATA_NAMESPACED_CLASS_INIT(Helper
, Session
);
57 CBDATA_CLASS_INIT(helper_stateful_server
);
59 InstanceIdDefinitions(Helper::SessionBase
, "Hlpr");
62 Helper::SessionBase::initStats()
72 Helper::SessionBase::closePipesSafely()
75 shutdown(writePipe
->fd
, SD_BOTH
);
79 if (readPipe
->fd
== writePipe
->fd
)
87 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
89 debugs(84, DBG_IMPORTANT
, "WARNING: " << helper().id_name
<<
90 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
98 Helper::SessionBase::closeWritePipeSafely()
101 shutdown(writePipe
->fd
, (readPipe
->fd
== writePipe
->fd
? SD_BOTH
: SD_SEND
));
104 flags
.closing
= true;
105 if (readPipe
->fd
== writePipe
->fd
)
111 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
113 debugs(84, DBG_IMPORTANT
, "WARNING: " << helper().id_name
<<
114 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
122 Helper::SessionBase::dropQueued()
124 while (!requests
.empty()) {
125 // XXX: re-schedule these on another helper?
126 const auto r
= requests
.front();
127 requests
.pop_front();
128 r
->reply
.result
= Helper::Unknown
;
129 helper().callBack(*r
);
134 Helper::SessionBase::~SessionBase()
137 memFreeBuf(rbuf_sz
, rbuf
);
142 Helper::Session::~Session()
153 if (Comm::IsConnOpen(writePipe
))
154 closeWritePipeSafely();
156 dlinkDelete(&link
, &parent
->servers
);
158 assert(parent
->childs
.n_running
> 0);
159 -- parent
->childs
.n_running
;
161 assert(requests
.empty());
165 Helper::Session::dropQueued()
167 SessionBase::dropQueued();
168 requestsIndex
.clear();
171 helper_stateful_server::~helper_stateful_server()
173 /* TODO: walk the local queue of requests and carry them all out */
174 if (Comm::IsConnOpen(writePipe
))
175 closeWritePipeSafely();
177 parent
->cancelReservation(reservationId
);
179 dlinkDelete(&link
, &parent
->servers
);
181 assert(parent
->childs
.n_running
> 0);
182 -- parent
->childs
.n_running
;
184 assert(requests
.empty());
188 Helper::Client::openSessions()
194 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
195 char fd_note_buf
[FD_DESC_SZ
];
203 // Helps reduce the diff. TODO: remove
204 const auto hlp
= this;
206 if (hlp
->cmdline
== nullptr)
209 progname
= hlp
->cmdline
->key
;
211 if ((s
= strrchr(progname
, '/')))
212 shortname
= xstrdup(s
+ 1);
214 shortname
= xstrdup(progname
);
216 /* figure out how many new children are actually needed. */
217 int need_new
= hlp
->childs
.needNew();
219 debugs(84, Important(19), "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
222 debugs(84, Important(20), "helperOpenServers: No '" << shortname
<< "' processes needed.");
225 procname
= (char *)xmalloc(strlen(shortname
) + 3);
227 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
229 args
[nargs
] = procname
;
232 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
233 args
[nargs
] = w
->key
;
237 args
[nargs
] = nullptr;
240 assert(nargs
<= HELPER_MAX_ARGS
);
242 int successfullyStarted
= 0;
244 for (k
= 0; k
< need_new
; ++k
) {
247 pid
= ipcCreate(hlp
->ipc_type
,
257 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
261 ++successfullyStarted
;
262 ++ hlp
->childs
.n_running
;
263 ++ hlp
->childs
.n_active
;
264 const auto srv
= new Helper::Session
;
268 srv
->addr
= hlp
->addr
;
269 srv
->readPipe
= new Comm::Connection
;
270 srv
->readPipe
->fd
= rfd
;
271 srv
->writePipe
= new Comm::Connection
;
272 srv
->writePipe
->fd
= wfd
;
273 srv
->rbuf
= (char *)memAllocBuf(ReadBufSize
, &srv
->rbuf_sz
);
274 srv
->wqueue
= new MemBuf
;
276 srv
->nextRequestId
= 0;
277 srv
->replyXaction
= nullptr;
278 srv
->ignoreToEom
= false;
280 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
283 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
284 fd_note(rfd
, fd_note_buf
);
286 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
287 fd_note(rfd
, fd_note_buf
);
288 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
289 fd_note(wfd
, fd_note_buf
);
292 commSetNonBlocking(rfd
);
295 commSetNonBlocking(wfd
);
297 AsyncCall::Pointer closeCall
= asyncCall(5, 4, "Helper::Session::HelperServerClosed", cbdataDialer(SessionBase::HelperServerClosed
,
298 static_cast<Helper::SessionBase
*>(srv
)));
300 comm_add_close_handler(rfd
, closeCall
);
302 if (hlp
->timeout
&& hlp
->childs
.concurrency
) {
303 AsyncCall::Pointer timeoutCall
= commCbCall(84, 4, "Helper::Session::requestTimeout",
304 CommTimeoutCbPtrFun(Helper::Session::requestTimeout
, srv
));
305 commSetConnTimeout(srv
->readPipe
, hlp
->timeout
, timeoutCall
);
308 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
309 CommIoCbPtrFun(helperHandleRead
, srv
));
310 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
313 // Call handleFewerServers() before hlp->last_restart is updated because
314 // that method uses last_restart to measure the delay since previous start.
315 // TODO: Refactor last_restart code to measure failure frequency rather than
316 // detecting a helper #X failure that happens close to the helper #Y start.
317 if (successfullyStarted
< need_new
)
318 hlp
->handleFewerServers(false);
320 hlp
->last_restart
= squid_curtime
;
321 safe_free(shortname
);
323 helperKickQueue(hlp
);
327 statefulhelper::openSessions()
330 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
331 char fd_note_buf
[FD_DESC_SZ
];
333 // Helps reduce the diff. TODO: remove
334 const auto hlp
= this;
336 if (hlp
->cmdline
== nullptr)
339 if (hlp
->childs
.concurrency
)
340 debugs(84, DBG_CRITICAL
, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp
->cmdline
<< "')");
342 char *progname
= hlp
->cmdline
->key
;
345 if ((s
= strrchr(progname
, '/')))
346 shortname
= xstrdup(s
+ 1);
348 shortname
= xstrdup(progname
);
350 /* figure out how many new helpers are needed. */
351 int need_new
= hlp
->childs
.needNew();
353 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
356 debugs(84, DBG_IMPORTANT
, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
359 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
361 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
363 args
[nargs
] = procname
;
366 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
367 args
[nargs
] = w
->key
;
371 args
[nargs
] = nullptr;
374 assert(nargs
<= HELPER_MAX_ARGS
);
376 int successfullyStarted
= 0;
378 for (int k
= 0; k
< need_new
; ++k
) {
383 pid_t pid
= ipcCreate(hlp
->ipc_type
,
393 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
397 ++successfullyStarted
;
398 ++ hlp
->childs
.n_running
;
399 ++ hlp
->childs
.n_active
;
400 helper_stateful_server
*srv
= new helper_stateful_server
;
404 srv
->addr
= hlp
->addr
;
405 srv
->readPipe
= new Comm::Connection
;
406 srv
->readPipe
->fd
= rfd
;
407 srv
->writePipe
= new Comm::Connection
;
408 srv
->writePipe
->fd
= wfd
;
409 srv
->rbuf
= (char *)memAllocBuf(ReadBufSize
, &srv
->rbuf_sz
);
412 srv
->reservationStart
= 0;
414 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
417 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
418 fd_note(rfd
, fd_note_buf
);
420 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
421 fd_note(rfd
, fd_note_buf
);
422 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
423 fd_note(wfd
, fd_note_buf
);
426 commSetNonBlocking(rfd
);
429 commSetNonBlocking(wfd
);
431 AsyncCall::Pointer closeCall
= asyncCall(5, 4, "helper_stateful_server::HelperServerClosed", cbdataDialer(Helper::SessionBase::HelperServerClosed
,
432 static_cast<Helper::SessionBase
*>(srv
)));
434 comm_add_close_handler(rfd
, closeCall
);
436 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
437 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
438 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
441 // Call handleFewerServers() before hlp->last_restart is updated because
442 // that method uses last_restart to measure the delay since previous start.
443 // TODO: Refactor last_restart code to measure failure frequency rather than
444 // detecting a helper #X failure that happens close to the helper #Y start.
445 if (successfullyStarted
< need_new
)
446 hlp
->handleFewerServers(false);
448 hlp
->last_restart
= squid_curtime
;
449 safe_free(shortname
);
451 helperStatefulKickQueue(hlp
);
455 Helper::Client::submitRequest(Helper::Xaction
* const r
)
457 if (const auto srv
= GetFirstAvailable(this))
458 helperDispatch(srv
, r
);
465 /// handles helperSubmit() and helperStatefulSubmit() failures
467 SubmissionFailure(const Helper::Client::Pointer
&hlp
, HLPCB
*callback
, void *data
)
469 auto result
= Helper::Error
;
471 debugs(84, 3, "no helper");
472 result
= Helper::Unknown
;
474 // else pretend the helper has responded with ERR
476 callback(data
, Helper::Reply(result
));
480 helperSubmit(const Helper::Client::Pointer
&hlp
, const char * const buf
, HLPCB
* const callback
, void * const data
)
482 if (!hlp
|| !hlp
->trySubmit(buf
, callback
, data
))
483 SubmissionFailure(hlp
, callback
, data
);
486 /// whether queuing an additional request would overload the helper
488 Helper::Client::queueFull() const {
489 return stats
.queue_size
>= static_cast<int>(childs
.queue_size
);
493 Helper::Client::overloaded() const {
494 return stats
.queue_size
> static_cast<int>(childs
.queue_size
);
497 /// synchronizes queue-dependent measurements with the current queue state
499 Helper::Client::syncQueueStats()
503 debugs(84, 5, id_name
<< " still overloaded; dropped " << droppedRequests
);
505 overloadStart
= squid_curtime
;
506 debugs(84, 3, id_name
<< " became overloaded");
510 debugs(84, 5, id_name
<< " is no longer overloaded");
511 if (droppedRequests
) {
512 debugs(84, DBG_IMPORTANT
, "helper " << id_name
<<
513 " is no longer overloaded after dropping " << droppedRequests
<<
514 " requests in " << (squid_curtime
- overloadStart
) << " seconds");
522 /// prepares the helper for request submission
523 /// returns true if and only if the submission should proceed
524 /// may kill Squid if the helper remains overloaded for too long
526 Helper::Client::prepSubmit()
528 // re-sync for the configuration may have changed since the last submission
531 // Nothing special to do if the new request does not overload (i.e., the
532 // queue is not even full yet) or only _starts_ overloading this helper
533 // (i.e., the queue is currently at its limit).
537 if (squid_curtime
- overloadStart
<= 180)
538 return true; // also OK: overload has not persisted long enough to panic
540 if (childs
.onPersistentOverload
== ChildConfig::actDie
)
541 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name
);
543 if (!droppedRequests
) {
544 debugs(84, DBG_IMPORTANT
, "WARNING: dropping requests to overloaded " <<
545 id_name
<< " helper configured with on-persistent-overload=err");
548 debugs(84, 3, "failed to send " << droppedRequests
<< " helper requests to " << id_name
);
553 Helper::Client::trySubmit(const char * const buf
, HLPCB
* const callback
, void * const data
)
556 return false; // request was dropped
558 submit(buf
, callback
, data
); // will send or queue
559 return true; // request submitted or queued
562 /// dispatches or enqueues a helper request; does not enforce queue limits
564 Helper::Client::submit(const char * const buf
, HLPCB
* const callback
, void * const data
)
566 const auto r
= new Xaction(callback
, data
, buf
);
568 debugs(84, DBG_DATA
, Raw("buf", buf
, strlen(buf
)));
572 Helper::Client::callBack(Xaction
&r
)
574 const auto callback
= r
.request
.callback
;
577 r
.request
.callback
= nullptr;
578 void *cbdata
= nullptr;
579 if (cbdataReferenceValidDone(r
.request
.data
, &cbdata
))
580 callback(cbdata
, r
.reply
);
583 /// Submit request or callback the caller with a Helper::Error error.
584 /// If the reservation is not set then reserves a new helper.
586 helperStatefulSubmit(const statefulhelper::Pointer
&hlp
, const char *buf
, HLPCB
* callback
, void *data
, const Helper::ReservationId
& reservation
)
588 if (!hlp
|| !hlp
->trySubmit(buf
, callback
, data
, reservation
))
589 SubmissionFailure(hlp
, callback
, data
);
592 /// If possible, submit request. Otherwise, either kill Squid or return false.
594 statefulhelper::trySubmit(const char *buf
, HLPCB
* callback
, void *data
, const Helper::ReservationId
& reservation
)
597 return false; // request was dropped
599 submit(buf
, callback
, data
, reservation
); // will send or queue
600 return true; // request submitted or queued
604 statefulhelper::reserveServer(helper_stateful_server
* srv
)
606 // clear any old reservation
607 if (srv
->reserved()) {
608 reservations
.erase(srv
->reservationId
);
609 srv
->clearReservation();
613 reservations
.insert(Reservations::value_type(srv
->reservationId
, srv
));
617 statefulhelper::cancelReservation(const Helper::ReservationId reservation
)
619 const auto it
= reservations
.find(reservation
);
620 if (it
== reservations
.end())
623 helper_stateful_server
*srv
= it
->second
;
624 reservations
.erase(it
);
625 srv
->clearReservation();
627 // schedule a queue kick
628 AsyncCall::Pointer call
= asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone
, srv
));
629 ScheduleCallHere(call
);
632 helper_stateful_server
*
633 statefulhelper::findServer(const Helper::ReservationId
& reservation
)
635 const auto it
= reservations
.find(reservation
);
636 if (it
== reservations
.end())
642 helper_stateful_server::reserve()
644 assert(!reservationId
);
645 reservationStart
= squid_curtime
;
646 reservationId
= Helper::ReservationId::Next();
647 debugs(84, 3, "srv-" << index
<< " reservation id = " << reservationId
);
651 helper_stateful_server::clearReservation()
653 debugs(84, 3, "srv-" << index
<< " reservation id = " << reservationId
);
659 reservationId
.clear();
660 reservationStart
= 0;
664 statefulhelper::submit(const char *buf
, HLPCB
* callback
, void *data
, const Helper::ReservationId
& reservation
)
666 Helper::Xaction
*r
= new Helper::Xaction(callback
, data
, buf
);
668 if (buf
&& reservation
) {
669 debugs(84, 5, reservation
);
670 helper_stateful_server
*lastServer
= findServer(reservation
);
672 debugs(84, DBG_CRITICAL
, "ERROR: Helper " << id_name
<< " reservation expired (" << reservation
<< ")");
673 r
->reply
.result
= Helper::TimedOut
;
678 debugs(84, 5, "StatefulSubmit dispatching");
679 helperStatefulDispatch(lastServer
, r
);
681 helper_stateful_server
*srv
;
682 if ((srv
= StatefulGetFirstAvailable(this))) {
684 helperStatefulDispatch(srv
, r
); // may delete r
686 StatefulEnqueue(this, r
);
689 // r may be dangling here
694 Helper::Client::packStatsInto(Packable
* const p
, const char * const label
) const
697 p
->appendf("%s:\n", label
);
699 p
->appendf(" program: %s\n", cmdline
->key
);
700 p
->appendf(" number active: %d of %d (%d shutting down)\n", childs
.n_active
, childs
.n_max
, (childs
.n_running
- childs
.n_active
));
701 p
->appendf(" requests sent: %d\n", stats
.requests
);
702 p
->appendf(" replies received: %d\n", stats
.replies
);
703 p
->appendf(" requests timedout: %d\n", stats
.timedout
);
704 p
->appendf(" queue length: %d\n", stats
.queue_size
);
705 p
->appendf(" avg service time: %d msec\n", stats
.avg_svc_time
);
707 p
->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
719 for (dlink_node
*link
= servers
.head
; link
; link
= link
->next
) {
720 const auto srv
= static_cast<SessionBase
*>(link
->data
);
722 const auto xaction
= srv
->requests
.empty() ? nullptr : srv
->requests
.front();
723 double tt
= 0.001 * (xaction
? tvSubMsec(xaction
->request
.dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
724 p
->appendf("%7u\t%7d\t%7d\t%11" PRIu64
"\t%11" PRIu64
"\t%11" PRIu64
"\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
731 srv
->stats
.pending
? 'B' : ' ',
732 srv
->flags
.writing
? 'W' : ' ',
733 srv
->flags
.closing
? 'C' : ' ',
734 srv
->reserved() ? 'R' : ' ',
735 srv
->flags
.shutdown
? 'S' : ' ',
736 xaction
&& xaction
->request
.placeholder
? 'P' : ' ',
739 xaction
? Format::QuoteMimeBlob(xaction
->request
.buf
) : "(none)");
742 p
->append("\nFlags key:\n"
747 " S\tSHUTDOWN PENDING\n"
748 " P\tPLACEHOLDER\n", 101);
752 Helper::Client::willOverload() const {
753 return queueFull() && !(childs
.needNew() || GetFirstAvailable(this));
756 Helper::Client::Pointer
757 Helper::Client::Make(const char * const name
)
759 return new Client(name
);
762 statefulhelper::Pointer
763 statefulhelper::Make(const char *name
)
765 return new statefulhelper(name
);
769 helperShutdown(const Helper::Client::Pointer
&hlp
)
771 dlink_node
*link
= hlp
->servers
.head
;
774 const auto srv
= static_cast<Helper::Session
*>(link
->data
);
777 if (srv
->flags
.shutdown
) {
778 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
782 assert(hlp
->childs
.n_active
> 0);
783 -- hlp
->childs
.n_active
;
784 srv
->flags
.shutdown
= true; /* request it to shut itself down */
786 if (srv
->flags
.closing
) {
787 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
791 if (srv
->stats
.pending
) {
792 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
796 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
797 /* the rest of the details are dealt with in the helperServerFree
800 srv
->closePipesSafely();
803 Assure(!hlp
->childs
.n_active
);
808 helperStatefulShutdown(const statefulhelper::Pointer
&hlp
)
810 dlink_node
*link
= hlp
->servers
.head
;
811 helper_stateful_server
*srv
;
814 srv
= (helper_stateful_server
*)link
->data
;
817 if (srv
->flags
.shutdown
) {
818 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
822 assert(hlp
->childs
.n_active
> 0);
823 -- hlp
->childs
.n_active
;
824 srv
->flags
.shutdown
= true; /* request it to shut itself down */
826 if (srv
->stats
.pending
) {
827 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
831 if (srv
->flags
.closing
) {
832 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
836 if (srv
->reserved()) {
838 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Closing anyway.");
840 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Not Shutting Down Yet.");
845 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
847 /* the rest of the details are dealt with in the helperStatefulServerFree
850 srv
->closePipesSafely();
854 Helper::Client::~Client()
856 /* note, don't free id_name, it probably points to static memory */
858 // A non-empty queue would leak Helper::Xaction objects, stalling any
859 // pending (and even future collapsed) transactions. To avoid stalling
860 // transactions, we must dropQueued(). We ought to do that when we
861 // discover that no progress is possible rather than here because
862 // reference counting may keep this object alive for a long time.
863 assert(queue
.empty());
867 Helper::Client::handleKilledServer(SessionBase
* const srv
)
869 if (!srv
->flags
.shutdown
) {
870 assert(childs
.n_active
> 0);
872 debugs(84, DBG_CRITICAL
, "WARNING: " << id_name
<< " #" << srv
->index
<< " exited");
874 handleFewerServers(srv
->stats
.replies
>= 1);
876 if (childs
.needNew() > 0) {
877 srv
->flags
.shutdown
= true;
882 if (!childs
.n_active
)
887 Helper::Client::dropQueued()
892 Assure(!childs
.n_active
);
893 Assure(!GetFirstAvailable(this));
895 // no helper servers means nobody can advance our queued transactions
897 debugs(80, DBG_CRITICAL
, "ERROR: Dropping " << queue
.size() << ' ' <<
898 id_name
<< " helper requests due to lack of helper processes");
899 // similar to SessionBase::dropQueued()
900 while (const auto r
= nextRequest()) {
901 r
->reply
.result
= Helper::Unknown
;
908 Helper::Client::handleFewerServers(const bool madeProgress
)
910 const auto needNew
= childs
.needNew();
913 return; // some server(s) have died, but we still have enough
915 debugs(80, DBG_IMPORTANT
, "Too few " << id_name
<< " processes are running (need " << needNew
<< "/" << childs
.n_max
<< ")" <<
916 Debug::Extra
<< "active processes: " << childs
.n_active
<<
917 Debug::Extra
<< "processes configured to start at (re)configuration: " << childs
.n_startup
);
919 if (childs
.n_active
< childs
.n_startup
&& last_restart
> squid_curtime
- 30) {
921 debugs(80, DBG_CRITICAL
, "ERROR: The " << id_name
<< " helpers are crashing too rapidly, need help!");
923 fatalf("The %s helpers are crashing too rapidly, need help!", id_name
);
928 Helper::SessionBase::HelperServerClosed(SessionBase
* const srv
)
930 srv
->helper().handleKilledServer(srv
);
936 Helper::Session::popRequest(const int request_number
)
938 Xaction
*r
= nullptr;
939 if (parent
->childs
.concurrency
) {
940 // If concurrency is supported, retrieve the request by its ID
941 const auto it
= requestsIndex
.find(request_number
);
942 if (it
!= requestsIndex
.end()) {
944 requests
.erase(it
->second
);
945 requestsIndex
.erase(it
);
947 } else if(!requests
.empty()) {
948 // Else get the first request from queue, if any
949 r
= requests
.front();
950 requests
.pop_front();
956 /// Calls back with a pointer to the buffer with the helper output
958 helperReturnBuffer(Helper::Session
* srv
, const Helper::Client::Pointer
&hlp
, char * const msg
, const size_t msgSize
, const char * const msgEnd
)
960 if (Helper::Xaction
*r
= srv
->replyXaction
) {
961 const bool hasSpace
= r
->reply
.accumulate(msg
, msgSize
);
963 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
964 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
965 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
966 srv
->closePipesSafely();
971 return; // We are waiting for more data.
974 if (cbdataReferenceValid(r
->request
.data
)) {
976 if (r
->reply
.result
== Helper::BrokenHelper
&& r
->request
.retries
< MAX_RETRIES
) {
977 debugs(84, DBG_IMPORTANT
, "ERROR: helper: " << r
->reply
<< ", attempt #" << (r
->request
.retries
+ 1) << " of 2");
984 -- srv
->stats
.pending
;
985 ++ srv
->stats
.replies
;
987 ++ hlp
->stats
.replies
;
989 srv
->answer_time
= current_time
;
991 srv
->dispatch_time
= r
->request
.dispatch_time
;
993 hlp
->stats
.avg_svc_time
=
994 Math::intAverage(hlp
->stats
.avg_svc_time
,
995 tvSubMsec(r
->request
.dispatch_time
, current_time
),
996 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
998 // release or re-submit parsedRequestXaction object
999 srv
->replyXaction
= nullptr;
1001 ++r
->request
.retries
;
1002 hlp
->submitRequest(r
);
1007 if (hlp
->timeout
&& hlp
->childs
.concurrency
)
1008 srv
->checkForTimedOutRequests(hlp
->retryTimedOut
);
1010 if (!srv
->flags
.shutdown
) {
1011 helperKickQueue(hlp
);
1012 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
1013 srv
->closeWritePipeSafely();
1018 helperHandleRead(const Comm::ConnectionPointer
&conn
, char *, size_t len
, Comm::Flag flag
, int, void *data
)
1020 const auto srv
= static_cast<Helper::Session
*>(data
);
1021 const auto hlp
= srv
->parent
;
1022 assert(cbdataReferenceValid(data
));
1024 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1026 if (flag
== Comm::ERR_CLOSING
) {
1030 assert(conn
->fd
== srv
->readPipe
->fd
);
1032 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
);
1034 if (flag
!= Comm::OK
|| len
== 0) {
1035 srv
->closePipesSafely();
1039 srv
->roffset
+= len
;
1040 srv
->rbuf
[srv
->roffset
] = '\0';
1041 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
1043 if (!srv
->stats
.pending
&& !srv
->stats
.timedout
) {
1044 /* someone spoke without being spoken to */
1045 debugs(84, DBG_IMPORTANT
, "ERROR: Killing helper process after an unexpected read from " <<
1046 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
1047 " bytes '" << srv
->rbuf
<< "'");
1050 srv
->rbuf
[0] = '\0';
1051 srv
->closePipesSafely();
1055 bool needsMore
= false;
1056 char *msg
= srv
->rbuf
;
1057 while (*msg
&& !needsMore
) {
1059 char *eom
= strchr(msg
, hlp
->eom
);
1062 debugs(84, 3, "helperHandleRead: end of reply found");
1063 if (eom
> msg
&& eom
[-1] == '\r' && hlp
->eom
== '\n') {
1065 // rewind to the \r octet which is the real terminal now
1066 // and remember that we have to skip forward 2 places now.
1073 if (!srv
->ignoreToEom
&& !srv
->replyXaction
) {
1075 if (hlp
->childs
.concurrency
) {
1077 i
= strtol(msg
, &e
, 10);
1078 // Do we need to check for e == msg? Means wrong response from helper.
1079 // Will be dropped as "unexpected reply on channel 0"
1080 needsMore
= !(xisspace(*e
) || (eom
&& e
== eom
));
1083 while (*msg
&& xisspace(*msg
))
1085 } // else not enough data to compute request number
1087 if (!(srv
->replyXaction
= srv
->popRequest(i
))) {
1088 if (srv
->stats
.timedout
) {
1089 debugs(84, 3, "Timedout reply received for request-ID: " << i
<< " , ignore");
1091 debugs(84, DBG_IMPORTANT
, "ERROR: helperHandleRead: unexpected reply on channel " <<
1092 i
<< " from " << hlp
->id_name
<< " #" << srv
->index
<<
1093 " '" << srv
->rbuf
<< "'");
1095 srv
->ignoreToEom
= true;
1097 } // else we need to just append reply data to the current Xaction
1100 size_t msgSize
= eom
? eom
- msg
: (srv
->roffset
- (msg
- srv
->rbuf
));
1101 assert(msgSize
<= srv
->rbuf_sz
);
1102 helperReturnBuffer(srv
, hlp
, msg
, msgSize
, eom
);
1103 msg
+= msgSize
+ skip
;
1104 assert(static_cast<size_t>(msg
- srv
->rbuf
) <= srv
->rbuf_sz
);
1106 // The next message should not be ignored.
1107 if (eom
&& srv
->ignoreToEom
)
1108 srv
->ignoreToEom
= false;
1110 assert(skip
== 0 && eom
== nullptr);
1114 size_t msgSize
= (srv
->roffset
- (msg
- srv
->rbuf
));
1115 assert(msgSize
<= srv
->rbuf_sz
);
1116 memmove(srv
->rbuf
, msg
, msgSize
);
1117 srv
->roffset
= msgSize
;
1118 srv
->rbuf
[srv
->roffset
] = '\0';
1120 // All of the responses have been parsed and msg points at the end of the read data
1121 assert(static_cast<size_t>(msg
- srv
->rbuf
) == srv
->roffset
);
1125 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
1126 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1127 assert(spaceSize
>= 0);
1129 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
1130 CommIoCbPtrFun(helperHandleRead
, srv
));
1131 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
1136 helperStatefulHandleRead(const Comm::ConnectionPointer
&conn
, char *, size_t len
, Comm::Flag flag
, int, void *data
)
1139 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
1140 const auto hlp
= srv
->parent
;
1141 assert(cbdataReferenceValid(data
));
1143 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1145 if (flag
== Comm::ERR_CLOSING
) {
1149 assert(conn
->fd
== srv
->readPipe
->fd
);
1151 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
1152 hlp
->id_name
<< " #" << srv
->index
);
1154 if (flag
!= Comm::OK
|| len
== 0) {
1155 srv
->closePipesSafely();
1159 srv
->roffset
+= len
;
1160 srv
->rbuf
[srv
->roffset
] = '\0';
1161 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
1163 if (srv
->requests
.empty()) {
1164 /* someone spoke without being spoken to */
1165 debugs(84, DBG_IMPORTANT
, "ERROR: Killing helper process after an unexpected read from " <<
1166 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
1167 " bytes '" << srv
->rbuf
<< "'");
1170 srv
->closePipesSafely();
1174 if ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
1175 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1177 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n') {
1179 // rewind to the \r octet which is the real terminal now
1186 const auto r
= srv
->requests
.front();
1188 if (!r
->reply
.accumulate(srv
->rbuf
, t
? (t
- srv
->rbuf
) : srv
->roffset
)) {
1189 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
1190 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
1191 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
1192 srv
->closePipesSafely();
1196 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1197 * Doing this prohibits concurrency support with multiple replies per read().
1198 * TODO: check that read() setup on these buffers pays attention to roffset != 0
1199 * TODO: check that replies bigger than the buffer are discarded and do not affect future replies
1204 /* end of reply found */
1205 srv
->requests
.pop_front(); // we already have it in 'r'
1208 if (cbdataReferenceValid(r
->request
.data
)) {
1209 r
->reply
.finalize();
1210 r
->reply
.reservationId
= srv
->reservationId
;
1213 debugs(84, DBG_IMPORTANT
, "StatefulHandleRead: no callback data registered");
1219 -- srv
->stats
.pending
;
1220 ++ srv
->stats
.replies
;
1222 ++ hlp
->stats
.replies
;
1223 srv
->answer_time
= current_time
;
1224 hlp
->stats
.avg_svc_time
=
1225 Math::intAverage(hlp
->stats
.avg_svc_time
,
1226 tvSubMsec(srv
->dispatch_time
, current_time
),
1227 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
1230 helperStatefulServerDone(srv
);
1232 hlp
->cancelReservation(srv
->reservationId
);
1235 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
1236 int spaceSize
= srv
->rbuf_sz
- 1;
1238 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
1239 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
1240 comm_read(srv
->readPipe
, srv
->rbuf
, spaceSize
, call
);
// Enqueue(): accounts for a request that could not be dispatched right away.
//   hlp - helper client whose queue statistics and child limits are consulted
//   r   - the transaction being queued (no visible statement below uses it;
//         presumably it is appended to the client's queue -- TODO confirm)
// Visible effects: increments hlp->stats.queue_size, may call
// hlp->openSessions(), and may log three DBG_CRITICAL "busy" warnings while
// stamping hlp->last_queue_warn.
1244 /// Handles a request when all running helpers, if any, are busy.
1246 Enqueue(Helper::Client
* const hlp
, Helper::Xaction
* const r
)
1249 ++ hlp
->stats
.queue_size
;
1251 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1252 if (hlp
->childs
.needNew() > 0) {
1253 hlp
->openSessions();
// The three conditions below precede the warnings: the queue is still shorter
// than childs.queue_size, a warning was stamped less than 600 seconds ago, or
// Squid is shutting down/reconfiguring.
// NOTE(review): the statements these conditions guard are not visible here;
// presumably each one returns without warning -- confirm.
1257 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1260 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1263 if (shutting_down
|| reconfiguring
)
// Remember when this warning was issued; the 600-second comparison above reads it.
1266 hlp
->last_queue_warn
= squid_curtime
;
1268 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1269 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1270 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
// StatefulEnqueue(): stateful-helper counterpart of Enqueue(); the visible
// statements are the same sequence applied to a statefulhelper.
//   hlp - stateful helper whose queue statistics and child limits are consulted
//   r   - the transaction being queued (no visible statement below uses it;
//         presumably it is appended to the helper's queue -- TODO confirm)
1274 StatefulEnqueue(statefulhelper
* hlp
, Helper::Xaction
* r
)
1277 ++ hlp
->stats
.queue_size
;
1279 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1280 if (hlp
->childs
.needNew() > 0) {
1281 hlp
->openSessions();
// Conditions checked before warning: queue shorter than childs.queue_size,
// last warning stamped less than 600 seconds ago, or shutdown/reconfigure in
// progress.
// NOTE(review): the guarded statements are not visible here; presumably each
// one returns without warning -- confirm.
1285 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1288 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1291 if (shutting_down
|| reconfiguring
)
// Stamp the warning time that the 600-second comparison above reads.
1294 hlp
->last_queue_warn
= squid_curtime
;
1296 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1297 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1298 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
// Helper::Client::nextRequest(): reads the transaction at the front of `queue`.
// NOTE(review): only the front() read is visible here; the empty-queue check,
// the removal from the queue and the returned value are not shown -- confirm
// them against the complete function before relying on this description.
1302 Helper::Client::nextRequest()
1307 auto *r
= queue
.front();
// GetFirstAvailable(): walks hlp->servers looking for a Helper::Session to
// hand the next request to, tracking the candidate in `selected`.
//   hlp - helper client whose session list and concurrency limit are read
// Logs progress at debug levels 5 and 3 under section 84.
1313 static Helper::Session
*
1314 GetFirstAvailable(const Helper::Client::Pointer
&hlp
)
1317 Helper::Session
*selected
= nullptr;
1318 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
// No running children is tested before the scan.
// NOTE(review): the guarded statement is not visible; presumably an early
// return of nullptr -- confirm.
1320 if (hlp
->childs
.n_running
== 0)
1323 /* Find "least" loaded helper (approx) */
1324 for (n
= hlp
->servers
.head
; n
!= nullptr; n
= n
->next
) {
1325 const auto srv
= static_cast<Helper::Session
*>(n
->data
);
// Candidate filters: one whose pending count is not lower than the current
// selection's, one flagged for shutdown, and the special case of a session
// with no pending requests at all.
// NOTE(review): the statements these conditions guard are not visible here.
1327 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
1330 if (srv
->flags
.shutdown
)
1333 if (!srv
->stats
.pending
)
1345 debugs(84, 5, "GetFirstAvailable: None available.");
// Capacity check: a configured concurrency of zero is treated as a limit of 1
// pending request per session.
1349 if (selected
->stats
.pending
>= (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1)) {
1350 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1354 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected
->index
);
// StatefulGetFirstAvailable(): walks hlp->servers looking for a stateful
// helper server to use. While scanning it also remembers a server whose
// reservation has outlived childs.reservationTimeout, and returns that server
// when `oldestReservedServer` is set at the end of the scan.
//   hlp - stateful helper whose server list and reservation timeout are read
1358 static helper_stateful_server
*
1359 StatefulGetFirstAvailable(const statefulhelper::Pointer
&hlp
)
1362 helper_stateful_server
*srv
= nullptr;
1363 helper_stateful_server
*oldestReservedServer
= nullptr;
1364 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
// NOTE(review): the statement guarded by this check is not visible;
// presumably an early return of nullptr -- confirm.
1366 if (hlp
->childs
.n_running
== 0)
1369 for (n
= hlp
->servers
.head
; n
!= nullptr; n
= n
->next
) {
1370 srv
= (helper_stateful_server
*)n
->data
;
// A server with a pending request is tested first (guarded statement not
// visible here).
1372 if (srv
->stats
.pending
)
// Reserved servers are only considered when the reservation is older than
// childs.reservationTimeout seconds.
1375 if (srv
->reserved()) {
1376 if ((squid_curtime
- srv
->reservationStart
) > hlp
->childs
.reservationTimeout
) {
1377 if (!oldestReservedServer
)
1378 oldestReservedServer
= srv
;
// NOTE(review): this replaces the candidate when srv->reservationStart is the
// LARGER (later) value, so the server kept is the most recently reserved of
// the expired ones, while the variable name and the debug message below speak
// of the oldest/earlier one -- confirm which is intended.
1379 else if (oldestReservedServer
->reservationStart
< srv
->reservationStart
)
1380 oldestReservedServer
= srv
;
1381 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer
->index
);
// Servers flagged for shutdown are tested next (guarded statement not visible).
1386 if (srv
->flags
.shutdown
)
1389 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
// After the scan: fall back to the server with the expired reservation.
1393 if (oldestReservedServer
) {
1394 debugs(84, 5, "expired reservation " << oldestReservedServer
->reservationId
<< " for srv-" << oldestReservedServer
->index
);
1395 return oldestReservedServer
;
1398 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
// helperDispatchWriteDone(): write-completion callback for requests written
// to a helper session (it is the callback passed to Comm::Write below and in
// helperDispatch()).
//   flag - completion status; anything other than Comm::OK is logged as a crash
//   data - the Helper::Session the write belonged to
// The connection, buffer, size and errno arguments are unnamed and unused.
1403 helperDispatchWriteDone(const Comm::ConnectionPointer
&, char *, size_t, Comm::Flag flag
, int, void *data
)
1405 const auto srv
= static_cast<Helper::Session
*>(data
);
// The buffer that was just written is released and the session is marked as
// no longer writing before the status is examined.
1407 srv
->writebuf
->clean();
1408 delete srv
->writebuf
;
1409 srv
->writebuf
= nullptr;
1410 srv
->flags
.writing
= false;
1412 if (flag
!= Comm::OK
) {
1413 /* Helper server has crashed */
1414 debugs(84, DBG_CRITICAL
, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
<< " has crashed");
// When wqueue->isNull() is false, the queued buffer becomes the new writebuf,
// a fresh MemBuf takes its place as wqueue, and another write is started with
// this same function as its completion callback.
1418 if (!srv
->wqueue
->isNull()) {
1419 srv
->writebuf
= srv
->wqueue
;
1420 srv
->wqueue
= new MemBuf
;
1421 srv
->flags
.writing
= true;
1422 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1423 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1424 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, nullptr);
// helperDispatch(): hands transaction r to helper session srv.
//   srv - session that will receive the request
//   r   - transaction to send; r->request.buf holds the text that is written
// Visible effects: assigns the request an ID, records it in srv->requests
// (and in srv->requestsIndex when concurrency is configured), appends the
// request text to srv->wqueue, starts a write if none is in progress, and
// increments srv->stats.pending and hlp->stats.requests.
1429 helperDispatch(Helper::Session
* const srv
, Helper::Xaction
* const r
)
1431 const auto hlp
= srv
->parent
;
// The per-session request counter is advanced before the callback-data check
// below.
1432 const uint64_t reqId
= ++srv
->nextRequestId
;
// NOTE(review): what follows the ERROR log for invalid callback data is not
// visible here -- confirm how the transaction is disposed of.
1434 if (!cbdataReferenceValid(r
->request
.data
)) {
1435 debugs(84, DBG_IMPORTANT
, "ERROR: helperDispatch: invalid callback data");
1440 r
->request
.Id
= reqId
;
// `it` keeps the position of r within srv->requests for the index entry below.
1441 const auto it
= srv
->requests
.insert(srv
->requests
.end(), r
);
1442 r
->request
.dispatch_time
= current_time
;
1444 if (srv
->wqueue
->isNull())
1445 srv
->wqueue
->init();
// With concurrency configured the request is indexed by ID and written as
// "<reqId> <request text>".
1447 if (hlp
->childs
.concurrency
) {
1448 srv
->requestsIndex
.insert(Helper::Session::RequestIndex::value_type(reqId
, it
));
1449 assert(srv
->requestsIndex
.size() == srv
->requests
.size());
1450 srv
->wqueue
->appendf("%" PRIu64
" %s", reqId
, r
->request
.buf
);
// This append writes the request text without an ID prefix.
// NOTE(review): presumably the non-concurrent alternative to the appendf()
// above; the `else` is not visible here -- confirm.
1452 srv
->wqueue
->append(r
->request
.buf
, strlen(r
->request
.buf
));
// Start writing only when no write is in progress; otherwise the text stays
// in wqueue, which helperDispatchWriteDone() picks up when it runs.
1454 if (!srv
->flags
.writing
) {
1455 assert(nullptr == srv
->writebuf
);
1456 srv
->writebuf
= srv
->wqueue
;
1457 srv
->wqueue
= new MemBuf
;
1458 srv
->flags
.writing
= true;
1459 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1460 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1461 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, nullptr);
1464 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
<< ", " << strlen(r
->request
.buf
) << " bytes");
1467 ++ srv
->stats
.pending
;
1468 ++ hlp
->stats
.requests
;
// helperStatefulDispatchWriteDone(): write-completion callback that
// helperStatefulDispatch() passes to Comm::Write. Every parameter is unnamed,
// so none of the completion details can be referenced by the body.
1472 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer
&, char *, size_t, Comm::Flag
, int, void *)
// helperStatefulDispatch(): hands transaction r to stateful helper server srv.
//   srv - server that will receive the request; must hold a reservation
//         (asserted below)
//   r   - transaction to send; r->request.buf is written to srv->writePipe
// Visible effects: copies srv->reservationId into the reply, appends r to
// srv->requests, stamps srv->dispatch_time, starts the write, and increments
// srv->stats.pending and hlp->stats.requests.
1476 helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Xaction
* r
)
1478 const auto hlp
= srv
->parent
;
// Invalid callback data: the reservation held by srv is cancelled.
// NOTE(review): what follows (disposal of r, return) is not visible here.
1480 if (!cbdataReferenceValid(r
->request
.data
)) {
1481 debugs(84, DBG_IMPORTANT
, "ERROR: helperStatefulDispatch: invalid callback data");
1483 hlp
->cancelReservation(srv
->reservationId
);
1487 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
);
1489 assert(srv
->reservationId
);
1490 r
->reply
.reservationId
= srv
->reservationId
;
1492 if (r
->request
.placeholder
== 1) {
1493 /* a callback is needed before this request can _use_ a helper. */
1494 /* we don't care about releasing this helper. The request NEVER
1495 * gets to the helper. So we throw away the return code */
1496 r
->reply
.result
= Helper::Unknown
;
1498 /* throw away the placeholder */
1500 /* and push the queue. Note that the callback may have submitted a new
1501 * request to the helper which is why we test for the request */
1503 if (!srv
->requests
.size())
1504 helperStatefulServerDone(srv
);
// Regular request: record it, stamp the dispatch time and write the request
// text straight from r->request.buf.
1509 srv
->requests
.push_back(r
);
1510 srv
->dispatch_time
= current_time
;
1511 AsyncCall::Pointer call
= commCbCall(5,5, "helperStatefulDispatchWriteDone",
1512 CommIoCbPtrFun(helperStatefulDispatchWriteDone
, srv
));
1513 Comm::Write(srv
->writePipe
, r
->request
.buf
, strlen(r
->request
.buf
), call
, nullptr);
1514 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1515 hlp
->id_name
<< " #" << srv
->index
<< ", " <<
1516 (int) strlen(r
->request
.buf
) << " bytes");
1519 ++ srv
->stats
.pending
;
1520 ++ hlp
->stats
.requests
;
// helperKickQueue(): dispatches queued requests for as long as both a
// session and a queued request can be obtained.
//   hlp - helper client whose queue is drained
1524 helperKickQueue(const Helper::Client::Pointer
&hlp
)
1526 Helper::Xaction
*r
= nullptr;
1527 Helper::Session
*srv
= nullptr;
// The && short-circuits: nextRequest() is only called once GetFirstAvailable()
// has returned a session.
1529 while ((srv
= GetFirstAvailable(hlp
)) && (r
= hlp
->nextRequest()))
1530 helperDispatch(srv
, r
);
// NOTE(review): the statement guarded by the "no active children" check is
// not visible here -- confirm what it does.
1532 if (!hlp
->childs
.n_active
)
// helperStatefulKickQueue(): stateful counterpart of helperKickQueue(). Each
// server obtained is reserved through hlp->reserveServer() before the request
// is dispatched to it.
//   hlp - stateful helper whose queue is drained
1537 helperStatefulKickQueue(const statefulhelper::Pointer
&hlp
)
1540 helper_stateful_server
*srv
;
// The && short-circuits: nextRequest() is only called once
// StatefulGetFirstAvailable() has returned a server.
1541 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= hlp
->nextRequest())) {
1542 debugs(84, 5, "found srv-" << srv
->index
);
1543 hlp
->reserveServer(srv
);
1544 helperStatefulDispatch(srv
, r
);
// NOTE(review): the statement guarded by the "no active children" check is
// not visible here -- confirm what it does.
1547 if (!hlp
->childs
.n_active
)
// helperStatefulServerDone(): called when srv has finished a request.
//   srv - the stateful helper server that became free
// A server not flagged for shutdown triggers another pass over its parent's
// queue. A server flagged for shutdown has its write pipe closed, but only
// when it is not already closing, holds no reservation and has nothing pending.
1552 helperStatefulServerDone(helper_stateful_server
* srv
)
1554 if (!srv
->flags
.shutdown
) {
1555 helperStatefulKickQueue(srv
->parent
);
1556 } else if (!srv
->flags
.closing
&& !srv
->reserved() && !srv
->stats
.pending
) {
1557 srv
->closeWritePipeSafely();
// Helper::Session::checkForTimedOutRequests(): removes requests from the
// front of `requests` for as long as the front one reports timedOut() against
// parent->timeout, then either resubmits each one or answers it with a
// timed-out reply.
//   retry - when true, a request that has used fewer than MAX_RETRIES retries
//           and still has valid callback data is resubmitted to the parent
// Requires parent->childs.concurrency to be non-zero (asserted).
1563 Helper::Session::checkForTimedOutRequests(bool const retry
)
1565 assert(parent
->childs
.concurrency
);
1566 while(!requests
.empty() && requests
.front()->request
.timedOut(parent
->timeout
)) {
1567 const auto r
= requests
.front();
// Drop the request from both the ID index and the list; the assert documents
// that every listed request is expected to be indexed.
1568 RequestIndex::iterator it
;
1569 it
= requestsIndex
.find(r
->request
.Id
);
1570 assert(it
!= requestsIndex
.end());
1571 requestsIndex
.erase(it
);
1572 requests
.pop_front();
1573 debugs(84, 2, "Request " << r
->request
.Id
<< " timed-out, remove it from queue");
// NOTE(review): no assignment to `retried` other than this initialisation is
// visible here -- confirm where it is set and read.
1574 bool retried
= false;
1575 if (retry
&& r
->request
.retries
< MAX_RETRIES
&& cbdataReferenceValid(r
->request
.data
)) {
1576 debugs(84, 2, "Retry request " << r
->request
.Id
);
1577 ++r
->request
.retries
;
1578 parent
->submitRequest(r
);
// Not retried but the requester is still there: deliver a reply. When
// parent->onTimedOutResponse is non-empty it is fed to the reply and, if
// accumulate() accepts it, the reply is finalized.
1580 } else if (cbdataReferenceValid(r
->request
.data
)) {
1581 if (!parent
->onTimedOutResponse
.isEmpty()) {
1582 if (r
->reply
.accumulate(parent
->onTimedOutResponse
.rawContent(), parent
->onTimedOutResponse
.length()))
1583 r
->reply
.finalize();
// NOTE(review): the branch structure around the two Helper::TimedOut
// assignments below (presumably `else` arms) is not visible here -- confirm.
1585 r
->reply
.result
= Helper::TimedOut
;
1586 parent
->callBack(*r
);
1588 r
->reply
.result
= Helper::TimedOut
;
1589 parent
->callBack(*r
);
1594 ++parent
->stats
.timedout
;
// Helper::Session::requestTimeout(): connection-timeout handler for a helper
// session. It expires timed-out requests and then arms a new timeout on the
// same connection with this same handler.
//   io - timeout callback parameters; io.data is the Session and io.conn the
//        connection the new timeout is set on
// NOTE(review): the first debugs() call uses section 26 while the file header
// declares section 84 for helper maintenance -- confirm this is intended.
1601 Helper::Session::requestTimeout(const CommTimeoutCbParams
&io
)
1603 debugs(26, 3, io
.conn
);
1604 const auto srv
= static_cast<Session
*>(io
.data
);
1606 srv
->checkForTimedOutRequests(srv
->parent
->retryTimedOut
);
1608 debugs(84, 3, io
.conn
<< " establish a new timeout");
1609 AsyncCall::Pointer timeoutCall
= commCbCall(84, 4, "Helper::Session::requestTimeout",
1610 CommTimeoutCbPtrFun(Session::requestTimeout
, srv
));
// Time already spent waiting is measured from the front request's
// dispatch_time seconds field, or taken as 0 when nothing is outstanding.
1612 const time_t timeSpent
= srv
->requests
.empty() ? 0 : (squid_curtime
- srv
->requests
.front()->request
.dispatch_time
.tv_sec
);
// The new timeout is the remainder of parent->timeout, but never less than
// one second.
1613 const time_t minimumNewTimeout
= 1; // second
1614 const auto timeLeft
= max(minimumNewTimeout
, srv
->parent
->timeout
- timeSpent
);
1616 commSetConnTimeout(io
.conn
, timeLeft
, timeoutCall
);