4 * DEBUG: section 84 Helper process maintenance
5 * AUTHOR: Harvest Derived?
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
37 #include "SquidTime.h"
43 #define HELPER_MAX_ARGS 64
45 /* size of helper read buffer (maximum?). no reason given for this size */
46 /* though it has been seen to be too short for some requests */
47 /* it is dynamic, so increasng should not have side effects */
50 static IOCB helperHandleRead
;
51 static IOCB helperStatefulHandleRead
;
52 static PF helperServerFree
;
53 static PF helperStatefulServerFree
;
54 static void Enqueue(helper
* hlp
, helper_request
*);
55 static helper_request
*Dequeue(helper
* hlp
);
56 static helper_stateful_request
*StatefulDequeue(statefulhelper
* hlp
);
57 static helper_server
*GetFirstAvailable(helper
* hlp
);
58 static helper_stateful_server
*StatefulGetFirstAvailable(statefulhelper
* hlp
);
59 static void helperDispatch(helper_server
* srv
, helper_request
* r
);
60 static void helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
);
61 static void helperKickQueue(helper
* hlp
);
62 static void helperStatefulKickQueue(statefulhelper
* hlp
);
63 static void helperStatefulServerDone(helper_stateful_server
* srv
);
64 static void helperRequestFree(helper_request
* r
);
65 static void helperStatefulRequestFree(helper_stateful_request
* r
);
66 static void StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
);
67 static bool helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
);
71 CBDATA_TYPE(helper_server
);
72 CBDATA_TYPE(statefulhelper
);
73 CBDATA_TYPE(helper_stateful_server
);
76 helperOpenServers(helper
* hlp
)
82 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
83 char fd_note_buf
[FD_DESC_SZ
];
93 if (hlp
->cmdline
== NULL
)
96 progname
= hlp
->cmdline
->key
;
98 if ((s
= strrchr(progname
, '/')))
99 shortname
= xstrdup(s
+ 1);
101 shortname
= xstrdup(progname
);
103 /* dont ever start more than hlp->n_to_start processes. */
104 int need_new
= hlp
->n_to_start
- hlp
->n_active
;
106 debugs(84, 1, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->n_to_start
<< " '" << shortname
<< "' processes");
109 debugs(84, 1, "helperOpenServers: No '" << shortname
<< "' processes needed.");
112 procname
= (char *)xmalloc(strlen(shortname
) + 3);
114 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
116 args
[nargs
++] = procname
;
118 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
119 args
[nargs
++] = w
->key
;
121 args
[nargs
++] = NULL
;
123 assert(nargs
<= HELPER_MAX_ARGS
);
125 for (k
= 0; k
< need_new
; k
++) {
128 pid
= ipcCreate(hlp
->ipc_type
,
138 debugs(84, 1, "WARNING: Cannot run '" << progname
<< "' process.");
144 CBDATA_INIT_TYPE(helper_server
);
145 srv
= cbdataAlloc(helper_server
);
149 srv
->addr
= hlp
->addr
;
152 srv
->rbuf
= (char *)memAllocBuf(BUF_8KB
, &srv
->rbuf_sz
);
153 srv
->wqueue
= new MemBuf
;
155 srv
->requests
= (helper_request
**)xcalloc(hlp
->concurrency
? hlp
->concurrency
: 1, sizeof(*srv
->requests
));
156 srv
->parent
= cbdataReference(hlp
);
157 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
160 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
161 fd_note(rfd
, fd_note_buf
);
163 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
164 fd_note(rfd
, fd_note_buf
);
165 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
166 fd_note(wfd
, fd_note_buf
);
169 commSetNonBlocking(rfd
);
172 commSetNonBlocking(wfd
);
174 comm_add_close_handler(rfd
, helperServerFree
, srv
);
176 comm_read(srv
->rfd
, srv
->rbuf
, srv
->rbuf_sz
- 1, helperHandleRead
, srv
);
179 hlp
->last_restart
= squid_curtime
;
180 safe_free(shortname
);
182 helperKickQueue(hlp
);
188 * helperStatefulOpenServers: create the stateful child helper processes
191 helperStatefulOpenServers(statefulhelper
* hlp
)
194 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
195 char fd_note_buf
[FD_DESC_SZ
];
198 if (hlp
->cmdline
== NULL
)
201 char *progname
= hlp
->cmdline
->key
;
204 if ((s
= strrchr(progname
, '/')))
205 shortname
= xstrdup(s
+ 1);
207 shortname
= xstrdup(progname
);
209 /* dont ever start more than hlp->n_to_start processes. */
210 /* n_active are the helpers which have not been shut down. */
211 int need_new
= hlp
->n_to_start
- hlp
->n_active
;
213 debugs(84, 1, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->n_to_start
<< " '" << shortname
<< "' processes");
216 debugs(84, 1, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
219 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
221 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
223 args
[nargs
++] = procname
;
225 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
226 args
[nargs
++] = w
->key
;
228 args
[nargs
++] = NULL
;
230 assert(nargs
<= HELPER_MAX_ARGS
);
232 for (int k
= 0; k
< need_new
; k
++) {
237 pid_t pid
= ipcCreate(hlp
->ipc_type
,
247 debugs(84, 1, "WARNING: Cannot run '" << progname
<< "' process.");
253 CBDATA_INIT_TYPE(helper_stateful_server
);
254 helper_stateful_server
*srv
= cbdataAlloc(helper_stateful_server
);
257 srv
->flags
.reserved
= 0;
258 srv
->stats
.submits
= 0;
259 srv
->stats
.releases
= 0;
261 srv
->addr
= hlp
->addr
;
264 srv
->rbuf
= (char *)memAllocBuf(BUF_8KB
, &srv
->rbuf_sz
);
266 srv
->parent
= cbdataReference(hlp
);
268 if (hlp
->datapool
!= NULL
)
269 srv
->data
= hlp
->datapool
->alloc();
271 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
274 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
275 fd_note(rfd
, fd_note_buf
);
277 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
278 fd_note(rfd
, fd_note_buf
);
279 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
280 fd_note(wfd
, fd_note_buf
);
283 commSetNonBlocking(rfd
);
286 commSetNonBlocking(wfd
);
288 comm_add_close_handler(rfd
, helperStatefulServerFree
, srv
);
290 comm_read(srv
->rfd
, srv
->rbuf
, srv
->rbuf_sz
- 1, helperStatefulHandleRead
, srv
);
293 hlp
->last_restart
= squid_curtime
;
294 safe_free(shortname
);
296 helperStatefulKickQueue(hlp
);
301 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
304 debugs(84, 3, "helperSubmit: hlp == NULL");
305 callback(data
, NULL
);
309 helper_request
*r
= new helper_request
;
312 r
->callback
= callback
;
313 r
->data
= cbdataReference(data
);
314 r
->buf
= xstrdup(buf
);
316 if ((srv
= GetFirstAvailable(hlp
)))
317 helperDispatch(srv
, r
);
321 debugs(84, 9, "helperSubmit: " << buf
);
324 /// lastserver = "server last used as part of a reserved request sequence"
326 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPSCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
329 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
330 callback(data
, 0, NULL
);
334 helper_stateful_request
*r
= new helper_stateful_request
;
336 r
->callback
= callback
;
337 r
->data
= cbdataReference(data
);
340 r
->buf
= xstrdup(buf
);
347 if ((buf
!= NULL
) && lastserver
) {
348 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver
);
349 assert(lastserver
->flags
.reserved
);
350 assert(!(lastserver
->request
));
352 debugs(84, 5, "StatefulSubmit dispatching");
353 helperStatefulDispatch(lastserver
, r
);
355 helper_stateful_server
*srv
;
356 if ((srv
= StatefulGetFirstAvailable(hlp
))) {
357 helperStatefulDispatch(srv
, r
);
359 StatefulEnqueue(hlp
, r
);
362 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r
->placeholder
<< "', buf '" << buf
<< "'.");
368 * helperStatefulReleaseServer tells the helper that whoever was
369 * using it no longer needs its services.
372 helperStatefulReleaseServer(helper_stateful_server
* srv
)
374 debugs(84, 3, HERE
<< "srv-" << srv
->index
<< " flags.reserved = " << srv
->flags
.reserved
);
375 if (!srv
->flags
.reserved
)
378 srv
->stats
.releases
++;
380 srv
->flags
.reserved
= 0;
381 if (srv
->parent
->OnEmptyQueue
!= NULL
&& srv
->data
)
382 srv
->parent
->OnEmptyQueue(srv
->data
);
384 helperStatefulServerDone(srv
);
387 /** return a pointer to the stateful routines data area */
389 helperStatefulServerGetData(helper_stateful_server
* srv
)
395 * Dump some stats about the helper states to a StoreEntry
398 helperStats(StoreEntry
* sentry
, helper
* hlp
, const char *label
)
400 if (!helperStartStats(sentry
, hlp
, label
))
403 storeAppendPrintf(sentry
, "program: %s\n",
405 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
406 hlp
->n_active
, hlp
->n_to_start
, (hlp
->n_running
- hlp
->n_active
) );
407 storeAppendPrintf(sentry
, "requests sent: %d\n",
408 hlp
->stats
.requests
);
409 storeAppendPrintf(sentry
, "replies received: %d\n",
411 storeAppendPrintf(sentry
, "queue length: %d\n",
412 hlp
->stats
.queue_size
);
413 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
414 hlp
->stats
.avg_svc_time
);
415 storeAppendPrintf(sentry
, "\n");
416 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
426 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
427 helper_server
*srv
= (helper_server
*)link
->data
;
428 double tt
= 0.001 * (srv
->requests
[0] ? tvSubMsec(srv
->requests
[0]->dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
429 storeAppendPrintf(sentry
, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
434 srv
->stats
.pending
? 'B' : ' ',
435 srv
->flags
.writing
? 'W' : ' ',
436 srv
->flags
.closing
? 'C' : ' ',
437 srv
->flags
.shutdown
? 'S' : ' ',
440 srv
->requests
[0] ? log_quote(srv
->requests
[0]->buf
) : "(none)");
443 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
444 storeAppendPrintf(sentry
, " B = BUSY\n");
445 storeAppendPrintf(sentry
, " W = WRITING\n");
446 storeAppendPrintf(sentry
, " C = CLOSING\n");
447 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
451 helperStatefulStats(StoreEntry
* sentry
, statefulhelper
* hlp
, const char *label
)
453 if (!helperStartStats(sentry
, hlp
, label
))
456 storeAppendPrintf(sentry
, "program: %s\n",
458 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
459 hlp
->n_active
, hlp
->n_to_start
, (hlp
->n_running
- hlp
->n_active
) );
460 storeAppendPrintf(sentry
, "requests sent: %d\n",
461 hlp
->stats
.requests
);
462 storeAppendPrintf(sentry
, "replies received: %d\n",
464 storeAppendPrintf(sentry
, "queue length: %d\n",
465 hlp
->stats
.queue_size
);
466 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
467 hlp
->stats
.avg_svc_time
);
468 storeAppendPrintf(sentry
, "\n");
469 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
479 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
480 helper_stateful_server
*srv
= (helper_stateful_server
*)link
->data
;
481 double tt
= 0.001 * tvSubMsec(srv
->dispatch_time
, srv
->flags
.busy
? current_time
: srv
->answer_time
);
482 storeAppendPrintf(sentry
, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
487 srv
->flags
.busy
? 'B' : ' ',
488 srv
->flags
.closing
? 'C' : ' ',
489 srv
->flags
.reserved
? 'R' : ' ',
490 srv
->flags
.shutdown
? 'S' : ' ',
491 srv
->request
? (srv
->request
->placeholder
? 'P' : ' ') : ' ',
494 srv
->request
? log_quote(srv
->request
->buf
) : "(none)");
497 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
498 storeAppendPrintf(sentry
, " B = BUSY\n");
499 storeAppendPrintf(sentry
, " C = CLOSING\n");
500 storeAppendPrintf(sentry
, " R = RESERVED\n");
501 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
502 storeAppendPrintf(sentry
, " P = PLACEHOLDER\n");
506 helperShutdown(helper
* hlp
)
508 dlink_node
*link
= hlp
->servers
.head
;
518 srv
= (helper_server
*)link
->data
;
521 if (srv
->flags
.shutdown
) {
522 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " has already SHUT DOWN.");
527 assert(hlp
->n_active
>= 0);
528 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
530 if (srv
->flags
.closing
) {
531 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is CLOSING.");
535 if (srv
->stats
.pending
) {
536 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is BUSY.");
540 srv
->flags
.closing
= 1;
546 shutdown(srv
->wfd
, SD_BOTH
);
549 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " shutting down.");
550 /* the rest of the details is dealt with in the helperServerFree
553 comm_close(srv
->rfd
);
557 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
559 debugs(84, 1, "helperShutdown: WARNING: " << hlp
->id_name
<<
560 " #" << no
<< " (" << hlp
->cmdline
->key
<< "," <<
561 (long int)pid
<< ") didn't exit in 5 seconds");
574 helperStatefulShutdown(statefulhelper
* hlp
)
576 dlink_node
*link
= hlp
->servers
.head
;
577 helper_stateful_server
*srv
;
586 srv
= (helper_stateful_server
*)link
->data
;
589 if (srv
->flags
.shutdown
) {
590 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " has already SHUT DOWN.");
595 assert(hlp
->n_active
>= 0);
596 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
598 if (srv
->flags
.busy
) {
599 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is BUSY.");
603 if (srv
->flags
.closing
) {
604 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is CLOSING.");
608 if (srv
->flags
.reserved
) {
610 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is RESERVED. Closing anyway.");
612 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " is RESERVED. Not Shutting Down Yet.");
617 srv
->flags
.closing
= 1;
623 shutdown(srv
->wfd
, SD_BOTH
);
626 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " shutting down.");
628 /* the rest of the details is dealt with in the helperStatefulServerFree
631 comm_close(srv
->rfd
);
635 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
637 debugs(84, 1, "helperShutdown: WARNING: " << hlp
->id_name
<<
638 " #" << no
<< " (" << hlp
->cmdline
->key
<< "," <<
639 (long int)pid
<< ") didn't exit in 5 seconds");
652 helperCreate(const char *name
)
655 CBDATA_INIT_TYPE(helper
);
656 hlp
= cbdataAlloc(helper
);
662 helperStatefulCreate(const char *name
)
665 CBDATA_INIT_TYPE(statefulhelper
);
666 hlp
= cbdataAlloc(statefulhelper
);
673 helperFree(helper
* hlp
)
678 /* note, don't free hlp->name, it probably points to static memory */
680 debugs(84, 0, "WARNING: freeing " << hlp
->id_name
<< " helper with " <<
681 hlp
->stats
.queue_size
<< " requests queued");
687 helperStatefulFree(statefulhelper
* hlp
)
692 /* note, don't free hlp->name, it probably points to static memory */
694 debugs(84, 0, "WARNING: freeing " << hlp
->id_name
<< " helper with " <<
695 hlp
->stats
.queue_size
<< " requests queued");
701 /* ====================================================================== */
702 /* LOCAL FUNCTIONS */
703 /* ====================================================================== */
706 helperServerFree(int fd
, void *data
)
708 helper_server
*srv
= (helper_server
*)data
;
709 helper
*hlp
= srv
->parent
;
711 int i
, concurrency
= hlp
->concurrency
;
717 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
721 srv
->wqueue
->clean();
725 srv
->writebuf
->clean();
726 delete srv
->writebuf
;
727 srv
->writebuf
= NULL
;
730 for (i
= 0; i
< concurrency
; i
++) {
731 if ((r
= srv
->requests
[i
])) {
734 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
735 r
->callback(cbdata
, NULL
);
737 helperRequestFree(r
);
739 srv
->requests
[i
] = NULL
;
743 safe_free(srv
->requests
);
745 if (srv
->wfd
!= srv
->rfd
&& srv
->wfd
!= -1)
746 comm_close(srv
->wfd
);
748 dlinkDelete(&srv
->link
, &hlp
->servers
);
752 assert(hlp
->n_running
>= 0);
754 if (!srv
->flags
.shutdown
) {
756 assert(hlp
->n_active
>= 0);
757 debugs(84, 0, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
+ 1 <<
758 " (FD " << fd
<< ") exited");
760 if (hlp
->n_active
< hlp
->n_to_start
/ 2) {
761 debugs(80, 0, "Too few " << hlp
->id_name
<< " processes are running");
763 if (hlp
->last_restart
> squid_curtime
- 30)
764 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
766 debugs(80, 0, "Starting new helpers");
768 helperOpenServers(hlp
);
772 cbdataReferenceDone(srv
->parent
);
777 helperStatefulServerFree(int fd
, void *data
)
779 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
780 statefulhelper
*hlp
= srv
->parent
;
781 helper_stateful_request
*r
;
784 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
789 srv
->wqueue
->clean();
795 if ((r
= srv
->request
)) {
798 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
799 r
->callback(cbdata
, srv
, NULL
);
801 helperStatefulRequestFree(r
);
806 /* TODO: walk the local queue of requests and carry them all out */
807 if (srv
->wfd
!= srv
->rfd
&& srv
->wfd
!= -1)
808 comm_close(srv
->wfd
);
810 dlinkDelete(&srv
->link
, &hlp
->servers
);
814 assert(hlp
->n_running
>= 0);
816 if (!srv
->flags
.shutdown
) {
818 assert( hlp
->n_active
>= 0);
819 debugs(84, 0, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
+ 1 << " (FD " << fd
<< ") exited");
821 if (hlp
->n_active
<= hlp
->n_to_start
/ 2) {
822 debugs(80, 0, "Too few " << hlp
->id_name
<< " processes are running");
824 if (hlp
->last_restart
> squid_curtime
- 30)
825 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
827 debugs(80, 0, "Starting new helpers");
829 helperStatefulOpenServers(hlp
);
833 if (srv
->data
!= NULL
)
834 hlp
->datapool
->free(srv
->data
);
836 cbdataReferenceDone(srv
->parent
);
843 helperHandleRead(int fd
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
846 helper_server
*srv
= (helper_server
*)data
;
847 helper
*hlp
= srv
->parent
;
848 assert(cbdataReferenceValid(data
));
850 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
852 if (flag
== COMM_ERR_CLOSING
) {
856 assert(fd
== srv
->rfd
);
858 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
+ 1);
860 if (flag
!= COMM_OK
|| len
<= 0) {
862 debugs(84, 1, "helperHandleRead: FD " << fd
<< " read: " << xstrerror());
870 srv
->rbuf
[srv
->roffset
] = '\0';
871 debugs(84, 9, "helperHandleRead: '" << srv
->rbuf
<< "'");
873 if (!srv
->stats
.pending
) {
874 /* someone spoke without being spoken to */
875 debugs(84, 1, "helperHandleRead: unexpected read from " <<
876 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << (int)len
<<
877 " bytes '" << srv
->rbuf
<< "'");
883 while ((t
= strchr(srv
->rbuf
, '\n'))) {
884 /* end of reply found */
886 char *msg
= srv
->rbuf
;
888 debugs(84, 3, "helperHandleRead: end of reply found");
890 if (t
> srv
->rbuf
&& t
[-1] == '\r')
895 if (hlp
->concurrency
) {
896 i
= strtol(msg
, &msg
, 10);
898 while (*msg
&& xisspace(*msg
))
902 r
= srv
->requests
[i
];
905 HLPCB
*callback
= r
->callback
;
908 srv
->requests
[i
] = NULL
;
912 if (cbdataReferenceValidDone(r
->data
, &cbdata
))
913 callback(cbdata
, msg
);
915 srv
->stats
.pending
--;
917 hlp
->stats
.replies
++;
919 srv
->answer_time
= current_time
;
921 srv
->dispatch_time
= r
->dispatch_time
;
923 hlp
->stats
.avg_svc_time
=
924 intAverage(hlp
->stats
.avg_svc_time
,
925 tvSubMsec(r
->dispatch_time
, current_time
),
926 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
928 helperRequestFree(r
);
930 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
931 i
<< " from " << hlp
->id_name
<< " #" << srv
->index
+ 1 <<
932 " '" << srv
->rbuf
<< "'");
936 srv
->roffset
-= (t
- srv
->rbuf
);
937 memmove(srv
->rbuf
, t
, srv
->roffset
+ 1);
939 if (!srv
->flags
.shutdown
) {
940 helperKickQueue(hlp
);
941 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
946 srv
->flags
.closing
=1;
953 comm_read(fd
, srv
->rbuf
+ srv
->roffset
, srv
->rbuf_sz
- srv
->roffset
- 1, helperHandleRead
, srv
);
957 helperStatefulHandleRead(int fd
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
960 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
961 helper_stateful_request
*r
;
962 statefulhelper
*hlp
= srv
->parent
;
963 assert(cbdataReferenceValid(data
));
965 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
967 if (flag
== COMM_ERR_CLOSING
) {
971 assert(fd
== srv
->rfd
);
973 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
974 hlp
->id_name
<< " #" << srv
->index
+ 1);
977 if (flag
!= COMM_OK
|| len
<= 0) {
979 debugs(84, 1, "helperStatefulHandleRead: FD " << fd
<< " read: " << xstrerror());
987 srv
->rbuf
[srv
->roffset
] = '\0';
991 /* someone spoke without being spoken to */
992 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
993 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << (int)len
<<
994 " bytes '" << srv
->rbuf
<< "'");
999 if ((t
= strchr(srv
->rbuf
, '\n'))) {
1000 /* end of reply found */
1002 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1004 if (t
> srv
->rbuf
&& t
[-1] == '\r')
1009 if (r
&& cbdataReferenceValid(r
->data
)) {
1010 r
->callback(r
->data
, srv
, srv
->rbuf
);
1012 debugs(84, 1, "StatefulHandleRead: no callback data registered");
1016 srv
->flags
.busy
= 0;
1018 helperStatefulRequestFree(r
);
1019 srv
->request
= NULL
;
1020 hlp
->stats
.replies
++;
1021 srv
->answer_time
= current_time
;
1022 hlp
->stats
.avg_svc_time
=
1023 intAverage(hlp
->stats
.avg_svc_time
,
1024 tvSubMsec(srv
->dispatch_time
, current_time
),
1025 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
1028 helperStatefulServerDone(srv
);
1030 helperStatefulReleaseServer(srv
);
1034 comm_read(srv
->rfd
, srv
->rbuf
+ srv
->roffset
, srv
->rbuf_sz
- srv
->roffset
- 1,
1035 helperStatefulHandleRead
, srv
);
1039 Enqueue(helper
* hlp
, helper_request
* r
)
1041 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1042 dlinkAddTail(r
, link
, &hlp
->queue
);
1043 hlp
->stats
.queue_size
++;
1045 if (hlp
->stats
.queue_size
< hlp
->n_running
)
1048 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1051 if (shutting_down
|| reconfiguring
)
1054 hlp
->last_queue_warn
= squid_curtime
;
1056 debugs(84, 0, "WARNING: All " << hlp
->id_name
<< " processes are busy.");
1057 debugs(84, 0, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1060 if (hlp
->stats
.queue_size
> hlp
->n_running
* 2)
1061 fatalf("Too many queued %s requests", hlp
->id_name
);
1063 debugs(84, 1, "Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1068 StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
)
1070 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1071 dlinkAddTail(r
, link
, &hlp
->queue
);
1072 hlp
->stats
.queue_size
++;
1074 if (hlp
->stats
.queue_size
< hlp
->n_running
)
1077 if (hlp
->stats
.queue_size
> hlp
->n_running
* 2)
1078 fatalf("Too many queued %s requests", hlp
->id_name
);
1080 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1083 if (shutting_down
|| reconfiguring
)
1086 hlp
->last_queue_warn
= squid_curtime
;
1088 debugs(84, 0, "WARNING: All " << hlp
->id_name
<< " processes are busy.");
1090 debugs(84, 0, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1091 debugs(84, 1, "Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1095 static helper_request
*
1096 Dequeue(helper
* hlp
)
1099 helper_request
*r
= NULL
;
1101 if ((link
= hlp
->queue
.head
)) {
1102 r
= (helper_request
*)link
->data
;
1103 dlinkDelete(link
, &hlp
->queue
);
1104 memFree(link
, MEM_DLINK_NODE
);
1105 hlp
->stats
.queue_size
--;
1111 static helper_stateful_request
*
1112 StatefulDequeue(statefulhelper
* hlp
)
1115 helper_stateful_request
*r
= NULL
;
1117 if ((link
= hlp
->queue
.head
)) {
1118 r
= (helper_stateful_request
*)link
->data
;
1119 dlinkDelete(link
, &hlp
->queue
);
1120 memFree(link
, MEM_DLINK_NODE
);
1121 hlp
->stats
.queue_size
--;
1127 static helper_server
*
1128 GetFirstAvailable(helper
* hlp
)
1132 helper_server
*selected
= NULL
;
1134 if (hlp
->n_running
== 0)
1137 /* Find "least" loaded helper (approx) */
1138 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1139 srv
= (helper_server
*)n
->data
;
1141 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
1144 if (srv
->flags
.shutdown
)
1147 if (!srv
->stats
.pending
)
1158 /* Check for overload */
1162 if (selected
->stats
.pending
>= (hlp
->concurrency
? hlp
->concurrency
: 1))
1168 static helper_stateful_server
*
1169 StatefulGetFirstAvailable(statefulhelper
* hlp
)
1172 helper_stateful_server
*srv
= NULL
;
1173 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->n_running
);
1175 if (hlp
->n_running
== 0)
1178 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1179 srv
= (helper_stateful_server
*)n
->data
;
1181 if (srv
->flags
.busy
)
1184 if (srv
->flags
.reserved
)
1187 if (srv
->flags
.shutdown
)
1190 if ((hlp
->IsAvailable
!= NULL
) && (srv
->data
!= NULL
) && !(hlp
->IsAvailable(srv
->data
)))
1193 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
1197 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1203 helperDispatchWriteDone(int fd
, char *buf
, size_t len
, comm_err_t flag
, int xerrno
, void *data
)
1205 helper_server
*srv
= (helper_server
*)data
;
1207 srv
->writebuf
->clean();
1208 delete srv
->writebuf
;
1209 srv
->writebuf
= NULL
;
1210 srv
->flags
.writing
= 0;
1212 if (flag
!= COMM_OK
) {
1213 /* Helper server has crashed */
1214 debugs(84, 0, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
+ 1 << " has crashed");
1218 if (!srv
->wqueue
->isNull()) {
1219 srv
->writebuf
= srv
->wqueue
;
1220 srv
->wqueue
= new MemBuf
;
1221 srv
->flags
.writing
= 1;
1222 comm_write(srv
->wfd
,
1223 srv
->writebuf
->content(),
1224 srv
->writebuf
->contentSize(),
1225 helperDispatchWriteDone
, /* Handler */
1226 srv
, NULL
); /* Handler-data, freefunc */
1231 helperDispatch(helper_server
* srv
, helper_request
* r
)
1233 helper
*hlp
= srv
->parent
;
1234 helper_request
**ptr
= NULL
;
1237 if (!cbdataReferenceValid(r
->data
)) {
1238 debugs(84, 1, "helperDispatch: invalid callback data");
1239 helperRequestFree(r
);
1243 for (slot
= 0; slot
< (hlp
->concurrency
? hlp
->concurrency
: 1); slot
++) {
1244 if (!srv
->requests
[slot
]) {
1245 ptr
= &srv
->requests
[slot
];
1252 srv
->stats
.pending
+= 1;
1253 r
->dispatch_time
= current_time
;
1255 if (srv
->wqueue
->isNull())
1256 srv
->wqueue
->init();
1258 if (hlp
->concurrency
)
1259 srv
->wqueue
->Printf("%d %s", slot
, r
->buf
);
1261 srv
->wqueue
->append(r
->buf
, strlen(r
->buf
));
1263 if (!srv
->flags
.writing
) {
1264 assert(NULL
== srv
->writebuf
);
1265 srv
->writebuf
= srv
->wqueue
;
1266 srv
->wqueue
= new MemBuf
;
1267 srv
->flags
.writing
= 1;
1268 comm_write(srv
->wfd
,
1269 srv
->writebuf
->content(),
1270 srv
->writebuf
->contentSize(),
1271 helperDispatchWriteDone
, /* Handler */
1272 srv
, NULL
); /* Handler-data, free func */
1275 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " << strlen(r
->buf
) << " bytes");
1278 hlp
->stats
.requests
++;
1282 helperStatefulDispatchWriteDone(int fd
, char *buf
, size_t len
, comm_err_t flag
,
1283 int xerrno
, void *data
)
1290 helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
)
1292 statefulhelper
*hlp
= srv
->parent
;
1294 if (!cbdataReferenceValid(r
->data
)) {
1295 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
1296 helperStatefulRequestFree(r
);
1297 helperStatefulReleaseServer(srv
);
1301 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
+ 1);
1303 if (r
->placeholder
== 1) {
1304 /* a callback is needed before this request can _use_ a helper. */
1305 /* we don't care about releasing this helper. The request NEVER
1306 * gets to the helper. So we throw away the return code */
1307 r
->callback(r
->data
, srv
, NULL
);
1308 /* throw away the placeholder */
1309 helperStatefulRequestFree(r
);
1310 /* and push the queue. Note that the callback may have submitted a new
1311 * request to the helper which is why we test for the request*/
1313 if (srv
->request
== NULL
)
1314 helperStatefulServerDone(srv
);
1319 srv
->flags
.busy
= 1;
1320 srv
->flags
.reserved
= 1;
1322 srv
->dispatch_time
= current_time
;
1323 comm_write(srv
->wfd
,
1326 helperStatefulDispatchWriteDone
, /* Handler */
1327 hlp
, NULL
); /* Handler-data, free func */
1328 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1329 hlp
->id_name
<< " #" << srv
->index
+ 1 << ", " <<
1330 (int) strlen(r
->buf
) << " bytes");
1333 hlp
->stats
.requests
++;
1338 helperKickQueue(helper
* hlp
)
1343 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
1344 helperDispatch(srv
, r
);
1348 helperStatefulKickQueue(statefulhelper
* hlp
)
1350 helper_stateful_request
*r
;
1351 helper_stateful_server
*srv
;
1353 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= StatefulDequeue(hlp
)))
1354 helperStatefulDispatch(srv
, r
);
1358 helperStatefulServerDone(helper_stateful_server
* srv
)
1360 if (!srv
->flags
.shutdown
) {
1361 helperStatefulKickQueue(srv
->parent
);
1362 } else if (!srv
->flags
.closing
&& !srv
->flags
.reserved
&& !srv
->flags
.busy
) {
1365 if (srv
->rfd
== wfd
)
1367 srv
->flags
.closing
=1;
1374 helperRequestFree(helper_request
* r
)
1376 cbdataReferenceDone(r
->data
);
1382 helperStatefulRequestFree(helper_stateful_request
* r
)
1385 cbdataReferenceDone(r
->data
);
1391 // TODO: should helper_ and helper_stateful_ have a common parent?
1393 helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
)
1397 storeAppendPrintf(sentry
, "%s: unavailable\n", label
);
1402 storeAppendPrintf(sentry
, "%s:\n", label
);