]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Mark successfull FTP transfers as complete to allow them to stay in the
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
3c641669 3 * $Id: helper.cc,v 1.34 2001/11/28 08:01:46 robertc Exp $
f740a279 4 *
5 * DEBUG: section 29 Helper process maintenance
6 * AUTHOR: Harvest Derived?
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 9 * ----------------------------------------------------------
10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
f740a279 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
37
38#define HELPER_MAX_ARGS 64
39
40static PF helperHandleRead;
94439e4e 41static PF helperStatefulHandleRead;
1f5f60dd 42static PF helperServerFree;
94439e4e 43static PF helperStatefulServerFree;
74addf6c 44static void Enqueue(helper * hlp, helper_request *);
45static helper_request *Dequeue(helper * hlp);
94439e4e 46static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 47static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 48static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 49static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 50static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 51static void helperKickQueue(helper * hlp);
94439e4e 52static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 53static void helperRequestFree(helper_request * r);
94439e4e 54static void helperStatefulRequestFree(helper_stateful_request * r);
55static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
56static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
57static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
58static void helperStatefulServerKickQueue(helper_stateful_server * srv);
74addf6c 59
60void
61helperOpenServers(helper * hlp)
62{
63 char *s;
64 char *progname;
65 char *shortname;
66 char *procname;
a2c963ae 67 const char *args[HELPER_MAX_ARGS];
74addf6c 68 char fd_note_buf[FD_DESC_SZ];
69 helper_server *srv;
70 int nargs = 0;
71 int k;
72 int x;
73 int rfd;
74 int wfd;
75 wordlist *w;
76 if (hlp->cmdline == NULL)
77 return;
78 progname = hlp->cmdline->key;
74addf6c 79 if ((s = strrchr(progname, '/')))
80 shortname = xstrdup(s + 1);
81 else
82 shortname = xstrdup(progname);
83 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
84 hlp->n_to_start, shortname);
85 procname = xmalloc(strlen(shortname) + 3);
86 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
87 args[nargs++] = procname;
88 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
89 args[nargs++] = w->key;
90 args[nargs++] = NULL;
91 assert(nargs <= HELPER_MAX_ARGS);
92 for (k = 0; k < hlp->n_to_start; k++) {
c68e9c6b 93 getCurrentTime();
74addf6c 94 rfd = wfd = -1;
95 x = ipcCreate(hlp->ipc_type,
96 progname,
97 args,
98 shortname,
99 &rfd,
100 &wfd);
101 if (x < 0) {
102 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
103 continue;
104 }
105 hlp->n_running++;
72711e31 106 srv = cbdataAlloc(helper_server);
74addf6c 107 srv->flags.alive = 1;
108 srv->index = k;
109 srv->rfd = rfd;
110 srv->wfd = wfd;
111 srv->buf = memAllocate(MEM_8K_BUF);
112 srv->buf_sz = 8192;
113 srv->offset = 0;
114 srv->parent = hlp;
1f5f60dd 115 cbdataLock(hlp); /* lock because of the parent backlink */
74addf6c 116 dlinkAddTail(srv, &srv->link, &hlp->servers);
117 if (rfd == wfd) {
118 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
119 fd_note(rfd, fd_note_buf);
120 } else {
121 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
122 fd_note(rfd, fd_note_buf);
123 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
124 fd_note(wfd, fd_note_buf);
125 }
126 commSetNonBlocking(rfd);
127 if (wfd != rfd)
128 commSetNonBlocking(wfd);
1f5f60dd 129 comm_add_close_handler(rfd, helperServerFree, srv);
74addf6c 130 }
131 safe_free(shortname);
132 safe_free(procname);
838b993c 133 helperKickQueue(hlp);
74addf6c 134}
135
94439e4e 136void
137helperStatefulOpenServers(statefulhelper * hlp)
138{
139 char *s;
140 char *progname;
141 char *shortname;
142 char *procname;
a2c963ae 143 const char *args[HELPER_MAX_ARGS];
94439e4e 144 char fd_note_buf[FD_DESC_SZ];
145 helper_stateful_server *srv;
146 int nargs = 0;
147 int k;
148 int x;
149 int rfd;
150 int wfd;
151 wordlist *w;
152 if (hlp->cmdline == NULL)
153 return;
154 progname = hlp->cmdline->key;
155 if ((s = strrchr(progname, '/')))
156 shortname = xstrdup(s + 1);
157 else
158 shortname = xstrdup(progname);
159 debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
160 hlp->n_to_start, shortname);
161 procname = xmalloc(strlen(shortname) + 3);
162 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
163 args[nargs++] = procname;
164 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
165 args[nargs++] = w->key;
166 args[nargs++] = NULL;
167 assert(nargs <= HELPER_MAX_ARGS);
168 for (k = 0; k < hlp->n_to_start; k++) {
169 getCurrentTime();
170 rfd = wfd = -1;
171 x = ipcCreate(hlp->ipc_type,
172 progname,
173 args,
174 shortname,
175 &rfd,
176 &wfd);
177 if (x < 0) {
178 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
179 continue;
180 }
181 hlp->n_running++;
72711e31 182 srv = cbdataAlloc(helper_stateful_server);
5d146f7d 183 srv->pid = x;
94439e4e 184 srv->flags.alive = 1;
185 srv->flags.reserved = S_HELPER_FREE;
186 srv->deferred_requests = 0;
60d096f4 187 srv->stats.deferbyfunc = 0;
188 srv->stats.deferbycb = 0;
189 srv->stats.submits = 0;
190 srv->stats.releases = 0;
94439e4e 191 srv->index = k;
192 srv->rfd = rfd;
193 srv->wfd = wfd;
194 srv->buf = memAllocate(MEM_8K_BUF);
195 srv->buf_sz = 8192;
196 srv->offset = 0;
197 srv->parent = hlp;
198 if (hlp->datapool != NULL)
199 srv->data = memPoolAlloc(hlp->datapool);
200 cbdataLock(hlp); /* lock because of the parent backlink */
201 dlinkAddTail(srv, &srv->link, &hlp->servers);
202 if (rfd == wfd) {
203 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
204 fd_note(rfd, fd_note_buf);
205 } else {
206 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
207 fd_note(rfd, fd_note_buf);
208 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
209 fd_note(wfd, fd_note_buf);
210 }
211 commSetNonBlocking(rfd);
212 if (wfd != rfd)
213 commSetNonBlocking(wfd);
214 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
215 }
216 safe_free(shortname);
217 safe_free(procname);
218 helperStatefulKickQueue(hlp);
219}
220
221
74addf6c 222void
223helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
224{
c68e9c6b 225 helper_request *r = memAllocate(MEM_HELPER_REQUEST);
74addf6c 226 helper_server *srv;
5b5f9257 227 if (hlp == NULL) {
7d47d8e6 228 debug(29, 3) ("helperSubmit: hlp == NULL\n");
5b5f9257 229 callback(data, NULL);
230 return;
231 }
74addf6c 232 r->callback = callback;
233 r->data = data;
234 r->buf = xstrdup(buf);
235 cbdataLock(r->data);
236 if ((srv = GetFirstAvailable(hlp)))
237 helperDispatch(srv, r);
238 else
239 Enqueue(hlp, r);
94439e4e 240 debug(29, 9) ("helperSubmit: %s\n", buf);
241}
242
721b0310 243/* lastserver = "server last used as part of a deferred or reserved
244 * request sequence"
245 */
94439e4e 246void
247helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
248{
249 helper_stateful_request *r = memAllocate(MEM_HELPER_STATEFUL_REQUEST);
250 helper_stateful_server *srv;
251 if (hlp == NULL) {
252 debug(29, 3) ("helperStatefulSubmit: hlp == NULL\n");
253 callback(data, 0, NULL);
254 return;
255 }
256 r->callback = callback;
257 r->data = data;
721b0310 258 if (buf != NULL) {
94439e4e 259 r->buf = xstrdup(buf);
721b0310 260 r->placeholder = 0;
261 } else {
262 r->buf = NULL;
94439e4e 263 r->placeholder = 1;
721b0310 264 }
94439e4e 265 cbdataLock(r->data);
266 if ((buf != NULL) && lastserver) {
38650cc8 267 debug(29, 5) ("StatefulSubmit with lastserver %p\n", lastserver);
60d096f4 268 /* the queue doesn't count for this assert because queued requests
269 * have already gone through here and been tested.
270 * It's legal to have deferred_requests == 0 and queue entries
271 * and status of S_HELPEER_DEFERRED.
272 * BUT: It's not legal to submit a new request w/lastserver in
273 * that state.
274 */
275 assert(!(lastserver->deferred_requests == 0 &&
276 lastserver->flags.reserved == S_HELPER_DEFERRED));
277 if (lastserver->flags.reserved != S_HELPER_RESERVED) {
278 lastserver->stats.submits++;
94439e4e 279 lastserver->deferred_requests--;
60d096f4 280 }
94439e4e 281 if (!(lastserver->request)) {
282 debug(29, 5) ("StatefulSubmit dispatching\n");
283 helperStatefulDispatch(lastserver, r);
284 } else {
285 debug(29, 5) ("StatefulSubmit queuing\n");
286 StatefulServerEnqueue(lastserver, r);
287 }
288 } else {
289 if ((srv = StatefulGetFirstAvailable(hlp))) {
290 helperStatefulDispatch(srv, r);
291 } else
292 StatefulEnqueue(hlp, r);
293 }
294 debug(29, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r->placeholder, buf);
295}
296
297helper_stateful_server *
298helperStatefulDefer(statefulhelper * hlp)
299/* find and add a deferred request to a server */
300{
301 dlink_node *n;
302 helper_stateful_server *srv = NULL, *rv = NULL;
303 if (hlp == NULL) {
304 debug(29, 3) ("helperStatefulReserve: hlp == NULL\n");
305 return NULL;
306 }
307 debug(29, 5) ("helperStatefulDefer: Running servers %d.\n", hlp->n_running);
308 if (hlp->n_running == 0) {
309 debug(29, 1) ("helperStatefulDefer: No running servers!. \n");
310 return NULL;
311 }
312 srv = StatefulGetFirstAvailable(hlp);
313 /* all currently busy:loop through servers and find server with the shortest queue */
314 rv = srv;
315 if (rv == NULL)
316 for (n = hlp->servers.head; n != NULL; n = n->next) {
317 srv = n->data;
318 if (srv->flags.reserved == S_HELPER_RESERVED)
319 continue;
320 if (!srv->flags.alive)
321 continue;
322 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
323 !(hlp->IsAvailable(srv->data)))
324 continue;
325 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
326 continue;
327 rv = srv;
328 }
329 if (rv == NULL) {
330 debug(29, 1) ("helperStatefulDefer: None available.\n");
331 return NULL;
332 }
60d096f4 333 /* consistency check:
334 * when the deferred count is 0,
335 * submits + releases == deferbyfunc + deferbycb
336 * Or in english, when there are no deferred requests, the amount
337 * we have submitted to the queue or cancelled must equal the amount
338 * we have said we wanted to be able to submit or cancel
339 */
340 if (rv->deferred_requests == 0)
341 assert(rv->stats.submits + rv->stats.releases ==
342 rv->stats.deferbyfunc + rv->stats.deferbycb);
343
94439e4e 344 rv->flags.reserved = S_HELPER_DEFERRED;
345 rv->deferred_requests++;
60d096f4 346 rv->stats.deferbyfunc++;
94439e4e 347 return rv;
348}
349
350void
351helperStatefulReset(helper_stateful_server * srv)
352/* puts this helper back in the queue. the calling app is required to
353 * manage the state in the helper.
354 */
355{
356 statefulhelper *hlp = srv->parent;
357 helper_stateful_request *r;
358 r = srv->request;
359 if (r != NULL) {
360 /* reset attempt DURING an outstaning request */
361 debug(29, 1) ("helperStatefulReset: RESET During request %s \n",
362 hlp->id_name);
363 srv->flags.busy = 0;
364 srv->offset = 0;
365 helperStatefulRequestFree(r);
366 srv->request = NULL;
367 }
94439e4e 368 srv->flags.busy = 0;
369 if (srv->queue.head) {
370 srv->flags.reserved = S_HELPER_DEFERRED;
371 helperStatefulServerKickQueue(srv);
372 } else {
373 srv->flags.reserved = S_HELPER_FREE;
374 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
375 srv->parent->OnEmptyQueue(srv->data);
376 helperStatefulKickQueue(hlp);
377 }
378}
379
380void
381helperStatefulReleaseServer(helper_stateful_server * srv)
382/*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
383{
60d096f4 384 srv->stats.releases++;
385 if (srv->flags.reserved == S_HELPER_DEFERRED) {
386 assert(srv->deferred_requests);
94439e4e 387 srv->deferred_requests--;
60d096f4 388 }
94439e4e 389 if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head)) {
390 srv->flags.reserved = S_HELPER_FREE;
391 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
392 srv->parent->OnEmptyQueue(srv->data);
393 }
394}
395
396void *
397helperStatefulServerGetData(helper_stateful_server * srv)
398/* return a pointer to the stateful routines data area */
399{
400 return srv->data;
74addf6c 401}
402
403void
404helperStats(StoreEntry * sentry, helper * hlp)
405{
406 helper_server *srv;
407 dlink_node *link;
f4ae18d0 408 double tt;
74addf6c 409 storeAppendPrintf(sentry, "number running: %d of %d\n",
410 hlp->n_running, hlp->n_to_start);
411 storeAppendPrintf(sentry, "requests sent: %d\n",
412 hlp->stats.requests);
413 storeAppendPrintf(sentry, "replies received: %d\n",
414 hlp->stats.replies);
415 storeAppendPrintf(sentry, "queue length: %d\n",
416 hlp->stats.queue_size);
417 storeAppendPrintf(sentry, "avg service time: %d msec\n",
418 hlp->stats.avg_svc_time);
419 storeAppendPrintf(sentry, "\n");
592da4ec 420 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
74addf6c 421 "#",
422 "FD",
423 "# Requests",
424 "Flags",
425 "Time",
592da4ec 426 "Offset",
427 "Request");
74addf6c 428 for (link = hlp->servers.head; link; link = link->next) {
429 srv = link->data;
f4ae18d0 430 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
592da4ec 431 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
74addf6c 432 srv->index + 1,
433 srv->rfd,
434 srv->stats.uses,
435 srv->flags.alive ? 'A' : ' ',
436 srv->flags.busy ? 'B' : ' ',
437 srv->flags.closing ? 'C' : ' ',
438 srv->flags.shutdown ? 'S' : ' ',
f4ae18d0 439 tt < 0.0 ? 0.0 : tt,
592da4ec 440 (int) srv->offset,
441 srv->request ? log_quote(srv->request->buf) : "(none)");
74addf6c 442 }
443 storeAppendPrintf(sentry, "\nFlags key:\n\n");
444 storeAppendPrintf(sentry, " A = ALIVE\n");
445 storeAppendPrintf(sentry, " B = BUSY\n");
446 storeAppendPrintf(sentry, " C = CLOSING\n");
447 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
448}
449
94439e4e 450void
451helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
452{
453 helper_stateful_server *srv;
454 dlink_node *link;
455 double tt;
456 storeAppendPrintf(sentry, "number running: %d of %d\n",
457 hlp->n_running, hlp->n_to_start);
458 storeAppendPrintf(sentry, "requests sent: %d\n",
459 hlp->stats.requests);
460 storeAppendPrintf(sentry, "replies received: %d\n",
461 hlp->stats.replies);
462 storeAppendPrintf(sentry, "queue length: %d\n",
463 hlp->stats.queue_size);
464 storeAppendPrintf(sentry, "avg service time: %d msec\n",
465 hlp->stats.avg_svc_time);
466 storeAppendPrintf(sentry, "\n");
38650cc8 467 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\t%7s\n",
94439e4e 468 "#",
469 "FD",
5d146f7d 470 "PID",
94439e4e 471 "# Requests",
472 "# Deferred Requests",
473 "Flags",
474 "Time",
475 "Offset",
476 "Request");
477 for (link = hlp->servers.head; link; link = link->next) {
478 srv = link->data;
479 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
5d146f7d 480 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%11d\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
94439e4e 481 srv->index + 1,
482 srv->rfd,
5d146f7d 483 srv->pid,
94439e4e 484 srv->stats.uses,
32754419 485 (int) srv->deferred_requests,
94439e4e 486 srv->flags.alive ? 'A' : ' ',
487 srv->flags.busy ? 'B' : ' ',
488 srv->flags.closing ? 'C' : ' ',
489 srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
490 srv->flags.shutdown ? 'S' : ' ',
5d146f7d 491 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
94439e4e 492 tt < 0.0 ? 0.0 : tt,
493 (int) srv->offset,
494 srv->request ? log_quote(srv->request->buf) : "(none)");
495 }
496 storeAppendPrintf(sentry, "\nFlags key:\n\n");
497 storeAppendPrintf(sentry, " A = ALIVE\n");
498 storeAppendPrintf(sentry, " B = BUSY\n");
499 storeAppendPrintf(sentry, " C = CLOSING\n");
500 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
501 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
5d146f7d 502 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 503}
504
74addf6c 505void
506helperShutdown(helper * hlp)
507{
c68e9c6b 508 dlink_node *link = hlp->servers.head;
74addf6c 509 helper_server *srv;
c68e9c6b 510 while (link) {
74addf6c 511 srv = link->data;
c68e9c6b 512 link = link->next;
74addf6c 513 if (!srv->flags.alive) {
514 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
515 hlp->id_name, srv->index + 1);
516 continue;
517 }
1f5f60dd 518 srv->flags.shutdown = 1; /* request it to shut itself down */
74addf6c 519 if (srv->flags.busy) {
520 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
521 hlp->id_name, srv->index + 1);
74addf6c 522 continue;
523 }
524 if (srv->flags.closing) {
525 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
526 hlp->id_name, srv->index + 1);
527 continue;
528 }
74addf6c 529 srv->flags.closing = 1;
3cdb7cd0 530 comm_close(srv->wfd);
531 srv->wfd = -1;
74addf6c 532 }
533}
534
94439e4e 535void
536helperStatefulShutdown(statefulhelper * hlp)
537{
538 dlink_node *link = hlp->servers.head;
539 helper_stateful_server *srv;
540 while (link) {
541 srv = link->data;
542 link = link->next;
543 if (!srv->flags.alive) {
544 debug(34, 3) ("helperStatefulShutdown: %s #%d is NOT ALIVE.\n",
545 hlp->id_name, srv->index + 1);
546 continue;
547 }
548 srv->flags.shutdown = 1; /* request it to shut itself down */
549 if (srv->flags.busy) {
550 debug(34, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
551 hlp->id_name, srv->index + 1);
552 continue;
553 }
554 if (srv->flags.closing) {
555 debug(34, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
556 hlp->id_name, srv->index + 1);
557 continue;
558 }
559 if (srv->flags.reserved != S_HELPER_FREE) {
560 debug(34, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
561 hlp->id_name, srv->index + 1);
562 continue;
563 }
564 if (srv->deferred_requests) {
565 debug(34, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
566 hlp->id_name, srv->index + 1);
567 continue;
568 }
569 srv->flags.closing = 1;
570 comm_close(srv->wfd);
571 srv->wfd = -1;
572 }
573}
574
575
1f5f60dd 576helper *
577helperCreate(const char *name)
578{
28c60158 579 helper *hlp;
72711e31 580 hlp = cbdataAlloc(helper);
1f5f60dd 581 hlp->id_name = name;
582 return hlp;
583}
584
94439e4e 585statefulhelper *
586helperStatefulCreate(const char *name)
587{
588 statefulhelper *hlp;
72711e31 589 hlp = cbdataAlloc(statefulhelper);
94439e4e 590 hlp->id_name = name;
591 return hlp;
592}
593
594
1f5f60dd 595void
596helperFree(helper * hlp)
597{
5dae8514 598 if (!hlp)
599 return;
1f5f60dd 600 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 601 if (hlp->queue.head)
602 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
603 hlp->id_name, hlp->stats.queue_size);
1f5f60dd 604 cbdataFree(hlp);
605}
606
94439e4e 607void
608helperStatefulFree(statefulhelper * hlp)
609{
5dae8514 610 if (!hlp)
611 return;
94439e4e 612 /* note, don't free hlp->name, it probably points to static memory */
613 if (hlp->queue.head)
614 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
615 hlp->id_name, hlp->stats.queue_size);
616 cbdataFree(hlp);
617}
618
619
74addf6c 620/* ====================================================================== */
621/* LOCAL FUNCTIONS */
622/* ====================================================================== */
623
624static void
1f5f60dd 625helperServerFree(int fd, void *data)
74addf6c 626{
627 helper_server *srv = data;
628 helper *hlp = srv->parent;
ac750329 629 helper_request *r;
74addf6c 630 assert(srv->rfd == fd);
631 if (srv->buf) {
db1cd23c 632 memFree(srv->buf, MEM_8K_BUF);
74addf6c 633 srv->buf = NULL;
634 }
ac750329 635 if ((r = srv->request)) {
636 if (cbdataValid(r->data))
637 r->callback(r->data, srv->buf);
638 helperRequestFree(r);
63758217 639 srv->request = NULL;
ac750329 640 }
3cdb7cd0 641 if (srv->wfd != srv->rfd && srv->wfd != -1)
74addf6c 642 comm_close(srv->wfd);
643 dlinkDelete(&srv->link, &hlp->servers);
74addf6c 644 hlp->n_running--;
645 assert(hlp->n_running >= 0);
1f5f60dd 646 if (!srv->flags.shutdown) {
647 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
14e87a44 648 hlp->id_name, srv->index + 1, fd);
1f5f60dd 649 if (hlp->n_running < hlp->n_to_start / 2)
14e87a44 650 fatalf("Too few %s processes are running", hlp->id_name);
651 }
1f5f60dd 652 cbdataUnlock(srv->parent);
14e87a44 653 cbdataFree(srv);
74addf6c 654}
655
94439e4e 656static void
657helperStatefulServerFree(int fd, void *data)
658{
659 helper_stateful_server *srv = data;
660 statefulhelper *hlp = srv->parent;
661 helper_stateful_request *r;
662 assert(srv->rfd == fd);
663 if (srv->buf) {
664 memFree(srv->buf, MEM_8K_BUF);
665 srv->buf = NULL;
666 }
667 if ((r = srv->request)) {
668 if (cbdataValid(r->data))
669 r->callback(r->data, srv, srv->buf);
670 helperStatefulRequestFree(r);
671 srv->request = NULL;
672 }
3c641669 673 /* TODO: walk the local queue of requests and carry them all out */
94439e4e 674 if (srv->wfd != srv->rfd && srv->wfd != -1)
675 comm_close(srv->wfd);
676 dlinkDelete(&srv->link, &hlp->servers);
677 hlp->n_running--;
678 assert(hlp->n_running >= 0);
679 if (!srv->flags.shutdown) {
680 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
681 hlp->id_name, srv->index + 1, fd);
682 if (hlp->n_running < hlp->n_to_start / 2)
683 fatalf("Too few %s processes are running", hlp->id_name);
684 }
685 if (srv->data != NULL)
686 memPoolFree(hlp->datapool, srv->data);
687 cbdataUnlock(srv->parent);
688 cbdataFree(srv);
689}
690
691
74addf6c 692static void
693helperHandleRead(int fd, void *data)
694{
695 int len;
696 char *t = NULL;
697 helper_server *srv = data;
698 helper_request *r;
699 helper *hlp = srv->parent;
700 assert(fd == srv->rfd);
701 assert(cbdataValid(data));
83704487 702 statCounter.syscalls.sock.reads++;
1f7c9178 703 len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
74addf6c 704 fd_bytes(fd, len, FD_READ);
705 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
706 len, hlp->id_name, srv->index + 1);
707 if (len <= 0) {
708 if (len < 0)
709 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
710 comm_close(fd);
711 return;
712 }
713 srv->offset += len;
714 srv->buf[srv->offset] = '\0';
715 r = srv->request;
716 if (r == NULL) {
717 /* someone spoke without being spoken to */
718 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
719 hlp->id_name, srv->index + 1, len);
720 srv->offset = 0;
721 } else if ((t = strchr(srv->buf, '\n'))) {
722 /* end of reply found */
723 debug(29, 3) ("helperHandleRead: end of reply found\n");
724 *t = '\0';
725 if (cbdataValid(r->data))
726 r->callback(r->data, srv->buf);
727 srv->flags.busy = 0;
728 srv->offset = 0;
729 helperRequestFree(r);
63758217 730 srv->request = NULL;
74addf6c 731 hlp->stats.replies++;
732 hlp->stats.avg_svc_time =
733 intAverage(hlp->stats.avg_svc_time,
734 tvSubMsec(srv->dispatch_time, current_time),
735 hlp->stats.replies, REDIRECT_AV_FACTOR);
3cdb7cd0 736 if (srv->flags.shutdown) {
74addf6c 737 comm_close(srv->wfd);
3cdb7cd0 738 srv->wfd = -1;
739 } else
c68e9c6b 740 helperKickQueue(hlp);
74addf6c 741 } else {
742 commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
743 }
74addf6c 744}
745
94439e4e 746static void
747helperStatefulHandleRead(int fd, void *data)
748{
749 int len;
750 char *t = NULL;
751 helper_stateful_server *srv = data;
752 helper_stateful_request *r;
753 statefulhelper *hlp = srv->parent;
754 assert(fd == srv->rfd);
755 assert(cbdataValid(data));
756 statCounter.syscalls.sock.reads++;
757 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
758 fd_bytes(fd, len, FD_READ);
759 debug(29, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
760 len, hlp->id_name, srv->index + 1);
761 if (len <= 0) {
762 if (len < 0)
763 debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
764 comm_close(fd);
765 return;
766 }
767 srv->offset += len;
768 srv->buf[srv->offset] = '\0';
769 r = srv->request;
770 if (r == NULL) {
771 /* someone spoke without being spoken to */
772 debug(29, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
773 hlp->id_name, srv->index + 1, len);
774 srv->offset = 0;
775 } else if ((t = strchr(srv->buf, '\n'))) {
776 /* end of reply found */
777 debug(29, 3) ("helperStatefulHandleRead: end of reply found\n");
778 *t = '\0';
779 if (cbdataValid(r->data)) {
780 switch ((r->callback(r->data, srv, srv->buf))) { /*if non-zero reserve helper */
781 case S_HELPER_UNKNOWN:
782 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
783 break;
784 case S_HELPER_RELEASE: /* helper finished with */
60d096f4 785 if (!srv->deferred_requests && !srv->queue.head) {
94439e4e 786 srv->flags.reserved = S_HELPER_FREE;
787 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
788 srv->parent->OnEmptyQueue(srv->data);
789 debug(29, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1);
790 } else {
791 srv->flags.reserved = S_HELPER_DEFERRED;
792 debug(29, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1);
793 }
794 break;
795 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
796 if (!srv->queue.head) {
60d096f4 797 assert(srv->deferred_requests == 0);
94439e4e 798 srv->flags.reserved = S_HELPER_RESERVED;
799 debug(29, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1);
800 } else {
801 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
802 }
803 break;
804 case S_HELPER_DEFER:
805 /* the helper is still needed, but can
806 * be used for other requests in the meantime.
807 */
808 srv->flags.reserved = S_HELPER_DEFERRED;
809 srv->deferred_requests++;
60d096f4 810 srv->stats.deferbycb++;
94439e4e 811 debug(29, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1);
812 break;
813 default:
814 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
815 }
816
817 } else {
818 debug(29, 1) ("StatefulHandleRead: no callback data registered\n");
819 }
820 srv->flags.busy = 0;
821 srv->offset = 0;
822 helperStatefulRequestFree(r);
823 srv->request = NULL;
824 hlp->stats.replies++;
825 hlp->stats.avg_svc_time =
826 intAverage(hlp->stats.avg_svc_time,
827 tvSubMsec(srv->dispatch_time, current_time),
828 hlp->stats.replies, REDIRECT_AV_FACTOR);
5d146f7d 829 if (srv->flags.shutdown
830 && srv->flags.reserved == S_HELPER_FREE
831 && !srv->deferred_requests) {
94439e4e 832 comm_close(srv->wfd);
833 srv->wfd = -1;
834 } else {
835 if (srv->queue.head)
836 helperStatefulServerKickQueue(srv);
837 else
838 helperStatefulKickQueue(hlp);
839 }
840 } else {
841 commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0);
842 }
843}
844
74addf6c 845static void
846Enqueue(helper * hlp, helper_request * r)
847{
c68e9c6b 848 dlink_node *link = memAllocate(MEM_DLINK_NODE);
74addf6c 849 dlinkAddTail(r, link, &hlp->queue);
850 hlp->stats.queue_size++;
851 if (hlp->stats.queue_size < hlp->n_running)
852 return;
853 if (squid_curtime - hlp->last_queue_warn < 600)
854 return;
fe73896c 855 if (shutting_down || reconfiguring)
856 return;
74addf6c 857 hlp->last_queue_warn = squid_curtime;
858 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
859 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
860 if (hlp->stats.queue_size > hlp->n_running * 2)
861 fatalf("Too many queued %s requests", hlp->id_name);
862 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
863}
864
94439e4e 865static void
866StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
867{
868 dlink_node *link = memAllocate(MEM_DLINK_NODE);
869 dlinkAddTail(r, link, &hlp->queue);
870 hlp->stats.queue_size++;
871 if (hlp->stats.queue_size < hlp->n_running)
872 return;
873 if (squid_curtime - hlp->last_queue_warn < 600)
874 return;
875 if (shutting_down || reconfiguring)
876 return;
877 hlp->last_queue_warn = squid_curtime;
878 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
879 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
880 if (hlp->stats.queue_size > hlp->n_running * 2)
881 fatalf("Too many queued %s requests", hlp->id_name);
882 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
883}
884
885static void
886StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
887{
888 dlink_node *link = memAllocate(MEM_DLINK_NODE);
889 dlinkAddTail(r, link, &srv->queue);
2d70df72 890/* TODO: warning if the queue on this server is more than X
891 * We don't check the queue size at the moment, because
892 * requests hitting here are deferrable
893 */
894/* hlp->stats.queue_size++;
895 * if (hlp->stats.queue_size < hlp->n_running)
896 * return;
897 * if (squid_curtime - hlp->last_queue_warn < 600)
898 * return;
899 * if (shutting_down || reconfiguring)
900 * return;
901 * hlp->last_queue_warn = squid_curtime;
902 * debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
903 * debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
904 * if (hlp->stats.queue_size > hlp->n_running * 2)
905 * fatalf("Too many queued %s requests", hlp->id_name);
906 * debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); */
94439e4e 907}
908
909
74addf6c 910static helper_request *
911Dequeue(helper * hlp)
912{
913 dlink_node *link;
914 helper_request *r = NULL;
915 if ((link = hlp->queue.head)) {
916 r = link->data;
917 dlinkDelete(link, &hlp->queue);
db1cd23c 918 memFree(link, MEM_DLINK_NODE);
74addf6c 919 hlp->stats.queue_size--;
920 }
921 return r;
922}
923
94439e4e 924static helper_stateful_request *
925StatefulServerDequeue(helper_stateful_server * srv)
926{
927 dlink_node *link;
928 helper_stateful_request *r = NULL;
929 if ((link = srv->queue.head)) {
930 r = link->data;
931 dlinkDelete(link, &srv->queue);
932 memFree(link, MEM_DLINK_NODE);
933 }
934 return r;
935}
936
937static helper_stateful_request *
938StatefulDequeue(statefulhelper * hlp)
939{
940 dlink_node *link;
941 helper_stateful_request *r = NULL;
942 if ((link = hlp->queue.head)) {
943 r = link->data;
944 dlinkDelete(link, &hlp->queue);
945 memFree(link, MEM_DLINK_NODE);
946 hlp->stats.queue_size--;
947 }
948 return r;
949}
950
74addf6c 951static helper_server *
952GetFirstAvailable(helper * hlp)
953{
954 dlink_node *n;
955 helper_server *srv = NULL;
fe73896c 956 if (hlp->n_running == 0)
957 return NULL;
74addf6c 958 for (n = hlp->servers.head; n != NULL; n = n->next) {
959 srv = n->data;
960 if (srv->flags.busy)
961 continue;
962 if (!srv->flags.alive)
963 continue;
964 return srv;
965 }
966 return NULL;
967}
968
94439e4e 969static helper_stateful_server *
970StatefulGetFirstAvailable(statefulhelper * hlp)
971{
972 dlink_node *n;
973 helper_stateful_server *srv = NULL;
974 debug(29, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running);
975 if (hlp->n_running == 0)
976 return NULL;
977 for (n = hlp->servers.head; n != NULL; n = n->next) {
978 srv = n->data;
979 if (srv->flags.busy)
980 continue;
981 if (srv->flags.reserved == S_HELPER_RESERVED)
982 continue;
983 if (!srv->flags.alive)
984 continue;
985 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
986 continue;
987 return srv;
988 }
989 debug(29, 5) ("StatefulGetFirstAvailable: None available.\n");
990 return NULL;
991}
992
993
74addf6c 994static void
995helperDispatch(helper_server * srv, helper_request * r)
996{
997 helper *hlp = srv->parent;
998 if (!cbdataValid(r->data)) {
999 debug(29, 1) ("helperDispatch: invalid callback data\n");
1000 helperRequestFree(r);
1001 return;
1002 }
1003 assert(!srv->flags.busy);
1004 srv->flags.busy = 1;
1005 srv->request = r;
1006 srv->dispatch_time = current_time;
1007 comm_write(srv->wfd,
1008 r->buf,
1009 strlen(r->buf),
1010 NULL, /* Handler */
1011 NULL, /* Handler-data */
1012 NULL); /* free */
1013 commSetSelect(srv->rfd,
1014 COMM_SELECT_READ,
1015 helperHandleRead,
1016 srv, 0);
1017 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
32754419 1018 hlp->id_name, srv->index + 1, (int) strlen(r->buf));
74addf6c 1019 srv->stats.uses++;
1020 hlp->stats.requests++;
1021}
1022
94439e4e 1023static void
1024helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1025{
1026 statefulhelper *hlp = srv->parent;
1027 if (!cbdataValid(r->data)) {
1028 debug(29, 1) ("helperStatefulDispatch: invalid callback data\n");
1029 helperStatefulRequestFree(r);
1030 return;
1031 }
1032 debug(29, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1);
1033 if (r->placeholder == 1) {
1034 /* a callback is needed before this request can _use_ a helper. */
721b0310 1035 /* we don't care about releasing/deferring this helper. The request NEVER
1036 * gets to the helper. So we throw away the return code */
1037 r->callback(r->data, srv, NULL);
1038 /* throw away the placeholder */
1039 helperStatefulRequestFree(r);
1040 /* and push the queue. Note that the callback may have submitted a new
1041 * request to the helper which is why we test for the request*/
1042 if (srv->request == NULL) {
5d146f7d 1043 if (srv->flags.shutdown
1044 && srv->flags.reserved == S_HELPER_FREE
1045 && !srv->deferred_requests) {
721b0310 1046 comm_close(srv->wfd);
1047 srv->wfd = -1;
1048 } else {
1049 if (srv->queue.head)
1050 helperStatefulServerKickQueue(srv);
1051 else
1052 helperStatefulKickQueue(hlp);
94439e4e 1053 }
1054 }
1055 return;
1056 }
1057 srv->flags.busy = 1;
1058 srv->request = r;
1059 srv->dispatch_time = current_time;
1060 comm_write(srv->wfd,
1061 r->buf,
1062 strlen(r->buf),
1063 NULL, /* Handler */
1064 NULL, /* Handler-data */
1065 NULL); /* free */
1066 commSetSelect(srv->rfd,
1067 COMM_SELECT_READ,
1068 helperStatefulHandleRead,
1069 srv, 0);
1070 debug(29, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
32754419 1071 hlp->id_name, srv->index + 1, (int) strlen(r->buf));
94439e4e 1072 srv->stats.uses++;
1073 hlp->stats.requests++;
1074}
1075
1076
74addf6c 1077static void
1078helperKickQueue(helper * hlp)
1079{
1080 helper_request *r;
1081 helper_server *srv;
1082 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1083 helperDispatch(srv, r);
1084}
1085
94439e4e 1086static void
1087helperStatefulKickQueue(statefulhelper * hlp)
1088{
1089 helper_stateful_request *r;
1090 helper_stateful_server *srv;
1091 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1092 helperStatefulDispatch(srv, r);
1093}
1094
1095static void
1096helperStatefulServerKickQueue(helper_stateful_server * srv)
1097{
1098 helper_stateful_request *r;
1099 if ((r = StatefulServerDequeue(srv)))
1100 helperStatefulDispatch(srv, r);
1101}
1102
74addf6c 1103static void
1104helperRequestFree(helper_request * r)
1105{
1106 cbdataUnlock(r->data);
1107 xfree(r->buf);
db1cd23c 1108 memFree(r, MEM_HELPER_REQUEST);
74addf6c 1109}
94439e4e 1110
1111static void
1112helperStatefulRequestFree(helper_stateful_request * r)
1113{
1114 cbdataUnlock(r->data);
1115 xfree(r->buf);
1116 memFree(r, MEM_HELPER_STATEFUL_REQUEST);
1117}