]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Updated hosts_file directive description.
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
6bf4f823 3 * $Id: helper.cc,v 1.70 2005/10/23 11:55:37 hno Exp $
f740a279 4 *
17bb3486 5 * DEBUG: section 84 Helper process maintenance
f740a279 6 * AUTHOR: Harvest Derived?
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 9 * ----------------------------------------------------------
10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
f740a279 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
51ee7c82 37#include "helper.h"
e6ccf245 38#include "Store.h"
42679bd6 39#include "comm.h"
0eb49b6d 40#include "MemBuf.h"
74addf6c 41
42#define HELPER_MAX_ARGS 64
43
c4b7a5a9 44static IOCB helperHandleRead;
45static IOCB helperStatefulHandleRead;
1f5f60dd 46static PF helperServerFree;
94439e4e 47static PF helperStatefulServerFree;
74addf6c 48static void Enqueue(helper * hlp, helper_request *);
49static helper_request *Dequeue(helper * hlp);
94439e4e 50static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 51static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 52static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 53static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 54static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 55static void helperKickQueue(helper * hlp);
94439e4e 56static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 57static void helperRequestFree(helper_request * r);
94439e4e 58static void helperStatefulRequestFree(helper_stateful_request * r);
59static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
60static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
61static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
62static void helperStatefulServerKickQueue(helper_stateful_server * srv);
74addf6c 63
64void
65helperOpenServers(helper * hlp)
66{
67 char *s;
68 char *progname;
69 char *shortname;
70 char *procname;
a2c963ae 71 const char *args[HELPER_MAX_ARGS];
74addf6c 72 char fd_note_buf[FD_DESC_SZ];
73 helper_server *srv;
74 int nargs = 0;
75 int k;
76 int x;
77 int rfd;
78 int wfd;
79 wordlist *w;
62e76326 80
74addf6c 81 if (hlp->cmdline == NULL)
62e76326 82 return;
83
74addf6c 84 progname = hlp->cmdline->key;
62e76326 85
74addf6c 86 if ((s = strrchr(progname, '/')))
62e76326 87 shortname = xstrdup(s + 1);
74addf6c 88 else
62e76326 89 shortname = xstrdup(progname);
90
17bb3486 91 debug(84, 1) ("helperOpenServers: Starting %d '%s' processes\n",
62e76326 92 hlp->n_to_start, shortname);
93
e6ccf245 94 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 95
74addf6c 96 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 97
74addf6c 98 args[nargs++] = procname;
62e76326 99
74addf6c 100 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 101 args[nargs++] = w->key;
102
74addf6c 103 args[nargs++] = NULL;
62e76326 104
74addf6c 105 assert(nargs <= HELPER_MAX_ARGS);
62e76326 106
74addf6c 107 for (k = 0; k < hlp->n_to_start; k++) {
62e76326 108 getCurrentTime();
109 rfd = wfd = -1;
110 x = ipcCreate(hlp->ipc_type,
111 progname,
112 args,
113 shortname,
114 &rfd,
115 &wfd);
116
117 if (x < 0) {
118 debug(84, 1) ("WARNING: Cannot run '%s' process.\n", progname);
119 continue;
120 }
121
122 hlp->n_running++;
d8f10d6a 123 hlp->n_active++;
62e76326 124 srv = cbdataAlloc(helper_server);
125 srv->pid = x;
62e76326 126 srv->index = k;
127 srv->rfd = rfd;
128 srv->wfd = wfd;
07eca7e0 129 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
90bed1c4 130 srv->wqueue = new MemBuf;
07eca7e0 131 srv->roffset = 0;
132 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
62e76326 133 srv->parent = cbdataReference(hlp);
134 dlinkAddTail(srv, &srv->link, &hlp->servers);
135
136 if (rfd == wfd) {
137 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
138 fd_note(rfd, fd_note_buf);
139 } else {
140 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
141 fd_note(rfd, fd_note_buf);
142 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
143 fd_note(wfd, fd_note_buf);
144 }
145
146 commSetNonBlocking(rfd);
147
148 if (wfd != rfd)
149 commSetNonBlocking(wfd);
150
151 comm_add_close_handler(rfd, helperServerFree, srv);
07eca7e0 152
153 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
74addf6c 154 }
62e76326 155
5ea33fce 156 hlp->last_restart = squid_curtime;
74addf6c 157 safe_free(shortname);
158 safe_free(procname);
838b993c 159 helperKickQueue(hlp);
74addf6c 160}
161
94439e4e 162void
163helperStatefulOpenServers(statefulhelper * hlp)
164{
165 char *s;
166 char *progname;
167 char *shortname;
168 char *procname;
a2c963ae 169 const char *args[HELPER_MAX_ARGS];
94439e4e 170 char fd_note_buf[FD_DESC_SZ];
171 helper_stateful_server *srv;
172 int nargs = 0;
173 int k;
174 int x;
175 int rfd;
176 int wfd;
177 wordlist *w;
62e76326 178
94439e4e 179 if (hlp->cmdline == NULL)
62e76326 180 return;
181
94439e4e 182 progname = hlp->cmdline->key;
62e76326 183
94439e4e 184 if ((s = strrchr(progname, '/')))
62e76326 185 shortname = xstrdup(s + 1);
94439e4e 186 else
62e76326 187 shortname = xstrdup(progname);
188
17bb3486 189 debug(84, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
62e76326 190 hlp->n_to_start, shortname);
191
e6ccf245 192 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 193
94439e4e 194 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 195
94439e4e 196 args[nargs++] = procname;
62e76326 197
94439e4e 198 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 199 args[nargs++] = w->key;
200
94439e4e 201 args[nargs++] = NULL;
62e76326 202
94439e4e 203 assert(nargs <= HELPER_MAX_ARGS);
62e76326 204
94439e4e 205 for (k = 0; k < hlp->n_to_start; k++) {
62e76326 206 getCurrentTime();
207 rfd = wfd = -1;
208 x = ipcCreate(hlp->ipc_type,
209 progname,
210 args,
211 shortname,
212 &rfd,
213 &wfd);
214
215 if (x < 0) {
216 debug(84, 1) ("WARNING: Cannot run '%s' process.\n", progname);
217 continue;
218 }
219
220 hlp->n_running++;
d8f10d6a 221 hlp->n_active++;
62e76326 222 srv = cbdataAlloc(helper_stateful_server);
223 srv->pid = x;
62e76326 224 srv->flags.reserved = S_HELPER_FREE;
225 srv->deferred_requests = 0;
226 srv->stats.deferbyfunc = 0;
227 srv->stats.deferbycb = 0;
228 srv->stats.submits = 0;
229 srv->stats.releases = 0;
230 srv->index = k;
231 srv->rfd = rfd;
232 srv->wfd = wfd;
07eca7e0 233 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
234 srv->roffset = 0;
62e76326 235 srv->parent = cbdataReference(hlp);
236
237 if (hlp->datapool != NULL)
b001e822 238 srv->data = hlp->datapool->alloc();
62e76326 239
240 dlinkAddTail(srv, &srv->link, &hlp->servers);
241
242 if (rfd == wfd) {
243 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
244 fd_note(rfd, fd_note_buf);
245 } else {
246 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
247 fd_note(rfd, fd_note_buf);
248 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
249 fd_note(wfd, fd_note_buf);
250 }
251
252 commSetNonBlocking(rfd);
253
254 if (wfd != rfd)
255 commSetNonBlocking(wfd);
256
257 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
07eca7e0 258
259 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
260
94439e4e 261 }
62e76326 262
5ea33fce 263 hlp->last_restart = squid_curtime;
94439e4e 264 safe_free(shortname);
265 safe_free(procname);
266 helperStatefulKickQueue(hlp);
267}
268
269
74addf6c 270void
271helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
272{
51ee7c82 273 helper_request *r = new helper_request;
74addf6c 274 helper_server *srv;
62e76326 275
5b5f9257 276 if (hlp == NULL) {
62e76326 277 debug(84, 3) ("helperSubmit: hlp == NULL\n");
278 callback(data, NULL);
279 return;
5b5f9257 280 }
62e76326 281
74addf6c 282 r->callback = callback;
fa80a8ef 283 r->data = cbdataReference(data);
74addf6c 284 r->buf = xstrdup(buf);
62e76326 285
74addf6c 286 if ((srv = GetFirstAvailable(hlp)))
62e76326 287 helperDispatch(srv, r);
74addf6c 288 else
62e76326 289 Enqueue(hlp, r);
290
17bb3486 291 debug(84, 9) ("helperSubmit: %s\n", buf);
94439e4e 292}
293
721b0310 294/* lastserver = "server last used as part of a deferred or reserved
295 * request sequence"
296 */
94439e4e 297void
298helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
299{
51ee7c82 300 helper_stateful_request *r = new helper_stateful_request;
94439e4e 301 helper_stateful_server *srv;
62e76326 302
94439e4e 303 if (hlp == NULL) {
62e76326 304 debug(84, 3) ("helperStatefulSubmit: hlp == NULL\n");
305 callback(data, 0, NULL);
306 return;
94439e4e 307 }
62e76326 308
94439e4e 309 r->callback = callback;
fa80a8ef 310 r->data = cbdataReference(data);
62e76326 311
721b0310 312 if (buf != NULL) {
62e76326 313 r->buf = xstrdup(buf);
314 r->placeholder = 0;
721b0310 315 } else {
62e76326 316 r->buf = NULL;
317 r->placeholder = 1;
721b0310 318 }
62e76326 319
94439e4e 320 if ((buf != NULL) && lastserver) {
62e76326 321 debug(84, 5) ("StatefulSubmit with lastserver %p\n", lastserver);
322 /* the queue doesn't count for this assert because queued requests
323 * have already gone through here and been tested.
324 * It's legal to have deferred_requests == 0 and queue entries
325 * and status of S_HELPEER_DEFERRED.
326 * BUT: It's not legal to submit a new request w/lastserver in
327 * that state.
328 */
329 assert(!(lastserver->deferred_requests == 0 &&
330 lastserver->flags.reserved == S_HELPER_DEFERRED));
331
332 if (lastserver->flags.reserved != S_HELPER_RESERVED) {
333 lastserver->stats.submits++;
334 lastserver->deferred_requests--;
335 }
336
337 if (!(lastserver->request)) {
338 debug(84, 5) ("StatefulSubmit dispatching\n");
339 helperStatefulDispatch(lastserver, r);
340 } else {
341 debug(84, 5) ("StatefulSubmit queuing\n");
342 StatefulServerEnqueue(lastserver, r);
343 }
94439e4e 344 } else {
62e76326 345 if ((srv = StatefulGetFirstAvailable(hlp))) {
346 helperStatefulDispatch(srv, r);
347 } else
348 StatefulEnqueue(hlp, r);
94439e4e 349 }
62e76326 350
17bb3486 351 debug(84, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r->placeholder, buf);
94439e4e 352}
353
354helper_stateful_server *
355helperStatefulDefer(statefulhelper * hlp)
356/* find and add a deferred request to a server */
357{
358 dlink_node *n;
359 helper_stateful_server *srv = NULL, *rv = NULL;
62e76326 360
361 if (hlp == NULL)
362 {
363 debug(84, 3) ("helperStatefulDefer: hlp == NULL\n");
364 return NULL;
94439e4e 365 }
62e76326 366
17bb3486 367 debug(84, 5) ("helperStatefulDefer: Running servers %d.\n", hlp->n_running);
62e76326 368
369 if (hlp->n_running == 0)
370 {
371 debug(84, 1) ("helperStatefulDefer: No running servers!. \n");
372 return NULL;
94439e4e 373 }
62e76326 374
0a706c69 375 rv = srv = StatefulGetFirstAvailable(hlp);
62e76326 376
377 if (rv == NULL)
378 {
379 /*
380 * all currently busy; loop through servers and find server
381 * with the shortest queue
382 */
383
384 for (n = hlp->servers.head; n != NULL; n = n->next) {
385 srv = (helper_stateful_server *)n->data;
386
387 if (srv->flags.reserved == S_HELPER_RESERVED)
388 continue;
389
b902a58c 390 if (!srv->flags.shutdown)
62e76326 391 continue;
392
393 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
394 !(hlp->IsAvailable(srv->data)))
395 continue;
396
397 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
398 continue;
399
400 rv = srv;
401 }
0a706c69 402 }
62e76326 403
404 if (rv == NULL)
405 {
406 debug(84, 1) ("helperStatefulDefer: None available.\n");
407 return NULL;
94439e4e 408 }
62e76326 409
60d096f4 410 /* consistency check:
411 * when the deferred count is 0,
412 * submits + releases == deferbyfunc + deferbycb
413 * Or in english, when there are no deferred requests, the amount
414 * we have submitted to the queue or cancelled must equal the amount
415 * we have said we wanted to be able to submit or cancel
416 */
417 if (rv->deferred_requests == 0)
62e76326 418 assert(rv->stats.submits + rv->stats.releases ==
419 rv->stats.deferbyfunc + rv->stats.deferbycb);
60d096f4 420
94439e4e 421 rv->flags.reserved = S_HELPER_DEFERRED;
62e76326 422
94439e4e 423 rv->deferred_requests++;
62e76326 424
60d096f4 425 rv->stats.deferbyfunc++;
62e76326 426
94439e4e 427 return rv;
428}
429
430void
431helperStatefulReset(helper_stateful_server * srv)
62e76326 432/* puts this helper back in the queue. the calling app is required to
94439e4e 433 * manage the state in the helper.
434 */
435{
436 statefulhelper *hlp = srv->parent;
437 helper_stateful_request *r;
438 r = srv->request;
62e76326 439
440 if (r != NULL)
441 {
442 /* reset attempt DURING an outstaning request */
443 debug(84, 1) ("helperStatefulReset: RESET During request %s \n",
444 hlp->id_name);
445 srv->flags.busy = 0;
07eca7e0 446 srv->roffset = 0;
62e76326 447 helperStatefulRequestFree(r);
448 srv->request = NULL;
94439e4e 449 }
62e76326 450
94439e4e 451 srv->flags.busy = 0;
62e76326 452
453 if (srv->queue.head)
454 {
455 srv->flags.reserved = S_HELPER_DEFERRED;
456 helperStatefulServerKickQueue(srv);
457 } else
458 {
459 srv->flags.reserved = S_HELPER_FREE;
460
461 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
462 srv->parent->OnEmptyQueue(srv->data);
463
464 helperStatefulKickQueue(hlp);
94439e4e 465 }
466}
467
468void
469helperStatefulReleaseServer(helper_stateful_server * srv)
470/*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
471{
60d096f4 472 srv->stats.releases++;
62e76326 473
474 if (srv->flags.reserved == S_HELPER_DEFERRED)
475 {
476 assert(srv->deferred_requests);
477 srv->deferred_requests--;
60d096f4 478 }
62e76326 479
480 if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head))
481 {
482 srv->flags.reserved = S_HELPER_FREE;
483
484 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
485 srv->parent->OnEmptyQueue(srv->data);
94439e4e 486 }
487}
488
489void *
490helperStatefulServerGetData(helper_stateful_server * srv)
491/* return a pointer to the stateful routines data area */
492{
493 return srv->data;
74addf6c 494}
495
496void
497helperStats(StoreEntry * sentry, helper * hlp)
498{
74addf6c 499 dlink_node *link;
0a706c69 500 storeAppendPrintf(sentry, "program: %s\n",
62e76326 501 hlp->cmdline->key);
74addf6c 502 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 503 hlp->n_running, hlp->n_to_start);
74addf6c 504 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 505 hlp->stats.requests);
74addf6c 506 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 507 hlp->stats.replies);
74addf6c 508 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 509 hlp->stats.queue_size);
a15d85ea 510 storeAppendPrintf(sentry, "avg service time: %.2f msec\n",
62e76326 511 (double) hlp->stats.avg_svc_time / 1000.0);
74addf6c 512 storeAppendPrintf(sentry, "\n");
09c1ece1 513 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
62e76326 514 "#",
515 "FD",
516 "PID",
517 "# Requests",
518 "Flags",
519 "Time",
520 "Offset",
521 "Request");
522
74addf6c 523 for (link = hlp->servers.head; link; link = link->next) {
07eca7e0 524 helper_server *srv = (helper_server*)link->data;
525 double tt = srv->requests[0] ? 0.001 * tvSubMsec(srv->requests[0]->dispatch_time, current_time) : 0.0;
d8f10d6a 526 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 527 srv->index + 1,
528 srv->rfd,
529 srv->pid,
530 srv->stats.uses,
07eca7e0 531 srv->stats.pending ? 'B' : ' ',
532 srv->flags.writing ? 'W' : ' ',
62e76326 533 srv->flags.closing ? 'C' : ' ',
534 srv->flags.shutdown ? 'S' : ' ',
535 tt < 0.0 ? 0.0 : tt,
07eca7e0 536 (int) srv->roffset,
537 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
74addf6c 538 }
62e76326 539
74addf6c 540 storeAppendPrintf(sentry, "\nFlags key:\n\n");
74addf6c 541 storeAppendPrintf(sentry, " B = BUSY\n");
07eca7e0 542 storeAppendPrintf(sentry, " W = BUSY\n");
74addf6c 543 storeAppendPrintf(sentry, " C = CLOSING\n");
544 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
545}
546
94439e4e 547void
548helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
549{
550 helper_stateful_server *srv;
551 dlink_node *link;
552 double tt;
553 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 554 hlp->n_running, hlp->n_to_start);
94439e4e 555 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 556 hlp->stats.requests);
94439e4e 557 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 558 hlp->stats.replies);
94439e4e 559 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 560 hlp->stats.queue_size);
94439e4e 561 storeAppendPrintf(sentry, "avg service time: %d msec\n",
62e76326 562 hlp->stats.avg_svc_time);
94439e4e 563 storeAppendPrintf(sentry, "\n");
0a706c69 564 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%20s\t%s\t%7s\t%7s\t%7s\n",
62e76326 565 "#",
566 "FD",
567 "PID",
568 "# Requests",
569 "# Deferred Requests",
570 "Flags",
571 "Time",
572 "Offset",
573 "Request");
574
94439e4e 575 for (link = hlp->servers.head; link; link = link->next) {
62e76326 576 srv = (helper_stateful_server *)link->data;
577 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
b902a58c 578 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%20d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 579 srv->index + 1,
580 srv->rfd,
581 srv->pid,
582 srv->stats.uses,
583 (int) srv->deferred_requests,
62e76326 584 srv->flags.busy ? 'B' : ' ',
585 srv->flags.closing ? 'C' : ' ',
586 srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
587 srv->flags.shutdown ? 'S' : ' ',
588 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
589 tt < 0.0 ? 0.0 : tt,
07eca7e0 590 (int) srv->roffset,
62e76326 591 srv->request ? log_quote(srv->request->buf) : "(none)");
94439e4e 592 }
62e76326 593
94439e4e 594 storeAppendPrintf(sentry, "\nFlags key:\n\n");
94439e4e 595 storeAppendPrintf(sentry, " B = BUSY\n");
596 storeAppendPrintf(sentry, " C = CLOSING\n");
597 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
598 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
5d146f7d 599 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 600}
601
74addf6c 602void
603helperShutdown(helper * hlp)
604{
c68e9c6b 605 dlink_node *link = hlp->servers.head;
62e76326 606
c68e9c6b 607 while (link) {
62e76326 608 helper_server *srv;
609 srv = (helper_server *)link->data;
610 link = link->next;
611
8cfc76db 612 if (srv->flags.shutdown) {
d8f10d6a 613 debug(84, 3) ("helperShutdown: %s #%d has already SHUT DOWN.\n",
62e76326 614 hlp->id_name, srv->index + 1);
615 continue;
616 }
617
d8f10d6a 618 hlp->n_active--;
619 assert(hlp->n_active >= 0);
620
62e76326 621 srv->flags.shutdown = 1; /* request it to shut itself down */
622
d8f10d6a 623 if (srv->flags.closing) {
624 debug(84, 3) ("helperShutdown: %s #%d is CLOSING.\n",
62e76326 625 hlp->id_name, srv->index + 1);
626 continue;
627 }
628
d8f10d6a 629 if (srv->stats.pending) {
630 debug(84, 3) ("helperShutdown: %s #%d is BUSY.\n",
62e76326 631 hlp->id_name, srv->index + 1);
632 continue;
633 }
634
635 srv->flags.closing = 1;
8cfc76db 636 debug(84, 3) ("helperShutdown: %s #%d shutting down.\n",
637 hlp->id_name, srv->index + 1);
62e76326 638 /* the rest of the details is dealt with in the helperServerFree
639 * close handler
640 */
641 comm_close(srv->rfd);
74addf6c 642 }
643}
644
94439e4e 645void
646helperStatefulShutdown(statefulhelper * hlp)
647{
648 dlink_node *link = hlp->servers.head;
649 helper_stateful_server *srv;
62e76326 650
94439e4e 651 while (link) {
62e76326 652 srv = (helper_stateful_server *)link->data;
653 link = link->next;
654
8cfc76db 655 if (srv->flags.shutdown) {
d8f10d6a 656 debug(84, 3) ("helperStatefulShutdown: %s #%d has already SHUT DOWN.\n",
62e76326 657 hlp->id_name, srv->index + 1);
658 continue;
659 }
660
d8f10d6a 661 hlp->n_active--;
662 assert(hlp->n_active >= 0);
62e76326 663 srv->flags.shutdown = 1; /* request it to shut itself down */
664
665 if (srv->flags.busy) {
666 debug(84, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
667 hlp->id_name, srv->index + 1);
668 continue;
669 }
670
671 if (srv->flags.closing) {
672 debug(84, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
673 hlp->id_name, srv->index + 1);
674 continue;
675 }
676
677 if (srv->flags.reserved != S_HELPER_FREE) {
678 debug(84, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
679 hlp->id_name, srv->index + 1);
680 continue;
681 }
682
683 if (srv->deferred_requests) {
684 debug(84, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
685 hlp->id_name, srv->index + 1);
686 continue;
687 }
688
689 srv->flags.closing = 1;
8cfc76db 690 debug(84, 3) ("helperStatefulShutdown: %s #%d shutting down.\n",
691 hlp->id_name, srv->index + 1);
62e76326 692 /* the rest of the details is dealt with in the helperStatefulServerFree
693 * close handler
694 */
695 comm_close(srv->rfd);
94439e4e 696 }
697}
698
699
1f5f60dd 700helper *
701helperCreate(const char *name)
702{
28c60158 703 helper *hlp;
72711e31 704 hlp = cbdataAlloc(helper);
1f5f60dd 705 hlp->id_name = name;
706 return hlp;
707}
708
94439e4e 709statefulhelper *
710helperStatefulCreate(const char *name)
711{
712 statefulhelper *hlp;
72711e31 713 hlp = cbdataAlloc(statefulhelper);
94439e4e 714 hlp->id_name = name;
715 return hlp;
716}
717
718
1f5f60dd 719void
720helperFree(helper * hlp)
721{
5dae8514 722 if (!hlp)
62e76326 723 return;
724
1f5f60dd 725 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 726 if (hlp->queue.head)
62e76326 727 debug(84, 0) ("WARNING: freeing %s helper with %d requests queued\n",
728 hlp->id_name, hlp->stats.queue_size);
729
1f5f60dd 730 cbdataFree(hlp);
731}
732
94439e4e 733void
734helperStatefulFree(statefulhelper * hlp)
735{
5dae8514 736 if (!hlp)
62e76326 737 return;
738
94439e4e 739 /* note, don't free hlp->name, it probably points to static memory */
740 if (hlp->queue.head)
62e76326 741 debug(84, 0) ("WARNING: freeing %s helper with %d requests queued\n",
742 hlp->id_name, hlp->stats.queue_size);
743
94439e4e 744 cbdataFree(hlp);
745}
746
747
74addf6c 748/* ====================================================================== */
749/* LOCAL FUNCTIONS */
750/* ====================================================================== */
751
752static void
1f5f60dd 753helperServerFree(int fd, void *data)
74addf6c 754{
e6ccf245 755 helper_server *srv = (helper_server *)data;
74addf6c 756 helper *hlp = srv->parent;
ac750329 757 helper_request *r;
07eca7e0 758 int i, concurrency = hlp->concurrency;
759
760 if (!concurrency)
761 concurrency = 1;
762
74addf6c 763 assert(srv->rfd == fd);
62e76326 764
07eca7e0 765 if (srv->rbuf) {
766 memFreeBuf(srv->rbuf_sz, srv->rbuf);
767 srv->rbuf = NULL;
74addf6c 768 }
62e76326 769
2fe7eff9 770 srv->wqueue->clean();
032785bf 771 delete srv->wqueue;
772
773 if (srv->writebuf) {
2fe7eff9 774 srv->writebuf->clean();
032785bf 775 delete srv->writebuf;
776 srv->writebuf = NULL;
777 }
62e76326 778
07eca7e0 779 for (i = 0; i < concurrency; i++) {
780 if ((r = srv->requests[i])) {
781 void *cbdata;
62e76326 782
07eca7e0 783 if (cbdataReferenceValidDone(r->data, &cbdata))
784 r->callback(cbdata, NULL);
62e76326 785
07eca7e0 786 helperRequestFree(r);
787
788 srv->requests[i] = NULL;
789 }
ac750329 790 }
62e76326 791
07eca7e0 792 safe_free(srv->requests);
793
3cdb7cd0 794 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 795 comm_close(srv->wfd);
796
74addf6c 797 dlinkDelete(&srv->link, &hlp->servers);
62e76326 798
74addf6c 799 hlp->n_running--;
62e76326 800
74addf6c 801 assert(hlp->n_running >= 0);
62e76326 802
1f5f60dd 803 if (!srv->flags.shutdown) {
d8f10d6a 804 hlp->n_active--;
805 assert(hlp->n_active >= 0);
62e76326 806 debug(84, 0) ("WARNING: %s #%d (FD %d) exited\n",
807 hlp->id_name, srv->index + 1, fd);
808
5ea33fce 809 if (hlp->n_active < hlp->n_to_start / 2) {
810 debug(80, 0) ("Too few %s processes are running\n", hlp->id_name);
811
812 if (hlp->last_restart > squid_curtime - 30)
813 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
814
815 debug(80, 0) ("Starting new helpers\n");
816
817 helperOpenServers(hlp);
818 }
14e87a44 819 }
62e76326 820
fa80a8ef 821 cbdataReferenceDone(srv->parent);
14e87a44 822 cbdataFree(srv);
74addf6c 823}
824
94439e4e 825static void
826helperStatefulServerFree(int fd, void *data)
827{
e6ccf245 828 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 829 statefulhelper *hlp = srv->parent;
830 helper_stateful_request *r;
831 assert(srv->rfd == fd);
62e76326 832
07eca7e0 833 if (srv->rbuf) {
834 memFreeBuf(srv->rbuf_sz, srv->rbuf);
835 srv->rbuf = NULL;
94439e4e 836 }
62e76326 837
07eca7e0 838#if 0
2fe7eff9 839 srv->wqueue->clean();
032785bf 840
841 delete srv->wqueue;
07eca7e0 842
843#endif
844
94439e4e 845 if ((r = srv->request)) {
62e76326 846 void *cbdata;
847
848 if (cbdataReferenceValidDone(r->data, &cbdata))
07eca7e0 849 r->callback(cbdata, srv, NULL);
62e76326 850
851 helperStatefulRequestFree(r);
852
853 srv->request = NULL;
94439e4e 854 }
62e76326 855
3c641669 856 /* TODO: walk the local queue of requests and carry them all out */
94439e4e 857 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 858 comm_close(srv->wfd);
859
94439e4e 860 dlinkDelete(&srv->link, &hlp->servers);
62e76326 861
94439e4e 862 hlp->n_running--;
62e76326 863
94439e4e 864 assert(hlp->n_running >= 0);
62e76326 865
94439e4e 866 if (!srv->flags.shutdown) {
d8f10d6a 867 hlp->n_active--;
868 assert( hlp->n_active >= 0);
62e76326 869 debug(84, 0) ("WARNING: %s #%d (FD %d) exited\n",
870 hlp->id_name, srv->index + 1, fd);
871
5ea33fce 872 if (hlp->n_active <= hlp->n_to_start / 2) {
873 debug(80, 0) ("Too few %s processes are running\n", hlp->id_name);
874
875 if (hlp->last_restart > squid_curtime - 30)
876 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
877
878 debug(80, 0) ("Starting new helpers\n");
879
880 helperStatefulOpenServers(hlp);
881 }
94439e4e 882 }
62e76326 883
94439e4e 884 if (srv->data != NULL)
b001e822 885 hlp->datapool->free(srv->data);
62e76326 886
fa80a8ef 887 cbdataReferenceDone(srv->parent);
62e76326 888
94439e4e 889 cbdataFree(srv);
890}
891
892
74addf6c 893static void
c4b7a5a9 894helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
74addf6c 895{
74addf6c 896 char *t = NULL;
e6ccf245 897 helper_server *srv = (helper_server *)data;
74addf6c 898 helper *hlp = srv->parent;
899 assert(fd == srv->rfd);
fa80a8ef 900 assert(cbdataReferenceValid(data));
c4b7a5a9 901
902 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 903
c4b7a5a9 904 if (flag == COMM_ERR_CLOSING) {
905 return;
906 }
907
17bb3486 908 debug(84, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
62e76326 909 (int)len, hlp->id_name, srv->index + 1);
910
c4b7a5a9 911 if (flag != COMM_OK || len <= 0) {
62e76326 912 if (len < 0)
913 debug(84, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
914
915 comm_close(fd);
916
917 return;
74addf6c 918 }
62e76326 919
07eca7e0 920 srv->roffset += len;
921 srv->rbuf[srv->roffset] = '\0';
922 debug(84, 9) ("helperHandleRead: '%s'\n", srv->rbuf);
62e76326 923
07eca7e0 924 if (!srv->stats.pending) {
62e76326 925 /* someone spoke without being spoken to */
07eca7e0 926 debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes '%s'\n",
927 hlp->id_name, srv->index + 1, (int)len, srv->rbuf);
928 srv->roffset = 0;
929 srv->rbuf[0] = '\0';
930 }
931
932 while ((t = strchr(srv->rbuf, '\n'))) {
62e76326 933 /* end of reply found */
07eca7e0 934 helper_request *r;
935 char *msg = srv->rbuf;
936 int i = 0;
62e76326 937 debug(84, 3) ("helperHandleRead: end of reply found\n");
6bf4f823 938
939 if (t > srv->rbuf && t[-1] == '\r')
940 t[-1] = '\0';
941
07eca7e0 942 *t++ = '\0';
62e76326 943
07eca7e0 944 if (hlp->concurrency) {
945 i = strtol(msg, &msg, 10);
62e76326 946
07eca7e0 947 while (*msg && isspace(*msg))
948 msg++;
949 }
62e76326 950
07eca7e0 951 r = srv->requests[i];
62e76326 952
07eca7e0 953 if (r) {
954 HLPCB *callback = r->callback;
955 void *cbdata;
62e76326 956
07eca7e0 957 srv->requests[i] = NULL;
62e76326 958
07eca7e0 959 r->callback = NULL;
62e76326 960
07eca7e0 961 if (cbdataReferenceValidDone(r->data, &cbdata))
962 callback(cbdata, msg);
62e76326 963
07eca7e0 964 srv->stats.pending--;
965
966 hlp->stats.replies++;
967
968 hlp->stats.avg_svc_time =
969 intAverage(hlp->stats.avg_svc_time,
970 tvSubMsec(r->dispatch_time, current_time),
971 hlp->stats.replies, REDIRECT_AV_FACTOR);
972
973 helperRequestFree(r);
974 } else {
975 debug(84, 1) ("helperHandleRead: unexpected reply on channel %d from %s #%d '%s'\n",
976 i, hlp->id_name, srv->index + 1, srv->rbuf);
977 }
978
979 srv->roffset -= (t - srv->rbuf);
980 memmove(srv->rbuf, t, srv->roffset + 1);
62e76326 981
982 if (srv->flags.shutdown) {
983 int wfd = srv->wfd;
984 srv->wfd = -1;
d8f10d6a 985 srv->flags.closing=1;
62e76326 986 comm_close(wfd);
987 } else
988 helperKickQueue(hlp);
74addf6c 989 }
07eca7e0 990
991 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
74addf6c 992}
993
94439e4e 994static void
c4b7a5a9 995helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
94439e4e 996{
94439e4e 997 char *t = NULL;
e6ccf245 998 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 999 helper_stateful_request *r;
1000 statefulhelper *hlp = srv->parent;
1001 assert(fd == srv->rfd);
fa80a8ef 1002 assert(cbdataReferenceValid(data));
c4b7a5a9 1003
1004 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 1005
c4b7a5a9 1006 if (flag == COMM_ERR_CLOSING) {
1007 return;
1008 }
1009
17bb3486 1010 debug(84, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
62e76326 1011 (int)len, hlp->id_name, srv->index + 1);
1012
c4b7a5a9 1013 if (flag != COMM_OK || len <= 0) {
62e76326 1014 if (len < 0)
1015 debug(84, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
1016
1017 comm_close(fd);
1018
1019 return;
94439e4e 1020 }
62e76326 1021
07eca7e0 1022 srv->roffset += len;
1023 srv->rbuf[srv->roffset] = '\0';
94439e4e 1024 r = srv->request;
62e76326 1025
94439e4e 1026 if (r == NULL) {
62e76326 1027 /* someone spoke without being spoken to */
07eca7e0 1028 debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes '%s'\n",
1029 hlp->id_name, srv->index + 1, (int)len, srv->rbuf);
1030 srv->roffset = 0;
1031 }
1032
1033 if ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1034 /* end of reply found */
1035 debug(84, 3) ("helperStatefulHandleRead: end of reply found\n");
6bf4f823 1036
1037 if (t > srv->rbuf && t[-1] == '\r')
1038 t[-1] = '\0';
1039
62e76326 1040 *t = '\0';
1041
1042 if (cbdataReferenceValid(r->data)) {
07eca7e0 1043 switch ((r->callback(r->data, srv, srv->rbuf))) { /*if non-zero reserve helper */
62e76326 1044
1045 case S_HELPER_UNKNOWN:
1046 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
1047 break;
1048
1049 case S_HELPER_RELEASE: /* helper finished with */
1050
1051 if (!srv->deferred_requests && !srv->queue.head) {
1052 srv->flags.reserved = S_HELPER_FREE;
1053
1054 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
1055 srv->parent->OnEmptyQueue(srv->data);
1056
1057 debug(84, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1);
1058 } else {
1059 srv->flags.reserved = S_HELPER_DEFERRED;
1060 debug(84, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1);
1061 }
1062
1063 break;
1064
1065 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
1066
1067 if (!srv->queue.head) {
1068 assert(srv->deferred_requests == 0);
1069 srv->flags.reserved = S_HELPER_RESERVED;
1070 debug(84, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1);
1071 } else {
1072 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
1073 }
1074
1075 break;
1076
1077 case S_HELPER_DEFER:
1078 /* the helper is still needed, but can
1079 * be used for other requests in the meantime.
1080 */
1081 srv->flags.reserved = S_HELPER_DEFERRED;
1082 srv->deferred_requests++;
1083 srv->stats.deferbycb++;
1084 debug(84, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1);
1085 break;
1086
1087 default:
1088 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
1089 }
1090
1091 } else {
1092 debug(84, 1) ("StatefulHandleRead: no callback data registered\n");
1093 }
1094
1095 srv->flags.busy = 0;
07eca7e0 1096 srv->roffset = 0;
62e76326 1097 helperStatefulRequestFree(r);
1098 srv->request = NULL;
1099 hlp->stats.replies++;
1100 hlp->stats.avg_svc_time =
1101 intAverage(hlp->stats.avg_svc_time,
1102 tvSubMsec(srv->dispatch_time, current_time),
1103 hlp->stats.replies, REDIRECT_AV_FACTOR);
1104
1105 if (srv->flags.shutdown
1106 && srv->flags.reserved == S_HELPER_FREE
1107 && !srv->deferred_requests) {
1108 int wfd = srv->wfd;
1109 srv->wfd = -1;
d8f10d6a 1110 srv->flags.closing=1;
62e76326 1111 comm_close(wfd);
1112 } else {
1113 if (srv->queue.head)
1114 helperStatefulServerKickQueue(srv);
1115 else
1116 helperStatefulKickQueue(hlp);
1117 }
94439e4e 1118 }
07eca7e0 1119
1120 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
1121 helperStatefulHandleRead, srv);
94439e4e 1122}
1123
74addf6c 1124static void
1125Enqueue(helper * hlp, helper_request * r)
1126{
e6ccf245 1127 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
74addf6c 1128 dlinkAddTail(r, link, &hlp->queue);
1129 hlp->stats.queue_size++;
62e76326 1130
74addf6c 1131 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1132 return;
1133
74addf6c 1134 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1135 return;
1136
fe73896c 1137 if (shutting_down || reconfiguring)
62e76326 1138 return;
1139
74addf6c 1140 hlp->last_queue_warn = squid_curtime;
62e76326 1141
c916b75b 1142 debug(84, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
62e76326 1143
c916b75b 1144 debug(84, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
62e76326 1145
74addf6c 1146 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1147 fatalf("Too many queued %s requests", hlp->id_name);
1148
c916b75b 1149 debug(84, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
74addf6c 1150}
1151
94439e4e 1152static void
1153StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1154{
e6ccf245 1155 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1156 dlinkAddTail(r, link, &hlp->queue);
1157 hlp->stats.queue_size++;
62e76326 1158
94439e4e 1159 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1160 return;
1161
893cbac6 1162 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1163 fatalf("Too many queued %s requests", hlp->id_name);
1164
94439e4e 1165 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1166 return;
1167
94439e4e 1168 if (shutting_down || reconfiguring)
62e76326 1169 return;
1170
94439e4e 1171 hlp->last_queue_warn = squid_curtime;
62e76326 1172
c916b75b 1173 debug(84, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
62e76326 1174
c916b75b 1175 debug(84, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
62e76326 1176
c916b75b 1177 debug(84, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
94439e4e 1178}
1179
1180static void
1181StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
1182{
e6ccf245 1183 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1184 dlinkAddTail(r, link, &srv->queue);
62e76326 1185 /* TODO: warning if the queue on this server is more than X
1186 * We don't check the queue size at the moment, because
1187 * requests hitting here are deferrable
1188 */
1189 /* hlp->stats.queue_size++;
1190 * if (hlp->stats.queue_size < hlp->n_running)
1191 * return;
1192 * if (squid_curtime - hlp->last_queue_warn < 600)
1193 * return;
1194 * if (shutting_down || reconfiguring)
1195 * return;
1196 * hlp->last_queue_warn = squid_curtime;
1197 * debug(84, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
1198 * debug(84, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
1199 * if (hlp->stats.queue_size > hlp->n_running * 2)
1200 * fatalf("Too many queued %s requests", hlp->id_name);
1201 * debug(84, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); */
94439e4e 1202}
1203
1204
74addf6c 1205static helper_request *
1206Dequeue(helper * hlp)
1207{
1208 dlink_node *link;
1209 helper_request *r = NULL;
62e76326 1210
74addf6c 1211 if ((link = hlp->queue.head)) {
62e76326 1212 r = (helper_request *)link->data;
1213 dlinkDelete(link, &hlp->queue);
1214 memFree(link, MEM_DLINK_NODE);
1215 hlp->stats.queue_size--;
74addf6c 1216 }
62e76326 1217
74addf6c 1218 return r;
1219}
1220
94439e4e 1221static helper_stateful_request *
1222StatefulServerDequeue(helper_stateful_server * srv)
1223{
1224 dlink_node *link;
1225 helper_stateful_request *r = NULL;
62e76326 1226
94439e4e 1227 if ((link = srv->queue.head)) {
62e76326 1228 r = (helper_stateful_request *)link->data;
1229 dlinkDelete(link, &srv->queue);
1230 memFree(link, MEM_DLINK_NODE);
94439e4e 1231 }
62e76326 1232
94439e4e 1233 return r;
1234}
1235
1236static helper_stateful_request *
1237StatefulDequeue(statefulhelper * hlp)
1238{
1239 dlink_node *link;
1240 helper_stateful_request *r = NULL;
62e76326 1241
94439e4e 1242 if ((link = hlp->queue.head)) {
62e76326 1243 r = (helper_stateful_request *)link->data;
1244 dlinkDelete(link, &hlp->queue);
1245 memFree(link, MEM_DLINK_NODE);
1246 hlp->stats.queue_size--;
94439e4e 1247 }
62e76326 1248
94439e4e 1249 return r;
1250}
1251
74addf6c 1252static helper_server *
1253GetFirstAvailable(helper * hlp)
1254{
1255 dlink_node *n;
07eca7e0 1256 helper_server *srv;
1257 helper_server *selected = NULL;
62e76326 1258
fe73896c 1259 if (hlp->n_running == 0)
62e76326 1260 return NULL;
1261
07eca7e0 1262 /* Find "least" loaded helper (approx) */
74addf6c 1263 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1264 srv = (helper_server *)n->data;
1265
07eca7e0 1266 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1267 continue;
1268
d8f10d6a 1269 if (srv->flags.shutdown)
62e76326 1270 continue;
1271
07eca7e0 1272 if (!srv->stats.pending)
1273 return srv;
1274
1275 if (selected) {
1276 selected = srv;
1277 break;
1278 }
1279
1280 selected = srv;
74addf6c 1281 }
62e76326 1282
07eca7e0 1283 /* Check for overload */
1284 if (!selected)
1285 return NULL;
1286
1287 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1288 return NULL;
1289
1290 return selected;
74addf6c 1291}
1292
94439e4e 1293static helper_stateful_server *
1294StatefulGetFirstAvailable(statefulhelper * hlp)
1295{
1296 dlink_node *n;
1297 helper_stateful_server *srv = NULL;
17bb3486 1298 debug(84, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running);
62e76326 1299
94439e4e 1300 if (hlp->n_running == 0)
62e76326 1301 return NULL;
1302
94439e4e 1303 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1304 srv = (helper_stateful_server *)n->data;
1305
1306 if (srv->flags.busy)
1307 continue;
1308
1309 if (srv->flags.reserved == S_HELPER_RESERVED)
1310 continue;
1311
d8f10d6a 1312 if (srv->flags.shutdown)
62e76326 1313 continue;
1314
1315 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1316 continue;
1317
1318 return srv;
94439e4e 1319 }
62e76326 1320
17bb3486 1321 debug(84, 5) ("StatefulGetFirstAvailable: None available.\n");
94439e4e 1322 return NULL;
1323}
1324
1325
42679bd6 1326static void
1327helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1328{
07eca7e0 1329 helper_server *srv = (helper_server *)data;
1330
2fe7eff9 1331 srv->writebuf->clean();
032785bf 1332 delete srv->writebuf;
1333 srv->writebuf = NULL;
07eca7e0 1334 srv->flags.writing = 0;
1335
1336 if (flag != COMM_OK) {
1337 /* Helper server has crashed */
1338 debug(84, 0) ("helperDispatch: Helper %s #%d has crashed\n",
1339 srv->parent->id_name, srv->index + 1);
1340 return;
1341 }
1342
2fe7eff9 1343 if (!srv->wqueue->isNull()) {
07eca7e0 1344 srv->writebuf = srv->wqueue;
032785bf 1345 srv->wqueue = new MemBuf;
07eca7e0 1346 srv->flags.writing = 1;
1347 comm_write(srv->wfd,
032785bf 1348 srv->writebuf->content(),
1349 srv->writebuf->contentSize(),
07eca7e0 1350 helperDispatchWriteDone, /* Handler */
1351 srv); /* Handler-data */
1352 }
42679bd6 1353}
1354
74addf6c 1355static void
1356helperDispatch(helper_server * srv, helper_request * r)
1357{
1358 helper *hlp = srv->parent;
07eca7e0 1359 helper_request **ptr = NULL;
1360 unsigned int slot;
62e76326 1361
fa80a8ef 1362 if (!cbdataReferenceValid(r->data)) {
62e76326 1363 debug(84, 1) ("helperDispatch: invalid callback data\n");
1364 helperRequestFree(r);
1365 return;
74addf6c 1366 }
62e76326 1367
07eca7e0 1368 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1369 if (!srv->requests[slot]) {
1370 ptr = &srv->requests[slot];
1371 break;
1372 }
1373 }
1374
1375 assert(ptr);
1376 *ptr = r;
1377 srv->stats.pending += 1;
1378 r->dispatch_time = current_time;
1379
2fe7eff9 1380 if (srv->wqueue->isNull())
1381 srv->wqueue->init();
07eca7e0 1382
1383 if (hlp->concurrency)
2fe7eff9 1384 srv->wqueue->Printf("%d %s", slot, r->buf);
07eca7e0 1385 else
2fe7eff9 1386 srv->wqueue->append(r->buf, strlen(r->buf));
07eca7e0 1387
1388 if (!srv->flags.writing) {
032785bf 1389 assert(NULL == srv->writebuf);
07eca7e0 1390 srv->writebuf = srv->wqueue;
032785bf 1391 srv->wqueue = new MemBuf;
07eca7e0 1392 srv->flags.writing = 1;
1393 comm_write(srv->wfd,
032785bf 1394 srv->writebuf->content(),
1395 srv->writebuf->contentSize(),
07eca7e0 1396 helperDispatchWriteDone, /* Handler */
1397 srv); /* Handler-data */
1398 }
1399
17bb3486 1400 debug(84, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
62e76326 1401 hlp->id_name, srv->index + 1, (int) strlen(r->buf));
74addf6c 1402 srv->stats.uses++;
1403 hlp->stats.requests++;
1404}
1405
42679bd6 1406static void
1407helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
62e76326 1408 int xerrno, void *data)
42679bd6 1409{
62e76326 1410 /* nothing! */
42679bd6 1411}
1412
1413
94439e4e 1414static void
1415helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1416{
1417 statefulhelper *hlp = srv->parent;
62e76326 1418
fa80a8ef 1419 if (!cbdataReferenceValid(r->data)) {
62e76326 1420 debug(84, 1) ("helperStatefulDispatch: invalid callback data\n");
1421 helperStatefulRequestFree(r);
1422 return;
94439e4e 1423 }
62e76326 1424
17bb3486 1425 debug(84, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1);
62e76326 1426
94439e4e 1427 if (r->placeholder == 1) {
62e76326 1428 /* a callback is needed before this request can _use_ a helper. */
1429 /* we don't care about releasing/deferring this helper. The request NEVER
1430 * gets to the helper. So we throw away the return code */
1431 r->callback(r->data, srv, NULL);
1432 /* throw away the placeholder */
1433 helperStatefulRequestFree(r);
1434 /* and push the queue. Note that the callback may have submitted a new
1435 * request to the helper which is why we test for the request*/
1436
1437 if (srv->request == NULL) {
1438 if (srv->flags.shutdown
1439 && srv->flags.reserved == S_HELPER_FREE
1440 && !srv->deferred_requests) {
1441 int wfd = srv->wfd;
1442 srv->wfd = -1;
1443 comm_close(wfd);
1444 } else {
1445 if (srv->queue.head)
1446 helperStatefulServerKickQueue(srv);
1447 else
1448 helperStatefulKickQueue(hlp);
1449 }
1450 }
1451
1452 return;
94439e4e 1453 }
62e76326 1454
94439e4e 1455 srv->flags.busy = 1;
1456 srv->request = r;
1457 srv->dispatch_time = current_time;
42679bd6 1458 comm_write(srv->wfd,
62e76326 1459 r->buf,
1460 strlen(r->buf),
1461 helperStatefulDispatchWriteDone, /* Handler */
1462 hlp); /* Handler-data */
17bb3486 1463 debug(84, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
62e76326 1464 hlp->id_name, srv->index + 1, (int) strlen(r->buf));
94439e4e 1465 srv->stats.uses++;
1466 hlp->stats.requests++;
1467}
1468
1469
74addf6c 1470static void
1471helperKickQueue(helper * hlp)
1472{
1473 helper_request *r;
1474 helper_server *srv;
62e76326 1475
74addf6c 1476 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
62e76326 1477 helperDispatch(srv, r);
74addf6c 1478}
1479
94439e4e 1480static void
1481helperStatefulKickQueue(statefulhelper * hlp)
1482{
1483 helper_stateful_request *r;
1484 helper_stateful_server *srv;
62e76326 1485
94439e4e 1486 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
62e76326 1487 helperStatefulDispatch(srv, r);
94439e4e 1488}
1489
1490static void
1491helperStatefulServerKickQueue(helper_stateful_server * srv)
1492{
1493 helper_stateful_request *r;
62e76326 1494
94439e4e 1495 if ((r = StatefulServerDequeue(srv)))
62e76326 1496 helperStatefulDispatch(srv, r);
94439e4e 1497}
1498
74addf6c 1499static void
1500helperRequestFree(helper_request * r)
1501{
fa80a8ef 1502 cbdataReferenceDone(r->data);
74addf6c 1503 xfree(r->buf);
00d77d6b 1504 delete r;
51ee7c82 1505}
1506
94439e4e 1507static void
1508helperStatefulRequestFree(helper_stateful_request * r)
1509{
fa80a8ef 1510 cbdataReferenceDone(r->data);
94439e4e 1511 xfree(r->buf);
00d77d6b 1512 delete r;
51ee7c82 1513}