]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Replaced one more mem-> with direct mem_obj-> reference.
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
4f0ef8e8 3 * $Id: helper.cc,v 1.84 2007/05/09 09:07:39 wessels Exp $
f740a279 4 *
17bb3486 5 * DEBUG: section 84 Helper process maintenance
f740a279 6 * AUTHOR: Harvest Derived?
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 9 * ----------------------------------------------------------
10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
f740a279 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
51ee7c82 37#include "helper.h"
985c86bc 38#include "SquidTime.h"
e6ccf245 39#include "Store.h"
42679bd6 40#include "comm.h"
0eb49b6d 41#include "MemBuf.h"
d295d770 42#include "wordlist.h"
74addf6c 43
44#define HELPER_MAX_ARGS 64
45
c4b7a5a9 46static IOCB helperHandleRead;
47static IOCB helperStatefulHandleRead;
1f5f60dd 48static PF helperServerFree;
94439e4e 49static PF helperStatefulServerFree;
74addf6c 50static void Enqueue(helper * hlp, helper_request *);
51static helper_request *Dequeue(helper * hlp);
94439e4e 52static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 53static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 54static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 55static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 56static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 57static void helperKickQueue(helper * hlp);
94439e4e 58static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 59static void helperRequestFree(helper_request * r);
94439e4e 60static void helperStatefulRequestFree(helper_stateful_request * r);
61static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
62static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
63static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
64static void helperStatefulServerKickQueue(helper_stateful_server * srv);
9522b380 65static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
66
74addf6c 67
aa839030 68CBDATA_TYPE(helper);
69CBDATA_TYPE(helper_server);
70CBDATA_TYPE(statefulhelper);
71CBDATA_TYPE(helper_stateful_server);
72
74addf6c 73void
74helperOpenServers(helper * hlp)
75{
76 char *s;
77 char *progname;
78 char *shortname;
79 char *procname;
a2c963ae 80 const char *args[HELPER_MAX_ARGS];
74addf6c 81 char fd_note_buf[FD_DESC_SZ];
82 helper_server *srv;
83 int nargs = 0;
84 int k;
b5d712b5 85 pid_t pid;
74addf6c 86 int rfd;
87 int wfd;
b5d712b5 88 void * hIpc;
74addf6c 89 wordlist *w;
62e76326 90
74addf6c 91 if (hlp->cmdline == NULL)
62e76326 92 return;
93
74addf6c 94 progname = hlp->cmdline->key;
62e76326 95
74addf6c 96 if ((s = strrchr(progname, '/')))
62e76326 97 shortname = xstrdup(s + 1);
74addf6c 98 else
62e76326 99 shortname = xstrdup(progname);
100
bf8fe701 101 debugs(84, 1, "helperOpenServers: Starting " << hlp->n_to_start << " '" << shortname << "' processes");
62e76326 102
e6ccf245 103 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 104
74addf6c 105 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 106
74addf6c 107 args[nargs++] = procname;
62e76326 108
74addf6c 109 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 110 args[nargs++] = w->key;
111
74addf6c 112 args[nargs++] = NULL;
62e76326 113
74addf6c 114 assert(nargs <= HELPER_MAX_ARGS);
62e76326 115
74addf6c 116 for (k = 0; k < hlp->n_to_start; k++) {
62e76326 117 getCurrentTime();
118 rfd = wfd = -1;
b5d712b5 119 pid = ipcCreate(hlp->ipc_type,
120 progname,
121 args,
122 shortname,
123 &rfd,
124 &wfd,
125 &hIpc);
126
127 if (pid < 0) {
bf8fe701 128 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 129 continue;
130 }
131
132 hlp->n_running++;
d8f10d6a 133 hlp->n_active++;
b5d712b5 134 CBDATA_INIT_TYPE(helper_server);
62e76326 135 srv = cbdataAlloc(helper_server);
b5d712b5 136 srv->hIpc = hIpc;
137 srv->pid = pid;
62e76326 138 srv->index = k;
139 srv->rfd = rfd;
140 srv->wfd = wfd;
07eca7e0 141 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
90bed1c4 142 srv->wqueue = new MemBuf;
07eca7e0 143 srv->roffset = 0;
144 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
62e76326 145 srv->parent = cbdataReference(hlp);
146 dlinkAddTail(srv, &srv->link, &hlp->servers);
147
148 if (rfd == wfd) {
149 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
150 fd_note(rfd, fd_note_buf);
151 } else {
152 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
153 fd_note(rfd, fd_note_buf);
154 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
155 fd_note(wfd, fd_note_buf);
156 }
157
158 commSetNonBlocking(rfd);
159
160 if (wfd != rfd)
161 commSetNonBlocking(wfd);
162
163 comm_add_close_handler(rfd, helperServerFree, srv);
07eca7e0 164
165 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
74addf6c 166 }
62e76326 167
5ea33fce 168 hlp->last_restart = squid_curtime;
74addf6c 169 safe_free(shortname);
170 safe_free(procname);
838b993c 171 helperKickQueue(hlp);
74addf6c 172}
173
4f0ef8e8 174/*
175 * DPW 2007-05-08
176 *
177 * helperStatefulOpenServers: create the stateful child helper processes
178 */
94439e4e 179void
180helperStatefulOpenServers(statefulhelper * hlp)
181{
94439e4e 182 char *shortname;
a2c963ae 183 const char *args[HELPER_MAX_ARGS];
94439e4e 184 char fd_note_buf[FD_DESC_SZ];
94439e4e 185 int nargs = 0;
62e76326 186
94439e4e 187 if (hlp->cmdline == NULL)
62e76326 188 return;
189
4f0ef8e8 190 char *progname = hlp->cmdline->key;
62e76326 191
4f0ef8e8 192 char *s;
94439e4e 193 if ((s = strrchr(progname, '/')))
62e76326 194 shortname = xstrdup(s + 1);
94439e4e 195 else
62e76326 196 shortname = xstrdup(progname);
197
bf8fe701 198 debugs(84, 1, "helperStatefulOpenServers: Starting " << hlp->n_to_start << " '" << shortname << "' processes");
62e76326 199
4f0ef8e8 200 char *procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 201
94439e4e 202 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 203
94439e4e 204 args[nargs++] = procname;
62e76326 205
4f0ef8e8 206 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 207 args[nargs++] = w->key;
208
94439e4e 209 args[nargs++] = NULL;
62e76326 210
94439e4e 211 assert(nargs <= HELPER_MAX_ARGS);
62e76326 212
4f0ef8e8 213 for (int k = 0; k < hlp->n_to_start; k++) {
62e76326 214 getCurrentTime();
4f0ef8e8 215 int rfd = -1;
216 int wfd = -1;
217 void * hIpc;
218 pid_t pid = ipcCreate(hlp->ipc_type,
b5d712b5 219 progname,
220 args,
221 shortname,
222 &rfd,
223 &wfd,
224 &hIpc);
225
226 if (pid < 0) {
bf8fe701 227 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 228 continue;
229 }
230
231 hlp->n_running++;
d8f10d6a 232 hlp->n_active++;
b5d712b5 233 CBDATA_INIT_TYPE(helper_stateful_server);
4f0ef8e8 234 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
b5d712b5 235 srv->hIpc = hIpc;
236 srv->pid = pid;
62e76326 237 srv->flags.reserved = S_HELPER_FREE;
238 srv->deferred_requests = 0;
239 srv->stats.deferbyfunc = 0;
240 srv->stats.deferbycb = 0;
241 srv->stats.submits = 0;
242 srv->stats.releases = 0;
243 srv->index = k;
244 srv->rfd = rfd;
245 srv->wfd = wfd;
07eca7e0 246 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
247 srv->roffset = 0;
62e76326 248 srv->parent = cbdataReference(hlp);
249
250 if (hlp->datapool != NULL)
b001e822 251 srv->data = hlp->datapool->alloc();
62e76326 252
253 dlinkAddTail(srv, &srv->link, &hlp->servers);
254
255 if (rfd == wfd) {
256 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
257 fd_note(rfd, fd_note_buf);
258 } else {
259 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
260 fd_note(rfd, fd_note_buf);
261 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
262 fd_note(wfd, fd_note_buf);
263 }
264
265 commSetNonBlocking(rfd);
266
267 if (wfd != rfd)
268 commSetNonBlocking(wfd);
269
270 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
07eca7e0 271
272 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
273
94439e4e 274 }
62e76326 275
5ea33fce 276 hlp->last_restart = squid_curtime;
94439e4e 277 safe_free(shortname);
278 safe_free(procname);
279 helperStatefulKickQueue(hlp);
280}
281
282
74addf6c 283void
284helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
285{
5b5f9257 286 if (hlp == NULL) {
bf8fe701 287 debugs(84, 3, "helperSubmit: hlp == NULL");
62e76326 288 callback(data, NULL);
289 return;
5b5f9257 290 }
62e76326 291
eff7218b 292 helper_request *r = new helper_request;
293 helper_server *srv;
294
74addf6c 295 r->callback = callback;
fa80a8ef 296 r->data = cbdataReference(data);
74addf6c 297 r->buf = xstrdup(buf);
62e76326 298
74addf6c 299 if ((srv = GetFirstAvailable(hlp)))
62e76326 300 helperDispatch(srv, r);
74addf6c 301 else
62e76326 302 Enqueue(hlp, r);
303
bf8fe701 304 debugs(84, 9, "helperSubmit: " << buf);
94439e4e 305}
306
721b0310 307/* lastserver = "server last used as part of a deferred or reserved
308 * request sequence"
309 */
94439e4e 310void
311helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
312{
94439e4e 313 if (hlp == NULL) {
bf8fe701 314 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
62e76326 315 callback(data, 0, NULL);
316 return;
94439e4e 317 }
62e76326 318
eff7218b 319 helper_stateful_request *r = new helper_stateful_request;
eff7218b 320
94439e4e 321 r->callback = callback;
fa80a8ef 322 r->data = cbdataReference(data);
62e76326 323
721b0310 324 if (buf != NULL) {
62e76326 325 r->buf = xstrdup(buf);
326 r->placeholder = 0;
721b0310 327 } else {
62e76326 328 r->buf = NULL;
329 r->placeholder = 1;
721b0310 330 }
62e76326 331
94439e4e 332 if ((buf != NULL) && lastserver) {
bf8fe701 333 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
62e76326 334 /* the queue doesn't count for this assert because queued requests
335 * have already gone through here and been tested.
336 * It's legal to have deferred_requests == 0 and queue entries
337 * and status of S_HELPEER_DEFERRED.
338 * BUT: It's not legal to submit a new request w/lastserver in
339 * that state.
340 */
341 assert(!(lastserver->deferred_requests == 0 &&
342 lastserver->flags.reserved == S_HELPER_DEFERRED));
343
344 if (lastserver->flags.reserved != S_HELPER_RESERVED) {
345 lastserver->stats.submits++;
346 lastserver->deferred_requests--;
347 }
348
349 if (!(lastserver->request)) {
bf8fe701 350 debugs(84, 5, "StatefulSubmit dispatching");
62e76326 351 helperStatefulDispatch(lastserver, r);
352 } else {
bf8fe701 353 debugs(84, 5, "StatefulSubmit queuing");
62e76326 354 StatefulServerEnqueue(lastserver, r);
355 }
94439e4e 356 } else {
4f0ef8e8 357 helper_stateful_server *srv;
62e76326 358 if ((srv = StatefulGetFirstAvailable(hlp))) {
359 helperStatefulDispatch(srv, r);
360 } else
361 StatefulEnqueue(hlp, r);
94439e4e 362 }
62e76326 363
bf8fe701 364 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
94439e4e 365}
366
4f0ef8e8 367/*
368 * helperStatefulDefer
369 *
370 * find and add a deferred request to a helper
371 */
94439e4e 372helper_stateful_server *
373helperStatefulDefer(statefulhelper * hlp)
94439e4e 374{
4f0ef8e8 375 if (hlp == NULL) {
bf8fe701 376 debugs(84, 3, "helperStatefulDefer: hlp == NULL");
62e76326 377 return NULL;
94439e4e 378 }
62e76326 379
4f0ef8e8 380 debugs(84, 5, "helperStatefulDefer: Running servers " << hlp->n_running);
62e76326 381
4f0ef8e8 382 if (hlp->n_running == 0) {
bf8fe701 383 debugs(84, 1, "helperStatefulDefer: No running servers!. ");
62e76326 384 return NULL;
94439e4e 385 }
62e76326 386
4f0ef8e8 387 helper_stateful_server *rv = StatefulGetFirstAvailable(hlp);
62e76326 388
4f0ef8e8 389 if (rv == NULL) {
62e76326 390 /*
391 * all currently busy; loop through servers and find server
392 * with the shortest queue
393 */
394
4f0ef8e8 395 for (dlink_node *n = hlp->servers.head; n != NULL; n = n->next) {
396 helper_stateful_server *srv = (helper_stateful_server *)n->data;
62e76326 397
398 if (srv->flags.reserved == S_HELPER_RESERVED)
399 continue;
400
b902a58c 401 if (!srv->flags.shutdown)
62e76326 402 continue;
403
404 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
405 !(hlp->IsAvailable(srv->data)))
406 continue;
407
408 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
409 continue;
410
411 rv = srv;
412 }
0a706c69 413 }
62e76326 414
4f0ef8e8 415 if (rv == NULL) {
bf8fe701 416 debugs(84, 1, "helperStatefulDefer: None available.");
62e76326 417 return NULL;
94439e4e 418 }
62e76326 419
60d096f4 420 /* consistency check:
421 * when the deferred count is 0,
422 * submits + releases == deferbyfunc + deferbycb
423 * Or in english, when there are no deferred requests, the amount
424 * we have submitted to the queue or cancelled must equal the amount
425 * we have said we wanted to be able to submit or cancel
426 */
427 if (rv->deferred_requests == 0)
62e76326 428 assert(rv->stats.submits + rv->stats.releases ==
429 rv->stats.deferbyfunc + rv->stats.deferbycb);
60d096f4 430
94439e4e 431 rv->flags.reserved = S_HELPER_DEFERRED;
62e76326 432
94439e4e 433 rv->deferred_requests++;
62e76326 434
60d096f4 435 rv->stats.deferbyfunc++;
62e76326 436
94439e4e 437 return rv;
438}
439
440void
441helperStatefulReset(helper_stateful_server * srv)
62e76326 442/* puts this helper back in the queue. the calling app is required to
94439e4e 443 * manage the state in the helper.
444 */
445{
446 statefulhelper *hlp = srv->parent;
4f0ef8e8 447 helper_stateful_request *r = srv->request;
62e76326 448
4f0ef8e8 449 if (r != NULL) {
62e76326 450 /* reset attempt DURING an outstaning request */
bf8fe701 451 debugs(84, 1, "helperStatefulReset: RESET During request " << hlp->id_name << " ");
62e76326 452 srv->flags.busy = 0;
07eca7e0 453 srv->roffset = 0;
62e76326 454 helperStatefulRequestFree(r);
455 srv->request = NULL;
94439e4e 456 }
62e76326 457
94439e4e 458 srv->flags.busy = 0;
62e76326 459
4f0ef8e8 460 if (srv->queue.head) {
62e76326 461 srv->flags.reserved = S_HELPER_DEFERRED;
462 helperStatefulServerKickQueue(srv);
4f0ef8e8 463 } else {
62e76326 464 srv->flags.reserved = S_HELPER_FREE;
465
466 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
467 srv->parent->OnEmptyQueue(srv->data);
468
469 helperStatefulKickQueue(hlp);
94439e4e 470 }
471}
472
4f0ef8e8 473/*
474 * DPW 2007-05-08
475 *
476 * helperStatefulReleaseServer tells the helper that whoever was
477 * using it no longer needs its services.
478 *
479 * If the state is S_HELPER_DEFERRED, decrease the deferred count.
480 * If the count goes to zero, then it can become S_HELPER_FREE.
481 *
482 * If the state is S_HELPER_RESERVED, then it should always
483 * become S_HELPER_FREE.
484 */
94439e4e 485void
486helperStatefulReleaseServer(helper_stateful_server * srv)
94439e4e 487{
4f0ef8e8 488 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
489 if (srv->flags.reserved = S_HELPER_FREE)
490 return;
491
60d096f4 492 srv->stats.releases++;
62e76326 493
4f0ef8e8 494 if (srv->flags.reserved == S_HELPER_DEFERRED) {
62e76326 495 assert(srv->deferred_requests);
496 srv->deferred_requests--;
4f0ef8e8 497 if (srv->deferred_requests) {
498 debugs(0,0,HERE << "helperStatefulReleaseServer srv->deferred_requests=" << srv->deferred_requests);
499 return;
500 }
501 if (srv->queue.head) {
502 debugs(0,0,HERE << "helperStatefulReleaseServer srv->queue.head not NULL");
503 return;
504 }
60d096f4 505 }
62e76326 506
4f0ef8e8 507 srv->flags.reserved = S_HELPER_FREE;
508 if (srv->parent->OnEmptyQueue != NULL && srv->data)
509 srv->parent->OnEmptyQueue(srv->data);
94439e4e 510}
511
512void *
513helperStatefulServerGetData(helper_stateful_server * srv)
514/* return a pointer to the stateful routines data area */
515{
516 return srv->data;
74addf6c 517}
518
519void
9522b380 520helperStats(StoreEntry * sentry, helper * hlp, const char *label)
74addf6c 521{
9522b380 522 if (!helperStartStats(sentry, hlp, label))
523 return;
524
0a706c69 525 storeAppendPrintf(sentry, "program: %s\n",
62e76326 526 hlp->cmdline->key);
74addf6c 527 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 528 hlp->n_running, hlp->n_to_start);
74addf6c 529 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 530 hlp->stats.requests);
74addf6c 531 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 532 hlp->stats.replies);
74addf6c 533 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 534 hlp->stats.queue_size);
f9598528 535 storeAppendPrintf(sentry, "avg service time: %d msec\n",
536 hlp->stats.avg_svc_time);
74addf6c 537 storeAppendPrintf(sentry, "\n");
09c1ece1 538 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
62e76326 539 "#",
540 "FD",
541 "PID",
542 "# Requests",
543 "Flags",
544 "Time",
545 "Offset",
546 "Request");
547
4f0ef8e8 548 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
07eca7e0 549 helper_server *srv = (helper_server*)link->data;
f9598528 550 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
d8f10d6a 551 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 552 srv->index + 1,
553 srv->rfd,
554 srv->pid,
555 srv->stats.uses,
07eca7e0 556 srv->stats.pending ? 'B' : ' ',
557 srv->flags.writing ? 'W' : ' ',
62e76326 558 srv->flags.closing ? 'C' : ' ',
559 srv->flags.shutdown ? 'S' : ' ',
560 tt < 0.0 ? 0.0 : tt,
07eca7e0 561 (int) srv->roffset,
562 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
74addf6c 563 }
62e76326 564
74addf6c 565 storeAppendPrintf(sentry, "\nFlags key:\n\n");
74addf6c 566 storeAppendPrintf(sentry, " B = BUSY\n");
f9598528 567 storeAppendPrintf(sentry, " W = WRITING\n");
74addf6c 568 storeAppendPrintf(sentry, " C = CLOSING\n");
569 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
570}
571
94439e4e 572void
9522b380 573helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
94439e4e 574{
9522b380 575 if (!helperStartStats(sentry, hlp, label))
576 return;
577
cf154edc 578 storeAppendPrintf(sentry, "program: %s\n",
579 hlp->cmdline->key);
94439e4e 580 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 581 hlp->n_running, hlp->n_to_start);
94439e4e 582 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 583 hlp->stats.requests);
94439e4e 584 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 585 hlp->stats.replies);
94439e4e 586 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 587 hlp->stats.queue_size);
94439e4e 588 storeAppendPrintf(sentry, "avg service time: %d msec\n",
62e76326 589 hlp->stats.avg_svc_time);
94439e4e 590 storeAppendPrintf(sentry, "\n");
0a706c69 591 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%20s\t%s\t%7s\t%7s\t%7s\n",
62e76326 592 "#",
593 "FD",
594 "PID",
595 "# Requests",
596 "# Deferred Requests",
597 "Flags",
598 "Time",
599 "Offset",
600 "Request");
601
4f0ef8e8 602 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
603 helper_stateful_server *srv = (helper_stateful_server *)link->data;
604 double tt = 0.001 * tvSubMsec(srv->dispatch_time,
f9598528 605 srv->flags.busy ? current_time : srv->answer_time);
b902a58c 606 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%20d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 607 srv->index + 1,
608 srv->rfd,
609 srv->pid,
610 srv->stats.uses,
611 (int) srv->deferred_requests,
62e76326 612 srv->flags.busy ? 'B' : ' ',
613 srv->flags.closing ? 'C' : ' ',
4f0ef8e8 614 srv->flags.reserved == S_HELPER_RESERVED ? 'R' : (srv->flags.reserved == S_HELPER_DEFERRED ? 'D' : ' '),
62e76326 615 srv->flags.shutdown ? 'S' : ' ',
616 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
617 tt < 0.0 ? 0.0 : tt,
07eca7e0 618 (int) srv->roffset,
62e76326 619 srv->request ? log_quote(srv->request->buf) : "(none)");
94439e4e 620 }
62e76326 621
94439e4e 622 storeAppendPrintf(sentry, "\nFlags key:\n\n");
94439e4e 623 storeAppendPrintf(sentry, " B = BUSY\n");
624 storeAppendPrintf(sentry, " C = CLOSING\n");
625 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
626 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
5d146f7d 627 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 628}
629
74addf6c 630void
631helperShutdown(helper * hlp)
632{
c68e9c6b 633 dlink_node *link = hlp->servers.head;
2d5d5261 634#ifdef _SQUID_MSWIN_
635
636 HANDLE hIpc;
637 pid_t pid;
638 int no;
639#endif
62e76326 640
c68e9c6b 641 while (link) {
62e76326 642 helper_server *srv;
643 srv = (helper_server *)link->data;
644 link = link->next;
645
8cfc76db 646 if (srv->flags.shutdown) {
bf8fe701 647 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 648 continue;
649 }
650
d8f10d6a 651 hlp->n_active--;
652 assert(hlp->n_active >= 0);
653
62e76326 654 srv->flags.shutdown = 1; /* request it to shut itself down */
655
d8f10d6a 656 if (srv->flags.closing) {
bf8fe701 657 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 658 continue;
659 }
660
d8f10d6a 661 if (srv->stats.pending) {
bf8fe701 662 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 663 continue;
664 }
665
666 srv->flags.closing = 1;
2d5d5261 667#ifdef _SQUID_MSWIN_
668
669 hIpc = srv->hIpc;
670 pid = srv->pid;
671 no = srv->index + 1;
672 shutdown(srv->wfd, SD_BOTH);
673#endif
674
bf8fe701 675 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
62e76326 676 /* the rest of the details is dealt with in the helperServerFree
677 * close handler
678 */
679 comm_close(srv->rfd);
2d5d5261 680#ifdef _SQUID_MSWIN_
681
682 if (hIpc) {
683 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
684 getCurrentTime();
bf8fe701 685 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
686 " #" << no << " (" << hlp->cmdline->key << "," <<
687 (long int)pid << ") didn't exit in 5 seconds");
688
2d5d5261 689 }
690
691 CloseHandle(hIpc);
692 }
693
694#endif
695
74addf6c 696 }
697}
698
94439e4e 699void
700helperStatefulShutdown(statefulhelper * hlp)
701{
702 dlink_node *link = hlp->servers.head;
703 helper_stateful_server *srv;
2d5d5261 704#ifdef _SQUID_MSWIN_
705
706 HANDLE hIpc;
707 pid_t pid;
708 int no;
709#endif
62e76326 710
94439e4e 711 while (link) {
62e76326 712 srv = (helper_stateful_server *)link->data;
713 link = link->next;
714
8cfc76db 715 if (srv->flags.shutdown) {
bf8fe701 716 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 717 continue;
718 }
719
d8f10d6a 720 hlp->n_active--;
721 assert(hlp->n_active >= 0);
62e76326 722 srv->flags.shutdown = 1; /* request it to shut itself down */
723
724 if (srv->flags.busy) {
bf8fe701 725 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 726 continue;
727 }
728
729 if (srv->flags.closing) {
bf8fe701 730 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 731 continue;
732 }
733
734 if (srv->flags.reserved != S_HELPER_FREE) {
bf8fe701 735 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED.");
62e76326 736 continue;
737 }
738
739 if (srv->deferred_requests) {
bf8fe701 740 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has DEFERRED requests.");
62e76326 741 continue;
742 }
743
744 srv->flags.closing = 1;
2d5d5261 745#ifdef _SQUID_MSWIN_
746
747 hIpc = srv->hIpc;
748 pid = srv->pid;
749 no = srv->index + 1;
750 shutdown(srv->wfd, SD_BOTH);
751#endif
752
bf8fe701 753 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
754
62e76326 755 /* the rest of the details is dealt with in the helperStatefulServerFree
756 * close handler
757 */
758 comm_close(srv->rfd);
2d5d5261 759#ifdef _SQUID_MSWIN_
760
761 if (hIpc) {
762 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
763 getCurrentTime();
bf8fe701 764 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
765 " #" << no << " (" << hlp->cmdline->key << "," <<
766 (long int)pid << ") didn't exit in 5 seconds");
2d5d5261 767 }
768
769 CloseHandle(hIpc);
770 }
771
772#endif
773
94439e4e 774 }
775}
776
777
1f5f60dd 778helper *
779helperCreate(const char *name)
780{
28c60158 781 helper *hlp;
aa839030 782 CBDATA_INIT_TYPE(helper);
72711e31 783 hlp = cbdataAlloc(helper);
1f5f60dd 784 hlp->id_name = name;
785 return hlp;
786}
787
94439e4e 788statefulhelper *
789helperStatefulCreate(const char *name)
790{
791 statefulhelper *hlp;
aa839030 792 CBDATA_INIT_TYPE(statefulhelper);
72711e31 793 hlp = cbdataAlloc(statefulhelper);
94439e4e 794 hlp->id_name = name;
795 return hlp;
796}
797
798
1f5f60dd 799void
800helperFree(helper * hlp)
801{
5dae8514 802 if (!hlp)
62e76326 803 return;
804
1f5f60dd 805 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 806 if (hlp->queue.head)
bf8fe701 807 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
808 hlp->stats.queue_size << " requests queued");
62e76326 809
1f5f60dd 810 cbdataFree(hlp);
811}
812
94439e4e 813void
814helperStatefulFree(statefulhelper * hlp)
815{
5dae8514 816 if (!hlp)
62e76326 817 return;
818
94439e4e 819 /* note, don't free hlp->name, it probably points to static memory */
820 if (hlp->queue.head)
bf8fe701 821 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
822 hlp->stats.queue_size << " requests queued");
62e76326 823
94439e4e 824 cbdataFree(hlp);
825}
826
827
74addf6c 828/* ====================================================================== */
829/* LOCAL FUNCTIONS */
830/* ====================================================================== */
831
832static void
1f5f60dd 833helperServerFree(int fd, void *data)
74addf6c 834{
e6ccf245 835 helper_server *srv = (helper_server *)data;
74addf6c 836 helper *hlp = srv->parent;
ac750329 837 helper_request *r;
07eca7e0 838 int i, concurrency = hlp->concurrency;
839
840 if (!concurrency)
841 concurrency = 1;
842
74addf6c 843 assert(srv->rfd == fd);
62e76326 844
07eca7e0 845 if (srv->rbuf) {
846 memFreeBuf(srv->rbuf_sz, srv->rbuf);
847 srv->rbuf = NULL;
74addf6c 848 }
62e76326 849
2fe7eff9 850 srv->wqueue->clean();
032785bf 851 delete srv->wqueue;
852
853 if (srv->writebuf) {
2fe7eff9 854 srv->writebuf->clean();
032785bf 855 delete srv->writebuf;
856 srv->writebuf = NULL;
857 }
62e76326 858
07eca7e0 859 for (i = 0; i < concurrency; i++) {
860 if ((r = srv->requests[i])) {
861 void *cbdata;
62e76326 862
07eca7e0 863 if (cbdataReferenceValidDone(r->data, &cbdata))
864 r->callback(cbdata, NULL);
62e76326 865
07eca7e0 866 helperRequestFree(r);
867
868 srv->requests[i] = NULL;
869 }
ac750329 870 }
62e76326 871
07eca7e0 872 safe_free(srv->requests);
873
3cdb7cd0 874 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 875 comm_close(srv->wfd);
876
74addf6c 877 dlinkDelete(&srv->link, &hlp->servers);
62e76326 878
74addf6c 879 hlp->n_running--;
62e76326 880
74addf6c 881 assert(hlp->n_running >= 0);
62e76326 882
1f5f60dd 883 if (!srv->flags.shutdown) {
d8f10d6a 884 hlp->n_active--;
885 assert(hlp->n_active >= 0);
bf8fe701 886 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 <<
887 " (FD " << fd << ") exited");
62e76326 888
5ea33fce 889 if (hlp->n_active < hlp->n_to_start / 2) {
bf8fe701 890 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 891
892 if (hlp->last_restart > squid_curtime - 30)
893 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
894
bf8fe701 895 debugs(80, 0, "Starting new helpers");
5ea33fce 896
897 helperOpenServers(hlp);
898 }
14e87a44 899 }
62e76326 900
fa80a8ef 901 cbdataReferenceDone(srv->parent);
14e87a44 902 cbdataFree(srv);
74addf6c 903}
904
94439e4e 905static void
906helperStatefulServerFree(int fd, void *data)
907{
e6ccf245 908 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 909 statefulhelper *hlp = srv->parent;
910 helper_stateful_request *r;
911 assert(srv->rfd == fd);
62e76326 912
07eca7e0 913 if (srv->rbuf) {
914 memFreeBuf(srv->rbuf_sz, srv->rbuf);
915 srv->rbuf = NULL;
94439e4e 916 }
62e76326 917
07eca7e0 918#if 0
2fe7eff9 919 srv->wqueue->clean();
032785bf 920
921 delete srv->wqueue;
07eca7e0 922
923#endif
924
94439e4e 925 if ((r = srv->request)) {
62e76326 926 void *cbdata;
927
928 if (cbdataReferenceValidDone(r->data, &cbdata))
07eca7e0 929 r->callback(cbdata, srv, NULL);
62e76326 930
931 helperStatefulRequestFree(r);
932
933 srv->request = NULL;
94439e4e 934 }
62e76326 935
3c641669 936 /* TODO: walk the local queue of requests and carry them all out */
94439e4e 937 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 938 comm_close(srv->wfd);
939
94439e4e 940 dlinkDelete(&srv->link, &hlp->servers);
62e76326 941
94439e4e 942 hlp->n_running--;
62e76326 943
94439e4e 944 assert(hlp->n_running >= 0);
62e76326 945
94439e4e 946 if (!srv->flags.shutdown) {
d8f10d6a 947 hlp->n_active--;
948 assert( hlp->n_active >= 0);
bf8fe701 949 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " (FD " << fd << ") exited");
62e76326 950
5ea33fce 951 if (hlp->n_active <= hlp->n_to_start / 2) {
bf8fe701 952 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 953
954 if (hlp->last_restart > squid_curtime - 30)
955 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
956
bf8fe701 957 debugs(80, 0, "Starting new helpers");
5ea33fce 958
959 helperStatefulOpenServers(hlp);
960 }
94439e4e 961 }
62e76326 962
94439e4e 963 if (srv->data != NULL)
b001e822 964 hlp->datapool->free(srv->data);
62e76326 965
fa80a8ef 966 cbdataReferenceDone(srv->parent);
62e76326 967
94439e4e 968 cbdataFree(srv);
969}
970
971
74addf6c 972static void
c4b7a5a9 973helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
74addf6c 974{
74addf6c 975 char *t = NULL;
e6ccf245 976 helper_server *srv = (helper_server *)data;
74addf6c 977 helper *hlp = srv->parent;
978 assert(fd == srv->rfd);
fa80a8ef 979 assert(cbdataReferenceValid(data));
c4b7a5a9 980
981 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 982
c4b7a5a9 983 if (flag == COMM_ERR_CLOSING) {
984 return;
985 }
986
4f0ef8e8 987 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
62e76326 988
c4b7a5a9 989 if (flag != COMM_OK || len <= 0) {
62e76326 990 if (len < 0)
bf8fe701 991 debugs(84, 1, "helperHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 992
993 comm_close(fd);
994
995 return;
74addf6c 996 }
62e76326 997
07eca7e0 998 srv->roffset += len;
999 srv->rbuf[srv->roffset] = '\0';
bf8fe701 1000 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
62e76326 1001
07eca7e0 1002 if (!srv->stats.pending) {
62e76326 1003 /* someone spoke without being spoken to */
bf8fe701 1004 debugs(84, 1, "helperHandleRead: unexpected read from " <<
1005 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1006 " bytes '" << srv->rbuf << "'");
1007
07eca7e0 1008 srv->roffset = 0;
1009 srv->rbuf[0] = '\0';
1010 }
1011
1012 while ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1013 /* end of reply found */
07eca7e0 1014 helper_request *r;
1015 char *msg = srv->rbuf;
1016 int i = 0;
bf8fe701 1017 debugs(84, 3, "helperHandleRead: end of reply found");
6bf4f823 1018
1019 if (t > srv->rbuf && t[-1] == '\r')
1020 t[-1] = '\0';
1021
07eca7e0 1022 *t++ = '\0';
62e76326 1023
07eca7e0 1024 if (hlp->concurrency) {
1025 i = strtol(msg, &msg, 10);
62e76326 1026
e4755e29 1027 while (*msg && xisspace(*msg))
07eca7e0 1028 msg++;
1029 }
62e76326 1030
07eca7e0 1031 r = srv->requests[i];
62e76326 1032
07eca7e0 1033 if (r) {
1034 HLPCB *callback = r->callback;
1035 void *cbdata;
62e76326 1036
07eca7e0 1037 srv->requests[i] = NULL;
62e76326 1038
07eca7e0 1039 r->callback = NULL;
62e76326 1040
07eca7e0 1041 if (cbdataReferenceValidDone(r->data, &cbdata))
1042 callback(cbdata, msg);
62e76326 1043
07eca7e0 1044 srv->stats.pending--;
1045
1046 hlp->stats.replies++;
1047
f9598528 1048 srv->answer_time = current_time;
1049
1050 srv->dispatch_time = r->dispatch_time;
1051
07eca7e0 1052 hlp->stats.avg_svc_time =
1053 intAverage(hlp->stats.avg_svc_time,
1054 tvSubMsec(r->dispatch_time, current_time),
1055 hlp->stats.replies, REDIRECT_AV_FACTOR);
1056
1057 helperRequestFree(r);
1058 } else {
bf8fe701 1059 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
1060 i << " from " << hlp->id_name << " #" << srv->index + 1 <<
1061 " '" << srv->rbuf << "'");
1062
07eca7e0 1063 }
1064
1065 srv->roffset -= (t - srv->rbuf);
1066 memmove(srv->rbuf, t, srv->roffset + 1);
62e76326 1067
1068 if (srv->flags.shutdown) {
1069 int wfd = srv->wfd;
1070 srv->wfd = -1;
d8f10d6a 1071 srv->flags.closing=1;
62e76326 1072 comm_close(wfd);
1073 } else
1074 helperKickQueue(hlp);
74addf6c 1075 }
07eca7e0 1076
1077 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
74addf6c 1078}
1079
94439e4e 1080static void
c4b7a5a9 1081helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
94439e4e 1082{
94439e4e 1083 char *t = NULL;
e6ccf245 1084 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 1085 helper_stateful_request *r;
1086 statefulhelper *hlp = srv->parent;
1087 assert(fd == srv->rfd);
fa80a8ef 1088 assert(cbdataReferenceValid(data));
c4b7a5a9 1089
1090 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 1091
c4b7a5a9 1092 if (flag == COMM_ERR_CLOSING) {
1093 return;
1094 }
1095
4a7a3d56 1096 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
4f0ef8e8 1097 hlp->id_name << " #" << srv->index + 1);
bf8fe701 1098
62e76326 1099
c4b7a5a9 1100 if (flag != COMM_OK || len <= 0) {
62e76326 1101 if (len < 0)
bf8fe701 1102 debugs(84, 1, "helperStatefulHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 1103
1104 comm_close(fd);
1105
1106 return;
94439e4e 1107 }
62e76326 1108
07eca7e0 1109 srv->roffset += len;
1110 srv->rbuf[srv->roffset] = '\0';
94439e4e 1111 r = srv->request;
62e76326 1112
94439e4e 1113 if (r == NULL) {
62e76326 1114 /* someone spoke without being spoken to */
bf8fe701 1115 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
1116 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1117 " bytes '" << srv->rbuf << "'");
1118
07eca7e0 1119 srv->roffset = 0;
1120 }
1121
1122 if ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1123 /* end of reply found */
bf8fe701 1124 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1125
1126 if (t > srv->rbuf && t[-1] == '\r')
1127 t[-1] = '\0';
1128
62e76326 1129 *t = '\0';
1130
1131 if (cbdataReferenceValid(r->data)) {
07eca7e0 1132 switch ((r->callback(r->data, srv, srv->rbuf))) { /*if non-zero reserve helper */
62e76326 1133
1134 case S_HELPER_UNKNOWN:
1135 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
1136 break;
1137
1138 case S_HELPER_RELEASE: /* helper finished with */
1139
1140 if (!srv->deferred_requests && !srv->queue.head) {
1141 srv->flags.reserved = S_HELPER_FREE;
1142
1143 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
1144 srv->parent->OnEmptyQueue(srv->data);
1145
bf8fe701 1146 debugs(84, 5, "StatefulHandleRead: releasing " << hlp->id_name << " #" << srv->index + 1);
62e76326 1147 } else {
1148 srv->flags.reserved = S_HELPER_DEFERRED;
bf8fe701 1149 debugs(84, 5, "StatefulHandleRead: outstanding deferred requests on " <<
1150 hlp->id_name << " #" << srv->index + 1 <<
1151 ". reserving for deferred requests.");
62e76326 1152 }
1153
1154 break;
1155
1156 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
1157
1158 if (!srv->queue.head) {
1159 assert(srv->deferred_requests == 0);
1160 srv->flags.reserved = S_HELPER_RESERVED;
bf8fe701 1161 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1);
62e76326 1162 } else {
1163 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
1164 }
1165
1166 break;
1167
1168 case S_HELPER_DEFER:
1169 /* the helper is still needed, but can
1170 * be used for other requests in the meantime.
1171 */
1172 srv->flags.reserved = S_HELPER_DEFERRED;
1173 srv->deferred_requests++;
1174 srv->stats.deferbycb++;
bf8fe701 1175 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1 << " for deferred requests.");
62e76326 1176 break;
1177
1178 default:
1179 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
1180 }
1181
1182 } else {
bf8fe701 1183 debugs(84, 1, "StatefulHandleRead: no callback data registered");
62e76326 1184 }
1185
1186 srv->flags.busy = 0;
07eca7e0 1187 srv->roffset = 0;
62e76326 1188 helperStatefulRequestFree(r);
1189 srv->request = NULL;
1190 hlp->stats.replies++;
f9598528 1191 srv->answer_time = current_time;
62e76326 1192 hlp->stats.avg_svc_time =
1193 intAverage(hlp->stats.avg_svc_time,
1194 tvSubMsec(srv->dispatch_time, current_time),
1195 hlp->stats.replies, REDIRECT_AV_FACTOR);
1196
1197 if (srv->flags.shutdown
1198 && srv->flags.reserved == S_HELPER_FREE
1199 && !srv->deferred_requests) {
1200 int wfd = srv->wfd;
1201 srv->wfd = -1;
d8f10d6a 1202 srv->flags.closing=1;
62e76326 1203 comm_close(wfd);
1204 } else {
1205 if (srv->queue.head)
1206 helperStatefulServerKickQueue(srv);
1207 else
1208 helperStatefulKickQueue(hlp);
1209 }
94439e4e 1210 }
07eca7e0 1211
1212 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
1213 helperStatefulHandleRead, srv);
94439e4e 1214}
1215
74addf6c 1216static void
1217Enqueue(helper * hlp, helper_request * r)
1218{
e6ccf245 1219 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
74addf6c 1220 dlinkAddTail(r, link, &hlp->queue);
1221 hlp->stats.queue_size++;
62e76326 1222
74addf6c 1223 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1224 return;
1225
74addf6c 1226 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1227 return;
1228
fe73896c 1229 if (shutting_down || reconfiguring)
62e76326 1230 return;
1231
74addf6c 1232 hlp->last_queue_warn = squid_curtime;
62e76326 1233
bf8fe701 1234 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1235 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1236
62e76326 1237
74addf6c 1238 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1239 fatalf("Too many queued %s requests", hlp->id_name);
1240
bf8fe701 1241 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1242
74addf6c 1243}
1244
94439e4e 1245static void
1246StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1247{
e6ccf245 1248 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1249 dlinkAddTail(r, link, &hlp->queue);
1250 hlp->stats.queue_size++;
62e76326 1251
94439e4e 1252 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1253 return;
1254
893cbac6 1255 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1256 fatalf("Too many queued %s requests", hlp->id_name);
1257
94439e4e 1258 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1259 return;
1260
94439e4e 1261 if (shutting_down || reconfiguring)
62e76326 1262 return;
1263
94439e4e 1264 hlp->last_queue_warn = squid_curtime;
62e76326 1265
bf8fe701 1266 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
62e76326 1267
bf8fe701 1268 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1269 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
62e76326 1270
94439e4e 1271}
1272
1273static void
1274StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
1275{
e6ccf245 1276 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1277 dlinkAddTail(r, link, &srv->queue);
62e76326 1278 /* TODO: warning if the queue on this server is more than X
1279 * We don't check the queue size at the moment, because
1280 * requests hitting here are deferrable
1281 */
1282 /* hlp->stats.queue_size++;
1283 * if (hlp->stats.queue_size < hlp->n_running)
1284 * return;
1285 * if (squid_curtime - hlp->last_queue_warn < 600)
1286 * return;
1287 * if (shutting_down || reconfiguring)
1288 * return;
1289 * hlp->last_queue_warn = squid_curtime;
bf8fe701 1290 * debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1291 * debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1292 * if (hlp->stats.queue_size > hlp->n_running * 2)
1293 * fatalf("Too many queued %s requests", hlp->id_name);
bf8fe701 1294 * debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file." ); */
94439e4e 1295}
1296
1297
74addf6c 1298static helper_request *
1299Dequeue(helper * hlp)
1300{
1301 dlink_node *link;
1302 helper_request *r = NULL;
62e76326 1303
74addf6c 1304 if ((link = hlp->queue.head)) {
62e76326 1305 r = (helper_request *)link->data;
1306 dlinkDelete(link, &hlp->queue);
1307 memFree(link, MEM_DLINK_NODE);
1308 hlp->stats.queue_size--;
74addf6c 1309 }
62e76326 1310
74addf6c 1311 return r;
1312}
1313
94439e4e 1314static helper_stateful_request *
1315StatefulServerDequeue(helper_stateful_server * srv)
1316{
1317 dlink_node *link;
1318 helper_stateful_request *r = NULL;
62e76326 1319
94439e4e 1320 if ((link = srv->queue.head)) {
62e76326 1321 r = (helper_stateful_request *)link->data;
1322 dlinkDelete(link, &srv->queue);
1323 memFree(link, MEM_DLINK_NODE);
94439e4e 1324 }
62e76326 1325
94439e4e 1326 return r;
1327}
1328
1329static helper_stateful_request *
1330StatefulDequeue(statefulhelper * hlp)
1331{
1332 dlink_node *link;
1333 helper_stateful_request *r = NULL;
62e76326 1334
94439e4e 1335 if ((link = hlp->queue.head)) {
62e76326 1336 r = (helper_stateful_request *)link->data;
1337 dlinkDelete(link, &hlp->queue);
1338 memFree(link, MEM_DLINK_NODE);
1339 hlp->stats.queue_size--;
94439e4e 1340 }
62e76326 1341
94439e4e 1342 return r;
1343}
1344
74addf6c 1345static helper_server *
1346GetFirstAvailable(helper * hlp)
1347{
1348 dlink_node *n;
07eca7e0 1349 helper_server *srv;
1350 helper_server *selected = NULL;
62e76326 1351
fe73896c 1352 if (hlp->n_running == 0)
62e76326 1353 return NULL;
1354
07eca7e0 1355 /* Find "least" loaded helper (approx) */
74addf6c 1356 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1357 srv = (helper_server *)n->data;
1358
07eca7e0 1359 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1360 continue;
1361
d8f10d6a 1362 if (srv->flags.shutdown)
62e76326 1363 continue;
1364
07eca7e0 1365 if (!srv->stats.pending)
1366 return srv;
1367
1368 if (selected) {
1369 selected = srv;
1370 break;
1371 }
1372
1373 selected = srv;
74addf6c 1374 }
62e76326 1375
07eca7e0 1376 /* Check for overload */
1377 if (!selected)
1378 return NULL;
1379
1380 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1381 return NULL;
1382
1383 return selected;
74addf6c 1384}
1385
94439e4e 1386static helper_stateful_server *
1387StatefulGetFirstAvailable(statefulhelper * hlp)
1388{
1389 dlink_node *n;
1390 helper_stateful_server *srv = NULL;
4f0ef8e8 1391 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->n_running);
62e76326 1392
94439e4e 1393 if (hlp->n_running == 0)
62e76326 1394 return NULL;
1395
94439e4e 1396 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1397 srv = (helper_stateful_server *)n->data;
1398
1399 if (srv->flags.busy)
1400 continue;
1401
1402 if (srv->flags.reserved == S_HELPER_RESERVED)
1403 continue;
1404
d8f10d6a 1405 if (srv->flags.shutdown)
62e76326 1406 continue;
1407
1408 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1409 continue;
1410
4f0ef8e8 1411 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
62e76326 1412 return srv;
94439e4e 1413 }
62e76326 1414
bf8fe701 1415 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
94439e4e 1416 return NULL;
1417}
1418
1419
42679bd6 1420static void
1421helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1422{
07eca7e0 1423 helper_server *srv = (helper_server *)data;
1424
2fe7eff9 1425 srv->writebuf->clean();
032785bf 1426 delete srv->writebuf;
1427 srv->writebuf = NULL;
07eca7e0 1428 srv->flags.writing = 0;
1429
1430 if (flag != COMM_OK) {
1431 /* Helper server has crashed */
bf8fe701 1432 debugs(84, 0, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
07eca7e0 1433 return;
1434 }
1435
2fe7eff9 1436 if (!srv->wqueue->isNull()) {
07eca7e0 1437 srv->writebuf = srv->wqueue;
032785bf 1438 srv->wqueue = new MemBuf;
07eca7e0 1439 srv->flags.writing = 1;
1440 comm_write(srv->wfd,
032785bf 1441 srv->writebuf->content(),
1442 srv->writebuf->contentSize(),
07eca7e0 1443 helperDispatchWriteDone, /* Handler */
2b663917 1444 srv, NULL); /* Handler-data, freefunc */
07eca7e0 1445 }
42679bd6 1446}
1447
74addf6c 1448static void
1449helperDispatch(helper_server * srv, helper_request * r)
1450{
1451 helper *hlp = srv->parent;
07eca7e0 1452 helper_request **ptr = NULL;
1453 unsigned int slot;
62e76326 1454
fa80a8ef 1455 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1456 debugs(84, 1, "helperDispatch: invalid callback data");
62e76326 1457 helperRequestFree(r);
1458 return;
74addf6c 1459 }
62e76326 1460
07eca7e0 1461 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1462 if (!srv->requests[slot]) {
1463 ptr = &srv->requests[slot];
1464 break;
1465 }
1466 }
1467
1468 assert(ptr);
1469 *ptr = r;
1470 srv->stats.pending += 1;
1471 r->dispatch_time = current_time;
1472
2fe7eff9 1473 if (srv->wqueue->isNull())
1474 srv->wqueue->init();
07eca7e0 1475
1476 if (hlp->concurrency)
2fe7eff9 1477 srv->wqueue->Printf("%d %s", slot, r->buf);
07eca7e0 1478 else
2fe7eff9 1479 srv->wqueue->append(r->buf, strlen(r->buf));
07eca7e0 1480
1481 if (!srv->flags.writing) {
032785bf 1482 assert(NULL == srv->writebuf);
07eca7e0 1483 srv->writebuf = srv->wqueue;
032785bf 1484 srv->wqueue = new MemBuf;
07eca7e0 1485 srv->flags.writing = 1;
1486 comm_write(srv->wfd,
032785bf 1487 srv->writebuf->content(),
1488 srv->writebuf->contentSize(),
07eca7e0 1489 helperDispatchWriteDone, /* Handler */
2b663917 1490 srv, NULL); /* Handler-data, free func */
07eca7e0 1491 }
1492
4a7a3d56 1493 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
bf8fe701 1494
74addf6c 1495 srv->stats.uses++;
1496 hlp->stats.requests++;
1497}
1498
42679bd6 1499static void
1500helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
62e76326 1501 int xerrno, void *data)
42679bd6 1502{
62e76326 1503 /* nothing! */
42679bd6 1504}
1505
1506
94439e4e 1507static void
1508helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1509{
1510 statefulhelper *hlp = srv->parent;
62e76326 1511
fa80a8ef 1512 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1513 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
62e76326 1514 helperStatefulRequestFree(r);
1515 return;
94439e4e 1516 }
62e76326 1517
bf8fe701 1518 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
62e76326 1519
94439e4e 1520 if (r->placeholder == 1) {
62e76326 1521 /* a callback is needed before this request can _use_ a helper. */
1522 /* we don't care about releasing/deferring this helper. The request NEVER
1523 * gets to the helper. So we throw away the return code */
1524 r->callback(r->data, srv, NULL);
1525 /* throw away the placeholder */
1526 helperStatefulRequestFree(r);
1527 /* and push the queue. Note that the callback may have submitted a new
1528 * request to the helper which is why we test for the request*/
1529
1530 if (srv->request == NULL) {
1531 if (srv->flags.shutdown
1532 && srv->flags.reserved == S_HELPER_FREE
1533 && !srv->deferred_requests) {
1534 int wfd = srv->wfd;
1535 srv->wfd = -1;
1536 comm_close(wfd);
1537 } else {
1538 if (srv->queue.head)
1539 helperStatefulServerKickQueue(srv);
1540 else
1541 helperStatefulKickQueue(hlp);
1542 }
1543 }
1544
1545 return;
94439e4e 1546 }
62e76326 1547
94439e4e 1548 srv->flags.busy = 1;
1549 srv->request = r;
1550 srv->dispatch_time = current_time;
42679bd6 1551 comm_write(srv->wfd,
62e76326 1552 r->buf,
1553 strlen(r->buf),
1554 helperStatefulDispatchWriteDone, /* Handler */
2b663917 1555 hlp, NULL); /* Handler-data, free func */
bf8fe701 1556 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1557 hlp->id_name << " #" << srv->index + 1 << ", " <<
1558 (int) strlen(r->buf) << " bytes");
1559
94439e4e 1560 srv->stats.uses++;
1561 hlp->stats.requests++;
1562}
1563
1564
74addf6c 1565static void
1566helperKickQueue(helper * hlp)
1567{
1568 helper_request *r;
1569 helper_server *srv;
62e76326 1570
74addf6c 1571 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
62e76326 1572 helperDispatch(srv, r);
74addf6c 1573}
1574
94439e4e 1575static void
1576helperStatefulKickQueue(statefulhelper * hlp)
1577{
1578 helper_stateful_request *r;
1579 helper_stateful_server *srv;
62e76326 1580
94439e4e 1581 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
62e76326 1582 helperStatefulDispatch(srv, r);
94439e4e 1583}
1584
1585static void
1586helperStatefulServerKickQueue(helper_stateful_server * srv)
1587{
1588 helper_stateful_request *r;
62e76326 1589
94439e4e 1590 if ((r = StatefulServerDequeue(srv)))
62e76326 1591 helperStatefulDispatch(srv, r);
94439e4e 1592}
1593
74addf6c 1594static void
1595helperRequestFree(helper_request * r)
1596{
fa80a8ef 1597 cbdataReferenceDone(r->data);
74addf6c 1598 xfree(r->buf);
00d77d6b 1599 delete r;
51ee7c82 1600}
1601
94439e4e 1602static void
1603helperStatefulRequestFree(helper_stateful_request * r)
1604{
fa80a8ef 1605 cbdataReferenceDone(r->data);
94439e4e 1606 xfree(r->buf);
00d77d6b 1607 delete r;
51ee7c82 1608}
9522b380 1609
1610// TODO: should helper_ and helper_stateful_ have a common parent?
1611static bool
1612helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1613{
1614 if (!hlp) {
1615 if (label)
1616 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1617 return false;
1618 }
1619
1620 if (label)
1621 storeAppendPrintf(sentry, "%s:\n", label);
1622
1623 return true;
1624}