4 * DEBUG: section 84 Helper process maintenance
5 * AUTHOR: Harvest Derived?
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
35 #include "squid-old.h"
36 #include "base/AsyncCbdataCalls.h"
38 #include "comm/Connection.h"
39 #include "comm/Write.h"
41 #include "format/Quoting.h"
43 #include "SquidMath.h"
44 #include "SquidTime.h"
48 #define HELPER_MAX_ARGS 64
50 /* size of helper read buffer (maximum?). no reason given for this size */
51 /* though it has been seen to be too short for some requests */
52 /* it is dynamic, so increasng should not have side effects */
55 static IOCB helperHandleRead
;
56 static IOCB helperStatefulHandleRead
;
57 static void helperServerFree(helper_server
*srv
);
58 static void helperStatefulServerFree(helper_stateful_server
*srv
);
59 static void Enqueue(helper
* hlp
, helper_request
*);
60 static helper_request
*Dequeue(helper
* hlp
);
61 static helper_stateful_request
*StatefulDequeue(statefulhelper
* hlp
);
62 static helper_server
*GetFirstAvailable(helper
* hlp
);
63 static helper_stateful_server
*StatefulGetFirstAvailable(statefulhelper
* hlp
);
64 static void helperDispatch(helper_server
* srv
, helper_request
* r
);
65 static void helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
);
66 static void helperKickQueue(helper
* hlp
);
67 static void helperStatefulKickQueue(statefulhelper
* hlp
);
68 static void helperStatefulServerDone(helper_stateful_server
* srv
);
69 static void helperRequestFree(helper_request
* r
);
70 static void helperStatefulRequestFree(helper_stateful_request
* r
);
71 static void StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
);
72 static bool helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
);
75 CBDATA_CLASS_INIT(helper
);
76 CBDATA_TYPE(helper_server
);
77 CBDATA_CLASS_INIT(statefulhelper
);
78 CBDATA_TYPE(helper_stateful_server
);
81 HelperServerBase::closePipesSafely()
86 shutdown(writePipe
->fd
, SD_BOTH
);
90 if (readPipe
->fd
== writePipe
->fd
)
98 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
100 debugs(84, DBG_IMPORTANT
, "WARNING: " << hlp
->id_name
<<
101 " #" << no
<< " (" << hlp
->cmdline
->key
<< "," <<
102 (long int)pid
<< ") didn't exit in 5 seconds");
110 HelperServerBase::closeWritePipeSafely()
115 shutdown(writePipe
->fd
, (readPipe
->fd
== writePipe
->fd
? SD_BOTH
: SD_SEND
));
119 if (readPipe
->fd
== writePipe
->fd
)
125 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
127 debugs(84, DBG_IMPORTANT
, "WARNING: " << hlp
->id_name
<<
128 " #" << no
<< " (" << hlp
->cmdline
->key
<< "," <<
129 (long int)pid
<< ") didn't exit in 5 seconds");
137 helperOpenServers(helper
* hlp
)
143 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
144 char fd_note_buf
[FD_DESC_SZ
];
154 if (hlp
->cmdline
== NULL
)
157 progname
= hlp
->cmdline
->key
;
159 if ((s
= strrchr(progname
, '/')))
160 shortname
= xstrdup(s
+ 1);
162 shortname
= xstrdup(progname
);
164 /* figure out how many new child are actually needed. */
165 int need_new
= hlp
->childs
.needNew();
167 debugs(84, 1, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
170 debugs(84, 1, "helperOpenServers: No '" << shortname
<< "' processes needed.");
173 procname
= (char *)xmalloc(strlen(shortname
) + 3);
175 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
177 args
[nargs
++] = procname
;
179 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
180 args
[nargs
++] = w
->key
;
182 args
[nargs
++] = NULL
;
184 assert(nargs
<= HELPER_MAX_ARGS
);
186 for (k
= 0; k
< need_new
; k
++) {
189 pid
= ipcCreate(hlp
->ipc_type
,
199 debugs(84, 1, "WARNING: Cannot run '" << progname
<< "' process.");
203 hlp
->childs
.n_running
++;
204 hlp
->childs
.n_active
++;
205 CBDATA_INIT_TYPE(helper_server
);
206 srv
= cbdataAlloc(helper_server
);
210 srv
->addr
= hlp
->addr
;
211 srv
->readPipe
= new Comm::Connection
;
212 srv
->readPipe
->fd
= rfd
;
213 srv
->writePipe
= new Comm::Connection
;
214 srv
->writePipe
->fd
= wfd
;
215 srv
->rbuf
= (char *)memAllocBuf(BUF_8KB
, &srv
->rbuf_sz
);
216 srv
->wqueue
= new MemBuf
;
218 srv
->requests
= (helper_request
**)xcalloc(hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1, sizeof(*srv
->requests
));
219 srv
->parent
= cbdataReference(hlp
);
220 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
223 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
224 fd_note(rfd
, fd_note_buf
);
226 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
227 fd_note(rfd
, fd_note_buf
);
228 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
229 fd_note(wfd
, fd_note_buf
);
232 commSetNonBlocking(rfd
);
235 commSetNonBlocking(wfd
);
237 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree
, srv
));
238 comm_add_close_handler(rfd
, closeCall
);
240 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
241 CommIoCbPtrFun(helperHandleRead
, srv
));
242 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
245 hlp
->last_restart
= squid_curtime
;
246 safe_free(shortname
);
248 helperKickQueue(hlp
);
254 * helperStatefulOpenServers: create the stateful child helper processes
257 helperStatefulOpenServers(statefulhelper
* hlp
)
260 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
261 char fd_note_buf
[FD_DESC_SZ
];
264 if (hlp
->cmdline
== NULL
)
267 if (hlp
->childs
.concurrency
)
268 debugs(84, 0, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp
->cmdline
<< "')");
270 char *progname
= hlp
->cmdline
->key
;
273 if ((s
= strrchr(progname
, '/')))
274 shortname
= xstrdup(s
+ 1);
276 shortname
= xstrdup(progname
);
278 /* figure out haw mant new helpers are needed. */
279 int need_new
= hlp
->childs
.needNew();
281 debugs(84, 1, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
284 debugs(84, 1, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
287 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
289 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
291 args
[nargs
++] = procname
;
293 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
294 args
[nargs
++] = w
->key
;
296 args
[nargs
++] = NULL
;
298 assert(nargs
<= HELPER_MAX_ARGS
);
300 for (int k
= 0; k
< need_new
; k
++) {
305 pid_t pid
= ipcCreate(hlp
->ipc_type
,
315 debugs(84, 1, "WARNING: Cannot run '" << progname
<< "' process.");
319 hlp
->childs
.n_running
++;
320 hlp
->childs
.n_active
++;
321 CBDATA_INIT_TYPE(helper_stateful_server
);
322 helper_stateful_server
*srv
= cbdataAlloc(helper_stateful_server
);
325 srv
->flags
.reserved
= 0;
326 srv
->stats
.submits
= 0;
327 srv
->stats
.releases
= 0;
329 srv
->addr
= hlp
->addr
;
330 srv
->readPipe
= new Comm::Connection
;
331 srv
->readPipe
->fd
= rfd
;
332 srv
->writePipe
= new Comm::Connection
;
333 srv
->writePipe
->fd
= wfd
;
334 srv
->rbuf
= (char *)memAllocBuf(BUF_8KB
, &srv
->rbuf_sz
);
336 srv
->parent
= cbdataReference(hlp
);
338 if (hlp
->datapool
!= NULL
)
339 srv
->data
= hlp
->datapool
->alloc();
341 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
344 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
345 fd_note(rfd
, fd_note_buf
);
347 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
348 fd_note(rfd
, fd_note_buf
);
349 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
350 fd_note(wfd
, fd_note_buf
);
353 commSetNonBlocking(rfd
);
356 commSetNonBlocking(wfd
);
358 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree
, srv
));
359 comm_add_close_handler(rfd
, closeCall
);
361 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
362 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
363 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
366 hlp
->last_restart
= squid_curtime
;
367 safe_free(shortname
);
369 helperStatefulKickQueue(hlp
);
374 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
377 debugs(84, 3, "helperSubmit: hlp == NULL");
378 callback(data
, NULL
);
382 helper_request
*r
= new helper_request
;
385 r
->callback
= callback
;
386 r
->data
= cbdataReference(data
);
387 r
->buf
= xstrdup(buf
);
389 if ((srv
= GetFirstAvailable(hlp
)))
390 helperDispatch(srv
, r
);
394 debugs(84, 9, "helperSubmit: " << buf
);
397 /// lastserver = "server last used as part of a reserved request sequence"
399 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPSCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
402 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
403 callback(data
, 0, NULL
);
407 helper_stateful_request
*r
= new helper_stateful_request
;
409 r
->callback
= callback
;
410 r
->data
= cbdataReference(data
);
413 r
->buf
= xstrdup(buf
);
420 if ((buf
!= NULL
) && lastserver
) {
421 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver
);
422 assert(lastserver
->flags
.reserved
);
423 assert(!(lastserver
->request
));
425 debugs(84, 5, "StatefulSubmit dispatching");
426 helperStatefulDispatch(lastserver
, r
);
428 helper_stateful_server
*srv
;
429 if ((srv
= StatefulGetFirstAvailable(hlp
))) {
430 helperStatefulDispatch(srv
, r
);
432 StatefulEnqueue(hlp
, r
);
435 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r
->placeholder
<< "', buf '" << buf
<< "'.");
441 * helperStatefulReleaseServer tells the helper that whoever was
442 * using it no longer needs its services.
445 helperStatefulReleaseServer(helper_stateful_server
* srv
)
447 debugs(84, 3, HERE
<< "srv-" << srv
->index
<< " flags.reserved = " << srv
->flags
.reserved
);
448 if (!srv
->flags
.reserved
)
451 srv
->stats
.releases
++;
453 srv
->flags
.reserved
= 0;
454 if (srv
->parent
->OnEmptyQueue
!= NULL
&& srv
->data
)
455 srv
->parent
->OnEmptyQueue(srv
->data
);
457 helperStatefulServerDone(srv
);
460 /** return a pointer to the stateful routines data area */
462 helperStatefulServerGetData(helper_stateful_server
* srv
)
468 * Dump some stats about the helper states to a StoreEntry
471 helperStats(StoreEntry
* sentry
, helper
* hlp
, const char *label
)
473 if (!helperStartStats(sentry
, hlp
, label
))
476 storeAppendPrintf(sentry
, "program: %s\n",
478 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
479 hlp
->childs
.n_active
, hlp
->childs
.n_max
, (hlp
->childs
.n_running
- hlp
->childs
.n_active
) );
480 storeAppendPrintf(sentry
, "requests sent: %d\n",
481 hlp
->stats
.requests
);
482 storeAppendPrintf(sentry
, "replies received: %d\n",
484 storeAppendPrintf(sentry
, "queue length: %d\n",
485 hlp
->stats
.queue_size
);
486 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
487 hlp
->stats
.avg_svc_time
);
488 storeAppendPrintf(sentry
, "\n");
489 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
499 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
500 helper_server
*srv
= (helper_server
*)link
->data
;
501 double tt
= 0.001 * (srv
->requests
[0] ? tvSubMsec(srv
->requests
[0]->dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
502 storeAppendPrintf(sentry
, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
507 srv
->stats
.pending
? 'B' : ' ',
508 srv
->flags
.writing
? 'W' : ' ',
509 srv
->flags
.closing
? 'C' : ' ',
510 srv
->flags
.shutdown
? 'S' : ' ',
513 srv
->requests
[0] ? Format::QuoteMimeBlob(srv
->requests
[0]->buf
) : "(none)");
516 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
517 storeAppendPrintf(sentry
, " B = BUSY\n");
518 storeAppendPrintf(sentry
, " W = WRITING\n");
519 storeAppendPrintf(sentry
, " C = CLOSING\n");
520 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
524 helperStatefulStats(StoreEntry
* sentry
, statefulhelper
* hlp
, const char *label
)
526 if (!helperStartStats(sentry
, hlp
, label
))
529 storeAppendPrintf(sentry
, "program: %s\n",
531 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
532 hlp
->childs
.n_active
, hlp
->childs
.n_max
, (hlp
->childs
.n_running
- hlp
->childs
.n_active
) );
533 storeAppendPrintf(sentry
, "requests sent: %d\n",
534 hlp
->stats
.requests
);
535 storeAppendPrintf(sentry
, "replies received: %d\n",
537 storeAppendPrintf(sentry
, "queue length: %d\n",
538 hlp
->stats
.queue_size
);
539 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
540 hlp
->stats
.avg_svc_time
);
541 storeAppendPrintf(sentry
, "\n");
542 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
552 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
553 helper_stateful_server
*srv
= (helper_stateful_server
*)link
->data
;
554 double tt
= 0.001 * tvSubMsec(srv
->dispatch_time
, srv
->flags
.busy
? current_time
: srv
->answer_time
);
555 storeAppendPrintf(sentry
, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
560 srv
->flags
.busy
? 'B' : ' ',
561 srv
->flags
.closing
? 'C' : ' ',
562 srv
->flags
.reserved
? 'R' : ' ',
563 srv
->flags
.shutdown
? 'S' : ' ',
564 srv
->request
? (srv
->request
->placeholder
? 'P' : ' ') : ' ',
567 srv
->request
? Format::QuoteMimeBlob(srv
->request
->buf
) : "(none)");
570 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
571 storeAppendPrintf(sentry
, " B = BUSY\n");
572 storeAppendPrintf(sentry
, " C = CLOSING\n");
573 storeAppendPrintf(sentry
, " R = RESERVED\n");
574 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
575 storeAppendPrintf(sentry
, " P = PLACEHOLDER\n");
579 helperShutdown(helper
* hlp
)
581 dlink_node
*link
= hlp
->servers
.head
;
585 srv
= (helper_server
*)link
->data
;
588 if (srv
->flags
.shutdown
) {
589 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " has already SHUT DOWN.");
593 assert(hlp
->childs
.n_active
> 0);
594 hlp
->childs
.n_active
--;
595 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
597 if (srv
->flags
.closing
) {
598 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is CLOSING.");
602 if (srv
->stats
.pending
) {
603 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is BUSY.");
607 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " shutting down.");
608 /* the rest of the details is dealt with in the helperServerFree
611 srv
->closePipesSafely();
616 helperStatefulShutdown(statefulhelper
* hlp
)
618 dlink_node
*link
= hlp
->servers
.head
;
619 helper_stateful_server
*srv
;
622 srv
= (helper_stateful_server
*)link
->data
;
625 if (srv
->flags
.shutdown
) {
626 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " has already SHUT DOWN.");
630 assert(hlp
->childs
.n_active
> 0);
631 hlp
->childs
.n_active
--;
632 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
634 if (srv
->flags
.busy
) {
635 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is BUSY.");
639 if (srv
->flags
.closing
) {
640 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is CLOSING.");
644 if (srv
->flags
.reserved
) {
646 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is RESERVED. Closing anyway.");
648 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is RESERVED. Not Shutting Down Yet.");
653 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " shutting down.");
655 /* the rest of the details is dealt with in the helperStatefulServerFree
658 srv
->closePipesSafely();
664 /* note, don't free id_name, it probably points to static memory */
667 debugs(84, 0, "WARNING: freeing " << id_name
<< " helper with " << stats
.queue_size
<< " requests queued");
670 /* ====================================================================== */
671 /* LOCAL FUNCTIONS */
672 /* ====================================================================== */
675 helperServerFree(helper_server
*srv
)
677 helper
*hlp
= srv
->parent
;
679 int i
, concurrency
= hlp
->childs
.concurrency
;
685 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
689 srv
->wqueue
->clean();
693 srv
->writebuf
->clean();
694 delete srv
->writebuf
;
695 srv
->writebuf
= NULL
;
698 if (Comm::IsConnOpen(srv
->writePipe
))
699 srv
->closeWritePipeSafely();
701 dlinkDelete(&srv
->link
, &hlp
->servers
);
703 assert(hlp
->childs
.n_running
> 0);
704 hlp
->childs
.n_running
--;
706 if (!srv
->flags
.shutdown
) {
707 assert(hlp
->childs
.n_active
> 0);
708 hlp
->childs
.n_active
--;
709 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " exited");
711 if (hlp
->childs
.needNew() > 0) {
712 debugs(80, 1, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
714 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30)
715 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
717 debugs(80, 1, "Starting new helpers");
718 helperOpenServers(hlp
);
722 for (i
= 0; i
< concurrency
; i
++) {
723 if ((r
= srv
->requests
[i
])) {
726 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
727 r
->callback(cbdata
, NULL
);
729 helperRequestFree(r
);
731 srv
->requests
[i
] = NULL
;
734 safe_free(srv
->requests
);
736 cbdataReferenceDone(srv
->parent
);
741 helperStatefulServerFree(helper_stateful_server
*srv
)
743 statefulhelper
*hlp
= srv
->parent
;
744 helper_stateful_request
*r
;
747 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
752 srv
->wqueue
->clean();
758 /* TODO: walk the local queue of requests and carry them all out */
759 if (Comm::IsConnOpen(srv
->writePipe
))
760 srv
->closeWritePipeSafely();
762 dlinkDelete(&srv
->link
, &hlp
->servers
);
764 assert(hlp
->childs
.n_running
> 0);
765 hlp
->childs
.n_running
--;
767 if (!srv
->flags
.shutdown
) {
768 assert( hlp
->childs
.n_active
> 0);
769 hlp
->childs
.n_active
--;
770 debugs(84, 0, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " exited");
772 if (hlp
->childs
.needNew() > 0) {
773 debugs(80, 1, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
775 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30)
776 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
778 debugs(80, 1, "Starting new helpers");
779 helperStatefulOpenServers(hlp
);
783 if ((r
= srv
->request
)) {
786 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
787 r
->callback(cbdata
, srv
, NULL
);
789 helperStatefulRequestFree(r
);
794 if (srv
->data
!= NULL
)
795 hlp
->datapool
->freeOne(srv
->data
);
797 cbdataReferenceDone(srv
->parent
);
802 /// Calls back with a pointer to the buffer with the helper output
803 static void helperReturnBuffer(int request_number
, helper_server
* srv
, helper
* hlp
, char * msg
, char * msg_end
)
805 helper_request
*r
= srv
->requests
[request_number
];
807 HLPCB
*callback
= r
->callback
;
809 srv
->requests
[request_number
] = NULL
;
814 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
815 callback(cbdata
, msg
);
817 srv
->stats
.pending
--;
819 hlp
->stats
.replies
++;
821 srv
->answer_time
= current_time
;
823 srv
->dispatch_time
= r
->dispatch_time
;
825 hlp
->stats
.avg_svc_time
=
826 Math::intAverage(hlp
->stats
.avg_svc_time
,
827 tvSubMsec(r
->dispatch_time
, current_time
),
828 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
830 helperRequestFree(r
);
832 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
833 request_number
<< " from " << hlp
->id_name
<< " #" << srv
->index
+ 1 <<
834 " '" << srv
->rbuf
<< "'");
836 srv
->roffset
-= (msg_end
- srv
->rbuf
);
837 memmove(srv
->rbuf
, msg_end
, srv
->roffset
+ 1);
839 if (!srv
->flags
.shutdown
) {
840 helperKickQueue(hlp
);
841 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
842 srv
->flags
.closing
=1;
843 srv
->writePipe
->close();
849 helperHandleRead(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
852 helper_server
*srv
= (helper_server
*)data
;
853 helper
*hlp
= srv
->parent
;
854 assert(cbdataReferenceValid(data
));
856 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
858 if (flag
== COMM_ERR_CLOSING
) {
862 assert(conn
->fd
== srv
->readPipe
->fd
);
864 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
+ 1);
866 if (flag
!= COMM_OK
|| len
== 0) {
867 srv
->closePipesSafely();
872 srv
->rbuf
[srv
->roffset
] = '\0';
873 debugs(84, 9, "helperHandleRead: '" << srv
->rbuf
<< "'");
875 if (!srv
->stats
.pending
) {
876 /* someone spoke without being spoken to */
877 debugs(84, 1, "helperHandleRead: unexpected read from " <<
878 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << (int)len
<<
879 " bytes '" << srv
->rbuf
<< "'");
885 while ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
886 /* end of reply found */
887 char *msg
= srv
->rbuf
;
889 debugs(84, 3, "helperHandleRead: end of reply found");
891 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n')
896 if (hlp
->childs
.concurrency
) {
897 i
= strtol(msg
, &msg
, 10);
899 while (*msg
&& xisspace(*msg
))
903 helperReturnBuffer(i
, srv
, hlp
, msg
, t
);
906 if (Comm::IsConnOpen(srv
->readPipe
)) {
907 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
908 CommIoCbPtrFun(helperHandleRead
, srv
));
909 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, srv
->rbuf_sz
- srv
->roffset
- 1, call
);
914 helperStatefulHandleRead(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
917 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
918 helper_stateful_request
*r
;
919 statefulhelper
*hlp
= srv
->parent
;
920 assert(cbdataReferenceValid(data
));
922 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
924 if (flag
== COMM_ERR_CLOSING
) {
928 assert(conn
->fd
== srv
->readPipe
->fd
);
930 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
931 hlp
->id_name
<< " #" << srv
->index
+ 1);
934 if (flag
!= COMM_OK
|| len
== 0) {
935 srv
->closePipesSafely();
940 srv
->rbuf
[srv
->roffset
] = '\0';
944 /* someone spoke without being spoken to */
945 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
946 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << (int)len
<<
947 " bytes '" << srv
->rbuf
<< "'");
952 if ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
953 /* end of reply found */
955 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
957 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n')
962 if (r
&& cbdataReferenceValid(r
->data
)) {
963 r
->callback(r
->data
, srv
, srv
->rbuf
);
965 debugs(84, 1, "StatefulHandleRead: no callback data registered");
971 helperStatefulRequestFree(r
);
973 hlp
->stats
.replies
++;
974 srv
->answer_time
= current_time
;
975 hlp
->stats
.avg_svc_time
=
976 Math::intAverage(hlp
->stats
.avg_svc_time
,
977 tvSubMsec(srv
->dispatch_time
, current_time
),
978 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
981 helperStatefulServerDone(srv
);
983 helperStatefulReleaseServer(srv
);
986 if (Comm::IsConnOpen(srv
->readPipe
)) {
987 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
988 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
989 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, srv
->rbuf_sz
- srv
->roffset
- 1, call
);
994 Enqueue(helper
* hlp
, helper_request
* r
)
996 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
997 dlinkAddTail(r
, link
, &hlp
->queue
);
998 hlp
->stats
.queue_size
++;
1000 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1001 if (hlp
->childs
.needNew() > 0) {
1002 debugs(84, 0, "Starting new " << hlp
->id_name
<< " helpers...");
1003 helperOpenServers(hlp
);
1007 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.n_running
)
1010 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1013 if (shutting_down
|| reconfiguring
)
1016 hlp
->last_queue_warn
= squid_curtime
;
1018 debugs(84, 0, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1019 debugs(84, 0, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1020 debugs(84, 0, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1022 if (hlp
->stats
.queue_size
> (int)hlp
->childs
.n_running
* 2)
1023 fatalf("Too many queued %s requests", hlp
->id_name
);
1027 StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
)
1029 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1030 dlinkAddTail(r
, link
, &hlp
->queue
);
1031 hlp
->stats
.queue_size
++;
1033 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1034 if (hlp
->childs
.needNew() > 0) {
1035 debugs(84, 0, "Starting new " << hlp
->id_name
<< " helpers...");
1036 helperStatefulOpenServers(hlp
);
1040 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.n_running
)
1043 if (hlp
->stats
.queue_size
> (int)hlp
->childs
.n_running
* 2)
1044 fatalf("Too many queued %s requests", hlp
->id_name
);
1046 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1049 if (shutting_down
|| reconfiguring
)
1052 hlp
->last_queue_warn
= squid_curtime
;
1054 debugs(84, 0, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1055 debugs(84, 0, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1056 debugs(84, 0, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1059 static helper_request
*
1060 Dequeue(helper
* hlp
)
1063 helper_request
*r
= NULL
;
1065 if ((link
= hlp
->queue
.head
)) {
1066 r
= (helper_request
*)link
->data
;
1067 dlinkDelete(link
, &hlp
->queue
);
1068 memFree(link
, MEM_DLINK_NODE
);
1069 hlp
->stats
.queue_size
--;
1075 static helper_stateful_request
*
1076 StatefulDequeue(statefulhelper
* hlp
)
1079 helper_stateful_request
*r
= NULL
;
1081 if ((link
= hlp
->queue
.head
)) {
1082 r
= (helper_stateful_request
*)link
->data
;
1083 dlinkDelete(link
, &hlp
->queue
);
1084 memFree(link
, MEM_DLINK_NODE
);
1085 hlp
->stats
.queue_size
--;
1091 static helper_server
*
1092 GetFirstAvailable(helper
* hlp
)
1096 helper_server
*selected
= NULL
;
1097 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1099 if (hlp
->childs
.n_running
== 0)
1102 /* Find "least" loaded helper (approx) */
1103 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1104 srv
= (helper_server
*)n
->data
;
1106 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
1109 if (srv
->flags
.shutdown
)
1112 if (!srv
->stats
.pending
)
1123 /* Check for overload */
1125 debugs(84, 5, "GetFirstAvailable: None available.");
1129 if (selected
->stats
.pending
>= (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1)) {
1130 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1134 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected
->index
);
1138 static helper_stateful_server
*
1139 StatefulGetFirstAvailable(statefulhelper
* hlp
)
1142 helper_stateful_server
*srv
= NULL
;
1143 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1145 if (hlp
->childs
.n_running
== 0)
1148 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1149 srv
= (helper_stateful_server
*)n
->data
;
1151 if (srv
->flags
.busy
)
1154 if (srv
->flags
.reserved
)
1157 if (srv
->flags
.shutdown
)
1160 if ((hlp
->IsAvailable
!= NULL
) && (srv
->data
!= NULL
) && !(hlp
->IsAvailable(srv
->data
)))
1163 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
1167 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1173 helperDispatchWriteDone(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
1175 helper_server
*srv
= (helper_server
*)data
;
1177 srv
->writebuf
->clean();
1178 delete srv
->writebuf
;
1179 srv
->writebuf
= NULL
;
1180 srv
->flags
.writing
= 0;
1182 if (flag
!= COMM_OK
) {
1183 /* Helper server has crashed */
1184 debugs(84, 0, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
+ 1 << " has crashed");
1188 if (!srv
->wqueue
->isNull()) {
1189 srv
->writebuf
= srv
->wqueue
;
1190 srv
->wqueue
= new MemBuf
;
1191 srv
->flags
.writing
= 1;
1192 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1193 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1194 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1199 helperDispatch(helper_server
* srv
, helper_request
* r
)
1201 helper
*hlp
= srv
->parent
;
1202 helper_request
**ptr
= NULL
;
1205 if (!cbdataReferenceValid(r
->data
)) {
1206 debugs(84, 1, "helperDispatch: invalid callback data");
1207 helperRequestFree(r
);
1211 for (slot
= 0; slot
< (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1); slot
++) {
1212 if (!srv
->requests
[slot
]) {
1213 ptr
= &srv
->requests
[slot
];
1220 srv
->stats
.pending
+= 1;
1221 r
->dispatch_time
= current_time
;
1223 if (srv
->wqueue
->isNull())
1224 srv
->wqueue
->init();
1226 if (hlp
->childs
.concurrency
)
1227 srv
->wqueue
->Printf("%d %s", slot
, r
->buf
);
1229 srv
->wqueue
->append(r
->buf
, strlen(r
->buf
));
1231 if (!srv
->flags
.writing
) {
1232 assert(NULL
== srv
->writebuf
);
1233 srv
->writebuf
= srv
->wqueue
;
1234 srv
->wqueue
= new MemBuf
;
1235 srv
->flags
.writing
= 1;
1236 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1237 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1238 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1241 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << strlen(r
->buf
) << " bytes");
1244 hlp
->stats
.requests
++;
1248 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
,
1249 int xerrno
, void *data
)
1256 helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
)
1258 statefulhelper
*hlp
= srv
->parent
;
1260 if (!cbdataReferenceValid(r
->data
)) {
1261 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
1262 helperStatefulRequestFree(r
);
1263 helperStatefulReleaseServer(srv
);
1267 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
+ 1);
1269 if (r
->placeholder
== 1) {
1270 /* a callback is needed before this request can _use_ a helper. */
1271 /* we don't care about releasing this helper. The request NEVER
1272 * gets to the helper. So we throw away the return code */
1273 r
->callback(r
->data
, srv
, NULL
);
1274 /* throw away the placeholder */
1275 helperStatefulRequestFree(r
);
1276 /* and push the queue. Note that the callback may have submitted a new
1277 * request to the helper which is why we test for the request*/
1279 if (srv
->request
== NULL
)
1280 helperStatefulServerDone(srv
);
1285 srv
->flags
.busy
= 1;
1286 srv
->flags
.reserved
= 1;
1288 srv
->dispatch_time
= current_time
;
1289 AsyncCall::Pointer call
= commCbCall(5,5, "helperStatefulDispatchWriteDone",
1290 CommIoCbPtrFun(helperStatefulDispatchWriteDone
, hlp
));
1291 Comm::Write(srv
->writePipe
, r
->buf
, strlen(r
->buf
), call
, NULL
);
1292 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1293 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " <<
1294 (int) strlen(r
->buf
) << " bytes");
1297 hlp
->stats
.requests
++;
1302 helperKickQueue(helper
* hlp
)
1307 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
1308 helperDispatch(srv
, r
);
1312 helperStatefulKickQueue(statefulhelper
* hlp
)
1314 helper_stateful_request
*r
;
1315 helper_stateful_server
*srv
;
1317 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= StatefulDequeue(hlp
)))
1318 helperStatefulDispatch(srv
, r
);
1322 helperStatefulServerDone(helper_stateful_server
* srv
)
1324 if (!srv
->flags
.shutdown
) {
1325 helperStatefulKickQueue(srv
->parent
);
1326 } else if (!srv
->flags
.closing
&& !srv
->flags
.reserved
&& !srv
->flags
.busy
) {
1327 srv
->closeWritePipeSafely();
1333 helperRequestFree(helper_request
* r
)
1335 cbdataReferenceDone(r
->data
);
1341 helperStatefulRequestFree(helper_stateful_request
* r
)
1344 cbdataReferenceDone(r
->data
);
1350 // TODO: should helper_ and helper_stateful_ have a common parent?
1352 helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
)
1356 storeAppendPrintf(sentry
, "%s: unavailable\n", label
);
1361 storeAppendPrintf(sentry
, "%s:\n", label
);