4 * DEBUG: section 84 Helper process maintenance
5 * AUTHOR: Harvest Derived?
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
36 #include "base/AsyncCbdataCalls.h"
38 #include "comm/Connection.h"
39 #include "comm/Write.h"
40 #include "format/Quoting.h"
44 #include "SquidMath.h"
45 #include "SquidTime.h"
49 #define HELPER_MAX_ARGS 64
52 /** Initial Squid input buffer size. Helper responses may exceed this, and
53 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
55 const size_t ReadBufMinSize(4*1024);
57 /** Maximum safe size of a helper-to-Squid response message plus one.
58 * Squid will warn and close the stream if a helper sends a too-big response.
59 * ssl_crtd helper is known to produce responses of at least 10KB in size.
60 * Some undocumented helpers are known to produce responses exceeding 8KB.
62 const size_t ReadBufMaxSize(32*1024);
64 static IOCB helperHandleRead
;
65 static IOCB helperStatefulHandleRead
;
66 static void helperServerFree(helper_server
*srv
);
67 static void helperStatefulServerFree(helper_stateful_server
*srv
);
68 static void Enqueue(helper
* hlp
, helper_request
*);
69 static helper_request
*Dequeue(helper
* hlp
);
70 static helper_stateful_request
*StatefulDequeue(statefulhelper
* hlp
);
71 static helper_server
*GetFirstAvailable(helper
* hlp
);
72 static helper_stateful_server
*StatefulGetFirstAvailable(statefulhelper
* hlp
);
73 static void helperDispatch(helper_server
* srv
, helper_request
* r
);
74 static void helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
);
75 static void helperKickQueue(helper
* hlp
);
76 static void helperStatefulKickQueue(statefulhelper
* hlp
);
77 static void helperStatefulServerDone(helper_stateful_server
* srv
);
78 static void helperRequestFree(helper_request
* r
);
79 static void helperStatefulRequestFree(helper_stateful_request
* r
);
80 static void StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
);
81 static bool helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
);
84 CBDATA_CLASS_INIT(helper
);
85 CBDATA_TYPE(helper_server
);
86 CBDATA_CLASS_INIT(statefulhelper
);
87 CBDATA_TYPE(helper_stateful_server
);
90 HelperServerBase::closePipesSafely()
95 shutdown(writePipe
->fd
, SD_BOTH
);
99 if (readPipe
->fd
== writePipe
->fd
)
107 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
109 debugs(84, DBG_IMPORTANT
, "WARNING: " << hlp
->id_name
<<
110 " #" << no
<< " (" << hlp
->cmdline
->key
<< "," <<
111 (long int)pid
<< ") didn't exit in 5 seconds");
119 HelperServerBase::closeWritePipeSafely()
124 shutdown(writePipe
->fd
, (readPipe
->fd
== writePipe
->fd
? SD_BOTH
: SD_SEND
));
128 if (readPipe
->fd
== writePipe
->fd
)
134 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
136 debugs(84, DBG_IMPORTANT
, "WARNING: " << hlp
->id_name
<<
137 " #" << no
<< " (" << hlp
->cmdline
->key
<< "," <<
138 (long int)pid
<< ") didn't exit in 5 seconds");
146 helperOpenServers(helper
* hlp
)
152 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
153 char fd_note_buf
[FD_DESC_SZ
];
163 if (hlp
->cmdline
== NULL
)
166 progname
= hlp
->cmdline
->key
;
168 if ((s
= strrchr(progname
, '/')))
169 shortname
= xstrdup(s
+ 1);
171 shortname
= xstrdup(progname
);
173 /* figure out how many new children are actually needed. */
174 int need_new
= hlp
->childs
.needNew();
176 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
179 debugs(84, DBG_IMPORTANT
, "helperOpenServers: No '" << shortname
<< "' processes needed.");
182 procname
= (char *)xmalloc(strlen(shortname
) + 3);
184 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
186 args
[nargs
] = procname
;
189 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
190 args
[nargs
] = w
->key
;
197 assert(nargs
<= HELPER_MAX_ARGS
);
199 for (k
= 0; k
< need_new
; ++k
) {
202 pid
= ipcCreate(hlp
->ipc_type
,
212 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
216 ++ hlp
->childs
.n_running
;
217 ++ hlp
->childs
.n_active
;
218 CBDATA_INIT_TYPE(helper_server
);
219 srv
= cbdataAlloc(helper_server
);
223 srv
->addr
= hlp
->addr
;
224 srv
->readPipe
= new Comm::Connection
;
225 srv
->readPipe
->fd
= rfd
;
226 srv
->writePipe
= new Comm::Connection
;
227 srv
->writePipe
->fd
= wfd
;
228 srv
->rbuf
= (char *)memAllocBuf(ReadBufMinSize
, &srv
->rbuf_sz
);
229 srv
->wqueue
= new MemBuf
;
231 srv
->requests
= (helper_request
**)xcalloc(hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1, sizeof(*srv
->requests
));
232 srv
->parent
= cbdataReference(hlp
);
233 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
236 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
237 fd_note(rfd
, fd_note_buf
);
239 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
240 fd_note(rfd
, fd_note_buf
);
241 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
242 fd_note(wfd
, fd_note_buf
);
245 commSetNonBlocking(rfd
);
248 commSetNonBlocking(wfd
);
250 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree
, srv
));
251 comm_add_close_handler(rfd
, closeCall
);
253 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
254 CommIoCbPtrFun(helperHandleRead
, srv
));
255 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
258 hlp
->last_restart
= squid_curtime
;
259 safe_free(shortname
);
261 helperKickQueue(hlp
);
267 * helperStatefulOpenServers: create the stateful child helper processes
270 helperStatefulOpenServers(statefulhelper
* hlp
)
273 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
274 char fd_note_buf
[FD_DESC_SZ
];
277 if (hlp
->cmdline
== NULL
)
280 if (hlp
->childs
.concurrency
)
281 debugs(84, DBG_CRITICAL
, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp
->cmdline
<< "')");
283 char *progname
= hlp
->cmdline
->key
;
286 if ((s
= strrchr(progname
, '/')))
287 shortname
= xstrdup(s
+ 1);
289 shortname
= xstrdup(progname
);
291 /* figure out how many new helpers are needed. */
292 int need_new
= hlp
->childs
.needNew();
294 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
297 debugs(84, DBG_IMPORTANT
, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
300 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
302 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
304 args
[nargs
] = procname
;
307 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
308 args
[nargs
] = w
->key
;
315 assert(nargs
<= HELPER_MAX_ARGS
);
317 for (int k
= 0; k
< need_new
; ++k
) {
322 pid_t pid
= ipcCreate(hlp
->ipc_type
,
332 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
336 ++ hlp
->childs
.n_running
;
337 ++ hlp
->childs
.n_active
;
338 CBDATA_INIT_TYPE(helper_stateful_server
);
339 helper_stateful_server
*srv
= cbdataAlloc(helper_stateful_server
);
342 srv
->flags
.reserved
= 0;
343 srv
->stats
.submits
= 0;
344 srv
->stats
.releases
= 0;
346 srv
->addr
= hlp
->addr
;
347 srv
->readPipe
= new Comm::Connection
;
348 srv
->readPipe
->fd
= rfd
;
349 srv
->writePipe
= new Comm::Connection
;
350 srv
->writePipe
->fd
= wfd
;
351 srv
->rbuf
= (char *)memAllocBuf(ReadBufMinSize
, &srv
->rbuf_sz
);
353 srv
->parent
= cbdataReference(hlp
);
355 if (hlp
->datapool
!= NULL
)
356 srv
->data
= hlp
->datapool
->alloc();
358 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
361 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
362 fd_note(rfd
, fd_note_buf
);
364 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
365 fd_note(rfd
, fd_note_buf
);
366 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
367 fd_note(wfd
, fd_note_buf
);
370 commSetNonBlocking(rfd
);
373 commSetNonBlocking(wfd
);
375 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree
, srv
));
376 comm_add_close_handler(rfd
, closeCall
);
378 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
379 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
380 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
383 hlp
->last_restart
= squid_curtime
;
384 safe_free(shortname
);
386 helperStatefulKickQueue(hlp
);
391 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
394 debugs(84, 3, "helperSubmit: hlp == NULL");
395 callback(data
, NULL
);
399 helper_request
*r
= new helper_request
;
402 r
->callback
= callback
;
403 r
->data
= cbdataReference(data
);
404 r
->buf
= xstrdup(buf
);
406 if ((srv
= GetFirstAvailable(hlp
)))
407 helperDispatch(srv
, r
);
411 debugs(84, 9, "helperSubmit: " << buf
);
414 /// lastserver = "server last used as part of a reserved request sequence"
416 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPSCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
419 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
420 callback(data
, 0, NULL
);
424 helper_stateful_request
*r
= new helper_stateful_request
;
426 r
->callback
= callback
;
427 r
->data
= cbdataReference(data
);
430 r
->buf
= xstrdup(buf
);
437 if ((buf
!= NULL
) && lastserver
) {
438 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver
);
439 assert(lastserver
->flags
.reserved
);
440 assert(!(lastserver
->request
));
442 debugs(84, 5, "StatefulSubmit dispatching");
443 helperStatefulDispatch(lastserver
, r
);
445 helper_stateful_server
*srv
;
446 if ((srv
= StatefulGetFirstAvailable(hlp
))) {
447 helperStatefulDispatch(srv
, r
);
449 StatefulEnqueue(hlp
, r
);
452 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r
->placeholder
<< "', buf '" << buf
<< "'.");
458 * helperStatefulReleaseServer tells the helper that whoever was
459 * using it no longer needs its services.
462 helperStatefulReleaseServer(helper_stateful_server
* srv
)
464 debugs(84, 3, HERE
<< "srv-" << srv
->index
<< " flags.reserved = " << srv
->flags
.reserved
);
465 if (!srv
->flags
.reserved
)
468 ++ srv
->stats
.releases
;
470 srv
->flags
.reserved
= 0;
471 if (srv
->parent
->OnEmptyQueue
!= NULL
&& srv
->data
)
472 srv
->parent
->OnEmptyQueue(srv
->data
);
474 helperStatefulServerDone(srv
);
477 /** return a pointer to the stateful routines data area */
479 helperStatefulServerGetData(helper_stateful_server
* srv
)
485 * Dump some stats about the helper states to a StoreEntry
488 helperStats(StoreEntry
* sentry
, helper
* hlp
, const char *label
)
490 if (!helperStartStats(sentry
, hlp
, label
))
493 storeAppendPrintf(sentry
, "program: %s\n",
495 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
496 hlp
->childs
.n_active
, hlp
->childs
.n_max
, (hlp
->childs
.n_running
- hlp
->childs
.n_active
) );
497 storeAppendPrintf(sentry
, "requests sent: %d\n",
498 hlp
->stats
.requests
);
499 storeAppendPrintf(sentry
, "replies received: %d\n",
501 storeAppendPrintf(sentry
, "queue length: %d\n",
502 hlp
->stats
.queue_size
);
503 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
504 hlp
->stats
.avg_svc_time
);
505 storeAppendPrintf(sentry
, "\n");
506 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
516 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
517 helper_server
*srv
= (helper_server
*)link
->data
;
518 double tt
= 0.001 * (srv
->requests
[0] ? tvSubMsec(srv
->requests
[0]->dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
519 storeAppendPrintf(sentry
, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
524 srv
->stats
.pending
? 'B' : ' ',
525 srv
->flags
.writing
? 'W' : ' ',
526 srv
->flags
.closing
? 'C' : ' ',
527 srv
->flags
.shutdown
? 'S' : ' ',
530 srv
->requests
[0] ? Format::QuoteMimeBlob(srv
->requests
[0]->buf
) : "(none)");
533 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
534 storeAppendPrintf(sentry
, " B = BUSY\n");
535 storeAppendPrintf(sentry
, " W = WRITING\n");
536 storeAppendPrintf(sentry
, " C = CLOSING\n");
537 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
541 helperStatefulStats(StoreEntry
* sentry
, statefulhelper
* hlp
, const char *label
)
543 if (!helperStartStats(sentry
, hlp
, label
))
546 storeAppendPrintf(sentry
, "program: %s\n",
548 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
549 hlp
->childs
.n_active
, hlp
->childs
.n_max
, (hlp
->childs
.n_running
- hlp
->childs
.n_active
) );
550 storeAppendPrintf(sentry
, "requests sent: %d\n",
551 hlp
->stats
.requests
);
552 storeAppendPrintf(sentry
, "replies received: %d\n",
554 storeAppendPrintf(sentry
, "queue length: %d\n",
555 hlp
->stats
.queue_size
);
556 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
557 hlp
->stats
.avg_svc_time
);
558 storeAppendPrintf(sentry
, "\n");
559 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
569 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
570 helper_stateful_server
*srv
= (helper_stateful_server
*)link
->data
;
571 double tt
= 0.001 * tvSubMsec(srv
->dispatch_time
, srv
->flags
.busy
? current_time
: srv
->answer_time
);
572 storeAppendPrintf(sentry
, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
577 srv
->flags
.busy
? 'B' : ' ',
578 srv
->flags
.closing
? 'C' : ' ',
579 srv
->flags
.reserved
? 'R' : ' ',
580 srv
->flags
.shutdown
? 'S' : ' ',
581 srv
->request
? (srv
->request
->placeholder
? 'P' : ' ') : ' ',
584 srv
->request
? Format::QuoteMimeBlob(srv
->request
->buf
) : "(none)");
587 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
588 storeAppendPrintf(sentry
, " B = BUSY\n");
589 storeAppendPrintf(sentry
, " C = CLOSING\n");
590 storeAppendPrintf(sentry
, " R = RESERVED\n");
591 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
592 storeAppendPrintf(sentry
, " P = PLACEHOLDER\n");
596 helperShutdown(helper
* hlp
)
598 dlink_node
*link
= hlp
->servers
.head
;
602 srv
= (helper_server
*)link
->data
;
605 if (srv
->flags
.shutdown
) {
606 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " has already SHUT DOWN.");
610 assert(hlp
->childs
.n_active
> 0);
611 -- hlp
->childs
.n_active
;
612 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
614 if (srv
->flags
.closing
) {
615 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is CLOSING.");
619 if (srv
->stats
.pending
) {
620 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is BUSY.");
624 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " shutting down.");
625 /* the rest of the details are dealt with in the helperServerFree
628 srv
->closePipesSafely();
633 helperStatefulShutdown(statefulhelper
* hlp
)
635 dlink_node
*link
= hlp
->servers
.head
;
636 helper_stateful_server
*srv
;
639 srv
= (helper_stateful_server
*)link
->data
;
642 if (srv
->flags
.shutdown
) {
643 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " has already SHUT DOWN.");
647 assert(hlp
->childs
.n_active
> 0);
648 -- hlp
->childs
.n_active
;
649 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
651 if (srv
->flags
.busy
) {
652 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is BUSY.");
656 if (srv
->flags
.closing
) {
657 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is CLOSING.");
661 if (srv
->flags
.reserved
) {
663 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is RESERVED. Closing anyway.");
665 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is RESERVED. Not Shutting Down Yet.");
670 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " shutting down.");
672 /* the rest of the details are dealt with in the helperStatefulServerFree
675 srv
->closePipesSafely();
681 /* note, don't free id_name, it probably points to static memory */
684 debugs(84, DBG_CRITICAL
, "WARNING: freeing " << id_name
<< " helper with " << stats
.queue_size
<< " requests queued");
687 /* ====================================================================== */
688 /* LOCAL FUNCTIONS */
689 /* ====================================================================== */
692 helperServerFree(helper_server
*srv
)
694 helper
*hlp
= srv
->parent
;
696 int i
, concurrency
= hlp
->childs
.concurrency
;
702 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
706 srv
->wqueue
->clean();
710 srv
->writebuf
->clean();
711 delete srv
->writebuf
;
712 srv
->writebuf
= NULL
;
715 if (Comm::IsConnOpen(srv
->writePipe
))
716 srv
->closeWritePipeSafely();
718 dlinkDelete(&srv
->link
, &hlp
->servers
);
720 assert(hlp
->childs
.n_running
> 0);
721 -- hlp
->childs
.n_running
;
723 if (!srv
->flags
.shutdown
) {
724 assert(hlp
->childs
.n_active
> 0);
725 -- hlp
->childs
.n_active
;
726 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " exited");
728 if (hlp
->childs
.needNew() > 0) {
729 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
731 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30)
732 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
734 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
735 helperOpenServers(hlp
);
739 for (i
= 0; i
< concurrency
; ++i
) {
740 if ((r
= srv
->requests
[i
])) {
743 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
744 r
->callback(cbdata
, NULL
);
746 helperRequestFree(r
);
748 srv
->requests
[i
] = NULL
;
751 safe_free(srv
->requests
);
753 cbdataReferenceDone(srv
->parent
);
758 helperStatefulServerFree(helper_stateful_server
*srv
)
760 statefulhelper
*hlp
= srv
->parent
;
761 helper_stateful_request
*r
;
764 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
769 srv
->wqueue
->clean();
775 /* TODO: walk the local queue of requests and carry them all out */
776 if (Comm::IsConnOpen(srv
->writePipe
))
777 srv
->closeWritePipeSafely();
779 dlinkDelete(&srv
->link
, &hlp
->servers
);
781 assert(hlp
->childs
.n_running
> 0);
782 -- hlp
->childs
.n_running
;
784 if (!srv
->flags
.shutdown
) {
785 assert( hlp
->childs
.n_active
> 0);
786 -- hlp
->childs
.n_active
;
787 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " exited");
789 if (hlp
->childs
.needNew() > 0) {
790 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
792 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30)
793 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
795 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
796 helperStatefulOpenServers(hlp
);
800 if ((r
= srv
->request
)) {
803 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
804 r
->callback(cbdata
, srv
, NULL
);
806 helperStatefulRequestFree(r
);
811 if (srv
->data
!= NULL
)
812 hlp
->datapool
->freeOne(srv
->data
);
814 cbdataReferenceDone(srv
->parent
);
819 /// Calls back with a pointer to the buffer with the helper output
820 static void helperReturnBuffer(int request_number
, helper_server
* srv
, helper
* hlp
, char * msg
, char * msg_end
)
822 helper_request
*r
= srv
->requests
[request_number
];
824 HLPCB
*callback
= r
->callback
;
826 srv
->requests
[request_number
] = NULL
;
831 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
832 callback(cbdata
, msg
);
834 -- srv
->stats
.pending
;
836 ++ hlp
->stats
.replies
;
838 srv
->answer_time
= current_time
;
840 srv
->dispatch_time
= r
->dispatch_time
;
842 hlp
->stats
.avg_svc_time
=
843 Math::intAverage(hlp
->stats
.avg_svc_time
,
844 tvSubMsec(r
->dispatch_time
, current_time
),
845 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
847 helperRequestFree(r
);
849 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected reply on channel " <<
850 request_number
<< " from " << hlp
->id_name
<< " #" << srv
->index
+ 1 <<
851 " '" << srv
->rbuf
<< "'");
853 srv
->roffset
-= (msg_end
- srv
->rbuf
);
854 memmove(srv
->rbuf
, msg_end
, srv
->roffset
+ 1);
856 if (!srv
->flags
.shutdown
) {
857 helperKickQueue(hlp
);
858 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
859 srv
->flags
.closing
=1;
860 srv
->writePipe
->close();
866 helperHandleRead(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
869 helper_server
*srv
= (helper_server
*)data
;
870 helper
*hlp
= srv
->parent
;
871 assert(cbdataReferenceValid(data
));
873 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
875 if (flag
== COMM_ERR_CLOSING
) {
879 assert(conn
->fd
== srv
->readPipe
->fd
);
881 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
+ 1);
883 if (flag
!= COMM_OK
|| len
== 0) {
884 srv
->closePipesSafely();
889 srv
->rbuf
[srv
->roffset
] = '\0';
890 debugs(84, 9, "helperHandleRead: '" << srv
->rbuf
<< "'");
892 if (!srv
->stats
.pending
) {
893 /* someone spoke without being spoken to */
894 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected read from " <<
895 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << (int)len
<<
896 " bytes '" << srv
->rbuf
<< "'");
902 while ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
903 /* end of reply found */
904 char *msg
= srv
->rbuf
;
906 debugs(84, 3, "helperHandleRead: end of reply found");
908 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n')
914 if (hlp
->childs
.concurrency
) {
915 i
= strtol(msg
, &msg
, 10);
917 while (*msg
&& xisspace(*msg
))
921 helperReturnBuffer(i
, srv
, hlp
, msg
, t
);
924 if (Comm::IsConnOpen(srv
->readPipe
)) {
925 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
926 assert(spaceSize
>= 0);
928 // grow the input buffer if needed and possible
929 if (!spaceSize
&& srv
->rbuf_sz
+ 4096 <= ReadBufMaxSize
) {
930 srv
->rbuf
= (char *)memReallocBuf(srv
->rbuf
, srv
->rbuf_sz
+ 4096, &srv
->rbuf_sz
);
931 debugs(84, 3, HERE
<< "Grew read buffer to " << srv
->rbuf_sz
);
932 spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
933 assert(spaceSize
>= 0);
936 // quit reading if there is no space left
938 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
939 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
940 "Squid input buffer: " << hlp
->id_name
<< " #" <<
942 srv
->closePipesSafely();
946 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
947 CommIoCbPtrFun(helperHandleRead
, srv
));
948 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
953 helperStatefulHandleRead(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
956 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
957 helper_stateful_request
*r
;
958 statefulhelper
*hlp
= srv
->parent
;
959 assert(cbdataReferenceValid(data
));
961 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
963 if (flag
== COMM_ERR_CLOSING
) {
967 assert(conn
->fd
== srv
->readPipe
->fd
);
969 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
970 hlp
->id_name
<< " #" << srv
->index
+ 1);
973 if (flag
!= COMM_OK
|| len
== 0) {
974 srv
->closePipesSafely();
979 srv
->rbuf
[srv
->roffset
] = '\0';
983 /* someone spoke without being spoken to */
984 debugs(84, DBG_IMPORTANT
, "helperStatefulHandleRead: unexpected read from " <<
985 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << (int)len
<<
986 " bytes '" << srv
->rbuf
<< "'");
991 if ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
992 /* end of reply found */
994 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
996 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n')
1001 if (r
&& cbdataReferenceValid(r
->data
)) {
1002 r
->callback(r
->data
, srv
, srv
->rbuf
);
1004 debugs(84, DBG_IMPORTANT
, "StatefulHandleRead: no callback data registered");
1008 srv
->flags
.busy
= 0;
1010 helperStatefulRequestFree(r
);
1011 srv
->request
= NULL
;
1012 ++ hlp
->stats
.replies
;
1013 srv
->answer_time
= current_time
;
1014 hlp
->stats
.avg_svc_time
=
1015 Math::intAverage(hlp
->stats
.avg_svc_time
,
1016 tvSubMsec(srv
->dispatch_time
, current_time
),
1017 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
1020 helperStatefulServerDone(srv
);
1022 helperStatefulReleaseServer(srv
);
1025 if (Comm::IsConnOpen(srv
->readPipe
)) {
1026 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1027 assert(spaceSize
>= 0);
1029 // grow the input buffer if needed and possible
1030 if (!spaceSize
&& srv
->rbuf_sz
+ 4096 <= ReadBufMaxSize
) {
1031 srv
->rbuf
= (char *)memReallocBuf(srv
->rbuf
, srv
->rbuf_sz
+ 4096, &srv
->rbuf_sz
);
1032 debugs(84, 3, HERE
<< "Grew read buffer to " << srv
->rbuf_sz
);
1033 spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1034 assert(spaceSize
>= 0);
1037 // quit reading if there is no space left
1039 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
1040 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
1041 "Squid input buffer: " << hlp
->id_name
<< " #" <<
1043 srv
->closePipesSafely();
1047 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
1048 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
1049 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
1054 Enqueue(helper
* hlp
, helper_request
* r
)
1056 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1057 dlinkAddTail(r
, link
, &hlp
->queue
);
1058 ++ hlp
->stats
.queue_size
;
1060 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1061 if (hlp
->childs
.needNew() > 0) {
1062 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1063 helperOpenServers(hlp
);
1067 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.n_running
)
1070 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1073 if (shutting_down
|| reconfiguring
)
1076 hlp
->last_queue_warn
= squid_curtime
;
1078 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1079 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1080 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1082 if (hlp
->stats
.queue_size
> (int)hlp
->childs
.n_running
* 2)
1083 fatalf("Too many queued %s requests", hlp
->id_name
);
1087 StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
)
1089 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1090 dlinkAddTail(r
, link
, &hlp
->queue
);
1091 ++ hlp
->stats
.queue_size
;
1093 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1094 if (hlp
->childs
.needNew() > 0) {
1095 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1096 helperStatefulOpenServers(hlp
);
1100 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.n_running
)
1103 if (hlp
->stats
.queue_size
> (int)hlp
->childs
.n_running
* 2)
1104 fatalf("Too many queued %s requests", hlp
->id_name
);
1106 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1109 if (shutting_down
|| reconfiguring
)
1112 hlp
->last_queue_warn
= squid_curtime
;
1114 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1115 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1116 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1119 static helper_request
*
1120 Dequeue(helper
* hlp
)
1123 helper_request
*r
= NULL
;
1125 if ((link
= hlp
->queue
.head
)) {
1126 r
= (helper_request
*)link
->data
;
1127 dlinkDelete(link
, &hlp
->queue
);
1128 memFree(link
, MEM_DLINK_NODE
);
1129 -- hlp
->stats
.queue_size
;
1135 static helper_stateful_request
*
1136 StatefulDequeue(statefulhelper
* hlp
)
1139 helper_stateful_request
*r
= NULL
;
1141 if ((link
= hlp
->queue
.head
)) {
1142 r
= (helper_stateful_request
*)link
->data
;
1143 dlinkDelete(link
, &hlp
->queue
);
1144 memFree(link
, MEM_DLINK_NODE
);
1145 -- hlp
->stats
.queue_size
;
1151 static helper_server
*
1152 GetFirstAvailable(helper
* hlp
)
1156 helper_server
*selected
= NULL
;
1157 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1159 if (hlp
->childs
.n_running
== 0)
1162 /* Find "least" loaded helper (approx) */
1163 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1164 srv
= (helper_server
*)n
->data
;
1166 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
1169 if (srv
->flags
.shutdown
)
1172 if (!srv
->stats
.pending
)
1183 /* Check for overload */
1185 debugs(84, 5, "GetFirstAvailable: None available.");
1189 if (selected
->stats
.pending
>= (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1)) {
1190 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1194 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected
->index
);
1198 static helper_stateful_server
*
1199 StatefulGetFirstAvailable(statefulhelper
* hlp
)
1202 helper_stateful_server
*srv
= NULL
;
1203 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1205 if (hlp
->childs
.n_running
== 0)
1208 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1209 srv
= (helper_stateful_server
*)n
->data
;
1211 if (srv
->flags
.busy
)
1214 if (srv
->flags
.reserved
)
1217 if (srv
->flags
.shutdown
)
1220 if ((hlp
->IsAvailable
!= NULL
) && (srv
->data
!= NULL
) && !(hlp
->IsAvailable(srv
->data
)))
1223 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
1227 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
/**
 * Comm write-completion callback for a helper_server.
 *
 * Releases the buffer that was just written, clears the writing flag, and,
 * if more data accumulated in srv->wqueue meanwhile, promotes that queue to
 * the active write buffer and schedules another write with this same
 * function as the completion callback.
 *
 * \param flag  completion status; anything other than COMM_OK is logged as a
 *              helper crash
 * \param data  the helper_server that issued the write
 */
1233 helperDispatchWriteDone(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
1235 helper_server
*srv
= (helper_server
*)data
;
// The finished write buffer is no longer needed: release it and mark idle.
1237 srv
->writebuf
->clean();
1238 delete srv
->writebuf
;
1239 srv
->writebuf
= NULL
;
1240 srv
->flags
.writing
= 0;
1242 if (flag
!= COMM_OK
) {
1243 /* Helper server has crashed */
1244 debugs(84, DBG_CRITICAL
, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
+ 1 << " has crashed");
// Requests queued while the previous write was in flight: swap the queue in
// as the active buffer, start a fresh queue, and write again.
1248 if (!srv
->wqueue
->isNull()) {
1249 srv
->writebuf
= srv
->wqueue
;
1250 srv
->wqueue
= new MemBuf
;
1251 srv
->flags
.writing
= 1;
1252 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1253 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1254 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
/**
 * Hand request r to helper server srv.
 *
 * Drops the request when its callback data is no longer valid. Otherwise it
 * looks for a free entry in srv->requests, appends the request text to
 * srv->wqueue and, unless a write is already in progress, starts writing the
 * queue to the helper's write pipe.
 *
 * NOTE(review): several statements (early return, storing r into the chosen
 * slot, the `else` between the two append forms) are not visible in this
 * listing; confirm against the full file.
 *
 * \param srv  the helper server chosen for this request
 * \param r    the request to send
 */
1259 helperDispatch(helper_server
* srv
, helper_request
* r
)
1261 helper
*hlp
= srv
->parent
;
1262 helper_request
**ptr
= NULL
;
// The requester may have gone away while the request was queued.
1265 if (!cbdataReferenceValid(r
->data
)) {
1266 debugs(84, DBG_IMPORTANT
, "helperDispatch: invalid callback data");
1267 helperRequestFree(r
);
// Scan the slots (childs.concurrency of them, or 1 when concurrency is 0)
// for one that holds no request.
1271 for (slot
= 0; slot
< (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1); ++slot
) {
1272 if (!srv
->requests
[slot
]) {
1273 ptr
= &srv
->requests
[slot
];
1280 srv
->stats
.pending
+= 1;
1281 r
->dispatch_time
= current_time
;
// Lazily initialise the write queue the first time it is used.
1283 if (srv
->wqueue
->isNull())
1284 srv
->wqueue
->init();
// With concurrency enabled the slot number is written in front of the
// request text.
1286 if (hlp
->childs
.concurrency
)
1287 srv
->wqueue
->Printf("%d %s", slot
, r
->buf
);
// Plain append of the request text; presumably the alternative to the Printf
// above (the connecting `else` is not visible here) -- TODO confirm.
1289 srv
->wqueue
->append(r
->buf
, strlen(r
->buf
));
// No write in flight: make the queue the active buffer, start a new queue
// and schedule the write. Otherwise the data stays queued for
// helperDispatchWriteDone to pick up.
1291 if (!srv
->flags
.writing
) {
1292 assert(NULL
== srv
->writebuf
);
1293 srv
->writebuf
= srv
->wqueue
;
1294 srv
->wqueue
= new MemBuf
;
1295 srv
->flags
.writing
= 1;
1296 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1297 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1298 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1301 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << strlen(r
->buf
) << " bytes");
1304 ++ hlp
->stats
.requests
;
/**
 * Comm write-completion callback used for writes to stateful helpers; it is
 * the function passed to CommIoCbPtrFun() by helperStatefulDispatch().
 *
 * NOTE(review): no body statements are visible in this listing -- confirm
 * whether the callback intentionally does nothing.
 */
1308 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
,
1309 int xerrno
, void *data
)
/**
 * Hand stateful request r to stateful helper server srv.
 *
 * Requests whose callback data is no longer valid are freed and the server
 * is released. Placeholder requests only invoke their callback and never
 * reach the helper. Real requests mark the server busy and reserved and are
 * written to the helper's write pipe.
 *
 * NOTE(review): return statements and some assignments are not visible in
 * this listing; confirm the exact flow against the full file.
 *
 * \param srv  the stateful helper server chosen for this request
 * \param r    the request to send
 */
1316 helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
)
1318 statefulhelper
*hlp
= srv
->parent
;
// The requester may have gone away while the request was queued.
1320 if (!cbdataReferenceValid(r
->data
)) {
1321 debugs(84, DBG_IMPORTANT
, "helperStatefulDispatch: invalid callback data");
1322 helperStatefulRequestFree(r
);
1323 helperStatefulReleaseServer(srv
);
1327 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
+ 1);
1329 if (r
->placeholder
== 1) {
1330 /* a callback is needed before this request can _use_ a helper. */
1331 /* we don't care about releasing this helper. The request NEVER
1332 * gets to the helper. So we throw away the return code */
1333 r
->callback(r
->data
, srv
, NULL
);
1334 /* throw away the placeholder */
1335 helperStatefulRequestFree(r
);
1336 /* and push the queue. Note that the callback may have submitted a new
1337 * request to the helper which is why we test for the request*/
1339 if (srv
->request
== NULL
)
1340 helperStatefulServerDone(srv
);
// Real request: claim the server before writing to it.
1345 srv
->flags
.busy
= 1;
1346 srv
->flags
.reserved
= 1;
1348 srv
->dispatch_time
= current_time
;
// Note: the completion callback receives the parent helper (hlp), not srv.
1349 AsyncCall::Pointer call
= commCbCall(5,5, "helperStatefulDispatchWriteDone",
1350 CommIoCbPtrFun(helperStatefulDispatchWriteDone
, hlp
));
// The request text is written straight from r->buf.
1351 Comm::Write(srv
->writePipe
, r
->buf
, strlen(r
->buf
), call
, NULL
);
1352 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1353 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " <<
1354 (int) strlen(r
->buf
) << " bytes");
1357 ++ hlp
->stats
.requests
;
/**
 * Drain the helper's request queue for as long as a server can be found.
 *
 * Because of the short-circuit `&&`, a request is only dequeued after
 * GetFirstAvailable() has returned a server; the loop ends when either call
 * yields a null pointer.
 *
 * \param hlp  the helper whose queued requests should be dispatched
 */
1362 helperKickQueue(helper
* hlp
)
1367 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
1368 helperDispatch(srv
, r
);
/**
 * Drain the stateful helper's request queue for as long as a server can be
 * found.
 *
 * Because of the short-circuit `&&`, a request is only dequeued after
 * StatefulGetFirstAvailable() has returned a server; the loop ends when
 * either call yields a null pointer.
 *
 * \param hlp  the stateful helper whose queued requests should be dispatched
 */
1372 helperStatefulKickQueue(statefulhelper
* hlp
)
1374 helper_stateful_request
*r
;
1375 helper_stateful_server
*srv
;
1377 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= StatefulDequeue(hlp
)))
1378 helperStatefulDispatch(srv
, r
);
/**
 * Follow-up step once a stateful helper server has finished with a request.
 *
 * A server that is not shutting down triggers another pass over its parent's
 * request queue. A server that is shutting down has its write pipe closed,
 * but only when it is not already closing and is neither reserved nor busy.
 *
 * \param srv  the stateful helper server that has finished
 */
1382 helperStatefulServerDone(helper_stateful_server
* srv
)
1384 if (!srv
->flags
.shutdown
) {
1385 helperStatefulKickQueue(srv
->parent
);
1386 } else if (!srv
->flags
.closing
&& !srv
->flags
.reserved
&& !srv
->flags
.busy
) {
1387 srv
->closeWritePipeSafely();
/**
 * Dispose of a helper_request: drops the cbdata reference held in r->data.
 *
 * NOTE(review): any further cleanup (such as releasing r->buf or r itself)
 * is not visible in this listing -- confirm against the full file.
 *
 * \param r  the request being freed
 */
1393 helperRequestFree(helper_request
* r
)
1395 cbdataReferenceDone(r
->data
);
/**
 * Dispose of a helper_stateful_request: drops the cbdata reference held in
 * r->data.
 *
 * NOTE(review): any further cleanup (such as releasing r->buf or r itself)
 * is not visible in this listing -- confirm against the full file.
 *
 * \param r  the stateful request being freed
 */
1401 helperStatefulRequestFree(helper_stateful_request
* r
)
1404 cbdataReferenceDone(r
->data
);
1410 // TODO: should helper_ and helper_stateful_ have a common parent?
1412 helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
)
1416 storeAppendPrintf(sentry
, "%s: unavailable\n", label
);
1421 storeAppendPrintf(sentry
, "%s:\n", label
);