]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
consistent formatting
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
5dae8514 3 * $Id: helper.cc,v 1.23 2001/01/11 00:01:53 hno Exp $
f740a279 4 *
5 * DEBUG: section 29 Helper process maintenance
6 * AUTHOR: Harvest Derived?
7 *
8 * SQUID Internet Object Cache http://squid.nlanr.net/Squid/
9 * ----------------------------------------------------------
10 *
11 * Squid is the result of efforts by numerous individuals from the
12 * Internet community. Development is led by Duane Wessels of the
13 * National Laboratory for Applied Network Research and funded by the
14 * National Science Foundation. Squid is Copyrighted (C) 1998 by
15 * the Regents of the University of California. Please see the
16 * COPYRIGHT file for full details. Squid incorporates software
17 * developed and/or copyrighted by other sources. Please see the
18 * CREDITS file for full details.
19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
37
38#define HELPER_MAX_ARGS 64
39
40static PF helperHandleRead;
94439e4e 41static PF helperStatefulHandleRead;
1f5f60dd 42static PF helperServerFree;
94439e4e 43static PF helperStatefulServerFree;
74addf6c 44static void Enqueue(helper * hlp, helper_request *);
45static helper_request *Dequeue(helper * hlp);
94439e4e 46static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 47static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 48static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 49static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 50static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 51static void helperKickQueue(helper * hlp);
94439e4e 52static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 53static void helperRequestFree(helper_request * r);
94439e4e 54static void helperStatefulRequestFree(helper_stateful_request * r);
55static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
56static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
57static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
58static void helperStatefulServerKickQueue(helper_stateful_server * srv);
74addf6c 59
60void
61helperOpenServers(helper * hlp)
62{
63 char *s;
64 char *progname;
65 char *shortname;
66 char *procname;
67 char *args[HELPER_MAX_ARGS];
68 char fd_note_buf[FD_DESC_SZ];
69 helper_server *srv;
70 int nargs = 0;
71 int k;
72 int x;
73 int rfd;
74 int wfd;
75 wordlist *w;
76 if (hlp->cmdline == NULL)
77 return;
78 progname = hlp->cmdline->key;
74addf6c 79 if ((s = strrchr(progname, '/')))
80 shortname = xstrdup(s + 1);
81 else
82 shortname = xstrdup(progname);
83 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
84 hlp->n_to_start, shortname);
85 procname = xmalloc(strlen(shortname) + 3);
86 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
87 args[nargs++] = procname;
88 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
89 args[nargs++] = w->key;
90 args[nargs++] = NULL;
91 assert(nargs <= HELPER_MAX_ARGS);
92 for (k = 0; k < hlp->n_to_start; k++) {
c68e9c6b 93 getCurrentTime();
74addf6c 94 rfd = wfd = -1;
95 x = ipcCreate(hlp->ipc_type,
96 progname,
97 args,
98 shortname,
99 &rfd,
100 &wfd);
101 if (x < 0) {
102 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
103 continue;
104 }
105 hlp->n_running++;
28c60158 106 srv = CBDATA_ALLOC(helper_server, NULL);
74addf6c 107 srv->flags.alive = 1;
108 srv->index = k;
109 srv->rfd = rfd;
110 srv->wfd = wfd;
111 srv->buf = memAllocate(MEM_8K_BUF);
112 srv->buf_sz = 8192;
113 srv->offset = 0;
114 srv->parent = hlp;
1f5f60dd 115 cbdataLock(hlp); /* lock because of the parent backlink */
74addf6c 116 dlinkAddTail(srv, &srv->link, &hlp->servers);
117 if (rfd == wfd) {
118 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
119 fd_note(rfd, fd_note_buf);
120 } else {
121 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
122 fd_note(rfd, fd_note_buf);
123 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
124 fd_note(wfd, fd_note_buf);
125 }
126 commSetNonBlocking(rfd);
127 if (wfd != rfd)
128 commSetNonBlocking(wfd);
1f5f60dd 129 comm_add_close_handler(rfd, helperServerFree, srv);
74addf6c 130 }
131 safe_free(shortname);
132 safe_free(procname);
838b993c 133 helperKickQueue(hlp);
74addf6c 134}
135
94439e4e 136void
137helperStatefulOpenServers(statefulhelper * hlp)
138{
139 char *s;
140 char *progname;
141 char *shortname;
142 char *procname;
143 char *args[HELPER_MAX_ARGS];
144 char fd_note_buf[FD_DESC_SZ];
145 helper_stateful_server *srv;
146 int nargs = 0;
147 int k;
148 int x;
149 int rfd;
150 int wfd;
151 wordlist *w;
152 if (hlp->cmdline == NULL)
153 return;
154 progname = hlp->cmdline->key;
155 if ((s = strrchr(progname, '/')))
156 shortname = xstrdup(s + 1);
157 else
158 shortname = xstrdup(progname);
159 debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
160 hlp->n_to_start, shortname);
161 procname = xmalloc(strlen(shortname) + 3);
162 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
163 args[nargs++] = procname;
164 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
165 args[nargs++] = w->key;
166 args[nargs++] = NULL;
167 assert(nargs <= HELPER_MAX_ARGS);
168 for (k = 0; k < hlp->n_to_start; k++) {
169 getCurrentTime();
170 rfd = wfd = -1;
171 x = ipcCreate(hlp->ipc_type,
172 progname,
173 args,
174 shortname,
175 &rfd,
176 &wfd);
177 if (x < 0) {
178 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
179 continue;
180 }
181 hlp->n_running++;
182 srv = CBDATA_ALLOC(helper_stateful_server, NULL);
183 srv->flags.alive = 1;
184 srv->flags.reserved = S_HELPER_FREE;
185 srv->deferred_requests = 0;
186 srv->index = k;
187 srv->rfd = rfd;
188 srv->wfd = wfd;
189 srv->buf = memAllocate(MEM_8K_BUF);
190 srv->buf_sz = 8192;
191 srv->offset = 0;
192 srv->parent = hlp;
193 if (hlp->datapool != NULL)
194 srv->data = memPoolAlloc(hlp->datapool);
195 cbdataLock(hlp); /* lock because of the parent backlink */
196 dlinkAddTail(srv, &srv->link, &hlp->servers);
197 if (rfd == wfd) {
198 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
199 fd_note(rfd, fd_note_buf);
200 } else {
201 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
202 fd_note(rfd, fd_note_buf);
203 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
204 fd_note(wfd, fd_note_buf);
205 }
206 commSetNonBlocking(rfd);
207 if (wfd != rfd)
208 commSetNonBlocking(wfd);
209 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
210 }
211 safe_free(shortname);
212 safe_free(procname);
213 helperStatefulKickQueue(hlp);
214}
215
216
74addf6c 217void
218helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
219{
c68e9c6b 220 helper_request *r = memAllocate(MEM_HELPER_REQUEST);
74addf6c 221 helper_server *srv;
5b5f9257 222 if (hlp == NULL) {
7d47d8e6 223 debug(29, 3) ("helperSubmit: hlp == NULL\n");
5b5f9257 224 callback(data, NULL);
225 return;
226 }
74addf6c 227 r->callback = callback;
228 r->data = data;
229 r->buf = xstrdup(buf);
230 cbdataLock(r->data);
231 if ((srv = GetFirstAvailable(hlp)))
232 helperDispatch(srv, r);
233 else
234 Enqueue(hlp, r);
94439e4e 235 debug(29, 9) ("helperSubmit: %s\n", buf);
236}
237
238void
239helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
240{
241 helper_stateful_request *r = memAllocate(MEM_HELPER_STATEFUL_REQUEST);
242 helper_stateful_server *srv;
243 if (hlp == NULL) {
244 debug(29, 3) ("helperStatefulSubmit: hlp == NULL\n");
245 callback(data, 0, NULL);
246 return;
247 }
248 r->callback = callback;
249 r->data = data;
250 if (buf != NULL)
251 r->buf = xstrdup(buf);
252 else
253 r->placeholder = 1;
254 cbdataLock(r->data);
255 if ((buf != NULL) && lastserver) {
256 debug(29, 5) ("StatefulSubmit with lastserver %d\n", lastserver);
257 if (lastserver->flags.reserved != S_HELPER_RESERVED)
258 lastserver->deferred_requests--;
259 if (!(lastserver->request)) {
260 debug(29, 5) ("StatefulSubmit dispatching\n");
261 helperStatefulDispatch(lastserver, r);
262 } else {
263 debug(29, 5) ("StatefulSubmit queuing\n");
264 StatefulServerEnqueue(lastserver, r);
265 }
266 } else {
267 if ((srv = StatefulGetFirstAvailable(hlp))) {
268 helperStatefulDispatch(srv, r);
269 } else
270 StatefulEnqueue(hlp, r);
271 }
272 debug(29, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r->placeholder, buf);
273}
274
275helper_stateful_server *
276helperStatefulDefer(statefulhelper * hlp)
277/* find and add a deferred request to a server */
278{
279 dlink_node *n;
280 helper_stateful_server *srv = NULL, *rv = NULL;
281 if (hlp == NULL) {
282 debug(29, 3) ("helperStatefulReserve: hlp == NULL\n");
283 return NULL;
284 }
285 debug(29, 5) ("helperStatefulDefer: Running servers %d.\n", hlp->n_running);
286 if (hlp->n_running == 0) {
287 debug(29, 1) ("helperStatefulDefer: No running servers!. \n");
288 return NULL;
289 }
290 srv = StatefulGetFirstAvailable(hlp);
291 /* all currently busy:loop through servers and find server with the shortest queue */
292 rv = srv;
293 if (rv == NULL)
294 for (n = hlp->servers.head; n != NULL; n = n->next) {
295 srv = n->data;
296 if (srv->flags.reserved == S_HELPER_RESERVED)
297 continue;
298 if (!srv->flags.alive)
299 continue;
300 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
301 !(hlp->IsAvailable(srv->data)))
302 continue;
303 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
304 continue;
305 rv = srv;
306 }
307 if (rv == NULL) {
308 debug(29, 1) ("helperStatefulDefer: None available.\n");
309 return NULL;
310 }
311 rv->flags.reserved = S_HELPER_DEFERRED;
312 rv->deferred_requests++;
313 return rv;
314}
315
316void
317helperStatefulReset(helper_stateful_server * srv)
318/* puts this helper back in the queue. the calling app is required to
319 * manage the state in the helper.
320 */
321{
322 statefulhelper *hlp = srv->parent;
323 helper_stateful_request *r;
324 r = srv->request;
325 if (r != NULL) {
326 /* reset attempt DURING an outstaning request */
327 debug(29, 1) ("helperStatefulReset: RESET During request %s \n",
328 hlp->id_name);
329 srv->flags.busy = 0;
330 srv->offset = 0;
331 helperStatefulRequestFree(r);
332 srv->request = NULL;
333 }
334 debug(29, 1) ("helperStatefulReset reset helper %s #%d\n", hlp->id_name, srv->index + 1);
335 srv->flags.busy = 0;
336 if (srv->queue.head) {
337 srv->flags.reserved = S_HELPER_DEFERRED;
338 helperStatefulServerKickQueue(srv);
339 } else {
340 srv->flags.reserved = S_HELPER_FREE;
341 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
342 srv->parent->OnEmptyQueue(srv->data);
343 helperStatefulKickQueue(hlp);
344 }
345}
346
347void
348helperStatefulReleaseServer(helper_stateful_server * srv)
349/*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
350{
351 if (srv->deferred_requests > 0)
352 srv->deferred_requests--;
353 if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head)) {
354 srv->flags.reserved = S_HELPER_FREE;
355 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
356 srv->parent->OnEmptyQueue(srv->data);
357 }
358}
359
360void *
361helperStatefulServerGetData(helper_stateful_server * srv)
362/* return a pointer to the stateful routines data area */
363{
364 return srv->data;
74addf6c 365}
366
367void
368helperStats(StoreEntry * sentry, helper * hlp)
369{
370 helper_server *srv;
371 dlink_node *link;
f4ae18d0 372 double tt;
74addf6c 373 storeAppendPrintf(sentry, "number running: %d of %d\n",
374 hlp->n_running, hlp->n_to_start);
375 storeAppendPrintf(sentry, "requests sent: %d\n",
376 hlp->stats.requests);
377 storeAppendPrintf(sentry, "replies received: %d\n",
378 hlp->stats.replies);
379 storeAppendPrintf(sentry, "queue length: %d\n",
380 hlp->stats.queue_size);
381 storeAppendPrintf(sentry, "avg service time: %d msec\n",
382 hlp->stats.avg_svc_time);
383 storeAppendPrintf(sentry, "\n");
592da4ec 384 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
74addf6c 385 "#",
386 "FD",
387 "# Requests",
388 "Flags",
389 "Time",
592da4ec 390 "Offset",
391 "Request");
74addf6c 392 for (link = hlp->servers.head; link; link = link->next) {
393 srv = link->data;
f4ae18d0 394 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
592da4ec 395 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
74addf6c 396 srv->index + 1,
397 srv->rfd,
398 srv->stats.uses,
399 srv->flags.alive ? 'A' : ' ',
400 srv->flags.busy ? 'B' : ' ',
401 srv->flags.closing ? 'C' : ' ',
402 srv->flags.shutdown ? 'S' : ' ',
f4ae18d0 403 tt < 0.0 ? 0.0 : tt,
592da4ec 404 (int) srv->offset,
405 srv->request ? log_quote(srv->request->buf) : "(none)");
74addf6c 406 }
407 storeAppendPrintf(sentry, "\nFlags key:\n\n");
408 storeAppendPrintf(sentry, " A = ALIVE\n");
409 storeAppendPrintf(sentry, " B = BUSY\n");
410 storeAppendPrintf(sentry, " C = CLOSING\n");
411 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
412}
413
94439e4e 414void
415helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
416{
417 helper_stateful_server *srv;
418 dlink_node *link;
419 double tt;
420 storeAppendPrintf(sentry, "number running: %d of %d\n",
421 hlp->n_running, hlp->n_to_start);
422 storeAppendPrintf(sentry, "requests sent: %d\n",
423 hlp->stats.requests);
424 storeAppendPrintf(sentry, "replies received: %d\n",
425 hlp->stats.replies);
426 storeAppendPrintf(sentry, "queue length: %d\n",
427 hlp->stats.queue_size);
428 storeAppendPrintf(sentry, "avg service time: %d msec\n",
429 hlp->stats.avg_svc_time);
430 storeAppendPrintf(sentry, "\n");
431 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
432 "#",
433 "FD",
434 "# Requests",
435 "# Deferred Requests",
436 "Flags",
437 "Time",
438 "Offset",
439 "Request");
440 for (link = hlp->servers.head; link; link = link->next) {
441 srv = link->data;
442 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
443 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
444 srv->index + 1,
445 srv->rfd,
446 srv->stats.uses,
447 srv->deferred_requests,
448 srv->flags.alive ? 'A' : ' ',
449 srv->flags.busy ? 'B' : ' ',
450 srv->flags.closing ? 'C' : ' ',
451 srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
452 srv->flags.shutdown ? 'S' : ' ',
453 tt < 0.0 ? 0.0 : tt,
454 (int) srv->offset,
455 srv->request ? log_quote(srv->request->buf) : "(none)");
456 }
457 storeAppendPrintf(sentry, "\nFlags key:\n\n");
458 storeAppendPrintf(sentry, " A = ALIVE\n");
459 storeAppendPrintf(sentry, " B = BUSY\n");
460 storeAppendPrintf(sentry, " C = CLOSING\n");
461 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
462 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
463}
464
74addf6c 465void
466helperShutdown(helper * hlp)
467{
c68e9c6b 468 dlink_node *link = hlp->servers.head;
74addf6c 469 helper_server *srv;
c68e9c6b 470 while (link) {
74addf6c 471 srv = link->data;
c68e9c6b 472 link = link->next;
74addf6c 473 if (!srv->flags.alive) {
474 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
475 hlp->id_name, srv->index + 1);
476 continue;
477 }
1f5f60dd 478 srv->flags.shutdown = 1; /* request it to shut itself down */
74addf6c 479 if (srv->flags.busy) {
480 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
481 hlp->id_name, srv->index + 1);
74addf6c 482 continue;
483 }
484 if (srv->flags.closing) {
485 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
486 hlp->id_name, srv->index + 1);
487 continue;
488 }
74addf6c 489 srv->flags.closing = 1;
3cdb7cd0 490 comm_close(srv->wfd);
491 srv->wfd = -1;
74addf6c 492 }
493}
494
94439e4e 495void
496helperStatefulShutdown(statefulhelper * hlp)
497{
498 dlink_node *link = hlp->servers.head;
499 helper_stateful_server *srv;
500 while (link) {
501 srv = link->data;
502 link = link->next;
503 if (!srv->flags.alive) {
504 debug(34, 3) ("helperStatefulShutdown: %s #%d is NOT ALIVE.\n",
505 hlp->id_name, srv->index + 1);
506 continue;
507 }
508 srv->flags.shutdown = 1; /* request it to shut itself down */
509 if (srv->flags.busy) {
510 debug(34, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
511 hlp->id_name, srv->index + 1);
512 continue;
513 }
514 if (srv->flags.closing) {
515 debug(34, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
516 hlp->id_name, srv->index + 1);
517 continue;
518 }
519 if (srv->flags.reserved != S_HELPER_FREE) {
520 debug(34, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
521 hlp->id_name, srv->index + 1);
522 continue;
523 }
524 if (srv->deferred_requests) {
525 debug(34, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
526 hlp->id_name, srv->index + 1);
527 continue;
528 }
529 srv->flags.closing = 1;
530 comm_close(srv->wfd);
531 srv->wfd = -1;
532 }
533}
534
535
1f5f60dd 536helper *
537helperCreate(const char *name)
538{
28c60158 539 helper *hlp;
540 hlp = CBDATA_ALLOC(helper, NULL);
1f5f60dd 541 hlp->id_name = name;
542 return hlp;
543}
544
94439e4e 545statefulhelper *
546helperStatefulCreate(const char *name)
547{
548 statefulhelper *hlp;
549 hlp = CBDATA_ALLOC(statefulhelper, NULL);
550 hlp->id_name = name;
551 return hlp;
552}
553
554
1f5f60dd 555void
556helperFree(helper * hlp)
557{
5dae8514 558 if (!hlp)
559 return;
1f5f60dd 560 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 561 if (hlp->queue.head)
562 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
563 hlp->id_name, hlp->stats.queue_size);
1f5f60dd 564 cbdataFree(hlp);
565}
566
94439e4e 567void
568helperStatefulFree(statefulhelper * hlp)
569{
5dae8514 570 if (!hlp)
571 return;
94439e4e 572 /* note, don't free hlp->name, it probably points to static memory */
573 if (hlp->queue.head)
574 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
575 hlp->id_name, hlp->stats.queue_size);
576 cbdataFree(hlp);
577}
578
579
74addf6c 580/* ====================================================================== */
581/* LOCAL FUNCTIONS */
582/* ====================================================================== */
583
584static void
1f5f60dd 585helperServerFree(int fd, void *data)
74addf6c 586{
587 helper_server *srv = data;
588 helper *hlp = srv->parent;
ac750329 589 helper_request *r;
74addf6c 590 assert(srv->rfd == fd);
591 if (srv->buf) {
db1cd23c 592 memFree(srv->buf, MEM_8K_BUF);
74addf6c 593 srv->buf = NULL;
594 }
ac750329 595 if ((r = srv->request)) {
596 if (cbdataValid(r->data))
597 r->callback(r->data, srv->buf);
598 helperRequestFree(r);
63758217 599 srv->request = NULL;
ac750329 600 }
3cdb7cd0 601 if (srv->wfd != srv->rfd && srv->wfd != -1)
74addf6c 602 comm_close(srv->wfd);
603 dlinkDelete(&srv->link, &hlp->servers);
74addf6c 604 hlp->n_running--;
605 assert(hlp->n_running >= 0);
1f5f60dd 606 if (!srv->flags.shutdown) {
607 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
14e87a44 608 hlp->id_name, srv->index + 1, fd);
1f5f60dd 609 if (hlp->n_running < hlp->n_to_start / 2)
14e87a44 610 fatalf("Too few %s processes are running", hlp->id_name);
611 }
1f5f60dd 612 cbdataUnlock(srv->parent);
14e87a44 613 cbdataFree(srv);
74addf6c 614}
615
94439e4e 616static void
617helperStatefulServerFree(int fd, void *data)
618{
619 helper_stateful_server *srv = data;
620 statefulhelper *hlp = srv->parent;
621 helper_stateful_request *r;
622 assert(srv->rfd == fd);
623 if (srv->buf) {
624 memFree(srv->buf, MEM_8K_BUF);
625 srv->buf = NULL;
626 }
627 if ((r = srv->request)) {
628 if (cbdataValid(r->data))
629 r->callback(r->data, srv, srv->buf);
630 helperStatefulRequestFree(r);
631 srv->request = NULL;
632 }
633 if (srv->wfd != srv->rfd && srv->wfd != -1)
634 comm_close(srv->wfd);
635 dlinkDelete(&srv->link, &hlp->servers);
636 hlp->n_running--;
637 assert(hlp->n_running >= 0);
638 if (!srv->flags.shutdown) {
639 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
640 hlp->id_name, srv->index + 1, fd);
641 if (hlp->n_running < hlp->n_to_start / 2)
642 fatalf("Too few %s processes are running", hlp->id_name);
643 }
644 if (srv->data != NULL)
645 memPoolFree(hlp->datapool, srv->data);
646 cbdataUnlock(srv->parent);
647 cbdataFree(srv);
648}
649
650
74addf6c 651static void
652helperHandleRead(int fd, void *data)
653{
654 int len;
655 char *t = NULL;
656 helper_server *srv = data;
657 helper_request *r;
658 helper *hlp = srv->parent;
659 assert(fd == srv->rfd);
660 assert(cbdataValid(data));
83704487 661 statCounter.syscalls.sock.reads++;
74addf6c 662 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
663 fd_bytes(fd, len, FD_READ);
664 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
665 len, hlp->id_name, srv->index + 1);
666 if (len <= 0) {
667 if (len < 0)
668 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
669 comm_close(fd);
670 return;
671 }
672 srv->offset += len;
673 srv->buf[srv->offset] = '\0';
674 r = srv->request;
675 if (r == NULL) {
676 /* someone spoke without being spoken to */
677 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
678 hlp->id_name, srv->index + 1, len);
679 srv->offset = 0;
680 } else if ((t = strchr(srv->buf, '\n'))) {
681 /* end of reply found */
682 debug(29, 3) ("helperHandleRead: end of reply found\n");
683 *t = '\0';
684 if (cbdataValid(r->data))
685 r->callback(r->data, srv->buf);
686 srv->flags.busy = 0;
687 srv->offset = 0;
688 helperRequestFree(r);
63758217 689 srv->request = NULL;
74addf6c 690 hlp->stats.replies++;
691 hlp->stats.avg_svc_time =
692 intAverage(hlp->stats.avg_svc_time,
693 tvSubMsec(srv->dispatch_time, current_time),
694 hlp->stats.replies, REDIRECT_AV_FACTOR);
3cdb7cd0 695 if (srv->flags.shutdown) {
74addf6c 696 comm_close(srv->wfd);
3cdb7cd0 697 srv->wfd = -1;
698 } else
c68e9c6b 699 helperKickQueue(hlp);
74addf6c 700 } else {
701 commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
702 }
74addf6c 703}
704
94439e4e 705static void
706helperStatefulHandleRead(int fd, void *data)
707{
708 int len;
709 char *t = NULL;
710 helper_stateful_server *srv = data;
711 helper_stateful_request *r;
712 statefulhelper *hlp = srv->parent;
713 assert(fd == srv->rfd);
714 assert(cbdataValid(data));
715 statCounter.syscalls.sock.reads++;
716 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
717 fd_bytes(fd, len, FD_READ);
718 debug(29, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
719 len, hlp->id_name, srv->index + 1);
720 if (len <= 0) {
721 if (len < 0)
722 debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
723 comm_close(fd);
724 return;
725 }
726 srv->offset += len;
727 srv->buf[srv->offset] = '\0';
728 r = srv->request;
729 if (r == NULL) {
730 /* someone spoke without being spoken to */
731 debug(29, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
732 hlp->id_name, srv->index + 1, len);
733 srv->offset = 0;
734 } else if ((t = strchr(srv->buf, '\n'))) {
735 /* end of reply found */
736 debug(29, 3) ("helperStatefulHandleRead: end of reply found\n");
737 *t = '\0';
738 if (cbdataValid(r->data)) {
739 switch ((r->callback(r->data, srv, srv->buf))) { /*if non-zero reserve helper */
740 case S_HELPER_UNKNOWN:
741 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
742 break;
743 case S_HELPER_RELEASE: /* helper finished with */
744 if (!srv->queue.head) {
745 srv->flags.reserved = S_HELPER_FREE;
746 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
747 srv->parent->OnEmptyQueue(srv->data);
748 debug(29, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1);
749 } else {
750 srv->flags.reserved = S_HELPER_DEFERRED;
751 debug(29, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1);
752 }
753 break;
754 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
755 if (!srv->queue.head) {
756 srv->flags.reserved = S_HELPER_RESERVED;
757 debug(29, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1);
758 } else {
759 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
760 }
761 break;
762 case S_HELPER_DEFER:
763 /* the helper is still needed, but can
764 * be used for other requests in the meantime.
765 */
766 srv->flags.reserved = S_HELPER_DEFERRED;
767 srv->deferred_requests++;
768 debug(29, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1);
769 break;
770 default:
771 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
772 }
773
774 } else {
775 debug(29, 1) ("StatefulHandleRead: no callback data registered\n");
776 }
777 srv->flags.busy = 0;
778 srv->offset = 0;
779 helperStatefulRequestFree(r);
780 srv->request = NULL;
781 hlp->stats.replies++;
782 hlp->stats.avg_svc_time =
783 intAverage(hlp->stats.avg_svc_time,
784 tvSubMsec(srv->dispatch_time, current_time),
785 hlp->stats.replies, REDIRECT_AV_FACTOR);
786 if (srv->flags.shutdown) {
787 comm_close(srv->wfd);
788 srv->wfd = -1;
789 } else {
790 if (srv->queue.head)
791 helperStatefulServerKickQueue(srv);
792 else
793 helperStatefulKickQueue(hlp);
794 }
795 } else {
796 commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0);
797 }
798}
799
74addf6c 800static void
801Enqueue(helper * hlp, helper_request * r)
802{
c68e9c6b 803 dlink_node *link = memAllocate(MEM_DLINK_NODE);
74addf6c 804 dlinkAddTail(r, link, &hlp->queue);
805 hlp->stats.queue_size++;
806 if (hlp->stats.queue_size < hlp->n_running)
807 return;
808 if (squid_curtime - hlp->last_queue_warn < 600)
809 return;
fe73896c 810 if (shutting_down || reconfiguring)
811 return;
74addf6c 812 hlp->last_queue_warn = squid_curtime;
813 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
814 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
815 if (hlp->stats.queue_size > hlp->n_running * 2)
816 fatalf("Too many queued %s requests", hlp->id_name);
817 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
818}
819
94439e4e 820static void
821StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
822{
823 dlink_node *link = memAllocate(MEM_DLINK_NODE);
824 dlinkAddTail(r, link, &hlp->queue);
825 hlp->stats.queue_size++;
826 if (hlp->stats.queue_size < hlp->n_running)
827 return;
828 if (squid_curtime - hlp->last_queue_warn < 600)
829 return;
830 if (shutting_down || reconfiguring)
831 return;
832 hlp->last_queue_warn = squid_curtime;
833 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
834 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
835 if (hlp->stats.queue_size > hlp->n_running * 2)
836 fatalf("Too many queued %s requests", hlp->id_name);
837 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
838}
839
840static void
841StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
842{
843 dlink_node *link = memAllocate(MEM_DLINK_NODE);
844 dlinkAddTail(r, link, &srv->queue);
845 /* XXX No queue length check here? */
846}
847
848
74addf6c 849static helper_request *
850Dequeue(helper * hlp)
851{
852 dlink_node *link;
853 helper_request *r = NULL;
854 if ((link = hlp->queue.head)) {
855 r = link->data;
856 dlinkDelete(link, &hlp->queue);
db1cd23c 857 memFree(link, MEM_DLINK_NODE);
74addf6c 858 hlp->stats.queue_size--;
859 }
860 return r;
861}
862
94439e4e 863static helper_stateful_request *
864StatefulServerDequeue(helper_stateful_server * srv)
865{
866 dlink_node *link;
867 helper_stateful_request *r = NULL;
868 if ((link = srv->queue.head)) {
869 r = link->data;
870 dlinkDelete(link, &srv->queue);
871 memFree(link, MEM_DLINK_NODE);
872 }
873 return r;
874}
875
876static helper_stateful_request *
877StatefulDequeue(statefulhelper * hlp)
878{
879 dlink_node *link;
880 helper_stateful_request *r = NULL;
881 if ((link = hlp->queue.head)) {
882 r = link->data;
883 dlinkDelete(link, &hlp->queue);
884 memFree(link, MEM_DLINK_NODE);
885 hlp->stats.queue_size--;
886 }
887 return r;
888}
889
74addf6c 890static helper_server *
891GetFirstAvailable(helper * hlp)
892{
893 dlink_node *n;
894 helper_server *srv = NULL;
fe73896c 895 if (hlp->n_running == 0)
896 return NULL;
74addf6c 897 for (n = hlp->servers.head; n != NULL; n = n->next) {
898 srv = n->data;
899 if (srv->flags.busy)
900 continue;
901 if (!srv->flags.alive)
902 continue;
903 return srv;
904 }
905 return NULL;
906}
907
94439e4e 908static helper_stateful_server *
909StatefulGetFirstAvailable(statefulhelper * hlp)
910{
911 dlink_node *n;
912 helper_stateful_server *srv = NULL;
913 debug(29, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running);
914 if (hlp->n_running == 0)
915 return NULL;
916 for (n = hlp->servers.head; n != NULL; n = n->next) {
917 srv = n->data;
918 if (srv->flags.busy)
919 continue;
920 if (srv->flags.reserved == S_HELPER_RESERVED)
921 continue;
922 if (!srv->flags.alive)
923 continue;
924 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
925 continue;
926 return srv;
927 }
928 debug(29, 5) ("StatefulGetFirstAvailable: None available.\n");
929 return NULL;
930}
931
932
74addf6c 933static void
934helperDispatch(helper_server * srv, helper_request * r)
935{
936 helper *hlp = srv->parent;
937 if (!cbdataValid(r->data)) {
938 debug(29, 1) ("helperDispatch: invalid callback data\n");
939 helperRequestFree(r);
940 return;
941 }
942 assert(!srv->flags.busy);
943 srv->flags.busy = 1;
944 srv->request = r;
945 srv->dispatch_time = current_time;
946 comm_write(srv->wfd,
947 r->buf,
948 strlen(r->buf),
949 NULL, /* Handler */
950 NULL, /* Handler-data */
951 NULL); /* free */
952 commSetSelect(srv->rfd,
953 COMM_SELECT_READ,
954 helperHandleRead,
955 srv, 0);
956 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
957 hlp->id_name, srv->index + 1, strlen(r->buf));
958 srv->stats.uses++;
959 hlp->stats.requests++;
960}
961
94439e4e 962static void
963helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
964{
965 statefulhelper *hlp = srv->parent;
966 if (!cbdataValid(r->data)) {
967 debug(29, 1) ("helperStatefulDispatch: invalid callback data\n");
968 helperStatefulRequestFree(r);
969 return;
970 }
971 debug(29, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1);
972 if (r->placeholder == 1) {
973 /* a callback is needed before this request can _use_ a helper. */
974 if (cbdataValid(r->data)) {
975 /* we don't care about releasing/deferring this helper. The request NEVER
976 * gets to the helper. So we throw away the return code */
977 r->callback(r->data, srv, NULL);
978 /* throw away the placeholder */
979 helperStatefulRequestFree(r);
980 /* and push the queue. Note that the callback may have call submit again -
981 * which is why we test for the request*/
982 if (srv->request == NULL) {
983 if (srv->flags.shutdown) {
984 comm_close(srv->wfd);
985 srv->wfd = -1;
986 } else {
987 if (srv->queue.head)
988 helperStatefulServerKickQueue(srv);
989 else
990 helperStatefulKickQueue(hlp);
991 }
992 }
993 }
994 return;
995 }
996 srv->flags.busy = 1;
997 srv->request = r;
998 srv->dispatch_time = current_time;
999 comm_write(srv->wfd,
1000 r->buf,
1001 strlen(r->buf),
1002 NULL, /* Handler */
1003 NULL, /* Handler-data */
1004 NULL); /* free */
1005 commSetSelect(srv->rfd,
1006 COMM_SELECT_READ,
1007 helperStatefulHandleRead,
1008 srv, 0);
1009 debug(29, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
1010 hlp->id_name, srv->index + 1, strlen(r->buf));
1011 srv->stats.uses++;
1012 hlp->stats.requests++;
1013}
1014
1015
74addf6c 1016static void
1017helperKickQueue(helper * hlp)
1018{
1019 helper_request *r;
1020 helper_server *srv;
1021 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1022 helperDispatch(srv, r);
1023}
1024
94439e4e 1025static void
1026helperStatefulKickQueue(statefulhelper * hlp)
1027{
1028 helper_stateful_request *r;
1029 helper_stateful_server *srv;
1030 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1031 helperStatefulDispatch(srv, r);
1032}
1033
1034static void
1035helperStatefulServerKickQueue(helper_stateful_server * srv)
1036{
1037 helper_stateful_request *r;
1038 if ((r = StatefulServerDequeue(srv)))
1039 helperStatefulDispatch(srv, r);
1040}
1041
74addf6c 1042static void
1043helperRequestFree(helper_request * r)
1044{
1045 cbdataUnlock(r->data);
1046 xfree(r->buf);
db1cd23c 1047 memFree(r, MEM_HELPER_REQUEST);
74addf6c 1048}
94439e4e 1049
1050static void
1051helperStatefulRequestFree(helper_stateful_request * r)
1052{
1053 cbdataUnlock(r->data);
1054 xfree(r->buf);
1055 memFree(r, MEM_HELPER_STATEFUL_REQUEST);
1056}