]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Author: Alex Rousskov <rousskov@measurement-factory.com>
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
eff7218b 3 * $Id: helper.cc,v 1.82 2007/05/04 15:40:12 rousskov Exp $
f740a279 4 *
17bb3486 5 * DEBUG: section 84 Helper process maintenance
f740a279 6 * AUTHOR: Harvest Derived?
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 9 * ----------------------------------------------------------
10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
f740a279 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
51ee7c82 37#include "helper.h"
985c86bc 38#include "SquidTime.h"
e6ccf245 39#include "Store.h"
42679bd6 40#include "comm.h"
0eb49b6d 41#include "MemBuf.h"
d295d770 42#include "wordlist.h"
74addf6c 43
44#define HELPER_MAX_ARGS 64
45
c4b7a5a9 46static IOCB helperHandleRead;
47static IOCB helperStatefulHandleRead;
1f5f60dd 48static PF helperServerFree;
94439e4e 49static PF helperStatefulServerFree;
74addf6c 50static void Enqueue(helper * hlp, helper_request *);
51static helper_request *Dequeue(helper * hlp);
94439e4e 52static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 53static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 54static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 55static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 56static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 57static void helperKickQueue(helper * hlp);
94439e4e 58static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 59static void helperRequestFree(helper_request * r);
94439e4e 60static void helperStatefulRequestFree(helper_stateful_request * r);
61static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
62static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
63static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
64static void helperStatefulServerKickQueue(helper_stateful_server * srv);
74addf6c 65
aa839030 66CBDATA_TYPE(helper);
67CBDATA_TYPE(helper_server);
68CBDATA_TYPE(statefulhelper);
69CBDATA_TYPE(helper_stateful_server);
70
74addf6c 71void
72helperOpenServers(helper * hlp)
73{
74 char *s;
75 char *progname;
76 char *shortname;
77 char *procname;
a2c963ae 78 const char *args[HELPER_MAX_ARGS];
74addf6c 79 char fd_note_buf[FD_DESC_SZ];
80 helper_server *srv;
81 int nargs = 0;
82 int k;
b5d712b5 83 pid_t pid;
74addf6c 84 int rfd;
85 int wfd;
b5d712b5 86 void * hIpc;
74addf6c 87 wordlist *w;
62e76326 88
74addf6c 89 if (hlp->cmdline == NULL)
62e76326 90 return;
91
74addf6c 92 progname = hlp->cmdline->key;
62e76326 93
74addf6c 94 if ((s = strrchr(progname, '/')))
62e76326 95 shortname = xstrdup(s + 1);
74addf6c 96 else
62e76326 97 shortname = xstrdup(progname);
98
bf8fe701 99 debugs(84, 1, "helperOpenServers: Starting " << hlp->n_to_start << " '" << shortname << "' processes");
62e76326 100
e6ccf245 101 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 102
74addf6c 103 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 104
74addf6c 105 args[nargs++] = procname;
62e76326 106
74addf6c 107 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 108 args[nargs++] = w->key;
109
74addf6c 110 args[nargs++] = NULL;
62e76326 111
74addf6c 112 assert(nargs <= HELPER_MAX_ARGS);
62e76326 113
74addf6c 114 for (k = 0; k < hlp->n_to_start; k++) {
62e76326 115 getCurrentTime();
116 rfd = wfd = -1;
b5d712b5 117 pid = ipcCreate(hlp->ipc_type,
118 progname,
119 args,
120 shortname,
121 &rfd,
122 &wfd,
123 &hIpc);
124
125 if (pid < 0) {
bf8fe701 126 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 127 continue;
128 }
129
130 hlp->n_running++;
d8f10d6a 131 hlp->n_active++;
b5d712b5 132 CBDATA_INIT_TYPE(helper_server);
62e76326 133 srv = cbdataAlloc(helper_server);
b5d712b5 134 srv->hIpc = hIpc;
135 srv->pid = pid;
62e76326 136 srv->index = k;
137 srv->rfd = rfd;
138 srv->wfd = wfd;
07eca7e0 139 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
90bed1c4 140 srv->wqueue = new MemBuf;
07eca7e0 141 srv->roffset = 0;
142 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
62e76326 143 srv->parent = cbdataReference(hlp);
144 dlinkAddTail(srv, &srv->link, &hlp->servers);
145
146 if (rfd == wfd) {
147 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
148 fd_note(rfd, fd_note_buf);
149 } else {
150 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
151 fd_note(rfd, fd_note_buf);
152 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
153 fd_note(wfd, fd_note_buf);
154 }
155
156 commSetNonBlocking(rfd);
157
158 if (wfd != rfd)
159 commSetNonBlocking(wfd);
160
161 comm_add_close_handler(rfd, helperServerFree, srv);
07eca7e0 162
163 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
74addf6c 164 }
62e76326 165
5ea33fce 166 hlp->last_restart = squid_curtime;
74addf6c 167 safe_free(shortname);
168 safe_free(procname);
838b993c 169 helperKickQueue(hlp);
74addf6c 170}
171
94439e4e 172void
173helperStatefulOpenServers(statefulhelper * hlp)
174{
175 char *s;
176 char *progname;
177 char *shortname;
178 char *procname;
a2c963ae 179 const char *args[HELPER_MAX_ARGS];
94439e4e 180 char fd_note_buf[FD_DESC_SZ];
181 helper_stateful_server *srv;
182 int nargs = 0;
183 int k;
b5d712b5 184 pid_t pid;
94439e4e 185 int rfd;
186 int wfd;
b5d712b5 187 void * hIpc;
94439e4e 188 wordlist *w;
62e76326 189
94439e4e 190 if (hlp->cmdline == NULL)
62e76326 191 return;
192
94439e4e 193 progname = hlp->cmdline->key;
62e76326 194
94439e4e 195 if ((s = strrchr(progname, '/')))
62e76326 196 shortname = xstrdup(s + 1);
94439e4e 197 else
62e76326 198 shortname = xstrdup(progname);
199
bf8fe701 200 debugs(84, 1, "helperStatefulOpenServers: Starting " << hlp->n_to_start << " '" << shortname << "' processes");
62e76326 201
e6ccf245 202 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 203
94439e4e 204 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 205
94439e4e 206 args[nargs++] = procname;
62e76326 207
94439e4e 208 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 209 args[nargs++] = w->key;
210
94439e4e 211 args[nargs++] = NULL;
62e76326 212
94439e4e 213 assert(nargs <= HELPER_MAX_ARGS);
62e76326 214
94439e4e 215 for (k = 0; k < hlp->n_to_start; k++) {
62e76326 216 getCurrentTime();
217 rfd = wfd = -1;
b5d712b5 218 pid = ipcCreate(hlp->ipc_type,
219 progname,
220 args,
221 shortname,
222 &rfd,
223 &wfd,
224 &hIpc);
225
226 if (pid < 0) {
bf8fe701 227 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 228 continue;
229 }
230
231 hlp->n_running++;
d8f10d6a 232 hlp->n_active++;
b5d712b5 233 CBDATA_INIT_TYPE(helper_stateful_server);
62e76326 234 srv = cbdataAlloc(helper_stateful_server);
b5d712b5 235 srv->hIpc = hIpc;
236 srv->pid = pid;
62e76326 237 srv->flags.reserved = S_HELPER_FREE;
238 srv->deferred_requests = 0;
239 srv->stats.deferbyfunc = 0;
240 srv->stats.deferbycb = 0;
241 srv->stats.submits = 0;
242 srv->stats.releases = 0;
243 srv->index = k;
244 srv->rfd = rfd;
245 srv->wfd = wfd;
07eca7e0 246 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
247 srv->roffset = 0;
62e76326 248 srv->parent = cbdataReference(hlp);
249
250 if (hlp->datapool != NULL)
b001e822 251 srv->data = hlp->datapool->alloc();
62e76326 252
253 dlinkAddTail(srv, &srv->link, &hlp->servers);
254
255 if (rfd == wfd) {
256 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
257 fd_note(rfd, fd_note_buf);
258 } else {
259 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
260 fd_note(rfd, fd_note_buf);
261 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
262 fd_note(wfd, fd_note_buf);
263 }
264
265 commSetNonBlocking(rfd);
266
267 if (wfd != rfd)
268 commSetNonBlocking(wfd);
269
270 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
07eca7e0 271
272 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
273
94439e4e 274 }
62e76326 275
5ea33fce 276 hlp->last_restart = squid_curtime;
94439e4e 277 safe_free(shortname);
278 safe_free(procname);
279 helperStatefulKickQueue(hlp);
280}
281
282
74addf6c 283void
284helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
285{
5b5f9257 286 if (hlp == NULL) {
bf8fe701 287 debugs(84, 3, "helperSubmit: hlp == NULL");
62e76326 288 callback(data, NULL);
289 return;
5b5f9257 290 }
62e76326 291
eff7218b 292 helper_request *r = new helper_request;
293 helper_server *srv;
294
74addf6c 295 r->callback = callback;
fa80a8ef 296 r->data = cbdataReference(data);
74addf6c 297 r->buf = xstrdup(buf);
62e76326 298
74addf6c 299 if ((srv = GetFirstAvailable(hlp)))
62e76326 300 helperDispatch(srv, r);
74addf6c 301 else
62e76326 302 Enqueue(hlp, r);
303
bf8fe701 304 debugs(84, 9, "helperSubmit: " << buf);
94439e4e 305}
306
721b0310 307/* lastserver = "server last used as part of a deferred or reserved
308 * request sequence"
309 */
94439e4e 310void
311helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
312{
94439e4e 313 if (hlp == NULL) {
bf8fe701 314 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
62e76326 315 callback(data, 0, NULL);
316 return;
94439e4e 317 }
62e76326 318
eff7218b 319 helper_stateful_request *r = new helper_stateful_request;
320 helper_stateful_server *srv;
321
94439e4e 322 r->callback = callback;
fa80a8ef 323 r->data = cbdataReference(data);
62e76326 324
721b0310 325 if (buf != NULL) {
62e76326 326 r->buf = xstrdup(buf);
327 r->placeholder = 0;
721b0310 328 } else {
62e76326 329 r->buf = NULL;
330 r->placeholder = 1;
721b0310 331 }
62e76326 332
94439e4e 333 if ((buf != NULL) && lastserver) {
bf8fe701 334 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
62e76326 335 /* the queue doesn't count for this assert because queued requests
336 * have already gone through here and been tested.
337 * It's legal to have deferred_requests == 0 and queue entries
338 * and status of S_HELPEER_DEFERRED.
339 * BUT: It's not legal to submit a new request w/lastserver in
340 * that state.
341 */
342 assert(!(lastserver->deferred_requests == 0 &&
343 lastserver->flags.reserved == S_HELPER_DEFERRED));
344
345 if (lastserver->flags.reserved != S_HELPER_RESERVED) {
346 lastserver->stats.submits++;
347 lastserver->deferred_requests--;
348 }
349
350 if (!(lastserver->request)) {
bf8fe701 351 debugs(84, 5, "StatefulSubmit dispatching");
62e76326 352 helperStatefulDispatch(lastserver, r);
353 } else {
bf8fe701 354 debugs(84, 5, "StatefulSubmit queuing");
62e76326 355 StatefulServerEnqueue(lastserver, r);
356 }
94439e4e 357 } else {
62e76326 358 if ((srv = StatefulGetFirstAvailable(hlp))) {
359 helperStatefulDispatch(srv, r);
360 } else
361 StatefulEnqueue(hlp, r);
94439e4e 362 }
62e76326 363
bf8fe701 364 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
94439e4e 365}
366
367helper_stateful_server *
368helperStatefulDefer(statefulhelper * hlp)
369/* find and add a deferred request to a server */
370{
62e76326 371 if (hlp == NULL)
372 {
bf8fe701 373 debugs(84, 3, "helperStatefulDefer: hlp == NULL");
62e76326 374 return NULL;
94439e4e 375 }
62e76326 376
eff7218b 377 dlink_node *n;
378 helper_stateful_server *srv = NULL, *rv = NULL;
379
bf8fe701 380 debugs(84, 5, "helperStatefulDefer: Running servers " << hlp->n_running << ".");
62e76326 381
382 if (hlp->n_running == 0)
383 {
bf8fe701 384 debugs(84, 1, "helperStatefulDefer: No running servers!. ");
62e76326 385 return NULL;
94439e4e 386 }
62e76326 387
0a706c69 388 rv = srv = StatefulGetFirstAvailable(hlp);
62e76326 389
390 if (rv == NULL)
391 {
392 /*
393 * all currently busy; loop through servers and find server
394 * with the shortest queue
395 */
396
397 for (n = hlp->servers.head; n != NULL; n = n->next) {
398 srv = (helper_stateful_server *)n->data;
399
400 if (srv->flags.reserved == S_HELPER_RESERVED)
401 continue;
402
b902a58c 403 if (!srv->flags.shutdown)
62e76326 404 continue;
405
406 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
407 !(hlp->IsAvailable(srv->data)))
408 continue;
409
410 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
411 continue;
412
413 rv = srv;
414 }
0a706c69 415 }
62e76326 416
417 if (rv == NULL)
418 {
bf8fe701 419 debugs(84, 1, "helperStatefulDefer: None available.");
62e76326 420 return NULL;
94439e4e 421 }
62e76326 422
60d096f4 423 /* consistency check:
424 * when the deferred count is 0,
425 * submits + releases == deferbyfunc + deferbycb
426 * Or in english, when there are no deferred requests, the amount
427 * we have submitted to the queue or cancelled must equal the amount
428 * we have said we wanted to be able to submit or cancel
429 */
430 if (rv->deferred_requests == 0)
62e76326 431 assert(rv->stats.submits + rv->stats.releases ==
432 rv->stats.deferbyfunc + rv->stats.deferbycb);
60d096f4 433
94439e4e 434 rv->flags.reserved = S_HELPER_DEFERRED;
62e76326 435
94439e4e 436 rv->deferred_requests++;
62e76326 437
60d096f4 438 rv->stats.deferbyfunc++;
62e76326 439
94439e4e 440 return rv;
441}
442
443void
444helperStatefulReset(helper_stateful_server * srv)
62e76326 445/* puts this helper back in the queue. the calling app is required to
94439e4e 446 * manage the state in the helper.
447 */
448{
449 statefulhelper *hlp = srv->parent;
450 helper_stateful_request *r;
451 r = srv->request;
62e76326 452
453 if (r != NULL)
454 {
455 /* reset attempt DURING an outstaning request */
bf8fe701 456 debugs(84, 1, "helperStatefulReset: RESET During request " << hlp->id_name << " ");
62e76326 457 srv->flags.busy = 0;
07eca7e0 458 srv->roffset = 0;
62e76326 459 helperStatefulRequestFree(r);
460 srv->request = NULL;
94439e4e 461 }
62e76326 462
94439e4e 463 srv->flags.busy = 0;
62e76326 464
465 if (srv->queue.head)
466 {
467 srv->flags.reserved = S_HELPER_DEFERRED;
468 helperStatefulServerKickQueue(srv);
469 } else
470 {
471 srv->flags.reserved = S_HELPER_FREE;
472
473 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
474 srv->parent->OnEmptyQueue(srv->data);
475
476 helperStatefulKickQueue(hlp);
94439e4e 477 }
478}
479
480void
481helperStatefulReleaseServer(helper_stateful_server * srv)
482/*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
483{
60d096f4 484 srv->stats.releases++;
62e76326 485
486 if (srv->flags.reserved == S_HELPER_DEFERRED)
487 {
488 assert(srv->deferred_requests);
489 srv->deferred_requests--;
60d096f4 490 }
62e76326 491
492 if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head))
493 {
494 srv->flags.reserved = S_HELPER_FREE;
495
496 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
497 srv->parent->OnEmptyQueue(srv->data);
94439e4e 498 }
499}
500
501void *
502helperStatefulServerGetData(helper_stateful_server * srv)
503/* return a pointer to the stateful routines data area */
504{
505 return srv->data;
74addf6c 506}
507
508void
509helperStats(StoreEntry * sentry, helper * hlp)
510{
74addf6c 511 dlink_node *link;
0a706c69 512 storeAppendPrintf(sentry, "program: %s\n",
62e76326 513 hlp->cmdline->key);
74addf6c 514 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 515 hlp->n_running, hlp->n_to_start);
74addf6c 516 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 517 hlp->stats.requests);
74addf6c 518 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 519 hlp->stats.replies);
74addf6c 520 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 521 hlp->stats.queue_size);
f9598528 522 storeAppendPrintf(sentry, "avg service time: %d msec\n",
523 hlp->stats.avg_svc_time);
74addf6c 524 storeAppendPrintf(sentry, "\n");
09c1ece1 525 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
62e76326 526 "#",
527 "FD",
528 "PID",
529 "# Requests",
530 "Flags",
531 "Time",
532 "Offset",
533 "Request");
534
74addf6c 535 for (link = hlp->servers.head; link; link = link->next) {
07eca7e0 536 helper_server *srv = (helper_server*)link->data;
f9598528 537 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
d8f10d6a 538 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 539 srv->index + 1,
540 srv->rfd,
541 srv->pid,
542 srv->stats.uses,
07eca7e0 543 srv->stats.pending ? 'B' : ' ',
544 srv->flags.writing ? 'W' : ' ',
62e76326 545 srv->flags.closing ? 'C' : ' ',
546 srv->flags.shutdown ? 'S' : ' ',
547 tt < 0.0 ? 0.0 : tt,
07eca7e0 548 (int) srv->roffset,
549 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
74addf6c 550 }
62e76326 551
74addf6c 552 storeAppendPrintf(sentry, "\nFlags key:\n\n");
74addf6c 553 storeAppendPrintf(sentry, " B = BUSY\n");
f9598528 554 storeAppendPrintf(sentry, " W = WRITING\n");
74addf6c 555 storeAppendPrintf(sentry, " C = CLOSING\n");
556 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
557}
558
94439e4e 559void
560helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
561{
562 helper_stateful_server *srv;
563 dlink_node *link;
564 double tt;
cf154edc 565 storeAppendPrintf(sentry, "program: %s\n",
566 hlp->cmdline->key);
94439e4e 567 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 568 hlp->n_running, hlp->n_to_start);
94439e4e 569 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 570 hlp->stats.requests);
94439e4e 571 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 572 hlp->stats.replies);
94439e4e 573 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 574 hlp->stats.queue_size);
94439e4e 575 storeAppendPrintf(sentry, "avg service time: %d msec\n",
62e76326 576 hlp->stats.avg_svc_time);
94439e4e 577 storeAppendPrintf(sentry, "\n");
0a706c69 578 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%20s\t%s\t%7s\t%7s\t%7s\n",
62e76326 579 "#",
580 "FD",
581 "PID",
582 "# Requests",
583 "# Deferred Requests",
584 "Flags",
585 "Time",
586 "Offset",
587 "Request");
588
94439e4e 589 for (link = hlp->servers.head; link; link = link->next) {
62e76326 590 srv = (helper_stateful_server *)link->data;
f9598528 591 tt = 0.001 * tvSubMsec(srv->dispatch_time,
592 srv->flags.busy ? current_time : srv->answer_time);
b902a58c 593 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%20d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 594 srv->index + 1,
595 srv->rfd,
596 srv->pid,
597 srv->stats.uses,
598 (int) srv->deferred_requests,
62e76326 599 srv->flags.busy ? 'B' : ' ',
600 srv->flags.closing ? 'C' : ' ',
601 srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
602 srv->flags.shutdown ? 'S' : ' ',
603 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
604 tt < 0.0 ? 0.0 : tt,
07eca7e0 605 (int) srv->roffset,
62e76326 606 srv->request ? log_quote(srv->request->buf) : "(none)");
94439e4e 607 }
62e76326 608
94439e4e 609 storeAppendPrintf(sentry, "\nFlags key:\n\n");
94439e4e 610 storeAppendPrintf(sentry, " B = BUSY\n");
611 storeAppendPrintf(sentry, " C = CLOSING\n");
612 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
613 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
5d146f7d 614 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 615}
616
74addf6c 617void
618helperShutdown(helper * hlp)
619{
c68e9c6b 620 dlink_node *link = hlp->servers.head;
2d5d5261 621#ifdef _SQUID_MSWIN_
622
623 HANDLE hIpc;
624 pid_t pid;
625 int no;
626#endif
62e76326 627
c68e9c6b 628 while (link) {
62e76326 629 helper_server *srv;
630 srv = (helper_server *)link->data;
631 link = link->next;
632
8cfc76db 633 if (srv->flags.shutdown) {
bf8fe701 634 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 635 continue;
636 }
637
d8f10d6a 638 hlp->n_active--;
639 assert(hlp->n_active >= 0);
640
62e76326 641 srv->flags.shutdown = 1; /* request it to shut itself down */
642
d8f10d6a 643 if (srv->flags.closing) {
bf8fe701 644 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 645 continue;
646 }
647
d8f10d6a 648 if (srv->stats.pending) {
bf8fe701 649 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 650 continue;
651 }
652
653 srv->flags.closing = 1;
2d5d5261 654#ifdef _SQUID_MSWIN_
655
656 hIpc = srv->hIpc;
657 pid = srv->pid;
658 no = srv->index + 1;
659 shutdown(srv->wfd, SD_BOTH);
660#endif
661
bf8fe701 662 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
62e76326 663 /* the rest of the details is dealt with in the helperServerFree
664 * close handler
665 */
666 comm_close(srv->rfd);
2d5d5261 667#ifdef _SQUID_MSWIN_
668
669 if (hIpc) {
670 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
671 getCurrentTime();
bf8fe701 672 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
673 " #" << no << " (" << hlp->cmdline->key << "," <<
674 (long int)pid << ") didn't exit in 5 seconds");
675
2d5d5261 676 }
677
678 CloseHandle(hIpc);
679 }
680
681#endif
682
74addf6c 683 }
684}
685
94439e4e 686void
687helperStatefulShutdown(statefulhelper * hlp)
688{
689 dlink_node *link = hlp->servers.head;
690 helper_stateful_server *srv;
2d5d5261 691#ifdef _SQUID_MSWIN_
692
693 HANDLE hIpc;
694 pid_t pid;
695 int no;
696#endif
62e76326 697
94439e4e 698 while (link) {
62e76326 699 srv = (helper_stateful_server *)link->data;
700 link = link->next;
701
8cfc76db 702 if (srv->flags.shutdown) {
bf8fe701 703 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 704 continue;
705 }
706
d8f10d6a 707 hlp->n_active--;
708 assert(hlp->n_active >= 0);
62e76326 709 srv->flags.shutdown = 1; /* request it to shut itself down */
710
711 if (srv->flags.busy) {
bf8fe701 712 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 713 continue;
714 }
715
716 if (srv->flags.closing) {
bf8fe701 717 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 718 continue;
719 }
720
721 if (srv->flags.reserved != S_HELPER_FREE) {
bf8fe701 722 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED.");
62e76326 723 continue;
724 }
725
726 if (srv->deferred_requests) {
bf8fe701 727 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has DEFERRED requests.");
62e76326 728 continue;
729 }
730
731 srv->flags.closing = 1;
2d5d5261 732#ifdef _SQUID_MSWIN_
733
734 hIpc = srv->hIpc;
735 pid = srv->pid;
736 no = srv->index + 1;
737 shutdown(srv->wfd, SD_BOTH);
738#endif
739
bf8fe701 740 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
741
62e76326 742 /* the rest of the details is dealt with in the helperStatefulServerFree
743 * close handler
744 */
745 comm_close(srv->rfd);
2d5d5261 746#ifdef _SQUID_MSWIN_
747
748 if (hIpc) {
749 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
750 getCurrentTime();
bf8fe701 751 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
752 " #" << no << " (" << hlp->cmdline->key << "," <<
753 (long int)pid << ") didn't exit in 5 seconds");
2d5d5261 754 }
755
756 CloseHandle(hIpc);
757 }
758
759#endif
760
94439e4e 761 }
762}
763
764
1f5f60dd 765helper *
766helperCreate(const char *name)
767{
28c60158 768 helper *hlp;
aa839030 769 CBDATA_INIT_TYPE(helper);
72711e31 770 hlp = cbdataAlloc(helper);
1f5f60dd 771 hlp->id_name = name;
772 return hlp;
773}
774
94439e4e 775statefulhelper *
776helperStatefulCreate(const char *name)
777{
778 statefulhelper *hlp;
aa839030 779 CBDATA_INIT_TYPE(statefulhelper);
72711e31 780 hlp = cbdataAlloc(statefulhelper);
94439e4e 781 hlp->id_name = name;
782 return hlp;
783}
784
785
1f5f60dd 786void
787helperFree(helper * hlp)
788{
5dae8514 789 if (!hlp)
62e76326 790 return;
791
1f5f60dd 792 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 793 if (hlp->queue.head)
bf8fe701 794 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
795 hlp->stats.queue_size << " requests queued");
62e76326 796
1f5f60dd 797 cbdataFree(hlp);
798}
799
94439e4e 800void
801helperStatefulFree(statefulhelper * hlp)
802{
5dae8514 803 if (!hlp)
62e76326 804 return;
805
94439e4e 806 /* note, don't free hlp->name, it probably points to static memory */
807 if (hlp->queue.head)
bf8fe701 808 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
809 hlp->stats.queue_size << " requests queued");
62e76326 810
94439e4e 811 cbdataFree(hlp);
812}
813
814
74addf6c 815/* ====================================================================== */
816/* LOCAL FUNCTIONS */
817/* ====================================================================== */
818
819static void
1f5f60dd 820helperServerFree(int fd, void *data)
74addf6c 821{
e6ccf245 822 helper_server *srv = (helper_server *)data;
74addf6c 823 helper *hlp = srv->parent;
ac750329 824 helper_request *r;
07eca7e0 825 int i, concurrency = hlp->concurrency;
826
827 if (!concurrency)
828 concurrency = 1;
829
74addf6c 830 assert(srv->rfd == fd);
62e76326 831
07eca7e0 832 if (srv->rbuf) {
833 memFreeBuf(srv->rbuf_sz, srv->rbuf);
834 srv->rbuf = NULL;
74addf6c 835 }
62e76326 836
2fe7eff9 837 srv->wqueue->clean();
032785bf 838 delete srv->wqueue;
839
840 if (srv->writebuf) {
2fe7eff9 841 srv->writebuf->clean();
032785bf 842 delete srv->writebuf;
843 srv->writebuf = NULL;
844 }
62e76326 845
07eca7e0 846 for (i = 0; i < concurrency; i++) {
847 if ((r = srv->requests[i])) {
848 void *cbdata;
62e76326 849
07eca7e0 850 if (cbdataReferenceValidDone(r->data, &cbdata))
851 r->callback(cbdata, NULL);
62e76326 852
07eca7e0 853 helperRequestFree(r);
854
855 srv->requests[i] = NULL;
856 }
ac750329 857 }
62e76326 858
07eca7e0 859 safe_free(srv->requests);
860
3cdb7cd0 861 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 862 comm_close(srv->wfd);
863
74addf6c 864 dlinkDelete(&srv->link, &hlp->servers);
62e76326 865
74addf6c 866 hlp->n_running--;
62e76326 867
74addf6c 868 assert(hlp->n_running >= 0);
62e76326 869
1f5f60dd 870 if (!srv->flags.shutdown) {
d8f10d6a 871 hlp->n_active--;
872 assert(hlp->n_active >= 0);
bf8fe701 873 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 <<
874 " (FD " << fd << ") exited");
62e76326 875
5ea33fce 876 if (hlp->n_active < hlp->n_to_start / 2) {
bf8fe701 877 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 878
879 if (hlp->last_restart > squid_curtime - 30)
880 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
881
bf8fe701 882 debugs(80, 0, "Starting new helpers");
5ea33fce 883
884 helperOpenServers(hlp);
885 }
14e87a44 886 }
62e76326 887
fa80a8ef 888 cbdataReferenceDone(srv->parent);
14e87a44 889 cbdataFree(srv);
74addf6c 890}
891
94439e4e 892static void
893helperStatefulServerFree(int fd, void *data)
894{
e6ccf245 895 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 896 statefulhelper *hlp = srv->parent;
897 helper_stateful_request *r;
898 assert(srv->rfd == fd);
62e76326 899
07eca7e0 900 if (srv->rbuf) {
901 memFreeBuf(srv->rbuf_sz, srv->rbuf);
902 srv->rbuf = NULL;
94439e4e 903 }
62e76326 904
07eca7e0 905#if 0
2fe7eff9 906 srv->wqueue->clean();
032785bf 907
908 delete srv->wqueue;
07eca7e0 909
910#endif
911
94439e4e 912 if ((r = srv->request)) {
62e76326 913 void *cbdata;
914
915 if (cbdataReferenceValidDone(r->data, &cbdata))
07eca7e0 916 r->callback(cbdata, srv, NULL);
62e76326 917
918 helperStatefulRequestFree(r);
919
920 srv->request = NULL;
94439e4e 921 }
62e76326 922
3c641669 923 /* TODO: walk the local queue of requests and carry them all out */
94439e4e 924 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 925 comm_close(srv->wfd);
926
94439e4e 927 dlinkDelete(&srv->link, &hlp->servers);
62e76326 928
94439e4e 929 hlp->n_running--;
62e76326 930
94439e4e 931 assert(hlp->n_running >= 0);
62e76326 932
94439e4e 933 if (!srv->flags.shutdown) {
d8f10d6a 934 hlp->n_active--;
935 assert( hlp->n_active >= 0);
bf8fe701 936 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " (FD " << fd << ") exited");
62e76326 937
5ea33fce 938 if (hlp->n_active <= hlp->n_to_start / 2) {
bf8fe701 939 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 940
941 if (hlp->last_restart > squid_curtime - 30)
942 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
943
bf8fe701 944 debugs(80, 0, "Starting new helpers");
5ea33fce 945
946 helperStatefulOpenServers(hlp);
947 }
94439e4e 948 }
62e76326 949
94439e4e 950 if (srv->data != NULL)
b001e822 951 hlp->datapool->free(srv->data);
62e76326 952
fa80a8ef 953 cbdataReferenceDone(srv->parent);
62e76326 954
94439e4e 955 cbdataFree(srv);
956}
957
958
74addf6c 959static void
c4b7a5a9 960helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
74addf6c 961{
74addf6c 962 char *t = NULL;
e6ccf245 963 helper_server *srv = (helper_server *)data;
74addf6c 964 helper *hlp = srv->parent;
965 assert(fd == srv->rfd);
fa80a8ef 966 assert(cbdataReferenceValid(data));
c4b7a5a9 967
968 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 969
c4b7a5a9 970 if (flag == COMM_ERR_CLOSING) {
971 return;
972 }
973
4a7a3d56 974 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1 << ".");
62e76326 975
c4b7a5a9 976 if (flag != COMM_OK || len <= 0) {
62e76326 977 if (len < 0)
bf8fe701 978 debugs(84, 1, "helperHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 979
980 comm_close(fd);
981
982 return;
74addf6c 983 }
62e76326 984
07eca7e0 985 srv->roffset += len;
986 srv->rbuf[srv->roffset] = '\0';
bf8fe701 987 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
62e76326 988
07eca7e0 989 if (!srv->stats.pending) {
62e76326 990 /* someone spoke without being spoken to */
bf8fe701 991 debugs(84, 1, "helperHandleRead: unexpected read from " <<
992 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
993 " bytes '" << srv->rbuf << "'");
994
07eca7e0 995 srv->roffset = 0;
996 srv->rbuf[0] = '\0';
997 }
998
999 while ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1000 /* end of reply found */
07eca7e0 1001 helper_request *r;
1002 char *msg = srv->rbuf;
1003 int i = 0;
bf8fe701 1004 debugs(84, 3, "helperHandleRead: end of reply found");
6bf4f823 1005
1006 if (t > srv->rbuf && t[-1] == '\r')
1007 t[-1] = '\0';
1008
07eca7e0 1009 *t++ = '\0';
62e76326 1010
07eca7e0 1011 if (hlp->concurrency) {
1012 i = strtol(msg, &msg, 10);
62e76326 1013
e4755e29 1014 while (*msg && xisspace(*msg))
07eca7e0 1015 msg++;
1016 }
62e76326 1017
07eca7e0 1018 r = srv->requests[i];
62e76326 1019
07eca7e0 1020 if (r) {
1021 HLPCB *callback = r->callback;
1022 void *cbdata;
62e76326 1023
07eca7e0 1024 srv->requests[i] = NULL;
62e76326 1025
07eca7e0 1026 r->callback = NULL;
62e76326 1027
07eca7e0 1028 if (cbdataReferenceValidDone(r->data, &cbdata))
1029 callback(cbdata, msg);
62e76326 1030
07eca7e0 1031 srv->stats.pending--;
1032
1033 hlp->stats.replies++;
1034
f9598528 1035 srv->answer_time = current_time;
1036
1037 srv->dispatch_time = r->dispatch_time;
1038
07eca7e0 1039 hlp->stats.avg_svc_time =
1040 intAverage(hlp->stats.avg_svc_time,
1041 tvSubMsec(r->dispatch_time, current_time),
1042 hlp->stats.replies, REDIRECT_AV_FACTOR);
1043
1044 helperRequestFree(r);
1045 } else {
bf8fe701 1046 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
1047 i << " from " << hlp->id_name << " #" << srv->index + 1 <<
1048 " '" << srv->rbuf << "'");
1049
07eca7e0 1050 }
1051
1052 srv->roffset -= (t - srv->rbuf);
1053 memmove(srv->rbuf, t, srv->roffset + 1);
62e76326 1054
1055 if (srv->flags.shutdown) {
1056 int wfd = srv->wfd;
1057 srv->wfd = -1;
d8f10d6a 1058 srv->flags.closing=1;
62e76326 1059 comm_close(wfd);
1060 } else
1061 helperKickQueue(hlp);
74addf6c 1062 }
07eca7e0 1063
1064 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
74addf6c 1065}
1066
94439e4e 1067static void
c4b7a5a9 1068helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
94439e4e 1069{
94439e4e 1070 char *t = NULL;
e6ccf245 1071 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 1072 helper_stateful_request *r;
1073 statefulhelper *hlp = srv->parent;
1074 assert(fd == srv->rfd);
fa80a8ef 1075 assert(cbdataReferenceValid(data));
c4b7a5a9 1076
1077 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 1078
c4b7a5a9 1079 if (flag == COMM_ERR_CLOSING) {
1080 return;
1081 }
1082
4a7a3d56 1083 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
bf8fe701 1084 hlp->id_name << " #" << srv->index + 1 << ".");
1085
62e76326 1086
c4b7a5a9 1087 if (flag != COMM_OK || len <= 0) {
62e76326 1088 if (len < 0)
bf8fe701 1089 debugs(84, 1, "helperStatefulHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 1090
1091 comm_close(fd);
1092
1093 return;
94439e4e 1094 }
62e76326 1095
07eca7e0 1096 srv->roffset += len;
1097 srv->rbuf[srv->roffset] = '\0';
94439e4e 1098 r = srv->request;
62e76326 1099
94439e4e 1100 if (r == NULL) {
62e76326 1101 /* someone spoke without being spoken to */
bf8fe701 1102 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
1103 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1104 " bytes '" << srv->rbuf << "'");
1105
07eca7e0 1106 srv->roffset = 0;
1107 }
1108
1109 if ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1110 /* end of reply found */
bf8fe701 1111 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1112
1113 if (t > srv->rbuf && t[-1] == '\r')
1114 t[-1] = '\0';
1115
62e76326 1116 *t = '\0';
1117
1118 if (cbdataReferenceValid(r->data)) {
07eca7e0 1119 switch ((r->callback(r->data, srv, srv->rbuf))) { /*if non-zero reserve helper */
62e76326 1120
1121 case S_HELPER_UNKNOWN:
1122 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
1123 break;
1124
1125 case S_HELPER_RELEASE: /* helper finished with */
1126
1127 if (!srv->deferred_requests && !srv->queue.head) {
1128 srv->flags.reserved = S_HELPER_FREE;
1129
1130 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
1131 srv->parent->OnEmptyQueue(srv->data);
1132
bf8fe701 1133 debugs(84, 5, "StatefulHandleRead: releasing " << hlp->id_name << " #" << srv->index + 1);
62e76326 1134 } else {
1135 srv->flags.reserved = S_HELPER_DEFERRED;
bf8fe701 1136 debugs(84, 5, "StatefulHandleRead: outstanding deferred requests on " <<
1137 hlp->id_name << " #" << srv->index + 1 <<
1138 ". reserving for deferred requests.");
62e76326 1139 }
1140
1141 break;
1142
1143 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
1144
1145 if (!srv->queue.head) {
1146 assert(srv->deferred_requests == 0);
1147 srv->flags.reserved = S_HELPER_RESERVED;
bf8fe701 1148 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1);
62e76326 1149 } else {
1150 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
1151 }
1152
1153 break;
1154
1155 case S_HELPER_DEFER:
1156 /* the helper is still needed, but can
1157 * be used for other requests in the meantime.
1158 */
1159 srv->flags.reserved = S_HELPER_DEFERRED;
1160 srv->deferred_requests++;
1161 srv->stats.deferbycb++;
bf8fe701 1162 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1 << " for deferred requests.");
62e76326 1163 break;
1164
1165 default:
1166 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
1167 }
1168
1169 } else {
bf8fe701 1170 debugs(84, 1, "StatefulHandleRead: no callback data registered");
62e76326 1171 }
1172
1173 srv->flags.busy = 0;
07eca7e0 1174 srv->roffset = 0;
62e76326 1175 helperStatefulRequestFree(r);
1176 srv->request = NULL;
1177 hlp->stats.replies++;
f9598528 1178 srv->answer_time = current_time;
62e76326 1179 hlp->stats.avg_svc_time =
1180 intAverage(hlp->stats.avg_svc_time,
1181 tvSubMsec(srv->dispatch_time, current_time),
1182 hlp->stats.replies, REDIRECT_AV_FACTOR);
1183
1184 if (srv->flags.shutdown
1185 && srv->flags.reserved == S_HELPER_FREE
1186 && !srv->deferred_requests) {
1187 int wfd = srv->wfd;
1188 srv->wfd = -1;
d8f10d6a 1189 srv->flags.closing=1;
62e76326 1190 comm_close(wfd);
1191 } else {
1192 if (srv->queue.head)
1193 helperStatefulServerKickQueue(srv);
1194 else
1195 helperStatefulKickQueue(hlp);
1196 }
94439e4e 1197 }
07eca7e0 1198
1199 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
1200 helperStatefulHandleRead, srv);
94439e4e 1201}
1202
74addf6c 1203static void
1204Enqueue(helper * hlp, helper_request * r)
1205{
e6ccf245 1206 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
74addf6c 1207 dlinkAddTail(r, link, &hlp->queue);
1208 hlp->stats.queue_size++;
62e76326 1209
74addf6c 1210 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1211 return;
1212
74addf6c 1213 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1214 return;
1215
fe73896c 1216 if (shutting_down || reconfiguring)
62e76326 1217 return;
1218
74addf6c 1219 hlp->last_queue_warn = squid_curtime;
62e76326 1220
bf8fe701 1221 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1222 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1223
62e76326 1224
74addf6c 1225 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1226 fatalf("Too many queued %s requests", hlp->id_name);
1227
bf8fe701 1228 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1229
74addf6c 1230}
1231
94439e4e 1232static void
1233StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1234{
e6ccf245 1235 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1236 dlinkAddTail(r, link, &hlp->queue);
1237 hlp->stats.queue_size++;
62e76326 1238
94439e4e 1239 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1240 return;
1241
893cbac6 1242 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1243 fatalf("Too many queued %s requests", hlp->id_name);
1244
94439e4e 1245 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1246 return;
1247
94439e4e 1248 if (shutting_down || reconfiguring)
62e76326 1249 return;
1250
94439e4e 1251 hlp->last_queue_warn = squid_curtime;
62e76326 1252
bf8fe701 1253 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
62e76326 1254
bf8fe701 1255 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1256 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
62e76326 1257
94439e4e 1258}
1259
1260static void
1261StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
1262{
e6ccf245 1263 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1264 dlinkAddTail(r, link, &srv->queue);
62e76326 1265 /* TODO: warning if the queue on this server is more than X
1266 * We don't check the queue size at the moment, because
1267 * requests hitting here are deferrable
1268 */
1269 /* hlp->stats.queue_size++;
1270 * if (hlp->stats.queue_size < hlp->n_running)
1271 * return;
1272 * if (squid_curtime - hlp->last_queue_warn < 600)
1273 * return;
1274 * if (shutting_down || reconfiguring)
1275 * return;
1276 * hlp->last_queue_warn = squid_curtime;
bf8fe701 1277 * debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1278 * debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1279 * if (hlp->stats.queue_size > hlp->n_running * 2)
1280 * fatalf("Too many queued %s requests", hlp->id_name);
bf8fe701 1281 * debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file." ); */
94439e4e 1282}
1283
1284
74addf6c 1285static helper_request *
1286Dequeue(helper * hlp)
1287{
1288 dlink_node *link;
1289 helper_request *r = NULL;
62e76326 1290
74addf6c 1291 if ((link = hlp->queue.head)) {
62e76326 1292 r = (helper_request *)link->data;
1293 dlinkDelete(link, &hlp->queue);
1294 memFree(link, MEM_DLINK_NODE);
1295 hlp->stats.queue_size--;
74addf6c 1296 }
62e76326 1297
74addf6c 1298 return r;
1299}
1300
94439e4e 1301static helper_stateful_request *
1302StatefulServerDequeue(helper_stateful_server * srv)
1303{
1304 dlink_node *link;
1305 helper_stateful_request *r = NULL;
62e76326 1306
94439e4e 1307 if ((link = srv->queue.head)) {
62e76326 1308 r = (helper_stateful_request *)link->data;
1309 dlinkDelete(link, &srv->queue);
1310 memFree(link, MEM_DLINK_NODE);
94439e4e 1311 }
62e76326 1312
94439e4e 1313 return r;
1314}
1315
1316static helper_stateful_request *
1317StatefulDequeue(statefulhelper * hlp)
1318{
1319 dlink_node *link;
1320 helper_stateful_request *r = NULL;
62e76326 1321
94439e4e 1322 if ((link = hlp->queue.head)) {
62e76326 1323 r = (helper_stateful_request *)link->data;
1324 dlinkDelete(link, &hlp->queue);
1325 memFree(link, MEM_DLINK_NODE);
1326 hlp->stats.queue_size--;
94439e4e 1327 }
62e76326 1328
94439e4e 1329 return r;
1330}
1331
74addf6c 1332static helper_server *
1333GetFirstAvailable(helper * hlp)
1334{
1335 dlink_node *n;
07eca7e0 1336 helper_server *srv;
1337 helper_server *selected = NULL;
62e76326 1338
fe73896c 1339 if (hlp->n_running == 0)
62e76326 1340 return NULL;
1341
07eca7e0 1342 /* Find "least" loaded helper (approx) */
74addf6c 1343 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1344 srv = (helper_server *)n->data;
1345
07eca7e0 1346 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1347 continue;
1348
d8f10d6a 1349 if (srv->flags.shutdown)
62e76326 1350 continue;
1351
07eca7e0 1352 if (!srv->stats.pending)
1353 return srv;
1354
1355 if (selected) {
1356 selected = srv;
1357 break;
1358 }
1359
1360 selected = srv;
74addf6c 1361 }
62e76326 1362
07eca7e0 1363 /* Check for overload */
1364 if (!selected)
1365 return NULL;
1366
1367 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1368 return NULL;
1369
1370 return selected;
74addf6c 1371}
1372
94439e4e 1373static helper_stateful_server *
1374StatefulGetFirstAvailable(statefulhelper * hlp)
1375{
1376 dlink_node *n;
1377 helper_stateful_server *srv = NULL;
bf8fe701 1378 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->n_running << ".");
62e76326 1379
94439e4e 1380 if (hlp->n_running == 0)
62e76326 1381 return NULL;
1382
94439e4e 1383 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1384 srv = (helper_stateful_server *)n->data;
1385
1386 if (srv->flags.busy)
1387 continue;
1388
1389 if (srv->flags.reserved == S_HELPER_RESERVED)
1390 continue;
1391
d8f10d6a 1392 if (srv->flags.shutdown)
62e76326 1393 continue;
1394
1395 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1396 continue;
1397
1398 return srv;
94439e4e 1399 }
62e76326 1400
bf8fe701 1401 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
94439e4e 1402 return NULL;
1403}
1404
1405
42679bd6 1406static void
1407helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1408{
07eca7e0 1409 helper_server *srv = (helper_server *)data;
1410
2fe7eff9 1411 srv->writebuf->clean();
032785bf 1412 delete srv->writebuf;
1413 srv->writebuf = NULL;
07eca7e0 1414 srv->flags.writing = 0;
1415
1416 if (flag != COMM_OK) {
1417 /* Helper server has crashed */
bf8fe701 1418 debugs(84, 0, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
07eca7e0 1419 return;
1420 }
1421
2fe7eff9 1422 if (!srv->wqueue->isNull()) {
07eca7e0 1423 srv->writebuf = srv->wqueue;
032785bf 1424 srv->wqueue = new MemBuf;
07eca7e0 1425 srv->flags.writing = 1;
1426 comm_write(srv->wfd,
032785bf 1427 srv->writebuf->content(),
1428 srv->writebuf->contentSize(),
07eca7e0 1429 helperDispatchWriteDone, /* Handler */
2b663917 1430 srv, NULL); /* Handler-data, freefunc */
07eca7e0 1431 }
42679bd6 1432}
1433
74addf6c 1434static void
1435helperDispatch(helper_server * srv, helper_request * r)
1436{
1437 helper *hlp = srv->parent;
07eca7e0 1438 helper_request **ptr = NULL;
1439 unsigned int slot;
62e76326 1440
fa80a8ef 1441 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1442 debugs(84, 1, "helperDispatch: invalid callback data");
62e76326 1443 helperRequestFree(r);
1444 return;
74addf6c 1445 }
62e76326 1446
07eca7e0 1447 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1448 if (!srv->requests[slot]) {
1449 ptr = &srv->requests[slot];
1450 break;
1451 }
1452 }
1453
1454 assert(ptr);
1455 *ptr = r;
1456 srv->stats.pending += 1;
1457 r->dispatch_time = current_time;
1458
2fe7eff9 1459 if (srv->wqueue->isNull())
1460 srv->wqueue->init();
07eca7e0 1461
1462 if (hlp->concurrency)
2fe7eff9 1463 srv->wqueue->Printf("%d %s", slot, r->buf);
07eca7e0 1464 else
2fe7eff9 1465 srv->wqueue->append(r->buf, strlen(r->buf));
07eca7e0 1466
1467 if (!srv->flags.writing) {
032785bf 1468 assert(NULL == srv->writebuf);
07eca7e0 1469 srv->writebuf = srv->wqueue;
032785bf 1470 srv->wqueue = new MemBuf;
07eca7e0 1471 srv->flags.writing = 1;
1472 comm_write(srv->wfd,
032785bf 1473 srv->writebuf->content(),
1474 srv->writebuf->contentSize(),
07eca7e0 1475 helperDispatchWriteDone, /* Handler */
2b663917 1476 srv, NULL); /* Handler-data, free func */
07eca7e0 1477 }
1478
4a7a3d56 1479 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
bf8fe701 1480
74addf6c 1481 srv->stats.uses++;
1482 hlp->stats.requests++;
1483}
1484
42679bd6 1485static void
1486helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
62e76326 1487 int xerrno, void *data)
42679bd6 1488{
62e76326 1489 /* nothing! */
42679bd6 1490}
1491
1492
94439e4e 1493static void
1494helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1495{
1496 statefulhelper *hlp = srv->parent;
62e76326 1497
fa80a8ef 1498 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1499 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
62e76326 1500 helperStatefulRequestFree(r);
1501 return;
94439e4e 1502 }
62e76326 1503
bf8fe701 1504 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
62e76326 1505
94439e4e 1506 if (r->placeholder == 1) {
62e76326 1507 /* a callback is needed before this request can _use_ a helper. */
1508 /* we don't care about releasing/deferring this helper. The request NEVER
1509 * gets to the helper. So we throw away the return code */
1510 r->callback(r->data, srv, NULL);
1511 /* throw away the placeholder */
1512 helperStatefulRequestFree(r);
1513 /* and push the queue. Note that the callback may have submitted a new
1514 * request to the helper which is why we test for the request*/
1515
1516 if (srv->request == NULL) {
1517 if (srv->flags.shutdown
1518 && srv->flags.reserved == S_HELPER_FREE
1519 && !srv->deferred_requests) {
1520 int wfd = srv->wfd;
1521 srv->wfd = -1;
1522 comm_close(wfd);
1523 } else {
1524 if (srv->queue.head)
1525 helperStatefulServerKickQueue(srv);
1526 else
1527 helperStatefulKickQueue(hlp);
1528 }
1529 }
1530
1531 return;
94439e4e 1532 }
62e76326 1533
94439e4e 1534 srv->flags.busy = 1;
1535 srv->request = r;
1536 srv->dispatch_time = current_time;
42679bd6 1537 comm_write(srv->wfd,
62e76326 1538 r->buf,
1539 strlen(r->buf),
1540 helperStatefulDispatchWriteDone, /* Handler */
2b663917 1541 hlp, NULL); /* Handler-data, free func */
bf8fe701 1542 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1543 hlp->id_name << " #" << srv->index + 1 << ", " <<
1544 (int) strlen(r->buf) << " bytes");
1545
94439e4e 1546 srv->stats.uses++;
1547 hlp->stats.requests++;
1548}
1549
1550
74addf6c 1551static void
1552helperKickQueue(helper * hlp)
1553{
1554 helper_request *r;
1555 helper_server *srv;
62e76326 1556
74addf6c 1557 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
62e76326 1558 helperDispatch(srv, r);
74addf6c 1559}
1560
94439e4e 1561static void
1562helperStatefulKickQueue(statefulhelper * hlp)
1563{
1564 helper_stateful_request *r;
1565 helper_stateful_server *srv;
62e76326 1566
94439e4e 1567 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
62e76326 1568 helperStatefulDispatch(srv, r);
94439e4e 1569}
1570
1571static void
1572helperStatefulServerKickQueue(helper_stateful_server * srv)
1573{
1574 helper_stateful_request *r;
62e76326 1575
94439e4e 1576 if ((r = StatefulServerDequeue(srv)))
62e76326 1577 helperStatefulDispatch(srv, r);
94439e4e 1578}
1579
74addf6c 1580static void
1581helperRequestFree(helper_request * r)
1582{
fa80a8ef 1583 cbdataReferenceDone(r->data);
74addf6c 1584 xfree(r->buf);
00d77d6b 1585 delete r;
51ee7c82 1586}
1587
94439e4e 1588static void
1589helperStatefulRequestFree(helper_stateful_request * r)
1590{
fa80a8ef 1591 cbdataReferenceDone(r->data);
94439e4e 1592 xfree(r->buf);
00d77d6b 1593 delete r;
51ee7c82 1594}