]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Extended ntlm/negotiate_test to support helper local thinking time/delays
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1/*
262a0e14 2 * $Id$
f740a279 3 *
17bb3486 4 * DEBUG: section 84 Helper process maintenance
f740a279 5 * AUTHOR: Harvest Derived?
6 *
2b6662ba 7 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 8 * ----------------------------------------------------------
9 *
2b6662ba 10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
f740a279 18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
26ac0430 23 *
f740a279 24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
26ac0430 28 *
f740a279 29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
74addf6c 35#include "squid.h"
51ee7c82 36#include "helper.h"
985c86bc 37#include "SquidTime.h"
e6ccf245 38#include "Store.h"
42679bd6 39#include "comm.h"
0eb49b6d 40#include "MemBuf.h"
d295d770 41#include "wordlist.h"
74addf6c 42
43#define HELPER_MAX_ARGS 64
44
7f37478b
AJ
45/* size of helper read buffer (maximum?). no reason given for this size */
46/* though it has been seen to be too short for some requests */
47/* it is dynamic, so increasing should not have side effects */
48#define BUF_8KB 8192
49
c4b7a5a9 50static IOCB helperHandleRead;
51static IOCB helperStatefulHandleRead;
1f5f60dd 52static PF helperServerFree;
94439e4e 53static PF helperStatefulServerFree;
74addf6c 54static void Enqueue(helper * hlp, helper_request *);
55static helper_request *Dequeue(helper * hlp);
94439e4e 56static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 57static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 58static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 59static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 60static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 61static void helperKickQueue(helper * hlp);
94439e4e 62static void helperStatefulKickQueue(statefulhelper * hlp);
a8c4f8d6 63static void helperStatefulServerDone(helper_stateful_server * srv);
74addf6c 64static void helperRequestFree(helper_request * r);
94439e4e 65static void helperStatefulRequestFree(helper_stateful_request * r);
66static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
9522b380 67static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
68
74addf6c 69
aa839030 70CBDATA_TYPE(helper);
71CBDATA_TYPE(helper_server);
72CBDATA_TYPE(statefulhelper);
73CBDATA_TYPE(helper_stateful_server);
74
74addf6c 75void
76helperOpenServers(helper * hlp)
77{
78 char *s;
79 char *progname;
80 char *shortname;
81 char *procname;
1b8af627 82 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
74addf6c 83 char fd_note_buf[FD_DESC_SZ];
84 helper_server *srv;
85 int nargs = 0;
86 int k;
b5d712b5 87 pid_t pid;
74addf6c 88 int rfd;
89 int wfd;
b5d712b5 90 void * hIpc;
74addf6c 91 wordlist *w;
62e76326 92
74addf6c 93 if (hlp->cmdline == NULL)
62e76326 94 return;
95
74addf6c 96 progname = hlp->cmdline->key;
62e76326 97
74addf6c 98 if ((s = strrchr(progname, '/')))
62e76326 99 shortname = xstrdup(s + 1);
74addf6c 100 else
62e76326 101 shortname = xstrdup(progname);
102
5a5b2c56 103 /* dont ever start more than hlp->n_to_start processes. */
d974a072 104 int need_new = hlp->n_to_start - hlp->n_active;
5a5b2c56
AJ
105
106 debugs(84, 1, "helperOpenServers: Starting " << need_new << "/" << hlp->n_to_start << " '" << shortname << "' processes");
107
04f7fd38 108 if (need_new < 1) {
5a5b2c56
AJ
109 debugs(84, 1, "helperOpenServers: No '" << shortname << "' processes needed.");
110 }
62e76326 111
e6ccf245 112 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 113
74addf6c 114 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 115
74addf6c 116 args[nargs++] = procname;
62e76326 117
74addf6c 118 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 119 args[nargs++] = w->key;
120
74addf6c 121 args[nargs++] = NULL;
62e76326 122
74addf6c 123 assert(nargs <= HELPER_MAX_ARGS);
62e76326 124
5a5b2c56 125 for (k = 0; k < need_new; k++) {
62e76326 126 getCurrentTime();
127 rfd = wfd = -1;
b5d712b5 128 pid = ipcCreate(hlp->ipc_type,
129 progname,
130 args,
131 shortname,
cc192b50 132 hlp->addr,
b5d712b5 133 &rfd,
134 &wfd,
135 &hIpc);
136
137 if (pid < 0) {
bf8fe701 138 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 139 continue;
140 }
141
142 hlp->n_running++;
d8f10d6a 143 hlp->n_active++;
b5d712b5 144 CBDATA_INIT_TYPE(helper_server);
62e76326 145 srv = cbdataAlloc(helper_server);
b5d712b5 146 srv->hIpc = hIpc;
147 srv->pid = pid;
62e76326 148 srv->index = k;
cc192b50 149 srv->addr = hlp->addr;
62e76326 150 srv->rfd = rfd;
151 srv->wfd = wfd;
7f37478b 152 srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
90bed1c4 153 srv->wqueue = new MemBuf;
07eca7e0 154 srv->roffset = 0;
155 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
62e76326 156 srv->parent = cbdataReference(hlp);
157 dlinkAddTail(srv, &srv->link, &hlp->servers);
158
159 if (rfd == wfd) {
160 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
161 fd_note(rfd, fd_note_buf);
162 } else {
163 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
164 fd_note(rfd, fd_note_buf);
165 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
166 fd_note(wfd, fd_note_buf);
167 }
168
169 commSetNonBlocking(rfd);
170
171 if (wfd != rfd)
172 commSetNonBlocking(wfd);
173
174 comm_add_close_handler(rfd, helperServerFree, srv);
07eca7e0 175
176 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
74addf6c 177 }
62e76326 178
5ea33fce 179 hlp->last_restart = squid_curtime;
74addf6c 180 safe_free(shortname);
181 safe_free(procname);
838b993c 182 helperKickQueue(hlp);
74addf6c 183}
184
4f0ef8e8 185/*
186 * DPW 2007-05-08
26ac0430 187 *
4f0ef8e8 188 * helperStatefulOpenServers: create the stateful child helper processes
189 */
94439e4e 190void
191helperStatefulOpenServers(statefulhelper * hlp)
192{
94439e4e 193 char *shortname;
1b8af627 194 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
94439e4e 195 char fd_note_buf[FD_DESC_SZ];
94439e4e 196 int nargs = 0;
62e76326 197
94439e4e 198 if (hlp->cmdline == NULL)
62e76326 199 return;
200
4f0ef8e8 201 char *progname = hlp->cmdline->key;
62e76326 202
4f0ef8e8 203 char *s;
94439e4e 204 if ((s = strrchr(progname, '/')))
62e76326 205 shortname = xstrdup(s + 1);
94439e4e 206 else
62e76326 207 shortname = xstrdup(progname);
208
5a5b2c56 209 /* dont ever start more than hlp->n_to_start processes. */
d974a072
AJ
210 /* n_active are the helpers which have not been shut down. */
211 int need_new = hlp->n_to_start - hlp->n_active;
5a5b2c56
AJ
212
213 debugs(84, 1, "helperOpenServers: Starting " << need_new << "/" << hlp->n_to_start << " '" << shortname << "' processes");
214
04f7fd38 215 if (need_new < 1) {
5a5b2c56
AJ
216 debugs(84, 1, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
217 }
62e76326 218
4f0ef8e8 219 char *procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 220
94439e4e 221 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 222
94439e4e 223 args[nargs++] = procname;
62e76326 224
4f0ef8e8 225 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 226 args[nargs++] = w->key;
227
94439e4e 228 args[nargs++] = NULL;
62e76326 229
94439e4e 230 assert(nargs <= HELPER_MAX_ARGS);
62e76326 231
5a5b2c56 232 for (int k = 0; k < need_new; k++) {
62e76326 233 getCurrentTime();
26ac0430
AJ
234 int rfd = -1;
235 int wfd = -1;
236 void * hIpc;
4f0ef8e8 237 pid_t pid = ipcCreate(hlp->ipc_type,
26ac0430
AJ
238 progname,
239 args,
240 shortname,
241 hlp->addr,
242 &rfd,
243 &wfd,
244 &hIpc);
b5d712b5 245
246 if (pid < 0) {
bf8fe701 247 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 248 continue;
249 }
250
251 hlp->n_running++;
d8f10d6a 252 hlp->n_active++;
b5d712b5 253 CBDATA_INIT_TYPE(helper_stateful_server);
4f0ef8e8 254 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
b5d712b5 255 srv->hIpc = hIpc;
256 srv->pid = pid;
360d26ea 257 srv->flags.reserved = 0;
62e76326 258 srv->stats.submits = 0;
259 srv->stats.releases = 0;
260 srv->index = k;
cc192b50 261 srv->addr = hlp->addr;
62e76326 262 srv->rfd = rfd;
263 srv->wfd = wfd;
7f37478b 264 srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
07eca7e0 265 srv->roffset = 0;
62e76326 266 srv->parent = cbdataReference(hlp);
267
268 if (hlp->datapool != NULL)
b001e822 269 srv->data = hlp->datapool->alloc();
62e76326 270
271 dlinkAddTail(srv, &srv->link, &hlp->servers);
272
273 if (rfd == wfd) {
274 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
275 fd_note(rfd, fd_note_buf);
276 } else {
277 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
278 fd_note(rfd, fd_note_buf);
279 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
280 fd_note(wfd, fd_note_buf);
281 }
282
283 commSetNonBlocking(rfd);
284
285 if (wfd != rfd)
286 commSetNonBlocking(wfd);
287
288 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
07eca7e0 289
290 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
94439e4e 291 }
62e76326 292
5ea33fce 293 hlp->last_restart = squid_curtime;
94439e4e 294 safe_free(shortname);
295 safe_free(procname);
296 helperStatefulKickQueue(hlp);
297}
298
299
74addf6c 300void
301helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
302{
5b5f9257 303 if (hlp == NULL) {
bf8fe701 304 debugs(84, 3, "helperSubmit: hlp == NULL");
62e76326 305 callback(data, NULL);
306 return;
5b5f9257 307 }
62e76326 308
eff7218b 309 helper_request *r = new helper_request;
310 helper_server *srv;
311
74addf6c 312 r->callback = callback;
fa80a8ef 313 r->data = cbdataReference(data);
74addf6c 314 r->buf = xstrdup(buf);
62e76326 315
74addf6c 316 if ((srv = GetFirstAvailable(hlp)))
62e76326 317 helperDispatch(srv, r);
74addf6c 318 else
62e76326 319 Enqueue(hlp, r);
320
bf8fe701 321 debugs(84, 9, "helperSubmit: " << buf);
94439e4e 322}
323
d20ce97d 324/* lastserver = "server last used as part of a reserved request sequence"
721b0310 325 */
94439e4e 326void
327helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
328{
94439e4e 329 if (hlp == NULL) {
bf8fe701 330 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
62e76326 331 callback(data, 0, NULL);
332 return;
94439e4e 333 }
62e76326 334
eff7218b 335 helper_stateful_request *r = new helper_stateful_request;
eff7218b 336
94439e4e 337 r->callback = callback;
fa80a8ef 338 r->data = cbdataReference(data);
62e76326 339
721b0310 340 if (buf != NULL) {
62e76326 341 r->buf = xstrdup(buf);
342 r->placeholder = 0;
721b0310 343 } else {
62e76326 344 r->buf = NULL;
345 r->placeholder = 1;
721b0310 346 }
62e76326 347
94439e4e 348 if ((buf != NULL) && lastserver) {
bf8fe701 349 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
360d26ea 350 assert(lastserver->flags.reserved);
a80b46e3 351 assert(!(lastserver->request));
62e76326 352
d20ce97d
HN
353 debugs(84, 5, "StatefulSubmit dispatching");
354 helperStatefulDispatch(lastserver, r);
94439e4e 355 } else {
26ac0430 356 helper_stateful_server *srv;
62e76326 357 if ((srv = StatefulGetFirstAvailable(hlp))) {
358 helperStatefulDispatch(srv, r);
359 } else
360 StatefulEnqueue(hlp, r);
94439e4e 361 }
62e76326 362
bf8fe701 363 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
94439e4e 364}
365
4f0ef8e8 366/*
367 * DPW 2007-05-08
368 *
369 * helperStatefulReleaseServer tells the helper that whoever was
370 * using it no longer needs its services.
4f0ef8e8 371 */
94439e4e 372void
373helperStatefulReleaseServer(helper_stateful_server * srv)
94439e4e 374{
4f0ef8e8 375 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
360d26ea 376 if (!srv->flags.reserved)
be29fa07 377 return;
4f0ef8e8 378
60d096f4 379 srv->stats.releases++;
62e76326 380
360d26ea 381 srv->flags.reserved = 0;
4f0ef8e8 382 if (srv->parent->OnEmptyQueue != NULL && srv->data)
26ac0430 383 srv->parent->OnEmptyQueue(srv->data);
420e3e30 384
a8c4f8d6 385 helperStatefulServerDone(srv);
94439e4e 386}
387
388void *
389helperStatefulServerGetData(helper_stateful_server * srv)
390/* return a pointer to the stateful routines data area */
391{
392 return srv->data;
74addf6c 393}
394
395void
9522b380 396helperStats(StoreEntry * sentry, helper * hlp, const char *label)
74addf6c 397{
9522b380 398 if (!helperStartStats(sentry, hlp, label))
399 return;
400
0a706c69 401 storeAppendPrintf(sentry, "program: %s\n",
62e76326 402 hlp->cmdline->key);
d974a072
AJ
403 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
404 hlp->n_active, hlp->n_to_start, (hlp->n_running - hlp->n_active) );
74addf6c 405 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 406 hlp->stats.requests);
74addf6c 407 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 408 hlp->stats.replies);
74addf6c 409 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 410 hlp->stats.queue_size);
f9598528 411 storeAppendPrintf(sentry, "avg service time: %d msec\n",
412 hlp->stats.avg_svc_time);
74addf6c 413 storeAppendPrintf(sentry, "\n");
09c1ece1 414 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
62e76326 415 "#",
416 "FD",
417 "PID",
418 "# Requests",
419 "Flags",
420 "Time",
421 "Offset",
422 "Request");
423
4f0ef8e8 424 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
07eca7e0 425 helper_server *srv = (helper_server*)link->data;
f9598528 426 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
d8f10d6a 427 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 428 srv->index + 1,
429 srv->rfd,
430 srv->pid,
431 srv->stats.uses,
07eca7e0 432 srv->stats.pending ? 'B' : ' ',
433 srv->flags.writing ? 'W' : ' ',
62e76326 434 srv->flags.closing ? 'C' : ' ',
435 srv->flags.shutdown ? 'S' : ' ',
436 tt < 0.0 ? 0.0 : tt,
07eca7e0 437 (int) srv->roffset,
438 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
74addf6c 439 }
62e76326 440
74addf6c 441 storeAppendPrintf(sentry, "\nFlags key:\n\n");
74addf6c 442 storeAppendPrintf(sentry, " B = BUSY\n");
f9598528 443 storeAppendPrintf(sentry, " W = WRITING\n");
74addf6c 444 storeAppendPrintf(sentry, " C = CLOSING\n");
d974a072 445 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
74addf6c 446}
447
94439e4e 448void
9522b380 449helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
94439e4e 450{
9522b380 451 if (!helperStartStats(sentry, hlp, label))
452 return;
453
cf154edc 454 storeAppendPrintf(sentry, "program: %s\n",
455 hlp->cmdline->key);
d974a072
AJ
456 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
457 hlp->n_active, hlp->n_to_start, (hlp->n_running - hlp->n_active) );
94439e4e 458 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 459 hlp->stats.requests);
94439e4e 460 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 461 hlp->stats.replies);
94439e4e 462 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 463 hlp->stats.queue_size);
94439e4e 464 storeAppendPrintf(sentry, "avg service time: %d msec\n",
62e76326 465 hlp->stats.avg_svc_time);
94439e4e 466 storeAppendPrintf(sentry, "\n");
3900cea9 467 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
62e76326 468 "#",
469 "FD",
470 "PID",
471 "# Requests",
62e76326 472 "Flags",
473 "Time",
474 "Offset",
475 "Request");
476
4f0ef8e8 477 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
478 helper_stateful_server *srv = (helper_stateful_server *)link->data;
479 double tt = 0.001 * tvSubMsec(srv->dispatch_time,
26ac0430 480 srv->flags.busy ? current_time : srv->answer_time);
d20ce97d 481 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 482 srv->index + 1,
483 srv->rfd,
484 srv->pid,
485 srv->stats.uses,
62e76326 486 srv->flags.busy ? 'B' : ' ',
487 srv->flags.closing ? 'C' : ' ',
360d26ea 488 srv->flags.reserved ? 'R' : ' ',
62e76326 489 srv->flags.shutdown ? 'S' : ' ',
490 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
491 tt < 0.0 ? 0.0 : tt,
07eca7e0 492 (int) srv->roffset,
62e76326 493 srv->request ? log_quote(srv->request->buf) : "(none)");
94439e4e 494 }
62e76326 495
94439e4e 496 storeAppendPrintf(sentry, "\nFlags key:\n\n");
94439e4e 497 storeAppendPrintf(sentry, " B = BUSY\n");
498 storeAppendPrintf(sentry, " C = CLOSING\n");
d20ce97d 499 storeAppendPrintf(sentry, " R = RESERVED\n");
d974a072 500 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
5d146f7d 501 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 502}
503
74addf6c 504void
505helperShutdown(helper * hlp)
506{
c68e9c6b 507 dlink_node *link = hlp->servers.head;
2d5d5261 508#ifdef _SQUID_MSWIN_
509
510 HANDLE hIpc;
511 pid_t pid;
512 int no;
513#endif
62e76326 514
c68e9c6b 515 while (link) {
62e76326 516 helper_server *srv;
517 srv = (helper_server *)link->data;
518 link = link->next;
519
8cfc76db 520 if (srv->flags.shutdown) {
bf8fe701 521 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 522 continue;
523 }
524
d8f10d6a 525 hlp->n_active--;
526 assert(hlp->n_active >= 0);
62e76326 527 srv->flags.shutdown = 1; /* request it to shut itself down */
528
d8f10d6a 529 if (srv->flags.closing) {
bf8fe701 530 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 531 continue;
532 }
533
d8f10d6a 534 if (srv->stats.pending) {
bf8fe701 535 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 536 continue;
537 }
538
539 srv->flags.closing = 1;
2d5d5261 540#ifdef _SQUID_MSWIN_
541
542 hIpc = srv->hIpc;
543 pid = srv->pid;
544 no = srv->index + 1;
545 shutdown(srv->wfd, SD_BOTH);
546#endif
547
bf8fe701 548 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
62e76326 549 /* the rest of the details is dealt with in the helperServerFree
550 * close handler
551 */
552 comm_close(srv->rfd);
2d5d5261 553#ifdef _SQUID_MSWIN_
554
555 if (hIpc) {
556 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
557 getCurrentTime();
bf8fe701 558 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
559 " #" << no << " (" << hlp->cmdline->key << "," <<
560 (long int)pid << ") didn't exit in 5 seconds");
561
2d5d5261 562 }
563
564 CloseHandle(hIpc);
565 }
566
567#endif
568
74addf6c 569 }
570}
571
94439e4e 572void
573helperStatefulShutdown(statefulhelper * hlp)
574{
575 dlink_node *link = hlp->servers.head;
576 helper_stateful_server *srv;
2d5d5261 577#ifdef _SQUID_MSWIN_
578
579 HANDLE hIpc;
580 pid_t pid;
581 int no;
582#endif
62e76326 583
94439e4e 584 while (link) {
62e76326 585 srv = (helper_stateful_server *)link->data;
586 link = link->next;
587
8cfc76db 588 if (srv->flags.shutdown) {
bf8fe701 589 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 590 continue;
591 }
592
d8f10d6a 593 hlp->n_active--;
594 assert(hlp->n_active >= 0);
62e76326 595 srv->flags.shutdown = 1; /* request it to shut itself down */
596
597 if (srv->flags.busy) {
bf8fe701 598 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 599 continue;
600 }
601
602 if (srv->flags.closing) {
bf8fe701 603 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 604 continue;
605 }
606
360d26ea 607 if (srv->flags.reserved) {
d7e0f901
AJ
608 if (shutting_down) {
609 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
610 }
611 else {
612 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
613 continue;
614 }
62e76326 615 }
616
62e76326 617 srv->flags.closing = 1;
2d5d5261 618#ifdef _SQUID_MSWIN_
619
620 hIpc = srv->hIpc;
621 pid = srv->pid;
622 no = srv->index + 1;
623 shutdown(srv->wfd, SD_BOTH);
624#endif
625
bf8fe701 626 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
627
62e76326 628 /* the rest of the details is dealt with in the helperStatefulServerFree
629 * close handler
630 */
631 comm_close(srv->rfd);
2d5d5261 632#ifdef _SQUID_MSWIN_
633
634 if (hIpc) {
635 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
636 getCurrentTime();
bf8fe701 637 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
638 " #" << no << " (" << hlp->cmdline->key << "," <<
639 (long int)pid << ") didn't exit in 5 seconds");
2d5d5261 640 }
641
642 CloseHandle(hIpc);
643 }
644
645#endif
646
94439e4e 647 }
648}
649
650
1f5f60dd 651helper *
652helperCreate(const char *name)
653{
28c60158 654 helper *hlp;
aa839030 655 CBDATA_INIT_TYPE(helper);
72711e31 656 hlp = cbdataAlloc(helper);
1f5f60dd 657 hlp->id_name = name;
658 return hlp;
659}
660
94439e4e 661statefulhelper *
662helperStatefulCreate(const char *name)
663{
664 statefulhelper *hlp;
aa839030 665 CBDATA_INIT_TYPE(statefulhelper);
72711e31 666 hlp = cbdataAlloc(statefulhelper);
94439e4e 667 hlp->id_name = name;
668 return hlp;
669}
670
671
1f5f60dd 672void
673helperFree(helper * hlp)
674{
5dae8514 675 if (!hlp)
62e76326 676 return;
677
1f5f60dd 678 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 679 if (hlp->queue.head)
bf8fe701 680 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
681 hlp->stats.queue_size << " requests queued");
62e76326 682
1f5f60dd 683 cbdataFree(hlp);
684}
685
94439e4e 686void
687helperStatefulFree(statefulhelper * hlp)
688{
5dae8514 689 if (!hlp)
62e76326 690 return;
691
94439e4e 692 /* note, don't free hlp->name, it probably points to static memory */
693 if (hlp->queue.head)
bf8fe701 694 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
695 hlp->stats.queue_size << " requests queued");
62e76326 696
94439e4e 697 cbdataFree(hlp);
698}
699
700
74addf6c 701/* ====================================================================== */
702/* LOCAL FUNCTIONS */
703/* ====================================================================== */
704
705static void
1f5f60dd 706helperServerFree(int fd, void *data)
74addf6c 707{
e6ccf245 708 helper_server *srv = (helper_server *)data;
74addf6c 709 helper *hlp = srv->parent;
ac750329 710 helper_request *r;
07eca7e0 711 int i, concurrency = hlp->concurrency;
712
713 if (!concurrency)
714 concurrency = 1;
715
07eca7e0 716 if (srv->rbuf) {
717 memFreeBuf(srv->rbuf_sz, srv->rbuf);
718 srv->rbuf = NULL;
74addf6c 719 }
62e76326 720
2fe7eff9 721 srv->wqueue->clean();
032785bf 722 delete srv->wqueue;
723
724 if (srv->writebuf) {
2fe7eff9 725 srv->writebuf->clean();
032785bf 726 delete srv->writebuf;
727 srv->writebuf = NULL;
728 }
62e76326 729
07eca7e0 730 for (i = 0; i < concurrency; i++) {
731 if ((r = srv->requests[i])) {
732 void *cbdata;
62e76326 733
07eca7e0 734 if (cbdataReferenceValidDone(r->data, &cbdata))
735 r->callback(cbdata, NULL);
62e76326 736
07eca7e0 737 helperRequestFree(r);
738
739 srv->requests[i] = NULL;
740 }
ac750329 741 }
62e76326 742
07eca7e0 743 safe_free(srv->requests);
744
3cdb7cd0 745 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 746 comm_close(srv->wfd);
747
74addf6c 748 dlinkDelete(&srv->link, &hlp->servers);
62e76326 749
74addf6c 750 hlp->n_running--;
62e76326 751
74addf6c 752 assert(hlp->n_running >= 0);
62e76326 753
1f5f60dd 754 if (!srv->flags.shutdown) {
d8f10d6a 755 hlp->n_active--;
756 assert(hlp->n_active >= 0);
bf8fe701 757 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 <<
758 " (FD " << fd << ") exited");
62e76326 759
5ea33fce 760 if (hlp->n_active < hlp->n_to_start / 2) {
bf8fe701 761 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 762
763 if (hlp->last_restart > squid_curtime - 30)
764 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
765
bf8fe701 766 debugs(80, 0, "Starting new helpers");
5ea33fce 767
768 helperOpenServers(hlp);
769 }
14e87a44 770 }
62e76326 771
fa80a8ef 772 cbdataReferenceDone(srv->parent);
14e87a44 773 cbdataFree(srv);
74addf6c 774}
775
94439e4e 776static void
777helperStatefulServerFree(int fd, void *data)
778{
e6ccf245 779 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 780 statefulhelper *hlp = srv->parent;
781 helper_stateful_request *r;
62e76326 782
07eca7e0 783 if (srv->rbuf) {
784 memFreeBuf(srv->rbuf_sz, srv->rbuf);
785 srv->rbuf = NULL;
94439e4e 786 }
62e76326 787
07eca7e0 788#if 0
2fe7eff9 789 srv->wqueue->clean();
032785bf 790
791 delete srv->wqueue;
07eca7e0 792
793#endif
794
94439e4e 795 if ((r = srv->request)) {
62e76326 796 void *cbdata;
797
798 if (cbdataReferenceValidDone(r->data, &cbdata))
07eca7e0 799 r->callback(cbdata, srv, NULL);
62e76326 800
801 helperStatefulRequestFree(r);
802
803 srv->request = NULL;
94439e4e 804 }
62e76326 805
3c641669 806 /* TODO: walk the local queue of requests and carry them all out */
94439e4e 807 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 808 comm_close(srv->wfd);
809
94439e4e 810 dlinkDelete(&srv->link, &hlp->servers);
62e76326 811
94439e4e 812 hlp->n_running--;
62e76326 813
94439e4e 814 assert(hlp->n_running >= 0);
62e76326 815
94439e4e 816 if (!srv->flags.shutdown) {
d8f10d6a 817 hlp->n_active--;
818 assert( hlp->n_active >= 0);
bf8fe701 819 debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " (FD " << fd << ") exited");
62e76326 820
5ea33fce 821 if (hlp->n_active <= hlp->n_to_start / 2) {
bf8fe701 822 debugs(80, 0, "Too few " << hlp->id_name << " processes are running");
5ea33fce 823
824 if (hlp->last_restart > squid_curtime - 30)
825 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
826
bf8fe701 827 debugs(80, 0, "Starting new helpers");
5ea33fce 828
829 helperStatefulOpenServers(hlp);
830 }
94439e4e 831 }
62e76326 832
94439e4e 833 if (srv->data != NULL)
b001e822 834 hlp->datapool->free(srv->data);
62e76326 835
fa80a8ef 836 cbdataReferenceDone(srv->parent);
62e76326 837
94439e4e 838 cbdataFree(srv);
839}
840
841
74addf6c 842static void
c4b7a5a9 843helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
74addf6c 844{
74addf6c 845 char *t = NULL;
e6ccf245 846 helper_server *srv = (helper_server *)data;
74addf6c 847 helper *hlp = srv->parent;
fa80a8ef 848 assert(cbdataReferenceValid(data));
c4b7a5a9 849
850 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 851
c4b7a5a9 852 if (flag == COMM_ERR_CLOSING) {
853 return;
854 }
855
420e3e30
HN
856 assert(fd == srv->rfd);
857
4f0ef8e8 858 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
62e76326 859
c4b7a5a9 860 if (flag != COMM_OK || len <= 0) {
62e76326 861 if (len < 0)
bf8fe701 862 debugs(84, 1, "helperHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 863
864 comm_close(fd);
865
866 return;
74addf6c 867 }
62e76326 868
07eca7e0 869 srv->roffset += len;
870 srv->rbuf[srv->roffset] = '\0';
bf8fe701 871 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
62e76326 872
07eca7e0 873 if (!srv->stats.pending) {
62e76326 874 /* someone spoke without being spoken to */
bf8fe701 875 debugs(84, 1, "helperHandleRead: unexpected read from " <<
876 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
877 " bytes '" << srv->rbuf << "'");
878
07eca7e0 879 srv->roffset = 0;
880 srv->rbuf[0] = '\0';
881 }
882
883 while ((t = strchr(srv->rbuf, '\n'))) {
62e76326 884 /* end of reply found */
07eca7e0 885 helper_request *r;
886 char *msg = srv->rbuf;
887 int i = 0;
bf8fe701 888 debugs(84, 3, "helperHandleRead: end of reply found");
6bf4f823 889
890 if (t > srv->rbuf && t[-1] == '\r')
891 t[-1] = '\0';
892
07eca7e0 893 *t++ = '\0';
62e76326 894
07eca7e0 895 if (hlp->concurrency) {
896 i = strtol(msg, &msg, 10);
62e76326 897
e4755e29 898 while (*msg && xisspace(*msg))
07eca7e0 899 msg++;
900 }
62e76326 901
07eca7e0 902 r = srv->requests[i];
62e76326 903
07eca7e0 904 if (r) {
905 HLPCB *callback = r->callback;
906 void *cbdata;
62e76326 907
07eca7e0 908 srv->requests[i] = NULL;
62e76326 909
07eca7e0 910 r->callback = NULL;
62e76326 911
07eca7e0 912 if (cbdataReferenceValidDone(r->data, &cbdata))
913 callback(cbdata, msg);
62e76326 914
07eca7e0 915 srv->stats.pending--;
916
917 hlp->stats.replies++;
918
f9598528 919 srv->answer_time = current_time;
920
921 srv->dispatch_time = r->dispatch_time;
922
07eca7e0 923 hlp->stats.avg_svc_time =
924 intAverage(hlp->stats.avg_svc_time,
925 tvSubMsec(r->dispatch_time, current_time),
926 hlp->stats.replies, REDIRECT_AV_FACTOR);
927
928 helperRequestFree(r);
929 } else {
bf8fe701 930 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
931 i << " from " << hlp->id_name << " #" << srv->index + 1 <<
932 " '" << srv->rbuf << "'");
933
07eca7e0 934 }
935
936 srv->roffset -= (t - srv->rbuf);
937 memmove(srv->rbuf, t, srv->roffset + 1);
62e76326 938
420e3e30
HN
939 if (!srv->flags.shutdown) {
940 helperKickQueue(hlp);
941 } else if (!srv->flags.closing && !srv->stats.pending) {
62e76326 942 int wfd = srv->wfd;
943 srv->wfd = -1;
420e3e30
HN
944 if (srv->rfd == wfd)
945 srv->rfd = -1;
d8f10d6a 946 srv->flags.closing=1;
62e76326 947 comm_close(wfd);
26ac0430 948 return;
420e3e30 949 }
74addf6c 950 }
07eca7e0 951
420e3e30
HN
952 if (srv->rfd != -1)
953 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
74addf6c 954}
955
94439e4e 956static void
c4b7a5a9 957helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
94439e4e 958{
94439e4e 959 char *t = NULL;
e6ccf245 960 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 961 helper_stateful_request *r;
962 statefulhelper *hlp = srv->parent;
fa80a8ef 963 assert(cbdataReferenceValid(data));
c4b7a5a9 964
965 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 966
c4b7a5a9 967 if (flag == COMM_ERR_CLOSING) {
968 return;
969 }
970
420e3e30
HN
971 assert(fd == srv->rfd);
972
4a7a3d56 973 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
4f0ef8e8 974 hlp->id_name << " #" << srv->index + 1);
bf8fe701 975
62e76326 976
c4b7a5a9 977 if (flag != COMM_OK || len <= 0) {
62e76326 978 if (len < 0)
bf8fe701 979 debugs(84, 1, "helperStatefulHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 980
981 comm_close(fd);
982
983 return;
94439e4e 984 }
62e76326 985
07eca7e0 986 srv->roffset += len;
987 srv->rbuf[srv->roffset] = '\0';
94439e4e 988 r = srv->request;
62e76326 989
94439e4e 990 if (r == NULL) {
62e76326 991 /* someone spoke without being spoken to */
bf8fe701 992 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
993 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
994 " bytes '" << srv->rbuf << "'");
995
07eca7e0 996 srv->roffset = 0;
997 }
998
999 if ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1000 /* end of reply found */
bf8fe701 1001 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1002
1003 if (t > srv->rbuf && t[-1] == '\r')
1004 t[-1] = '\0';
1005
62e76326 1006 *t = '\0';
1007
2734fd37 1008 if (r && cbdataReferenceValid(r->data)) {
07eca7e0 1009 switch ((r->callback(r->data, srv, srv->rbuf))) { /*if non-zero reserve helper */
62e76326 1010
1011 case S_HELPER_UNKNOWN:
26ac0430 1012 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was received.\n");
62e76326 1013 break;
1014
1015 case S_HELPER_RELEASE: /* helper finished with */
1016
360d26ea 1017 srv->flags.reserved = 0;
62e76326 1018
d20ce97d
HN
1019 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
1020 srv->parent->OnEmptyQueue(srv->data);
62e76326 1021
d20ce97d 1022 debugs(84, 5, "StatefulHandleRead: releasing " << hlp->id_name << " #" << srv->index + 1);
62e76326 1023
1024 break;
1025
1026 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
1027
360d26ea 1028 srv->flags.reserved = 1;
a80b46e3 1029 debugs(84, 5, "StatefulHandleRead: reserving " << hlp->id_name << " #" << srv->index + 1);
62e76326 1030
62e76326 1031 break;
1032
1033 default:
1034 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
1035 }
1036
1037 } else {
bf8fe701 1038 debugs(84, 1, "StatefulHandleRead: no callback data registered");
62e76326 1039 }
1040
1041 srv->flags.busy = 0;
07eca7e0 1042 srv->roffset = 0;
62e76326 1043 helperStatefulRequestFree(r);
1044 srv->request = NULL;
1045 hlp->stats.replies++;
f9598528 1046 srv->answer_time = current_time;
62e76326 1047 hlp->stats.avg_svc_time =
1048 intAverage(hlp->stats.avg_svc_time,
1049 tvSubMsec(srv->dispatch_time, current_time),
1050 hlp->stats.replies, REDIRECT_AV_FACTOR);
1051
a8c4f8d6 1052 helperStatefulServerDone(srv);
94439e4e 1053 }
07eca7e0 1054
420e3e30
HN
1055 if (srv->rfd != -1)
1056 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
07eca7e0 1057 helperStatefulHandleRead, srv);
94439e4e 1058}
1059
74addf6c 1060static void
1061Enqueue(helper * hlp, helper_request * r)
1062{
e6ccf245 1063 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
74addf6c 1064 dlinkAddTail(r, link, &hlp->queue);
1065 hlp->stats.queue_size++;
62e76326 1066
74addf6c 1067 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1068 return;
1069
74addf6c 1070 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1071 return;
1072
fe73896c 1073 if (shutting_down || reconfiguring)
62e76326 1074 return;
1075
74addf6c 1076 hlp->last_queue_warn = squid_curtime;
62e76326 1077
bf8fe701 1078 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1079 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1080
62e76326 1081
74addf6c 1082 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1083 fatalf("Too many queued %s requests", hlp->id_name);
1084
bf8fe701 1085 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1086
74addf6c 1087}
1088
94439e4e 1089static void
1090StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1091{
e6ccf245 1092 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1093 dlinkAddTail(r, link, &hlp->queue);
1094 hlp->stats.queue_size++;
62e76326 1095
94439e4e 1096 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1097 return;
1098
893cbac6 1099 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1100 fatalf("Too many queued %s requests", hlp->id_name);
1101
94439e4e 1102 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1103 return;
1104
94439e4e 1105 if (shutting_down || reconfiguring)
62e76326 1106 return;
1107
94439e4e 1108 hlp->last_queue_warn = squid_curtime;
62e76326 1109
bf8fe701 1110 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
62e76326 1111
bf8fe701 1112 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1113 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
62e76326 1114
94439e4e 1115}
1116
74addf6c 1117static helper_request *
1118Dequeue(helper * hlp)
1119{
1120 dlink_node *link;
1121 helper_request *r = NULL;
62e76326 1122
74addf6c 1123 if ((link = hlp->queue.head)) {
62e76326 1124 r = (helper_request *)link->data;
1125 dlinkDelete(link, &hlp->queue);
1126 memFree(link, MEM_DLINK_NODE);
1127 hlp->stats.queue_size--;
74addf6c 1128 }
62e76326 1129
74addf6c 1130 return r;
1131}
1132
94439e4e 1133static helper_stateful_request *
1134StatefulDequeue(statefulhelper * hlp)
1135{
1136 dlink_node *link;
1137 helper_stateful_request *r = NULL;
62e76326 1138
94439e4e 1139 if ((link = hlp->queue.head)) {
62e76326 1140 r = (helper_stateful_request *)link->data;
1141 dlinkDelete(link, &hlp->queue);
1142 memFree(link, MEM_DLINK_NODE);
1143 hlp->stats.queue_size--;
94439e4e 1144 }
62e76326 1145
94439e4e 1146 return r;
1147}
1148
74addf6c 1149static helper_server *
1150GetFirstAvailable(helper * hlp)
1151{
1152 dlink_node *n;
07eca7e0 1153 helper_server *srv;
1154 helper_server *selected = NULL;
62e76326 1155
fe73896c 1156 if (hlp->n_running == 0)
62e76326 1157 return NULL;
1158
07eca7e0 1159 /* Find "least" loaded helper (approx) */
74addf6c 1160 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1161 srv = (helper_server *)n->data;
1162
07eca7e0 1163 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1164 continue;
1165
d8f10d6a 1166 if (srv->flags.shutdown)
62e76326 1167 continue;
1168
07eca7e0 1169 if (!srv->stats.pending)
1170 return srv;
1171
1172 if (selected) {
1173 selected = srv;
1174 break;
1175 }
1176
1177 selected = srv;
74addf6c 1178 }
62e76326 1179
07eca7e0 1180 /* Check for overload */
1181 if (!selected)
1182 return NULL;
1183
1184 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1185 return NULL;
1186
1187 return selected;
74addf6c 1188}
1189
94439e4e 1190static helper_stateful_server *
1191StatefulGetFirstAvailable(statefulhelper * hlp)
1192{
1193 dlink_node *n;
1194 helper_stateful_server *srv = NULL;
4f0ef8e8 1195 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->n_running);
62e76326 1196
94439e4e 1197 if (hlp->n_running == 0)
62e76326 1198 return NULL;
1199
94439e4e 1200 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1201 srv = (helper_stateful_server *)n->data;
1202
1203 if (srv->flags.busy)
1204 continue;
1205
360d26ea 1206 if (srv->flags.reserved)
62e76326 1207 continue;
1208
d8f10d6a 1209 if (srv->flags.shutdown)
62e76326 1210 continue;
1211
1212 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1213 continue;
1214
26ac0430 1215 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
62e76326 1216 return srv;
94439e4e 1217 }
62e76326 1218
bf8fe701 1219 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
94439e4e 1220 return NULL;
1221}
1222
1223
42679bd6 1224static void
1225helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1226{
07eca7e0 1227 helper_server *srv = (helper_server *)data;
1228
2fe7eff9 1229 srv->writebuf->clean();
032785bf 1230 delete srv->writebuf;
1231 srv->writebuf = NULL;
07eca7e0 1232 srv->flags.writing = 0;
1233
1234 if (flag != COMM_OK) {
1235 /* Helper server has crashed */
bf8fe701 1236 debugs(84, 0, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
07eca7e0 1237 return;
1238 }
1239
2fe7eff9 1240 if (!srv->wqueue->isNull()) {
07eca7e0 1241 srv->writebuf = srv->wqueue;
032785bf 1242 srv->wqueue = new MemBuf;
07eca7e0 1243 srv->flags.writing = 1;
1244 comm_write(srv->wfd,
032785bf 1245 srv->writebuf->content(),
1246 srv->writebuf->contentSize(),
07eca7e0 1247 helperDispatchWriteDone, /* Handler */
2b663917 1248 srv, NULL); /* Handler-data, freefunc */
07eca7e0 1249 }
42679bd6 1250}
1251
74addf6c 1252static void
1253helperDispatch(helper_server * srv, helper_request * r)
1254{
1255 helper *hlp = srv->parent;
07eca7e0 1256 helper_request **ptr = NULL;
1257 unsigned int slot;
62e76326 1258
fa80a8ef 1259 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1260 debugs(84, 1, "helperDispatch: invalid callback data");
62e76326 1261 helperRequestFree(r);
1262 return;
74addf6c 1263 }
62e76326 1264
07eca7e0 1265 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1266 if (!srv->requests[slot]) {
1267 ptr = &srv->requests[slot];
1268 break;
1269 }
1270 }
1271
1272 assert(ptr);
1273 *ptr = r;
1274 srv->stats.pending += 1;
1275 r->dispatch_time = current_time;
1276
2fe7eff9 1277 if (srv->wqueue->isNull())
1278 srv->wqueue->init();
07eca7e0 1279
1280 if (hlp->concurrency)
2fe7eff9 1281 srv->wqueue->Printf("%d %s", slot, r->buf);
07eca7e0 1282 else
2fe7eff9 1283 srv->wqueue->append(r->buf, strlen(r->buf));
07eca7e0 1284
1285 if (!srv->flags.writing) {
032785bf 1286 assert(NULL == srv->writebuf);
07eca7e0 1287 srv->writebuf = srv->wqueue;
032785bf 1288 srv->wqueue = new MemBuf;
07eca7e0 1289 srv->flags.writing = 1;
1290 comm_write(srv->wfd,
032785bf 1291 srv->writebuf->content(),
1292 srv->writebuf->contentSize(),
07eca7e0 1293 helperDispatchWriteDone, /* Handler */
2b663917 1294 srv, NULL); /* Handler-data, free func */
07eca7e0 1295 }
1296
4a7a3d56 1297 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
bf8fe701 1298
74addf6c 1299 srv->stats.uses++;
1300 hlp->stats.requests++;
1301}
1302
42679bd6 1303static void
1304helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
62e76326 1305 int xerrno, void *data)
42679bd6 1306{
62e76326 1307 /* nothing! */
42679bd6 1308}
1309
1310
94439e4e 1311static void
1312helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1313{
1314 statefulhelper *hlp = srv->parent;
62e76326 1315
fa80a8ef 1316 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1317 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
62e76326 1318 helperStatefulRequestFree(r);
1319 return;
94439e4e 1320 }
62e76326 1321
bf8fe701 1322 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
62e76326 1323
94439e4e 1324 if (r->placeholder == 1) {
62e76326 1325 /* a callback is needed before this request can _use_ a helper. */
d20ce97d 1326 /* we don't care about releasing this helper. The request NEVER
62e76326 1327 * gets to the helper. So we throw away the return code */
1328 r->callback(r->data, srv, NULL);
1329 /* throw away the placeholder */
1330 helperStatefulRequestFree(r);
1331 /* and push the queue. Note that the callback may have submitted a new
1332 * request to the helper which is why we test for the request*/
1333
420e3e30 1334 if (srv->request == NULL)
a8c4f8d6 1335 helperStatefulServerDone(srv);
62e76326 1336
1337 return;
94439e4e 1338 }
62e76326 1339
94439e4e 1340 srv->flags.busy = 1;
1341 srv->request = r;
1342 srv->dispatch_time = current_time;
42679bd6 1343 comm_write(srv->wfd,
62e76326 1344 r->buf,
1345 strlen(r->buf),
1346 helperStatefulDispatchWriteDone, /* Handler */
2b663917 1347 hlp, NULL); /* Handler-data, free func */
bf8fe701 1348 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1349 hlp->id_name << " #" << srv->index + 1 << ", " <<
1350 (int) strlen(r->buf) << " bytes");
1351
94439e4e 1352 srv->stats.uses++;
1353 hlp->stats.requests++;
1354}
1355
1356
74addf6c 1357static void
1358helperKickQueue(helper * hlp)
1359{
1360 helper_request *r;
1361 helper_server *srv;
62e76326 1362
74addf6c 1363 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
62e76326 1364 helperDispatch(srv, r);
74addf6c 1365}
1366
94439e4e 1367static void
1368helperStatefulKickQueue(statefulhelper * hlp)
1369{
1370 helper_stateful_request *r;
1371 helper_stateful_server *srv;
62e76326 1372
94439e4e 1373 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
62e76326 1374 helperStatefulDispatch(srv, r);
94439e4e 1375}
1376
1377static void
a8c4f8d6 1378helperStatefulServerDone(helper_stateful_server * srv)
94439e4e 1379{
420e3e30
HN
1380 if (!srv->flags.shutdown) {
1381 helperStatefulKickQueue(srv->parent);
a8db3389 1382 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
420e3e30
HN
1383 int wfd = srv->wfd;
1384 srv->wfd = -1;
1385 if (srv->rfd == wfd)
1386 srv->rfd = -1;
1387 srv->flags.closing=1;
1388 comm_close(wfd);
1389 return;
1390 }
94439e4e 1391}
1392
74addf6c 1393static void
1394helperRequestFree(helper_request * r)
1395{
fa80a8ef 1396 cbdataReferenceDone(r->data);
74addf6c 1397 xfree(r->buf);
00d77d6b 1398 delete r;
51ee7c82 1399}
1400
94439e4e 1401static void
1402helperStatefulRequestFree(helper_stateful_request * r)
1403{
26ac0430 1404 if (r) {
b1da7838 1405 cbdataReferenceDone(r->data);
1406 xfree(r->buf);
1407 delete r;
1408 }
51ee7c82 1409}
9522b380 1410
1411// TODO: should helper_ and helper_stateful_ have a common parent?
1412static bool
1413helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1414{
1415 if (!hlp) {
1416 if (label)
1417 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1418 return false;
1419 }
1420
1421 if (label)
1422 storeAppendPrintf(sentry, "%s:\n", label);
1423
1424 return true;
1425}