]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Minor arguments mismatch in file_write declaration found by Andrey Shorin.
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
721b0310 3 * $Id: helper.cc,v 1.28 2001/05/21 04:50:57 hno Exp $
f740a279 4 *
5 * DEBUG: section 29 Helper process maintenance
6 * AUTHOR: Harvest Derived?
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 9 * ----------------------------------------------------------
10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
f740a279 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
37
38#define HELPER_MAX_ARGS 64
39
40static PF helperHandleRead;
94439e4e 41static PF helperStatefulHandleRead;
1f5f60dd 42static PF helperServerFree;
94439e4e 43static PF helperStatefulServerFree;
74addf6c 44static void Enqueue(helper * hlp, helper_request *);
45static helper_request *Dequeue(helper * hlp);
94439e4e 46static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 47static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 48static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 49static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 50static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 51static void helperKickQueue(helper * hlp);
94439e4e 52static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 53static void helperRequestFree(helper_request * r);
94439e4e 54static void helperStatefulRequestFree(helper_stateful_request * r);
55static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
56static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
57static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
58static void helperStatefulServerKickQueue(helper_stateful_server * srv);
74addf6c 59
60void
61helperOpenServers(helper * hlp)
62{
63 char *s;
64 char *progname;
65 char *shortname;
66 char *procname;
67 char *args[HELPER_MAX_ARGS];
68 char fd_note_buf[FD_DESC_SZ];
69 helper_server *srv;
70 int nargs = 0;
71 int k;
72 int x;
73 int rfd;
74 int wfd;
75 wordlist *w;
76 if (hlp->cmdline == NULL)
77 return;
78 progname = hlp->cmdline->key;
74addf6c 79 if ((s = strrchr(progname, '/')))
80 shortname = xstrdup(s + 1);
81 else
82 shortname = xstrdup(progname);
83 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
84 hlp->n_to_start, shortname);
85 procname = xmalloc(strlen(shortname) + 3);
86 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
87 args[nargs++] = procname;
88 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
89 args[nargs++] = w->key;
90 args[nargs++] = NULL;
91 assert(nargs <= HELPER_MAX_ARGS);
92 for (k = 0; k < hlp->n_to_start; k++) {
c68e9c6b 93 getCurrentTime();
74addf6c 94 rfd = wfd = -1;
95 x = ipcCreate(hlp->ipc_type,
96 progname,
97 args,
98 shortname,
99 &rfd,
100 &wfd);
101 if (x < 0) {
102 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
103 continue;
104 }
105 hlp->n_running++;
72711e31 106 srv = cbdataAlloc(helper_server);
74addf6c 107 srv->flags.alive = 1;
108 srv->index = k;
109 srv->rfd = rfd;
110 srv->wfd = wfd;
111 srv->buf = memAllocate(MEM_8K_BUF);
112 srv->buf_sz = 8192;
113 srv->offset = 0;
114 srv->parent = hlp;
1f5f60dd 115 cbdataLock(hlp); /* lock because of the parent backlink */
74addf6c 116 dlinkAddTail(srv, &srv->link, &hlp->servers);
117 if (rfd == wfd) {
118 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
119 fd_note(rfd, fd_note_buf);
120 } else {
121 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
122 fd_note(rfd, fd_note_buf);
123 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
124 fd_note(wfd, fd_note_buf);
125 }
126 commSetNonBlocking(rfd);
127 if (wfd != rfd)
128 commSetNonBlocking(wfd);
1f5f60dd 129 comm_add_close_handler(rfd, helperServerFree, srv);
74addf6c 130 }
131 safe_free(shortname);
132 safe_free(procname);
838b993c 133 helperKickQueue(hlp);
74addf6c 134}
135
94439e4e 136void
137helperStatefulOpenServers(statefulhelper * hlp)
138{
139 char *s;
140 char *progname;
141 char *shortname;
142 char *procname;
143 char *args[HELPER_MAX_ARGS];
144 char fd_note_buf[FD_DESC_SZ];
145 helper_stateful_server *srv;
146 int nargs = 0;
147 int k;
148 int x;
149 int rfd;
150 int wfd;
151 wordlist *w;
152 if (hlp->cmdline == NULL)
153 return;
154 progname = hlp->cmdline->key;
155 if ((s = strrchr(progname, '/')))
156 shortname = xstrdup(s + 1);
157 else
158 shortname = xstrdup(progname);
159 debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
160 hlp->n_to_start, shortname);
161 procname = xmalloc(strlen(shortname) + 3);
162 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
163 args[nargs++] = procname;
164 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
165 args[nargs++] = w->key;
166 args[nargs++] = NULL;
167 assert(nargs <= HELPER_MAX_ARGS);
168 for (k = 0; k < hlp->n_to_start; k++) {
169 getCurrentTime();
170 rfd = wfd = -1;
171 x = ipcCreate(hlp->ipc_type,
172 progname,
173 args,
174 shortname,
175 &rfd,
176 &wfd);
177 if (x < 0) {
178 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
179 continue;
180 }
181 hlp->n_running++;
72711e31 182 srv = cbdataAlloc(helper_stateful_server);
94439e4e 183 srv->flags.alive = 1;
184 srv->flags.reserved = S_HELPER_FREE;
185 srv->deferred_requests = 0;
186 srv->index = k;
187 srv->rfd = rfd;
188 srv->wfd = wfd;
189 srv->buf = memAllocate(MEM_8K_BUF);
190 srv->buf_sz = 8192;
191 srv->offset = 0;
192 srv->parent = hlp;
193 if (hlp->datapool != NULL)
194 srv->data = memPoolAlloc(hlp->datapool);
195 cbdataLock(hlp); /* lock because of the parent backlink */
196 dlinkAddTail(srv, &srv->link, &hlp->servers);
197 if (rfd == wfd) {
198 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
199 fd_note(rfd, fd_note_buf);
200 } else {
201 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
202 fd_note(rfd, fd_note_buf);
203 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
204 fd_note(wfd, fd_note_buf);
205 }
206 commSetNonBlocking(rfd);
207 if (wfd != rfd)
208 commSetNonBlocking(wfd);
209 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
210 }
211 safe_free(shortname);
212 safe_free(procname);
213 helperStatefulKickQueue(hlp);
214}
215
216
74addf6c 217void
218helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
219{
c68e9c6b 220 helper_request *r = memAllocate(MEM_HELPER_REQUEST);
74addf6c 221 helper_server *srv;
5b5f9257 222 if (hlp == NULL) {
7d47d8e6 223 debug(29, 3) ("helperSubmit: hlp == NULL\n");
5b5f9257 224 callback(data, NULL);
225 return;
226 }
74addf6c 227 r->callback = callback;
228 r->data = data;
229 r->buf = xstrdup(buf);
230 cbdataLock(r->data);
231 if ((srv = GetFirstAvailable(hlp)))
232 helperDispatch(srv, r);
233 else
234 Enqueue(hlp, r);
94439e4e 235 debug(29, 9) ("helperSubmit: %s\n", buf);
236}
237
721b0310 238/* lastserver = "server last used as part of a deferred or reserved
239 * request sequence"
240 */
94439e4e 241void
242helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
243{
244 helper_stateful_request *r = memAllocate(MEM_HELPER_STATEFUL_REQUEST);
245 helper_stateful_server *srv;
246 if (hlp == NULL) {
247 debug(29, 3) ("helperStatefulSubmit: hlp == NULL\n");
248 callback(data, 0, NULL);
249 return;
250 }
251 r->callback = callback;
252 r->data = data;
721b0310 253 if (buf != NULL) {
94439e4e 254 r->buf = xstrdup(buf);
721b0310 255 r->placeholder = 0;
256 } else {
257 r->buf = NULL;
94439e4e 258 r->placeholder = 1;
721b0310 259 }
94439e4e 260 cbdataLock(r->data);
261 if ((buf != NULL) && lastserver) {
262 debug(29, 5) ("StatefulSubmit with lastserver %d\n", lastserver);
263 if (lastserver->flags.reserved != S_HELPER_RESERVED)
264 lastserver->deferred_requests--;
265 if (!(lastserver->request)) {
266 debug(29, 5) ("StatefulSubmit dispatching\n");
267 helperStatefulDispatch(lastserver, r);
268 } else {
269 debug(29, 5) ("StatefulSubmit queuing\n");
270 StatefulServerEnqueue(lastserver, r);
271 }
272 } else {
273 if ((srv = StatefulGetFirstAvailable(hlp))) {
274 helperStatefulDispatch(srv, r);
275 } else
276 StatefulEnqueue(hlp, r);
277 }
278 debug(29, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r->placeholder, buf);
279}
280
281helper_stateful_server *
282helperStatefulDefer(statefulhelper * hlp)
283/* find and add a deferred request to a server */
284{
285 dlink_node *n;
286 helper_stateful_server *srv = NULL, *rv = NULL;
287 if (hlp == NULL) {
288 debug(29, 3) ("helperStatefulReserve: hlp == NULL\n");
289 return NULL;
290 }
291 debug(29, 5) ("helperStatefulDefer: Running servers %d.\n", hlp->n_running);
292 if (hlp->n_running == 0) {
293 debug(29, 1) ("helperStatefulDefer: No running servers!. \n");
294 return NULL;
295 }
296 srv = StatefulGetFirstAvailable(hlp);
297 /* all currently busy:loop through servers and find server with the shortest queue */
298 rv = srv;
299 if (rv == NULL)
300 for (n = hlp->servers.head; n != NULL; n = n->next) {
301 srv = n->data;
302 if (srv->flags.reserved == S_HELPER_RESERVED)
303 continue;
304 if (!srv->flags.alive)
305 continue;
306 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
307 !(hlp->IsAvailable(srv->data)))
308 continue;
309 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
310 continue;
311 rv = srv;
312 }
313 if (rv == NULL) {
314 debug(29, 1) ("helperStatefulDefer: None available.\n");
315 return NULL;
316 }
317 rv->flags.reserved = S_HELPER_DEFERRED;
318 rv->deferred_requests++;
319 return rv;
320}
321
322void
323helperStatefulReset(helper_stateful_server * srv)
324/* puts this helper back in the queue. the calling app is required to
325 * manage the state in the helper.
326 */
327{
328 statefulhelper *hlp = srv->parent;
329 helper_stateful_request *r;
330 r = srv->request;
331 if (r != NULL) {
332 /* reset attempt DURING an outstaning request */
333 debug(29, 1) ("helperStatefulReset: RESET During request %s \n",
334 hlp->id_name);
335 srv->flags.busy = 0;
336 srv->offset = 0;
337 helperStatefulRequestFree(r);
338 srv->request = NULL;
339 }
340 debug(29, 1) ("helperStatefulReset reset helper %s #%d\n", hlp->id_name, srv->index + 1);
341 srv->flags.busy = 0;
342 if (srv->queue.head) {
343 srv->flags.reserved = S_HELPER_DEFERRED;
344 helperStatefulServerKickQueue(srv);
345 } else {
346 srv->flags.reserved = S_HELPER_FREE;
347 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
348 srv->parent->OnEmptyQueue(srv->data);
349 helperStatefulKickQueue(hlp);
350 }
351}
352
353void
354helperStatefulReleaseServer(helper_stateful_server * srv)
355/*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
356{
357 if (srv->deferred_requests > 0)
358 srv->deferred_requests--;
359 if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head)) {
360 srv->flags.reserved = S_HELPER_FREE;
361 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
362 srv->parent->OnEmptyQueue(srv->data);
363 }
364}
365
366void *
367helperStatefulServerGetData(helper_stateful_server * srv)
368/* return a pointer to the stateful routines data area */
369{
370 return srv->data;
74addf6c 371}
372
373void
374helperStats(StoreEntry * sentry, helper * hlp)
375{
376 helper_server *srv;
377 dlink_node *link;
f4ae18d0 378 double tt;
74addf6c 379 storeAppendPrintf(sentry, "number running: %d of %d\n",
380 hlp->n_running, hlp->n_to_start);
381 storeAppendPrintf(sentry, "requests sent: %d\n",
382 hlp->stats.requests);
383 storeAppendPrintf(sentry, "replies received: %d\n",
384 hlp->stats.replies);
385 storeAppendPrintf(sentry, "queue length: %d\n",
386 hlp->stats.queue_size);
387 storeAppendPrintf(sentry, "avg service time: %d msec\n",
388 hlp->stats.avg_svc_time);
389 storeAppendPrintf(sentry, "\n");
592da4ec 390 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
74addf6c 391 "#",
392 "FD",
393 "# Requests",
394 "Flags",
395 "Time",
592da4ec 396 "Offset",
397 "Request");
74addf6c 398 for (link = hlp->servers.head; link; link = link->next) {
399 srv = link->data;
f4ae18d0 400 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
592da4ec 401 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
74addf6c 402 srv->index + 1,
403 srv->rfd,
404 srv->stats.uses,
405 srv->flags.alive ? 'A' : ' ',
406 srv->flags.busy ? 'B' : ' ',
407 srv->flags.closing ? 'C' : ' ',
408 srv->flags.shutdown ? 'S' : ' ',
f4ae18d0 409 tt < 0.0 ? 0.0 : tt,
592da4ec 410 (int) srv->offset,
411 srv->request ? log_quote(srv->request->buf) : "(none)");
74addf6c 412 }
413 storeAppendPrintf(sentry, "\nFlags key:\n\n");
414 storeAppendPrintf(sentry, " A = ALIVE\n");
415 storeAppendPrintf(sentry, " B = BUSY\n");
416 storeAppendPrintf(sentry, " C = CLOSING\n");
417 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
418}
419
94439e4e 420void
421helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
422{
423 helper_stateful_server *srv;
424 dlink_node *link;
425 double tt;
426 storeAppendPrintf(sentry, "number running: %d of %d\n",
427 hlp->n_running, hlp->n_to_start);
428 storeAppendPrintf(sentry, "requests sent: %d\n",
429 hlp->stats.requests);
430 storeAppendPrintf(sentry, "replies received: %d\n",
431 hlp->stats.replies);
432 storeAppendPrintf(sentry, "queue length: %d\n",
433 hlp->stats.queue_size);
434 storeAppendPrintf(sentry, "avg service time: %d msec\n",
435 hlp->stats.avg_svc_time);
436 storeAppendPrintf(sentry, "\n");
437 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
438 "#",
439 "FD",
440 "# Requests",
441 "# Deferred Requests",
442 "Flags",
443 "Time",
444 "Offset",
445 "Request");
446 for (link = hlp->servers.head; link; link = link->next) {
447 srv = link->data;
448 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
449 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
450 srv->index + 1,
451 srv->rfd,
452 srv->stats.uses,
453 srv->deferred_requests,
454 srv->flags.alive ? 'A' : ' ',
455 srv->flags.busy ? 'B' : ' ',
456 srv->flags.closing ? 'C' : ' ',
457 srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
458 srv->flags.shutdown ? 'S' : ' ',
459 tt < 0.0 ? 0.0 : tt,
460 (int) srv->offset,
461 srv->request ? log_quote(srv->request->buf) : "(none)");
462 }
463 storeAppendPrintf(sentry, "\nFlags key:\n\n");
464 storeAppendPrintf(sentry, " A = ALIVE\n");
465 storeAppendPrintf(sentry, " B = BUSY\n");
466 storeAppendPrintf(sentry, " C = CLOSING\n");
467 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
468 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
469}
470
74addf6c 471void
472helperShutdown(helper * hlp)
473{
c68e9c6b 474 dlink_node *link = hlp->servers.head;
74addf6c 475 helper_server *srv;
c68e9c6b 476 while (link) {
74addf6c 477 srv = link->data;
c68e9c6b 478 link = link->next;
74addf6c 479 if (!srv->flags.alive) {
480 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
481 hlp->id_name, srv->index + 1);
482 continue;
483 }
1f5f60dd 484 srv->flags.shutdown = 1; /* request it to shut itself down */
74addf6c 485 if (srv->flags.busy) {
486 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
487 hlp->id_name, srv->index + 1);
74addf6c 488 continue;
489 }
490 if (srv->flags.closing) {
491 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
492 hlp->id_name, srv->index + 1);
493 continue;
494 }
74addf6c 495 srv->flags.closing = 1;
3cdb7cd0 496 comm_close(srv->wfd);
497 srv->wfd = -1;
74addf6c 498 }
499}
500
94439e4e 501void
502helperStatefulShutdown(statefulhelper * hlp)
503{
504 dlink_node *link = hlp->servers.head;
505 helper_stateful_server *srv;
506 while (link) {
507 srv = link->data;
508 link = link->next;
509 if (!srv->flags.alive) {
510 debug(34, 3) ("helperStatefulShutdown: %s #%d is NOT ALIVE.\n",
511 hlp->id_name, srv->index + 1);
512 continue;
513 }
514 srv->flags.shutdown = 1; /* request it to shut itself down */
515 if (srv->flags.busy) {
516 debug(34, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
517 hlp->id_name, srv->index + 1);
518 continue;
519 }
520 if (srv->flags.closing) {
521 debug(34, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
522 hlp->id_name, srv->index + 1);
523 continue;
524 }
525 if (srv->flags.reserved != S_HELPER_FREE) {
526 debug(34, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
527 hlp->id_name, srv->index + 1);
528 continue;
529 }
530 if (srv->deferred_requests) {
531 debug(34, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
532 hlp->id_name, srv->index + 1);
533 continue;
534 }
535 srv->flags.closing = 1;
536 comm_close(srv->wfd);
537 srv->wfd = -1;
538 }
539}
540
541
1f5f60dd 542helper *
543helperCreate(const char *name)
544{
28c60158 545 helper *hlp;
72711e31 546 hlp = cbdataAlloc(helper);
1f5f60dd 547 hlp->id_name = name;
548 return hlp;
549}
550
94439e4e 551statefulhelper *
552helperStatefulCreate(const char *name)
553{
554 statefulhelper *hlp;
72711e31 555 hlp = cbdataAlloc(statefulhelper);
94439e4e 556 hlp->id_name = name;
557 return hlp;
558}
559
560
1f5f60dd 561void
562helperFree(helper * hlp)
563{
5dae8514 564 if (!hlp)
565 return;
1f5f60dd 566 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 567 if (hlp->queue.head)
568 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
569 hlp->id_name, hlp->stats.queue_size);
1f5f60dd 570 cbdataFree(hlp);
571}
572
94439e4e 573void
574helperStatefulFree(statefulhelper * hlp)
575{
5dae8514 576 if (!hlp)
577 return;
94439e4e 578 /* note, don't free hlp->name, it probably points to static memory */
579 if (hlp->queue.head)
580 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
581 hlp->id_name, hlp->stats.queue_size);
582 cbdataFree(hlp);
583}
584
585
74addf6c 586/* ====================================================================== */
587/* LOCAL FUNCTIONS */
588/* ====================================================================== */
589
590static void
1f5f60dd 591helperServerFree(int fd, void *data)
74addf6c 592{
593 helper_server *srv = data;
594 helper *hlp = srv->parent;
ac750329 595 helper_request *r;
74addf6c 596 assert(srv->rfd == fd);
597 if (srv->buf) {
db1cd23c 598 memFree(srv->buf, MEM_8K_BUF);
74addf6c 599 srv->buf = NULL;
600 }
ac750329 601 if ((r = srv->request)) {
602 if (cbdataValid(r->data))
603 r->callback(r->data, srv->buf);
604 helperRequestFree(r);
63758217 605 srv->request = NULL;
ac750329 606 }
3cdb7cd0 607 if (srv->wfd != srv->rfd && srv->wfd != -1)
74addf6c 608 comm_close(srv->wfd);
609 dlinkDelete(&srv->link, &hlp->servers);
74addf6c 610 hlp->n_running--;
611 assert(hlp->n_running >= 0);
1f5f60dd 612 if (!srv->flags.shutdown) {
613 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
14e87a44 614 hlp->id_name, srv->index + 1, fd);
1f5f60dd 615 if (hlp->n_running < hlp->n_to_start / 2)
14e87a44 616 fatalf("Too few %s processes are running", hlp->id_name);
617 }
1f5f60dd 618 cbdataUnlock(srv->parent);
14e87a44 619 cbdataFree(srv);
74addf6c 620}
621
94439e4e 622static void
623helperStatefulServerFree(int fd, void *data)
624{
625 helper_stateful_server *srv = data;
626 statefulhelper *hlp = srv->parent;
627 helper_stateful_request *r;
628 assert(srv->rfd == fd);
629 if (srv->buf) {
630 memFree(srv->buf, MEM_8K_BUF);
631 srv->buf = NULL;
632 }
633 if ((r = srv->request)) {
634 if (cbdataValid(r->data))
635 r->callback(r->data, srv, srv->buf);
636 helperStatefulRequestFree(r);
637 srv->request = NULL;
638 }
639 if (srv->wfd != srv->rfd && srv->wfd != -1)
640 comm_close(srv->wfd);
641 dlinkDelete(&srv->link, &hlp->servers);
642 hlp->n_running--;
643 assert(hlp->n_running >= 0);
644 if (!srv->flags.shutdown) {
645 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
646 hlp->id_name, srv->index + 1, fd);
647 if (hlp->n_running < hlp->n_to_start / 2)
648 fatalf("Too few %s processes are running", hlp->id_name);
649 }
650 if (srv->data != NULL)
651 memPoolFree(hlp->datapool, srv->data);
652 cbdataUnlock(srv->parent);
653 cbdataFree(srv);
654}
655
656
74addf6c 657static void
658helperHandleRead(int fd, void *data)
659{
660 int len;
661 char *t = NULL;
662 helper_server *srv = data;
663 helper_request *r;
664 helper *hlp = srv->parent;
665 assert(fd == srv->rfd);
666 assert(cbdataValid(data));
83704487 667 statCounter.syscalls.sock.reads++;
1f7c9178 668 len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
74addf6c 669 fd_bytes(fd, len, FD_READ);
670 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
671 len, hlp->id_name, srv->index + 1);
672 if (len <= 0) {
673 if (len < 0)
674 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
675 comm_close(fd);
676 return;
677 }
678 srv->offset += len;
679 srv->buf[srv->offset] = '\0';
680 r = srv->request;
681 if (r == NULL) {
682 /* someone spoke without being spoken to */
683 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
684 hlp->id_name, srv->index + 1, len);
685 srv->offset = 0;
686 } else if ((t = strchr(srv->buf, '\n'))) {
687 /* end of reply found */
688 debug(29, 3) ("helperHandleRead: end of reply found\n");
689 *t = '\0';
690 if (cbdataValid(r->data))
691 r->callback(r->data, srv->buf);
692 srv->flags.busy = 0;
693 srv->offset = 0;
694 helperRequestFree(r);
63758217 695 srv->request = NULL;
74addf6c 696 hlp->stats.replies++;
697 hlp->stats.avg_svc_time =
698 intAverage(hlp->stats.avg_svc_time,
699 tvSubMsec(srv->dispatch_time, current_time),
700 hlp->stats.replies, REDIRECT_AV_FACTOR);
3cdb7cd0 701 if (srv->flags.shutdown) {
74addf6c 702 comm_close(srv->wfd);
3cdb7cd0 703 srv->wfd = -1;
704 } else
c68e9c6b 705 helperKickQueue(hlp);
74addf6c 706 } else {
707 commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
708 }
74addf6c 709}
710
94439e4e 711static void
712helperStatefulHandleRead(int fd, void *data)
713{
714 int len;
715 char *t = NULL;
716 helper_stateful_server *srv = data;
717 helper_stateful_request *r;
718 statefulhelper *hlp = srv->parent;
719 assert(fd == srv->rfd);
720 assert(cbdataValid(data));
721 statCounter.syscalls.sock.reads++;
722 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
723 fd_bytes(fd, len, FD_READ);
724 debug(29, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
725 len, hlp->id_name, srv->index + 1);
726 if (len <= 0) {
727 if (len < 0)
728 debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
729 comm_close(fd);
730 return;
731 }
732 srv->offset += len;
733 srv->buf[srv->offset] = '\0';
734 r = srv->request;
735 if (r == NULL) {
736 /* someone spoke without being spoken to */
737 debug(29, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
738 hlp->id_name, srv->index + 1, len);
739 srv->offset = 0;
740 } else if ((t = strchr(srv->buf, '\n'))) {
741 /* end of reply found */
742 debug(29, 3) ("helperStatefulHandleRead: end of reply found\n");
743 *t = '\0';
744 if (cbdataValid(r->data)) {
745 switch ((r->callback(r->data, srv, srv->buf))) { /*if non-zero reserve helper */
746 case S_HELPER_UNKNOWN:
747 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
748 break;
749 case S_HELPER_RELEASE: /* helper finished with */
750 if (!srv->queue.head) {
751 srv->flags.reserved = S_HELPER_FREE;
752 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
753 srv->parent->OnEmptyQueue(srv->data);
754 debug(29, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1);
755 } else {
756 srv->flags.reserved = S_HELPER_DEFERRED;
757 debug(29, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1);
758 }
759 break;
760 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
761 if (!srv->queue.head) {
762 srv->flags.reserved = S_HELPER_RESERVED;
763 debug(29, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1);
764 } else {
765 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
766 }
767 break;
768 case S_HELPER_DEFER:
769 /* the helper is still needed, but can
770 * be used for other requests in the meantime.
771 */
772 srv->flags.reserved = S_HELPER_DEFERRED;
773 srv->deferred_requests++;
774 debug(29, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1);
775 break;
776 default:
777 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
778 }
779
780 } else {
781 debug(29, 1) ("StatefulHandleRead: no callback data registered\n");
782 }
783 srv->flags.busy = 0;
784 srv->offset = 0;
785 helperStatefulRequestFree(r);
786 srv->request = NULL;
787 hlp->stats.replies++;
788 hlp->stats.avg_svc_time =
789 intAverage(hlp->stats.avg_svc_time,
790 tvSubMsec(srv->dispatch_time, current_time),
791 hlp->stats.replies, REDIRECT_AV_FACTOR);
792 if (srv->flags.shutdown) {
793 comm_close(srv->wfd);
794 srv->wfd = -1;
795 } else {
796 if (srv->queue.head)
797 helperStatefulServerKickQueue(srv);
798 else
799 helperStatefulKickQueue(hlp);
800 }
801 } else {
802 commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0);
803 }
804}
805
74addf6c 806static void
807Enqueue(helper * hlp, helper_request * r)
808{
c68e9c6b 809 dlink_node *link = memAllocate(MEM_DLINK_NODE);
74addf6c 810 dlinkAddTail(r, link, &hlp->queue);
811 hlp->stats.queue_size++;
812 if (hlp->stats.queue_size < hlp->n_running)
813 return;
814 if (squid_curtime - hlp->last_queue_warn < 600)
815 return;
fe73896c 816 if (shutting_down || reconfiguring)
817 return;
74addf6c 818 hlp->last_queue_warn = squid_curtime;
819 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
820 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
821 if (hlp->stats.queue_size > hlp->n_running * 2)
822 fatalf("Too many queued %s requests", hlp->id_name);
823 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
824}
825
94439e4e 826static void
827StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
828{
829 dlink_node *link = memAllocate(MEM_DLINK_NODE);
830 dlinkAddTail(r, link, &hlp->queue);
831 hlp->stats.queue_size++;
832 if (hlp->stats.queue_size < hlp->n_running)
833 return;
834 if (squid_curtime - hlp->last_queue_warn < 600)
835 return;
836 if (shutting_down || reconfiguring)
837 return;
838 hlp->last_queue_warn = squid_curtime;
839 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
840 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
841 if (hlp->stats.queue_size > hlp->n_running * 2)
842 fatalf("Too many queued %s requests", hlp->id_name);
843 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
844}
845
846static void
847StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
848{
849 dlink_node *link = memAllocate(MEM_DLINK_NODE);
850 dlinkAddTail(r, link, &srv->queue);
2d70df72 851/* TODO: warning if the queue on this server is more than X
852 * We don't check the queue size at the moment, because
853 * requests hitting here are deferrable
854 */
855/* hlp->stats.queue_size++;
856 * if (hlp->stats.queue_size < hlp->n_running)
857 * return;
858 * if (squid_curtime - hlp->last_queue_warn < 600)
859 * return;
860 * if (shutting_down || reconfiguring)
861 * return;
862 * hlp->last_queue_warn = squid_curtime;
863 * debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
864 * debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
865 * if (hlp->stats.queue_size > hlp->n_running * 2)
866 * fatalf("Too many queued %s requests", hlp->id_name);
867 * debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); */
94439e4e 868}
869
870
74addf6c 871static helper_request *
872Dequeue(helper * hlp)
873{
874 dlink_node *link;
875 helper_request *r = NULL;
876 if ((link = hlp->queue.head)) {
877 r = link->data;
878 dlinkDelete(link, &hlp->queue);
db1cd23c 879 memFree(link, MEM_DLINK_NODE);
74addf6c 880 hlp->stats.queue_size--;
881 }
882 return r;
883}
884
94439e4e 885static helper_stateful_request *
886StatefulServerDequeue(helper_stateful_server * srv)
887{
888 dlink_node *link;
889 helper_stateful_request *r = NULL;
890 if ((link = srv->queue.head)) {
891 r = link->data;
892 dlinkDelete(link, &srv->queue);
893 memFree(link, MEM_DLINK_NODE);
894 }
895 return r;
896}
897
898static helper_stateful_request *
899StatefulDequeue(statefulhelper * hlp)
900{
901 dlink_node *link;
902 helper_stateful_request *r = NULL;
903 if ((link = hlp->queue.head)) {
904 r = link->data;
905 dlinkDelete(link, &hlp->queue);
906 memFree(link, MEM_DLINK_NODE);
907 hlp->stats.queue_size--;
908 }
909 return r;
910}
911
74addf6c 912static helper_server *
913GetFirstAvailable(helper * hlp)
914{
915 dlink_node *n;
916 helper_server *srv = NULL;
fe73896c 917 if (hlp->n_running == 0)
918 return NULL;
74addf6c 919 for (n = hlp->servers.head; n != NULL; n = n->next) {
920 srv = n->data;
921 if (srv->flags.busy)
922 continue;
923 if (!srv->flags.alive)
924 continue;
925 return srv;
926 }
927 return NULL;
928}
929
94439e4e 930static helper_stateful_server *
931StatefulGetFirstAvailable(statefulhelper * hlp)
932{
933 dlink_node *n;
934 helper_stateful_server *srv = NULL;
935 debug(29, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running);
936 if (hlp->n_running == 0)
937 return NULL;
938 for (n = hlp->servers.head; n != NULL; n = n->next) {
939 srv = n->data;
940 if (srv->flags.busy)
941 continue;
942 if (srv->flags.reserved == S_HELPER_RESERVED)
943 continue;
944 if (!srv->flags.alive)
945 continue;
946 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
947 continue;
948 return srv;
949 }
950 debug(29, 5) ("StatefulGetFirstAvailable: None available.\n");
951 return NULL;
952}
953
954
74addf6c 955static void
956helperDispatch(helper_server * srv, helper_request * r)
957{
958 helper *hlp = srv->parent;
959 if (!cbdataValid(r->data)) {
960 debug(29, 1) ("helperDispatch: invalid callback data\n");
961 helperRequestFree(r);
962 return;
963 }
964 assert(!srv->flags.busy);
965 srv->flags.busy = 1;
966 srv->request = r;
967 srv->dispatch_time = current_time;
968 comm_write(srv->wfd,
969 r->buf,
970 strlen(r->buf),
971 NULL, /* Handler */
972 NULL, /* Handler-data */
973 NULL); /* free */
974 commSetSelect(srv->rfd,
975 COMM_SELECT_READ,
976 helperHandleRead,
977 srv, 0);
978 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
979 hlp->id_name, srv->index + 1, strlen(r->buf));
980 srv->stats.uses++;
981 hlp->stats.requests++;
982}
983
94439e4e 984static void
985helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
986{
987 statefulhelper *hlp = srv->parent;
988 if (!cbdataValid(r->data)) {
989 debug(29, 1) ("helperStatefulDispatch: invalid callback data\n");
990 helperStatefulRequestFree(r);
991 return;
992 }
993 debug(29, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1);
994 if (r->placeholder == 1) {
995 /* a callback is needed before this request can _use_ a helper. */
721b0310 996 /* we don't care about releasing/deferring this helper. The request NEVER
997 * gets to the helper. So we throw away the return code */
998 r->callback(r->data, srv, NULL);
999 /* throw away the placeholder */
1000 helperStatefulRequestFree(r);
1001 /* and push the queue. Note that the callback may have submitted a new
1002 * request to the helper which is why we test for the request*/
1003 if (srv->request == NULL) {
1004 if (srv->flags.shutdown) {
1005 comm_close(srv->wfd);
1006 srv->wfd = -1;
1007 } else {
1008 if (srv->queue.head)
1009 helperStatefulServerKickQueue(srv);
1010 else
1011 helperStatefulKickQueue(hlp);
94439e4e 1012 }
1013 }
1014 return;
1015 }
1016 srv->flags.busy = 1;
1017 srv->request = r;
1018 srv->dispatch_time = current_time;
1019 comm_write(srv->wfd,
1020 r->buf,
1021 strlen(r->buf),
1022 NULL, /* Handler */
1023 NULL, /* Handler-data */
1024 NULL); /* free */
1025 commSetSelect(srv->rfd,
1026 COMM_SELECT_READ,
1027 helperStatefulHandleRead,
1028 srv, 0);
1029 debug(29, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
1030 hlp->id_name, srv->index + 1, strlen(r->buf));
1031 srv->stats.uses++;
1032 hlp->stats.requests++;
1033}
1034
1035
74addf6c 1036static void
1037helperKickQueue(helper * hlp)
1038{
1039 helper_request *r;
1040 helper_server *srv;
1041 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1042 helperDispatch(srv, r);
1043}
1044
94439e4e 1045static void
1046helperStatefulKickQueue(statefulhelper * hlp)
1047{
1048 helper_stateful_request *r;
1049 helper_stateful_server *srv;
1050 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1051 helperStatefulDispatch(srv, r);
1052}
1053
1054static void
1055helperStatefulServerKickQueue(helper_stateful_server * srv)
1056{
1057 helper_stateful_request *r;
1058 if ((r = StatefulServerDequeue(srv)))
1059 helperStatefulDispatch(srv, r);
1060}
1061
74addf6c 1062static void
1063helperRequestFree(helper_request * r)
1064{
1065 cbdataUnlock(r->data);
1066 xfree(r->buf);
db1cd23c 1067 memFree(r, MEM_HELPER_REQUEST);
74addf6c 1068}
94439e4e 1069
1070static void
1071helperStatefulRequestFree(helper_stateful_request * r)
1072{
1073 cbdataUnlock(r->data);
1074 xfree(r->buf);
1075 memFree(r, MEM_HELPER_STATEFUL_REQUEST);
1076}