2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 84 Helper process maintenance */
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
16 #include "comm/Connection.h"
17 #include "comm/Read.h"
18 #include "comm/Write.h"
19 #include "debug/Messages.h"
22 #include "format/Quoting.h"
24 #include "helper/Reply.h"
25 #include "helper/Request.h"
27 #include "SquidConfig.h"
29 #include "SquidMath.h"
33 // helper_stateful_server::data uses explicit alloc()/freeOne() */
36 #define HELPER_MAX_ARGS 64
38 /// The maximum allowed request retries.
41 /// Helpers input buffer size.
42 const size_t ReadBufSize(32*1024);
44 static IOCB helperHandleRead
;
45 static IOCB helperStatefulHandleRead
;
46 static void Enqueue(helper
* hlp
, Helper::Xaction
*);
47 static helper_server
*GetFirstAvailable(const helper
* hlp
);
48 static helper_stateful_server
*StatefulGetFirstAvailable(statefulhelper
* hlp
);
49 static void helperDispatch(helper_server
* srv
, Helper::Xaction
* r
);
50 static void helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Xaction
* r
);
51 static void helperKickQueue(helper
* hlp
);
52 static void helperStatefulKickQueue(statefulhelper
* hlp
);
53 static void helperStatefulServerDone(helper_stateful_server
* srv
);
54 static void StatefulEnqueue(statefulhelper
* hlp
, Helper::Xaction
* r
);
56 CBDATA_CLASS_INIT(helper
);
57 CBDATA_CLASS_INIT(helper_server
);
58 CBDATA_CLASS_INIT(statefulhelper
);
59 CBDATA_CLASS_INIT(helper_stateful_server
);
61 InstanceIdDefinitions(HelperServerBase
, "Hlpr");
64 HelperServerBase::initStats()
74 HelperServerBase::closePipesSafely(const char *id_name
)
77 shutdown(writePipe
->fd
, SD_BOTH
);
81 if (readPipe
->fd
== writePipe
->fd
)
89 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
91 debugs(84, DBG_IMPORTANT
, "WARNING: " << id_name
<<
92 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
102 HelperServerBase::closeWritePipeSafely(const char *id_name
)
105 shutdown(writePipe
->fd
, (readPipe
->fd
== writePipe
->fd
? SD_BOTH
: SD_SEND
));
108 flags
.closing
= true;
109 if (readPipe
->fd
== writePipe
->fd
)
115 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
117 debugs(84, DBG_IMPORTANT
, "WARNING: " << id_name
<<
118 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
128 HelperServerBase::dropQueued()
130 while (!requests
.empty()) {
131 // XXX: re-schedule these on another helper?
132 Helper::Xaction
*r
= requests
.front();
133 requests
.pop_front();
135 if (cbdataReferenceValidDone(r
->request
.data
, &cbdata
)) {
136 r
->reply
.result
= Helper::Unknown
;
137 r
->request
.callback(cbdata
, r
->reply
);
144 HelperServerBase::~HelperServerBase()
147 memFreeBuf(rbuf_sz
, rbuf
);
152 helper_server::~helper_server()
163 if (Comm::IsConnOpen(writePipe
))
164 closeWritePipeSafely(parent
->id_name
);
166 dlinkDelete(&link
, &parent
->servers
);
168 assert(parent
->childs
.n_running
> 0);
169 -- parent
->childs
.n_running
;
171 assert(requests
.empty());
172 cbdataReferenceDone(parent
);
176 helper_server::dropQueued()
178 HelperServerBase::dropQueued();
179 requestsIndex
.clear();
182 helper_stateful_server::~helper_stateful_server()
184 /* TODO: walk the local queue of requests and carry them all out */
185 if (Comm::IsConnOpen(writePipe
))
186 closeWritePipeSafely(parent
->id_name
);
188 parent
->cancelReservation(reservationId
);
190 dlinkDelete(&link
, &parent
->servers
);
192 assert(parent
->childs
.n_running
> 0);
193 -- parent
->childs
.n_running
;
195 assert(requests
.empty());
197 cbdataReferenceDone(parent
);
201 helperOpenServers(helper
* hlp
)
207 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
208 char fd_note_buf
[FD_DESC_SZ
];
218 if (hlp
->cmdline
== nullptr)
221 progname
= hlp
->cmdline
->key
;
223 if ((s
= strrchr(progname
, '/')))
224 shortname
= xstrdup(s
+ 1);
226 shortname
= xstrdup(progname
);
228 /* figure out how many new child are actually needed. */
229 int need_new
= hlp
->childs
.needNew();
231 debugs(84, Important(19), "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
234 debugs(84, Important(20), "helperOpenServers: No '" << shortname
<< "' processes needed.");
237 procname
= (char *)xmalloc(strlen(shortname
) + 3);
239 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
241 args
[nargs
] = procname
;
244 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
245 args
[nargs
] = w
->key
;
249 args
[nargs
] = nullptr;
252 assert(nargs
<= HELPER_MAX_ARGS
);
254 for (k
= 0; k
< need_new
; ++k
) {
257 pid
= ipcCreate(hlp
->ipc_type
,
267 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
271 ++ hlp
->childs
.n_running
;
272 ++ hlp
->childs
.n_active
;
273 srv
= new helper_server
;
277 srv
->addr
= hlp
->addr
;
278 srv
->readPipe
= new Comm::Connection
;
279 srv
->readPipe
->fd
= rfd
;
280 srv
->writePipe
= new Comm::Connection
;
281 srv
->writePipe
->fd
= wfd
;
282 srv
->rbuf
= (char *)memAllocBuf(ReadBufSize
, &srv
->rbuf_sz
);
283 srv
->wqueue
= new MemBuf
;
285 srv
->nextRequestId
= 0;
286 srv
->replyXaction
= nullptr;
287 srv
->ignoreToEom
= false;
288 srv
->parent
= cbdataReference(hlp
);
289 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
292 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
293 fd_note(rfd
, fd_note_buf
);
295 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
296 fd_note(rfd
, fd_note_buf
);
297 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
298 fd_note(wfd
, fd_note_buf
);
301 commSetNonBlocking(rfd
);
304 commSetNonBlocking(wfd
);
306 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed
, srv
));
307 comm_add_close_handler(rfd
, closeCall
);
309 if (hlp
->timeout
&& hlp
->childs
.concurrency
) {
310 AsyncCall::Pointer timeoutCall
= commCbCall(84, 4, "helper_server::requestTimeout",
311 CommTimeoutCbPtrFun(helper_server::requestTimeout
, srv
));
312 commSetConnTimeout(srv
->readPipe
, hlp
->timeout
, timeoutCall
);
315 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
316 CommIoCbPtrFun(helperHandleRead
, srv
));
317 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
320 hlp
->last_restart
= squid_curtime
;
321 safe_free(shortname
);
323 helperKickQueue(hlp
);
329 * helperStatefulOpenServers: create the stateful child helper processes
332 helperStatefulOpenServers(statefulhelper
* hlp
)
335 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
336 char fd_note_buf
[FD_DESC_SZ
];
339 if (hlp
->cmdline
== nullptr)
342 if (hlp
->childs
.concurrency
)
343 debugs(84, DBG_CRITICAL
, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp
->cmdline
<< "')");
345 char *progname
= hlp
->cmdline
->key
;
348 if ((s
= strrchr(progname
, '/')))
349 shortname
= xstrdup(s
+ 1);
351 shortname
= xstrdup(progname
);
353 /* figure out haw mant new helpers are needed. */
354 int need_new
= hlp
->childs
.needNew();
356 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
359 debugs(84, DBG_IMPORTANT
, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
362 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
364 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
366 args
[nargs
] = procname
;
369 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
370 args
[nargs
] = w
->key
;
374 args
[nargs
] = nullptr;
377 assert(nargs
<= HELPER_MAX_ARGS
);
379 for (int k
= 0; k
< need_new
; ++k
) {
384 pid_t pid
= ipcCreate(hlp
->ipc_type
,
394 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
398 ++ hlp
->childs
.n_running
;
399 ++ hlp
->childs
.n_active
;
400 helper_stateful_server
*srv
= new helper_stateful_server
;
404 srv
->addr
= hlp
->addr
;
405 srv
->readPipe
= new Comm::Connection
;
406 srv
->readPipe
->fd
= rfd
;
407 srv
->writePipe
= new Comm::Connection
;
408 srv
->writePipe
->fd
= wfd
;
409 srv
->rbuf
= (char *)memAllocBuf(ReadBufSize
, &srv
->rbuf_sz
);
411 srv
->parent
= cbdataReference(hlp
);
412 srv
->reservationStart
= 0;
414 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
417 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
418 fd_note(rfd
, fd_note_buf
);
420 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
421 fd_note(rfd
, fd_note_buf
);
422 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
423 fd_note(wfd
, fd_note_buf
);
426 commSetNonBlocking(rfd
);
429 commSetNonBlocking(wfd
);
431 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed
, srv
));
432 comm_add_close_handler(rfd
, closeCall
);
434 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
435 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
436 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
439 hlp
->last_restart
= squid_curtime
;
440 safe_free(shortname
);
442 helperStatefulKickQueue(hlp
);
446 helper::submitRequest(Helper::Xaction
*r
)
450 if ((srv
= GetFirstAvailable(this)))
451 helperDispatch(srv
, r
);
458 /// handles helperSubmit() and helperStatefulSubmit() failures
460 SubmissionFailure(helper
*hlp
, HLPCB
*callback
, void *data
)
462 auto result
= Helper::Error
;
464 debugs(84, 3, "no helper");
465 result
= Helper::Unknown
;
467 // else pretend the helper has responded with ERR
469 callback(data
, Helper::Reply(result
));
473 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
475 if (!hlp
|| !hlp
->trySubmit(buf
, callback
, data
))
476 SubmissionFailure(hlp
, callback
, data
);
479 /// whether queuing an additional request would overload the helper
481 helper::queueFull() const {
482 return stats
.queue_size
>= static_cast<int>(childs
.queue_size
);
486 helper::overloaded() const {
487 return stats
.queue_size
> static_cast<int>(childs
.queue_size
);
490 /// synchronizes queue-dependent measurements with the current queue state
492 helper::syncQueueStats()
496 debugs(84, 5, id_name
<< " still overloaded; dropped " << droppedRequests
);
498 overloadStart
= squid_curtime
;
499 debugs(84, 3, id_name
<< " became overloaded");
503 debugs(84, 5, id_name
<< " is no longer overloaded");
504 if (droppedRequests
) {
505 debugs(84, DBG_IMPORTANT
, "helper " << id_name
<<
506 " is no longer overloaded after dropping " << droppedRequests
<<
507 " requests in " << (squid_curtime
- overloadStart
) << " seconds");
515 /// prepares the helper for request submission
516 /// returns true if and only if the submission should proceed
517 /// may kill Squid if the helper remains overloaded for too long
521 // re-sync for the configuration may have changed since the last submission
524 // Nothing special to do if the new request does not overload (i.e., the
525 // queue is not even full yet) or only _starts_ overloading this helper
526 // (i.e., the queue is currently at its limit).
530 if (squid_curtime
- overloadStart
<= 180)
531 return true; // also OK: overload has not persisted long enough to panic
533 if (childs
.onPersistentOverload
== Helper::ChildConfig::actDie
)
534 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name
);
536 if (!droppedRequests
) {
537 debugs(84, DBG_IMPORTANT
, "WARNING: dropping requests to overloaded " <<
538 id_name
<< " helper configured with on-persistent-overload=err");
541 debugs(84, 3, "failed to send " << droppedRequests
<< " helper requests to " << id_name
);
546 helper::trySubmit(const char *buf
, HLPCB
* callback
, void *data
)
549 return false; // request was dropped
551 submit(buf
, callback
, data
); // will send or queue
552 return true; // request submitted or queued
555 /// dispatches or enqueues a helper requests; does not enforce queue limits
557 helper::submit(const char *buf
, HLPCB
* callback
, void *data
)
559 Helper::Xaction
*r
= new Helper::Xaction(callback
, data
, buf
);
561 debugs(84, DBG_DATA
, Raw("buf", buf
, strlen(buf
)));
564 /// Submit request or callback the caller with a Helper::Error error.
565 /// If the reservation is not set then reserves a new helper.
567 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
, const Helper::ReservationId
& reservation
)
569 if (!hlp
|| !hlp
->trySubmit(buf
, callback
, data
, reservation
))
570 SubmissionFailure(hlp
, callback
, data
);
573 /// If possible, submit request. Otherwise, either kill Squid or return false.
575 statefulhelper::trySubmit(const char *buf
, HLPCB
* callback
, void *data
, const Helper::ReservationId
& reservation
)
578 return false; // request was dropped
580 submit(buf
, callback
, data
, reservation
); // will send or queue
581 return true; // request submitted or queued
585 statefulhelper::reserveServer(helper_stateful_server
* srv
)
587 // clear any old reservation
588 if (srv
->reserved()) {
589 reservations
.erase(srv
->reservationId
);
590 srv
->clearReservation();
594 reservations
.insert(Reservations::value_type(srv
->reservationId
, srv
));
598 statefulhelper::cancelReservation(const Helper::ReservationId reservation
)
600 const auto it
= reservations
.find(reservation
);
601 if (it
== reservations
.end())
604 helper_stateful_server
*srv
= it
->second
;
605 reservations
.erase(it
);
606 srv
->clearReservation();
608 // schedule a queue kick
609 AsyncCall::Pointer call
= asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone
, srv
));
610 ScheduleCallHere(call
);
613 helper_stateful_server
*
614 statefulhelper::findServer(const Helper::ReservationId
& reservation
)
616 const auto it
= reservations
.find(reservation
);
617 if (it
== reservations
.end())
623 helper_stateful_server::reserve()
625 assert(!reservationId
);
626 reservationStart
= squid_curtime
;
627 reservationId
= Helper::ReservationId::Next();
628 debugs(84, 3, "srv-" << index
<< " reservation id = " << reservationId
);
632 helper_stateful_server::clearReservation()
634 debugs(84, 3, "srv-" << index
<< " reservation id = " << reservationId
);
640 reservationId
.clear();
641 reservationStart
= 0;
645 statefulhelper::submit(const char *buf
, HLPCB
* callback
, void *data
, const Helper::ReservationId
& reservation
)
647 Helper::Xaction
*r
= new Helper::Xaction(callback
, data
, buf
);
649 if (buf
&& reservation
) {
650 debugs(84, 5, reservation
);
651 helper_stateful_server
*lastServer
= findServer(reservation
);
653 debugs(84, DBG_CRITICAL
, "ERROR: Helper " << id_name
<< " reservation expired (" << reservation
<< ")");
654 r
->reply
.result
= Helper::TimedOut
;
655 r
->request
.callback(r
->request
.data
, r
->reply
);
659 debugs(84, 5, "StatefulSubmit dispatching");
660 helperStatefulDispatch(lastServer
, r
);
662 helper_stateful_server
*srv
;
663 if ((srv
= StatefulGetFirstAvailable(this))) {
665 helperStatefulDispatch(srv
, r
);
667 StatefulEnqueue(this, r
);
670 debugs(84, DBG_DATA
, "placeholder: '" << r
->request
.placeholder
<<
671 "', " << Raw("buf", buf
, (!buf
?0:strlen(buf
))));
677 helper::packStatsInto(Packable
*p
, const char *label
) const
680 p
->appendf("%s:\n", label
);
682 p
->appendf(" program: %s\n", cmdline
->key
);
683 p
->appendf(" number active: %d of %d (%d shutting down)\n", childs
.n_active
, childs
.n_max
, (childs
.n_running
- childs
.n_active
));
684 p
->appendf(" requests sent: %d\n", stats
.requests
);
685 p
->appendf(" replies received: %d\n", stats
.replies
);
686 p
->appendf(" requests timedout: %d\n", stats
.timedout
);
687 p
->appendf(" queue length: %d\n", stats
.queue_size
);
688 p
->appendf(" avg service time: %d msec\n", stats
.avg_svc_time
);
690 p
->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
702 for (dlink_node
*link
= servers
.head
; link
; link
= link
->next
) {
703 HelperServerBase
*srv
= static_cast<HelperServerBase
*>(link
->data
);
705 Helper::Xaction
*xaction
= srv
->requests
.empty() ? nullptr : srv
->requests
.front();
706 double tt
= 0.001 * (xaction
? tvSubMsec(xaction
->request
.dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
707 p
->appendf("%7u\t%7d\t%7d\t%11" PRIu64
"\t%11" PRIu64
"\t%11" PRIu64
"\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
714 srv
->stats
.pending
? 'B' : ' ',
715 srv
->flags
.writing
? 'W' : ' ',
716 srv
->flags
.closing
? 'C' : ' ',
717 srv
->reserved() ? 'R' : ' ',
718 srv
->flags
.shutdown
? 'S' : ' ',
719 xaction
&& xaction
->request
.placeholder
? 'P' : ' ',
722 xaction
? Format::QuoteMimeBlob(xaction
->request
.buf
) : "(none)");
725 p
->append("\nFlags key:\n"
730 " S\tSHUTDOWN PENDING\n"
731 " P\tPLACEHOLDER\n", 101);
735 helper::willOverload() const {
736 return queueFull() && !(childs
.needNew() || GetFirstAvailable(this));
740 helperShutdown(helper
* hlp
)
742 dlink_node
*link
= hlp
->servers
.head
;
746 srv
= (helper_server
*)link
->data
;
749 if (srv
->flags
.shutdown
) {
750 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
754 assert(hlp
->childs
.n_active
> 0);
755 -- hlp
->childs
.n_active
;
756 srv
->flags
.shutdown
= true; /* request it to shut itself down */
758 if (srv
->flags
.closing
) {
759 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
763 if (srv
->stats
.pending
) {
764 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
768 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
769 /* the rest of the details is dealt with in the helperServerFree
772 srv
->closePipesSafely(hlp
->id_name
);
777 helperStatefulShutdown(statefulhelper
* hlp
)
779 dlink_node
*link
= hlp
->servers
.head
;
780 helper_stateful_server
*srv
;
783 srv
= (helper_stateful_server
*)link
->data
;
786 if (srv
->flags
.shutdown
) {
787 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
791 assert(hlp
->childs
.n_active
> 0);
792 -- hlp
->childs
.n_active
;
793 srv
->flags
.shutdown
= true; /* request it to shut itself down */
795 if (srv
->stats
.pending
) {
796 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
800 if (srv
->flags
.closing
) {
801 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
805 if (srv
->reserved()) {
807 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Closing anyway.");
809 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Not Shutting Down Yet.");
814 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
816 /* the rest of the details is dealt with in the helperStatefulServerFree
819 srv
->closePipesSafely(hlp
->id_name
);
825 /* note, don't free id_name, it probably points to static memory */
827 // TODO: if the queue is not empty it will leak Helper::Request's
829 debugs(84, DBG_CRITICAL
, "WARNING: freeing " << id_name
<< " helper with " << stats
.queue_size
<< " requests queued");
833 helper::handleKilledServer(HelperServerBase
*srv
, bool &needsNewServers
)
835 needsNewServers
= false;
836 if (!srv
->flags
.shutdown
) {
837 assert(childs
.n_active
> 0);
839 debugs(84, DBG_CRITICAL
, "WARNING: " << id_name
<< " #" << srv
->index
<< " exited");
841 if (childs
.needNew() > 0) {
842 debugs(80, DBG_IMPORTANT
, "Too few " << id_name
<< " processes are running (need " << childs
.needNew() << "/" << childs
.n_max
<< ")");
844 if (childs
.n_active
< childs
.n_startup
&& last_restart
> squid_curtime
- 30) {
845 if (srv
->stats
.replies
< 1)
846 fatalf("The %s helpers are crashing too rapidly, need help!\n", id_name
);
848 debugs(80, DBG_CRITICAL
, "ERROR: The " << id_name
<< " helpers are crashing too rapidly, need help!");
850 srv
->flags
.shutdown
= true;
851 needsNewServers
= true;
857 helper_server::HelperServerClosed(helper_server
*srv
)
859 helper
*hlp
= srv
->getParent();
861 bool needsNewServers
= false;
862 hlp
->handleKilledServer(srv
, needsNewServers
);
863 if (needsNewServers
) {
864 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
865 helperOpenServers(hlp
);
873 // XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
874 // TODO: Fix the `helper` class hierarchy to use CbdataParent and virtual functions.
876 helper_stateful_server::HelperServerClosed(helper_stateful_server
*srv
)
878 statefulhelper
*hlp
= static_cast<statefulhelper
*>(srv
->getParent());
880 bool needsNewServers
= false;
881 hlp
->handleKilledServer(srv
, needsNewServers
);
882 if (needsNewServers
) {
883 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
884 helperStatefulOpenServers(hlp
);
893 helper_server::popRequest(int request_number
)
895 Helper::Xaction
*r
= nullptr;
896 helper_server::RequestIndex::iterator it
;
897 if (parent
->childs
.concurrency
) {
898 // If concurrency supported retrieve request from ID
899 it
= requestsIndex
.find(request_number
);
900 if (it
!= requestsIndex
.end()) {
902 requests
.erase(it
->second
);
903 requestsIndex
.erase(it
);
905 } else if(!requests
.empty()) {
906 // Else get the first request from queue, if any
907 r
= requests
.front();
908 requests
.pop_front();
914 /// Calls back with a pointer to the buffer with the helper output
916 helperReturnBuffer(helper_server
* srv
, helper
* hlp
, char * msg
, size_t msgSize
, char * msgEnd
)
918 if (Helper::Xaction
*r
= srv
->replyXaction
) {
919 const bool hasSpace
= r
->reply
.accumulate(msg
, msgSize
);
921 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
922 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
923 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
924 srv
->closePipesSafely(hlp
->id_name
);
929 return; // We are waiting for more data.
932 if (cbdataReferenceValid(r
->request
.data
)) {
934 if (r
->reply
.result
== Helper::BrokenHelper
&& r
->request
.retries
< MAX_RETRIES
) {
935 debugs(84, DBG_IMPORTANT
, "ERROR: helper: " << r
->reply
<< ", attempt #" << (r
->request
.retries
+ 1) << " of 2");
938 HLPCB
*callback
= r
->request
.callback
;
939 r
->request
.callback
= nullptr;
940 void *cbdata
= nullptr;
941 if (cbdataReferenceValidDone(r
->request
.data
, &cbdata
))
942 callback(cbdata
, r
->reply
);
946 -- srv
->stats
.pending
;
947 ++ srv
->stats
.replies
;
949 ++ hlp
->stats
.replies
;
951 srv
->answer_time
= current_time
;
953 srv
->dispatch_time
= r
->request
.dispatch_time
;
955 hlp
->stats
.avg_svc_time
=
956 Math::intAverage(hlp
->stats
.avg_svc_time
,
957 tvSubMsec(r
->request
.dispatch_time
, current_time
),
958 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
960 // release or re-submit parsedRequestXaction object
961 srv
->replyXaction
= nullptr;
963 ++r
->request
.retries
;
964 hlp
->submitRequest(r
);
969 if (hlp
->timeout
&& hlp
->childs
.concurrency
)
970 srv
->checkForTimedOutRequests(hlp
->retryTimedOut
);
972 if (!srv
->flags
.shutdown
) {
973 helperKickQueue(hlp
);
974 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
975 srv
->closeWritePipeSafely(srv
->parent
->id_name
);
980 helperHandleRead(const Comm::ConnectionPointer
&conn
, char *, size_t len
, Comm::Flag flag
, int, void *data
)
982 helper_server
*srv
= (helper_server
*)data
;
983 helper
*hlp
= srv
->parent
;
984 assert(cbdataReferenceValid(data
));
986 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
988 if (flag
== Comm::ERR_CLOSING
) {
992 assert(conn
->fd
== srv
->readPipe
->fd
);
994 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
);
996 if (flag
!= Comm::OK
|| len
== 0) {
997 srv
->closePipesSafely(hlp
->id_name
);
1001 srv
->roffset
+= len
;
1002 srv
->rbuf
[srv
->roffset
] = '\0';
1003 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
1005 if (!srv
->stats
.pending
&& !srv
->stats
.timedout
) {
1006 /* someone spoke without being spoken to */
1007 debugs(84, DBG_IMPORTANT
, "ERROR: helperHandleRead: unexpected read from " <<
1008 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
1009 " bytes '" << srv
->rbuf
<< "'");
1012 srv
->rbuf
[0] = '\0';
1015 bool needsMore
= false;
1016 char *msg
= srv
->rbuf
;
1017 while (*msg
&& !needsMore
) {
1019 char *eom
= strchr(msg
, hlp
->eom
);
1022 debugs(84, 3, "helperHandleRead: end of reply found");
1023 if (eom
> msg
&& eom
[-1] == '\r' && hlp
->eom
== '\n') {
1025 // rewind to the \r octet which is the real terminal now
1026 // and remember that we have to skip forward 2 places now.
1033 if (!srv
->ignoreToEom
&& !srv
->replyXaction
) {
1035 if (hlp
->childs
.concurrency
) {
1037 i
= strtol(msg
, &e
, 10);
1038 // Do we need to check for e == msg? Means wrong response from helper.
1039 // Will be dropped as "unexpected reply on channel 0"
1040 needsMore
= !(xisspace(*e
) || (eom
&& e
== eom
));
1043 while (*msg
&& xisspace(*msg
))
1045 } // else not enough data to compute request number
1047 if (!(srv
->replyXaction
= srv
->popRequest(i
))) {
1048 if (srv
->stats
.timedout
) {
1049 debugs(84, 3, "Timedout reply received for request-ID: " << i
<< " , ignore");
1051 debugs(84, DBG_IMPORTANT
, "ERROR: helperHandleRead: unexpected reply on channel " <<
1052 i
<< " from " << hlp
->id_name
<< " #" << srv
->index
<<
1053 " '" << srv
->rbuf
<< "'");
1055 srv
->ignoreToEom
= true;
1057 } // else we need to just append reply data to the current Xaction
1060 size_t msgSize
= eom
? eom
- msg
: (srv
->roffset
- (msg
- srv
->rbuf
));
1061 assert(msgSize
<= srv
->rbuf_sz
);
1062 helperReturnBuffer(srv
, hlp
, msg
, msgSize
, eom
);
1063 msg
+= msgSize
+ skip
;
1064 assert(static_cast<size_t>(msg
- srv
->rbuf
) <= srv
->rbuf_sz
);
1066 // The next message should not ignored.
1067 if (eom
&& srv
->ignoreToEom
)
1068 srv
->ignoreToEom
= false;
1070 assert(skip
== 0 && eom
== nullptr);
1074 size_t msgSize
= (srv
->roffset
- (msg
- srv
->rbuf
));
1075 assert(msgSize
<= srv
->rbuf_sz
);
1076 memmove(srv
->rbuf
, msg
, msgSize
);
1077 srv
->roffset
= msgSize
;
1078 srv
->rbuf
[srv
->roffset
] = '\0';
1080 // All of the responses parsed and msg points at the end of read data
1081 assert(static_cast<size_t>(msg
- srv
->rbuf
) == srv
->roffset
);
1085 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
1086 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1087 assert(spaceSize
>= 0);
1089 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
1090 CommIoCbPtrFun(helperHandleRead
, srv
));
1091 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
1096 helperStatefulHandleRead(const Comm::ConnectionPointer
&conn
, char *, size_t len
, Comm::Flag flag
, int, void *data
)
1099 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
1100 statefulhelper
*hlp
= srv
->parent
;
1101 assert(cbdataReferenceValid(data
));
1103 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1105 if (flag
== Comm::ERR_CLOSING
) {
1109 assert(conn
->fd
== srv
->readPipe
->fd
);
1111 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
1112 hlp
->id_name
<< " #" << srv
->index
);
1114 if (flag
!= Comm::OK
|| len
== 0) {
1115 srv
->closePipesSafely(hlp
->id_name
);
1119 srv
->roffset
+= len
;
1120 srv
->rbuf
[srv
->roffset
] = '\0';
1121 Helper::Xaction
*r
= srv
->requests
.front();
1122 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
1125 /* someone spoke without being spoken to */
1126 debugs(84, DBG_IMPORTANT
, "ERROR: helperStatefulHandleRead: unexpected read from " <<
1127 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
1128 " bytes '" << srv
->rbuf
<< "'");
1133 if ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
1134 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1136 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n') {
1138 // rewind to the \r octet which is the real terminal now
1145 if (r
&& !r
->reply
.accumulate(srv
->rbuf
, t
? (t
- srv
->rbuf
) : srv
->roffset
)) {
1146 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
1147 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
1148 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
1149 srv
->closePipesSafely(hlp
->id_name
);
1153 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1154 * Doing this prohibits concurrency support with multiple replies per read().
1155 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1156 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1161 /* end of reply found */
1162 srv
->requests
.pop_front(); // we already have it in 'r'
1165 if (r
&& cbdataReferenceValid(r
->request
.data
)) {
1166 r
->reply
.finalize();
1167 r
->reply
.reservationId
= srv
->reservationId
;
1168 r
->request
.callback(r
->request
.data
, r
->reply
);
1170 debugs(84, DBG_IMPORTANT
, "StatefulHandleRead: no callback data registered");
1176 -- srv
->stats
.pending
;
1177 ++ srv
->stats
.replies
;
1179 ++ hlp
->stats
.replies
;
1180 srv
->answer_time
= current_time
;
1181 hlp
->stats
.avg_svc_time
=
1182 Math::intAverage(hlp
->stats
.avg_svc_time
,
1183 tvSubMsec(srv
->dispatch_time
, current_time
),
1184 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
1187 helperStatefulServerDone(srv
);
1189 hlp
->cancelReservation(srv
->reservationId
);
1192 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
1193 int spaceSize
= srv
->rbuf_sz
- 1;
1195 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
1196 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
1197 comm_read(srv
->readPipe
, srv
->rbuf
, spaceSize
, call
);
1201 /// Handles a request when all running helpers, if any, are busy.
1203 Enqueue(helper
* hlp
, Helper::Xaction
* r
)
1206 ++ hlp
->stats
.queue_size
;
1208 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1209 if (hlp
->childs
.needNew() > 0) {
1210 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1211 helperOpenServers(hlp
);
1215 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1218 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1221 if (shutting_down
|| reconfiguring
)
1224 hlp
->last_queue_warn
= squid_curtime
;
1226 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1227 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1228 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1232 StatefulEnqueue(statefulhelper
* hlp
, Helper::Xaction
* r
)
1235 ++ hlp
->stats
.queue_size
;
1237 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1238 if (hlp
->childs
.needNew() > 0) {
1239 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1240 helperStatefulOpenServers(hlp
);
1244 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1247 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1250 if (shutting_down
|| reconfiguring
)
1253 hlp
->last_queue_warn
= squid_curtime
;
1255 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1256 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1257 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1261 helper::nextRequest()
1266 auto *r
= queue
.front();
1272 static helper_server
*
1273 GetFirstAvailable(const helper
* hlp
)
1277 helper_server
*selected
= nullptr;
1278 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1280 if (hlp
->childs
.n_running
== 0)
1283 /* Find "least" loaded helper (approx) */
1284 for (n
= hlp
->servers
.head
; n
!= nullptr; n
= n
->next
) {
1285 srv
= (helper_server
*)n
->data
;
1287 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
1290 if (srv
->flags
.shutdown
)
1293 if (!srv
->stats
.pending
)
1305 debugs(84, 5, "GetFirstAvailable: None available.");
1309 if (selected
->stats
.pending
>= (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1)) {
1310 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1314 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected
->index
);
1318 static helper_stateful_server
*
1319 StatefulGetFirstAvailable(statefulhelper
* hlp
)
1322 helper_stateful_server
*srv
= nullptr;
1323 helper_stateful_server
*oldestReservedServer
= nullptr;
1324 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1326 if (hlp
->childs
.n_running
== 0)
1329 for (n
= hlp
->servers
.head
; n
!= nullptr; n
= n
->next
) {
1330 srv
= (helper_stateful_server
*)n
->data
;
1332 if (srv
->stats
.pending
)
1335 if (srv
->reserved()) {
1336 if ((squid_curtime
- srv
->reservationStart
) > hlp
->childs
.reservationTimeout
) {
1337 if (!oldestReservedServer
)
1338 oldestReservedServer
= srv
;
1339 else if (oldestReservedServer
->reservationStart
< srv
->reservationStart
)
1340 oldestReservedServer
= srv
;
1341 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer
->index
);
1346 if (srv
->flags
.shutdown
)
1349 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
1353 if (oldestReservedServer
) {
1354 debugs(84, 5, "expired reservation " << oldestReservedServer
->reservationId
<< " for srv-" << oldestReservedServer
->index
);
1355 return oldestReservedServer
;
1358 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1363 helperDispatchWriteDone(const Comm::ConnectionPointer
&, char *, size_t, Comm::Flag flag
, int, void *data
)
1365 helper_server
*srv
= (helper_server
*)data
;
1367 srv
->writebuf
->clean();
1368 delete srv
->writebuf
;
1369 srv
->writebuf
= nullptr;
1370 srv
->flags
.writing
= false;
1372 if (flag
!= Comm::OK
) {
1373 /* Helper server has crashed */
1374 debugs(84, DBG_CRITICAL
, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
<< " has crashed");
1378 if (!srv
->wqueue
->isNull()) {
1379 srv
->writebuf
= srv
->wqueue
;
1380 srv
->wqueue
= new MemBuf
;
1381 srv
->flags
.writing
= true;
1382 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1383 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1384 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, nullptr);
1389 helperDispatch(helper_server
* srv
, Helper::Xaction
* r
)
1391 helper
*hlp
= srv
->parent
;
1392 const uint64_t reqId
= ++srv
->nextRequestId
;
1394 if (!cbdataReferenceValid(r
->request
.data
)) {
1395 debugs(84, DBG_IMPORTANT
, "ERROR: helperDispatch: invalid callback data");
1400 r
->request
.Id
= reqId
;
1401 helper_server::Requests::iterator it
= srv
->requests
.insert(srv
->requests
.end(), r
);
1402 r
->request
.dispatch_time
= current_time
;
1404 if (srv
->wqueue
->isNull())
1405 srv
->wqueue
->init();
1407 if (hlp
->childs
.concurrency
) {
1408 srv
->requestsIndex
.insert(helper_server::RequestIndex::value_type(reqId
, it
));
1409 assert(srv
->requestsIndex
.size() == srv
->requests
.size());
1410 srv
->wqueue
->appendf("%" PRIu64
" %s", reqId
, r
->request
.buf
);
1412 srv
->wqueue
->append(r
->request
.buf
, strlen(r
->request
.buf
));
1414 if (!srv
->flags
.writing
) {
1415 assert(nullptr == srv
->writebuf
);
1416 srv
->writebuf
= srv
->wqueue
;
1417 srv
->wqueue
= new MemBuf
;
1418 srv
->flags
.writing
= true;
1419 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1420 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1421 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, nullptr);
1424 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
<< ", " << strlen(r
->request
.buf
) << " bytes");
1427 ++ srv
->stats
.pending
;
1428 ++ hlp
->stats
.requests
;
1432 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer
&, char *, size_t, Comm::Flag
, int, void *)
1436 helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Xaction
* r
)
1438 statefulhelper
*hlp
= srv
->parent
;
1440 if (!cbdataReferenceValid(r
->request
.data
)) {
1441 debugs(84, DBG_IMPORTANT
, "ERROR: helperStatefulDispatch: invalid callback data");
1443 hlp
->cancelReservation(srv
->reservationId
);
1447 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
);
1449 assert(srv
->reservationId
);
1450 r
->reply
.reservationId
= srv
->reservationId
;
1452 if (r
->request
.placeholder
== 1) {
1453 /* a callback is needed before this request can _use_ a helper. */
1454 /* we don't care about releasing this helper. The request NEVER
1455 * gets to the helper. So we throw away the return code */
1456 r
->reply
.result
= Helper::Unknown
;
1457 r
->request
.callback(r
->request
.data
, r
->reply
);
1458 /* throw away the placeholder */
1460 /* and push the queue. Note that the callback may have submitted a new
1461 * request to the helper which is why we test for the request */
1463 if (!srv
->requests
.size())
1464 helperStatefulServerDone(srv
);
1469 srv
->requests
.push_back(r
);
1470 srv
->dispatch_time
= current_time
;
1471 AsyncCall::Pointer call
= commCbCall(5,5, "helperStatefulDispatchWriteDone",
1472 CommIoCbPtrFun(helperStatefulDispatchWriteDone
, hlp
));
1473 Comm::Write(srv
->writePipe
, r
->request
.buf
, strlen(r
->request
.buf
), call
, nullptr);
1474 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1475 hlp
->id_name
<< " #" << srv
->index
<< ", " <<
1476 (int) strlen(r
->request
.buf
) << " bytes");
1479 ++ srv
->stats
.pending
;
1480 ++ hlp
->stats
.requests
;
1484 helperKickQueue(helper
* hlp
)
1489 while ((srv
= GetFirstAvailable(hlp
)) && (r
= hlp
->nextRequest()))
1490 helperDispatch(srv
, r
);
1494 helperStatefulKickQueue(statefulhelper
* hlp
)
1497 helper_stateful_server
*srv
;
1498 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= hlp
->nextRequest())) {
1499 debugs(84, 5, "found srv-" << srv
->index
);
1500 hlp
->reserveServer(srv
);
1501 helperStatefulDispatch(srv
, r
);
1506 helperStatefulServerDone(helper_stateful_server
* srv
)
1508 if (!srv
->flags
.shutdown
) {
1509 helperStatefulKickQueue(srv
->parent
);
1510 } else if (!srv
->flags
.closing
&& !srv
->reserved() && !srv
->stats
.pending
) {
1511 srv
->closeWritePipeSafely(srv
->parent
->id_name
);
1517 helper_server::checkForTimedOutRequests(bool const retry
)
1519 assert(parent
->childs
.concurrency
);
1520 while(!requests
.empty() && requests
.front()->request
.timedOut(parent
->timeout
)) {
1521 Helper::Xaction
*r
= requests
.front();
1522 RequestIndex::iterator it
;
1523 it
= requestsIndex
.find(r
->request
.Id
);
1524 assert(it
!= requestsIndex
.end());
1525 requestsIndex
.erase(it
);
1526 requests
.pop_front();
1527 debugs(84, 2, "Request " << r
->request
.Id
<< " timed-out, remove it from queue");
1529 bool retried
= false;
1530 if (retry
&& r
->request
.retries
< MAX_RETRIES
&& cbdataReferenceValid(r
->request
.data
)) {
1531 debugs(84, 2, "Retry request " << r
->request
.Id
);
1532 ++r
->request
.retries
;
1533 parent
->submitRequest(r
);
1535 } else if (cbdataReferenceValidDone(r
->request
.data
, &cbdata
)) {
1536 if (!parent
->onTimedOutResponse
.isEmpty()) {
1537 if (r
->reply
.accumulate(parent
->onTimedOutResponse
.rawContent(), parent
->onTimedOutResponse
.length()))
1538 r
->reply
.finalize();
1540 r
->reply
.result
= Helper::TimedOut
;
1541 r
->request
.callback(cbdata
, r
->reply
);
1543 r
->reply
.result
= Helper::TimedOut
;
1544 r
->request
.callback(cbdata
, r
->reply
);
1549 ++parent
->stats
.timedout
;
1556 helper_server::requestTimeout(const CommTimeoutCbParams
&io
)
1558 debugs(26, 3, io
.conn
);
1559 helper_server
*srv
= static_cast<helper_server
*>(io
.data
);
1561 srv
->checkForTimedOutRequests(srv
->parent
->retryTimedOut
);
1563 debugs(84, 3, io
.conn
<< " establish new helper_server::requestTimeout");
1564 AsyncCall::Pointer timeoutCall
= commCbCall(84, 4, "helper_server::requestTimeout",
1565 CommTimeoutCbPtrFun(helper_server::requestTimeout
, srv
));
1567 const int timeSpent
= srv
->requests
.empty() ? 0 : (squid_curtime
- srv
->requests
.front()->request
.dispatch_time
.tv_sec
);
1568 const int timeLeft
= max(1, (static_cast<int>(srv
->parent
->timeout
) - timeSpent
));
1570 commSetConnTimeout(io
.conn
, timeLeft
, timeoutCall
);