]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Updated copyright
[thirdparty/squid.git] / src / helper.cc
1
2 /*
3 * $Id: helper.cc,v 1.24 2001/01/12 00:37:18 wessels Exp $
4 *
5 * DEBUG: section 29 Helper process maintenance
6 * AUTHOR: Harvest Derived?
7 *
8 * SQUID Web Proxy Cache http://www.squid-cache.org/
9 * ----------------------------------------------------------
10 *
11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
36 #include "squid.h"
37
38 #define HELPER_MAX_ARGS 64
39
40 static PF helperHandleRead;
41 static PF helperStatefulHandleRead;
42 static PF helperServerFree;
43 static PF helperStatefulServerFree;
44 static void Enqueue(helper * hlp, helper_request *);
45 static helper_request *Dequeue(helper * hlp);
46 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
47 static helper_server *GetFirstAvailable(helper * hlp);
48 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
49 static void helperDispatch(helper_server * srv, helper_request * r);
50 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
51 static void helperKickQueue(helper * hlp);
52 static void helperStatefulKickQueue(statefulhelper * hlp);
53 static void helperRequestFree(helper_request * r);
54 static void helperStatefulRequestFree(helper_stateful_request * r);
55 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
56 static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
57 static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
58 static void helperStatefulServerKickQueue(helper_stateful_server * srv);
59
60 void
61 helperOpenServers(helper * hlp)
62 {
63 char *s;
64 char *progname;
65 char *shortname;
66 char *procname;
67 char *args[HELPER_MAX_ARGS];
68 char fd_note_buf[FD_DESC_SZ];
69 helper_server *srv;
70 int nargs = 0;
71 int k;
72 int x;
73 int rfd;
74 int wfd;
75 wordlist *w;
76 if (hlp->cmdline == NULL)
77 return;
78 progname = hlp->cmdline->key;
79 if ((s = strrchr(progname, '/')))
80 shortname = xstrdup(s + 1);
81 else
82 shortname = xstrdup(progname);
83 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
84 hlp->n_to_start, shortname);
85 procname = xmalloc(strlen(shortname) + 3);
86 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
87 args[nargs++] = procname;
88 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
89 args[nargs++] = w->key;
90 args[nargs++] = NULL;
91 assert(nargs <= HELPER_MAX_ARGS);
92 for (k = 0; k < hlp->n_to_start; k++) {
93 getCurrentTime();
94 rfd = wfd = -1;
95 x = ipcCreate(hlp->ipc_type,
96 progname,
97 args,
98 shortname,
99 &rfd,
100 &wfd);
101 if (x < 0) {
102 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
103 continue;
104 }
105 hlp->n_running++;
106 srv = CBDATA_ALLOC(helper_server, NULL);
107 srv->flags.alive = 1;
108 srv->index = k;
109 srv->rfd = rfd;
110 srv->wfd = wfd;
111 srv->buf = memAllocate(MEM_8K_BUF);
112 srv->buf_sz = 8192;
113 srv->offset = 0;
114 srv->parent = hlp;
115 cbdataLock(hlp); /* lock because of the parent backlink */
116 dlinkAddTail(srv, &srv->link, &hlp->servers);
117 if (rfd == wfd) {
118 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
119 fd_note(rfd, fd_note_buf);
120 } else {
121 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
122 fd_note(rfd, fd_note_buf);
123 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
124 fd_note(wfd, fd_note_buf);
125 }
126 commSetNonBlocking(rfd);
127 if (wfd != rfd)
128 commSetNonBlocking(wfd);
129 comm_add_close_handler(rfd, helperServerFree, srv);
130 }
131 safe_free(shortname);
132 safe_free(procname);
133 helperKickQueue(hlp);
134 }
135
136 void
137 helperStatefulOpenServers(statefulhelper * hlp)
138 {
139 char *s;
140 char *progname;
141 char *shortname;
142 char *procname;
143 char *args[HELPER_MAX_ARGS];
144 char fd_note_buf[FD_DESC_SZ];
145 helper_stateful_server *srv;
146 int nargs = 0;
147 int k;
148 int x;
149 int rfd;
150 int wfd;
151 wordlist *w;
152 if (hlp->cmdline == NULL)
153 return;
154 progname = hlp->cmdline->key;
155 if ((s = strrchr(progname, '/')))
156 shortname = xstrdup(s + 1);
157 else
158 shortname = xstrdup(progname);
159 debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
160 hlp->n_to_start, shortname);
161 procname = xmalloc(strlen(shortname) + 3);
162 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
163 args[nargs++] = procname;
164 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
165 args[nargs++] = w->key;
166 args[nargs++] = NULL;
167 assert(nargs <= HELPER_MAX_ARGS);
168 for (k = 0; k < hlp->n_to_start; k++) {
169 getCurrentTime();
170 rfd = wfd = -1;
171 x = ipcCreate(hlp->ipc_type,
172 progname,
173 args,
174 shortname,
175 &rfd,
176 &wfd);
177 if (x < 0) {
178 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
179 continue;
180 }
181 hlp->n_running++;
182 srv = CBDATA_ALLOC(helper_stateful_server, NULL);
183 srv->flags.alive = 1;
184 srv->flags.reserved = S_HELPER_FREE;
185 srv->deferred_requests = 0;
186 srv->index = k;
187 srv->rfd = rfd;
188 srv->wfd = wfd;
189 srv->buf = memAllocate(MEM_8K_BUF);
190 srv->buf_sz = 8192;
191 srv->offset = 0;
192 srv->parent = hlp;
193 if (hlp->datapool != NULL)
194 srv->data = memPoolAlloc(hlp->datapool);
195 cbdataLock(hlp); /* lock because of the parent backlink */
196 dlinkAddTail(srv, &srv->link, &hlp->servers);
197 if (rfd == wfd) {
198 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
199 fd_note(rfd, fd_note_buf);
200 } else {
201 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
202 fd_note(rfd, fd_note_buf);
203 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
204 fd_note(wfd, fd_note_buf);
205 }
206 commSetNonBlocking(rfd);
207 if (wfd != rfd)
208 commSetNonBlocking(wfd);
209 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
210 }
211 safe_free(shortname);
212 safe_free(procname);
213 helperStatefulKickQueue(hlp);
214 }
215
216
217 void
218 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
219 {
220 helper_request *r = memAllocate(MEM_HELPER_REQUEST);
221 helper_server *srv;
222 if (hlp == NULL) {
223 debug(29, 3) ("helperSubmit: hlp == NULL\n");
224 callback(data, NULL);
225 return;
226 }
227 r->callback = callback;
228 r->data = data;
229 r->buf = xstrdup(buf);
230 cbdataLock(r->data);
231 if ((srv = GetFirstAvailable(hlp)))
232 helperDispatch(srv, r);
233 else
234 Enqueue(hlp, r);
235 debug(29, 9) ("helperSubmit: %s\n", buf);
236 }
237
238 void
239 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
240 {
241 helper_stateful_request *r = memAllocate(MEM_HELPER_STATEFUL_REQUEST);
242 helper_stateful_server *srv;
243 if (hlp == NULL) {
244 debug(29, 3) ("helperStatefulSubmit: hlp == NULL\n");
245 callback(data, 0, NULL);
246 return;
247 }
248 r->callback = callback;
249 r->data = data;
250 if (buf != NULL)
251 r->buf = xstrdup(buf);
252 else
253 r->placeholder = 1;
254 cbdataLock(r->data);
255 if ((buf != NULL) && lastserver) {
256 debug(29, 5) ("StatefulSubmit with lastserver %d\n", lastserver);
257 if (lastserver->flags.reserved != S_HELPER_RESERVED)
258 lastserver->deferred_requests--;
259 if (!(lastserver->request)) {
260 debug(29, 5) ("StatefulSubmit dispatching\n");
261 helperStatefulDispatch(lastserver, r);
262 } else {
263 debug(29, 5) ("StatefulSubmit queuing\n");
264 StatefulServerEnqueue(lastserver, r);
265 }
266 } else {
267 if ((srv = StatefulGetFirstAvailable(hlp))) {
268 helperStatefulDispatch(srv, r);
269 } else
270 StatefulEnqueue(hlp, r);
271 }
272 debug(29, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r->placeholder, buf);
273 }
274
275 helper_stateful_server *
276 helperStatefulDefer(statefulhelper * hlp)
277 /* find and add a deferred request to a server */
278 {
279 dlink_node *n;
280 helper_stateful_server *srv = NULL, *rv = NULL;
281 if (hlp == NULL) {
282 debug(29, 3) ("helperStatefulReserve: hlp == NULL\n");
283 return NULL;
284 }
285 debug(29, 5) ("helperStatefulDefer: Running servers %d.\n", hlp->n_running);
286 if (hlp->n_running == 0) {
287 debug(29, 1) ("helperStatefulDefer: No running servers!. \n");
288 return NULL;
289 }
290 srv = StatefulGetFirstAvailable(hlp);
291 /* all currently busy:loop through servers and find server with the shortest queue */
292 rv = srv;
293 if (rv == NULL)
294 for (n = hlp->servers.head; n != NULL; n = n->next) {
295 srv = n->data;
296 if (srv->flags.reserved == S_HELPER_RESERVED)
297 continue;
298 if (!srv->flags.alive)
299 continue;
300 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
301 !(hlp->IsAvailable(srv->data)))
302 continue;
303 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
304 continue;
305 rv = srv;
306 }
307 if (rv == NULL) {
308 debug(29, 1) ("helperStatefulDefer: None available.\n");
309 return NULL;
310 }
311 rv->flags.reserved = S_HELPER_DEFERRED;
312 rv->deferred_requests++;
313 return rv;
314 }
315
316 void
317 helperStatefulReset(helper_stateful_server * srv)
318 /* puts this helper back in the queue. the calling app is required to
319 * manage the state in the helper.
320 */
321 {
322 statefulhelper *hlp = srv->parent;
323 helper_stateful_request *r;
324 r = srv->request;
325 if (r != NULL) {
326 /* reset attempt DURING an outstaning request */
327 debug(29, 1) ("helperStatefulReset: RESET During request %s \n",
328 hlp->id_name);
329 srv->flags.busy = 0;
330 srv->offset = 0;
331 helperStatefulRequestFree(r);
332 srv->request = NULL;
333 }
334 debug(29, 1) ("helperStatefulReset reset helper %s #%d\n", hlp->id_name, srv->index + 1);
335 srv->flags.busy = 0;
336 if (srv->queue.head) {
337 srv->flags.reserved = S_HELPER_DEFERRED;
338 helperStatefulServerKickQueue(srv);
339 } else {
340 srv->flags.reserved = S_HELPER_FREE;
341 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
342 srv->parent->OnEmptyQueue(srv->data);
343 helperStatefulKickQueue(hlp);
344 }
345 }
346
347 void
348 helperStatefulReleaseServer(helper_stateful_server * srv)
349 /*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
350 {
351 if (srv->deferred_requests > 0)
352 srv->deferred_requests--;
353 if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head)) {
354 srv->flags.reserved = S_HELPER_FREE;
355 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
356 srv->parent->OnEmptyQueue(srv->data);
357 }
358 }
359
360 void *
361 helperStatefulServerGetData(helper_stateful_server * srv)
362 /* return a pointer to the stateful routines data area */
363 {
364 return srv->data;
365 }
366
367 void
368 helperStats(StoreEntry * sentry, helper * hlp)
369 {
370 helper_server *srv;
371 dlink_node *link;
372 double tt;
373 storeAppendPrintf(sentry, "number running: %d of %d\n",
374 hlp->n_running, hlp->n_to_start);
375 storeAppendPrintf(sentry, "requests sent: %d\n",
376 hlp->stats.requests);
377 storeAppendPrintf(sentry, "replies received: %d\n",
378 hlp->stats.replies);
379 storeAppendPrintf(sentry, "queue length: %d\n",
380 hlp->stats.queue_size);
381 storeAppendPrintf(sentry, "avg service time: %d msec\n",
382 hlp->stats.avg_svc_time);
383 storeAppendPrintf(sentry, "\n");
384 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
385 "#",
386 "FD",
387 "# Requests",
388 "Flags",
389 "Time",
390 "Offset",
391 "Request");
392 for (link = hlp->servers.head; link; link = link->next) {
393 srv = link->data;
394 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
395 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
396 srv->index + 1,
397 srv->rfd,
398 srv->stats.uses,
399 srv->flags.alive ? 'A' : ' ',
400 srv->flags.busy ? 'B' : ' ',
401 srv->flags.closing ? 'C' : ' ',
402 srv->flags.shutdown ? 'S' : ' ',
403 tt < 0.0 ? 0.0 : tt,
404 (int) srv->offset,
405 srv->request ? log_quote(srv->request->buf) : "(none)");
406 }
407 storeAppendPrintf(sentry, "\nFlags key:\n\n");
408 storeAppendPrintf(sentry, " A = ALIVE\n");
409 storeAppendPrintf(sentry, " B = BUSY\n");
410 storeAppendPrintf(sentry, " C = CLOSING\n");
411 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
412 }
413
414 void
415 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
416 {
417 helper_stateful_server *srv;
418 dlink_node *link;
419 double tt;
420 storeAppendPrintf(sentry, "number running: %d of %d\n",
421 hlp->n_running, hlp->n_to_start);
422 storeAppendPrintf(sentry, "requests sent: %d\n",
423 hlp->stats.requests);
424 storeAppendPrintf(sentry, "replies received: %d\n",
425 hlp->stats.replies);
426 storeAppendPrintf(sentry, "queue length: %d\n",
427 hlp->stats.queue_size);
428 storeAppendPrintf(sentry, "avg service time: %d msec\n",
429 hlp->stats.avg_svc_time);
430 storeAppendPrintf(sentry, "\n");
431 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
432 "#",
433 "FD",
434 "# Requests",
435 "# Deferred Requests",
436 "Flags",
437 "Time",
438 "Offset",
439 "Request");
440 for (link = hlp->servers.head; link; link = link->next) {
441 srv = link->data;
442 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
443 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
444 srv->index + 1,
445 srv->rfd,
446 srv->stats.uses,
447 srv->deferred_requests,
448 srv->flags.alive ? 'A' : ' ',
449 srv->flags.busy ? 'B' : ' ',
450 srv->flags.closing ? 'C' : ' ',
451 srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
452 srv->flags.shutdown ? 'S' : ' ',
453 tt < 0.0 ? 0.0 : tt,
454 (int) srv->offset,
455 srv->request ? log_quote(srv->request->buf) : "(none)");
456 }
457 storeAppendPrintf(sentry, "\nFlags key:\n\n");
458 storeAppendPrintf(sentry, " A = ALIVE\n");
459 storeAppendPrintf(sentry, " B = BUSY\n");
460 storeAppendPrintf(sentry, " C = CLOSING\n");
461 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
462 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
463 }
464
465 void
466 helperShutdown(helper * hlp)
467 {
468 dlink_node *link = hlp->servers.head;
469 helper_server *srv;
470 while (link) {
471 srv = link->data;
472 link = link->next;
473 if (!srv->flags.alive) {
474 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
475 hlp->id_name, srv->index + 1);
476 continue;
477 }
478 srv->flags.shutdown = 1; /* request it to shut itself down */
479 if (srv->flags.busy) {
480 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
481 hlp->id_name, srv->index + 1);
482 continue;
483 }
484 if (srv->flags.closing) {
485 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
486 hlp->id_name, srv->index + 1);
487 continue;
488 }
489 srv->flags.closing = 1;
490 comm_close(srv->wfd);
491 srv->wfd = -1;
492 }
493 }
494
495 void
496 helperStatefulShutdown(statefulhelper * hlp)
497 {
498 dlink_node *link = hlp->servers.head;
499 helper_stateful_server *srv;
500 while (link) {
501 srv = link->data;
502 link = link->next;
503 if (!srv->flags.alive) {
504 debug(34, 3) ("helperStatefulShutdown: %s #%d is NOT ALIVE.\n",
505 hlp->id_name, srv->index + 1);
506 continue;
507 }
508 srv->flags.shutdown = 1; /* request it to shut itself down */
509 if (srv->flags.busy) {
510 debug(34, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
511 hlp->id_name, srv->index + 1);
512 continue;
513 }
514 if (srv->flags.closing) {
515 debug(34, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
516 hlp->id_name, srv->index + 1);
517 continue;
518 }
519 if (srv->flags.reserved != S_HELPER_FREE) {
520 debug(34, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
521 hlp->id_name, srv->index + 1);
522 continue;
523 }
524 if (srv->deferred_requests) {
525 debug(34, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
526 hlp->id_name, srv->index + 1);
527 continue;
528 }
529 srv->flags.closing = 1;
530 comm_close(srv->wfd);
531 srv->wfd = -1;
532 }
533 }
534
535
536 helper *
537 helperCreate(const char *name)
538 {
539 helper *hlp;
540 hlp = CBDATA_ALLOC(helper, NULL);
541 hlp->id_name = name;
542 return hlp;
543 }
544
545 statefulhelper *
546 helperStatefulCreate(const char *name)
547 {
548 statefulhelper *hlp;
549 hlp = CBDATA_ALLOC(statefulhelper, NULL);
550 hlp->id_name = name;
551 return hlp;
552 }
553
554
555 void
556 helperFree(helper * hlp)
557 {
558 if (!hlp)
559 return;
560 /* note, don't free hlp->name, it probably points to static memory */
561 if (hlp->queue.head)
562 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
563 hlp->id_name, hlp->stats.queue_size);
564 cbdataFree(hlp);
565 }
566
567 void
568 helperStatefulFree(statefulhelper * hlp)
569 {
570 if (!hlp)
571 return;
572 /* note, don't free hlp->name, it probably points to static memory */
573 if (hlp->queue.head)
574 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
575 hlp->id_name, hlp->stats.queue_size);
576 cbdataFree(hlp);
577 }
578
579
580 /* ====================================================================== */
581 /* LOCAL FUNCTIONS */
582 /* ====================================================================== */
583
584 static void
585 helperServerFree(int fd, void *data)
586 {
587 helper_server *srv = data;
588 helper *hlp = srv->parent;
589 helper_request *r;
590 assert(srv->rfd == fd);
591 if (srv->buf) {
592 memFree(srv->buf, MEM_8K_BUF);
593 srv->buf = NULL;
594 }
595 if ((r = srv->request)) {
596 if (cbdataValid(r->data))
597 r->callback(r->data, srv->buf);
598 helperRequestFree(r);
599 srv->request = NULL;
600 }
601 if (srv->wfd != srv->rfd && srv->wfd != -1)
602 comm_close(srv->wfd);
603 dlinkDelete(&srv->link, &hlp->servers);
604 hlp->n_running--;
605 assert(hlp->n_running >= 0);
606 if (!srv->flags.shutdown) {
607 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
608 hlp->id_name, srv->index + 1, fd);
609 if (hlp->n_running < hlp->n_to_start / 2)
610 fatalf("Too few %s processes are running", hlp->id_name);
611 }
612 cbdataUnlock(srv->parent);
613 cbdataFree(srv);
614 }
615
616 static void
617 helperStatefulServerFree(int fd, void *data)
618 {
619 helper_stateful_server *srv = data;
620 statefulhelper *hlp = srv->parent;
621 helper_stateful_request *r;
622 assert(srv->rfd == fd);
623 if (srv->buf) {
624 memFree(srv->buf, MEM_8K_BUF);
625 srv->buf = NULL;
626 }
627 if ((r = srv->request)) {
628 if (cbdataValid(r->data))
629 r->callback(r->data, srv, srv->buf);
630 helperStatefulRequestFree(r);
631 srv->request = NULL;
632 }
633 if (srv->wfd != srv->rfd && srv->wfd != -1)
634 comm_close(srv->wfd);
635 dlinkDelete(&srv->link, &hlp->servers);
636 hlp->n_running--;
637 assert(hlp->n_running >= 0);
638 if (!srv->flags.shutdown) {
639 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
640 hlp->id_name, srv->index + 1, fd);
641 if (hlp->n_running < hlp->n_to_start / 2)
642 fatalf("Too few %s processes are running", hlp->id_name);
643 }
644 if (srv->data != NULL)
645 memPoolFree(hlp->datapool, srv->data);
646 cbdataUnlock(srv->parent);
647 cbdataFree(srv);
648 }
649
650
651 static void
652 helperHandleRead(int fd, void *data)
653 {
654 int len;
655 char *t = NULL;
656 helper_server *srv = data;
657 helper_request *r;
658 helper *hlp = srv->parent;
659 assert(fd == srv->rfd);
660 assert(cbdataValid(data));
661 statCounter.syscalls.sock.reads++;
662 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
663 fd_bytes(fd, len, FD_READ);
664 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
665 len, hlp->id_name, srv->index + 1);
666 if (len <= 0) {
667 if (len < 0)
668 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
669 comm_close(fd);
670 return;
671 }
672 srv->offset += len;
673 srv->buf[srv->offset] = '\0';
674 r = srv->request;
675 if (r == NULL) {
676 /* someone spoke without being spoken to */
677 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
678 hlp->id_name, srv->index + 1, len);
679 srv->offset = 0;
680 } else if ((t = strchr(srv->buf, '\n'))) {
681 /* end of reply found */
682 debug(29, 3) ("helperHandleRead: end of reply found\n");
683 *t = '\0';
684 if (cbdataValid(r->data))
685 r->callback(r->data, srv->buf);
686 srv->flags.busy = 0;
687 srv->offset = 0;
688 helperRequestFree(r);
689 srv->request = NULL;
690 hlp->stats.replies++;
691 hlp->stats.avg_svc_time =
692 intAverage(hlp->stats.avg_svc_time,
693 tvSubMsec(srv->dispatch_time, current_time),
694 hlp->stats.replies, REDIRECT_AV_FACTOR);
695 if (srv->flags.shutdown) {
696 comm_close(srv->wfd);
697 srv->wfd = -1;
698 } else
699 helperKickQueue(hlp);
700 } else {
701 commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
702 }
703 }
704
705 static void
706 helperStatefulHandleRead(int fd, void *data)
707 {
708 int len;
709 char *t = NULL;
710 helper_stateful_server *srv = data;
711 helper_stateful_request *r;
712 statefulhelper *hlp = srv->parent;
713 assert(fd == srv->rfd);
714 assert(cbdataValid(data));
715 statCounter.syscalls.sock.reads++;
716 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
717 fd_bytes(fd, len, FD_READ);
718 debug(29, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
719 len, hlp->id_name, srv->index + 1);
720 if (len <= 0) {
721 if (len < 0)
722 debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
723 comm_close(fd);
724 return;
725 }
726 srv->offset += len;
727 srv->buf[srv->offset] = '\0';
728 r = srv->request;
729 if (r == NULL) {
730 /* someone spoke without being spoken to */
731 debug(29, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
732 hlp->id_name, srv->index + 1, len);
733 srv->offset = 0;
734 } else if ((t = strchr(srv->buf, '\n'))) {
735 /* end of reply found */
736 debug(29, 3) ("helperStatefulHandleRead: end of reply found\n");
737 *t = '\0';
738 if (cbdataValid(r->data)) {
739 switch ((r->callback(r->data, srv, srv->buf))) { /*if non-zero reserve helper */
740 case S_HELPER_UNKNOWN:
741 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
742 break;
743 case S_HELPER_RELEASE: /* helper finished with */
744 if (!srv->queue.head) {
745 srv->flags.reserved = S_HELPER_FREE;
746 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
747 srv->parent->OnEmptyQueue(srv->data);
748 debug(29, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1);
749 } else {
750 srv->flags.reserved = S_HELPER_DEFERRED;
751 debug(29, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1);
752 }
753 break;
754 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
755 if (!srv->queue.head) {
756 srv->flags.reserved = S_HELPER_RESERVED;
757 debug(29, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1);
758 } else {
759 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
760 }
761 break;
762 case S_HELPER_DEFER:
763 /* the helper is still needed, but can
764 * be used for other requests in the meantime.
765 */
766 srv->flags.reserved = S_HELPER_DEFERRED;
767 srv->deferred_requests++;
768 debug(29, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1);
769 break;
770 default:
771 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
772 }
773
774 } else {
775 debug(29, 1) ("StatefulHandleRead: no callback data registered\n");
776 }
777 srv->flags.busy = 0;
778 srv->offset = 0;
779 helperStatefulRequestFree(r);
780 srv->request = NULL;
781 hlp->stats.replies++;
782 hlp->stats.avg_svc_time =
783 intAverage(hlp->stats.avg_svc_time,
784 tvSubMsec(srv->dispatch_time, current_time),
785 hlp->stats.replies, REDIRECT_AV_FACTOR);
786 if (srv->flags.shutdown) {
787 comm_close(srv->wfd);
788 srv->wfd = -1;
789 } else {
790 if (srv->queue.head)
791 helperStatefulServerKickQueue(srv);
792 else
793 helperStatefulKickQueue(hlp);
794 }
795 } else {
796 commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0);
797 }
798 }
799
800 static void
801 Enqueue(helper * hlp, helper_request * r)
802 {
803 dlink_node *link = memAllocate(MEM_DLINK_NODE);
804 dlinkAddTail(r, link, &hlp->queue);
805 hlp->stats.queue_size++;
806 if (hlp->stats.queue_size < hlp->n_running)
807 return;
808 if (squid_curtime - hlp->last_queue_warn < 600)
809 return;
810 if (shutting_down || reconfiguring)
811 return;
812 hlp->last_queue_warn = squid_curtime;
813 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
814 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
815 if (hlp->stats.queue_size > hlp->n_running * 2)
816 fatalf("Too many queued %s requests", hlp->id_name);
817 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
818 }
819
820 static void
821 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
822 {
823 dlink_node *link = memAllocate(MEM_DLINK_NODE);
824 dlinkAddTail(r, link, &hlp->queue);
825 hlp->stats.queue_size++;
826 if (hlp->stats.queue_size < hlp->n_running)
827 return;
828 if (squid_curtime - hlp->last_queue_warn < 600)
829 return;
830 if (shutting_down || reconfiguring)
831 return;
832 hlp->last_queue_warn = squid_curtime;
833 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
834 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
835 if (hlp->stats.queue_size > hlp->n_running * 2)
836 fatalf("Too many queued %s requests", hlp->id_name);
837 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
838 }
839
840 static void
841 StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
842 {
843 dlink_node *link = memAllocate(MEM_DLINK_NODE);
844 dlinkAddTail(r, link, &srv->queue);
845 /* XXX No queue length check here? */
846 }
847
848
849 static helper_request *
850 Dequeue(helper * hlp)
851 {
852 dlink_node *link;
853 helper_request *r = NULL;
854 if ((link = hlp->queue.head)) {
855 r = link->data;
856 dlinkDelete(link, &hlp->queue);
857 memFree(link, MEM_DLINK_NODE);
858 hlp->stats.queue_size--;
859 }
860 return r;
861 }
862
863 static helper_stateful_request *
864 StatefulServerDequeue(helper_stateful_server * srv)
865 {
866 dlink_node *link;
867 helper_stateful_request *r = NULL;
868 if ((link = srv->queue.head)) {
869 r = link->data;
870 dlinkDelete(link, &srv->queue);
871 memFree(link, MEM_DLINK_NODE);
872 }
873 return r;
874 }
875
876 static helper_stateful_request *
877 StatefulDequeue(statefulhelper * hlp)
878 {
879 dlink_node *link;
880 helper_stateful_request *r = NULL;
881 if ((link = hlp->queue.head)) {
882 r = link->data;
883 dlinkDelete(link, &hlp->queue);
884 memFree(link, MEM_DLINK_NODE);
885 hlp->stats.queue_size--;
886 }
887 return r;
888 }
889
890 static helper_server *
891 GetFirstAvailable(helper * hlp)
892 {
893 dlink_node *n;
894 helper_server *srv = NULL;
895 if (hlp->n_running == 0)
896 return NULL;
897 for (n = hlp->servers.head; n != NULL; n = n->next) {
898 srv = n->data;
899 if (srv->flags.busy)
900 continue;
901 if (!srv->flags.alive)
902 continue;
903 return srv;
904 }
905 return NULL;
906 }
907
908 static helper_stateful_server *
909 StatefulGetFirstAvailable(statefulhelper * hlp)
910 {
911 dlink_node *n;
912 helper_stateful_server *srv = NULL;
913 debug(29, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running);
914 if (hlp->n_running == 0)
915 return NULL;
916 for (n = hlp->servers.head; n != NULL; n = n->next) {
917 srv = n->data;
918 if (srv->flags.busy)
919 continue;
920 if (srv->flags.reserved == S_HELPER_RESERVED)
921 continue;
922 if (!srv->flags.alive)
923 continue;
924 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
925 continue;
926 return srv;
927 }
928 debug(29, 5) ("StatefulGetFirstAvailable: None available.\n");
929 return NULL;
930 }
931
932
933 static void
934 helperDispatch(helper_server * srv, helper_request * r)
935 {
936 helper *hlp = srv->parent;
937 if (!cbdataValid(r->data)) {
938 debug(29, 1) ("helperDispatch: invalid callback data\n");
939 helperRequestFree(r);
940 return;
941 }
942 assert(!srv->flags.busy);
943 srv->flags.busy = 1;
944 srv->request = r;
945 srv->dispatch_time = current_time;
946 comm_write(srv->wfd,
947 r->buf,
948 strlen(r->buf),
949 NULL, /* Handler */
950 NULL, /* Handler-data */
951 NULL); /* free */
952 commSetSelect(srv->rfd,
953 COMM_SELECT_READ,
954 helperHandleRead,
955 srv, 0);
956 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
957 hlp->id_name, srv->index + 1, strlen(r->buf));
958 srv->stats.uses++;
959 hlp->stats.requests++;
960 }
961
962 static void
963 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
964 {
965 statefulhelper *hlp = srv->parent;
966 if (!cbdataValid(r->data)) {
967 debug(29, 1) ("helperStatefulDispatch: invalid callback data\n");
968 helperStatefulRequestFree(r);
969 return;
970 }
971 debug(29, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1);
972 if (r->placeholder == 1) {
973 /* a callback is needed before this request can _use_ a helper. */
974 if (cbdataValid(r->data)) {
975 /* we don't care about releasing/deferring this helper. The request NEVER
976 * gets to the helper. So we throw away the return code */
977 r->callback(r->data, srv, NULL);
978 /* throw away the placeholder */
979 helperStatefulRequestFree(r);
980 /* and push the queue. Note that the callback may have call submit again -
981 * which is why we test for the request*/
982 if (srv->request == NULL) {
983 if (srv->flags.shutdown) {
984 comm_close(srv->wfd);
985 srv->wfd = -1;
986 } else {
987 if (srv->queue.head)
988 helperStatefulServerKickQueue(srv);
989 else
990 helperStatefulKickQueue(hlp);
991 }
992 }
993 }
994 return;
995 }
996 srv->flags.busy = 1;
997 srv->request = r;
998 srv->dispatch_time = current_time;
999 comm_write(srv->wfd,
1000 r->buf,
1001 strlen(r->buf),
1002 NULL, /* Handler */
1003 NULL, /* Handler-data */
1004 NULL); /* free */
1005 commSetSelect(srv->rfd,
1006 COMM_SELECT_READ,
1007 helperStatefulHandleRead,
1008 srv, 0);
1009 debug(29, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
1010 hlp->id_name, srv->index + 1, strlen(r->buf));
1011 srv->stats.uses++;
1012 hlp->stats.requests++;
1013 }
1014
1015
1016 static void
1017 helperKickQueue(helper * hlp)
1018 {
1019 helper_request *r;
1020 helper_server *srv;
1021 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1022 helperDispatch(srv, r);
1023 }
1024
1025 static void
1026 helperStatefulKickQueue(statefulhelper * hlp)
1027 {
1028 helper_stateful_request *r;
1029 helper_stateful_server *srv;
1030 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1031 helperStatefulDispatch(srv, r);
1032 }
1033
1034 static void
1035 helperStatefulServerKickQueue(helper_stateful_server * srv)
1036 {
1037 helper_stateful_request *r;
1038 if ((r = StatefulServerDequeue(srv)))
1039 helperStatefulDispatch(srv, r);
1040 }
1041
1042 static void
1043 helperRequestFree(helper_request * r)
1044 {
1045 cbdataUnlock(r->data);
1046 xfree(r->buf);
1047 memFree(r, MEM_HELPER_REQUEST);
1048 }
1049
1050 static void
1051 helperStatefulRequestFree(helper_stateful_request * r)
1052 {
1053 cbdataUnlock(r->data);
1054 xfree(r->buf);
1055 memFree(r, MEM_HELPER_STATEFUL_REQUEST);
1056 }