3 * $Id: helper.cc,v 1.24 2001/01/12 00:37:18 wessels Exp $
5 * DEBUG: section 29 Helper process maintenance
6 * AUTHOR: Harvest Derived?
8 * SQUID Web Proxy Cache http://www.squid-cache.org/
9 * ----------------------------------------------------------
11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
38 #define HELPER_MAX_ARGS 64
40 static PF helperHandleRead
;
41 static PF helperStatefulHandleRead
;
42 static PF helperServerFree
;
43 static PF helperStatefulServerFree
;
44 static void Enqueue(helper
* hlp
, helper_request
*);
45 static helper_request
*Dequeue(helper
* hlp
);
46 static helper_stateful_request
*StatefulDequeue(statefulhelper
* hlp
);
47 static helper_server
*GetFirstAvailable(helper
* hlp
);
48 static helper_stateful_server
*StatefulGetFirstAvailable(statefulhelper
* hlp
);
49 static void helperDispatch(helper_server
* srv
, helper_request
* r
);
50 static void helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
);
51 static void helperKickQueue(helper
* hlp
);
52 static void helperStatefulKickQueue(statefulhelper
* hlp
);
53 static void helperRequestFree(helper_request
* r
);
54 static void helperStatefulRequestFree(helper_stateful_request
* r
);
55 static void StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
);
56 static helper_stateful_request
*StatefulServerDequeue(helper_stateful_server
* srv
);
57 static void StatefulServerEnqueue(helper_stateful_server
* srv
, helper_stateful_request
* r
);
58 static void helperStatefulServerKickQueue(helper_stateful_server
* srv
);
61 helperOpenServers(helper
* hlp
)
67 char *args
[HELPER_MAX_ARGS
];
68 char fd_note_buf
[FD_DESC_SZ
];
76 if (hlp
->cmdline
== NULL
)
78 progname
= hlp
->cmdline
->key
;
79 if ((s
= strrchr(progname
, '/')))
80 shortname
= xstrdup(s
+ 1);
82 shortname
= xstrdup(progname
);
83 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
84 hlp
->n_to_start
, shortname
);
85 procname
= xmalloc(strlen(shortname
) + 3);
86 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
87 args
[nargs
++] = procname
;
88 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
89 args
[nargs
++] = w
->key
;
91 assert(nargs
<= HELPER_MAX_ARGS
);
92 for (k
= 0; k
< hlp
->n_to_start
; k
++) {
95 x
= ipcCreate(hlp
->ipc_type
,
102 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname
);
106 srv
= CBDATA_ALLOC(helper_server
, NULL
);
107 srv
->flags
.alive
= 1;
111 srv
->buf
= memAllocate(MEM_8K_BUF
);
115 cbdataLock(hlp
); /* lock because of the parent backlink */
116 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
118 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
119 fd_note(rfd
, fd_note_buf
);
121 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
122 fd_note(rfd
, fd_note_buf
);
123 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
124 fd_note(wfd
, fd_note_buf
);
126 commSetNonBlocking(rfd
);
128 commSetNonBlocking(wfd
);
129 comm_add_close_handler(rfd
, helperServerFree
, srv
);
131 safe_free(shortname
);
133 helperKickQueue(hlp
);
137 helperStatefulOpenServers(statefulhelper
* hlp
)
143 char *args
[HELPER_MAX_ARGS
];
144 char fd_note_buf
[FD_DESC_SZ
];
145 helper_stateful_server
*srv
;
152 if (hlp
->cmdline
== NULL
)
154 progname
= hlp
->cmdline
->key
;
155 if ((s
= strrchr(progname
, '/')))
156 shortname
= xstrdup(s
+ 1);
158 shortname
= xstrdup(progname
);
159 debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
160 hlp
->n_to_start
, shortname
);
161 procname
= xmalloc(strlen(shortname
) + 3);
162 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
163 args
[nargs
++] = procname
;
164 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
)
165 args
[nargs
++] = w
->key
;
166 args
[nargs
++] = NULL
;
167 assert(nargs
<= HELPER_MAX_ARGS
);
168 for (k
= 0; k
< hlp
->n_to_start
; k
++) {
171 x
= ipcCreate(hlp
->ipc_type
,
178 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname
);
182 srv
= CBDATA_ALLOC(helper_stateful_server
, NULL
);
183 srv
->flags
.alive
= 1;
184 srv
->flags
.reserved
= S_HELPER_FREE
;
185 srv
->deferred_requests
= 0;
189 srv
->buf
= memAllocate(MEM_8K_BUF
);
193 if (hlp
->datapool
!= NULL
)
194 srv
->data
= memPoolAlloc(hlp
->datapool
);
195 cbdataLock(hlp
); /* lock because of the parent backlink */
196 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
198 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
199 fd_note(rfd
, fd_note_buf
);
201 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
202 fd_note(rfd
, fd_note_buf
);
203 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
204 fd_note(wfd
, fd_note_buf
);
206 commSetNonBlocking(rfd
);
208 commSetNonBlocking(wfd
);
209 comm_add_close_handler(rfd
, helperStatefulServerFree
, srv
);
211 safe_free(shortname
);
213 helperStatefulKickQueue(hlp
);
218 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
220 helper_request
*r
= memAllocate(MEM_HELPER_REQUEST
);
223 debug(29, 3) ("helperSubmit: hlp == NULL\n");
224 callback(data
, NULL
);
227 r
->callback
= callback
;
229 r
->buf
= xstrdup(buf
);
231 if ((srv
= GetFirstAvailable(hlp
)))
232 helperDispatch(srv
, r
);
235 debug(29, 9) ("helperSubmit: %s\n", buf
);
239 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPSCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
241 helper_stateful_request
*r
= memAllocate(MEM_HELPER_STATEFUL_REQUEST
);
242 helper_stateful_server
*srv
;
244 debug(29, 3) ("helperStatefulSubmit: hlp == NULL\n");
245 callback(data
, 0, NULL
);
248 r
->callback
= callback
;
251 r
->buf
= xstrdup(buf
);
255 if ((buf
!= NULL
) && lastserver
) {
256 debug(29, 5) ("StatefulSubmit with lastserver %d\n", lastserver
);
257 if (lastserver
->flags
.reserved
!= S_HELPER_RESERVED
)
258 lastserver
->deferred_requests
--;
259 if (!(lastserver
->request
)) {
260 debug(29, 5) ("StatefulSubmit dispatching\n");
261 helperStatefulDispatch(lastserver
, r
);
263 debug(29, 5) ("StatefulSubmit queuing\n");
264 StatefulServerEnqueue(lastserver
, r
);
267 if ((srv
= StatefulGetFirstAvailable(hlp
))) {
268 helperStatefulDispatch(srv
, r
);
270 StatefulEnqueue(hlp
, r
);
272 debug(29, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r
->placeholder
, buf
);
275 helper_stateful_server
*
276 helperStatefulDefer(statefulhelper
* hlp
)
277 /* find and add a deferred request to a server */
280 helper_stateful_server
*srv
= NULL
, *rv
= NULL
;
282 debug(29, 3) ("helperStatefulReserve: hlp == NULL\n");
285 debug(29, 5) ("helperStatefulDefer: Running servers %d.\n", hlp
->n_running
);
286 if (hlp
->n_running
== 0) {
287 debug(29, 1) ("helperStatefulDefer: No running servers!. \n");
290 srv
= StatefulGetFirstAvailable(hlp
);
291 /* all currently busy:loop through servers and find server with the shortest queue */
294 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
296 if (srv
->flags
.reserved
== S_HELPER_RESERVED
)
298 if (!srv
->flags
.alive
)
300 if ((hlp
->IsAvailable
!= NULL
) && (srv
->data
!= NULL
) &&
301 !(hlp
->IsAvailable(srv
->data
)))
303 if ((rv
!= NULL
) && (rv
->deferred_requests
< srv
->deferred_requests
))
308 debug(29, 1) ("helperStatefulDefer: None available.\n");
311 rv
->flags
.reserved
= S_HELPER_DEFERRED
;
312 rv
->deferred_requests
++;
317 helperStatefulReset(helper_stateful_server
* srv
)
318 /* puts this helper back in the queue. the calling app is required to
319 * manage the state in the helper.
322 statefulhelper
*hlp
= srv
->parent
;
323 helper_stateful_request
*r
;
326 /* reset attempt DURING an outstaning request */
327 debug(29, 1) ("helperStatefulReset: RESET During request %s \n",
331 helperStatefulRequestFree(r
);
334 debug(29, 1) ("helperStatefulReset reset helper %s #%d\n", hlp
->id_name
, srv
->index
+ 1);
336 if (srv
->queue
.head
) {
337 srv
->flags
.reserved
= S_HELPER_DEFERRED
;
338 helperStatefulServerKickQueue(srv
);
340 srv
->flags
.reserved
= S_HELPER_FREE
;
341 if ((srv
->parent
->OnEmptyQueue
!= NULL
) && (srv
->data
))
342 srv
->parent
->OnEmptyQueue(srv
->data
);
343 helperStatefulKickQueue(hlp
);
348 helperStatefulReleaseServer(helper_stateful_server
* srv
)
349 /*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
351 if (srv
->deferred_requests
> 0)
352 srv
->deferred_requests
--;
353 if (!(srv
->deferred_requests
) && (srv
->flags
.reserved
== S_HELPER_DEFERRED
) && !(srv
->queue
.head
)) {
354 srv
->flags
.reserved
= S_HELPER_FREE
;
355 if ((srv
->parent
->OnEmptyQueue
!= NULL
) && (srv
->data
))
356 srv
->parent
->OnEmptyQueue(srv
->data
);
361 helperStatefulServerGetData(helper_stateful_server
* srv
)
362 /* return a pointer to the stateful routines data area */
368 helperStats(StoreEntry
* sentry
, helper
* hlp
)
373 storeAppendPrintf(sentry
, "number running: %d of %d\n",
374 hlp
->n_running
, hlp
->n_to_start
);
375 storeAppendPrintf(sentry
, "requests sent: %d\n",
376 hlp
->stats
.requests
);
377 storeAppendPrintf(sentry
, "replies received: %d\n",
379 storeAppendPrintf(sentry
, "queue length: %d\n",
380 hlp
->stats
.queue_size
);
381 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
382 hlp
->stats
.avg_svc_time
);
383 storeAppendPrintf(sentry
, "\n");
384 storeAppendPrintf(sentry
, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
392 for (link
= hlp
->servers
.head
; link
; link
= link
->next
) {
394 tt
= 0.001 * tvSubMsec(srv
->dispatch_time
, current_time
);
395 storeAppendPrintf(sentry
, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
399 srv
->flags
.alive
? 'A' : ' ',
400 srv
->flags
.busy
? 'B' : ' ',
401 srv
->flags
.closing
? 'C' : ' ',
402 srv
->flags
.shutdown
? 'S' : ' ',
405 srv
->request
? log_quote(srv
->request
->buf
) : "(none)");
407 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
408 storeAppendPrintf(sentry
, " A = ALIVE\n");
409 storeAppendPrintf(sentry
, " B = BUSY\n");
410 storeAppendPrintf(sentry
, " C = CLOSING\n");
411 storeAppendPrintf(sentry
, " S = SHUTDOWN\n");
415 helperStatefulStats(StoreEntry
* sentry
, statefulhelper
* hlp
)
417 helper_stateful_server
*srv
;
420 storeAppendPrintf(sentry
, "number running: %d of %d\n",
421 hlp
->n_running
, hlp
->n_to_start
);
422 storeAppendPrintf(sentry
, "requests sent: %d\n",
423 hlp
->stats
.requests
);
424 storeAppendPrintf(sentry
, "replies received: %d\n",
426 storeAppendPrintf(sentry
, "queue length: %d\n",
427 hlp
->stats
.queue_size
);
428 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
429 hlp
->stats
.avg_svc_time
);
430 storeAppendPrintf(sentry
, "\n");
431 storeAppendPrintf(sentry
, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
435 "# Deferred Requests",
440 for (link
= hlp
->servers
.head
; link
; link
= link
->next
) {
442 tt
= 0.001 * tvSubMsec(srv
->dispatch_time
, current_time
);
443 storeAppendPrintf(sentry
, "%7d\t%7d\t%11d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
447 srv
->deferred_requests
,
448 srv
->flags
.alive
? 'A' : ' ',
449 srv
->flags
.busy
? 'B' : ' ',
450 srv
->flags
.closing
? 'C' : ' ',
451 srv
->flags
.reserved
!= S_HELPER_FREE
? 'R' : ' ',
452 srv
->flags
.shutdown
? 'S' : ' ',
455 srv
->request
? log_quote(srv
->request
->buf
) : "(none)");
457 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
458 storeAppendPrintf(sentry
, " A = ALIVE\n");
459 storeAppendPrintf(sentry
, " B = BUSY\n");
460 storeAppendPrintf(sentry
, " C = CLOSING\n");
461 storeAppendPrintf(sentry
, " R = RESERVED or DEFERRED\n");
462 storeAppendPrintf(sentry
, " S = SHUTDOWN\n");
466 helperShutdown(helper
* hlp
)
468 dlink_node
*link
= hlp
->servers
.head
;
473 if (!srv
->flags
.alive
) {
474 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
475 hlp
->id_name
, srv
->index
+ 1);
478 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
479 if (srv
->flags
.busy
) {
480 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
481 hlp
->id_name
, srv
->index
+ 1);
484 if (srv
->flags
.closing
) {
485 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
486 hlp
->id_name
, srv
->index
+ 1);
489 srv
->flags
.closing
= 1;
490 comm_close(srv
->wfd
);
496 helperStatefulShutdown(statefulhelper
* hlp
)
498 dlink_node
*link
= hlp
->servers
.head
;
499 helper_stateful_server
*srv
;
503 if (!srv
->flags
.alive
) {
504 debug(34, 3) ("helperStatefulShutdown: %s #%d is NOT ALIVE.\n",
505 hlp
->id_name
, srv
->index
+ 1);
508 srv
->flags
.shutdown
= 1; /* request it to shut itself down */
509 if (srv
->flags
.busy
) {
510 debug(34, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
511 hlp
->id_name
, srv
->index
+ 1);
514 if (srv
->flags
.closing
) {
515 debug(34, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
516 hlp
->id_name
, srv
->index
+ 1);
519 if (srv
->flags
.reserved
!= S_HELPER_FREE
) {
520 debug(34, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
521 hlp
->id_name
, srv
->index
+ 1);
524 if (srv
->deferred_requests
) {
525 debug(34, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
526 hlp
->id_name
, srv
->index
+ 1);
529 srv
->flags
.closing
= 1;
530 comm_close(srv
->wfd
);
537 helperCreate(const char *name
)
540 hlp
= CBDATA_ALLOC(helper
, NULL
);
546 helperStatefulCreate(const char *name
)
549 hlp
= CBDATA_ALLOC(statefulhelper
, NULL
);
556 helperFree(helper
* hlp
)
560 /* note, don't free hlp->name, it probably points to static memory */
562 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
563 hlp
->id_name
, hlp
->stats
.queue_size
);
568 helperStatefulFree(statefulhelper
* hlp
)
572 /* note, don't free hlp->name, it probably points to static memory */
574 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
575 hlp
->id_name
, hlp
->stats
.queue_size
);
580 /* ====================================================================== */
581 /* LOCAL FUNCTIONS */
582 /* ====================================================================== */
585 helperServerFree(int fd
, void *data
)
587 helper_server
*srv
= data
;
588 helper
*hlp
= srv
->parent
;
590 assert(srv
->rfd
== fd
);
592 memFree(srv
->buf
, MEM_8K_BUF
);
595 if ((r
= srv
->request
)) {
596 if (cbdataValid(r
->data
))
597 r
->callback(r
->data
, srv
->buf
);
598 helperRequestFree(r
);
601 if (srv
->wfd
!= srv
->rfd
&& srv
->wfd
!= -1)
602 comm_close(srv
->wfd
);
603 dlinkDelete(&srv
->link
, &hlp
->servers
);
605 assert(hlp
->n_running
>= 0);
606 if (!srv
->flags
.shutdown
) {
607 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
608 hlp
->id_name
, srv
->index
+ 1, fd
);
609 if (hlp
->n_running
< hlp
->n_to_start
/ 2)
610 fatalf("Too few %s processes are running", hlp
->id_name
);
612 cbdataUnlock(srv
->parent
);
617 helperStatefulServerFree(int fd
, void *data
)
619 helper_stateful_server
*srv
= data
;
620 statefulhelper
*hlp
= srv
->parent
;
621 helper_stateful_request
*r
;
622 assert(srv
->rfd
== fd
);
624 memFree(srv
->buf
, MEM_8K_BUF
);
627 if ((r
= srv
->request
)) {
628 if (cbdataValid(r
->data
))
629 r
->callback(r
->data
, srv
, srv
->buf
);
630 helperStatefulRequestFree(r
);
633 if (srv
->wfd
!= srv
->rfd
&& srv
->wfd
!= -1)
634 comm_close(srv
->wfd
);
635 dlinkDelete(&srv
->link
, &hlp
->servers
);
637 assert(hlp
->n_running
>= 0);
638 if (!srv
->flags
.shutdown
) {
639 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
640 hlp
->id_name
, srv
->index
+ 1, fd
);
641 if (hlp
->n_running
< hlp
->n_to_start
/ 2)
642 fatalf("Too few %s processes are running", hlp
->id_name
);
644 if (srv
->data
!= NULL
)
645 memPoolFree(hlp
->datapool
, srv
->data
);
646 cbdataUnlock(srv
->parent
);
652 helperHandleRead(int fd
, void *data
)
656 helper_server
*srv
= data
;
658 helper
*hlp
= srv
->parent
;
659 assert(fd
== srv
->rfd
);
660 assert(cbdataValid(data
));
661 statCounter
.syscalls
.sock
.reads
++;
662 len
= read(fd
, srv
->buf
+ srv
->offset
, srv
->buf_sz
- srv
->offset
);
663 fd_bytes(fd
, len
, FD_READ
);
664 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
665 len
, hlp
->id_name
, srv
->index
+ 1);
668 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd
, xstrerror());
673 srv
->buf
[srv
->offset
] = '\0';
676 /* someone spoke without being spoken to */
677 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
678 hlp
->id_name
, srv
->index
+ 1, len
);
680 } else if ((t
= strchr(srv
->buf
, '\n'))) {
681 /* end of reply found */
682 debug(29, 3) ("helperHandleRead: end of reply found\n");
684 if (cbdataValid(r
->data
))
685 r
->callback(r
->data
, srv
->buf
);
688 helperRequestFree(r
);
690 hlp
->stats
.replies
++;
691 hlp
->stats
.avg_svc_time
=
692 intAverage(hlp
->stats
.avg_svc_time
,
693 tvSubMsec(srv
->dispatch_time
, current_time
),
694 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
695 if (srv
->flags
.shutdown
) {
696 comm_close(srv
->wfd
);
699 helperKickQueue(hlp
);
701 commSetSelect(srv
->rfd
, COMM_SELECT_READ
, helperHandleRead
, srv
, 0);
706 helperStatefulHandleRead(int fd
, void *data
)
710 helper_stateful_server
*srv
= data
;
711 helper_stateful_request
*r
;
712 statefulhelper
*hlp
= srv
->parent
;
713 assert(fd
== srv
->rfd
);
714 assert(cbdataValid(data
));
715 statCounter
.syscalls
.sock
.reads
++;
716 len
= read(fd
, srv
->buf
+ srv
->offset
, srv
->buf_sz
- srv
->offset
);
717 fd_bytes(fd
, len
, FD_READ
);
718 debug(29, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
719 len
, hlp
->id_name
, srv
->index
+ 1);
722 debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd
, xstrerror());
727 srv
->buf
[srv
->offset
] = '\0';
730 /* someone spoke without being spoken to */
731 debug(29, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
732 hlp
->id_name
, srv
->index
+ 1, len
);
734 } else if ((t
= strchr(srv
->buf
, '\n'))) {
735 /* end of reply found */
736 debug(29, 3) ("helperStatefulHandleRead: end of reply found\n");
738 if (cbdataValid(r
->data
)) {
739 switch ((r
->callback(r
->data
, srv
, srv
->buf
))) { /*if non-zero reserve helper */
740 case S_HELPER_UNKNOWN
:
741 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
743 case S_HELPER_RELEASE
: /* helper finished with */
744 if (!srv
->queue
.head
) {
745 srv
->flags
.reserved
= S_HELPER_FREE
;
746 if ((srv
->parent
->OnEmptyQueue
!= NULL
) && (srv
->data
))
747 srv
->parent
->OnEmptyQueue(srv
->data
);
748 debug(29, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp
->id_name
, srv
->index
+ 1);
750 srv
->flags
.reserved
= S_HELPER_DEFERRED
;
751 debug(29, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp
->id_name
, srv
->index
+ 1);
754 case S_HELPER_RESERVE
: /* 'pin' this helper for the caller */
755 if (!srv
->queue
.head
) {
756 srv
->flags
.reserved
= S_HELPER_RESERVED
;
757 debug(29, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp
->id_name
, srv
->index
+ 1);
759 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
763 /* the helper is still needed, but can
764 * be used for other requests in the meantime.
766 srv
->flags
.reserved
= S_HELPER_DEFERRED
;
767 srv
->deferred_requests
++;
768 debug(29, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp
->id_name
, srv
->index
+ 1);
771 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
775 debug(29, 1) ("StatefulHandleRead: no callback data registered\n");
779 helperStatefulRequestFree(r
);
781 hlp
->stats
.replies
++;
782 hlp
->stats
.avg_svc_time
=
783 intAverage(hlp
->stats
.avg_svc_time
,
784 tvSubMsec(srv
->dispatch_time
, current_time
),
785 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
786 if (srv
->flags
.shutdown
) {
787 comm_close(srv
->wfd
);
791 helperStatefulServerKickQueue(srv
);
793 helperStatefulKickQueue(hlp
);
796 commSetSelect(srv
->rfd
, COMM_SELECT_READ
, helperStatefulHandleRead
, srv
, 0);
801 Enqueue(helper
* hlp
, helper_request
* r
)
803 dlink_node
*link
= memAllocate(MEM_DLINK_NODE
);
804 dlinkAddTail(r
, link
, &hlp
->queue
);
805 hlp
->stats
.queue_size
++;
806 if (hlp
->stats
.queue_size
< hlp
->n_running
)
808 if (squid_curtime
- hlp
->last_queue_warn
< 600)
810 if (shutting_down
|| reconfiguring
)
812 hlp
->last_queue_warn
= squid_curtime
;
813 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp
->id_name
);
814 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp
->stats
.queue_size
);
815 if (hlp
->stats
.queue_size
> hlp
->n_running
* 2)
816 fatalf("Too many queued %s requests", hlp
->id_name
);
817 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp
->id_name
);
821 StatefulEnqueue(statefulhelper
* hlp
, helper_stateful_request
* r
)
823 dlink_node
*link
= memAllocate(MEM_DLINK_NODE
);
824 dlinkAddTail(r
, link
, &hlp
->queue
);
825 hlp
->stats
.queue_size
++;
826 if (hlp
->stats
.queue_size
< hlp
->n_running
)
828 if (squid_curtime
- hlp
->last_queue_warn
< 600)
830 if (shutting_down
|| reconfiguring
)
832 hlp
->last_queue_warn
= squid_curtime
;
833 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp
->id_name
);
834 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp
->stats
.queue_size
);
835 if (hlp
->stats
.queue_size
> hlp
->n_running
* 2)
836 fatalf("Too many queued %s requests", hlp
->id_name
);
837 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp
->id_name
);
841 StatefulServerEnqueue(helper_stateful_server
* srv
, helper_stateful_request
* r
)
843 dlink_node
*link
= memAllocate(MEM_DLINK_NODE
);
844 dlinkAddTail(r
, link
, &srv
->queue
);
845 /* XXX No queue length check here? */
849 static helper_request
*
850 Dequeue(helper
* hlp
)
853 helper_request
*r
= NULL
;
854 if ((link
= hlp
->queue
.head
)) {
856 dlinkDelete(link
, &hlp
->queue
);
857 memFree(link
, MEM_DLINK_NODE
);
858 hlp
->stats
.queue_size
--;
863 static helper_stateful_request
*
864 StatefulServerDequeue(helper_stateful_server
* srv
)
867 helper_stateful_request
*r
= NULL
;
868 if ((link
= srv
->queue
.head
)) {
870 dlinkDelete(link
, &srv
->queue
);
871 memFree(link
, MEM_DLINK_NODE
);
876 static helper_stateful_request
*
877 StatefulDequeue(statefulhelper
* hlp
)
880 helper_stateful_request
*r
= NULL
;
881 if ((link
= hlp
->queue
.head
)) {
883 dlinkDelete(link
, &hlp
->queue
);
884 memFree(link
, MEM_DLINK_NODE
);
885 hlp
->stats
.queue_size
--;
890 static helper_server
*
891 GetFirstAvailable(helper
* hlp
)
894 helper_server
*srv
= NULL
;
895 if (hlp
->n_running
== 0)
897 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
901 if (!srv
->flags
.alive
)
908 static helper_stateful_server
*
909 StatefulGetFirstAvailable(statefulhelper
* hlp
)
912 helper_stateful_server
*srv
= NULL
;
913 debug(29, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp
->n_running
);
914 if (hlp
->n_running
== 0)
916 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
920 if (srv
->flags
.reserved
== S_HELPER_RESERVED
)
922 if (!srv
->flags
.alive
)
924 if ((hlp
->IsAvailable
!= NULL
) && (srv
->data
!= NULL
) && !(hlp
->IsAvailable(srv
->data
)))
928 debug(29, 5) ("StatefulGetFirstAvailable: None available.\n");
934 helperDispatch(helper_server
* srv
, helper_request
* r
)
936 helper
*hlp
= srv
->parent
;
937 if (!cbdataValid(r
->data
)) {
938 debug(29, 1) ("helperDispatch: invalid callback data\n");
939 helperRequestFree(r
);
942 assert(!srv
->flags
.busy
);
945 srv
->dispatch_time
= current_time
;
950 NULL
, /* Handler-data */
952 commSetSelect(srv
->rfd
,
956 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
957 hlp
->id_name
, srv
->index
+ 1, strlen(r
->buf
));
959 hlp
->stats
.requests
++;
963 helperStatefulDispatch(helper_stateful_server
* srv
, helper_stateful_request
* r
)
965 statefulhelper
*hlp
= srv
->parent
;
966 if (!cbdataValid(r
->data
)) {
967 debug(29, 1) ("helperStatefulDispatch: invalid callback data\n");
968 helperStatefulRequestFree(r
);
971 debug(29, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp
->id_name
, srv
->index
+ 1);
972 if (r
->placeholder
== 1) {
973 /* a callback is needed before this request can _use_ a helper. */
974 if (cbdataValid(r
->data
)) {
975 /* we don't care about releasing/deferring this helper. The request NEVER
976 * gets to the helper. So we throw away the return code */
977 r
->callback(r
->data
, srv
, NULL
);
978 /* throw away the placeholder */
979 helperStatefulRequestFree(r
);
980 /* and push the queue. Note that the callback may have call submit again -
981 * which is why we test for the request*/
982 if (srv
->request
== NULL
) {
983 if (srv
->flags
.shutdown
) {
984 comm_close(srv
->wfd
);
988 helperStatefulServerKickQueue(srv
);
990 helperStatefulKickQueue(hlp
);
998 srv
->dispatch_time
= current_time
;
1003 NULL
, /* Handler-data */
1005 commSetSelect(srv
->rfd
,
1007 helperStatefulHandleRead
,
1009 debug(29, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
1010 hlp
->id_name
, srv
->index
+ 1, strlen(r
->buf
));
1012 hlp
->stats
.requests
++;
1017 helperKickQueue(helper
* hlp
)
1021 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
1022 helperDispatch(srv
, r
);
1026 helperStatefulKickQueue(statefulhelper
* hlp
)
1028 helper_stateful_request
*r
;
1029 helper_stateful_server
*srv
;
1030 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= StatefulDequeue(hlp
)))
1031 helperStatefulDispatch(srv
, r
);
1035 helperStatefulServerKickQueue(helper_stateful_server
* srv
)
1037 helper_stateful_request
*r
;
1038 if ((r
= StatefulServerDequeue(srv
)))
1039 helperStatefulDispatch(srv
, r
);
1043 helperRequestFree(helper_request
* r
)
1045 cbdataUnlock(r
->data
);
1047 memFree(r
, MEM_HELPER_REQUEST
);
1051 helperStatefulRequestFree(helper_stateful_request
* r
)
1053 cbdataUnlock(r
->data
);
1055 memFree(r
, MEM_HELPER_STATEFUL_REQUEST
);