]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Bug 2679: strsep and strtoll should be bundled
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1/*
262a0e14 2 * $Id$
f740a279 3 *
17bb3486 4 * DEBUG: section 84 Helper process maintenance
f740a279 5 * AUTHOR: Harvest Derived?
6 *
2b6662ba 7 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 8 * ----------------------------------------------------------
9 *
2b6662ba 10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
f740a279 18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
26ac0430 23 *
f740a279 24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
26ac0430 28 *
f740a279 29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
74addf6c 35#include "squid.h"
51ee7c82 36#include "helper.h"
985c86bc 37#include "SquidTime.h"
e6ccf245 38#include "Store.h"
42679bd6 39#include "comm.h"
0eb49b6d 40#include "MemBuf.h"
d295d770 41#include "wordlist.h"
74addf6c 42
43#define HELPER_MAX_ARGS 64
44
7f37478b
AJ
45/* size of helper read buffer (maximum?). no reason given for this size */
46/* though it has been seen to be too short for some requests */
47/* it is dynamic, so increasing should not have side effects */
48#define BUF_8KB 8192
49
c4b7a5a9 50static IOCB helperHandleRead;
51static IOCB helperStatefulHandleRead;
1f5f60dd 52static PF helperServerFree;
94439e4e 53static PF helperStatefulServerFree;
74addf6c 54static void Enqueue(helper * hlp, helper_request *);
55static helper_request *Dequeue(helper * hlp);
94439e4e 56static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 57static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 58static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 59static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 60static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 61static void helperKickQueue(helper * hlp);
94439e4e 62static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 63static void helperRequestFree(helper_request * r);
94439e4e 64static void helperStatefulRequestFree(helper_stateful_request * r);
65static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
66static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
67static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
68static void helperStatefulServerKickQueue(helper_stateful_server * srv);
9522b380 69static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
70
74addf6c 71
aa839030 72CBDATA_TYPE(helper);
73CBDATA_TYPE(helper_server);
74CBDATA_TYPE(statefulhelper);
75CBDATA_TYPE(helper_stateful_server);
76
74addf6c 77void
78helperOpenServers(helper * hlp)
79{
80 char *s;
81 char *progname;
82 char *shortname;
83 char *procname;
1b8af627 84 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
74addf6c 85 char fd_note_buf[FD_DESC_SZ];
86 helper_server *srv;
87 int nargs = 0;
88 int k;
b5d712b5 89 pid_t pid;
74addf6c 90 int rfd;
91 int wfd;
b5d712b5 92 void * hIpc;
74addf6c 93 wordlist *w;
62e76326 94
74addf6c 95 if (hlp->cmdline == NULL)
62e76326 96 return;
97
74addf6c 98 progname = hlp->cmdline->key;
62e76326 99
74addf6c 100 if ((s = strrchr(progname, '/')))
62e76326 101 shortname = xstrdup(s + 1);
74addf6c 102 else
62e76326 103 shortname = xstrdup(progname);
104
5a5b2c56
AJ
105 /* dont ever start more than hlp->n_to_start processes. */
106 int need_new = hlp->n_to_start - hlp->n_running;
107
108 debugs(84, 1, "helperOpenServers: Starting " << need_new << "/" << hlp->n_to_start << " '" << shortname << "' processes");
109
110 if(need_new < 1) {
111 debugs(84, 1, "helperOpenServers: No '" << shortname << "' processes needed.");
112 }
62e76326 113
e6ccf245 114 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 115
74addf6c 116 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 117
74addf6c 118 args[nargs++] = procname;
62e76326 119
74addf6c 120 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 121 args[nargs++] = w->key;
122
74addf6c 123 args[nargs++] = NULL;
62e76326 124
74addf6c 125 assert(nargs <= HELPER_MAX_ARGS);
62e76326 126
5a5b2c56 127 for (k = 0; k < need_new; k++) {
62e76326 128 getCurrentTime();
129 rfd = wfd = -1;
b5d712b5 130 pid = ipcCreate(hlp->ipc_type,
131 progname,
132 args,
133 shortname,
cc192b50 134 hlp->addr,
b5d712b5 135 &rfd,
136 &wfd,
137 &hIpc);
138
139 if (pid < 0) {
bf8fe701 140 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 141 continue;
142 }
143
144 hlp->n_running++;
d8f10d6a 145 hlp->n_active++;
b5d712b5 146 CBDATA_INIT_TYPE(helper_server);
62e76326 147 srv = cbdataAlloc(helper_server);
b5d712b5 148 srv->hIpc = hIpc;
149 srv->pid = pid;
62e76326 150 srv->index = k;
cc192b50 151 srv->addr = hlp->addr;
62e76326 152 srv->rfd = rfd;
153 srv->wfd = wfd;
7f37478b 154 srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
90bed1c4 155 srv->wqueue = new MemBuf;
07eca7e0 156 srv->roffset = 0;
157 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
62e76326 158 srv->parent = cbdataReference(hlp);
159 dlinkAddTail(srv, &srv->link, &hlp->servers);
160
161 if (rfd == wfd) {
162 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
163 fd_note(rfd, fd_note_buf);
164 } else {
165 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
166 fd_note(rfd, fd_note_buf);
167 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
168 fd_note(wfd, fd_note_buf);
169 }
170
171 commSetNonBlocking(rfd);
172
173 if (wfd != rfd)
174 commSetNonBlocking(wfd);
175
176 comm_add_close_handler(rfd, helperServerFree, srv);
07eca7e0 177
178 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
74addf6c 179 }
62e76326 180
5ea33fce 181 hlp->last_restart = squid_curtime;
74addf6c 182 safe_free(shortname);
183 safe_free(procname);
838b993c 184 helperKickQueue(hlp);
74addf6c 185}
186
4f0ef8e8 187/*
188 * DPW 2007-05-08
26ac0430 189 *
4f0ef8e8 190 * helperStatefulOpenServers: create the stateful child helper processes
191 */
94439e4e 192void
193helperStatefulOpenServers(statefulhelper * hlp)
194{
94439e4e 195 char *shortname;
1b8af627 196 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
94439e4e 197 char fd_note_buf[FD_DESC_SZ];
94439e4e 198 int nargs = 0;
62e76326 199
94439e4e 200 if (hlp->cmdline == NULL)
62e76326 201 return;
202
4f0ef8e8 203 char *progname = hlp->cmdline->key;
62e76326 204
4f0ef8e8 205 char *s;
94439e4e 206 if ((s = strrchr(progname, '/')))
62e76326 207 shortname = xstrdup(s + 1);
94439e4e 208 else
62e76326 209 shortname = xstrdup(progname);
210
5a5b2c56
AJ
211 /* dont ever start more than hlp->n_to_start processes. */
212 int need_new = hlp->n_to_start - hlp->n_running;
213
214 debugs(84, 1, "helperOpenServers: Starting " << need_new << "/" << hlp->n_to_start << " '" << shortname << "' processes");
215
216 if(need_new < 1) {
217 debugs(84, 1, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
218 }
62e76326 219
4f0ef8e8 220 char *procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 221
94439e4e 222 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 223
94439e4e 224 args[nargs++] = procname;
62e76326 225
4f0ef8e8 226 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 227 args[nargs++] = w->key;
228
94439e4e 229 args[nargs++] = NULL;
62e76326 230
94439e4e 231 assert(nargs <= HELPER_MAX_ARGS);
62e76326 232
5a5b2c56 233 for (int k = 0; k < need_new; k++) {
62e76326 234 getCurrentTime();
26ac0430
AJ
235 int rfd = -1;
236 int wfd = -1;
237 void * hIpc;
4f0ef8e8 238 pid_t pid = ipcCreate(hlp->ipc_type,
26ac0430
AJ
239 progname,
240 args,
241 shortname,
242 hlp->addr,
243 &rfd,
244 &wfd,
245 &hIpc);
b5d712b5 246
247 if (pid < 0) {
bf8fe701 248 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 249 continue;
250 }
251
252 hlp->n_running++;
d8f10d6a 253 hlp->n_active++;
b5d712b5 254 CBDATA_INIT_TYPE(helper_stateful_server);
4f0ef8e8 255 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
b5d712b5 256 srv->hIpc = hIpc;
257 srv->pid = pid;
62e76326 258 srv->flags.reserved = S_HELPER_FREE;
259 srv->deferred_requests = 0;
260 srv->stats.deferbyfunc = 0;
261 srv->stats.deferbycb = 0;
262 srv->stats.submits = 0;
263 srv->stats.releases = 0;
264 srv->index = k;
cc192b50 265 srv->addr = hlp->addr;
62e76326 266 srv->rfd = rfd;
267 srv->wfd = wfd;
7f37478b 268 srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
07eca7e0 269 srv->roffset = 0;
62e76326 270 srv->parent = cbdataReference(hlp);
271
272 if (hlp->datapool != NULL)
b001e822 273 srv->data = hlp->datapool->alloc();
62e76326 274
275 dlinkAddTail(srv, &srv->link, &hlp->servers);
276
277 if (rfd == wfd) {
278 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
279 fd_note(rfd, fd_note_buf);
280 } else {
281 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
282 fd_note(rfd, fd_note_buf);
283 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
284 fd_note(wfd, fd_note_buf);
285 }
286
287 commSetNonBlocking(rfd);
288
289 if (wfd != rfd)
290 commSetNonBlocking(wfd);
291
292 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
07eca7e0 293
294 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
94439e4e 295 }
62e76326 296
5ea33fce 297 hlp->last_restart = squid_curtime;
94439e4e 298 safe_free(shortname);
299 safe_free(procname);
300 helperStatefulKickQueue(hlp);
301}
302
303
74addf6c 304void
305helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
306{
5b5f9257 307 if (hlp == NULL) {
bf8fe701 308 debugs(84, 3, "helperSubmit: hlp == NULL");
62e76326 309 callback(data, NULL);
310 return;
5b5f9257 311 }
62e76326 312
eff7218b 313 helper_request *r = new helper_request;
314 helper_server *srv;
315
74addf6c 316 r->callback = callback;
fa80a8ef 317 r->data = cbdataReference(data);
74addf6c 318 r->buf = xstrdup(buf);
62e76326 319
74addf6c 320 if ((srv = GetFirstAvailable(hlp)))
62e76326 321 helperDispatch(srv, r);
74addf6c 322 else
62e76326 323 Enqueue(hlp, r);
324
bf8fe701 325 debugs(84, 9, "helperSubmit: " << buf);
94439e4e 326}
327
721b0310 328/* lastserver = "server last used as part of a deferred or reserved
329 * request sequence"
330 */
94439e4e 331void
332helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
333{
94439e4e 334 if (hlp == NULL) {
bf8fe701 335 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
62e76326 336 callback(data, 0, NULL);
337 return;
94439e4e 338 }
62e76326 339
eff7218b 340 helper_stateful_request *r = new helper_stateful_request;
eff7218b 341
94439e4e 342 r->callback = callback;
fa80a8ef 343 r->data = cbdataReference(data);
62e76326 344
721b0310 345 if (buf != NULL) {
62e76326 346 r->buf = xstrdup(buf);
347 r->placeholder = 0;
721b0310 348 } else {
62e76326 349 r->buf = NULL;
350 r->placeholder = 1;
721b0310 351 }
62e76326 352
94439e4e 353 if ((buf != NULL) && lastserver) {
bf8fe701 354 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
62e76326 355 /* the queue doesn't count for this assert because queued requests
356 * have already gone through here and been tested.
26ac0430 357 * It's legal to have deferred_requests == 0 and queue entries
62e76326 358 * and status of S_HELPEER_DEFERRED.
359 * BUT: It's not legal to submit a new request w/lastserver in
360 * that state.
361 */
362 assert(!(lastserver->deferred_requests == 0 &&
363 lastserver->flags.reserved == S_HELPER_DEFERRED));
364
365 if (lastserver->flags.reserved != S_HELPER_RESERVED) {
366 lastserver->stats.submits++;
367 lastserver->deferred_requests--;
368 }
369
370 if (!(lastserver->request)) {
bf8fe701 371 debugs(84, 5, "StatefulSubmit dispatching");
62e76326 372 helperStatefulDispatch(lastserver, r);
373 } else {
bf8fe701 374 debugs(84, 5, "StatefulSubmit queuing");
62e76326 375 StatefulServerEnqueue(lastserver, r);
376 }
94439e4e 377 } else {
26ac0430 378 helper_stateful_server *srv;
62e76326 379 if ((srv = StatefulGetFirstAvailable(hlp))) {
380 helperStatefulDispatch(srv, r);
381 } else
382 StatefulEnqueue(hlp, r);
94439e4e 383 }
62e76326 384
bf8fe701 385 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
94439e4e 386}
387
4f0ef8e8 388/*
389 * helperStatefulDefer
390 *
391 * find and add a deferred request to a helper
392 */
94439e4e 393helper_stateful_server *
394helperStatefulDefer(statefulhelper * hlp)
94439e4e 395{
4f0ef8e8 396 if (hlp == NULL) {
bf8fe701 397 debugs(84, 3, "helperStatefulDefer: hlp == NULL");
62e76326 398 return NULL;
94439e4e 399 }
62e76326 400
4f0ef8e8 401 debugs(84, 5, "helperStatefulDefer: Running servers " << hlp->n_running);
62e76326 402
4f0ef8e8 403 if (hlp->n_running == 0) {
bf8fe701 404 debugs(84, 1, "helperStatefulDefer: No running servers!. ");
62e76326 405 return NULL;
94439e4e 406 }
62e76326 407
4f0ef8e8 408 helper_stateful_server *rv = StatefulGetFirstAvailable(hlp);
62e76326 409
4f0ef8e8 410 if (rv == NULL) {
62e76326 411 /*
412 * all currently busy; loop through servers and find server
413 * with the shortest queue
414 */
415
4f0ef8e8 416 for (dlink_node *n = hlp->servers.head; n != NULL; n = n->next) {
417 helper_stateful_server *srv = (helper_stateful_server *)n->data;
62e76326 418
419 if (srv->flags.reserved == S_HELPER_RESERVED)
420 continue;
421
b902a58c 422 if (!srv->flags.shutdown)
62e76326 423 continue;
424
425 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
426 !(hlp->IsAvailable(srv->data)))
427 continue;
428
429 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
430 continue;
431
432 rv = srv;
433 }
0a706c69 434 }
62e76326 435
4f0ef8e8 436 if (rv == NULL) {
bf8fe701 437 debugs(84, 1, "helperStatefulDefer: None available.");
62e76326 438 return NULL;
94439e4e 439 }
62e76326 440
60d096f4 441 /* consistency check:
442 * when the deferred count is 0,
443 * submits + releases == deferbyfunc + deferbycb
444 * Or in english, when there are no deferred requests, the amount
445 * we have submitted to the queue or cancelled must equal the amount
446 * we have said we wanted to be able to submit or cancel
447 */
448 if (rv->deferred_requests == 0)
62e76326 449 assert(rv->stats.submits + rv->stats.releases ==
450 rv->stats.deferbyfunc + rv->stats.deferbycb);
60d096f4 451
94439e4e 452 rv->flags.reserved = S_HELPER_DEFERRED;
62e76326 453
94439e4e 454 rv->deferred_requests++;
62e76326 455
60d096f4 456 rv->stats.deferbyfunc++;
62e76326 457
94439e4e 458 return rv;
459}
460
461void
462helperStatefulReset(helper_stateful_server * srv)
62e76326 463/* puts this helper back in the queue. the calling app is required to
94439e4e 464 * manage the state in the helper.
465 */
466{
467 statefulhelper *hlp = srv->parent;
4f0ef8e8 468 helper_stateful_request *r = srv->request;
62e76326 469
4f0ef8e8 470 if (r != NULL) {
62e76326 471 /* reset attempt DURING an outstaning request */
bf8fe701 472 debugs(84, 1, "helperStatefulReset: RESET During request " << hlp->id_name << " ");
62e76326 473 srv->flags.busy = 0;
07eca7e0 474 srv->roffset = 0;
62e76326 475 helperStatefulRequestFree(r);
476 srv->request = NULL;
94439e4e 477 }
62e76326 478
94439e4e 479 srv->flags.busy = 0;
62e76326 480
4f0ef8e8 481 if (srv->queue.head) {
62e76326 482 srv->flags.reserved = S_HELPER_DEFERRED;
483 helperStatefulServerKickQueue(srv);
4f0ef8e8 484 } else {
62e76326 485 srv->flags.reserved = S_HELPER_FREE;
486
487 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
488 srv->parent->OnEmptyQueue(srv->data);
489
490 helperStatefulKickQueue(hlp);
94439e4e 491 }
492}
493
4f0ef8e8 494/*
495 * DPW 2007-05-08
496 *
497 * helperStatefulReleaseServer tells the helper that whoever was
498 * using it no longer needs its services.
499 *
500 * If the state is S_HELPER_DEFERRED, decrease the deferred count.
501 * If the count goes to zero, then it can become S_HELPER_FREE.
502 *
503 * If the state is S_HELPER_RESERVED, then it should always
504 * become S_HELPER_FREE.
505 */
94439e4e 506void
507helperStatefulReleaseServer(helper_stateful_server * srv)
94439e4e 508{
4f0ef8e8 509 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
be29fa07 510 if (srv->flags.reserved == S_HELPER_FREE)
511 return;
4f0ef8e8 512
60d096f4 513 srv->stats.releases++;
62e76326 514
4f0ef8e8 515 if (srv->flags.reserved == S_HELPER_DEFERRED) {
62e76326 516 assert(srv->deferred_requests);
517 srv->deferred_requests--;
26ac0430
AJ
518 if (srv->deferred_requests) {
519 debugs(0,0,HERE << "helperStatefulReleaseServer srv->deferred_requests=" << srv->deferred_requests);
520 return;
521 }
522 if (srv->queue.head) {
523 debugs(0,0,HERE << "helperStatefulReleaseServer srv->queue.head not NULL");
524 return;
525 }
60d096f4 526 }
62e76326 527
4f0ef8e8 528 srv->flags.reserved = S_HELPER_FREE;
529 if (srv->parent->OnEmptyQueue != NULL && srv->data)
26ac0430 530 srv->parent->OnEmptyQueue(srv->data);
94439e4e 531}
532
533void *
534helperStatefulServerGetData(helper_stateful_server * srv)
535/* return a pointer to the stateful routines data area */
536{
537 return srv->data;
74addf6c 538}
539
540void
9522b380 541helperStats(StoreEntry * sentry, helper * hlp, const char *label)
74addf6c 542{
9522b380 543 if (!helperStartStats(sentry, hlp, label))
544 return;
545
0a706c69 546 storeAppendPrintf(sentry, "program: %s\n",
62e76326 547 hlp->cmdline->key);
74addf6c 548 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 549 hlp->n_running, hlp->n_to_start);
74addf6c 550 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 551 hlp->stats.requests);
74addf6c 552 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 553 hlp->stats.replies);
74addf6c 554 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 555 hlp->stats.queue_size);
f9598528 556 storeAppendPrintf(sentry, "avg service time: %d msec\n",
557 hlp->stats.avg_svc_time);
74addf6c 558 storeAppendPrintf(sentry, "\n");
09c1ece1 559 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
62e76326 560 "#",
561 "FD",
562 "PID",
563 "# Requests",
564 "Flags",
565 "Time",
566 "Offset",
567 "Request");
568
4f0ef8e8 569 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
07eca7e0 570 helper_server *srv = (helper_server*)link->data;
f9598528 571 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
d8f10d6a 572 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 573 srv->index + 1,
574 srv->rfd,
575 srv->pid,
576 srv->stats.uses,
07eca7e0 577 srv->stats.pending ? 'B' : ' ',
578 srv->flags.writing ? 'W' : ' ',
62e76326 579 srv->flags.closing ? 'C' : ' ',
580 srv->flags.shutdown ? 'S' : ' ',
581 tt < 0.0 ? 0.0 : tt,
07eca7e0 582 (int) srv->roffset,
583 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
74addf6c 584 }
62e76326 585
74addf6c 586 storeAppendPrintf(sentry, "\nFlags key:\n\n");
74addf6c 587 storeAppendPrintf(sentry, " B = BUSY\n");
f9598528 588 storeAppendPrintf(sentry, " W = WRITING\n");
74addf6c 589 storeAppendPrintf(sentry, " C = CLOSING\n");
590 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
591}
592
94439e4e 593void
9522b380 594helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
94439e4e 595{
9522b380 596 if (!helperStartStats(sentry, hlp, label))
597 return;
598
cf154edc 599 storeAppendPrintf(sentry, "program: %s\n",
600 hlp->cmdline->key);
94439e4e 601 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 602 hlp->n_running, hlp->n_to_start);
94439e4e 603 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 604 hlp->stats.requests);
94439e4e 605 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 606 hlp->stats.replies);
94439e4e 607 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 608 hlp->stats.queue_size);
94439e4e 609 storeAppendPrintf(sentry, "avg service time: %d msec\n",
62e76326 610 hlp->stats.avg_svc_time);
94439e4e 611 storeAppendPrintf(sentry, "\n");
0a706c69 612 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%20s\t%s\t%7s\t%7s\t%7s\n",
62e76326 613 "#",
614 "FD",
615 "PID",
616 "# Requests",
617 "# Deferred Requests",
618 "Flags",
619 "Time",
620 "Offset",
621 "Request");
622
4f0ef8e8 623 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
624 helper_stateful_server *srv = (helper_stateful_server *)link->data;
625 double tt = 0.001 * tvSubMsec(srv->dispatch_time,
26ac0430 626 srv->flags.busy ? current_time : srv->answer_time);
b902a58c 627 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%20d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 628 srv->index + 1,
629 srv->rfd,
630 srv->pid,
631 srv->stats.uses,
632 (int) srv->deferred_requests,
62e76326 633 srv->flags.busy ? 'B' : ' ',
634 srv->flags.closing ? 'C' : ' ',
4f0ef8e8 635 srv->flags.reserved == S_HELPER_RESERVED ? 'R' : (srv->flags.reserved == S_HELPER_DEFERRED ? 'D' : ' '),
62e76326 636 srv->flags.shutdown ? 'S' : ' ',
637 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
638 tt < 0.0 ? 0.0 : tt,
07eca7e0 639 (int) srv->roffset,
62e76326 640 srv->request ? log_quote(srv->request->buf) : "(none)");
94439e4e 641 }
62e76326 642
94439e4e 643 storeAppendPrintf(sentry, "\nFlags key:\n\n");
94439e4e 644 storeAppendPrintf(sentry, " B = BUSY\n");
645 storeAppendPrintf(sentry, " C = CLOSING\n");
646 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
647 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
5d146f7d 648 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 649}
650
74addf6c 651void
652helperShutdown(helper * hlp)
653{
c68e9c6b 654 dlink_node *link = hlp->servers.head;
2d5d5261 655#ifdef _SQUID_MSWIN_
656
657 HANDLE hIpc;
658 pid_t pid;
659 int no;
660#endif
62e76326 661
c68e9c6b 662 while (link) {
62e76326 663 helper_server *srv;
664 srv = (helper_server *)link->data;
665 link = link->next;
666
8cfc76db 667 if (srv->flags.shutdown) {
bf8fe701 668 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 669 continue;
670 }
671
d8f10d6a 672 hlp->n_active--;
673 assert(hlp->n_active >= 0);
674
62e76326 675 srv->flags.shutdown = 1; /* request it to shut itself down */
676
d8f10d6a 677 if (srv->flags.closing) {
bf8fe701 678 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 679 continue;
680 }
681
d8f10d6a 682 if (srv->stats.pending) {
bf8fe701 683 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 684 continue;
685 }
686
687 srv->flags.closing = 1;
2d5d5261 688#ifdef _SQUID_MSWIN_
689
690 hIpc = srv->hIpc;
691 pid = srv->pid;
692 no = srv->index + 1;
693 shutdown(srv->wfd, SD_BOTH);
694#endif
695
bf8fe701 696 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
62e76326 697 /* the rest of the details is dealt with in the helperServerFree
698 * close handler
699 */
700 comm_close(srv->rfd);
2d5d5261 701#ifdef _SQUID_MSWIN_
702
703 if (hIpc) {
704 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
705 getCurrentTime();
bf8fe701 706 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
707 " #" << no << " (" << hlp->cmdline->key << "," <<
708 (long int)pid << ") didn't exit in 5 seconds");
709
2d5d5261 710 }
711
712 CloseHandle(hIpc);
713 }
714
715#endif
716
74addf6c 717 }
718}
719
94439e4e 720void
721helperStatefulShutdown(statefulhelper * hlp)
722{
723 dlink_node *link = hlp->servers.head;
724 helper_stateful_server *srv;
2d5d5261 725#ifdef _SQUID_MSWIN_
726
727 HANDLE hIpc;
728 pid_t pid;
729 int no;
730#endif
62e76326 731
94439e4e 732 while (link) {
62e76326 733 srv = (helper_stateful_server *)link->data;
734 link = link->next;
735
8cfc76db 736 if (srv->flags.shutdown) {
bf8fe701 737 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 738 continue;
739 }
740
d8f10d6a 741 hlp->n_active--;
742 assert(hlp->n_active >= 0);
62e76326 743 srv->flags.shutdown = 1; /* request it to shut itself down */
744
745 if (srv->flags.busy) {
bf8fe701 746 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 747 continue;
748 }
749
750 if (srv->flags.closing) {
bf8fe701 751 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 752 continue;
753 }
754
755 if (srv->flags.reserved != S_HELPER_FREE) {
bf8fe701 756 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED.");
62e76326 757 continue;
758 }
759
760 if (srv->deferred_requests) {
bf8fe701 761 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has DEFERRED requests.");
62e76326 762 continue;
763 }
764
765 srv->flags.closing = 1;
2d5d5261 766#ifdef _SQUID_MSWIN_
767
768 hIpc = srv->hIpc;
769 pid = srv->pid;
770 no = srv->index + 1;
771 shutdown(srv->wfd, SD_BOTH);
772#endif
773
bf8fe701 774 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
775
62e76326 776 /* the rest of the details is dealt with in the helperStatefulServerFree
777 * close handler
778 */
779 comm_close(srv->rfd);
2d5d5261 780#ifdef _SQUID_MSWIN_
781
782 if (hIpc) {
783 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
784 getCurrentTime();
bf8fe701 785 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
786 " #" << no << " (" << hlp->cmdline->key << "," <<
787 (long int)pid << ") didn't exit in 5 seconds");
2d5d5261 788 }
789
790 CloseHandle(hIpc);
791 }
792
793#endif
794
94439e4e 795 }
796}
797
798
1f5f60dd 799helper *
800helperCreate(const char *name)
801{
28c60158 802 helper *hlp;
aa839030 803 CBDATA_INIT_TYPE(helper);
72711e31 804 hlp = cbdataAlloc(helper);
1f5f60dd 805 hlp->id_name = name;
806 return hlp;
807}
808
94439e4e 809statefulhelper *
810helperStatefulCreate(const char *name)
811{
812 statefulhelper *hlp;
aa839030 813 CBDATA_INIT_TYPE(statefulhelper);
72711e31 814 hlp = cbdataAlloc(statefulhelper);
94439e4e 815 hlp->id_name = name;
816 return hlp;
817}
818
819
1f5f60dd 820void
821helperFree(helper * hlp)
822{
5dae8514 823 if (!hlp)
62e76326 824 return;
825
1f5f60dd 826 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 827 if (hlp->queue.head)
bf8fe701 828 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
829 hlp->stats.queue_size << " requests queued");
62e76326 830
1f5f60dd 831 cbdataFree(hlp);
832}
833
94439e4e 834void
835helperStatefulFree(statefulhelper * hlp)
836{
5dae8514 837 if (!hlp)
62e76326 838 return;
839
94439e4e 840 /* note, don't free hlp->name, it probably points to static memory */
841 if (hlp->queue.head)
bf8fe701 842 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
843 hlp->stats.queue_size << " requests queued");
62e76326 844
94439e4e 845 cbdataFree(hlp);
846}
847
848
74addf6c 849/* ====================================================================== */
850/* LOCAL FUNCTIONS */
851/* ====================================================================== */
852
853static void
1f5f60dd 854helperServerFree(int fd, void *data)
74addf6c 855{
e6ccf245 856 helper_server *srv = (helper_server *)data;
74addf6c 857 helper *hlp = srv->parent;
ac750329 858 helper_request *r;
07eca7e0 859 int i, concurrency = hlp->concurrency;
860
861 if (!concurrency)
862 concurrency = 1;
863
74addf6c 864 assert(srv->rfd == fd);
62e76326 865
07eca7e0 866 if (srv->rbuf) {
867 memFreeBuf(srv->rbuf_sz, srv->rbuf);
868 srv->rbuf = NULL;
74addf6c 869 }
62e76326 870
2fe7eff9 871 srv->wqueue->clean();
032785bf 872 delete srv->wqueue;
873
874 if (srv->writebuf) {
2fe7eff9 875 srv->writebuf->clean();
032785bf 876 delete srv->writebuf;
877 srv->writebuf = NULL;
878 }
62e76326 879
07eca7e0 880 for (i = 0; i < concurrency; i++) {
881 if ((r = srv->requests[i])) {
882 void *cbdata;
62e76326 883
07eca7e0 884 if (cbdataReferenceValidDone(r->data, &cbdata))
885 r->callback(cbdata, NULL);
62e76326 886
07eca7e0 887 helperRequestFree(r);
888
889 srv->requests[i] = NULL;
890 }
ac750329 891 }
62e76326 892
07eca7e0 893 safe_free(srv->requests);
894
3cdb7cd0 895 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 896 comm_close(srv->wfd);
897
74addf6c 898 dlinkDelete(&srv->link, &hlp->servers);
62e76326 899
74addf6c 900 hlp->n_running--;
62e76326 901
74addf6c 902 assert(hlp->n_running >= 0);
62e76326 903
1f5f60dd 904 if (!srv->flags.shutdown) {
d8f10d6a 905 hlp->n_active--;
906 assert(hlp->n_active >= 0);
bf8fe701 907 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 <<
908 " (FD " << fd << ") exited");
62e76326 909
5ea33fce 910 if (hlp->n_active < hlp->n_to_start / 2) {
bf8fe701 911 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 912
913 if (hlp->last_restart > squid_curtime - 30)
914 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
915
bf8fe701 916 debugs(80, 0, "Starting new helpers");
5ea33fce 917
918 helperOpenServers(hlp);
919 }
14e87a44 920 }
62e76326 921
fa80a8ef 922 cbdataReferenceDone(srv->parent);
14e87a44 923 cbdataFree(srv);
74addf6c 924}
925
94439e4e 926static void
927helperStatefulServerFree(int fd, void *data)
928{
e6ccf245 929 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 930 statefulhelper *hlp = srv->parent;
931 helper_stateful_request *r;
932 assert(srv->rfd == fd);
62e76326 933
07eca7e0 934 if (srv->rbuf) {
935 memFreeBuf(srv->rbuf_sz, srv->rbuf);
936 srv->rbuf = NULL;
94439e4e 937 }
62e76326 938
07eca7e0 939#if 0
2fe7eff9 940 srv->wqueue->clean();
032785bf 941
942 delete srv->wqueue;
07eca7e0 943
944#endif
945
94439e4e 946 if ((r = srv->request)) {
62e76326 947 void *cbdata;
948
949 if (cbdataReferenceValidDone(r->data, &cbdata))
07eca7e0 950 r->callback(cbdata, srv, NULL);
62e76326 951
952 helperStatefulRequestFree(r);
953
954 srv->request = NULL;
94439e4e 955 }
62e76326 956
3c641669 957 /* TODO: walk the local queue of requests and carry them all out */
94439e4e 958 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 959 comm_close(srv->wfd);
960
94439e4e 961 dlinkDelete(&srv->link, &hlp->servers);
62e76326 962
94439e4e 963 hlp->n_running--;
62e76326 964
94439e4e 965 assert(hlp->n_running >= 0);
62e76326 966
94439e4e 967 if (!srv->flags.shutdown) {
d8f10d6a 968 hlp->n_active--;
969 assert( hlp->n_active >= 0);
bf8fe701 970 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " (FD " << fd << ") exited");
62e76326 971
5ea33fce 972 if (hlp->n_active <= hlp->n_to_start / 2) {
bf8fe701 973 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 974
975 if (hlp->last_restart > squid_curtime - 30)
976 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
977
bf8fe701 978 debugs(80, 0, "Starting new helpers");
5ea33fce 979
980 helperStatefulOpenServers(hlp);
981 }
94439e4e 982 }
62e76326 983
94439e4e 984 if (srv->data != NULL)
b001e822 985 hlp->datapool->free(srv->data);
62e76326 986
fa80a8ef 987 cbdataReferenceDone(srv->parent);
62e76326 988
94439e4e 989 cbdataFree(srv);
990}
991
992
74addf6c 993static void
c4b7a5a9 994helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
74addf6c 995{
74addf6c 996 char *t = NULL;
e6ccf245 997 helper_server *srv = (helper_server *)data;
74addf6c 998 helper *hlp = srv->parent;
999 assert(fd == srv->rfd);
fa80a8ef 1000 assert(cbdataReferenceValid(data));
c4b7a5a9 1001
1002 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 1003
c4b7a5a9 1004 if (flag == COMM_ERR_CLOSING) {
1005 return;
1006 }
1007
4f0ef8e8 1008 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
62e76326 1009
c4b7a5a9 1010 if (flag != COMM_OK || len <= 0) {
62e76326 1011 if (len < 0)
bf8fe701 1012 debugs(84, 1, "helperHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 1013
1014 comm_close(fd);
1015
1016 return;
74addf6c 1017 }
62e76326 1018
07eca7e0 1019 srv->roffset += len;
1020 srv->rbuf[srv->roffset] = '\0';
bf8fe701 1021 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
62e76326 1022
07eca7e0 1023 if (!srv->stats.pending) {
62e76326 1024 /* someone spoke without being spoken to */
bf8fe701 1025 debugs(84, 1, "helperHandleRead: unexpected read from " <<
1026 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1027 " bytes '" << srv->rbuf << "'");
1028
07eca7e0 1029 srv->roffset = 0;
1030 srv->rbuf[0] = '\0';
1031 }
1032
1033 while ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1034 /* end of reply found */
07eca7e0 1035 helper_request *r;
1036 char *msg = srv->rbuf;
1037 int i = 0;
bf8fe701 1038 debugs(84, 3, "helperHandleRead: end of reply found");
6bf4f823 1039
1040 if (t > srv->rbuf && t[-1] == '\r')
1041 t[-1] = '\0';
1042
07eca7e0 1043 *t++ = '\0';
62e76326 1044
07eca7e0 1045 if (hlp->concurrency) {
1046 i = strtol(msg, &msg, 10);
62e76326 1047
e4755e29 1048 while (*msg && xisspace(*msg))
07eca7e0 1049 msg++;
1050 }
62e76326 1051
07eca7e0 1052 r = srv->requests[i];
62e76326 1053
07eca7e0 1054 if (r) {
1055 HLPCB *callback = r->callback;
1056 void *cbdata;
62e76326 1057
07eca7e0 1058 srv->requests[i] = NULL;
62e76326 1059
07eca7e0 1060 r->callback = NULL;
62e76326 1061
07eca7e0 1062 if (cbdataReferenceValidDone(r->data, &cbdata))
1063 callback(cbdata, msg);
62e76326 1064
07eca7e0 1065 srv->stats.pending--;
1066
1067 hlp->stats.replies++;
1068
f9598528 1069 srv->answer_time = current_time;
1070
1071 srv->dispatch_time = r->dispatch_time;
1072
07eca7e0 1073 hlp->stats.avg_svc_time =
1074 intAverage(hlp->stats.avg_svc_time,
1075 tvSubMsec(r->dispatch_time, current_time),
1076 hlp->stats.replies, REDIRECT_AV_FACTOR);
1077
1078 helperRequestFree(r);
1079 } else {
bf8fe701 1080 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
1081 i << " from " << hlp->id_name << " #" << srv->index + 1 <<
1082 " '" << srv->rbuf << "'");
1083
07eca7e0 1084 }
1085
1086 srv->roffset -= (t - srv->rbuf);
1087 memmove(srv->rbuf, t, srv->roffset + 1);
62e76326 1088
1089 if (srv->flags.shutdown) {
1090 int wfd = srv->wfd;
1091 srv->wfd = -1;
d8f10d6a 1092 srv->flags.closing=1;
62e76326 1093 comm_close(wfd);
26ac0430 1094 return;
62e76326 1095 } else
1096 helperKickQueue(hlp);
74addf6c 1097 }
07eca7e0 1098
1099 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
74addf6c 1100}
1101
94439e4e 1102static void
c4b7a5a9 1103helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
94439e4e 1104{
94439e4e 1105 char *t = NULL;
e6ccf245 1106 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 1107 helper_stateful_request *r;
1108 statefulhelper *hlp = srv->parent;
1109 assert(fd == srv->rfd);
fa80a8ef 1110 assert(cbdataReferenceValid(data));
c4b7a5a9 1111
1112 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 1113
c4b7a5a9 1114 if (flag == COMM_ERR_CLOSING) {
1115 return;
1116 }
1117
4a7a3d56 1118 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
4f0ef8e8 1119 hlp->id_name << " #" << srv->index + 1);
bf8fe701 1120
62e76326 1121
c4b7a5a9 1122 if (flag != COMM_OK || len <= 0) {
62e76326 1123 if (len < 0)
bf8fe701 1124 debugs(84, 1, "helperStatefulHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 1125
1126 comm_close(fd);
1127
1128 return;
94439e4e 1129 }
62e76326 1130
07eca7e0 1131 srv->roffset += len;
1132 srv->rbuf[srv->roffset] = '\0';
94439e4e 1133 r = srv->request;
62e76326 1134
94439e4e 1135 if (r == NULL) {
62e76326 1136 /* someone spoke without being spoken to */
bf8fe701 1137 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
1138 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1139 " bytes '" << srv->rbuf << "'");
1140
07eca7e0 1141 srv->roffset = 0;
1142 }
1143
1144 if ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1145 /* end of reply found */
bf8fe701 1146 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1147
1148 if (t > srv->rbuf && t[-1] == '\r')
1149 t[-1] = '\0';
1150
62e76326 1151 *t = '\0';
1152
2734fd37 1153 if (r && cbdataReferenceValid(r->data)) {
07eca7e0 1154 switch ((r->callback(r->data, srv, srv->rbuf))) { /*if non-zero reserve helper */
62e76326 1155
1156 case S_HELPER_UNKNOWN:
26ac0430 1157 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was received.\n");
62e76326 1158 break;
1159
1160 case S_HELPER_RELEASE: /* helper finished with */
1161
1162 if (!srv->deferred_requests && !srv->queue.head) {
1163 srv->flags.reserved = S_HELPER_FREE;
1164
1165 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
1166 srv->parent->OnEmptyQueue(srv->data);
1167
bf8fe701 1168 debugs(84, 5, "StatefulHandleRead: releasing " << hlp->id_name << " #" << srv->index + 1);
62e76326 1169 } else {
1170 srv->flags.reserved = S_HELPER_DEFERRED;
bf8fe701 1171 debugs(84, 5, "StatefulHandleRead: outstanding deferred requests on " <<
1172 hlp->id_name << " #" << srv->index + 1 <<
1173 ". reserving for deferred requests.");
62e76326 1174 }
1175
1176 break;
1177
1178 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
1179
1180 if (!srv->queue.head) {
1181 assert(srv->deferred_requests == 0);
1182 srv->flags.reserved = S_HELPER_RESERVED;
bf8fe701 1183 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1);
62e76326 1184 } else {
1185 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
1186 }
1187
1188 break;
1189
1190 case S_HELPER_DEFER:
1191 /* the helper is still needed, but can
1192 * be used for other requests in the meantime.
1193 */
1194 srv->flags.reserved = S_HELPER_DEFERRED;
1195 srv->deferred_requests++;
1196 srv->stats.deferbycb++;
bf8fe701 1197 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1 << " for deferred requests.");
62e76326 1198 break;
1199
1200 default:
1201 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
1202 }
1203
1204 } else {
bf8fe701 1205 debugs(84, 1, "StatefulHandleRead: no callback data registered");
62e76326 1206 }
1207
1208 srv->flags.busy = 0;
07eca7e0 1209 srv->roffset = 0;
62e76326 1210 helperStatefulRequestFree(r);
1211 srv->request = NULL;
1212 hlp->stats.replies++;
f9598528 1213 srv->answer_time = current_time;
62e76326 1214 hlp->stats.avg_svc_time =
1215 intAverage(hlp->stats.avg_svc_time,
1216 tvSubMsec(srv->dispatch_time, current_time),
1217 hlp->stats.replies, REDIRECT_AV_FACTOR);
1218
1219 if (srv->flags.shutdown
1220 && srv->flags.reserved == S_HELPER_FREE
1221 && !srv->deferred_requests) {
1222 int wfd = srv->wfd;
1223 srv->wfd = -1;
d8f10d6a 1224 srv->flags.closing=1;
62e76326 1225 comm_close(wfd);
1226 } else {
1227 if (srv->queue.head)
1228 helperStatefulServerKickQueue(srv);
1229 else
1230 helperStatefulKickQueue(hlp);
1231 }
94439e4e 1232 }
07eca7e0 1233
1234 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
1235 helperStatefulHandleRead, srv);
94439e4e 1236}
1237
74addf6c 1238static void
1239Enqueue(helper * hlp, helper_request * r)
1240{
e6ccf245 1241 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
74addf6c 1242 dlinkAddTail(r, link, &hlp->queue);
1243 hlp->stats.queue_size++;
62e76326 1244
74addf6c 1245 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1246 return;
1247
74addf6c 1248 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1249 return;
1250
fe73896c 1251 if (shutting_down || reconfiguring)
62e76326 1252 return;
1253
74addf6c 1254 hlp->last_queue_warn = squid_curtime;
62e76326 1255
bf8fe701 1256 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1257 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1258
62e76326 1259
74addf6c 1260 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1261 fatalf("Too many queued %s requests", hlp->id_name);
1262
bf8fe701 1263 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1264
74addf6c 1265}
1266
94439e4e 1267static void
1268StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1269{
e6ccf245 1270 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1271 dlinkAddTail(r, link, &hlp->queue);
1272 hlp->stats.queue_size++;
62e76326 1273
94439e4e 1274 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1275 return;
1276
893cbac6 1277 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1278 fatalf("Too many queued %s requests", hlp->id_name);
1279
94439e4e 1280 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1281 return;
1282
94439e4e 1283 if (shutting_down || reconfiguring)
62e76326 1284 return;
1285
94439e4e 1286 hlp->last_queue_warn = squid_curtime;
62e76326 1287
bf8fe701 1288 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
62e76326 1289
bf8fe701 1290 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1291 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
62e76326 1292
94439e4e 1293}
1294
1295static void
1296StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
1297{
e6ccf245 1298 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1299 dlinkAddTail(r, link, &srv->queue);
62e76326 1300 /* TODO: warning if the queue on this server is more than X
1301 * We don't check the queue size at the moment, because
26ac0430 1302 * requests hitting here are deferrable
62e76326 1303 */
1304 /* hlp->stats.queue_size++;
1305 * if (hlp->stats.queue_size < hlp->n_running)
1306 * return;
1307 * if (squid_curtime - hlp->last_queue_warn < 600)
1308 * return;
1309 * if (shutting_down || reconfiguring)
1310 * return;
1311 * hlp->last_queue_warn = squid_curtime;
bf8fe701 1312 * debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1313 * debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1314 * if (hlp->stats.queue_size > hlp->n_running * 2)
1315 * fatalf("Too many queued %s requests", hlp->id_name);
bf8fe701 1316 * debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file." ); */
94439e4e 1317}
1318
1319
74addf6c 1320static helper_request *
1321Dequeue(helper * hlp)
1322{
1323 dlink_node *link;
1324 helper_request *r = NULL;
62e76326 1325
74addf6c 1326 if ((link = hlp->queue.head)) {
62e76326 1327 r = (helper_request *)link->data;
1328 dlinkDelete(link, &hlp->queue);
1329 memFree(link, MEM_DLINK_NODE);
1330 hlp->stats.queue_size--;
74addf6c 1331 }
62e76326 1332
74addf6c 1333 return r;
1334}
1335
94439e4e 1336static helper_stateful_request *
1337StatefulServerDequeue(helper_stateful_server * srv)
1338{
1339 dlink_node *link;
1340 helper_stateful_request *r = NULL;
62e76326 1341
94439e4e 1342 if ((link = srv->queue.head)) {
62e76326 1343 r = (helper_stateful_request *)link->data;
1344 dlinkDelete(link, &srv->queue);
1345 memFree(link, MEM_DLINK_NODE);
94439e4e 1346 }
62e76326 1347
94439e4e 1348 return r;
1349}
1350
1351static helper_stateful_request *
1352StatefulDequeue(statefulhelper * hlp)
1353{
1354 dlink_node *link;
1355 helper_stateful_request *r = NULL;
62e76326 1356
94439e4e 1357 if ((link = hlp->queue.head)) {
62e76326 1358 r = (helper_stateful_request *)link->data;
1359 dlinkDelete(link, &hlp->queue);
1360 memFree(link, MEM_DLINK_NODE);
1361 hlp->stats.queue_size--;
94439e4e 1362 }
62e76326 1363
94439e4e 1364 return r;
1365}
1366
74addf6c 1367static helper_server *
1368GetFirstAvailable(helper * hlp)
1369{
1370 dlink_node *n;
07eca7e0 1371 helper_server *srv;
1372 helper_server *selected = NULL;
62e76326 1373
fe73896c 1374 if (hlp->n_running == 0)
62e76326 1375 return NULL;
1376
07eca7e0 1377 /* Find "least" loaded helper (approx) */
74addf6c 1378 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1379 srv = (helper_server *)n->data;
1380
07eca7e0 1381 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1382 continue;
1383
d8f10d6a 1384 if (srv->flags.shutdown)
62e76326 1385 continue;
1386
07eca7e0 1387 if (!srv->stats.pending)
1388 return srv;
1389
1390 if (selected) {
1391 selected = srv;
1392 break;
1393 }
1394
1395 selected = srv;
74addf6c 1396 }
62e76326 1397
07eca7e0 1398 /* Check for overload */
1399 if (!selected)
1400 return NULL;
1401
1402 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1403 return NULL;
1404
1405 return selected;
74addf6c 1406}
1407
94439e4e 1408static helper_stateful_server *
1409StatefulGetFirstAvailable(statefulhelper * hlp)
1410{
1411 dlink_node *n;
1412 helper_stateful_server *srv = NULL;
4f0ef8e8 1413 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->n_running);
62e76326 1414
94439e4e 1415 if (hlp->n_running == 0)
62e76326 1416 return NULL;
1417
94439e4e 1418 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1419 srv = (helper_stateful_server *)n->data;
1420
1421 if (srv->flags.busy)
1422 continue;
1423
1424 if (srv->flags.reserved == S_HELPER_RESERVED)
1425 continue;
1426
d8f10d6a 1427 if (srv->flags.shutdown)
62e76326 1428 continue;
1429
1430 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1431 continue;
1432
26ac0430 1433 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
62e76326 1434 return srv;
94439e4e 1435 }
62e76326 1436
bf8fe701 1437 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
94439e4e 1438 return NULL;
1439}
1440
1441
42679bd6 1442static void
1443helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1444{
07eca7e0 1445 helper_server *srv = (helper_server *)data;
1446
2fe7eff9 1447 srv->writebuf->clean();
032785bf 1448 delete srv->writebuf;
1449 srv->writebuf = NULL;
07eca7e0 1450 srv->flags.writing = 0;
1451
1452 if (flag != COMM_OK) {
1453 /* Helper server has crashed */
bf8fe701 1454 debugs(84, 0, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
07eca7e0 1455 return;
1456 }
1457
2fe7eff9 1458 if (!srv->wqueue->isNull()) {
07eca7e0 1459 srv->writebuf = srv->wqueue;
032785bf 1460 srv->wqueue = new MemBuf;
07eca7e0 1461 srv->flags.writing = 1;
1462 comm_write(srv->wfd,
032785bf 1463 srv->writebuf->content(),
1464 srv->writebuf->contentSize(),
07eca7e0 1465 helperDispatchWriteDone, /* Handler */
2b663917 1466 srv, NULL); /* Handler-data, freefunc */
07eca7e0 1467 }
42679bd6 1468}
1469
74addf6c 1470static void
1471helperDispatch(helper_server * srv, helper_request * r)
1472{
1473 helper *hlp = srv->parent;
07eca7e0 1474 helper_request **ptr = NULL;
1475 unsigned int slot;
62e76326 1476
fa80a8ef 1477 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1478 debugs(84, 1, "helperDispatch: invalid callback data");
62e76326 1479 helperRequestFree(r);
1480 return;
74addf6c 1481 }
62e76326 1482
07eca7e0 1483 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1484 if (!srv->requests[slot]) {
1485 ptr = &srv->requests[slot];
1486 break;
1487 }
1488 }
1489
1490 assert(ptr);
1491 *ptr = r;
1492 srv->stats.pending += 1;
1493 r->dispatch_time = current_time;
1494
2fe7eff9 1495 if (srv->wqueue->isNull())
1496 srv->wqueue->init();
07eca7e0 1497
1498 if (hlp->concurrency)
2fe7eff9 1499 srv->wqueue->Printf("%d %s", slot, r->buf);
07eca7e0 1500 else
2fe7eff9 1501 srv->wqueue->append(r->buf, strlen(r->buf));
07eca7e0 1502
1503 if (!srv->flags.writing) {
032785bf 1504 assert(NULL == srv->writebuf);
07eca7e0 1505 srv->writebuf = srv->wqueue;
032785bf 1506 srv->wqueue = new MemBuf;
07eca7e0 1507 srv->flags.writing = 1;
1508 comm_write(srv->wfd,
032785bf 1509 srv->writebuf->content(),
1510 srv->writebuf->contentSize(),
07eca7e0 1511 helperDispatchWriteDone, /* Handler */
2b663917 1512 srv, NULL); /* Handler-data, free func */
07eca7e0 1513 }
1514
4a7a3d56 1515 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
bf8fe701 1516
74addf6c 1517 srv->stats.uses++;
1518 hlp->stats.requests++;
1519}
1520
42679bd6 1521static void
1522helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
62e76326 1523 int xerrno, void *data)
42679bd6 1524{
62e76326 1525 /* nothing! */
42679bd6 1526}
1527
1528
94439e4e 1529static void
1530helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1531{
1532 statefulhelper *hlp = srv->parent;
62e76326 1533
fa80a8ef 1534 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1535 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
62e76326 1536 helperStatefulRequestFree(r);
1537 return;
94439e4e 1538 }
62e76326 1539
bf8fe701 1540 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
62e76326 1541
94439e4e 1542 if (r->placeholder == 1) {
62e76326 1543 /* a callback is needed before this request can _use_ a helper. */
1544 /* we don't care about releasing/deferring this helper. The request NEVER
1545 * gets to the helper. So we throw away the return code */
1546 r->callback(r->data, srv, NULL);
1547 /* throw away the placeholder */
1548 helperStatefulRequestFree(r);
1549 /* and push the queue. Note that the callback may have submitted a new
1550 * request to the helper which is why we test for the request*/
1551
1552 if (srv->request == NULL) {
1553 if (srv->flags.shutdown
1554 && srv->flags.reserved == S_HELPER_FREE
1555 && !srv->deferred_requests) {
1556 int wfd = srv->wfd;
1557 srv->wfd = -1;
3bf941a6 1558 srv->flags.closing=1;
62e76326 1559 comm_close(wfd);
1560 } else {
1561 if (srv->queue.head)
1562 helperStatefulServerKickQueue(srv);
1563 else
1564 helperStatefulKickQueue(hlp);
1565 }
1566 }
1567
1568 return;
94439e4e 1569 }
62e76326 1570
94439e4e 1571 srv->flags.busy = 1;
1572 srv->request = r;
1573 srv->dispatch_time = current_time;
42679bd6 1574 comm_write(srv->wfd,
62e76326 1575 r->buf,
1576 strlen(r->buf),
1577 helperStatefulDispatchWriteDone, /* Handler */
2b663917 1578 hlp, NULL); /* Handler-data, free func */
bf8fe701 1579 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1580 hlp->id_name << " #" << srv->index + 1 << ", " <<
1581 (int) strlen(r->buf) << " bytes");
1582
94439e4e 1583 srv->stats.uses++;
1584 hlp->stats.requests++;
1585}
1586
1587
74addf6c 1588static void
1589helperKickQueue(helper * hlp)
1590{
1591 helper_request *r;
1592 helper_server *srv;
62e76326 1593
74addf6c 1594 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
62e76326 1595 helperDispatch(srv, r);
74addf6c 1596}
1597
94439e4e 1598static void
1599helperStatefulKickQueue(statefulhelper * hlp)
1600{
1601 helper_stateful_request *r;
1602 helper_stateful_server *srv;
62e76326 1603
94439e4e 1604 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
62e76326 1605 helperStatefulDispatch(srv, r);
94439e4e 1606}
1607
1608static void
1609helperStatefulServerKickQueue(helper_stateful_server * srv)
1610{
1611 helper_stateful_request *r;
62e76326 1612
94439e4e 1613 if ((r = StatefulServerDequeue(srv)))
62e76326 1614 helperStatefulDispatch(srv, r);
94439e4e 1615}
1616
74addf6c 1617static void
1618helperRequestFree(helper_request * r)
1619{
fa80a8ef 1620 cbdataReferenceDone(r->data);
74addf6c 1621 xfree(r->buf);
00d77d6b 1622 delete r;
51ee7c82 1623}
1624
94439e4e 1625static void
1626helperStatefulRequestFree(helper_stateful_request * r)
1627{
26ac0430 1628 if (r) {
b1da7838 1629 cbdataReferenceDone(r->data);
1630 xfree(r->buf);
1631 delete r;
1632 }
51ee7c82 1633}
9522b380 1634
1635// TODO: should helper_ and helper_stateful_ have a common parent?
1636static bool
1637helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1638{
1639 if (!hlp) {
1640 if (label)
1641 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1642 return false;
1643 }
1644
1645 if (label)
1646 storeAppendPrintf(sentry, "%s:\n", label);
1647
1648 return true;
1649}