2 * DEBUG: section 84 Helper process maintenance
3 * AUTHOR: Harvest Derived?
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
34 #include "base/AsyncCbdataCalls.h"
36 #include "comm/Connection.h"
37 #include "comm/Write.h"
39 #include "format/Quoting.h"
44 #include "SquidMath.h"
45 #include "SquidTime.h"
49 #define HELPER_MAX_ARGS 64
51 /** Initial Squid input buffer size. Helper responses may exceed this, and
52 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
54 const size_t ReadBufMinSize(4*1024);
56 /** Maximum safe size of a helper-to-Squid response message plus one.
57 * Squid will warn and close the stream if a helper sends a too-big response.
58 * ssl_crtd helper is known to produce responses of at least 10KB in size.
59 * Some undocumented helpers are known to produce responses exceeding 8KB.
61 const size_t ReadBufMaxSize(32*1024);
63 static IOCB helperHandleRead
;
64 static IOCB helperStatefulHandleRead
;
65 static void helperServerFree(helper_server
*srv
);
66 static void helperStatefulServerFree(helper_stateful_server
*srv
);
67 static void Enqueue(helper
* hlp
, helper_request
*);
68 static helper_request
*Dequeue(helper
* hlp
);
69 static helper_stateful_request
*StatefulDequeue(statefulhelper
* hlp
);
70 static helper_server
*GetFirstAvailable(helper
* hlp
);
71 static helper_stateful_server
*StatefulGetFirstAvailable(statefulhelper
* hlp
);
72 static void helperDispatch(helper_server
* srv
, helper_request
* r
);
73 static void helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
);
74 static void helperKickQueue(helper
* hlp
);
75 static void helperStatefulKickQueue(statefulhelper
* hlp
);
76 static void helperStatefulServerDone(helper_stateful_server
* srv
);
77 static void helperRequestFree(helper_request
* r
);
78 static void helperStatefulRequestFree(helper_stateful_request
* r
);
79 static void StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
);
80 static bool helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
);
82 CBDATA_CLASS_INIT(helper
);
83 CBDATA_TYPE(helper_server
);
84 CBDATA_CLASS_INIT(statefulhelper
);
85 CBDATA_TYPE(helper_stateful_server
);
88 HelperServerBase::initStats()
97 HelperServerBase::closePipesSafely()
102 shutdown(writePipe
->fd
, SD_BOTH
);
106 if (readPipe
->fd
== writePipe
->fd
)
114 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
116 debugs(84, DBG_IMPORTANT
, "WARNING: " << hlp
->id_name
<<
117 " #" << no
<< " (" << hlp
->cmdline
->key
<< "," <<
118 (long int)pid
<< ") didn't exit in 5 seconds");
126 HelperServerBase::closeWritePipeSafely()
131 shutdown(writePipe
->fd
, (readPipe
->fd
== writePipe
->fd
? SD_BOTH
: SD_SEND
));
135 if (readPipe
->fd
== writePipe
->fd
)
141 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
143 debugs(84, DBG_IMPORTANT
, "WARNING: " << hlp
->id_name
<<
144 " #" << no
<< " (" << hlp
->cmdline
->key
<< "," <<
145 (long int)pid
<< ") didn't exit in 5 seconds");
153 helperOpenServers(helper
* hlp
)
159 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
160 char fd_note_buf
[FD_DESC_SZ
];
170 if (hlp
->cmdline
== NULL
)
173 progname
= hlp
->cmdline
->key
;
175 if ((s
= strrchr(progname
, '/')))
176 shortname
= xstrdup(s
+ 1);
178 shortname
= xstrdup(progname
);
180 /* figure out how many new children are actually needed. */
181 int need_new
= hlp
->childs
.needNew();
183 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
186 debugs(84, DBG_IMPORTANT
, "helperOpenServers: No '" << shortname
<< "' processes needed.");
189 procname
= (char *)xmalloc(strlen(shortname
) + 3);
191 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
193 args
[nargs
] = procname
;
196 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
197 args
[nargs
] = w
->key
;
204 assert(nargs
<= HELPER_MAX_ARGS
);
206 for (k
= 0; k
< need_new
; ++k
) {
209 pid
= ipcCreate(hlp
->ipc_type
,
219 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
223 ++ hlp
->childs
.n_running
;
224 ++ hlp
->childs
.n_active
;
225 CBDATA_INIT_TYPE(helper_server
);
226 srv
= cbdataAlloc(helper_server
);
231 srv
->addr
= hlp
->addr
;
232 srv
->readPipe
= new Comm::Connection
;
233 srv
->readPipe
->fd
= rfd
;
234 srv
->writePipe
= new Comm::Connection
;
235 srv
->writePipe
->fd
= wfd
;
236 srv
->rbuf
= (char *)memAllocBuf(ReadBufMinSize
, &srv
->rbuf_sz
);
237 srv
->wqueue
= new MemBuf
;
239 srv
->requests
= (helper_request
**)xcalloc(hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1, sizeof(*srv
->requests
));
240 srv
->parent
= cbdataReference(hlp
);
241 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
244 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
245 fd_note(rfd
, fd_note_buf
);
247 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
248 fd_note(rfd
, fd_note_buf
);
249 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
250 fd_note(wfd
, fd_note_buf
);
253 commSetNonBlocking(rfd
);
256 commSetNonBlocking(wfd
);
258 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree
, srv
));
259 comm_add_close_handler(rfd
, closeCall
);
261 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
262 CommIoCbPtrFun(helperHandleRead
, srv
));
263 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
266 hlp
->last_restart
= squid_curtime
;
267 safe_free(shortname
);
269 helperKickQueue(hlp
);
275 * helperStatefulOpenServers: create the stateful child helper processes
278 helperStatefulOpenServers(statefulhelper
* hlp
)
281 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
282 char fd_note_buf
[FD_DESC_SZ
];
285 if (hlp
->cmdline
== NULL
)
288 if (hlp
->childs
.concurrency
)
289 debugs(84, DBG_CRITICAL
, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp
->cmdline
<< "')");
291 char *progname
= hlp
->cmdline
->key
;
294 if ((s
= strrchr(progname
, '/')))
295 shortname
= xstrdup(s
+ 1);
297 shortname
= xstrdup(progname
);
299 /* figure out how many new helpers are needed. */
300 int need_new
= hlp
->childs
.needNew();
302 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
305 debugs(84, DBG_IMPORTANT
, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
308 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
310 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
312 args
[nargs
] = procname
;
315 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
316 args
[nargs
] = w
->key
;
323 assert(nargs
<= HELPER_MAX_ARGS
);
325 for (int k
= 0; k
< need_new
; ++k
) {
330 pid_t pid
= ipcCreate(hlp
->ipc_type
,
340 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
344 ++ hlp
->childs
.n_running
;
345 ++ hlp
->childs
.n_active
;
346 CBDATA_INIT_TYPE(helper_stateful_server
);
347 helper_stateful_server
*srv
= cbdataAlloc(helper_stateful_server
);
350 srv
->flags
.reserved
= 0;
353 srv
->addr
= hlp
->addr
;
354 srv
->readPipe
= new Comm::Connection
;
355 srv
->readPipe
->fd
= rfd
;
356 srv
->writePipe
= new Comm::Connection
;
357 srv
->writePipe
->fd
= wfd
;
358 srv
->rbuf
= (char *)memAllocBuf(ReadBufMinSize
, &srv
->rbuf_sz
);
360 srv
->parent
= cbdataReference(hlp
);
362 if (hlp
->datapool
!= NULL
)
363 srv
->data
= hlp
->datapool
->alloc();
365 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
368 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
369 fd_note(rfd
, fd_note_buf
);
371 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
372 fd_note(rfd
, fd_note_buf
);
373 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
374 fd_note(wfd
, fd_note_buf
);
377 commSetNonBlocking(rfd
);
380 commSetNonBlocking(wfd
);
382 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree
, srv
));
383 comm_add_close_handler(rfd
, closeCall
);
385 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
386 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
387 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
390 hlp
->last_restart
= squid_curtime
;
391 safe_free(shortname
);
393 helperStatefulKickQueue(hlp
);
397 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
400 debugs(84, 3, "helperSubmit: hlp == NULL");
401 HelperReply nilReply
;
402 callback(data
, nilReply
);
406 helper_request
*r
= new helper_request
;
409 r
->callback
= callback
;
410 r
->data
= cbdataReference(data
);
411 r
->buf
= xstrdup(buf
);
413 if ((srv
= GetFirstAvailable(hlp
)))
414 helperDispatch(srv
, r
);
418 debugs(84, 9, "helperSubmit: " << buf
);
421 /// lastserver = "server last used as part of a reserved request sequence"
423 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
426 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
427 HelperReply nilReply
;
428 callback(data
, nilReply
);
432 helper_stateful_request
*r
= new helper_stateful_request
;
434 r
->callback
= callback
;
435 r
->data
= cbdataReference(data
);
438 r
->buf
= xstrdup(buf
);
445 if ((buf
!= NULL
) && lastserver
) {
446 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver
);
447 assert(lastserver
->flags
.reserved
);
448 assert(!(lastserver
->request
));
450 debugs(84, 5, "StatefulSubmit dispatching");
451 helperStatefulDispatch(lastserver
, r
);
453 helper_stateful_server
*srv
;
454 if ((srv
= StatefulGetFirstAvailable(hlp
))) {
455 helperStatefulDispatch(srv
, r
);
457 StatefulEnqueue(hlp
, r
);
460 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r
->placeholder
<< "', buf '" << buf
<< "'.");
466 * helperStatefulReleaseServer tells the helper that whoever was
467 * using it no longer needs its services.
470 helperStatefulReleaseServer(helper_stateful_server
* srv
)
472 debugs(84, 3, HERE
<< "srv-" << srv
->index
<< " flags.reserved = " << srv
->flags
.reserved
);
473 if (!srv
->flags
.reserved
)
476 ++ srv
->stats
.releases
;
478 srv
->flags
.reserved
= 0;
479 if (srv
->parent
->OnEmptyQueue
!= NULL
&& srv
->data
)
480 srv
->parent
->OnEmptyQueue(srv
->data
);
482 helperStatefulServerDone(srv
);
485 /** return a pointer to the stateful routines data area */
487 helperStatefulServerGetData(helper_stateful_server
* srv
)
493 * Dump some stats about the helper states to a StoreEntry
496 helperStats(StoreEntry
* sentry
, helper
* hlp
, const char *label
)
498 if (!helperStartStats(sentry
, hlp
, label
))
501 storeAppendPrintf(sentry
, "program: %s\n",
503 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
504 hlp
->childs
.n_active
, hlp
->childs
.n_max
, (hlp
->childs
.n_running
- hlp
->childs
.n_active
) );
505 storeAppendPrintf(sentry
, "requests sent: %d\n",
506 hlp
->stats
.requests
);
507 storeAppendPrintf(sentry
, "replies received: %d\n",
509 storeAppendPrintf(sentry
, "queue length: %d\n",
510 hlp
->stats
.queue_size
);
511 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
512 hlp
->stats
.avg_svc_time
);
513 storeAppendPrintf(sentry
, "\n");
514 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
525 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
526 helper_server
*srv
= (helper_server
*)link
->data
;
527 double tt
= 0.001 * (srv
->requests
[0] ? tvSubMsec(srv
->requests
[0]->dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
528 storeAppendPrintf(sentry
, "%7d\t%7d\t%7d\t%11" PRIu64
"\t%11" PRIu64
"%c%c%c%c\t%7.3f\t%7d\t%s\n",
534 srv
->stats
.pending
? 'B' : ' ',
535 srv
->flags
.writing
? 'W' : ' ',
536 srv
->flags
.closing
? 'C' : ' ',
537 srv
->flags
.shutdown
? 'S' : ' ',
540 srv
->requests
[0] ? Format::QuoteMimeBlob(srv
->requests
[0]->buf
) : "(none)");
543 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
544 storeAppendPrintf(sentry
, " B = BUSY\n");
545 storeAppendPrintf(sentry
, " W = WRITING\n");
546 storeAppendPrintf(sentry
, " C = CLOSING\n");
547 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
551 helperStatefulStats(StoreEntry
* sentry
, statefulhelper
* hlp
, const char *label
)
553 if (!helperStartStats(sentry
, hlp
, label
))
556 storeAppendPrintf(sentry
, "program: %s\n",
558 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
559 hlp
->childs
.n_active
, hlp
->childs
.n_max
, (hlp
->childs
.n_running
- hlp
->childs
.n_active
) );
560 storeAppendPrintf(sentry
, "requests sent: %d\n",
561 hlp
->stats
.requests
);
562 storeAppendPrintf(sentry
, "replies received: %d\n",
564 storeAppendPrintf(sentry
, "queue length: %d\n",
565 hlp
->stats
.queue_size
);
566 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
567 hlp
->stats
.avg_svc_time
);
568 storeAppendPrintf(sentry
, "\n");
569 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
580 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
581 helper_stateful_server
*srv
= (helper_stateful_server
*)link
->data
;
582 double tt
= 0.001 * tvSubMsec(srv
->dispatch_time
, srv
->flags
.busy
? current_time
: srv
->answer_time
);
583 storeAppendPrintf(sentry
, "%7d\t%7d\t%7d\t%11" PRIu64
"\t%11" PRIu64
"\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
589 srv
->flags
.busy
? 'B' : ' ',
590 srv
->flags
.closing
? 'C' : ' ',
591 srv
->flags
.reserved
? 'R' : ' ',
592 srv
->flags
.shutdown
? 'S' : ' ',
593 srv
->request
? (srv
->request
->placeholder
? 'P' : ' ') : ' ',
596 srv
->request
? Format::QuoteMimeBlob(srv
->request
->buf
) : "(none)");
599 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
600 storeAppendPrintf(sentry
, " B = BUSY\n");
601 storeAppendPrintf(sentry
, " C = CLOSING\n");
602 storeAppendPrintf(sentry
, " R = RESERVED\n");
603 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
604 storeAppendPrintf(sentry
, " P = PLACEHOLDER\n");
608 helperShutdown(helper
* hlp
)
610 dlink_node
*link
= hlp
->servers
.head
;
614 srv
= (helper_server
*)link
->data
;
617 if (srv
->flags
.shutdown
) {
618 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " has already SHUT DOWN.");
622 assert(hlp
->childs
.n_active
> 0);
623 -- hlp
->childs
.n_active
;
624 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
626 if (srv
->flags
.closing
) {
627 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is CLOSING.");
631 if (srv
->stats
.pending
) {
632 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is BUSY.");
636 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " shutting down.");
637 /* the rest of the details is dealt with in the helperServerFree
640 srv
->closePipesSafely();
645 helperStatefulShutdown(statefulhelper
* hlp
)
647 dlink_node
*link
= hlp
->servers
.head
;
648 helper_stateful_server
*srv
;
651 srv
= (helper_stateful_server
*)link
->data
;
654 if (srv
->flags
.shutdown
) {
655 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " has already SHUT DOWN.");
659 assert(hlp
->childs
.n_active
> 0);
660 -- hlp
->childs
.n_active
;
661 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
663 if (srv
->flags
.busy
) {
664 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is BUSY.");
668 if (srv
->flags
.closing
) {
669 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is CLOSING.");
673 if (srv
->flags
.reserved
) {
675 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is RESERVED. Closing anyway.");
677 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is RESERVED. Not Shutting Down Yet.");
682 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " shutting down.");
684 /* the rest of the details is dealt with in the helperStatefulServerFree
687 srv
->closePipesSafely();
693 /* note, don't free id_name, it probably points to static memory */
696 debugs(84, DBG_CRITICAL
, "WARNING: freeing " << id_name
<< " helper with " << stats
.queue_size
<< " requests queued");
699 /* ====================================================================== */
700 /* LOCAL FUNCTIONS */
701 /* ====================================================================== */
704 helperServerFree(helper_server
*srv
)
706 helper
*hlp
= srv
->parent
;
708 int i
, concurrency
= hlp
->childs
.concurrency
;
714 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
718 srv
->wqueue
->clean();
722 srv
->writebuf
->clean();
723 delete srv
->writebuf
;
724 srv
->writebuf
= NULL
;
727 if (Comm::IsConnOpen(srv
->writePipe
))
728 srv
->closeWritePipeSafely();
730 dlinkDelete(&srv
->link
, &hlp
->servers
);
732 assert(hlp
->childs
.n_running
> 0);
733 -- hlp
->childs
.n_running
;
735 if (!srv
->flags
.shutdown
) {
736 assert(hlp
->childs
.n_active
> 0);
737 -- hlp
->childs
.n_active
;
738 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " exited");
740 if (hlp
->childs
.needNew() > 0) {
741 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
743 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30) {
744 if (srv
->stats
.replies
< 1)
745 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
747 debugs(80, DBG_CRITICAL
, "ERROR: The " << hlp
->id_name
<< " helpers are crashing too rapidly, need help!");
750 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
751 helperOpenServers(hlp
);
755 for (i
= 0; i
< concurrency
; ++i
) {
756 // XXX: re-schedule these on another helper?
757 if ((r
= srv
->requests
[i
])) {
760 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
761 HelperReply nilReply
;
762 r
->callback(cbdata
, nilReply
);
765 helperRequestFree(r
);
767 srv
->requests
[i
] = NULL
;
770 safe_free(srv
->requests
);
772 cbdataReferenceDone(srv
->parent
);
777 helperStatefulServerFree(helper_stateful_server
*srv
)
779 statefulhelper
*hlp
= srv
->parent
;
780 helper_stateful_request
*r
;
783 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
788 srv
->wqueue
->clean();
794 /* TODO: walk the local queue of requests and carry them all out */
795 if (Comm::IsConnOpen(srv
->writePipe
))
796 srv
->closeWritePipeSafely();
798 dlinkDelete(&srv
->link
, &hlp
->servers
);
800 assert(hlp
->childs
.n_running
> 0);
801 -- hlp
->childs
.n_running
;
803 if (!srv
->flags
.shutdown
) {
804 assert( hlp
->childs
.n_active
> 0);
805 -- hlp
->childs
.n_active
;
806 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " exited");
808 if (hlp
->childs
.needNew() > 0) {
809 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
811 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30) {
812 if (srv
->stats
.replies
< 1)
813 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
815 debugs(80, DBG_CRITICAL
, "ERROR: The " << hlp
->id_name
<< " helpers are crashing too rapidly, need help!");
818 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
819 helperStatefulOpenServers(hlp
);
823 if ((r
= srv
->request
)) {
826 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
827 HelperReply nilReply
;
828 nilReply
.whichServer
= srv
;
829 r
->callback(cbdata
, nilReply
);
832 helperStatefulRequestFree(r
);
837 if (srv
->data
!= NULL
)
838 hlp
->datapool
->freeOne(srv
->data
);
840 cbdataReferenceDone(srv
->parent
);
845 /// Calls back with a pointer to the buffer with the helper output
847 helperReturnBuffer(int request_number
, helper_server
* srv
, helper
* hlp
, char * msg
, char * msg_end
)
849 helper_request
*r
= srv
->requests
[request_number
];
851 // TODO: parse the reply into new helper reply object
852 // pass that to the callback instead of msg
854 HLPCB
*callback
= r
->callback
;
856 srv
->requests
[request_number
] = NULL
;
861 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
862 HelperReply
response(msg
, (msg_end
-msg
));
863 callback(cbdata
, response
);
866 -- srv
->stats
.pending
;
867 ++ srv
->stats
.replies
;
869 ++ hlp
->stats
.replies
;
871 srv
->answer_time
= current_time
;
873 srv
->dispatch_time
= r
->dispatch_time
;
875 hlp
->stats
.avg_svc_time
=
876 Math::intAverage(hlp
->stats
.avg_svc_time
,
877 tvSubMsec(r
->dispatch_time
, current_time
),
878 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
880 helperRequestFree(r
);
882 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected reply on channel " <<
883 request_number
<< " from " << hlp
->id_name
<< " #" << srv
->index
+ 1 <<
884 " '" << srv
->rbuf
<< "'");
886 srv
->roffset
-= (msg_end
- srv
->rbuf
);
887 memmove(srv
->rbuf
, msg_end
, srv
->roffset
+ 1);
889 if (!srv
->flags
.shutdown
) {
890 helperKickQueue(hlp
);
891 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
892 srv
->flags
.closing
=1;
893 srv
->writePipe
->close();
899 helperHandleRead(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
902 helper_server
*srv
= (helper_server
*)data
;
903 helper
*hlp
= srv
->parent
;
904 assert(cbdataReferenceValid(data
));
906 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
908 if (flag
== COMM_ERR_CLOSING
) {
912 assert(conn
->fd
== srv
->readPipe
->fd
);
914 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
+ 1);
916 if (flag
!= COMM_OK
|| len
== 0) {
917 srv
->closePipesSafely();
922 srv
->rbuf
[srv
->roffset
] = '\0';
923 debugs(84, 9, "helperHandleRead: '" << srv
->rbuf
<< "'");
925 if (!srv
->stats
.pending
) {
926 /* someone spoke without being spoken to */
927 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected read from " <<
928 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << (int)len
<<
929 " bytes '" << srv
->rbuf
<< "'");
935 while ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
936 /* end of reply found */
937 char *msg
= srv
->rbuf
;
939 debugs(84, 3, "helperHandleRead: end of reply found");
941 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n')
946 if (hlp
->childs
.concurrency
) {
947 i
= strtol(msg
, &msg
, 10);
949 while (*msg
&& xisspace(*msg
))
953 helperReturnBuffer(i
, srv
, hlp
, msg
, t
);
954 // only skip off the \0 _after_ passing its location to helperReturnBuffer
958 if (Comm::IsConnOpen(srv
->readPipe
)) {
959 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
960 assert(spaceSize
>= 0);
962 // grow the input buffer if needed and possible
963 if (!spaceSize
&& srv
->rbuf_sz
+ 4096 <= ReadBufMaxSize
) {
964 srv
->rbuf
= (char *)memReallocBuf(srv
->rbuf
, srv
->rbuf_sz
+ 4096, &srv
->rbuf_sz
);
965 debugs(84, 3, HERE
<< "Grew read buffer to " << srv
->rbuf_sz
);
966 spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
967 assert(spaceSize
>= 0);
970 // quit reading if there is no space left
972 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
973 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
974 "Squid input buffer: " << hlp
->id_name
<< " #" <<
976 srv
->closePipesSafely();
980 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
981 CommIoCbPtrFun(helperHandleRead
, srv
));
982 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
987 helperStatefulHandleRead(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
990 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
991 helper_stateful_request
*r
;
992 statefulhelper
*hlp
= srv
->parent
;
993 assert(cbdataReferenceValid(data
));
995 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
997 if (flag
== COMM_ERR_CLOSING
) {
1001 assert(conn
->fd
== srv
->readPipe
->fd
);
1003 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
1004 hlp
->id_name
<< " #" << srv
->index
+ 1);
1006 if (flag
!= COMM_OK
|| len
== 0) {
1007 srv
->closePipesSafely();
1011 srv
->roffset
+= len
;
1012 srv
->rbuf
[srv
->roffset
] = '\0';
1016 /* someone spoke without being spoken to */
1017 debugs(84, DBG_IMPORTANT
, "helperStatefulHandleRead: unexpected read from " <<
1018 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << (int)len
<<
1019 " bytes '" << srv
->rbuf
<< "'");
1024 if ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
1025 /* end of reply found */
1028 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1030 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n') {
1032 // rewind to the \r octet which is the real terminal now
1033 // and remember that we have to skip forward 2 places now.
1040 if (r
&& cbdataReferenceValid(r
->data
)) {
1041 HelperReply
res(srv
->rbuf
, (t
- srv
->rbuf
));
1042 res
.whichServer
= srv
;
1043 r
->callback(r
->data
, res
);
1045 debugs(84, DBG_IMPORTANT
, "StatefulHandleRead: no callback data registered");
1048 // only skip off the \0's _after_ passing its location in HelperReply above
1051 srv
->flags
.busy
= 0;
1053 helperStatefulRequestFree(r
);
1054 srv
->request
= NULL
;
1056 -- srv
->stats
.pending
;
1057 ++ srv
->stats
.replies
;
1059 ++ hlp
->stats
.replies
;
1060 srv
->answer_time
= current_time
;
1061 hlp
->stats
.avg_svc_time
=
1062 Math::intAverage(hlp
->stats
.avg_svc_time
,
1063 tvSubMsec(srv
->dispatch_time
, current_time
),
1064 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
1067 helperStatefulServerDone(srv
);
1069 helperStatefulReleaseServer(srv
);
1072 if (Comm::IsConnOpen(srv
->readPipe
)) {
1073 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1074 assert(spaceSize
>= 0);
1076 // grow the input buffer if needed and possible
1077 if (!spaceSize
&& srv
->rbuf_sz
+ 4096 <= ReadBufMaxSize
) {
1078 srv
->rbuf
= (char *)memReallocBuf(srv
->rbuf
, srv
->rbuf_sz
+ 4096, &srv
->rbuf_sz
);
1079 debugs(84, 3, HERE
<< "Grew read buffer to " << srv
->rbuf_sz
);
1080 spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1081 assert(spaceSize
>= 0);
1084 // quit reading if there is no space left
1086 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
1087 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
1088 "Squid input buffer: " << hlp
->id_name
<< " #" <<
1090 srv
->closePipesSafely();
1094 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
1095 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
1096 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
1101 Enqueue(helper
* hlp
, helper_request
* r
)
1103 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1104 dlinkAddTail(r
, link
, &hlp
->queue
);
1105 ++ hlp
->stats
.queue_size
;
1107 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1108 if (hlp
->childs
.needNew() > 0) {
1109 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1110 helperOpenServers(hlp
);
1114 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.n_running
)
1117 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1120 if (shutting_down
|| reconfiguring
)
1123 hlp
->last_queue_warn
= squid_curtime
;
1125 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1126 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1127 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1129 if (hlp
->stats
.queue_size
> (int)hlp
->childs
.n_running
* 2)
1130 fatalf("Too many queued %s requests", hlp
->id_name
);
1134 StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
)
1136 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1137 dlinkAddTail(r
, link
, &hlp
->queue
);
1138 ++ hlp
->stats
.queue_size
;
1140 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1141 if (hlp
->childs
.needNew() > 0) {
1142 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1143 helperStatefulOpenServers(hlp
);
1147 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.n_running
)
1150 if (hlp
->stats
.queue_size
> (int)hlp
->childs
.n_running
* 2)
1151 fatalf("Too many queued %s requests", hlp
->id_name
);
1153 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1156 if (shutting_down
|| reconfiguring
)
1159 hlp
->last_queue_warn
= squid_curtime
;
1161 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1162 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1163 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1166 static helper_request
*
1167 Dequeue(helper
* hlp
)
1170 helper_request
*r
= NULL
;
1172 if ((link
= hlp
->queue
.head
)) {
1173 r
= (helper_request
*)link
->data
;
1174 dlinkDelete(link
, &hlp
->queue
);
1175 memFree(link
, MEM_DLINK_NODE
);
1176 -- hlp
->stats
.queue_size
;
1182 static helper_stateful_request
*
1183 StatefulDequeue(statefulhelper
* hlp
)
1186 helper_stateful_request
*r
= NULL
;
1188 if ((link
= hlp
->queue
.head
)) {
1189 r
= (helper_stateful_request
*)link
->data
;
1190 dlinkDelete(link
, &hlp
->queue
);
1191 memFree(link
, MEM_DLINK_NODE
);
1192 -- hlp
->stats
.queue_size
;
/**
 * Picks a server of a stateless helper that can take another request.
 *
 * Walks hlp->servers and compares the stats.pending counters of the
 * candidates; the chosen candidate is tracked in `selected`. The choice is
 * then checked against the per-server limit, which is
 * hlp->childs.concurrency, or 1 when concurrency is zero.
 *
 * \param hlp  helper whose servers are examined
 *
 * NOTE(review): the declarations of `n` and `srv`, every statement guarded
 * by the single-line `if` tests below, the assignments to `selected`, and
 * all return statements are missing from this listing. Judging by the debug
 * messages the function yields either the selected server or nothing;
 * confirm the exact control flow against the pristine source.
 */
1198 static helper_server
*
1199 GetFirstAvailable(helper
* hlp
)
1203 helper_server
*selected
= NULL
;
1204 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
// No running children at all (guarded statement not visible here).
1206 if (hlp
->childs
.n_running
== 0)
1209 /* Find "least" loaded helper (approx) */
1210 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1211 srv
= (helper_server
*)n
->data
;
// True when the current candidate already has no more pending requests
// than this server (guarded statement not visible here).
1213 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
// Server is flagged for shutdown (guarded statement not visible here).
1216 if (srv
->flags
.shutdown
)
// Server has nothing pending at all (guarded statement not visible here).
1219 if (!srv
->stats
.pending
)
1230 /* Check for overload */
1232 debugs(84, 5, "GetFirstAvailable: None available.");
// The limit is the configured concurrency, or a single request when
// concurrency is zero.
1236 if (selected
->stats
.pending
>= (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1)) {
1237 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1241 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected
->index
);
/**
 * Looks for a server of a stateful helper that can take a request.
 *
 * Walks hlp->servers and tests each server for the busy, reserved and
 * shutdown flags and, when the helper provides an IsAvailable callback and
 * the server carries data, for the result of hlp->IsAvailable(srv->data).
 *
 * \param hlp  stateful helper whose servers are examined
 *
 * NOTE(review): the declaration of `n`, the statements guarded by the
 * single-line `if` tests below, and all return statements are missing from
 * this listing. Judging by the debug messages the function yields the first
 * server that passes every test, or nothing; confirm against the pristine
 * source.
 */
1245 static helper_stateful_server
*
1246 StatefulGetFirstAvailable(statefulhelper
* hlp
)
1249 helper_stateful_server
*srv
= NULL
;
1250 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
// No running children at all (guarded statement not visible here).
1252 if (hlp
->childs
.n_running
== 0)
1255 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1256 srv
= (helper_stateful_server
*)n
->data
;
// Per-server tests; the statement each one guards is not visible here.
1258 if (srv
->flags
.busy
)
1261 if (srv
->flags
.reserved
)
1264 if (srv
->flags
.shutdown
)
// The optional IsAvailable hook is only consulted when both the hook and
// the per-server data pointer are set.
1267 if ((hlp
->IsAvailable
!= NULL
) && (srv
->data
!= NULL
) && !(hlp
->IsAvailable(srv
->data
)))
1270 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
1274 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
/**
 * Comm write-completion handler for requests sent to a stateless helper
 * server.
 *
 * Releases the buffer that was just written (srv->writebuf), clears the
 * writing flag, and logs a critical message when the write did not finish
 * with COMM_OK. When more data has accumulated in srv->wqueue, that buffer
 * becomes the new writebuf, a fresh MemBuf replaces wqueue and another
 * Comm::Write is started with this same function as its callback.
 *
 * \param conn    connection the write was issued on (not used in the visible code)
 * \param buf     written buffer (not used in the visible code)
 * \param len     number of bytes (not used in the visible code)
 * \param flag    result of the write; anything other than COMM_OK is reported as a crash
 * \param xerrno  error number (not used in the visible code)
 * \param data    the helper_server the write belongs to
 *
 * NOTE(review): the return type, the braces and whatever follows the crash
 * message (presumably an early return) are missing from this listing.
 */
1279 helperDispatchWriteDone(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
1281 helper_server
*srv
= (helper_server
*)data
;
// The finished buffer is emptied, destroyed and the pointer reset before
// anything else happens.
1283 srv
->writebuf
->clean();
1284 delete srv
->writebuf
;
1285 srv
->writebuf
= NULL
;
1286 srv
->flags
.writing
= 0;
1288 if (flag
!= COMM_OK
) {
1289 /* Helper server has crashed */
1290 debugs(84, DBG_CRITICAL
, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
+ 1 << " has crashed");
// Requests queued while the previous write was in flight: promote the queue
// to the active write buffer and start the next write.
1294 if (!srv
->wqueue
->isNull()) {
1295 srv
->writebuf
= srv
->wqueue
;
1296 srv
->wqueue
= new MemBuf
;
1297 srv
->flags
.writing
= 1;
1298 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1299 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1300 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
/**
 * Hands request r to stateless helper server srv.
 *
 * Steps visible in this listing:
 *  - if r->data is no longer a valid cbdata reference, log and free r;
 *  - scan srv->requests[] for the first empty slot, looking at
 *    hlp->childs.concurrency entries (or 1 when concurrency is zero);
 *  - stamp r->dispatch_time with current_time;
 *  - add the request text to srv->wqueue, written as "<slot> <buf>" when
 *    concurrency is enabled;
 *  - if no write is in progress, promote wqueue to writebuf and start a
 *    Comm::Write whose completion handler is helperDispatchWriteDone;
 *  - bump srv->stats.pending and hlp->stats.requests.
 *
 * \param srv  server that receives the request
 * \param r    request to send
 *
 * NOTE(review): the return type, the declaration of `slot`, the braces, the
 * statement following helperRequestFree() (presumably a return), the code
 * that leaves the slot loop and stores r through `ptr`, and the keyword
 * joining the Printf()/append() alternatives (presumably `else`) are missing
 * from this listing; confirm against the pristine source.
 */
1305 helperDispatch(helper_server
* srv
, helper_request
* r
)
1307 helper
*hlp
= srv
->parent
;
1308 helper_request
**ptr
= NULL
;
// The requester may have gone away while the request sat in the queue.
1311 if (!cbdataReferenceValid(r
->data
)) {
1312 debugs(84, DBG_IMPORTANT
, "helperDispatch: invalid callback data");
1313 helperRequestFree(r
);
// Find the first unused entry of srv->requests[].
1317 for (slot
= 0; slot
< (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1); ++slot
) {
1318 if (!srv
->requests
[slot
]) {
1319 ptr
= &srv
->requests
[slot
];
1326 r
->dispatch_time
= current_time
;
// A null MemBuf has to be initialised before it can take data.
1328 if (srv
->wqueue
->isNull())
1329 srv
->wqueue
->init();
// With concurrency enabled the slot number is written in front of the
// request text.
1331 if (hlp
->childs
.concurrency
)
1332 srv
->wqueue
->Printf("%d %s", slot
, r
->buf
);
1334 srv
->wqueue
->append(r
->buf
, strlen(r
->buf
));
// Only start a write when none is outstanding; otherwise the data stays in
// wqueue until helperDispatchWriteDone picks it up.
1336 if (!srv
->flags
.writing
) {
1337 assert(NULL
== srv
->writebuf
);
1338 srv
->writebuf
= srv
->wqueue
;
1339 srv
->wqueue
= new MemBuf
;
1340 srv
->flags
.writing
= 1;
1341 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1342 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1343 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1346 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << strlen(r
->buf
) << " bytes");
1349 ++ srv
->stats
.pending
;
1350 ++ hlp
->stats
.requests
;
/**
 * Comm write-completion handler registered by helperStatefulDispatch() for
 * requests written to a stateful helper server.
 *
 * The parameter list is the same as that of helperDispatchWriteDone().
 *
 * NOTE(review): only the parameter list is visible in this listing; the
 * return type and the body are missing, so nothing can be said here about
 * what the handler does. Confirm against the pristine source.
 */
1354 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, comm_err_t flag
,
1355 int xerrno
, void *data
)
/**
 * Hands request r to stateful helper server srv.
 *
 * Steps visible in this listing:
 *  - if r->data is no longer a valid cbdata reference, log, free r and
 *    release the server via helperStatefulReleaseServer();
 *  - if r is a placeholder (r->placeholder == 1), invoke r->callback with an
 *    empty HelperReply whose whichServer is srv, free r, and call
 *    helperStatefulServerDone() when the server has no request attached;
 *  - otherwise mark the server busy and reserved, stamp srv->dispatch_time,
 *    write r->buf to srv->writePipe and bump srv->stats.pending and
 *    hlp->stats.requests.
 *
 * \param srv  server that receives the request
 * \param r    request to send
 *
 * NOTE(review): the return type, the braces and the statements that end the
 * invalid-callback and placeholder branches (presumably returns) are missing
 * from this listing; other statements between the visible lines may be
 * missing as well. Confirm against the pristine source.
 */
1361 helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
)
1363 statefulhelper
*hlp
= srv
->parent
;
// The requester may have gone away while the request sat in the queue.
1365 if (!cbdataReferenceValid(r
->data
)) {
1366 debugs(84, DBG_IMPORTANT
, "helperStatefulDispatch: invalid callback data");
1367 helperStatefulRequestFree(r
);
1368 helperStatefulReleaseServer(srv
);
1372 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
+ 1);
1374 if (r
->placeholder
== 1) {
1375 /* a callback is needed before this request can _use_ a helper. */
1376 /* we don't care about releasing this helper. The request NEVER
1377 * gets to the helper. So we throw away the return code */
1378 HelperReply nilReply
;
1379 nilReply
.whichServer
= srv
;
1380 r
->callback(r
->data
, nilReply
);
1381 /* throw away the placeholder */
1382 helperStatefulRequestFree(r
);
1383 /* and push the queue. Note that the callback may have submitted a new
1384 * request to the helper which is why we test for the request */
1386 if (srv
->request
== NULL
)
1387 helperStatefulServerDone(srv
);
1392 srv
->flags
.busy
= 1;
1393 srv
->flags
.reserved
= 1;
1395 srv
->dispatch_time
= current_time
;
// Note that the callback data passed here is the parent helper (hlp), not
// the server, unlike the stateless helperDispatch().
1396 AsyncCall::Pointer call
= commCbCall(5,5, "helperStatefulDispatchWriteDone",
1397 CommIoCbPtrFun(helperStatefulDispatchWriteDone
, hlp
));
// The request text is written straight from r->buf; no intermediate write
// queue is used in the visible code.
1398 Comm::Write(srv
->writePipe
, r
->buf
, strlen(r
->buf
), call
, NULL
);
1399 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1400 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " <<
1401 (int) strlen(r
->buf
) << " bytes");
1404 ++ srv
->stats
.pending
;
1405 ++ hlp
->stats
.requests
;
/**
 * Drains the request queue of a stateless helper for as long as servers can
 * take work.
 *
 * Each iteration asks GetFirstAvailable() for a server and, only if one was
 * found, takes the next request with Dequeue() and passes both to
 * helperDispatch(). Because of the short-circuit &&, no request is removed
 * from the queue when no server is available.
 *
 * \param hlp  helper whose queue is processed
 *
 * NOTE(review): the return type, the braces and the declarations of `srv`
 * and `r` are missing from this listing.
 */
1409 helperKickQueue(helper
* hlp
)
1414 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
1415 helperDispatch(srv
, r
);
/**
 * Drains the request queue of a stateful helper for as long as servers can
 * take work.
 *
 * Each iteration asks StatefulGetFirstAvailable() for a server and, only if
 * one was found, takes the next request with StatefulDequeue() and passes
 * both to helperStatefulDispatch(). Because of the short-circuit &&, no
 * request is removed from the queue when no server is available.
 *
 * \param hlp  stateful helper whose queue is processed
 *
 * NOTE(review): the return type and the braces are missing from this
 * listing.
 */
1419 helperStatefulKickQueue(statefulhelper
* hlp
)
1421 helper_stateful_request
*r
;
1422 helper_stateful_server
*srv
;
1424 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= StatefulDequeue(hlp
)))
1425 helperStatefulDispatch(srv
, r
);
/**
 * Follow-up step once a stateful helper server has finished with a request.
 *
 * If the server is not flagged for shutdown, the parent helper's queue is
 * kicked so that waiting requests can be dispatched. If it is flagged for
 * shutdown and is neither closing, reserved nor busy, its write pipe is
 * closed through closeWritePipeSafely().
 *
 * \param srv  server that became free
 *
 * NOTE(review): the return type and the closing braces are missing from this
 * listing; statements after closeWritePipeSafely() may be missing too.
 */
1429 helperStatefulServerDone(helper_stateful_server
* srv
)
1431 if (!srv
->flags
.shutdown
) {
1432 helperStatefulKickQueue(srv
->parent
);
// A server marked for shutdown is only closed once nothing is using it.
1433 } else if (!srv
->flags
.closing
&& !srv
->flags
.reserved
&& !srv
->flags
.busy
) {
1434 srv
->closeWritePipeSafely();
/**
 * Releases a stateless helper request.
 *
 * The visible code drops the cbdata reference held in r->data.
 *
 * \param r  request to release
 *
 * NOTE(review): the return type, the braces and any further cleanup of r
 * (for example its buffer or the object itself) are missing from this
 * listing; confirm against the pristine source.
 */
1440 helperRequestFree(helper_request
* r
)
1442 cbdataReferenceDone(r
->data
);
/**
 * Releases a stateful helper request.
 *
 * The visible code drops the cbdata reference held in r->data.
 *
 * \param r  request to release
 *
 * NOTE(review): the return type, the braces, any guard around the
 * cbdataReferenceDone() call and any further cleanup of r are missing from
 * this listing; confirm against the pristine source.
 */
1448 helperStatefulRequestFree(helper_stateful_request
* r
)
1451 cbdataReferenceDone(r
->data
);
1457 // TODO: should helper_ and helper_stateful_ have a common parent?
1459 helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
)
1463 storeAppendPrintf(sentry
, "%s: unavailable\n", label
);
1468 storeAppendPrintf(sentry
, "%s:\n", label
);