]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Removed some bad & unneeded SSL junk
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
60d096f4 3 * $Id: helper.cc,v 1.29 2001/08/03 15:13:04 adrian Exp $
f740a279 4 *
5 * DEBUG: section 29 Helper process maintenance
6 * AUTHOR: Harvest Derived?
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 9 * ----------------------------------------------------------
10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
f740a279 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
37
38#define HELPER_MAX_ARGS 64
39
40static PF helperHandleRead;
94439e4e 41static PF helperStatefulHandleRead;
1f5f60dd 42static PF helperServerFree;
94439e4e 43static PF helperStatefulServerFree;
74addf6c 44static void Enqueue(helper * hlp, helper_request *);
45static helper_request *Dequeue(helper * hlp);
94439e4e 46static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 47static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 48static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 49static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 50static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 51static void helperKickQueue(helper * hlp);
94439e4e 52static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 53static void helperRequestFree(helper_request * r);
94439e4e 54static void helperStatefulRequestFree(helper_stateful_request * r);
55static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
56static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
57static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
58static void helperStatefulServerKickQueue(helper_stateful_server * srv);
74addf6c 59
60void
61helperOpenServers(helper * hlp)
62{
63 char *s;
64 char *progname;
65 char *shortname;
66 char *procname;
67 char *args[HELPER_MAX_ARGS];
68 char fd_note_buf[FD_DESC_SZ];
69 helper_server *srv;
70 int nargs = 0;
71 int k;
72 int x;
73 int rfd;
74 int wfd;
75 wordlist *w;
76 if (hlp->cmdline == NULL)
77 return;
78 progname = hlp->cmdline->key;
74addf6c 79 if ((s = strrchr(progname, '/')))
80 shortname = xstrdup(s + 1);
81 else
82 shortname = xstrdup(progname);
83 debug(29, 1) ("helperOpenServers: Starting %d '%s' processes\n",
84 hlp->n_to_start, shortname);
85 procname = xmalloc(strlen(shortname) + 3);
86 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
87 args[nargs++] = procname;
88 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
89 args[nargs++] = w->key;
90 args[nargs++] = NULL;
91 assert(nargs <= HELPER_MAX_ARGS);
92 for (k = 0; k < hlp->n_to_start; k++) {
c68e9c6b 93 getCurrentTime();
74addf6c 94 rfd = wfd = -1;
95 x = ipcCreate(hlp->ipc_type,
96 progname,
97 args,
98 shortname,
99 &rfd,
100 &wfd);
101 if (x < 0) {
102 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
103 continue;
104 }
105 hlp->n_running++;
72711e31 106 srv = cbdataAlloc(helper_server);
74addf6c 107 srv->flags.alive = 1;
108 srv->index = k;
109 srv->rfd = rfd;
110 srv->wfd = wfd;
111 srv->buf = memAllocate(MEM_8K_BUF);
112 srv->buf_sz = 8192;
113 srv->offset = 0;
114 srv->parent = hlp;
1f5f60dd 115 cbdataLock(hlp); /* lock because of the parent backlink */
74addf6c 116 dlinkAddTail(srv, &srv->link, &hlp->servers);
117 if (rfd == wfd) {
118 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
119 fd_note(rfd, fd_note_buf);
120 } else {
121 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
122 fd_note(rfd, fd_note_buf);
123 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
124 fd_note(wfd, fd_note_buf);
125 }
126 commSetNonBlocking(rfd);
127 if (wfd != rfd)
128 commSetNonBlocking(wfd);
1f5f60dd 129 comm_add_close_handler(rfd, helperServerFree, srv);
74addf6c 130 }
131 safe_free(shortname);
132 safe_free(procname);
838b993c 133 helperKickQueue(hlp);
74addf6c 134}
135
94439e4e 136void
137helperStatefulOpenServers(statefulhelper * hlp)
138{
139 char *s;
140 char *progname;
141 char *shortname;
142 char *procname;
143 char *args[HELPER_MAX_ARGS];
144 char fd_note_buf[FD_DESC_SZ];
145 helper_stateful_server *srv;
146 int nargs = 0;
147 int k;
148 int x;
149 int rfd;
150 int wfd;
151 wordlist *w;
152 if (hlp->cmdline == NULL)
153 return;
154 progname = hlp->cmdline->key;
155 if ((s = strrchr(progname, '/')))
156 shortname = xstrdup(s + 1);
157 else
158 shortname = xstrdup(progname);
159 debug(29, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
160 hlp->n_to_start, shortname);
161 procname = xmalloc(strlen(shortname) + 3);
162 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
163 args[nargs++] = procname;
164 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
165 args[nargs++] = w->key;
166 args[nargs++] = NULL;
167 assert(nargs <= HELPER_MAX_ARGS);
168 for (k = 0; k < hlp->n_to_start; k++) {
169 getCurrentTime();
170 rfd = wfd = -1;
171 x = ipcCreate(hlp->ipc_type,
172 progname,
173 args,
174 shortname,
175 &rfd,
176 &wfd);
177 if (x < 0) {
178 debug(29, 1) ("WARNING: Cannot run '%s' process.\n", progname);
179 continue;
180 }
181 hlp->n_running++;
72711e31 182 srv = cbdataAlloc(helper_stateful_server);
94439e4e 183 srv->flags.alive = 1;
184 srv->flags.reserved = S_HELPER_FREE;
185 srv->deferred_requests = 0;
60d096f4 186 srv->stats.deferbyfunc = 0;
187 srv->stats.deferbycb = 0;
188 srv->stats.submits = 0;
189 srv->stats.releases = 0;
94439e4e 190 srv->index = k;
191 srv->rfd = rfd;
192 srv->wfd = wfd;
193 srv->buf = memAllocate(MEM_8K_BUF);
194 srv->buf_sz = 8192;
195 srv->offset = 0;
196 srv->parent = hlp;
197 if (hlp->datapool != NULL)
198 srv->data = memPoolAlloc(hlp->datapool);
199 cbdataLock(hlp); /* lock because of the parent backlink */
200 dlinkAddTail(srv, &srv->link, &hlp->servers);
201 if (rfd == wfd) {
202 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
203 fd_note(rfd, fd_note_buf);
204 } else {
205 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
206 fd_note(rfd, fd_note_buf);
207 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
208 fd_note(wfd, fd_note_buf);
209 }
210 commSetNonBlocking(rfd);
211 if (wfd != rfd)
212 commSetNonBlocking(wfd);
213 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
214 }
215 safe_free(shortname);
216 safe_free(procname);
217 helperStatefulKickQueue(hlp);
218}
219
220
74addf6c 221void
222helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
223{
c68e9c6b 224 helper_request *r = memAllocate(MEM_HELPER_REQUEST);
74addf6c 225 helper_server *srv;
5b5f9257 226 if (hlp == NULL) {
7d47d8e6 227 debug(29, 3) ("helperSubmit: hlp == NULL\n");
5b5f9257 228 callback(data, NULL);
229 return;
230 }
74addf6c 231 r->callback = callback;
232 r->data = data;
233 r->buf = xstrdup(buf);
234 cbdataLock(r->data);
235 if ((srv = GetFirstAvailable(hlp)))
236 helperDispatch(srv, r);
237 else
238 Enqueue(hlp, r);
94439e4e 239 debug(29, 9) ("helperSubmit: %s\n", buf);
240}
241
721b0310 242/* lastserver = "server last used as part of a deferred or reserved
243 * request sequence"
244 */
94439e4e 245void
246helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
247{
248 helper_stateful_request *r = memAllocate(MEM_HELPER_STATEFUL_REQUEST);
249 helper_stateful_server *srv;
250 if (hlp == NULL) {
251 debug(29, 3) ("helperStatefulSubmit: hlp == NULL\n");
252 callback(data, 0, NULL);
253 return;
254 }
255 r->callback = callback;
256 r->data = data;
721b0310 257 if (buf != NULL) {
94439e4e 258 r->buf = xstrdup(buf);
721b0310 259 r->placeholder = 0;
260 } else {
261 r->buf = NULL;
94439e4e 262 r->placeholder = 1;
721b0310 263 }
94439e4e 264 cbdataLock(r->data);
265 if ((buf != NULL) && lastserver) {
266 debug(29, 5) ("StatefulSubmit with lastserver %d\n", lastserver);
60d096f4 267 /* the queue doesn't count for this assert because queued requests
268 * have already gone through here and been tested.
269 * It's legal to have deferred_requests == 0 and queue entries
270 * and status of S_HELPEER_DEFERRED.
271 * BUT: It's not legal to submit a new request w/lastserver in
272 * that state.
273 */
274 assert(!(lastserver->deferred_requests == 0 &&
275 lastserver->flags.reserved == S_HELPER_DEFERRED));
276 if (lastserver->flags.reserved != S_HELPER_RESERVED) {
277 lastserver->stats.submits++;
94439e4e 278 lastserver->deferred_requests--;
60d096f4 279 }
94439e4e 280 if (!(lastserver->request)) {
281 debug(29, 5) ("StatefulSubmit dispatching\n");
282 helperStatefulDispatch(lastserver, r);
283 } else {
284 debug(29, 5) ("StatefulSubmit queuing\n");
285 StatefulServerEnqueue(lastserver, r);
286 }
287 } else {
288 if ((srv = StatefulGetFirstAvailable(hlp))) {
289 helperStatefulDispatch(srv, r);
290 } else
291 StatefulEnqueue(hlp, r);
292 }
293 debug(29, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r->placeholder, buf);
294}
295
296helper_stateful_server *
297helperStatefulDefer(statefulhelper * hlp)
298/* find and add a deferred request to a server */
299{
300 dlink_node *n;
301 helper_stateful_server *srv = NULL, *rv = NULL;
302 if (hlp == NULL) {
303 debug(29, 3) ("helperStatefulReserve: hlp == NULL\n");
304 return NULL;
305 }
306 debug(29, 5) ("helperStatefulDefer: Running servers %d.\n", hlp->n_running);
307 if (hlp->n_running == 0) {
308 debug(29, 1) ("helperStatefulDefer: No running servers!. \n");
309 return NULL;
310 }
311 srv = StatefulGetFirstAvailable(hlp);
312 /* all currently busy:loop through servers and find server with the shortest queue */
313 rv = srv;
314 if (rv == NULL)
315 for (n = hlp->servers.head; n != NULL; n = n->next) {
316 srv = n->data;
317 if (srv->flags.reserved == S_HELPER_RESERVED)
318 continue;
319 if (!srv->flags.alive)
320 continue;
321 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
322 !(hlp->IsAvailable(srv->data)))
323 continue;
324 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
325 continue;
326 rv = srv;
327 }
328 if (rv == NULL) {
329 debug(29, 1) ("helperStatefulDefer: None available.\n");
330 return NULL;
331 }
60d096f4 332 /* consistency check:
333 * when the deferred count is 0,
334 * submits + releases == deferbyfunc + deferbycb
335 * Or in english, when there are no deferred requests, the amount
336 * we have submitted to the queue or cancelled must equal the amount
337 * we have said we wanted to be able to submit or cancel
338 */
339 if (rv->deferred_requests == 0)
340 assert(rv->stats.submits + rv->stats.releases ==
341 rv->stats.deferbyfunc + rv->stats.deferbycb);
342
94439e4e 343 rv->flags.reserved = S_HELPER_DEFERRED;
344 rv->deferred_requests++;
60d096f4 345 rv->stats.deferbyfunc++;
94439e4e 346 return rv;
347}
348
349void
350helperStatefulReset(helper_stateful_server * srv)
351/* puts this helper back in the queue. the calling app is required to
352 * manage the state in the helper.
353 */
354{
355 statefulhelper *hlp = srv->parent;
356 helper_stateful_request *r;
357 r = srv->request;
358 if (r != NULL) {
359 /* reset attempt DURING an outstaning request */
360 debug(29, 1) ("helperStatefulReset: RESET During request %s \n",
361 hlp->id_name);
362 srv->flags.busy = 0;
363 srv->offset = 0;
364 helperStatefulRequestFree(r);
365 srv->request = NULL;
366 }
94439e4e 367 srv->flags.busy = 0;
368 if (srv->queue.head) {
369 srv->flags.reserved = S_HELPER_DEFERRED;
370 helperStatefulServerKickQueue(srv);
371 } else {
372 srv->flags.reserved = S_HELPER_FREE;
373 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
374 srv->parent->OnEmptyQueue(srv->data);
375 helperStatefulKickQueue(hlp);
376 }
377}
378
379void
380helperStatefulReleaseServer(helper_stateful_server * srv)
381/*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
382{
60d096f4 383 srv->stats.releases++;
384 if (srv->flags.reserved == S_HELPER_DEFERRED) {
385 assert(srv->deferred_requests);
94439e4e 386 srv->deferred_requests--;
60d096f4 387 }
94439e4e 388 if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head)) {
389 srv->flags.reserved = S_HELPER_FREE;
390 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
391 srv->parent->OnEmptyQueue(srv->data);
392 }
393}
394
395void *
396helperStatefulServerGetData(helper_stateful_server * srv)
397/* return a pointer to the stateful routines data area */
398{
399 return srv->data;
74addf6c 400}
401
402void
403helperStats(StoreEntry * sentry, helper * hlp)
404{
405 helper_server *srv;
406 dlink_node *link;
f4ae18d0 407 double tt;
74addf6c 408 storeAppendPrintf(sentry, "number running: %d of %d\n",
409 hlp->n_running, hlp->n_to_start);
410 storeAppendPrintf(sentry, "requests sent: %d\n",
411 hlp->stats.requests);
412 storeAppendPrintf(sentry, "replies received: %d\n",
413 hlp->stats.replies);
414 storeAppendPrintf(sentry, "queue length: %d\n",
415 hlp->stats.queue_size);
416 storeAppendPrintf(sentry, "avg service time: %d msec\n",
417 hlp->stats.avg_svc_time);
418 storeAppendPrintf(sentry, "\n");
592da4ec 419 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
74addf6c 420 "#",
421 "FD",
422 "# Requests",
423 "Flags",
424 "Time",
592da4ec 425 "Offset",
426 "Request");
74addf6c 427 for (link = hlp->servers.head; link; link = link->next) {
428 srv = link->data;
f4ae18d0 429 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
592da4ec 430 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
74addf6c 431 srv->index + 1,
432 srv->rfd,
433 srv->stats.uses,
434 srv->flags.alive ? 'A' : ' ',
435 srv->flags.busy ? 'B' : ' ',
436 srv->flags.closing ? 'C' : ' ',
437 srv->flags.shutdown ? 'S' : ' ',
f4ae18d0 438 tt < 0.0 ? 0.0 : tt,
592da4ec 439 (int) srv->offset,
440 srv->request ? log_quote(srv->request->buf) : "(none)");
74addf6c 441 }
442 storeAppendPrintf(sentry, "\nFlags key:\n\n");
443 storeAppendPrintf(sentry, " A = ALIVE\n");
444 storeAppendPrintf(sentry, " B = BUSY\n");
445 storeAppendPrintf(sentry, " C = CLOSING\n");
446 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
447}
448
94439e4e 449void
450helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
451{
452 helper_stateful_server *srv;
453 dlink_node *link;
454 double tt;
455 storeAppendPrintf(sentry, "number running: %d of %d\n",
456 hlp->n_running, hlp->n_to_start);
457 storeAppendPrintf(sentry, "requests sent: %d\n",
458 hlp->stats.requests);
459 storeAppendPrintf(sentry, "replies received: %d\n",
460 hlp->stats.replies);
461 storeAppendPrintf(sentry, "queue length: %d\n",
462 hlp->stats.queue_size);
463 storeAppendPrintf(sentry, "avg service time: %d msec\n",
464 hlp->stats.avg_svc_time);
465 storeAppendPrintf(sentry, "\n");
466 storeAppendPrintf(sentry, "%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
467 "#",
468 "FD",
469 "# Requests",
470 "# Deferred Requests",
471 "Flags",
472 "Time",
473 "Offset",
474 "Request");
475 for (link = hlp->servers.head; link; link = link->next) {
476 srv = link->data;
477 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
478 storeAppendPrintf(sentry, "%7d\t%7d\t%11d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
479 srv->index + 1,
480 srv->rfd,
481 srv->stats.uses,
482 srv->deferred_requests,
483 srv->flags.alive ? 'A' : ' ',
484 srv->flags.busy ? 'B' : ' ',
485 srv->flags.closing ? 'C' : ' ',
486 srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
487 srv->flags.shutdown ? 'S' : ' ',
488 tt < 0.0 ? 0.0 : tt,
489 (int) srv->offset,
490 srv->request ? log_quote(srv->request->buf) : "(none)");
491 }
492 storeAppendPrintf(sentry, "\nFlags key:\n\n");
493 storeAppendPrintf(sentry, " A = ALIVE\n");
494 storeAppendPrintf(sentry, " B = BUSY\n");
495 storeAppendPrintf(sentry, " C = CLOSING\n");
496 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
497 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
498}
499
74addf6c 500void
501helperShutdown(helper * hlp)
502{
c68e9c6b 503 dlink_node *link = hlp->servers.head;
74addf6c 504 helper_server *srv;
c68e9c6b 505 while (link) {
74addf6c 506 srv = link->data;
c68e9c6b 507 link = link->next;
74addf6c 508 if (!srv->flags.alive) {
509 debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.\n",
510 hlp->id_name, srv->index + 1);
511 continue;
512 }
1f5f60dd 513 srv->flags.shutdown = 1; /* request it to shut itself down */
74addf6c 514 if (srv->flags.busy) {
515 debug(34, 3) ("helperShutdown: %s #%d is BUSY.\n",
516 hlp->id_name, srv->index + 1);
74addf6c 517 continue;
518 }
519 if (srv->flags.closing) {
520 debug(34, 3) ("helperShutdown: %s #%d is CLOSING.\n",
521 hlp->id_name, srv->index + 1);
522 continue;
523 }
74addf6c 524 srv->flags.closing = 1;
3cdb7cd0 525 comm_close(srv->wfd);
526 srv->wfd = -1;
74addf6c 527 }
528}
529
94439e4e 530void
531helperStatefulShutdown(statefulhelper * hlp)
532{
533 dlink_node *link = hlp->servers.head;
534 helper_stateful_server *srv;
535 while (link) {
536 srv = link->data;
537 link = link->next;
538 if (!srv->flags.alive) {
539 debug(34, 3) ("helperStatefulShutdown: %s #%d is NOT ALIVE.\n",
540 hlp->id_name, srv->index + 1);
541 continue;
542 }
543 srv->flags.shutdown = 1; /* request it to shut itself down */
544 if (srv->flags.busy) {
545 debug(34, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
546 hlp->id_name, srv->index + 1);
547 continue;
548 }
549 if (srv->flags.closing) {
550 debug(34, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
551 hlp->id_name, srv->index + 1);
552 continue;
553 }
554 if (srv->flags.reserved != S_HELPER_FREE) {
555 debug(34, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
556 hlp->id_name, srv->index + 1);
557 continue;
558 }
559 if (srv->deferred_requests) {
560 debug(34, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
561 hlp->id_name, srv->index + 1);
562 continue;
563 }
564 srv->flags.closing = 1;
565 comm_close(srv->wfd);
566 srv->wfd = -1;
567 }
568}
569
570
1f5f60dd 571helper *
572helperCreate(const char *name)
573{
28c60158 574 helper *hlp;
72711e31 575 hlp = cbdataAlloc(helper);
1f5f60dd 576 hlp->id_name = name;
577 return hlp;
578}
579
94439e4e 580statefulhelper *
581helperStatefulCreate(const char *name)
582{
583 statefulhelper *hlp;
72711e31 584 hlp = cbdataAlloc(statefulhelper);
94439e4e 585 hlp->id_name = name;
586 return hlp;
587}
588
589
1f5f60dd 590void
591helperFree(helper * hlp)
592{
5dae8514 593 if (!hlp)
594 return;
1f5f60dd 595 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 596 if (hlp->queue.head)
597 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
598 hlp->id_name, hlp->stats.queue_size);
1f5f60dd 599 cbdataFree(hlp);
600}
601
94439e4e 602void
603helperStatefulFree(statefulhelper * hlp)
604{
5dae8514 605 if (!hlp)
606 return;
94439e4e 607 /* note, don't free hlp->name, it probably points to static memory */
608 if (hlp->queue.head)
609 debug(29, 0) ("WARNING: freeing %s helper with %d requests queued\n",
610 hlp->id_name, hlp->stats.queue_size);
611 cbdataFree(hlp);
612}
613
614
74addf6c 615/* ====================================================================== */
616/* LOCAL FUNCTIONS */
617/* ====================================================================== */
618
619static void
1f5f60dd 620helperServerFree(int fd, void *data)
74addf6c 621{
622 helper_server *srv = data;
623 helper *hlp = srv->parent;
ac750329 624 helper_request *r;
74addf6c 625 assert(srv->rfd == fd);
626 if (srv->buf) {
db1cd23c 627 memFree(srv->buf, MEM_8K_BUF);
74addf6c 628 srv->buf = NULL;
629 }
ac750329 630 if ((r = srv->request)) {
631 if (cbdataValid(r->data))
632 r->callback(r->data, srv->buf);
633 helperRequestFree(r);
63758217 634 srv->request = NULL;
ac750329 635 }
3cdb7cd0 636 if (srv->wfd != srv->rfd && srv->wfd != -1)
74addf6c 637 comm_close(srv->wfd);
638 dlinkDelete(&srv->link, &hlp->servers);
74addf6c 639 hlp->n_running--;
640 assert(hlp->n_running >= 0);
1f5f60dd 641 if (!srv->flags.shutdown) {
642 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
14e87a44 643 hlp->id_name, srv->index + 1, fd);
1f5f60dd 644 if (hlp->n_running < hlp->n_to_start / 2)
14e87a44 645 fatalf("Too few %s processes are running", hlp->id_name);
646 }
1f5f60dd 647 cbdataUnlock(srv->parent);
14e87a44 648 cbdataFree(srv);
74addf6c 649}
650
94439e4e 651static void
652helperStatefulServerFree(int fd, void *data)
653{
654 helper_stateful_server *srv = data;
655 statefulhelper *hlp = srv->parent;
656 helper_stateful_request *r;
657 assert(srv->rfd == fd);
658 if (srv->buf) {
659 memFree(srv->buf, MEM_8K_BUF);
660 srv->buf = NULL;
661 }
662 if ((r = srv->request)) {
663 if (cbdataValid(r->data))
664 r->callback(r->data, srv, srv->buf);
665 helperStatefulRequestFree(r);
666 srv->request = NULL;
667 }
668 if (srv->wfd != srv->rfd && srv->wfd != -1)
669 comm_close(srv->wfd);
670 dlinkDelete(&srv->link, &hlp->servers);
671 hlp->n_running--;
672 assert(hlp->n_running >= 0);
673 if (!srv->flags.shutdown) {
674 debug(34, 0) ("WARNING: %s #%d (FD %d) exited\n",
675 hlp->id_name, srv->index + 1, fd);
676 if (hlp->n_running < hlp->n_to_start / 2)
677 fatalf("Too few %s processes are running", hlp->id_name);
678 }
679 if (srv->data != NULL)
680 memPoolFree(hlp->datapool, srv->data);
681 cbdataUnlock(srv->parent);
682 cbdataFree(srv);
683}
684
685
74addf6c 686static void
687helperHandleRead(int fd, void *data)
688{
689 int len;
690 char *t = NULL;
691 helper_server *srv = data;
692 helper_request *r;
693 helper *hlp = srv->parent;
694 assert(fd == srv->rfd);
695 assert(cbdataValid(data));
83704487 696 statCounter.syscalls.sock.reads++;
1f7c9178 697 len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
74addf6c 698 fd_bytes(fd, len, FD_READ);
699 debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
700 len, hlp->id_name, srv->index + 1);
701 if (len <= 0) {
702 if (len < 0)
703 debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
704 comm_close(fd);
705 return;
706 }
707 srv->offset += len;
708 srv->buf[srv->offset] = '\0';
709 r = srv->request;
710 if (r == NULL) {
711 /* someone spoke without being spoken to */
712 debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
713 hlp->id_name, srv->index + 1, len);
714 srv->offset = 0;
715 } else if ((t = strchr(srv->buf, '\n'))) {
716 /* end of reply found */
717 debug(29, 3) ("helperHandleRead: end of reply found\n");
718 *t = '\0';
719 if (cbdataValid(r->data))
720 r->callback(r->data, srv->buf);
721 srv->flags.busy = 0;
722 srv->offset = 0;
723 helperRequestFree(r);
63758217 724 srv->request = NULL;
74addf6c 725 hlp->stats.replies++;
726 hlp->stats.avg_svc_time =
727 intAverage(hlp->stats.avg_svc_time,
728 tvSubMsec(srv->dispatch_time, current_time),
729 hlp->stats.replies, REDIRECT_AV_FACTOR);
3cdb7cd0 730 if (srv->flags.shutdown) {
74addf6c 731 comm_close(srv->wfd);
3cdb7cd0 732 srv->wfd = -1;
733 } else
c68e9c6b 734 helperKickQueue(hlp);
74addf6c 735 } else {
736 commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
737 }
74addf6c 738}
739
94439e4e 740static void
741helperStatefulHandleRead(int fd, void *data)
742{
743 int len;
744 char *t = NULL;
745 helper_stateful_server *srv = data;
746 helper_stateful_request *r;
747 statefulhelper *hlp = srv->parent;
748 assert(fd == srv->rfd);
749 assert(cbdataValid(data));
750 statCounter.syscalls.sock.reads++;
751 len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
752 fd_bytes(fd, len, FD_READ);
753 debug(29, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
754 len, hlp->id_name, srv->index + 1);
755 if (len <= 0) {
756 if (len < 0)
757 debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
758 comm_close(fd);
759 return;
760 }
761 srv->offset += len;
762 srv->buf[srv->offset] = '\0';
763 r = srv->request;
764 if (r == NULL) {
765 /* someone spoke without being spoken to */
766 debug(29, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
767 hlp->id_name, srv->index + 1, len);
768 srv->offset = 0;
769 } else if ((t = strchr(srv->buf, '\n'))) {
770 /* end of reply found */
771 debug(29, 3) ("helperStatefulHandleRead: end of reply found\n");
772 *t = '\0';
773 if (cbdataValid(r->data)) {
774 switch ((r->callback(r->data, srv, srv->buf))) { /*if non-zero reserve helper */
775 case S_HELPER_UNKNOWN:
776 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
777 break;
778 case S_HELPER_RELEASE: /* helper finished with */
60d096f4 779 if (!srv->deferred_requests && !srv->queue.head) {
94439e4e 780 srv->flags.reserved = S_HELPER_FREE;
781 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
782 srv->parent->OnEmptyQueue(srv->data);
783 debug(29, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1);
784 } else {
785 srv->flags.reserved = S_HELPER_DEFERRED;
786 debug(29, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1);
787 }
788 break;
789 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
790 if (!srv->queue.head) {
60d096f4 791 assert(srv->deferred_requests == 0);
94439e4e 792 srv->flags.reserved = S_HELPER_RESERVED;
793 debug(29, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1);
794 } else {
795 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
796 }
797 break;
798 case S_HELPER_DEFER:
799 /* the helper is still needed, but can
800 * be used for other requests in the meantime.
801 */
802 srv->flags.reserved = S_HELPER_DEFERRED;
803 srv->deferred_requests++;
60d096f4 804 srv->stats.deferbycb++;
94439e4e 805 debug(29, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1);
806 break;
807 default:
808 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
809 }
810
811 } else {
812 debug(29, 1) ("StatefulHandleRead: no callback data registered\n");
813 }
814 srv->flags.busy = 0;
815 srv->offset = 0;
816 helperStatefulRequestFree(r);
817 srv->request = NULL;
818 hlp->stats.replies++;
819 hlp->stats.avg_svc_time =
820 intAverage(hlp->stats.avg_svc_time,
821 tvSubMsec(srv->dispatch_time, current_time),
822 hlp->stats.replies, REDIRECT_AV_FACTOR);
823 if (srv->flags.shutdown) {
824 comm_close(srv->wfd);
825 srv->wfd = -1;
826 } else {
827 if (srv->queue.head)
828 helperStatefulServerKickQueue(srv);
829 else
830 helperStatefulKickQueue(hlp);
831 }
832 } else {
833 commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0);
834 }
835}
836
74addf6c 837static void
838Enqueue(helper * hlp, helper_request * r)
839{
c68e9c6b 840 dlink_node *link = memAllocate(MEM_DLINK_NODE);
74addf6c 841 dlinkAddTail(r, link, &hlp->queue);
842 hlp->stats.queue_size++;
843 if (hlp->stats.queue_size < hlp->n_running)
844 return;
845 if (squid_curtime - hlp->last_queue_warn < 600)
846 return;
fe73896c 847 if (shutting_down || reconfiguring)
848 return;
74addf6c 849 hlp->last_queue_warn = squid_curtime;
850 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
851 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
852 if (hlp->stats.queue_size > hlp->n_running * 2)
853 fatalf("Too many queued %s requests", hlp->id_name);
854 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
855}
856
94439e4e 857static void
858StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
859{
860 dlink_node *link = memAllocate(MEM_DLINK_NODE);
861 dlinkAddTail(r, link, &hlp->queue);
862 hlp->stats.queue_size++;
863 if (hlp->stats.queue_size < hlp->n_running)
864 return;
865 if (squid_curtime - hlp->last_queue_warn < 600)
866 return;
867 if (shutting_down || reconfiguring)
868 return;
869 hlp->last_queue_warn = squid_curtime;
870 debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
871 debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
872 if (hlp->stats.queue_size > hlp->n_running * 2)
873 fatalf("Too many queued %s requests", hlp->id_name);
874 debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
875}
876
877static void
878StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
879{
880 dlink_node *link = memAllocate(MEM_DLINK_NODE);
881 dlinkAddTail(r, link, &srv->queue);
2d70df72 882/* TODO: warning if the queue on this server is more than X
883 * We don't check the queue size at the moment, because
884 * requests hitting here are deferrable
885 */
886/* hlp->stats.queue_size++;
887 * if (hlp->stats.queue_size < hlp->n_running)
888 * return;
889 * if (squid_curtime - hlp->last_queue_warn < 600)
890 * return;
891 * if (shutting_down || reconfiguring)
892 * return;
893 * hlp->last_queue_warn = squid_curtime;
894 * debug(14, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
895 * debug(14, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
896 * if (hlp->stats.queue_size > hlp->n_running * 2)
897 * fatalf("Too many queued %s requests", hlp->id_name);
898 * debug(14, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); */
94439e4e 899}
900
901
74addf6c 902static helper_request *
903Dequeue(helper * hlp)
904{
905 dlink_node *link;
906 helper_request *r = NULL;
907 if ((link = hlp->queue.head)) {
908 r = link->data;
909 dlinkDelete(link, &hlp->queue);
db1cd23c 910 memFree(link, MEM_DLINK_NODE);
74addf6c 911 hlp->stats.queue_size--;
912 }
913 return r;
914}
915
94439e4e 916static helper_stateful_request *
917StatefulServerDequeue(helper_stateful_server * srv)
918{
919 dlink_node *link;
920 helper_stateful_request *r = NULL;
921 if ((link = srv->queue.head)) {
922 r = link->data;
923 dlinkDelete(link, &srv->queue);
924 memFree(link, MEM_DLINK_NODE);
925 }
926 return r;
927}
928
929static helper_stateful_request *
930StatefulDequeue(statefulhelper * hlp)
931{
932 dlink_node *link;
933 helper_stateful_request *r = NULL;
934 if ((link = hlp->queue.head)) {
935 r = link->data;
936 dlinkDelete(link, &hlp->queue);
937 memFree(link, MEM_DLINK_NODE);
938 hlp->stats.queue_size--;
939 }
940 return r;
941}
942
74addf6c 943static helper_server *
944GetFirstAvailable(helper * hlp)
945{
946 dlink_node *n;
947 helper_server *srv = NULL;
fe73896c 948 if (hlp->n_running == 0)
949 return NULL;
74addf6c 950 for (n = hlp->servers.head; n != NULL; n = n->next) {
951 srv = n->data;
952 if (srv->flags.busy)
953 continue;
954 if (!srv->flags.alive)
955 continue;
956 return srv;
957 }
958 return NULL;
959}
960
94439e4e 961static helper_stateful_server *
962StatefulGetFirstAvailable(statefulhelper * hlp)
963{
964 dlink_node *n;
965 helper_stateful_server *srv = NULL;
966 debug(29, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running);
967 if (hlp->n_running == 0)
968 return NULL;
969 for (n = hlp->servers.head; n != NULL; n = n->next) {
970 srv = n->data;
971 if (srv->flags.busy)
972 continue;
973 if (srv->flags.reserved == S_HELPER_RESERVED)
974 continue;
975 if (!srv->flags.alive)
976 continue;
977 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
978 continue;
979 return srv;
980 }
981 debug(29, 5) ("StatefulGetFirstAvailable: None available.\n");
982 return NULL;
983}
984
985
74addf6c 986static void
987helperDispatch(helper_server * srv, helper_request * r)
988{
989 helper *hlp = srv->parent;
990 if (!cbdataValid(r->data)) {
991 debug(29, 1) ("helperDispatch: invalid callback data\n");
992 helperRequestFree(r);
993 return;
994 }
995 assert(!srv->flags.busy);
996 srv->flags.busy = 1;
997 srv->request = r;
998 srv->dispatch_time = current_time;
999 comm_write(srv->wfd,
1000 r->buf,
1001 strlen(r->buf),
1002 NULL, /* Handler */
1003 NULL, /* Handler-data */
1004 NULL); /* free */
1005 commSetSelect(srv->rfd,
1006 COMM_SELECT_READ,
1007 helperHandleRead,
1008 srv, 0);
1009 debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
1010 hlp->id_name, srv->index + 1, strlen(r->buf));
1011 srv->stats.uses++;
1012 hlp->stats.requests++;
1013}
1014
94439e4e 1015static void
1016helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1017{
1018 statefulhelper *hlp = srv->parent;
1019 if (!cbdataValid(r->data)) {
1020 debug(29, 1) ("helperStatefulDispatch: invalid callback data\n");
1021 helperStatefulRequestFree(r);
1022 return;
1023 }
1024 debug(29, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1);
1025 if (r->placeholder == 1) {
1026 /* a callback is needed before this request can _use_ a helper. */
721b0310 1027 /* we don't care about releasing/deferring this helper. The request NEVER
1028 * gets to the helper. So we throw away the return code */
1029 r->callback(r->data, srv, NULL);
1030 /* throw away the placeholder */
1031 helperStatefulRequestFree(r);
1032 /* and push the queue. Note that the callback may have submitted a new
1033 * request to the helper which is why we test for the request*/
1034 if (srv->request == NULL) {
1035 if (srv->flags.shutdown) {
1036 comm_close(srv->wfd);
1037 srv->wfd = -1;
1038 } else {
1039 if (srv->queue.head)
1040 helperStatefulServerKickQueue(srv);
1041 else
1042 helperStatefulKickQueue(hlp);
94439e4e 1043 }
1044 }
1045 return;
1046 }
1047 srv->flags.busy = 1;
1048 srv->request = r;
1049 srv->dispatch_time = current_time;
1050 comm_write(srv->wfd,
1051 r->buf,
1052 strlen(r->buf),
1053 NULL, /* Handler */
1054 NULL, /* Handler-data */
1055 NULL); /* free */
1056 commSetSelect(srv->rfd,
1057 COMM_SELECT_READ,
1058 helperStatefulHandleRead,
1059 srv, 0);
1060 debug(29, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
1061 hlp->id_name, srv->index + 1, strlen(r->buf));
1062 srv->stats.uses++;
1063 hlp->stats.requests++;
1064}
1065
1066
74addf6c 1067static void
1068helperKickQueue(helper * hlp)
1069{
1070 helper_request *r;
1071 helper_server *srv;
1072 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1073 helperDispatch(srv, r);
1074}
1075
94439e4e 1076static void
1077helperStatefulKickQueue(statefulhelper * hlp)
1078{
1079 helper_stateful_request *r;
1080 helper_stateful_server *srv;
1081 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1082 helperStatefulDispatch(srv, r);
1083}
1084
1085static void
1086helperStatefulServerKickQueue(helper_stateful_server * srv)
1087{
1088 helper_stateful_request *r;
1089 if ((r = StatefulServerDequeue(srv)))
1090 helperStatefulDispatch(srv, r);
1091}
1092
74addf6c 1093static void
1094helperRequestFree(helper_request * r)
1095{
1096 cbdataUnlock(r->data);
1097 xfree(r->buf);
db1cd23c 1098 memFree(r, MEM_HELPER_REQUEST);
74addf6c 1099}
94439e4e 1100
1101static void
1102helperStatefulRequestFree(helper_stateful_request * r)
1103{
1104 cbdataUnlock(r->data);
1105 xfree(r->buf);
1106 memFree(r, MEM_HELPER_STATEFUL_REQUEST);
1107}