]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Import IPv6 support from squid3-ipv6 branch to 3-HEAD.
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
cc192b50 3 * $Id: helper.cc,v 1.91 2007/12/14 23:11:46 amosjeffries Exp $
f740a279 4 *
17bb3486 5 * DEBUG: section 84 Helper process maintenance
f740a279 6 * AUTHOR: Harvest Derived?
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 9 * ----------------------------------------------------------
10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
f740a279 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
51ee7c82 37#include "helper.h"
985c86bc 38#include "SquidTime.h"
e6ccf245 39#include "Store.h"
42679bd6 40#include "comm.h"
0eb49b6d 41#include "MemBuf.h"
d295d770 42#include "wordlist.h"
74addf6c 43
44#define HELPER_MAX_ARGS 64
45
c4b7a5a9 46static IOCB helperHandleRead;
47static IOCB helperStatefulHandleRead;
1f5f60dd 48static PF helperServerFree;
94439e4e 49static PF helperStatefulServerFree;
74addf6c 50static void Enqueue(helper * hlp, helper_request *);
51static helper_request *Dequeue(helper * hlp);
94439e4e 52static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 53static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 54static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 55static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 56static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 57static void helperKickQueue(helper * hlp);
94439e4e 58static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 59static void helperRequestFree(helper_request * r);
94439e4e 60static void helperStatefulRequestFree(helper_stateful_request * r);
61static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
62static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
63static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
64static void helperStatefulServerKickQueue(helper_stateful_server * srv);
9522b380 65static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
66
74addf6c 67
aa839030 68CBDATA_TYPE(helper);
69CBDATA_TYPE(helper_server);
70CBDATA_TYPE(statefulhelper);
71CBDATA_TYPE(helper_stateful_server);
72
74addf6c 73void
74helperOpenServers(helper * hlp)
75{
76 char *s;
77 char *progname;
78 char *shortname;
79 char *procname;
1b8af627 80 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
74addf6c 81 char fd_note_buf[FD_DESC_SZ];
82 helper_server *srv;
83 int nargs = 0;
84 int k;
b5d712b5 85 pid_t pid;
74addf6c 86 int rfd;
87 int wfd;
b5d712b5 88 void * hIpc;
74addf6c 89 wordlist *w;
62e76326 90
74addf6c 91 if (hlp->cmdline == NULL)
62e76326 92 return;
93
74addf6c 94 progname = hlp->cmdline->key;
62e76326 95
74addf6c 96 if ((s = strrchr(progname, '/')))
62e76326 97 shortname = xstrdup(s + 1);
74addf6c 98 else
62e76326 99 shortname = xstrdup(progname);
100
bf8fe701 101 debugs(84, 1, "helperOpenServers: Starting " << hlp->n_to_start << " '" << shortname << "' processes");
62e76326 102
e6ccf245 103 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 104
74addf6c 105 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 106
74addf6c 107 args[nargs++] = procname;
62e76326 108
74addf6c 109 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 110 args[nargs++] = w->key;
111
74addf6c 112 args[nargs++] = NULL;
62e76326 113
74addf6c 114 assert(nargs <= HELPER_MAX_ARGS);
62e76326 115
74addf6c 116 for (k = 0; k < hlp->n_to_start; k++) {
62e76326 117 getCurrentTime();
118 rfd = wfd = -1;
b5d712b5 119 pid = ipcCreate(hlp->ipc_type,
120 progname,
121 args,
122 shortname,
cc192b50 123 hlp->addr,
b5d712b5 124 &rfd,
125 &wfd,
126 &hIpc);
127
128 if (pid < 0) {
bf8fe701 129 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 130 continue;
131 }
132
133 hlp->n_running++;
d8f10d6a 134 hlp->n_active++;
b5d712b5 135 CBDATA_INIT_TYPE(helper_server);
62e76326 136 srv = cbdataAlloc(helper_server);
b5d712b5 137 srv->hIpc = hIpc;
138 srv->pid = pid;
62e76326 139 srv->index = k;
cc192b50 140 srv->addr = hlp->addr;
62e76326 141 srv->rfd = rfd;
142 srv->wfd = wfd;
07eca7e0 143 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
90bed1c4 144 srv->wqueue = new MemBuf;
07eca7e0 145 srv->roffset = 0;
146 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
62e76326 147 srv->parent = cbdataReference(hlp);
148 dlinkAddTail(srv, &srv->link, &hlp->servers);
149
150 if (rfd == wfd) {
151 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
152 fd_note(rfd, fd_note_buf);
153 } else {
154 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
155 fd_note(rfd, fd_note_buf);
156 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
157 fd_note(wfd, fd_note_buf);
158 }
159
160 commSetNonBlocking(rfd);
161
162 if (wfd != rfd)
163 commSetNonBlocking(wfd);
164
165 comm_add_close_handler(rfd, helperServerFree, srv);
07eca7e0 166
167 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
74addf6c 168 }
62e76326 169
5ea33fce 170 hlp->last_restart = squid_curtime;
74addf6c 171 safe_free(shortname);
172 safe_free(procname);
838b993c 173 helperKickQueue(hlp);
74addf6c 174}
175
4f0ef8e8 176/*
177 * DPW 2007-05-08
178 *
179 * helperStatefulOpenServers: create the stateful child helper processes
180 */
94439e4e 181void
182helperStatefulOpenServers(statefulhelper * hlp)
183{
94439e4e 184 char *shortname;
1b8af627 185 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
94439e4e 186 char fd_note_buf[FD_DESC_SZ];
94439e4e 187 int nargs = 0;
62e76326 188
94439e4e 189 if (hlp->cmdline == NULL)
62e76326 190 return;
191
4f0ef8e8 192 char *progname = hlp->cmdline->key;
62e76326 193
4f0ef8e8 194 char *s;
94439e4e 195 if ((s = strrchr(progname, '/')))
62e76326 196 shortname = xstrdup(s + 1);
94439e4e 197 else
62e76326 198 shortname = xstrdup(progname);
199
bf8fe701 200 debugs(84, 1, "helperStatefulOpenServers: Starting " << hlp->n_to_start << " '" << shortname << "' processes");
62e76326 201
4f0ef8e8 202 char *procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 203
94439e4e 204 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 205
94439e4e 206 args[nargs++] = procname;
62e76326 207
4f0ef8e8 208 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 209 args[nargs++] = w->key;
210
94439e4e 211 args[nargs++] = NULL;
62e76326 212
94439e4e 213 assert(nargs <= HELPER_MAX_ARGS);
62e76326 214
4f0ef8e8 215 for (int k = 0; k < hlp->n_to_start; k++) {
62e76326 216 getCurrentTime();
4f0ef8e8 217 int rfd = -1;
218 int wfd = -1;
219 void * hIpc;
220 pid_t pid = ipcCreate(hlp->ipc_type,
b5d712b5 221 progname,
222 args,
223 shortname,
cc192b50 224 hlp->addr,
b5d712b5 225 &rfd,
226 &wfd,
227 &hIpc);
228
229 if (pid < 0) {
bf8fe701 230 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 231 continue;
232 }
233
234 hlp->n_running++;
d8f10d6a 235 hlp->n_active++;
b5d712b5 236 CBDATA_INIT_TYPE(helper_stateful_server);
4f0ef8e8 237 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
b5d712b5 238 srv->hIpc = hIpc;
239 srv->pid = pid;
62e76326 240 srv->flags.reserved = S_HELPER_FREE;
241 srv->deferred_requests = 0;
242 srv->stats.deferbyfunc = 0;
243 srv->stats.deferbycb = 0;
244 srv->stats.submits = 0;
245 srv->stats.releases = 0;
246 srv->index = k;
cc192b50 247 srv->addr = hlp->addr;
62e76326 248 srv->rfd = rfd;
249 srv->wfd = wfd;
07eca7e0 250 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
251 srv->roffset = 0;
62e76326 252 srv->parent = cbdataReference(hlp);
253
254 if (hlp->datapool != NULL)
b001e822 255 srv->data = hlp->datapool->alloc();
62e76326 256
257 dlinkAddTail(srv, &srv->link, &hlp->servers);
258
259 if (rfd == wfd) {
260 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
261 fd_note(rfd, fd_note_buf);
262 } else {
263 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
264 fd_note(rfd, fd_note_buf);
265 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
266 fd_note(wfd, fd_note_buf);
267 }
268
269 commSetNonBlocking(rfd);
270
271 if (wfd != rfd)
272 commSetNonBlocking(wfd);
273
274 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
07eca7e0 275
276 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
277
94439e4e 278 }
62e76326 279
5ea33fce 280 hlp->last_restart = squid_curtime;
94439e4e 281 safe_free(shortname);
282 safe_free(procname);
283 helperStatefulKickQueue(hlp);
284}
285
286
74addf6c 287void
288helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
289{
5b5f9257 290 if (hlp == NULL) {
bf8fe701 291 debugs(84, 3, "helperSubmit: hlp == NULL");
62e76326 292 callback(data, NULL);
293 return;
5b5f9257 294 }
62e76326 295
eff7218b 296 helper_request *r = new helper_request;
297 helper_server *srv;
298
74addf6c 299 r->callback = callback;
fa80a8ef 300 r->data = cbdataReference(data);
74addf6c 301 r->buf = xstrdup(buf);
62e76326 302
74addf6c 303 if ((srv = GetFirstAvailable(hlp)))
62e76326 304 helperDispatch(srv, r);
74addf6c 305 else
62e76326 306 Enqueue(hlp, r);
307
bf8fe701 308 debugs(84, 9, "helperSubmit: " << buf);
94439e4e 309}
310
721b0310 311/* lastserver = "server last used as part of a deferred or reserved
312 * request sequence"
313 */
94439e4e 314void
315helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
316{
94439e4e 317 if (hlp == NULL) {
bf8fe701 318 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
62e76326 319 callback(data, 0, NULL);
320 return;
94439e4e 321 }
62e76326 322
eff7218b 323 helper_stateful_request *r = new helper_stateful_request;
eff7218b 324
94439e4e 325 r->callback = callback;
fa80a8ef 326 r->data = cbdataReference(data);
62e76326 327
721b0310 328 if (buf != NULL) {
62e76326 329 r->buf = xstrdup(buf);
330 r->placeholder = 0;
721b0310 331 } else {
62e76326 332 r->buf = NULL;
333 r->placeholder = 1;
721b0310 334 }
62e76326 335
94439e4e 336 if ((buf != NULL) && lastserver) {
bf8fe701 337 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
62e76326 338 /* the queue doesn't count for this assert because queued requests
339 * have already gone through here and been tested.
340 * It's legal to have deferred_requests == 0 and queue entries
341 * and status of S_HELPEER_DEFERRED.
342 * BUT: It's not legal to submit a new request w/lastserver in
343 * that state.
344 */
345 assert(!(lastserver->deferred_requests == 0 &&
346 lastserver->flags.reserved == S_HELPER_DEFERRED));
347
348 if (lastserver->flags.reserved != S_HELPER_RESERVED) {
349 lastserver->stats.submits++;
350 lastserver->deferred_requests--;
351 }
352
353 if (!(lastserver->request)) {
bf8fe701 354 debugs(84, 5, "StatefulSubmit dispatching");
62e76326 355 helperStatefulDispatch(lastserver, r);
356 } else {
bf8fe701 357 debugs(84, 5, "StatefulSubmit queuing");
62e76326 358 StatefulServerEnqueue(lastserver, r);
359 }
94439e4e 360 } else {
4f0ef8e8 361 helper_stateful_server *srv;
62e76326 362 if ((srv = StatefulGetFirstAvailable(hlp))) {
363 helperStatefulDispatch(srv, r);
364 } else
365 StatefulEnqueue(hlp, r);
94439e4e 366 }
62e76326 367
bf8fe701 368 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
94439e4e 369}
370
4f0ef8e8 371/*
372 * helperStatefulDefer
373 *
374 * find and add a deferred request to a helper
375 */
94439e4e 376helper_stateful_server *
377helperStatefulDefer(statefulhelper * hlp)
94439e4e 378{
4f0ef8e8 379 if (hlp == NULL) {
bf8fe701 380 debugs(84, 3, "helperStatefulDefer: hlp == NULL");
62e76326 381 return NULL;
94439e4e 382 }
62e76326 383
4f0ef8e8 384 debugs(84, 5, "helperStatefulDefer: Running servers " << hlp->n_running);
62e76326 385
4f0ef8e8 386 if (hlp->n_running == 0) {
bf8fe701 387 debugs(84, 1, "helperStatefulDefer: No running servers!. ");
62e76326 388 return NULL;
94439e4e 389 }
62e76326 390
4f0ef8e8 391 helper_stateful_server *rv = StatefulGetFirstAvailable(hlp);
62e76326 392
4f0ef8e8 393 if (rv == NULL) {
62e76326 394 /*
395 * all currently busy; loop through servers and find server
396 * with the shortest queue
397 */
398
4f0ef8e8 399 for (dlink_node *n = hlp->servers.head; n != NULL; n = n->next) {
400 helper_stateful_server *srv = (helper_stateful_server *)n->data;
62e76326 401
402 if (srv->flags.reserved == S_HELPER_RESERVED)
403 continue;
404
b902a58c 405 if (!srv->flags.shutdown)
62e76326 406 continue;
407
408 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
409 !(hlp->IsAvailable(srv->data)))
410 continue;
411
412 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
413 continue;
414
415 rv = srv;
416 }
0a706c69 417 }
62e76326 418
4f0ef8e8 419 if (rv == NULL) {
bf8fe701 420 debugs(84, 1, "helperStatefulDefer: None available.");
62e76326 421 return NULL;
94439e4e 422 }
62e76326 423
60d096f4 424 /* consistency check:
425 * when the deferred count is 0,
426 * submits + releases == deferbyfunc + deferbycb
427 * Or in english, when there are no deferred requests, the amount
428 * we have submitted to the queue or cancelled must equal the amount
429 * we have said we wanted to be able to submit or cancel
430 */
431 if (rv->deferred_requests == 0)
62e76326 432 assert(rv->stats.submits + rv->stats.releases ==
433 rv->stats.deferbyfunc + rv->stats.deferbycb);
60d096f4 434
94439e4e 435 rv->flags.reserved = S_HELPER_DEFERRED;
62e76326 436
94439e4e 437 rv->deferred_requests++;
62e76326 438
60d096f4 439 rv->stats.deferbyfunc++;
62e76326 440
94439e4e 441 return rv;
442}
443
444void
445helperStatefulReset(helper_stateful_server * srv)
62e76326 446/* puts this helper back in the queue. the calling app is required to
94439e4e 447 * manage the state in the helper.
448 */
449{
450 statefulhelper *hlp = srv->parent;
4f0ef8e8 451 helper_stateful_request *r = srv->request;
62e76326 452
4f0ef8e8 453 if (r != NULL) {
62e76326 454 /* reset attempt DURING an outstaning request */
bf8fe701 455 debugs(84, 1, "helperStatefulReset: RESET During request " << hlp->id_name << " ");
62e76326 456 srv->flags.busy = 0;
07eca7e0 457 srv->roffset = 0;
62e76326 458 helperStatefulRequestFree(r);
459 srv->request = NULL;
94439e4e 460 }
62e76326 461
94439e4e 462 srv->flags.busy = 0;
62e76326 463
4f0ef8e8 464 if (srv->queue.head) {
62e76326 465 srv->flags.reserved = S_HELPER_DEFERRED;
466 helperStatefulServerKickQueue(srv);
4f0ef8e8 467 } else {
62e76326 468 srv->flags.reserved = S_HELPER_FREE;
469
470 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
471 srv->parent->OnEmptyQueue(srv->data);
472
473 helperStatefulKickQueue(hlp);
94439e4e 474 }
475}
476
4f0ef8e8 477/*
478 * DPW 2007-05-08
479 *
480 * helperStatefulReleaseServer tells the helper that whoever was
481 * using it no longer needs its services.
482 *
483 * If the state is S_HELPER_DEFERRED, decrease the deferred count.
484 * If the count goes to zero, then it can become S_HELPER_FREE.
485 *
486 * If the state is S_HELPER_RESERVED, then it should always
487 * become S_HELPER_FREE.
488 */
94439e4e 489void
490helperStatefulReleaseServer(helper_stateful_server * srv)
94439e4e 491{
4f0ef8e8 492 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
be29fa07 493 if (srv->flags.reserved == S_HELPER_FREE)
494 return;
4f0ef8e8 495
60d096f4 496 srv->stats.releases++;
62e76326 497
4f0ef8e8 498 if (srv->flags.reserved == S_HELPER_DEFERRED) {
62e76326 499 assert(srv->deferred_requests);
500 srv->deferred_requests--;
4f0ef8e8 501 if (srv->deferred_requests) {
502 debugs(0,0,HERE << "helperStatefulReleaseServer srv->deferred_requests=" << srv->deferred_requests);
503 return;
504 }
505 if (srv->queue.head) {
506 debugs(0,0,HERE << "helperStatefulReleaseServer srv->queue.head not NULL");
507 return;
508 }
60d096f4 509 }
62e76326 510
4f0ef8e8 511 srv->flags.reserved = S_HELPER_FREE;
512 if (srv->parent->OnEmptyQueue != NULL && srv->data)
513 srv->parent->OnEmptyQueue(srv->data);
94439e4e 514}
515
516void *
517helperStatefulServerGetData(helper_stateful_server * srv)
518/* return a pointer to the stateful routines data area */
519{
520 return srv->data;
74addf6c 521}
522
523void
9522b380 524helperStats(StoreEntry * sentry, helper * hlp, const char *label)
74addf6c 525{
9522b380 526 if (!helperStartStats(sentry, hlp, label))
527 return;
528
0a706c69 529 storeAppendPrintf(sentry, "program: %s\n",
62e76326 530 hlp->cmdline->key);
74addf6c 531 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 532 hlp->n_running, hlp->n_to_start);
74addf6c 533 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 534 hlp->stats.requests);
74addf6c 535 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 536 hlp->stats.replies);
74addf6c 537 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 538 hlp->stats.queue_size);
f9598528 539 storeAppendPrintf(sentry, "avg service time: %d msec\n",
540 hlp->stats.avg_svc_time);
74addf6c 541 storeAppendPrintf(sentry, "\n");
09c1ece1 542 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
62e76326 543 "#",
544 "FD",
545 "PID",
546 "# Requests",
547 "Flags",
548 "Time",
549 "Offset",
550 "Request");
551
4f0ef8e8 552 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
07eca7e0 553 helper_server *srv = (helper_server*)link->data;
f9598528 554 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
d8f10d6a 555 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 556 srv->index + 1,
557 srv->rfd,
558 srv->pid,
559 srv->stats.uses,
07eca7e0 560 srv->stats.pending ? 'B' : ' ',
561 srv->flags.writing ? 'W' : ' ',
62e76326 562 srv->flags.closing ? 'C' : ' ',
563 srv->flags.shutdown ? 'S' : ' ',
564 tt < 0.0 ? 0.0 : tt,
07eca7e0 565 (int) srv->roffset,
566 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
74addf6c 567 }
62e76326 568
74addf6c 569 storeAppendPrintf(sentry, "\nFlags key:\n\n");
74addf6c 570 storeAppendPrintf(sentry, " B = BUSY\n");
f9598528 571 storeAppendPrintf(sentry, " W = WRITING\n");
74addf6c 572 storeAppendPrintf(sentry, " C = CLOSING\n");
573 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
574}
575
94439e4e 576void
9522b380 577helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
94439e4e 578{
9522b380 579 if (!helperStartStats(sentry, hlp, label))
580 return;
581
cf154edc 582 storeAppendPrintf(sentry, "program: %s\n",
583 hlp->cmdline->key);
94439e4e 584 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 585 hlp->n_running, hlp->n_to_start);
94439e4e 586 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 587 hlp->stats.requests);
94439e4e 588 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 589 hlp->stats.replies);
94439e4e 590 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 591 hlp->stats.queue_size);
94439e4e 592 storeAppendPrintf(sentry, "avg service time: %d msec\n",
62e76326 593 hlp->stats.avg_svc_time);
94439e4e 594 storeAppendPrintf(sentry, "\n");
0a706c69 595 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%20s\t%s\t%7s\t%7s\t%7s\n",
62e76326 596 "#",
597 "FD",
598 "PID",
599 "# Requests",
600 "# Deferred Requests",
601 "Flags",
602 "Time",
603 "Offset",
604 "Request");
605
4f0ef8e8 606 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
607 helper_stateful_server *srv = (helper_stateful_server *)link->data;
608 double tt = 0.001 * tvSubMsec(srv->dispatch_time,
f9598528 609 srv->flags.busy ? current_time : srv->answer_time);
b902a58c 610 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%20d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 611 srv->index + 1,
612 srv->rfd,
613 srv->pid,
614 srv->stats.uses,
615 (int) srv->deferred_requests,
62e76326 616 srv->flags.busy ? 'B' : ' ',
617 srv->flags.closing ? 'C' : ' ',
4f0ef8e8 618 srv->flags.reserved == S_HELPER_RESERVED ? 'R' : (srv->flags.reserved == S_HELPER_DEFERRED ? 'D' : ' '),
62e76326 619 srv->flags.shutdown ? 'S' : ' ',
620 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
621 tt < 0.0 ? 0.0 : tt,
07eca7e0 622 (int) srv->roffset,
62e76326 623 srv->request ? log_quote(srv->request->buf) : "(none)");
94439e4e 624 }
62e76326 625
94439e4e 626 storeAppendPrintf(sentry, "\nFlags key:\n\n");
94439e4e 627 storeAppendPrintf(sentry, " B = BUSY\n");
628 storeAppendPrintf(sentry, " C = CLOSING\n");
629 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
630 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
5d146f7d 631 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 632}
633
74addf6c 634void
635helperShutdown(helper * hlp)
636{
c68e9c6b 637 dlink_node *link = hlp->servers.head;
2d5d5261 638#ifdef _SQUID_MSWIN_
639
640 HANDLE hIpc;
641 pid_t pid;
642 int no;
643#endif
62e76326 644
c68e9c6b 645 while (link) {
62e76326 646 helper_server *srv;
647 srv = (helper_server *)link->data;
648 link = link->next;
649
8cfc76db 650 if (srv->flags.shutdown) {
bf8fe701 651 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 652 continue;
653 }
654
d8f10d6a 655 hlp->n_active--;
656 assert(hlp->n_active >= 0);
657
62e76326 658 srv->flags.shutdown = 1; /* request it to shut itself down */
659
d8f10d6a 660 if (srv->flags.closing) {
bf8fe701 661 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 662 continue;
663 }
664
d8f10d6a 665 if (srv->stats.pending) {
bf8fe701 666 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 667 continue;
668 }
669
670 srv->flags.closing = 1;
2d5d5261 671#ifdef _SQUID_MSWIN_
672
673 hIpc = srv->hIpc;
674 pid = srv->pid;
675 no = srv->index + 1;
676 shutdown(srv->wfd, SD_BOTH);
677#endif
678
bf8fe701 679 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
62e76326 680 /* the rest of the details is dealt with in the helperServerFree
681 * close handler
682 */
683 comm_close(srv->rfd);
2d5d5261 684#ifdef _SQUID_MSWIN_
685
686 if (hIpc) {
687 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
688 getCurrentTime();
bf8fe701 689 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
690 " #" << no << " (" << hlp->cmdline->key << "," <<
691 (long int)pid << ") didn't exit in 5 seconds");
692
2d5d5261 693 }
694
695 CloseHandle(hIpc);
696 }
697
698#endif
699
74addf6c 700 }
701}
702
94439e4e 703void
704helperStatefulShutdown(statefulhelper * hlp)
705{
706 dlink_node *link = hlp->servers.head;
707 helper_stateful_server *srv;
2d5d5261 708#ifdef _SQUID_MSWIN_
709
710 HANDLE hIpc;
711 pid_t pid;
712 int no;
713#endif
62e76326 714
94439e4e 715 while (link) {
62e76326 716 srv = (helper_stateful_server *)link->data;
717 link = link->next;
718
8cfc76db 719 if (srv->flags.shutdown) {
bf8fe701 720 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 721 continue;
722 }
723
d8f10d6a 724 hlp->n_active--;
725 assert(hlp->n_active >= 0);
62e76326 726 srv->flags.shutdown = 1; /* request it to shut itself down */
727
728 if (srv->flags.busy) {
bf8fe701 729 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 730 continue;
731 }
732
733 if (srv->flags.closing) {
bf8fe701 734 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 735 continue;
736 }
737
738 if (srv->flags.reserved != S_HELPER_FREE) {
bf8fe701 739 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED.");
62e76326 740 continue;
741 }
742
743 if (srv->deferred_requests) {
bf8fe701 744 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has DEFERRED requests.");
62e76326 745 continue;
746 }
747
748 srv->flags.closing = 1;
2d5d5261 749#ifdef _SQUID_MSWIN_
750
751 hIpc = srv->hIpc;
752 pid = srv->pid;
753 no = srv->index + 1;
754 shutdown(srv->wfd, SD_BOTH);
755#endif
756
bf8fe701 757 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
758
62e76326 759 /* the rest of the details is dealt with in the helperStatefulServerFree
760 * close handler
761 */
762 comm_close(srv->rfd);
2d5d5261 763#ifdef _SQUID_MSWIN_
764
765 if (hIpc) {
766 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
767 getCurrentTime();
bf8fe701 768 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
769 " #" << no << " (" << hlp->cmdline->key << "," <<
770 (long int)pid << ") didn't exit in 5 seconds");
2d5d5261 771 }
772
773 CloseHandle(hIpc);
774 }
775
776#endif
777
94439e4e 778 }
779}
780
781
1f5f60dd 782helper *
783helperCreate(const char *name)
784{
28c60158 785 helper *hlp;
aa839030 786 CBDATA_INIT_TYPE(helper);
72711e31 787 hlp = cbdataAlloc(helper);
1f5f60dd 788 hlp->id_name = name;
789 return hlp;
790}
791
94439e4e 792statefulhelper *
793helperStatefulCreate(const char *name)
794{
795 statefulhelper *hlp;
aa839030 796 CBDATA_INIT_TYPE(statefulhelper);
72711e31 797 hlp = cbdataAlloc(statefulhelper);
94439e4e 798 hlp->id_name = name;
799 return hlp;
800}
801
802
1f5f60dd 803void
804helperFree(helper * hlp)
805{
5dae8514 806 if (!hlp)
62e76326 807 return;
808
1f5f60dd 809 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 810 if (hlp->queue.head)
bf8fe701 811 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
812 hlp->stats.queue_size << " requests queued");
62e76326 813
1f5f60dd 814 cbdataFree(hlp);
815}
816
94439e4e 817void
818helperStatefulFree(statefulhelper * hlp)
819{
5dae8514 820 if (!hlp)
62e76326 821 return;
822
94439e4e 823 /* note, don't free hlp->name, it probably points to static memory */
824 if (hlp->queue.head)
bf8fe701 825 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
826 hlp->stats.queue_size << " requests queued");
62e76326 827
94439e4e 828 cbdataFree(hlp);
829}
830
831
74addf6c 832/* ====================================================================== */
833/* LOCAL FUNCTIONS */
834/* ====================================================================== */
835
836static void
1f5f60dd 837helperServerFree(int fd, void *data)
74addf6c 838{
e6ccf245 839 helper_server *srv = (helper_server *)data;
74addf6c 840 helper *hlp = srv->parent;
ac750329 841 helper_request *r;
07eca7e0 842 int i, concurrency = hlp->concurrency;
843
844 if (!concurrency)
845 concurrency = 1;
846
74addf6c 847 assert(srv->rfd == fd);
62e76326 848
07eca7e0 849 if (srv->rbuf) {
850 memFreeBuf(srv->rbuf_sz, srv->rbuf);
851 srv->rbuf = NULL;
74addf6c 852 }
62e76326 853
2fe7eff9 854 srv->wqueue->clean();
032785bf 855 delete srv->wqueue;
856
857 if (srv->writebuf) {
2fe7eff9 858 srv->writebuf->clean();
032785bf 859 delete srv->writebuf;
860 srv->writebuf = NULL;
861 }
62e76326 862
07eca7e0 863 for (i = 0; i < concurrency; i++) {
864 if ((r = srv->requests[i])) {
865 void *cbdata;
62e76326 866
07eca7e0 867 if (cbdataReferenceValidDone(r->data, &cbdata))
868 r->callback(cbdata, NULL);
62e76326 869
07eca7e0 870 helperRequestFree(r);
871
872 srv->requests[i] = NULL;
873 }
ac750329 874 }
62e76326 875
07eca7e0 876 safe_free(srv->requests);
877
3cdb7cd0 878 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 879 comm_close(srv->wfd);
880
74addf6c 881 dlinkDelete(&srv->link, &hlp->servers);
62e76326 882
74addf6c 883 hlp->n_running--;
62e76326 884
74addf6c 885 assert(hlp->n_running >= 0);
62e76326 886
1f5f60dd 887 if (!srv->flags.shutdown) {
d8f10d6a 888 hlp->n_active--;
889 assert(hlp->n_active >= 0);
bf8fe701 890 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 <<
891 " (FD " << fd << ") exited");
62e76326 892
5ea33fce 893 if (hlp->n_active < hlp->n_to_start / 2) {
bf8fe701 894 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 895
896 if (hlp->last_restart > squid_curtime - 30)
897 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
898
bf8fe701 899 debugs(80, 0, "Starting new helpers");
5ea33fce 900
901 helperOpenServers(hlp);
902 }
14e87a44 903 }
62e76326 904
fa80a8ef 905 cbdataReferenceDone(srv->parent);
14e87a44 906 cbdataFree(srv);
74addf6c 907}
908
94439e4e 909static void
910helperStatefulServerFree(int fd, void *data)
911{
e6ccf245 912 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 913 statefulhelper *hlp = srv->parent;
914 helper_stateful_request *r;
915 assert(srv->rfd == fd);
62e76326 916
07eca7e0 917 if (srv->rbuf) {
918 memFreeBuf(srv->rbuf_sz, srv->rbuf);
919 srv->rbuf = NULL;
94439e4e 920 }
62e76326 921
07eca7e0 922#if 0
2fe7eff9 923 srv->wqueue->clean();
032785bf 924
925 delete srv->wqueue;
07eca7e0 926
927#endif
928
94439e4e 929 if ((r = srv->request)) {
62e76326 930 void *cbdata;
931
932 if (cbdataReferenceValidDone(r->data, &cbdata))
07eca7e0 933 r->callback(cbdata, srv, NULL);
62e76326 934
935 helperStatefulRequestFree(r);
936
937 srv->request = NULL;
94439e4e 938 }
62e76326 939
3c641669 940 /* TODO: walk the local queue of requests and carry them all out */
94439e4e 941 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 942 comm_close(srv->wfd);
943
94439e4e 944 dlinkDelete(&srv->link, &hlp->servers);
62e76326 945
94439e4e 946 hlp->n_running--;
62e76326 947
94439e4e 948 assert(hlp->n_running >= 0);
62e76326 949
94439e4e 950 if (!srv->flags.shutdown) {
d8f10d6a 951 hlp->n_active--;
952 assert( hlp->n_active >= 0);
bf8fe701 953 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " (FD " << fd << ") exited");
62e76326 954
5ea33fce 955 if (hlp->n_active <= hlp->n_to_start / 2) {
bf8fe701 956 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 957
958 if (hlp->last_restart > squid_curtime - 30)
959 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
960
bf8fe701 961 debugs(80, 0, "Starting new helpers");
5ea33fce 962
963 helperStatefulOpenServers(hlp);
964 }
94439e4e 965 }
62e76326 966
94439e4e 967 if (srv->data != NULL)
b001e822 968 hlp->datapool->free(srv->data);
62e76326 969
fa80a8ef 970 cbdataReferenceDone(srv->parent);
62e76326 971
94439e4e 972 cbdataFree(srv);
973}
974
975
74addf6c 976static void
c4b7a5a9 977helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
74addf6c 978{
74addf6c 979 char *t = NULL;
e6ccf245 980 helper_server *srv = (helper_server *)data;
74addf6c 981 helper *hlp = srv->parent;
982 assert(fd == srv->rfd);
fa80a8ef 983 assert(cbdataReferenceValid(data));
c4b7a5a9 984
985 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 986
c4b7a5a9 987 if (flag == COMM_ERR_CLOSING) {
988 return;
989 }
990
4f0ef8e8 991 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
62e76326 992
c4b7a5a9 993 if (flag != COMM_OK || len <= 0) {
62e76326 994 if (len < 0)
bf8fe701 995 debugs(84, 1, "helperHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 996
997 comm_close(fd);
998
999 return;
74addf6c 1000 }
62e76326 1001
07eca7e0 1002 srv->roffset += len;
1003 srv->rbuf[srv->roffset] = '\0';
bf8fe701 1004 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
62e76326 1005
07eca7e0 1006 if (!srv->stats.pending) {
62e76326 1007 /* someone spoke without being spoken to */
bf8fe701 1008 debugs(84, 1, "helperHandleRead: unexpected read from " <<
1009 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1010 " bytes '" << srv->rbuf << "'");
1011
07eca7e0 1012 srv->roffset = 0;
1013 srv->rbuf[0] = '\0';
1014 }
1015
1016 while ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1017 /* end of reply found */
07eca7e0 1018 helper_request *r;
1019 char *msg = srv->rbuf;
1020 int i = 0;
bf8fe701 1021 debugs(84, 3, "helperHandleRead: end of reply found");
6bf4f823 1022
1023 if (t > srv->rbuf && t[-1] == '\r')
1024 t[-1] = '\0';
1025
07eca7e0 1026 *t++ = '\0';
62e76326 1027
07eca7e0 1028 if (hlp->concurrency) {
1029 i = strtol(msg, &msg, 10);
62e76326 1030
e4755e29 1031 while (*msg && xisspace(*msg))
07eca7e0 1032 msg++;
1033 }
62e76326 1034
07eca7e0 1035 r = srv->requests[i];
62e76326 1036
07eca7e0 1037 if (r) {
1038 HLPCB *callback = r->callback;
1039 void *cbdata;
62e76326 1040
07eca7e0 1041 srv->requests[i] = NULL;
62e76326 1042
07eca7e0 1043 r->callback = NULL;
62e76326 1044
07eca7e0 1045 if (cbdataReferenceValidDone(r->data, &cbdata))
1046 callback(cbdata, msg);
62e76326 1047
07eca7e0 1048 srv->stats.pending--;
1049
1050 hlp->stats.replies++;
1051
f9598528 1052 srv->answer_time = current_time;
1053
1054 srv->dispatch_time = r->dispatch_time;
1055
07eca7e0 1056 hlp->stats.avg_svc_time =
1057 intAverage(hlp->stats.avg_svc_time,
1058 tvSubMsec(r->dispatch_time, current_time),
1059 hlp->stats.replies, REDIRECT_AV_FACTOR);
1060
1061 helperRequestFree(r);
1062 } else {
bf8fe701 1063 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
1064 i << " from " << hlp->id_name << " #" << srv->index + 1 <<
1065 " '" << srv->rbuf << "'");
1066
07eca7e0 1067 }
1068
1069 srv->roffset -= (t - srv->rbuf);
1070 memmove(srv->rbuf, t, srv->roffset + 1);
62e76326 1071
1072 if (srv->flags.shutdown) {
1073 int wfd = srv->wfd;
1074 srv->wfd = -1;
d8f10d6a 1075 srv->flags.closing=1;
62e76326 1076 comm_close(wfd);
b7178dba 1077 return;
62e76326 1078 } else
1079 helperKickQueue(hlp);
74addf6c 1080 }
07eca7e0 1081
1082 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
74addf6c 1083}
1084
94439e4e 1085static void
c4b7a5a9 1086helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
94439e4e 1087{
94439e4e 1088 char *t = NULL;
e6ccf245 1089 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 1090 helper_stateful_request *r;
1091 statefulhelper *hlp = srv->parent;
1092 assert(fd == srv->rfd);
fa80a8ef 1093 assert(cbdataReferenceValid(data));
c4b7a5a9 1094
1095 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 1096
c4b7a5a9 1097 if (flag == COMM_ERR_CLOSING) {
1098 return;
1099 }
1100
4a7a3d56 1101 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
4f0ef8e8 1102 hlp->id_name << " #" << srv->index + 1);
bf8fe701 1103
62e76326 1104
c4b7a5a9 1105 if (flag != COMM_OK || len <= 0) {
62e76326 1106 if (len < 0)
bf8fe701 1107 debugs(84, 1, "helperStatefulHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 1108
1109 comm_close(fd);
1110
1111 return;
94439e4e 1112 }
62e76326 1113
07eca7e0 1114 srv->roffset += len;
1115 srv->rbuf[srv->roffset] = '\0';
94439e4e 1116 r = srv->request;
62e76326 1117
94439e4e 1118 if (r == NULL) {
62e76326 1119 /* someone spoke without being spoken to */
bf8fe701 1120 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
1121 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1122 " bytes '" << srv->rbuf << "'");
1123
07eca7e0 1124 srv->roffset = 0;
1125 }
1126
1127 if ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1128 /* end of reply found */
bf8fe701 1129 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1130
1131 if (t > srv->rbuf && t[-1] == '\r')
1132 t[-1] = '\0';
1133
62e76326 1134 *t = '\0';
1135
2734fd37 1136 if (r && cbdataReferenceValid(r->data)) {
07eca7e0 1137 switch ((r->callback(r->data, srv, srv->rbuf))) { /*if non-zero reserve helper */
62e76326 1138
1139 case S_HELPER_UNKNOWN:
2324cda2 1140 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was received.\n");
62e76326 1141 break;
1142
1143 case S_HELPER_RELEASE: /* helper finished with */
1144
1145 if (!srv->deferred_requests && !srv->queue.head) {
1146 srv->flags.reserved = S_HELPER_FREE;
1147
1148 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
1149 srv->parent->OnEmptyQueue(srv->data);
1150
bf8fe701 1151 debugs(84, 5, "StatefulHandleRead: releasing " << hlp->id_name << " #" << srv->index + 1);
62e76326 1152 } else {
1153 srv->flags.reserved = S_HELPER_DEFERRED;
bf8fe701 1154 debugs(84, 5, "StatefulHandleRead: outstanding deferred requests on " <<
1155 hlp->id_name << " #" << srv->index + 1 <<
1156 ". reserving for deferred requests.");
62e76326 1157 }
1158
1159 break;
1160
1161 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
1162
1163 if (!srv->queue.head) {
1164 assert(srv->deferred_requests == 0);
1165 srv->flags.reserved = S_HELPER_RESERVED;
bf8fe701 1166 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1);
62e76326 1167 } else {
1168 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
1169 }
1170
1171 break;
1172
1173 case S_HELPER_DEFER:
1174 /* the helper is still needed, but can
1175 * be used for other requests in the meantime.
1176 */
1177 srv->flags.reserved = S_HELPER_DEFERRED;
1178 srv->deferred_requests++;
1179 srv->stats.deferbycb++;
bf8fe701 1180 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1 << " for deferred requests.");
62e76326 1181 break;
1182
1183 default:
1184 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
1185 }
1186
1187 } else {
bf8fe701 1188 debugs(84, 1, "StatefulHandleRead: no callback data registered");
62e76326 1189 }
1190
1191 srv->flags.busy = 0;
07eca7e0 1192 srv->roffset = 0;
62e76326 1193 helperStatefulRequestFree(r);
1194 srv->request = NULL;
1195 hlp->stats.replies++;
f9598528 1196 srv->answer_time = current_time;
62e76326 1197 hlp->stats.avg_svc_time =
1198 intAverage(hlp->stats.avg_svc_time,
1199 tvSubMsec(srv->dispatch_time, current_time),
1200 hlp->stats.replies, REDIRECT_AV_FACTOR);
1201
1202 if (srv->flags.shutdown
1203 && srv->flags.reserved == S_HELPER_FREE
1204 && !srv->deferred_requests) {
1205 int wfd = srv->wfd;
1206 srv->wfd = -1;
d8f10d6a 1207 srv->flags.closing=1;
62e76326 1208 comm_close(wfd);
1209 } else {
1210 if (srv->queue.head)
1211 helperStatefulServerKickQueue(srv);
1212 else
1213 helperStatefulKickQueue(hlp);
1214 }
94439e4e 1215 }
07eca7e0 1216
1217 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
1218 helperStatefulHandleRead, srv);
94439e4e 1219}
1220
74addf6c 1221static void
1222Enqueue(helper * hlp, helper_request * r)
1223{
e6ccf245 1224 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
74addf6c 1225 dlinkAddTail(r, link, &hlp->queue);
1226 hlp->stats.queue_size++;
62e76326 1227
74addf6c 1228 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1229 return;
1230
74addf6c 1231 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1232 return;
1233
fe73896c 1234 if (shutting_down || reconfiguring)
62e76326 1235 return;
1236
74addf6c 1237 hlp->last_queue_warn = squid_curtime;
62e76326 1238
bf8fe701 1239 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1240 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1241
62e76326 1242
74addf6c 1243 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1244 fatalf("Too many queued %s requests", hlp->id_name);
1245
bf8fe701 1246 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1247
74addf6c 1248}
1249
94439e4e 1250static void
1251StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1252{
e6ccf245 1253 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1254 dlinkAddTail(r, link, &hlp->queue);
1255 hlp->stats.queue_size++;
62e76326 1256
94439e4e 1257 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1258 return;
1259
893cbac6 1260 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1261 fatalf("Too many queued %s requests", hlp->id_name);
1262
94439e4e 1263 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1264 return;
1265
94439e4e 1266 if (shutting_down || reconfiguring)
62e76326 1267 return;
1268
94439e4e 1269 hlp->last_queue_warn = squid_curtime;
62e76326 1270
bf8fe701 1271 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
62e76326 1272
bf8fe701 1273 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1274 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
62e76326 1275
94439e4e 1276}
1277
1278static void
1279StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
1280{
e6ccf245 1281 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1282 dlinkAddTail(r, link, &srv->queue);
62e76326 1283 /* TODO: warning if the queue on this server is more than X
1284 * We don't check the queue size at the moment, because
1285 * requests hitting here are deferrable
1286 */
1287 /* hlp->stats.queue_size++;
1288 * if (hlp->stats.queue_size < hlp->n_running)
1289 * return;
1290 * if (squid_curtime - hlp->last_queue_warn < 600)
1291 * return;
1292 * if (shutting_down || reconfiguring)
1293 * return;
1294 * hlp->last_queue_warn = squid_curtime;
bf8fe701 1295 * debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1296 * debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1297 * if (hlp->stats.queue_size > hlp->n_running * 2)
1298 * fatalf("Too many queued %s requests", hlp->id_name);
bf8fe701 1299 * debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file." ); */
94439e4e 1300}
1301
1302
74addf6c 1303static helper_request *
1304Dequeue(helper * hlp)
1305{
1306 dlink_node *link;
1307 helper_request *r = NULL;
62e76326 1308
74addf6c 1309 if ((link = hlp->queue.head)) {
62e76326 1310 r = (helper_request *)link->data;
1311 dlinkDelete(link, &hlp->queue);
1312 memFree(link, MEM_DLINK_NODE);
1313 hlp->stats.queue_size--;
74addf6c 1314 }
62e76326 1315
74addf6c 1316 return r;
1317}
1318
94439e4e 1319static helper_stateful_request *
1320StatefulServerDequeue(helper_stateful_server * srv)
1321{
1322 dlink_node *link;
1323 helper_stateful_request *r = NULL;
62e76326 1324
94439e4e 1325 if ((link = srv->queue.head)) {
62e76326 1326 r = (helper_stateful_request *)link->data;
1327 dlinkDelete(link, &srv->queue);
1328 memFree(link, MEM_DLINK_NODE);
94439e4e 1329 }
62e76326 1330
94439e4e 1331 return r;
1332}
1333
1334static helper_stateful_request *
1335StatefulDequeue(statefulhelper * hlp)
1336{
1337 dlink_node *link;
1338 helper_stateful_request *r = NULL;
62e76326 1339
94439e4e 1340 if ((link = hlp->queue.head)) {
62e76326 1341 r = (helper_stateful_request *)link->data;
1342 dlinkDelete(link, &hlp->queue);
1343 memFree(link, MEM_DLINK_NODE);
1344 hlp->stats.queue_size--;
94439e4e 1345 }
62e76326 1346
94439e4e 1347 return r;
1348}
1349
74addf6c 1350static helper_server *
1351GetFirstAvailable(helper * hlp)
1352{
1353 dlink_node *n;
07eca7e0 1354 helper_server *srv;
1355 helper_server *selected = NULL;
62e76326 1356
fe73896c 1357 if (hlp->n_running == 0)
62e76326 1358 return NULL;
1359
07eca7e0 1360 /* Find "least" loaded helper (approx) */
74addf6c 1361 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1362 srv = (helper_server *)n->data;
1363
07eca7e0 1364 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1365 continue;
1366
d8f10d6a 1367 if (srv->flags.shutdown)
62e76326 1368 continue;
1369
07eca7e0 1370 if (!srv->stats.pending)
1371 return srv;
1372
1373 if (selected) {
1374 selected = srv;
1375 break;
1376 }
1377
1378 selected = srv;
74addf6c 1379 }
62e76326 1380
07eca7e0 1381 /* Check for overload */
1382 if (!selected)
1383 return NULL;
1384
1385 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1386 return NULL;
1387
1388 return selected;
74addf6c 1389}
1390
94439e4e 1391static helper_stateful_server *
1392StatefulGetFirstAvailable(statefulhelper * hlp)
1393{
1394 dlink_node *n;
1395 helper_stateful_server *srv = NULL;
4f0ef8e8 1396 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->n_running);
62e76326 1397
94439e4e 1398 if (hlp->n_running == 0)
62e76326 1399 return NULL;
1400
94439e4e 1401 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1402 srv = (helper_stateful_server *)n->data;
1403
1404 if (srv->flags.busy)
1405 continue;
1406
1407 if (srv->flags.reserved == S_HELPER_RESERVED)
1408 continue;
1409
d8f10d6a 1410 if (srv->flags.shutdown)
62e76326 1411 continue;
1412
1413 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1414 continue;
1415
4f0ef8e8 1416 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
62e76326 1417 return srv;
94439e4e 1418 }
62e76326 1419
bf8fe701 1420 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
94439e4e 1421 return NULL;
1422}
1423
1424
42679bd6 1425static void
1426helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1427{
07eca7e0 1428 helper_server *srv = (helper_server *)data;
1429
2fe7eff9 1430 srv->writebuf->clean();
032785bf 1431 delete srv->writebuf;
1432 srv->writebuf = NULL;
07eca7e0 1433 srv->flags.writing = 0;
1434
1435 if (flag != COMM_OK) {
1436 /* Helper server has crashed */
bf8fe701 1437 debugs(84, 0, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
07eca7e0 1438 return;
1439 }
1440
2fe7eff9 1441 if (!srv->wqueue->isNull()) {
07eca7e0 1442 srv->writebuf = srv->wqueue;
032785bf 1443 srv->wqueue = new MemBuf;
07eca7e0 1444 srv->flags.writing = 1;
1445 comm_write(srv->wfd,
032785bf 1446 srv->writebuf->content(),
1447 srv->writebuf->contentSize(),
07eca7e0 1448 helperDispatchWriteDone, /* Handler */
2b663917 1449 srv, NULL); /* Handler-data, freefunc */
07eca7e0 1450 }
42679bd6 1451}
1452
74addf6c 1453static void
1454helperDispatch(helper_server * srv, helper_request * r)
1455{
1456 helper *hlp = srv->parent;
07eca7e0 1457 helper_request **ptr = NULL;
1458 unsigned int slot;
62e76326 1459
fa80a8ef 1460 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1461 debugs(84, 1, "helperDispatch: invalid callback data");
62e76326 1462 helperRequestFree(r);
1463 return;
74addf6c 1464 }
62e76326 1465
07eca7e0 1466 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1467 if (!srv->requests[slot]) {
1468 ptr = &srv->requests[slot];
1469 break;
1470 }
1471 }
1472
1473 assert(ptr);
1474 *ptr = r;
1475 srv->stats.pending += 1;
1476 r->dispatch_time = current_time;
1477
2fe7eff9 1478 if (srv->wqueue->isNull())
1479 srv->wqueue->init();
07eca7e0 1480
1481 if (hlp->concurrency)
2fe7eff9 1482 srv->wqueue->Printf("%d %s", slot, r->buf);
07eca7e0 1483 else
2fe7eff9 1484 srv->wqueue->append(r->buf, strlen(r->buf));
07eca7e0 1485
1486 if (!srv->flags.writing) {
032785bf 1487 assert(NULL == srv->writebuf);
07eca7e0 1488 srv->writebuf = srv->wqueue;
032785bf 1489 srv->wqueue = new MemBuf;
07eca7e0 1490 srv->flags.writing = 1;
1491 comm_write(srv->wfd,
032785bf 1492 srv->writebuf->content(),
1493 srv->writebuf->contentSize(),
07eca7e0 1494 helperDispatchWriteDone, /* Handler */
2b663917 1495 srv, NULL); /* Handler-data, free func */
07eca7e0 1496 }
1497
4a7a3d56 1498 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
bf8fe701 1499
74addf6c 1500 srv->stats.uses++;
1501 hlp->stats.requests++;
1502}
1503
42679bd6 1504static void
1505helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
62e76326 1506 int xerrno, void *data)
42679bd6 1507{
62e76326 1508 /* nothing! */
42679bd6 1509}
1510
1511
94439e4e 1512static void
1513helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1514{
1515 statefulhelper *hlp = srv->parent;
62e76326 1516
fa80a8ef 1517 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1518 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
62e76326 1519 helperStatefulRequestFree(r);
1520 return;
94439e4e 1521 }
62e76326 1522
bf8fe701 1523 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
62e76326 1524
94439e4e 1525 if (r->placeholder == 1) {
62e76326 1526 /* a callback is needed before this request can _use_ a helper. */
1527 /* we don't care about releasing/deferring this helper. The request NEVER
1528 * gets to the helper. So we throw away the return code */
1529 r->callback(r->data, srv, NULL);
1530 /* throw away the placeholder */
1531 helperStatefulRequestFree(r);
1532 /* and push the queue. Note that the callback may have submitted a new
1533 * request to the helper which is why we test for the request*/
1534
1535 if (srv->request == NULL) {
1536 if (srv->flags.shutdown
1537 && srv->flags.reserved == S_HELPER_FREE
1538 && !srv->deferred_requests) {
1539 int wfd = srv->wfd;
1540 srv->wfd = -1;
1541 comm_close(wfd);
1542 } else {
1543 if (srv->queue.head)
1544 helperStatefulServerKickQueue(srv);
1545 else
1546 helperStatefulKickQueue(hlp);
1547 }
1548 }
1549
1550 return;
94439e4e 1551 }
62e76326 1552
94439e4e 1553 srv->flags.busy = 1;
1554 srv->request = r;
1555 srv->dispatch_time = current_time;
42679bd6 1556 comm_write(srv->wfd,
62e76326 1557 r->buf,
1558 strlen(r->buf),
1559 helperStatefulDispatchWriteDone, /* Handler */
2b663917 1560 hlp, NULL); /* Handler-data, free func */
bf8fe701 1561 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1562 hlp->id_name << " #" << srv->index + 1 << ", " <<
1563 (int) strlen(r->buf) << " bytes");
1564
94439e4e 1565 srv->stats.uses++;
1566 hlp->stats.requests++;
1567}
1568
1569
74addf6c 1570static void
1571helperKickQueue(helper * hlp)
1572{
1573 helper_request *r;
1574 helper_server *srv;
62e76326 1575
74addf6c 1576 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
62e76326 1577 helperDispatch(srv, r);
74addf6c 1578}
1579
94439e4e 1580static void
1581helperStatefulKickQueue(statefulhelper * hlp)
1582{
1583 helper_stateful_request *r;
1584 helper_stateful_server *srv;
62e76326 1585
94439e4e 1586 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
62e76326 1587 helperStatefulDispatch(srv, r);
94439e4e 1588}
1589
1590static void
1591helperStatefulServerKickQueue(helper_stateful_server * srv)
1592{
1593 helper_stateful_request *r;
62e76326 1594
94439e4e 1595 if ((r = StatefulServerDequeue(srv)))
62e76326 1596 helperStatefulDispatch(srv, r);
94439e4e 1597}
1598
74addf6c 1599static void
1600helperRequestFree(helper_request * r)
1601{
fa80a8ef 1602 cbdataReferenceDone(r->data);
74addf6c 1603 xfree(r->buf);
00d77d6b 1604 delete r;
51ee7c82 1605}
1606
94439e4e 1607static void
1608helperStatefulRequestFree(helper_stateful_request * r)
1609{
b1da7838 1610 if(r) {
1611 cbdataReferenceDone(r->data);
1612 xfree(r->buf);
1613 delete r;
1614 }
51ee7c82 1615}
9522b380 1616
1617// TODO: should helper_ and helper_stateful_ have a common parent?
1618static bool
1619helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1620{
1621 if (!hlp) {
1622 if (label)
1623 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1624 return false;
1625 }
1626
1627 if (label)
1628 storeAppendPrintf(sentry, "%s:\n", label);
1629
1630 return true;
1631}