2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 84 Helper process maintenance */
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
20 #include "format/Quoting.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
25 #include "SquidConfig.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
32 // helper_stateful_server::data uses explicit alloc()/freeOne()
35 #define HELPER_MAX_ARGS 64
37 /// The maximum allowed request retries.
40 /// Helpers input buffer size.
41 const size_t ReadBufSize(32*1024);
43 static IOCB helperHandleRead
;
44 static IOCB helperStatefulHandleRead
;
45 static void helperServerFree(helper_server
*srv
);
46 static void helperStatefulServerFree(helper_stateful_server
*srv
);
47 static void Enqueue(helper
* hlp
, Helper::Xaction
*);
48 static helper_server
*GetFirstAvailable(const helper
* hlp
);
49 static helper_stateful_server
*StatefulGetFirstAvailable(const statefulhelper
* hlp
);
50 static void helperDispatch(helper_server
* srv
, Helper::Xaction
* r
);
51 static void helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Xaction
* r
);
52 static void helperKickQueue(helper
* hlp
);
53 static void helperStatefulKickQueue(statefulhelper
* hlp
);
54 static void helperStatefulServerDone(helper_stateful_server
* srv
);
55 static void StatefulEnqueue(statefulhelper
* hlp
, Helper::Xaction
* r
);
57 CBDATA_CLASS_INIT(helper
);
58 CBDATA_CLASS_INIT(helper_server
);
59 CBDATA_CLASS_INIT(statefulhelper
);
60 CBDATA_CLASS_INIT(helper_stateful_server
);
62 InstanceIdDefinitions(HelperServerBase
, "Hlpr");
65 HelperServerBase::initStats()
75 HelperServerBase::closePipesSafely(const char *id_name
)
78 shutdown(writePipe
->fd
, SD_BOTH
);
82 if (readPipe
->fd
== writePipe
->fd
)
90 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
92 debugs(84, DBG_IMPORTANT
, "WARNING: " << id_name
<<
93 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
101 HelperServerBase::closeWritePipeSafely(const char *id_name
)
104 shutdown(writePipe
->fd
, (readPipe
->fd
== writePipe
->fd
? SD_BOTH
: SD_SEND
));
107 flags
.closing
= true;
108 if (readPipe
->fd
== writePipe
->fd
)
114 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
116 debugs(84, DBG_IMPORTANT
, "WARNING: " << id_name
<<
117 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
125 helperOpenServers(helper
* hlp
)
131 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
132 char fd_note_buf
[FD_DESC_SZ
];
142 if (hlp
->cmdline
== NULL
)
145 progname
= hlp
->cmdline
->key
;
147 if ((s
= strrchr(progname
, '/')))
148 shortname
= xstrdup(s
+ 1);
150 shortname
= xstrdup(progname
);
152 /* figure out how many new children are actually needed. */
153 int need_new
= hlp
->childs
.needNew();
155 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
158 debugs(84, DBG_IMPORTANT
, "helperOpenServers: No '" << shortname
<< "' processes needed.");
161 procname
= (char *)xmalloc(strlen(shortname
) + 3);
163 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
165 args
[nargs
] = procname
;
168 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
169 args
[nargs
] = w
->key
;
176 assert(nargs
<= HELPER_MAX_ARGS
);
178 for (k
= 0; k
< need_new
; ++k
) {
181 pid
= ipcCreate(hlp
->ipc_type
,
191 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
195 ++ hlp
->childs
.n_running
;
196 ++ hlp
->childs
.n_active
;
197 srv
= new helper_server
;
201 srv
->addr
= hlp
->addr
;
202 srv
->readPipe
= new Comm::Connection
;
203 srv
->readPipe
->fd
= rfd
;
204 srv
->writePipe
= new Comm::Connection
;
205 srv
->writePipe
->fd
= wfd
;
206 srv
->rbuf
= (char *)memAllocBuf(ReadBufSize
, &srv
->rbuf_sz
);
207 srv
->wqueue
= new MemBuf
;
209 srv
->nextRequestId
= 0;
210 srv
->replyXaction
= NULL
;
211 srv
->ignoreToEom
= false;
212 srv
->parent
= cbdataReference(hlp
);
213 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
216 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
217 fd_note(rfd
, fd_note_buf
);
219 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
220 fd_note(rfd
, fd_note_buf
);
221 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
222 fd_note(wfd
, fd_note_buf
);
225 commSetNonBlocking(rfd
);
228 commSetNonBlocking(wfd
);
230 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree
, srv
));
231 comm_add_close_handler(rfd
, closeCall
);
233 if (hlp
->timeout
&& hlp
->childs
.concurrency
) {
234 AsyncCall::Pointer timeoutCall
= commCbCall(84, 4, "helper_server::requestTimeout",
235 CommTimeoutCbPtrFun(helper_server::requestTimeout
, srv
));
236 commSetConnTimeout(srv
->readPipe
, hlp
->timeout
, timeoutCall
);
239 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
240 CommIoCbPtrFun(helperHandleRead
, srv
));
241 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
244 hlp
->last_restart
= squid_curtime
;
245 safe_free(shortname
);
247 helperKickQueue(hlp
);
253 * helperStatefulOpenServers: create the stateful child helper processes
256 helperStatefulOpenServers(statefulhelper
* hlp
)
259 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
260 char fd_note_buf
[FD_DESC_SZ
];
263 if (hlp
->cmdline
== NULL
)
266 if (hlp
->childs
.concurrency
)
267 debugs(84, DBG_CRITICAL
, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp
->cmdline
<< "')");
269 char *progname
= hlp
->cmdline
->key
;
272 if ((s
= strrchr(progname
, '/')))
273 shortname
= xstrdup(s
+ 1);
275 shortname
= xstrdup(progname
);
277 /* figure out how many new helpers are needed. */
278 int need_new
= hlp
->childs
.needNew();
280 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
283 debugs(84, DBG_IMPORTANT
, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
286 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
288 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
290 args
[nargs
] = procname
;
293 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
294 args
[nargs
] = w
->key
;
301 assert(nargs
<= HELPER_MAX_ARGS
);
303 for (int k
= 0; k
< need_new
; ++k
) {
308 pid_t pid
= ipcCreate(hlp
->ipc_type
,
318 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
322 ++ hlp
->childs
.n_running
;
323 ++ hlp
->childs
.n_active
;
324 helper_stateful_server
*srv
= new helper_stateful_server
;
327 srv
->flags
.reserved
= false;
329 srv
->addr
= hlp
->addr
;
330 srv
->readPipe
= new Comm::Connection
;
331 srv
->readPipe
->fd
= rfd
;
332 srv
->writePipe
= new Comm::Connection
;
333 srv
->writePipe
->fd
= wfd
;
334 srv
->rbuf
= (char *)memAllocBuf(ReadBufSize
, &srv
->rbuf_sz
);
336 srv
->parent
= cbdataReference(hlp
);
338 if (hlp
->datapool
!= NULL
)
339 srv
->data
= hlp
->datapool
->alloc();
341 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
344 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
345 fd_note(rfd
, fd_note_buf
);
347 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
348 fd_note(rfd
, fd_note_buf
);
349 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
350 fd_note(wfd
, fd_note_buf
);
353 commSetNonBlocking(rfd
);
356 commSetNonBlocking(wfd
);
358 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree
, srv
));
359 comm_add_close_handler(rfd
, closeCall
);
361 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
362 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
363 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
366 hlp
->last_restart
= squid_curtime
;
367 safe_free(shortname
);
369 helperStatefulKickQueue(hlp
);
373 helper::submitRequest(Helper::Xaction
*r
)
377 if ((srv
= GetFirstAvailable(this)))
378 helperDispatch(srv
, r
);
385 /// handles helperSubmit() and helperStatefulSubmit() failures
387 SubmissionFailure(helper
*hlp
, HLPCB
*callback
, void *data
)
389 auto result
= Helper::Error
;
391 debugs(84, 3, "no helper");
392 result
= Helper::Unknown
;
394 // else pretend the helper has responded with ERR
396 callback(data
, Helper::Reply(result
));
400 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
402 if (!hlp
|| !hlp
->trySubmit(buf
, callback
, data
))
403 SubmissionFailure(hlp
, callback
, data
);
406 /// whether queuing an additional request would overload the helper
408 helper::queueFull() const {
409 return stats
.queue_size
>= static_cast<int>(childs
.queue_size
);
413 helper::overloaded() const {
414 return stats
.queue_size
> static_cast<int>(childs
.queue_size
);
417 /// synchronizes queue-dependent measurements with the current queue state
419 helper::syncQueueStats()
423 debugs(84, 5, id_name
<< " still overloaded; dropped " << droppedRequests
);
425 overloadStart
= squid_curtime
;
426 debugs(84, 3, id_name
<< " became overloaded");
430 debugs(84, 5, id_name
<< " is no longer overloaded");
431 if (droppedRequests
) {
432 debugs(84, DBG_IMPORTANT
, "helper " << id_name
<<
433 " is no longer overloaded after dropping " << droppedRequests
<<
434 " requests in " << (squid_curtime
- overloadStart
) << " seconds");
442 /// prepares the helper for request submission
443 /// returns true if and only if the submission should proceed
444 /// may kill Squid if the helper remains overloaded for too long
448 // re-sync for the configuration may have changed since the last submission
451 // Nothing special to do if the new request does not overload (i.e., the
452 // queue is not even full yet) or only _starts_ overloading this helper
453 // (i.e., the queue is currently at its limit).
457 if (squid_curtime
- overloadStart
<= 180)
458 return true; // also OK: overload has not persisted long enough to panic
460 if (childs
.onPersistentOverload
== Helper::ChildConfig::actDie
)
461 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name
);
463 if (!droppedRequests
) {
464 debugs(84, DBG_IMPORTANT
, "WARNING: dropping requests to overloaded " <<
465 id_name
<< " helper configured with on-persistent-overload=err");
468 debugs(84, 3, "failed to send " << droppedRequests
<< " helper requests to " << id_name
);
473 helper::trySubmit(const char *buf
, HLPCB
* callback
, void *data
)
476 return false; // request was dropped
478 submit(buf
, callback
, data
); // will send or queue
479 return true; // request submitted or queued
482 /// dispatches or enqueues a helper request; does not enforce queue limits
484 helper::submit(const char *buf
, HLPCB
* callback
, void *data
)
486 Helper::Xaction
*r
= new Helper::Xaction(callback
, data
, buf
);
488 debugs(84, DBG_DATA
, Raw("buf", buf
, strlen(buf
)));
491 /// lastserver = "server last used as part of a reserved request sequence"
493 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
495 if (!hlp
|| !hlp
->trySubmit(buf
, callback
, data
, lastserver
))
496 SubmissionFailure(hlp
, callback
, data
);
499 /// If possible, submit request. Otherwise, either kill Squid or return false.
501 statefulhelper::trySubmit(const char *buf
, HLPCB
* callback
, void *data
, helper_stateful_server
*lastserver
)
504 return false; // request was dropped
506 submit(buf
, callback
, data
, lastserver
); // will send or queue
507 return true; // request submitted or queued
510 void statefulhelper::submit(const char *buf
, HLPCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
512 Helper::Xaction
*r
= new Helper::Xaction(callback
, data
, buf
);
514 if ((buf
!= NULL
) && lastserver
) {
515 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver
);
516 assert(lastserver
->flags
.reserved
);
517 assert(!lastserver
->requests
.size());
519 debugs(84, 5, "StatefulSubmit dispatching");
520 helperStatefulDispatch(lastserver
, r
);
522 helper_stateful_server
*srv
;
523 if ((srv
= StatefulGetFirstAvailable(this))) {
524 helperStatefulDispatch(srv
, r
);
526 StatefulEnqueue(this, r
);
529 debugs(84, DBG_DATA
, "placeholder: '" << r
->request
.placeholder
<<
530 "', " << Raw("buf", buf
, (!buf
?0:strlen(buf
))));
538 * helperStatefulReleaseServer tells the helper that whoever was
539 * using it no longer needs its services.
542 helperStatefulReleaseServer(helper_stateful_server
* srv
)
544 debugs(84, 3, HERE
<< "srv-" << srv
->index
<< " flags.reserved = " << srv
->flags
.reserved
);
545 if (!srv
->flags
.reserved
)
548 ++ srv
->stats
.releases
;
550 srv
->flags
.reserved
= false;
552 helperStatefulServerDone(srv
);
555 /** return a pointer to the stateful routines data area */
557 helperStatefulServerGetData(helper_stateful_server
* srv
)
563 helper::packStatsInto(Packable
*p
, const char *label
) const
566 p
->appendf("%s:\n", label
);
568 p
->appendf(" program: %s\n", cmdline
->key
);
569 p
->appendf(" number active: %d of %d (%d shutting down)\n", childs
.n_active
, childs
.n_max
, (childs
.n_running
- childs
.n_active
));
570 p
->appendf(" requests sent: %d\n", stats
.requests
);
571 p
->appendf(" replies received: %d\n", stats
.replies
);
572 p
->appendf(" requests timedout: %d\n", stats
.timedout
);
573 p
->appendf(" queue length: %d\n", stats
.queue_size
);
574 p
->appendf(" avg service time: %d msec\n", stats
.avg_svc_time
);
576 p
->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
588 for (dlink_node
*link
= servers
.head
; link
; link
= link
->next
) {
589 HelperServerBase
*srv
= static_cast<HelperServerBase
*>(link
->data
);
591 Helper::Xaction
*xaction
= srv
->requests
.empty() ? NULL
: srv
->requests
.front();
592 double tt
= 0.001 * (xaction
? tvSubMsec(xaction
->request
.dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
593 p
->appendf("%7u\t%7d\t%7d\t%11" PRIu64
"\t%11" PRIu64
"\t%11" PRIu64
"\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
600 srv
->stats
.pending
? 'B' : ' ',
601 srv
->flags
.writing
? 'W' : ' ',
602 srv
->flags
.closing
? 'C' : ' ',
603 srv
->flags
.reserved
? 'R' : ' ',
604 srv
->flags
.shutdown
? 'S' : ' ',
605 xaction
&& xaction
->request
.placeholder
? 'P' : ' ',
608 xaction
? Format::QuoteMimeBlob(xaction
->request
.buf
) : "(none)");
611 p
->append("\nFlags key:\n"
616 " S\tSHUTDOWN PENDING\n"
617 " P\tPLACEHOLDER\n", 101);
621 helper::willOverload() const {
622 return queueFull() && !(childs
.needNew() || GetFirstAvailable(this));
626 helperShutdown(helper
* hlp
)
628 dlink_node
*link
= hlp
->servers
.head
;
632 srv
= (helper_server
*)link
->data
;
635 if (srv
->flags
.shutdown
) {
636 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
640 assert(hlp
->childs
.n_active
> 0);
641 -- hlp
->childs
.n_active
;
642 srv
->flags
.shutdown
= true; /* request it to shut itself down */
644 if (srv
->flags
.closing
) {
645 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
649 if (srv
->stats
.pending
) {
650 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
654 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
655 /* the rest of the details is dealt with in the helperServerFree
658 srv
->closePipesSafely(hlp
->id_name
);
663 helperStatefulShutdown(statefulhelper
* hlp
)
665 dlink_node
*link
= hlp
->servers
.head
;
666 helper_stateful_server
*srv
;
669 srv
= (helper_stateful_server
*)link
->data
;
672 if (srv
->flags
.shutdown
) {
673 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
677 assert(hlp
->childs
.n_active
> 0);
678 -- hlp
->childs
.n_active
;
679 srv
->flags
.shutdown
= true; /* request it to shut itself down */
681 if (srv
->stats
.pending
) {
682 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
686 if (srv
->flags
.closing
) {
687 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
691 if (srv
->flags
.reserved
) {
693 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Closing anyway.");
695 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Not Shutting Down Yet.");
700 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
702 /* the rest of the details is dealt with in the helperStatefulServerFree
705 srv
->closePipesSafely(hlp
->id_name
);
711 /* note, don't free id_name, it probably points to static memory */
713 // TODO: if the queue is not empty it will leak Helper::Request's
715 debugs(84, DBG_CRITICAL
, "WARNING: freeing " << id_name
<< " helper with " << stats
.queue_size
<< " requests queued");
718 /* ====================================================================== */
719 /* LOCAL FUNCTIONS */
720 /* ====================================================================== */
723 helperServerFree(helper_server
*srv
)
725 helper
*hlp
= srv
->parent
;
726 int concurrency
= hlp
->childs
.concurrency
;
732 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
736 srv
->wqueue
->clean();
740 srv
->writebuf
->clean();
741 delete srv
->writebuf
;
742 srv
->writebuf
= NULL
;
745 if (Comm::IsConnOpen(srv
->writePipe
))
746 srv
->closeWritePipeSafely(hlp
->id_name
);
748 dlinkDelete(&srv
->link
, &hlp
->servers
);
750 assert(hlp
->childs
.n_running
> 0);
751 -- hlp
->childs
.n_running
;
753 if (!srv
->flags
.shutdown
) {
754 assert(hlp
->childs
.n_active
> 0);
755 -- hlp
->childs
.n_active
;
756 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
<< " exited");
758 if (hlp
->childs
.needNew() > 0) {
759 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
761 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30) {
762 if (srv
->stats
.replies
< 1)
763 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
765 debugs(80, DBG_CRITICAL
, "ERROR: The " << hlp
->id_name
<< " helpers are crashing too rapidly, need help!");
768 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
769 helperOpenServers(hlp
);
773 while (!srv
->requests
.empty()) {
774 // XXX: re-schedule these on another helper?
775 Helper::Xaction
*r
= srv
->requests
.front();
776 srv
->requests
.pop_front();
779 if (cbdataReferenceValidDone(r
->request
.data
, &cbdata
)) {
780 r
->reply
.result
= Helper::Unknown
;
781 r
->request
.callback(cbdata
, r
->reply
);
786 srv
->requestsIndex
.clear();
788 cbdataReferenceDone(srv
->parent
);
793 helperStatefulServerFree(helper_stateful_server
*srv
)
795 statefulhelper
*hlp
= srv
->parent
;
798 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
803 srv
->wqueue
->clean();
809 /* TODO: walk the local queue of requests and carry them all out */
810 if (Comm::IsConnOpen(srv
->writePipe
))
811 srv
->closeWritePipeSafely(hlp
->id_name
);
813 dlinkDelete(&srv
->link
, &hlp
->servers
);
815 assert(hlp
->childs
.n_running
> 0);
816 -- hlp
->childs
.n_running
;
818 if (!srv
->flags
.shutdown
) {
819 assert( hlp
->childs
.n_active
> 0);
820 -- hlp
->childs
.n_active
;
821 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
<< " exited");
823 if (hlp
->childs
.needNew() > 0) {
824 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
826 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30) {
827 if (srv
->stats
.replies
< 1)
828 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
830 debugs(80, DBG_CRITICAL
, "ERROR: The " << hlp
->id_name
<< " helpers are crashing too rapidly, need help!");
833 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
834 helperStatefulOpenServers(hlp
);
838 while (!srv
->requests
.empty()) {
839 // XXX: re-schedule these on another helper?
840 Helper::Xaction
*r
= srv
->requests
.front();
841 srv
->requests
.pop_front();
844 if (cbdataReferenceValidDone(r
->request
.data
, &cbdata
)) {
845 r
->reply
.result
= Helper::Unknown
;
846 r
->request
.callback(cbdata
, r
->reply
);
852 if (srv
->data
!= NULL
)
853 hlp
->datapool
->freeOne(srv
->data
);
855 cbdataReferenceDone(srv
->parent
);
861 helper_server::popRequest(int request_number
)
863 Helper::Xaction
*r
= nullptr;
864 helper_server::RequestIndex::iterator it
;
865 if (parent
->childs
.concurrency
) {
866 // If concurrency is supported, retrieve the request by its ID
867 it
= requestsIndex
.find(request_number
);
868 if (it
!= requestsIndex
.end()) {
870 requests
.erase(it
->second
);
871 requestsIndex
.erase(it
);
873 } else if(!requests
.empty()) {
874 // Else get the first request from queue, if any
875 r
= requests
.front();
876 requests
.pop_front();
882 /// Calls back with a pointer to the buffer with the helper output
884 helperReturnBuffer(helper_server
* srv
, helper
* hlp
, char * msg
, size_t msgSize
, char * msgEnd
)
886 if (Helper::Xaction
*r
= srv
->replyXaction
) {
887 const bool hasSpace
= r
->reply
.accumulate(msg
, msgSize
);
889 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
890 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
891 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
892 srv
->closePipesSafely(hlp
->id_name
);
897 return; // We are waiting for more data.
899 HLPCB
*callback
= r
->request
.callback
;
900 r
->request
.callback
= nullptr;
903 void *cbdata
= nullptr;
904 if (cbdataReferenceValidDone(r
->request
.data
, &cbdata
)) {
906 if (r
->reply
.result
== Helper::BrokenHelper
&& r
->request
.retries
< MAX_RETRIES
) {
907 debugs(84, DBG_IMPORTANT
, "ERROR: helper: " << r
->reply
<< ", attempt #" << (r
->request
.retries
+ 1) << " of 2");
910 callback(cbdata
, r
->reply
);
913 -- srv
->stats
.pending
;
914 ++ srv
->stats
.replies
;
916 ++ hlp
->stats
.replies
;
918 srv
->answer_time
= current_time
;
920 srv
->dispatch_time
= r
->request
.dispatch_time
;
922 hlp
->stats
.avg_svc_time
=
923 Math::intAverage(hlp
->stats
.avg_svc_time
,
924 tvSubMsec(r
->request
.dispatch_time
, current_time
),
925 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
927 // release or re-submit parsedRequestXaction object
928 srv
->replyXaction
= nullptr;
930 ++r
->request
.retries
;
931 hlp
->submitRequest(r
);
936 if (hlp
->timeout
&& hlp
->childs
.concurrency
)
937 srv
->checkForTimedOutRequests(hlp
->retryTimedOut
);
939 if (!srv
->flags
.shutdown
) {
940 helperKickQueue(hlp
);
941 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
942 srv
->flags
.closing
=true;
943 srv
->writePipe
->close();
948 helperHandleRead(const Comm::ConnectionPointer
&conn
, char *, size_t len
, Comm::Flag flag
, int, void *data
)
950 helper_server
*srv
= (helper_server
*)data
;
951 helper
*hlp
= srv
->parent
;
952 assert(cbdataReferenceValid(data
));
954 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
956 if (flag
== Comm::ERR_CLOSING
) {
960 assert(conn
->fd
== srv
->readPipe
->fd
);
962 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
);
964 if (flag
!= Comm::OK
|| len
== 0) {
965 srv
->closePipesSafely(hlp
->id_name
);
970 srv
->rbuf
[srv
->roffset
] = '\0';
971 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
973 if (!srv
->stats
.pending
&& !srv
->stats
.timedout
) {
974 /* someone spoke without being spoken to */
975 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected read from " <<
976 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
977 " bytes '" << srv
->rbuf
<< "'");
983 bool needsMore
= false;
984 char *msg
= srv
->rbuf
;
985 while (*msg
&& !needsMore
) {
987 char *eom
= strchr(msg
, hlp
->eom
);
990 debugs(84, 3, "helperHandleRead: end of reply found");
991 if (eom
> msg
&& eom
[-1] == '\r' && hlp
->eom
== '\n') {
993 // rewind to the \r octet which is the real terminal now
994 // and remember that we have to skip forward 2 places now.
1001 if (!srv
->ignoreToEom
&& !srv
->replyXaction
) {
1003 if (hlp
->childs
.concurrency
) {
1005 i
= strtol(msg
, &e
, 10);
1006 // Do we need to check for e == msg? That would mean a malformed response from the helper.
1007 // It will be dropped as "unexpected reply on channel 0"
1008 needsMore
= !(xisspace(*e
) || (eom
&& e
== eom
));
1011 while (*msg
&& xisspace(*msg
))
1013 } // else not enough data to compute request number
1015 if (!(srv
->replyXaction
= srv
->popRequest(i
))) {
1016 if (srv
->stats
.timedout
) {
1017 debugs(84, 3, "Timedout reply received for request-ID: " << i
<< " , ignore");
1019 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected reply on channel " <<
1020 i
<< " from " << hlp
->id_name
<< " #" << srv
->index
<<
1021 " '" << srv
->rbuf
<< "'");
1023 srv
->ignoreToEom
= true;
1025 } // else we need to just append reply data to the current Xaction
1028 size_t msgSize
= eom
? eom
- msg
: (srv
->roffset
- (msg
- srv
->rbuf
));
1029 assert(msgSize
<= srv
->rbuf_sz
);
1030 helperReturnBuffer(srv
, hlp
, msg
, msgSize
, eom
);
1031 msg
+= msgSize
+ skip
;
1032 assert(static_cast<size_t>(msg
- srv
->rbuf
) <= srv
->rbuf_sz
);
1034 // The next message should not be ignored.
1035 if (eom
&& srv
->ignoreToEom
)
1036 srv
->ignoreToEom
= false;
1038 assert(skip
== 0 && eom
== NULL
);
1042 size_t msgSize
= (srv
->roffset
- (msg
- srv
->rbuf
));
1043 assert(msgSize
<= srv
->rbuf_sz
);
1044 memmove(srv
->rbuf
, msg
, msgSize
);
1045 srv
->roffset
= msgSize
;
1046 srv
->rbuf
[srv
->roffset
] = '\0';
1048 // All of the responses parsed and msg points at the end of read data
1049 assert(static_cast<size_t>(msg
- srv
->rbuf
) == srv
->roffset
);
1053 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
1054 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1055 assert(spaceSize
>= 0);
1057 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
1058 CommIoCbPtrFun(helperHandleRead
, srv
));
1059 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
1064 helperStatefulHandleRead(const Comm::ConnectionPointer
&conn
, char *, size_t len
, Comm::Flag flag
, int, void *data
)
1067 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
1068 statefulhelper
*hlp
= srv
->parent
;
1069 assert(cbdataReferenceValid(data
));
1071 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1073 if (flag
== Comm::ERR_CLOSING
) {
1077 assert(conn
->fd
== srv
->readPipe
->fd
);
1079 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
1080 hlp
->id_name
<< " #" << srv
->index
);
1082 if (flag
!= Comm::OK
|| len
== 0) {
1083 srv
->closePipesSafely(hlp
->id_name
);
1087 srv
->roffset
+= len
;
1088 srv
->rbuf
[srv
->roffset
] = '\0';
1089 Helper::Xaction
*r
= srv
->requests
.front();
1090 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
1093 /* someone spoke without being spoken to */
1094 debugs(84, DBG_IMPORTANT
, "helperStatefulHandleRead: unexpected read from " <<
1095 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
1096 " bytes '" << srv
->rbuf
<< "'");
1101 if ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
1102 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1104 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n') {
1106 // rewind to the \r octet which is the real terminal now
1113 if (r
&& !r
->reply
.accumulate(srv
->rbuf
, t
? (t
- srv
->rbuf
) : srv
->roffset
)) {
1114 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
1115 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
1116 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
1117 srv
->closePipesSafely(hlp
->id_name
);
1121 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1122 * Doing this prohibits concurrency support with multiple replies per read().
1123 * TODO: check that read() setup on these buffers pays attention to roffset!=0
1124 * TODO: check that replies bigger than the buffer are discarded and do not affect future replies
1129 /* end of reply found */
1130 srv
->requests
.pop_front(); // we already have it in 'r'
1133 if (r
&& cbdataReferenceValid(r
->request
.data
)) {
1134 r
->reply
.finalize();
1135 r
->reply
.whichServer
= srv
;
1136 r
->request
.callback(r
->request
.data
, r
->reply
);
1138 debugs(84, DBG_IMPORTANT
, "StatefulHandleRead: no callback data registered");
1144 -- srv
->stats
.pending
;
1145 ++ srv
->stats
.replies
;
1147 ++ hlp
->stats
.replies
;
1148 srv
->answer_time
= current_time
;
1149 hlp
->stats
.avg_svc_time
=
1150 Math::intAverage(hlp
->stats
.avg_svc_time
,
1151 tvSubMsec(srv
->dispatch_time
, current_time
),
1152 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
1155 helperStatefulServerDone(srv
);
1157 helperStatefulReleaseServer(srv
);
1160 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
1161 int spaceSize
= srv
->rbuf_sz
- 1;
1163 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
1164 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
1165 comm_read(srv
->readPipe
, srv
->rbuf
, spaceSize
, call
);
1169 /// Handles a request when all running helpers, if any, are busy.
1171 Enqueue(helper
* hlp
, Helper::Xaction
* r
)
1174 ++ hlp
->stats
.queue_size
;
1176 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1177 if (hlp
->childs
.needNew() > 0) {
1178 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1179 helperOpenServers(hlp
);
1183 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1186 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1189 if (shutting_down
|| reconfiguring
)
1192 hlp
->last_queue_warn
= squid_curtime
;
1194 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1195 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1196 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1200 StatefulEnqueue(statefulhelper
* hlp
, Helper::Xaction
* r
)
1203 ++ hlp
->stats
.queue_size
;
1205 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1206 if (hlp
->childs
.needNew() > 0) {
1207 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1208 helperStatefulOpenServers(hlp
);
1212 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1215 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1218 if (shutting_down
|| reconfiguring
)
1221 hlp
->last_queue_warn
= squid_curtime
;
1223 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1224 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1225 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
// helper::nextRequest(): hands out a queued request. The only statement
// visible in this listing reads the element at the front of `queue` into `r`.
// The kick-queue loops use the result as a loop condition, so a null result
// ends dispatching.
// NOTE(review): original lines 1230-1233 and 1235-1238 are not in this
// listing; the empty-queue handling, the removal of the element and the
// return statement must be checked in the complete file.
1229 helper::nextRequest()
1234 auto *r
= queue
.front();
// GetFirstAvailable(hlp): walks the hlp->servers list looking for the
// helper_server that should receive the next request, reporting progress
// through debugs() in section 84.
// Visible tests inside the loop, for each server `srv`:
//   - a candidate `selected` already exists and its stats.pending is not
//     greater than srv's;
//   - srv->flags.shutdown is set;
//   - srv has no pending requests at all.
// After the loop the candidate's pending count is compared with
// hlp->childs.concurrency, where a concurrency of 0 is treated as a limit
// of 1.
// NOTE(review): the statements controlled by these tests (continue/return/
// assignment to `selected`) and the declarations of `n` and `srv` are on
// original lines that this listing omits (e.g. 1262-1272); confirm the exact
// selection rule against the complete file.
1240 static helper_server
*
1241 GetFirstAvailable(const helper
* hlp
)
1245 helper_server
*selected
= NULL
;
1246 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
// No helper process is running at all.
1248 if (hlp
->childs
.n_running
== 0)
1251 /* Find "least" loaded helper (approx) */
1252 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
// Each list node carries its server in the untyped `data` member.
1253 srv
= (helper_server
*)n
->data
;
// The current candidate is at least as lightly loaded as this server.
1255 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
// This server has been marked for shutdown.
1258 if (srv
->flags
.shutdown
)
// This server is completely idle.
1261 if (!srv
->stats
.pending
)
1273 debugs(84, 5, "GetFirstAvailable: None available.");
// Capacity check: pending requests versus the per-helper concurrency limit
// (a configured value of 0 counts as 1).
1277 if (selected
->stats
.pending
>= (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1)) {
1278 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1282 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected
->index
);
// StatefulGetFirstAvailable(hlp): walks the hlp->servers list looking for a
// helper_stateful_server to use, reporting progress through debugs() in
// section 84. For each server the visible code tests three things: whether it
// has pending requests, whether it is reserved, and whether it is marked for
// shutdown. The "returning srv-N" message follows those tests inside the
// loop; the "None available." message follows the loop.
// NOTE(review): the statements controlled by the tests and the declaration of
// `n` are on original lines that this listing omits (e.g. 1300, 1303, 1306,
// 1309); presumably each test skips the server and the first server passing
// all three is returned -- confirm against the complete file.
1286 static helper_stateful_server
*
1287 StatefulGetFirstAvailable(const statefulhelper
* hlp
)
1290 helper_stateful_server
*srv
= NULL
;
1291 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
// No helper process is running at all.
1293 if (hlp
->childs
.n_running
== 0)
1296 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
// Each list node carries its server in the untyped `data` member.
1297 srv
= (helper_stateful_server
*)n
->data
;
// Server still has a request in flight.
1299 if (srv
->stats
.pending
)
// Server is reserved.
1302 if (srv
->flags
.reserved
)
// Server has been marked for shutdown.
1305 if (srv
->flags
.shutdown
)
1308 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
1312 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
// helperDispatchWriteDone(): completion callback for the Comm::Write() calls
// issued here and in helperDispatch(). Only `flag` (the write result) and
// `data` (the helper_server handed to CommIoCbPtrFun()) are used; the other
// parameters are unnamed.
// It releases the buffer that was just written (srv->writebuf) and clears
// srv->flags.writing. If more output has accumulated in srv->wqueue it
// promotes that queue to writebuf, installs a fresh MemBuf as wqueue, and
// starts the next write with itself as the callback.
1317 helperDispatchWriteDone(const Comm::ConnectionPointer
&, char *, size_t, Comm::Flag flag
, int, void *data
)
1319 helper_server
*srv
= (helper_server
*)data
;
// The buffer handed to Comm::Write() is finished with: clean it, free it and
// mark the server as no longer writing.
1321 srv
->writebuf
->clean();
1322 delete srv
->writebuf
;
1323 srv
->writebuf
= NULL
;
1324 srv
->flags
.writing
= false;
// A failed write is reported as a crashed helper.
// NOTE(review): original lines 1329-1331 are not in this listing; presumably
// the function returns here without flushing wqueue -- confirm.
1326 if (flag
!= Comm::OK
) {
1327 /* Helper server has crashed */
1328 debugs(84, DBG_CRITICAL
, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
<< " has crashed");
// Requests were queued while the previous write was in progress: send them.
1332 if (!srv
->wqueue
->isNull()) {
1333 srv
->writebuf
= srv
->wqueue
;
1334 srv
->wqueue
= new MemBuf
;
1335 srv
->flags
.writing
= true;
1336 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1337 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1338 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
// helperDispatch(srv, r): sends request `r` to stateless helper process
// `srv`. Visible steps:
//   1. take the next request id from srv->nextRequestId;
//   2. check that the requester's callback data is still valid;
//   3. record the request in srv->requests and stamp its dispatch time;
//   4. append the request text to srv->wqueue, prefixed with "<id> " when
//      hlp->childs.concurrency is non-zero;
//   5. if no write is in progress, hand the queue to Comm::Write() with
//      helperDispatchWriteDone() as the completion callback;
//   6. update the pending and requests counters.
// NOTE(review): this listing omits original lines 1350-1353, 1365 and 1380,
// so what happens to `r` after the "invalid callback data" message, the
// `else` that presumably precedes the plain append, and any other counter
// update must be confirmed in the complete file.
1343 helperDispatch(helper_server
* srv
, Helper::Xaction
* r
)
1345 helper
*hlp
= srv
->parent
;
// The id counter is advanced before the validity check below, so an id is
// used up even for a request that fails that check.
1346 const uint64_t reqId
= ++srv
->nextRequestId
;
// The requester may have gone away while the request was waiting.
1348 if (!cbdataReferenceValid(r
->request
.data
)) {
1349 debugs(84, DBG_IMPORTANT
, "helperDispatch: invalid callback data");
1354 r
->request
.Id
= reqId
;
// Append to the in-flight list and keep the iterator for the id index below.
1355 helper_server::Requests::iterator it
= srv
->requests
.insert(srv
->requests
.end(), r
);
1356 r
->request
.dispatch_time
= current_time
;
// Initialise the write queue if it is still in its null state.
1358 if (srv
->wqueue
->isNull())
1359 srv
->wqueue
->init();
// With concurrency enabled the request is indexed by id and the id is sent
// ahead of the request text; the assert checks that the index and the list
// stay the same size.
1361 if (hlp
->childs
.concurrency
) {
1362 srv
->requestsIndex
.insert(helper_server::RequestIndex::value_type(reqId
, it
));
1363 assert(srv
->requestsIndex
.size() == srv
->requests
.size());
1364 srv
->wqueue
->appendf("%" PRIu64
" %s", reqId
, r
->request
.buf
);
// Request text appended without an id prefix.
1366 srv
->wqueue
->append(r
->request
.buf
, strlen(r
->request
.buf
));
// Start a write only when none is outstanding; otherwise
// helperDispatchWriteDone() picks up wqueue when the current write ends.
1368 if (!srv
->flags
.writing
) {
1369 assert(NULL
== srv
->writebuf
);
1370 srv
->writebuf
= srv
->wqueue
;
1371 srv
->wqueue
= new MemBuf
;
1372 srv
->flags
.writing
= true;
1373 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1374 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1375 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1378 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
<< ", " << strlen(r
->request
.buf
) << " bytes");
// Per-server in-flight count and per-helper total request count.
1381 ++ srv
->stats
.pending
;
1382 ++ hlp
->stats
.requests
;
// helperStatefulDispatchWriteDone(): write-completion callback registered by
// helperStatefulDispatch(). Every parameter is unnamed, so none of them can
// be used by the body.
// NOTE(review): the body (original line 1387) is not in this listing;
// presumably it is empty -- confirm.
1386 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer
&, char *, size_t, Comm::Flag
, int, void *)
// helperStatefulDispatch(srv, r): sends request `r` to stateful helper
// process `srv`. Visible behaviour:
//   - if the requester's callback data is no longer valid, log it and call
//     helperStatefulReleaseServer(srv);
//   - if the request is a placeholder (r->request.placeholder == 1), nothing
//     is written to the helper: the requester's callback is invoked with
//     result Helper::Unknown and reply.whichServer set to `srv`, and if the
//     server has no requests afterwards helperStatefulServerDone(srv) runs;
//   - the remaining statements mark the server reserved, record the request
//     and dispatch time, write the request text to the helper's write pipe,
//     and update the pending/requests counters.
// NOTE(review): this listing omits original lines 1396, 1398-1400, 1411,
// 1417-1420 and 1431, so the disposal of `r`, the early returns and any
// other counter update must be confirmed in the complete file.
1390 helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Xaction
* r
)
1392 statefulhelper
*hlp
= srv
->parent
;
// The requester may have gone away while the request was waiting.
1394 if (!cbdataReferenceValid(r
->request
.data
)) {
1395 debugs(84, DBG_IMPORTANT
, "helperStatefulDispatch: invalid callback data");
1397 helperStatefulReleaseServer(srv
);
1401 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
);
1403 if (r
->request
.placeholder
== 1) {
1404 /* a callback is needed before this request can _use_ a helper. */
1405 /* we don't care about releasing this helper. The request NEVER
1406 * gets to the helper. So we throw away the return code */
1407 r
->reply
.result
= Helper::Unknown
;
1408 r
->reply
.whichServer
= srv
;
1409 r
->request
.callback(r
->request
.data
, r
->reply
);
1410 /* throw away the placeholder */
1412 /* and push the queue. Note that the callback may have submitted a new
1413 * request to the helper which is why we test for the request */
1415 if (!srv
->requests
.size())
1416 helperStatefulServerDone(srv
);
// Real request: reserve the server and record the request on it.
1421 srv
->flags
.reserved
= true;
1422 srv
->requests
.push_back(r
);
1423 srv
->dispatch_time
= current_time
;
// The completion callback is bound to the helper (`hlp`), not to `srv`.
1424 AsyncCall::Pointer call
= commCbCall(5,5, "helperStatefulDispatchWriteDone",
1425 CommIoCbPtrFun(helperStatefulDispatchWriteDone
, hlp
));
// The request text is written directly from r->request.buf.
1426 Comm::Write(srv
->writePipe
, r
->request
.buf
, strlen(r
->request
.buf
), call
, NULL
);
1427 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1428 hlp
->id_name
<< " #" << srv
->index
<< ", " <<
1429 (int) strlen(r
->request
.buf
) << " bytes");
// Per-server in-flight count and per-helper total request count.
1432 ++ srv
->stats
.pending
;
1433 ++ hlp
->stats
.requests
;
// helperKickQueue(hlp): dispatches queued requests for stateless helper
// `hlp`. It keeps calling helperDispatch() for as long as GetFirstAvailable()
// yields a server and hlp->nextRequest() yields a request. Because `&&`
// short-circuits, a request is only taken from the queue once a server has
// been found for it.
// NOTE(review): the declarations of `srv` and `r` (original lines 1439-1441)
// are not in this listing.
1437 helperKickQueue(helper
* hlp
)
1442 while ((srv
= GetFirstAvailable(hlp
)) && (r
= hlp
->nextRequest()))
1443 helperDispatch(srv
, r
);
// helperStatefulKickQueue(hlp): dispatches queued requests for stateful
// helper `hlp`. It keeps calling helperStatefulDispatch() for as long as
// StatefulGetFirstAvailable() yields a server and hlp->nextRequest() yields a
// request. Because `&&` short-circuits, a request is only taken from the
// queue once a server has been found for it.
// NOTE(review): the declaration of `r` (original line 1449) is not in this
// listing.
1447 helperStatefulKickQueue(statefulhelper
* hlp
)
1450 helper_stateful_server
*srv
;
1452 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= hlp
->nextRequest()))
1453 helperStatefulDispatch(srv
, r
);
// helperStatefulServerDone(srv): called when stateful server `srv` may be
// able to take more work.
//   - Not marked for shutdown: kick the parent helper's queue so waiting
//     requests can be dispatched.
//   - Marked for shutdown and neither closing, reserved, nor holding pending
//     requests: close its write pipe via closeWritePipeSafely().
// NOTE(review): original lines 1463-1465 are not in this listing.
1457 helperStatefulServerDone(helper_stateful_server
* srv
)
1459 if (!srv
->flags
.shutdown
) {
1460 helperStatefulKickQueue(srv
->parent
);
// Shutdown requested: only close once the server is completely unused.
1461 } else if (!srv
->flags
.closing
&& !srv
->flags
.reserved
&& !srv
->stats
.pending
) {
1462 srv
->closeWritePipeSafely(srv
->parent
->id_name
);
// helper_server::checkForTimedOutRequests(retry): removes timed-out requests
// from the front of `requests` and either resubmits them or reports the
// timeout to the requester. It asserts that the parent helper has a non-zero
// concurrency setting.
// For each request at the front whose request.timedOut(parent->timeout) is
// true:
//   - drop it from requestsIndex (its id must be present) and from requests;
//   - if `retry` is set, the request has been retried fewer than MAX_RETRIES
//     times, and its callback data is still valid: bump the retry count and
//     resubmit it through parent->submitRequest();
//   - otherwise, if cbdataReferenceValidDone() yields valid callback data:
//     invoke the callback, using parent->onTimedOutResponse as the reply
//     text when that is configured, or result Helper::TimedOut;
//   - count the timeout in parent->stats.timedout.
// NOTE(review): this listing omits original lines 1479, 1485, 1490, 1493,
// 1496-1499 and 1501-1505. The declaration of `cbdata`, the update of
// `retried`, the `else` lines that presumably separate the two
// "result = TimedOut" assignments, and the final disposal of `r` must be
// confirmed in the complete file.
1468 helper_server::checkForTimedOutRequests(bool const retry
)
1470 assert(parent
->childs
.concurrency
);
// Only the front of the list is examined on each pass; the loop stops at the
// first request that has not timed out.
1471 while(!requests
.empty() && requests
.front()->request
.timedOut(parent
->timeout
)) {
1472 Helper::Xaction
*r
= requests
.front();
1473 RequestIndex::iterator it
;
1474 it
= requestsIndex
.find(r
->request
.Id
);
1475 assert(it
!= requestsIndex
.end());
1476 requestsIndex
.erase(it
);
1477 requests
.pop_front();
1478 debugs(84, 2, "Request " << r
->request
.Id
<< " timed-out, remove it from queue");
1480 bool retried
= false;
// Retry path: allowed by the caller, below the retry cap, requester alive.
1481 if (retry
&& r
->request
.retries
< MAX_RETRIES
&& cbdataReferenceValid(r
->request
.data
)) {
1482 debugs(84, 2, "Retry request " << r
->request
.Id
);
1483 ++r
->request
.retries
;
1484 parent
->submitRequest(r
);
// Report path: requester is still alive, so it gets a callback.
1486 } else if (cbdataReferenceValidDone(r
->request
.data
, &cbdata
)) {
// A configured on-timeout response is fed to the reply as its content.
1487 if (!parent
->onTimedOutResponse
.isEmpty()) {
1488 if (r
->reply
.accumulate(parent
->onTimedOutResponse
.rawContent(), parent
->onTimedOutResponse
.length()))
1489 r
->reply
.finalize();
1491 r
->reply
.result
= Helper::TimedOut
;
1492 r
->request
.callback(cbdata
, r
->reply
);
1494 r
->reply
.result
= Helper::TimedOut
;
1495 r
->request
.callback(cbdata
, r
->reply
);
1500 ++parent
->stats
.timedout
;
// helper_server::requestTimeout(io): connection-timeout callback; io.data is
// the helper_server the timeout was armed for. Visible steps:
//   1. check that the server object is still valid;
//   2. run checkForTimedOutRequests() with the parent's retryTimedOut
//      setting;
//   3. re-arm the timeout on io.conn with a new call bound to the same
//      server. The delay is the parent's timeout minus the time the oldest
//      remaining request has already waited, and is at least 1.
// NOTE(review): the first debugs() below uses section 26, while the second
// one and the rest of the visible code use section 84 -- confirm which is
// intended.
// NOTE(review): original lines 1513-1514 and anything after 1524 are not in
// this listing.
1507 helper_server::requestTimeout(const CommTimeoutCbParams
&io
)
1509 debugs(26, 3, HERE
<< io
.conn
);
1510 helper_server
*srv
= static_cast<helper_server
*>(io
.data
);
// The server may have been freed before this callback fired.
1512 if (!cbdataReferenceValid(srv
))
1515 srv
->checkForTimedOutRequests(srv
->parent
->retryTimedOut
);
1517 debugs(84, 3, HERE
<< io
.conn
<< " establish new helper_server::requestTimeout");
1518 AsyncCall::Pointer timeoutCall
= commCbCall(84, 4, "helper_server::requestTimeout",
1519 CommTimeoutCbPtrFun(helper_server::requestTimeout
, srv
));
// Time already spent waiting by the oldest request still in flight (0 when
// nothing is in flight); the difference is stored in an int.
1521 const int timeSpent
= srv
->requests
.empty() ? 0 : (squid_curtime
- srv
->requests
.front()->request
.dispatch_time
.tv_sec
);
// Never arm a timeout of less than 1.
1522 const int timeLeft
= max(1, (static_cast<int>(srv
->parent
->timeout
) - timeSpent
));
1524 commSetConnTimeout(io
.conn
, timeLeft
, timeoutCall
);