2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 84 Helper process maintenance */
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
20 #include "format/Quoting.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
25 #include "SquidConfig.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
32 // helper_stateful_server::data uses explicit alloc()/freeOne() */
35 #define HELPER_MAX_ARGS 64
37 /// The maximum allowed request retries.
40 /** Initial Squid input buffer size. Helper responses may exceed this, and
41 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
43 const size_t ReadBufMinSize(4*1024);
45 /** Maximum safe size of a helper-to-Squid response message plus one.
46 * Squid will warn and close the stream if a helper sends a too-big response.
47 * ssl_crtd helper is known to produce responses of at least 10KB in size.
48 * Some undocumented helpers are known to produce responses exceeding 8KB.
50 const size_t ReadBufMaxSize(32*1024);
52 static IOCB helperHandleRead
;
53 static IOCB helperStatefulHandleRead
;
54 static void helperServerFree(helper_server
*srv
);
55 static void helperStatefulServerFree(helper_stateful_server
*srv
);
56 static void Enqueue(helper
* hlp
, Helper::Request
*);
57 static Helper::Request
*Dequeue(helper
* hlp
);
58 static Helper::Request
*StatefulDequeue(statefulhelper
* hlp
);
59 static helper_server
*GetFirstAvailable(helper
* hlp
);
60 static helper_stateful_server
*StatefulGetFirstAvailable(statefulhelper
* hlp
);
61 static void helperDispatch(helper_server
* srv
, Helper::Request
* r
);
62 static void helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Request
* r
);
63 static void helperKickQueue(helper
* hlp
);
64 static void helperStatefulKickQueue(statefulhelper
* hlp
);
65 static void helperStatefulServerDone(helper_stateful_server
* srv
);
66 static void StatefulEnqueue(statefulhelper
* hlp
, Helper::Request
* r
);
68 CBDATA_CLASS_INIT(helper
);
69 CBDATA_CLASS_INIT(helper_server
);
70 CBDATA_CLASS_INIT(statefulhelper
);
71 CBDATA_CLASS_INIT(helper_stateful_server
);
73 InstanceIdDefinitions(HelperServerBase
, "Hlpr");
76 HelperServerBase::initStats()
86 HelperServerBase::closePipesSafely(const char *id_name
)
89 shutdown(writePipe
->fd
, SD_BOTH
);
93 if (readPipe
->fd
== writePipe
->fd
)
101 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
103 debugs(84, DBG_IMPORTANT
, "WARNING: " << id_name
<<
104 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
112 HelperServerBase::closeWritePipeSafely(const char *id_name
)
115 shutdown(writePipe
->fd
, (readPipe
->fd
== writePipe
->fd
? SD_BOTH
: SD_SEND
));
118 flags
.closing
= true;
119 if (readPipe
->fd
== writePipe
->fd
)
125 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
127 debugs(84, DBG_IMPORTANT
, "WARNING: " << id_name
<<
128 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
136 helperOpenServers(helper
* hlp
)
142 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
143 char fd_note_buf
[FD_DESC_SZ
];
153 if (hlp
->cmdline
== NULL
)
156 progname
= hlp
->cmdline
->key
;
158 if ((s
= strrchr(progname
, '/')))
159 shortname
= xstrdup(s
+ 1);
161 shortname
= xstrdup(progname
);
163 /* figure out how many new child are actually needed. */
164 int need_new
= hlp
->childs
.needNew();
166 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
169 debugs(84, DBG_IMPORTANT
, "helperOpenServers: No '" << shortname
<< "' processes needed.");
172 procname
= (char *)xmalloc(strlen(shortname
) + 3);
174 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
176 args
[nargs
] = procname
;
179 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
180 args
[nargs
] = w
->key
;
187 assert(nargs
<= HELPER_MAX_ARGS
);
189 for (k
= 0; k
< need_new
; ++k
) {
192 pid
= ipcCreate(hlp
->ipc_type
,
202 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
206 ++ hlp
->childs
.n_running
;
207 ++ hlp
->childs
.n_active
;
208 srv
= new helper_server
;
212 srv
->addr
= hlp
->addr
;
213 srv
->readPipe
= new Comm::Connection
;
214 srv
->readPipe
->fd
= rfd
;
215 srv
->writePipe
= new Comm::Connection
;
216 srv
->writePipe
->fd
= wfd
;
217 srv
->rbuf
= (char *)memAllocBuf(ReadBufMinSize
, &srv
->rbuf_sz
);
218 srv
->wqueue
= new MemBuf
;
220 srv
->nextRequestId
= 0;
221 srv
->parent
= cbdataReference(hlp
);
222 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
225 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
226 fd_note(rfd
, fd_note_buf
);
228 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
229 fd_note(rfd
, fd_note_buf
);
230 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
231 fd_note(wfd
, fd_note_buf
);
234 commSetNonBlocking(rfd
);
237 commSetNonBlocking(wfd
);
239 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree
, srv
));
240 comm_add_close_handler(rfd
, closeCall
);
242 if (hlp
->timeout
&& hlp
->childs
.concurrency
) {
243 AsyncCall::Pointer timeoutCall
= commCbCall(84, 4, "helper_server::requestTimeout",
244 CommTimeoutCbPtrFun(helper_server::requestTimeout
, srv
));
245 commSetConnTimeout(srv
->readPipe
, hlp
->timeout
, timeoutCall
);
248 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
249 CommIoCbPtrFun(helperHandleRead
, srv
));
250 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
253 hlp
->last_restart
= squid_curtime
;
254 safe_free(shortname
);
256 helperKickQueue(hlp
);
262 * helperStatefulOpenServers: create the stateful child helper processes
265 helperStatefulOpenServers(statefulhelper
* hlp
)
268 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
269 char fd_note_buf
[FD_DESC_SZ
];
272 if (hlp
->cmdline
== NULL
)
275 if (hlp
->childs
.concurrency
)
276 debugs(84, DBG_CRITICAL
, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp
->cmdline
<< "')");
278 char *progname
= hlp
->cmdline
->key
;
281 if ((s
= strrchr(progname
, '/')))
282 shortname
= xstrdup(s
+ 1);
284 shortname
= xstrdup(progname
);
286 /* figure out haw mant new helpers are needed. */
287 int need_new
= hlp
->childs
.needNew();
289 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
292 debugs(84, DBG_IMPORTANT
, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
295 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
297 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
299 args
[nargs
] = procname
;
302 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
303 args
[nargs
] = w
->key
;
310 assert(nargs
<= HELPER_MAX_ARGS
);
312 for (int k
= 0; k
< need_new
; ++k
) {
317 pid_t pid
= ipcCreate(hlp
->ipc_type
,
327 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
331 ++ hlp
->childs
.n_running
;
332 ++ hlp
->childs
.n_active
;
333 helper_stateful_server
*srv
= new helper_stateful_server
;
336 srv
->flags
.reserved
= false;
338 srv
->addr
= hlp
->addr
;
339 srv
->readPipe
= new Comm::Connection
;
340 srv
->readPipe
->fd
= rfd
;
341 srv
->writePipe
= new Comm::Connection
;
342 srv
->writePipe
->fd
= wfd
;
343 srv
->rbuf
= (char *)memAllocBuf(ReadBufMinSize
, &srv
->rbuf_sz
);
345 srv
->parent
= cbdataReference(hlp
);
347 if (hlp
->datapool
!= NULL
)
348 srv
->data
= hlp
->datapool
->alloc();
350 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
353 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
354 fd_note(rfd
, fd_note_buf
);
356 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
357 fd_note(rfd
, fd_note_buf
);
358 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
359 fd_note(wfd
, fd_note_buf
);
362 commSetNonBlocking(rfd
);
365 commSetNonBlocking(wfd
);
367 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree
, srv
));
368 comm_add_close_handler(rfd
, closeCall
);
370 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
371 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
372 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
375 hlp
->last_restart
= squid_curtime
;
376 safe_free(shortname
);
378 helperStatefulKickQueue(hlp
);
382 helper::submitRequest(Helper::Request
*r
)
386 if ((srv
= GetFirstAvailable(this)))
387 helperDispatch(srv
, r
);
393 } else if (!full_time
) {
394 debugs(84, 3, id_name
<< " queue became full");
395 full_time
= squid_curtime
;
400 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
403 debugs(84, 3, "helperSubmit: hlp == NULL");
404 Helper::Reply nilReply
;
405 callback(data
, nilReply
);
409 hlp
->submit(buf
, callback
, data
);
413 helper::queueFull() const {
414 return stats
.queue_size
> static_cast<int>(childs
.queue_size
);
417 /// prepares the helper for request submission via trySubmit() or helperSubmit()
418 /// currently maintains full_time and kills Squid if the helper remains full for too long
424 else if (!full_time
) // may happen here if reconfigure decreases capacity
425 full_time
= squid_curtime
;
426 else if (squid_curtime
- full_time
> 180)
427 fatalf("Too many queued %s requests", id_name
);
431 helper::trySubmit(const char *buf
, HLPCB
* callback
, void *data
)
436 debugs(84, DBG_IMPORTANT
, id_name
<< " drops request due to a full queue");
437 return false; // request was ignored
440 submit(buf
, callback
, data
); // will send or queue
441 return true; // request submitted or queued
444 /// dispatches or enqueues a helper requests; does not enforce queue limits
446 helper::submit(const char *buf
, HLPCB
* callback
, void *data
)
448 Helper::Request
*r
= new Helper::Request(callback
, data
, buf
);
450 debugs(84, DBG_DATA
, Raw("buf", buf
, strlen(buf
)));
453 /// lastserver = "server last used as part of a reserved request sequence"
455 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
458 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
459 Helper::Reply nilReply
;
460 callback(data
, nilReply
);
464 hlp
->submit(buf
, callback
, data
, lastserver
);
467 void statefulhelper::submit(const char *buf
, HLPCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
469 Helper::Request
*r
= new Helper::Request(callback
, data
, buf
);
471 if ((buf
!= NULL
) && lastserver
) {
472 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver
);
473 assert(lastserver
->flags
.reserved
);
474 assert(!lastserver
->requests
.size());
476 debugs(84, 5, "StatefulSubmit dispatching");
477 helperStatefulDispatch(lastserver
, r
);
479 helper_stateful_server
*srv
;
480 if ((srv
= StatefulGetFirstAvailable(this))) {
481 helperStatefulDispatch(srv
, r
);
483 StatefulEnqueue(this, r
);
486 debugs(84, DBG_DATA
, "placeholder: '" << r
->placeholder
<<
487 "', " << Raw("buf", buf
, (!buf
?0:strlen(buf
))));
491 } else if (!full_time
) {
492 debugs(84, 3, id_name
<< " queue became full");
493 full_time
= squid_curtime
;
500 * helperStatefulReleaseServer tells the helper that whoever was
501 * using it no longer needs its services.
504 helperStatefulReleaseServer(helper_stateful_server
* srv
)
506 debugs(84, 3, HERE
<< "srv-" << srv
->index
<< " flags.reserved = " << srv
->flags
.reserved
);
507 if (!srv
->flags
.reserved
)
510 ++ srv
->stats
.releases
;
512 srv
->flags
.reserved
= false;
514 helperStatefulServerDone(srv
);
517 /** return a pointer to the stateful routines data area */
519 helperStatefulServerGetData(helper_stateful_server
* srv
)
525 helper::packStatsInto(Packable
*p
, const char *label
) const
528 p
->appendf("%s:\n", label
);
530 p
->appendf(" program: %s\n", cmdline
->key
);
531 p
->appendf(" number active: %d of %d (%d shutting down)\n", childs
.n_active
, childs
.n_max
, (childs
.n_running
- childs
.n_active
));
532 p
->appendf(" requests sent: %d\n", stats
.requests
);
533 p
->appendf(" replies received: %d\n", stats
.replies
);
534 p
->appendf(" requests timedout: %d\n", stats
.timedout
);
535 p
->appendf(" queue length: %d\n", stats
.queue_size
);
536 p
->appendf(" avg service time: %d msec\n", stats
.avg_svc_time
);
538 p
->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
550 for (dlink_node
*link
= servers
.head
; link
; link
= link
->next
) {
551 HelperServerBase
*srv
= static_cast<HelperServerBase
*>(link
->data
);
553 Helper::Request
*request
= srv
->requests
.empty() ? NULL
: srv
->requests
.front();
554 double tt
= 0.001 * (request
? tvSubMsec(request
->dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
555 p
->appendf("%7u\t%7d\t%7d\t%11" PRIu64
"\t%11" PRIu64
"\t%11" PRIu64
"\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
562 srv
->stats
.pending
? 'B' : ' ',
563 srv
->flags
.writing
? 'W' : ' ',
564 srv
->flags
.closing
? 'C' : ' ',
565 srv
->flags
.reserved
? 'R' : ' ',
566 srv
->flags
.shutdown
? 'S' : ' ',
567 request
&& request
->placeholder
? 'P' : ' ',
570 request
? Format::QuoteMimeBlob(request
->buf
) : "(none)");
573 p
->append("\nFlags key:\n"
578 " S\tSHUTDOWN PENDING\n"
579 " P\tPLACEHOLDER\n", 101);
583 helperShutdown(helper
* hlp
)
585 dlink_node
*link
= hlp
->servers
.head
;
589 srv
= (helper_server
*)link
->data
;
592 if (srv
->flags
.shutdown
) {
593 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
597 assert(hlp
->childs
.n_active
> 0);
598 -- hlp
->childs
.n_active
;
599 srv
->flags
.shutdown
= true; /* request it to shut itself down */
601 if (srv
->flags
.closing
) {
602 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
606 if (srv
->stats
.pending
) {
607 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
611 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
612 /* the rest of the details is dealt with in the helperServerFree
615 srv
->closePipesSafely(hlp
->id_name
);
620 helperStatefulShutdown(statefulhelper
* hlp
)
622 dlink_node
*link
= hlp
->servers
.head
;
623 helper_stateful_server
*srv
;
626 srv
= (helper_stateful_server
*)link
->data
;
629 if (srv
->flags
.shutdown
) {
630 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
634 assert(hlp
->childs
.n_active
> 0);
635 -- hlp
->childs
.n_active
;
636 srv
->flags
.shutdown
= true; /* request it to shut itself down */
638 if (srv
->stats
.pending
) {
639 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
643 if (srv
->flags
.closing
) {
644 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
648 if (srv
->flags
.reserved
) {
650 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Closing anyway.");
652 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Not Shutting Down Yet.");
657 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
659 /* the rest of the details is dealt with in the helperStatefulServerFree
662 srv
->closePipesSafely(hlp
->id_name
);
668 /* note, don't free id_name, it probably points to static memory */
671 debugs(84, DBG_CRITICAL
, "WARNING: freeing " << id_name
<< " helper with " << stats
.queue_size
<< " requests queued");
674 /* ====================================================================== */
675 /* LOCAL FUNCTIONS */
676 /* ====================================================================== */
679 helperServerFree(helper_server
*srv
)
681 helper
*hlp
= srv
->parent
;
682 int concurrency
= hlp
->childs
.concurrency
;
688 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
692 srv
->wqueue
->clean();
696 srv
->writebuf
->clean();
697 delete srv
->writebuf
;
698 srv
->writebuf
= NULL
;
701 if (Comm::IsConnOpen(srv
->writePipe
))
702 srv
->closeWritePipeSafely(hlp
->id_name
);
704 dlinkDelete(&srv
->link
, &hlp
->servers
);
706 assert(hlp
->childs
.n_running
> 0);
707 -- hlp
->childs
.n_running
;
709 if (!srv
->flags
.shutdown
) {
710 assert(hlp
->childs
.n_active
> 0);
711 -- hlp
->childs
.n_active
;
712 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
<< " exited");
714 if (hlp
->childs
.needNew() > 0) {
715 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
717 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30) {
718 if (srv
->stats
.replies
< 1)
719 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
721 debugs(80, DBG_CRITICAL
, "ERROR: The " << hlp
->id_name
<< " helpers are crashing too rapidly, need help!");
724 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
725 helperOpenServers(hlp
);
729 while (!srv
->requests
.empty()) {
730 // XXX: re-schedule these on another helper?
731 Helper::Request
*r
= srv
->requests
.front();
732 srv
->requests
.pop_front();
735 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
736 Helper::Reply nilReply
;
737 r
->callback(cbdata
, nilReply
);
742 srv
->requestsIndex
.clear();
744 cbdataReferenceDone(srv
->parent
);
749 helperStatefulServerFree(helper_stateful_server
*srv
)
751 statefulhelper
*hlp
= srv
->parent
;
754 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
759 srv
->wqueue
->clean();
765 /* TODO: walk the local queue of requests and carry them all out */
766 if (Comm::IsConnOpen(srv
->writePipe
))
767 srv
->closeWritePipeSafely(hlp
->id_name
);
769 dlinkDelete(&srv
->link
, &hlp
->servers
);
771 assert(hlp
->childs
.n_running
> 0);
772 -- hlp
->childs
.n_running
;
774 if (!srv
->flags
.shutdown
) {
775 assert( hlp
->childs
.n_active
> 0);
776 -- hlp
->childs
.n_active
;
777 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
<< " exited");
779 if (hlp
->childs
.needNew() > 0) {
780 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
782 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30) {
783 if (srv
->stats
.replies
< 1)
784 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
786 debugs(80, DBG_CRITICAL
, "ERROR: The " << hlp
->id_name
<< " helpers are crashing too rapidly, need help!");
789 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
790 helperStatefulOpenServers(hlp
);
794 while (!srv
->requests
.empty()) {
795 // XXX: re-schedule these on another helper?
796 Helper::Request
*r
= srv
->requests
.front();
797 srv
->requests
.pop_front();
800 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
801 Helper::Reply nilReply
;
802 r
->callback(cbdata
, nilReply
);
808 if (srv
->data
!= NULL
)
809 hlp
->datapool
->freeOne(srv
->data
);
811 cbdataReferenceDone(srv
->parent
);
816 /// Calls back with a pointer to the buffer with the helper output
818 helperReturnBuffer(int request_number
, helper_server
* srv
, helper
* hlp
, char * msg
, char * msg_end
)
820 Helper::Request
*r
= NULL
;
821 helper_server::RequestIndex::iterator it
;
822 if (hlp
->childs
.concurrency
) {
823 // If concurency supported retrieve request from ID
824 it
= srv
->requestsIndex
.find(request_number
);
825 if (it
!= srv
->requestsIndex
.end()) {
827 srv
->requests
.erase(it
->second
);
828 srv
->requestsIndex
.erase(it
);
830 } else if(!srv
->requests
.empty()) {
831 // Else get the first request from queue, if any
832 r
= srv
->requests
.front();
833 srv
->requests
.pop_front();
837 HLPCB
*callback
= r
->callback
;
842 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
843 Helper::Reply
response(msg
, (msg_end
-msg
));
844 if (response
.result
== Helper::BrokenHelper
&& r
->retries
< MAX_RETRIES
) {
845 debugs(84, DBG_IMPORTANT
, "ERROR: helper: " << response
<< ", attempt #" << (r
->retries
+ 1) << " of 2");
848 callback(cbdata
, response
);
851 -- srv
->stats
.pending
;
852 ++ srv
->stats
.replies
;
854 ++ hlp
->stats
.replies
;
856 srv
->answer_time
= current_time
;
858 srv
->dispatch_time
= r
->dispatch_time
;
860 hlp
->stats
.avg_svc_time
=
861 Math::intAverage(hlp
->stats
.avg_svc_time
,
862 tvSubMsec(r
->dispatch_time
, current_time
),
863 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
867 hlp
->submitRequest(r
);
870 } else if (srv
->stats
.timedout
) {
871 debugs(84, 3, "Timedout reply received for request-ID: " << request_number
<< " , ignore");
873 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected reply on channel " <<
874 request_number
<< " from " << hlp
->id_name
<< " #" << srv
->index
<<
875 " '" << srv
->rbuf
<< "'");
878 if (hlp
->timeout
&& hlp
->childs
.concurrency
)
879 srv
->checkForTimedOutRequests(hlp
->retryTimedOut
);
881 if (!srv
->flags
.shutdown
) {
882 helperKickQueue(hlp
);
883 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
884 srv
->flags
.closing
=true;
885 srv
->writePipe
->close();
890 helperHandleRead(const Comm::ConnectionPointer
&conn
, char *, size_t len
, Comm::Flag flag
, int, void *data
)
893 helper_server
*srv
= (helper_server
*)data
;
894 helper
*hlp
= srv
->parent
;
895 assert(cbdataReferenceValid(data
));
897 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
899 if (flag
== Comm::ERR_CLOSING
) {
903 assert(conn
->fd
== srv
->readPipe
->fd
);
905 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
);
907 if (flag
!= Comm::OK
|| len
== 0) {
908 srv
->closePipesSafely(hlp
->id_name
);
913 srv
->rbuf
[srv
->roffset
] = '\0';
914 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
916 if (!srv
->stats
.pending
&& !srv
->stats
.timedout
) {
917 /* someone spoke without being spoken to */
918 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected read from " <<
919 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
920 " bytes '" << srv
->rbuf
<< "'");
926 while ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
927 /* end of reply found */
928 char *msg
= srv
->rbuf
;
931 debugs(84, 3, "helperHandleRead: end of reply found");
933 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n') {
935 // rewind to the \r octet which is the real terminal now
936 // and remember that we have to skip forward 2 places now.
943 if (hlp
->childs
.concurrency
) {
944 i
= strtol(msg
, &msg
, 10);
946 while (*msg
&& xisspace(*msg
))
950 helperReturnBuffer(i
, srv
, hlp
, msg
, t
);
951 srv
->roffset
-= (t
- srv
->rbuf
) + skip
;
952 memmove(srv
->rbuf
, t
+ skip
, srv
->roffset
);
953 srv
->rbuf
[srv
->roffset
] = '\0';
956 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
957 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
958 assert(spaceSize
>= 0);
960 // grow the input buffer if needed and possible
961 if (!spaceSize
&& srv
->rbuf_sz
+ 4096 <= ReadBufMaxSize
) {
962 srv
->rbuf
= (char *)memReallocBuf(srv
->rbuf
, srv
->rbuf_sz
+ 4096, &srv
->rbuf_sz
);
963 debugs(84, 3, HERE
<< "Grew read buffer to " << srv
->rbuf_sz
);
964 spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
965 assert(spaceSize
>= 0);
968 // quit reading if there is no space left
970 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
971 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
972 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
973 srv
->closePipesSafely(hlp
->id_name
);
977 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
978 CommIoCbPtrFun(helperHandleRead
, srv
));
979 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
984 helperStatefulHandleRead(const Comm::ConnectionPointer
&conn
, char *, size_t len
, Comm::Flag flag
, int, void *data
)
987 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
988 statefulhelper
*hlp
= srv
->parent
;
989 assert(cbdataReferenceValid(data
));
991 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
993 if (flag
== Comm::ERR_CLOSING
) {
997 assert(conn
->fd
== srv
->readPipe
->fd
);
999 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
1000 hlp
->id_name
<< " #" << srv
->index
);
1002 if (flag
!= Comm::OK
|| len
== 0) {
1003 srv
->closePipesSafely(hlp
->id_name
);
1007 srv
->roffset
+= len
;
1008 srv
->rbuf
[srv
->roffset
] = '\0';
1009 Helper::Request
*r
= srv
->requests
.front();
1010 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
1013 /* someone spoke without being spoken to */
1014 debugs(84, DBG_IMPORTANT
, "helperStatefulHandleRead: unexpected read from " <<
1015 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
1016 " bytes '" << srv
->rbuf
<< "'");
1021 if ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
1022 /* end of reply found */
1023 srv
->requests
.pop_front(); // we already have it in 'r'
1026 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1028 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n') {
1030 // rewind to the \r octet which is the real terminal now
1031 // and remember that we have to skip forward 2 places now.
1038 if (r
&& cbdataReferenceValid(r
->data
)) {
1039 Helper::Reply
res(srv
->rbuf
, (t
- srv
->rbuf
));
1040 res
.whichServer
= srv
;
1041 r
->callback(r
->data
, res
);
1043 debugs(84, DBG_IMPORTANT
, "StatefulHandleRead: no callback data registered");
1046 // only skip off the \0's _after_ passing its location in Helper::Reply above
1050 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1051 * Doing this prohibits concurrency support with multiple replies per read().
1052 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1053 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1058 -- srv
->stats
.pending
;
1059 ++ srv
->stats
.replies
;
1061 ++ hlp
->stats
.replies
;
1062 srv
->answer_time
= current_time
;
1063 hlp
->stats
.avg_svc_time
=
1064 Math::intAverage(hlp
->stats
.avg_svc_time
,
1065 tvSubMsec(srv
->dispatch_time
, current_time
),
1066 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
1069 helperStatefulServerDone(srv
);
1071 helperStatefulReleaseServer(srv
);
1074 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
1075 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1076 assert(spaceSize
>= 0);
1078 // grow the input buffer if needed and possible
1079 if (!spaceSize
&& srv
->rbuf_sz
+ 4096 <= ReadBufMaxSize
) {
1080 srv
->rbuf
= (char *)memReallocBuf(srv
->rbuf
, srv
->rbuf_sz
+ 4096, &srv
->rbuf_sz
);
1081 debugs(84, 3, HERE
<< "Grew read buffer to " << srv
->rbuf_sz
);
1082 spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1083 assert(spaceSize
>= 0);
1086 // quit reading if there is no space left
1088 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
1089 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
1090 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
1091 srv
->closePipesSafely(hlp
->id_name
);
1095 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
1096 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
1097 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
1101 /// Handles a request when all running helpers, if any, are busy.
1103 Enqueue(helper
* hlp
, Helper::Request
* r
)
1105 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1106 dlinkAddTail(r
, link
, &hlp
->queue
);
1107 ++ hlp
->stats
.queue_size
;
1109 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1110 if (hlp
->childs
.needNew() > 0) {
1111 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1112 helperOpenServers(hlp
);
1116 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1119 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1122 if (shutting_down
|| reconfiguring
)
1125 hlp
->last_queue_warn
= squid_curtime
;
1127 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1128 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1129 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1133 StatefulEnqueue(statefulhelper
* hlp
, Helper::Request
* r
)
1135 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1136 dlinkAddTail(r
, link
, &hlp
->queue
);
1137 ++ hlp
->stats
.queue_size
;
1139 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1140 if (hlp
->childs
.needNew() > 0) {
1141 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1142 helperStatefulOpenServers(hlp
);
1146 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1149 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1152 if (shutting_down
|| reconfiguring
)
1155 hlp
->last_queue_warn
= squid_curtime
;
1157 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1158 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1159 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1162 static Helper::Request
*
1163 Dequeue(helper
* hlp
)
1166 Helper::Request
*r
= NULL
;
1168 if ((link
= hlp
->queue
.head
)) {
1169 r
= (Helper::Request
*)link
->data
;
1170 dlinkDelete(link
, &hlp
->queue
);
1171 memFree(link
, MEM_DLINK_NODE
);
1172 -- hlp
->stats
.queue_size
;
1178 static Helper::Request
*
1179 StatefulDequeue(statefulhelper
* hlp
)
1182 Helper::Request
*r
= NULL
;
1184 if ((link
= hlp
->queue
.head
)) {
1185 r
= (Helper::Request
*)link
->data
;
1186 dlinkDelete(link
, &hlp
->queue
);
1187 memFree(link
, MEM_DLINK_NODE
);
1188 -- hlp
->stats
.queue_size
;
1194 static helper_server
*
1195 GetFirstAvailable(helper
* hlp
)
1199 helper_server
*selected
= NULL
;
1200 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1202 if (hlp
->childs
.n_running
== 0)
1205 /* Find "least" loaded helper (approx) */
1206 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1207 srv
= (helper_server
*)n
->data
;
1209 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
1212 if (srv
->flags
.shutdown
)
1215 if (!srv
->stats
.pending
)
1226 /* Check for overload */
1228 debugs(84, 5, "GetFirstAvailable: None available.");
1232 if (selected
->stats
.pending
>= (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1)) {
1233 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1237 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected
->index
);
1241 static helper_stateful_server
*
1242 StatefulGetFirstAvailable(statefulhelper
* hlp
)
1245 helper_stateful_server
*srv
= NULL
;
1246 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1248 if (hlp
->childs
.n_running
== 0)
1251 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1252 srv
= (helper_stateful_server
*)n
->data
;
1254 if (srv
->stats
.pending
)
1257 if (srv
->flags
.reserved
)
1260 if (srv
->flags
.shutdown
)
1263 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
1267 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1272 helperDispatchWriteDone(const Comm::ConnectionPointer
&, char *, size_t, Comm::Flag flag
, int, void *data
)
1274 helper_server
*srv
= (helper_server
*)data
;
1276 srv
->writebuf
->clean();
1277 delete srv
->writebuf
;
1278 srv
->writebuf
= NULL
;
1279 srv
->flags
.writing
= false;
1281 if (flag
!= Comm::OK
) {
1282 /* Helper server has crashed */
1283 debugs(84, DBG_CRITICAL
, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
<< " has crashed");
1287 if (!srv
->wqueue
->isNull()) {
1288 srv
->writebuf
= srv
->wqueue
;
1289 srv
->wqueue
= new MemBuf
;
1290 srv
->flags
.writing
= true;
1291 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1292 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1293 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1298 helperDispatch(helper_server
* srv
, Helper::Request
* r
)
1300 helper
*hlp
= srv
->parent
;
1301 const uint64_t reqId
= ++srv
->nextRequestId
;
1303 if (!cbdataReferenceValid(r
->data
)) {
1304 debugs(84, DBG_IMPORTANT
, "helperDispatch: invalid callback data");
1310 helper_server::Requests::iterator it
= srv
->requests
.insert(srv
->requests
.end(), r
);
1311 r
->dispatch_time
= current_time
;
1313 if (srv
->wqueue
->isNull())
1314 srv
->wqueue
->init();
1316 if (hlp
->childs
.concurrency
) {
1317 srv
->requestsIndex
.insert(helper_server::RequestIndex::value_type(reqId
, it
));
1318 assert(srv
->requestsIndex
.size() == srv
->requests
.size());
1319 srv
->wqueue
->appendf("%" PRIu64
" %s", reqId
, r
->buf
);
1321 srv
->wqueue
->append(r
->buf
, strlen(r
->buf
));
1323 if (!srv
->flags
.writing
) {
1324 assert(NULL
== srv
->writebuf
);
1325 srv
->writebuf
= srv
->wqueue
;
1326 srv
->wqueue
= new MemBuf
;
1327 srv
->flags
.writing
= true;
1328 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1329 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1330 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1333 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
<< ", " << strlen(r
->buf
) << " bytes");
1336 ++ srv
->stats
.pending
;
1337 ++ hlp
->stats
.requests
;
1341 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer
&, char *, size_t, Comm::Flag
, int, void *)
1345 helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Request
* r
)
1347 statefulhelper
*hlp
= srv
->parent
;
1349 if (!cbdataReferenceValid(r
->data
)) {
1350 debugs(84, DBG_IMPORTANT
, "helperStatefulDispatch: invalid callback data");
1352 helperStatefulReleaseServer(srv
);
1356 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
);
1358 if (r
->placeholder
== 1) {
1359 /* a callback is needed before this request can _use_ a helper. */
1360 /* we don't care about releasing this helper. The request NEVER
1361 * gets to the helper. So we throw away the return code */
1362 Helper::Reply nilReply
;
1363 nilReply
.whichServer
= srv
;
1364 r
->callback(r
->data
, nilReply
);
1365 /* throw away the placeholder */
1367 /* and push the queue. Note that the callback may have submitted a new
1368 * request to the helper which is why we test for the request */
1370 if (!srv
->requests
.size())
1371 helperStatefulServerDone(srv
);
1376 srv
->flags
.reserved
= true;
1377 srv
->requests
.push_back(r
);
1378 srv
->dispatch_time
= current_time
;
1379 AsyncCall::Pointer call
= commCbCall(5,5, "helperStatefulDispatchWriteDone",
1380 CommIoCbPtrFun(helperStatefulDispatchWriteDone
, hlp
));
1381 Comm::Write(srv
->writePipe
, r
->buf
, strlen(r
->buf
), call
, NULL
);
1382 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1383 hlp
->id_name
<< " #" << srv
->index
<< ", " <<
1384 (int) strlen(r
->buf
) << " bytes");
1387 ++ srv
->stats
.pending
;
1388 ++ hlp
->stats
.requests
;
1392 helperKickQueue(helper
* hlp
)
1397 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
1398 helperDispatch(srv
, r
);
1402 helperStatefulKickQueue(statefulhelper
* hlp
)
1405 helper_stateful_server
*srv
;
1407 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= StatefulDequeue(hlp
)))
1408 helperStatefulDispatch(srv
, r
);
1412 helperStatefulServerDone(helper_stateful_server
* srv
)
1414 if (!srv
->flags
.shutdown
) {
1415 helperStatefulKickQueue(srv
->parent
);
1416 } else if (!srv
->flags
.closing
&& !srv
->flags
.reserved
&& !srv
->stats
.pending
) {
1417 srv
->closeWritePipeSafely(srv
->parent
->id_name
);
1423 helper_server::checkForTimedOutRequests(bool const retry
)
1425 assert(parent
->childs
.concurrency
);
1426 while(!requests
.empty() && requests
.front()->timedOut(parent
->timeout
)) {
1427 Helper::Request
*r
= requests
.front();
1428 RequestIndex::iterator it
;
1429 it
= requestsIndex
.find(r
->Id
);
1430 assert(it
!= requestsIndex
.end());
1431 requestsIndex
.erase(it
);
1432 requests
.pop_front();
1433 debugs(84, 2, "Request " << r
->Id
<< " timed-out, remove it from queue");
1435 bool retried
= false;
1436 if (retry
&& r
->retries
< MAX_RETRIES
&& cbdataReferenceValid(r
->data
)) {
1437 debugs(84, 2, "Retry request " << r
->Id
);
1439 parent
->submitRequest(r
);
1441 } else if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
1442 if (!parent
->onTimedOutResponse
.isEmpty()) {
1443 // Helper::Reply needs a non const buffer
1444 char *replyMsg
= xstrdup(parent
->onTimedOutResponse
.c_str());
1445 r
->callback(cbdata
, Helper::Reply(replyMsg
, strlen(replyMsg
)));
1448 r
->callback(cbdata
, Helper::Reply(Helper::TimedOut
));
1452 ++parent
->stats
.timedout
;
1459 helper_server::requestTimeout(const CommTimeoutCbParams
&io
)
1461 debugs(26, 3, HERE
<< io
.conn
);
1462 helper_server
*srv
= static_cast<helper_server
*>(io
.data
);
1464 if (!cbdataReferenceValid(srv
))
1467 srv
->checkForTimedOutRequests(srv
->parent
->retryTimedOut
);
1469 debugs(84, 3, HERE
<< io
.conn
<< " establish new helper_server::requestTimeout");
1470 AsyncCall::Pointer timeoutCall
= commCbCall(84, 4, "helper_server::requestTimeout",
1471 CommTimeoutCbPtrFun(helper_server::requestTimeout
, srv
));
1473 const int timeSpent
= srv
->requests
.empty() ? 0 : (squid_curtime
- srv
->requests
.front()->dispatch_time
.tv_sec
);
1474 const int timeLeft
= max(1, (static_cast<int>(srv
->parent
->timeout
) - timeSpent
));
1476 commSetConnTimeout(io
.conn
, timeLeft
, timeoutCall
);