]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
SourceFormat: enforcement
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1/*
262a0e14 2 * $Id$
f740a279 3 *
17bb3486 4 * DEBUG: section 84 Helper process maintenance
f740a279 5 * AUTHOR: Harvest Derived?
6 *
2b6662ba 7 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 8 * ----------------------------------------------------------
9 *
2b6662ba 10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
f740a279 18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
26ac0430 23 *
f740a279 24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
26ac0430 28 *
f740a279 29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
74addf6c 35#include "squid.h"
51ee7c82 36#include "helper.h"
985c86bc 37#include "SquidTime.h"
e6ccf245 38#include "Store.h"
42679bd6 39#include "comm.h"
0eb49b6d 40#include "MemBuf.h"
d295d770 41#include "wordlist.h"
74addf6c 42
43#define HELPER_MAX_ARGS 64
44
7f37478b
AJ
45/* size of helper read buffer (maximum?). no reason given for this size */
46/* though it has been seen to be too short for some requests */
47/* it is dynamic, so increasing should not have side effects */
48#define BUF_8KB 8192
49
c4b7a5a9 50static IOCB helperHandleRead;
51static IOCB helperStatefulHandleRead;
1f5f60dd 52static PF helperServerFree;
94439e4e 53static PF helperStatefulServerFree;
74addf6c 54static void Enqueue(helper * hlp, helper_request *);
55static helper_request *Dequeue(helper * hlp);
94439e4e 56static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 57static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 58static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 59static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 60static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 61static void helperKickQueue(helper * hlp);
94439e4e 62static void helperStatefulKickQueue(statefulhelper * hlp);
a8c4f8d6 63static void helperStatefulServerDone(helper_stateful_server * srv);
74addf6c 64static void helperRequestFree(helper_request * r);
94439e4e 65static void helperStatefulRequestFree(helper_stateful_request * r);
66static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
9522b380 67static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
68
74addf6c 69
aa839030 70CBDATA_TYPE(helper);
71CBDATA_TYPE(helper_server);
72CBDATA_TYPE(statefulhelper);
73CBDATA_TYPE(helper_stateful_server);
74
74addf6c 75void
76helperOpenServers(helper * hlp)
77{
78 char *s;
79 char *progname;
80 char *shortname;
81 char *procname;
1b8af627 82 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
74addf6c 83 char fd_note_buf[FD_DESC_SZ];
84 helper_server *srv;
85 int nargs = 0;
86 int k;
b5d712b5 87 pid_t pid;
74addf6c 88 int rfd;
89 int wfd;
b5d712b5 90 void * hIpc;
74addf6c 91 wordlist *w;
62e76326 92
74addf6c 93 if (hlp->cmdline == NULL)
62e76326 94 return;
95
74addf6c 96 progname = hlp->cmdline->key;
62e76326 97
74addf6c 98 if ((s = strrchr(progname, '/')))
62e76326 99 shortname = xstrdup(s + 1);
74addf6c 100 else
62e76326 101 shortname = xstrdup(progname);
102
5a5b2c56 103 /* dont ever start more than hlp->n_to_start processes. */
d974a072 104 int need_new = hlp->n_to_start - hlp->n_active;
5a5b2c56
AJ
105
106 debugs(84, 1, "helperOpenServers: Starting " << need_new << "/" << hlp->n_to_start << " '" << shortname << "' processes");
107
04f7fd38 108 if (need_new < 1) {
5a5b2c56
AJ
109 debugs(84, 1, "helperOpenServers: No '" << shortname << "' processes needed.");
110 }
62e76326 111
e6ccf245 112 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 113
74addf6c 114 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 115
74addf6c 116 args[nargs++] = procname;
62e76326 117
74addf6c 118 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 119 args[nargs++] = w->key;
120
74addf6c 121 args[nargs++] = NULL;
62e76326 122
74addf6c 123 assert(nargs <= HELPER_MAX_ARGS);
62e76326 124
5a5b2c56 125 for (k = 0; k < need_new; k++) {
62e76326 126 getCurrentTime();
127 rfd = wfd = -1;
b5d712b5 128 pid = ipcCreate(hlp->ipc_type,
129 progname,
130 args,
131 shortname,
cc192b50 132 hlp->addr,
b5d712b5 133 &rfd,
134 &wfd,
135 &hIpc);
136
137 if (pid < 0) {
bf8fe701 138 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 139 continue;
140 }
141
142 hlp->n_running++;
d8f10d6a 143 hlp->n_active++;
b5d712b5 144 CBDATA_INIT_TYPE(helper_server);
62e76326 145 srv = cbdataAlloc(helper_server);
b5d712b5 146 srv->hIpc = hIpc;
147 srv->pid = pid;
62e76326 148 srv->index = k;
cc192b50 149 srv->addr = hlp->addr;
62e76326 150 srv->rfd = rfd;
151 srv->wfd = wfd;
7f37478b 152 srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
90bed1c4 153 srv->wqueue = new MemBuf;
07eca7e0 154 srv->roffset = 0;
155 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
62e76326 156 srv->parent = cbdataReference(hlp);
157 dlinkAddTail(srv, &srv->link, &hlp->servers);
158
159 if (rfd == wfd) {
160 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
161 fd_note(rfd, fd_note_buf);
162 } else {
163 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
164 fd_note(rfd, fd_note_buf);
165 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
166 fd_note(wfd, fd_note_buf);
167 }
168
169 commSetNonBlocking(rfd);
170
171 if (wfd != rfd)
172 commSetNonBlocking(wfd);
173
174 comm_add_close_handler(rfd, helperServerFree, srv);
07eca7e0 175
176 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
74addf6c 177 }
62e76326 178
5ea33fce 179 hlp->last_restart = squid_curtime;
74addf6c 180 safe_free(shortname);
181 safe_free(procname);
838b993c 182 helperKickQueue(hlp);
74addf6c 183}
184
0a6dd8fb 185/**
4f0ef8e8 186 * DPW 2007-05-08
26ac0430 187 *
4f0ef8e8 188 * helperStatefulOpenServers: create the stateful child helper processes
189 */
94439e4e 190void
191helperStatefulOpenServers(statefulhelper * hlp)
192{
94439e4e 193 char *shortname;
1b8af627 194 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
94439e4e 195 char fd_note_buf[FD_DESC_SZ];
94439e4e 196 int nargs = 0;
62e76326 197
94439e4e 198 if (hlp->cmdline == NULL)
62e76326 199 return;
200
4f0ef8e8 201 char *progname = hlp->cmdline->key;
62e76326 202
4f0ef8e8 203 char *s;
94439e4e 204 if ((s = strrchr(progname, '/')))
62e76326 205 shortname = xstrdup(s + 1);
94439e4e 206 else
62e76326 207 shortname = xstrdup(progname);
208
5a5b2c56 209 /* dont ever start more than hlp->n_to_start processes. */
d974a072
AJ
210 /* n_active are the helpers which have not been shut down. */
211 int need_new = hlp->n_to_start - hlp->n_active;
5a5b2c56
AJ
212
213 debugs(84, 1, "helperOpenServers: Starting " << need_new << "/" << hlp->n_to_start << " '" << shortname << "' processes");
214
04f7fd38 215 if (need_new < 1) {
5a5b2c56
AJ
216 debugs(84, 1, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
217 }
62e76326 218
4f0ef8e8 219 char *procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 220
94439e4e 221 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 222
94439e4e 223 args[nargs++] = procname;
62e76326 224
4f0ef8e8 225 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 226 args[nargs++] = w->key;
227
94439e4e 228 args[nargs++] = NULL;
62e76326 229
94439e4e 230 assert(nargs <= HELPER_MAX_ARGS);
62e76326 231
5a5b2c56 232 for (int k = 0; k < need_new; k++) {
62e76326 233 getCurrentTime();
26ac0430
AJ
234 int rfd = -1;
235 int wfd = -1;
236 void * hIpc;
4f0ef8e8 237 pid_t pid = ipcCreate(hlp->ipc_type,
26ac0430
AJ
238 progname,
239 args,
240 shortname,
241 hlp->addr,
242 &rfd,
243 &wfd,
244 &hIpc);
b5d712b5 245
246 if (pid < 0) {
bf8fe701 247 debugs(84, 1, "WARNING: Cannot run '" << progname << "' process.");
62e76326 248 continue;
249 }
250
251 hlp->n_running++;
d8f10d6a 252 hlp->n_active++;
b5d712b5 253 CBDATA_INIT_TYPE(helper_stateful_server);
4f0ef8e8 254 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
b5d712b5 255 srv->hIpc = hIpc;
256 srv->pid = pid;
360d26ea 257 srv->flags.reserved = 0;
62e76326 258 srv->stats.submits = 0;
259 srv->stats.releases = 0;
260 srv->index = k;
cc192b50 261 srv->addr = hlp->addr;
62e76326 262 srv->rfd = rfd;
263 srv->wfd = wfd;
7f37478b 264 srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
07eca7e0 265 srv->roffset = 0;
62e76326 266 srv->parent = cbdataReference(hlp);
267
268 if (hlp->datapool != NULL)
b001e822 269 srv->data = hlp->datapool->alloc();
62e76326 270
271 dlinkAddTail(srv, &srv->link, &hlp->servers);
272
273 if (rfd == wfd) {
274 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
275 fd_note(rfd, fd_note_buf);
276 } else {
277 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
278 fd_note(rfd, fd_note_buf);
279 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
280 fd_note(wfd, fd_note_buf);
281 }
282
283 commSetNonBlocking(rfd);
284
285 if (wfd != rfd)
286 commSetNonBlocking(wfd);
287
288 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
07eca7e0 289
290 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
94439e4e 291 }
62e76326 292
5ea33fce 293 hlp->last_restart = squid_curtime;
94439e4e 294 safe_free(shortname);
295 safe_free(procname);
296 helperStatefulKickQueue(hlp);
297}
298
299
74addf6c 300void
301helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
302{
5b5f9257 303 if (hlp == NULL) {
bf8fe701 304 debugs(84, 3, "helperSubmit: hlp == NULL");
62e76326 305 callback(data, NULL);
306 return;
5b5f9257 307 }
62e76326 308
eff7218b 309 helper_request *r = new helper_request;
310 helper_server *srv;
311
74addf6c 312 r->callback = callback;
fa80a8ef 313 r->data = cbdataReference(data);
74addf6c 314 r->buf = xstrdup(buf);
62e76326 315
74addf6c 316 if ((srv = GetFirstAvailable(hlp)))
62e76326 317 helperDispatch(srv, r);
74addf6c 318 else
62e76326 319 Enqueue(hlp, r);
320
bf8fe701 321 debugs(84, 9, "helperSubmit: " << buf);
94439e4e 322}
323
0a6dd8fb 324/// lastserver = "server last used as part of a reserved request sequence"
94439e4e 325void
326helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
327{
94439e4e 328 if (hlp == NULL) {
bf8fe701 329 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
62e76326 330 callback(data, 0, NULL);
331 return;
94439e4e 332 }
62e76326 333
eff7218b 334 helper_stateful_request *r = new helper_stateful_request;
eff7218b 335
94439e4e 336 r->callback = callback;
fa80a8ef 337 r->data = cbdataReference(data);
62e76326 338
721b0310 339 if (buf != NULL) {
62e76326 340 r->buf = xstrdup(buf);
341 r->placeholder = 0;
721b0310 342 } else {
62e76326 343 r->buf = NULL;
344 r->placeholder = 1;
721b0310 345 }
62e76326 346
94439e4e 347 if ((buf != NULL) && lastserver) {
bf8fe701 348 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
360d26ea 349 assert(lastserver->flags.reserved);
a80b46e3 350 assert(!(lastserver->request));
62e76326 351
d20ce97d
HN
352 debugs(84, 5, "StatefulSubmit dispatching");
353 helperStatefulDispatch(lastserver, r);
94439e4e 354 } else {
26ac0430 355 helper_stateful_server *srv;
62e76326 356 if ((srv = StatefulGetFirstAvailable(hlp))) {
357 helperStatefulDispatch(srv, r);
358 } else
359 StatefulEnqueue(hlp, r);
94439e4e 360 }
62e76326 361
bf8fe701 362 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
94439e4e 363}
364
0a6dd8fb 365/**
4f0ef8e8 366 * DPW 2007-05-08
367 *
368 * helperStatefulReleaseServer tells the helper that whoever was
369 * using it no longer needs its services.
4f0ef8e8 370 */
94439e4e 371void
372helperStatefulReleaseServer(helper_stateful_server * srv)
94439e4e 373{
4f0ef8e8 374 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
360d26ea 375 if (!srv->flags.reserved)
be29fa07 376 return;
4f0ef8e8 377
60d096f4 378 srv->stats.releases++;
62e76326 379
360d26ea 380 srv->flags.reserved = 0;
4f0ef8e8 381 if (srv->parent->OnEmptyQueue != NULL && srv->data)
26ac0430 382 srv->parent->OnEmptyQueue(srv->data);
420e3e30 383
a8c4f8d6 384 helperStatefulServerDone(srv);
94439e4e 385}
386
/**
 * Return a pointer to the stateful routines data area of this server.
 *
 * \param srv  stateful helper server; must not be NULL (it is dereferenced)
 * \return     srv->data as stored on the server record
 */
void *
helperStatefulServerGetData(helper_stateful_server * srv)
{
    return srv->data;
}
393
0a6dd8fb
AJ
394/**
395 * Dump some stats about the helper states to a StoreEntry
396 */
74addf6c 397void
9522b380 398helperStats(StoreEntry * sentry, helper * hlp, const char *label)
74addf6c 399{
9522b380 400 if (!helperStartStats(sentry, hlp, label))
401 return;
402
0a706c69 403 storeAppendPrintf(sentry, "program: %s\n",
62e76326 404 hlp->cmdline->key);
d974a072
AJ
405 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
406 hlp->n_active, hlp->n_to_start, (hlp->n_running - hlp->n_active) );
74addf6c 407 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 408 hlp->stats.requests);
74addf6c 409 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 410 hlp->stats.replies);
74addf6c 411 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 412 hlp->stats.queue_size);
f9598528 413 storeAppendPrintf(sentry, "avg service time: %d msec\n",
414 hlp->stats.avg_svc_time);
74addf6c 415 storeAppendPrintf(sentry, "\n");
09c1ece1 416 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
62e76326 417 "#",
418 "FD",
419 "PID",
420 "# Requests",
421 "Flags",
422 "Time",
423 "Offset",
424 "Request");
425
4f0ef8e8 426 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
07eca7e0 427 helper_server *srv = (helper_server*)link->data;
f9598528 428 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
d8f10d6a 429 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 430 srv->index + 1,
431 srv->rfd,
432 srv->pid,
433 srv->stats.uses,
07eca7e0 434 srv->stats.pending ? 'B' : ' ',
435 srv->flags.writing ? 'W' : ' ',
62e76326 436 srv->flags.closing ? 'C' : ' ',
437 srv->flags.shutdown ? 'S' : ' ',
438 tt < 0.0 ? 0.0 : tt,
07eca7e0 439 (int) srv->roffset,
440 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
74addf6c 441 }
62e76326 442
74addf6c 443 storeAppendPrintf(sentry, "\nFlags key:\n\n");
74addf6c 444 storeAppendPrintf(sentry, " B = BUSY\n");
f9598528 445 storeAppendPrintf(sentry, " W = WRITING\n");
74addf6c 446 storeAppendPrintf(sentry, " C = CLOSING\n");
d974a072 447 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
74addf6c 448}
449
94439e4e 450void
9522b380 451helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
94439e4e 452{
9522b380 453 if (!helperStartStats(sentry, hlp, label))
454 return;
455
cf154edc 456 storeAppendPrintf(sentry, "program: %s\n",
457 hlp->cmdline->key);
d974a072
AJ
458 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
459 hlp->n_active, hlp->n_to_start, (hlp->n_running - hlp->n_active) );
94439e4e 460 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 461 hlp->stats.requests);
94439e4e 462 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 463 hlp->stats.replies);
94439e4e 464 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 465 hlp->stats.queue_size);
94439e4e 466 storeAppendPrintf(sentry, "avg service time: %d msec\n",
62e76326 467 hlp->stats.avg_svc_time);
94439e4e 468 storeAppendPrintf(sentry, "\n");
3900cea9 469 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
62e76326 470 "#",
471 "FD",
472 "PID",
473 "# Requests",
62e76326 474 "Flags",
475 "Time",
476 "Offset",
477 "Request");
478
4f0ef8e8 479 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
480 helper_stateful_server *srv = (helper_stateful_server *)link->data;
0a6dd8fb 481 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
d20ce97d 482 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 483 srv->index + 1,
484 srv->rfd,
485 srv->pid,
486 srv->stats.uses,
62e76326 487 srv->flags.busy ? 'B' : ' ',
488 srv->flags.closing ? 'C' : ' ',
360d26ea 489 srv->flags.reserved ? 'R' : ' ',
62e76326 490 srv->flags.shutdown ? 'S' : ' ',
491 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
e1381638
AJ
492 tt < 0.0 ? 0.0 : tt,
493 (int) srv->roffset,
494 srv->request ? log_quote(srv->request->buf) : "(none)");
94439e4e 495 }
62e76326 496
94439e4e 497 storeAppendPrintf(sentry, "\nFlags key:\n\n");
94439e4e 498 storeAppendPrintf(sentry, " B = BUSY\n");
499 storeAppendPrintf(sentry, " C = CLOSING\n");
d20ce97d 500 storeAppendPrintf(sentry, " R = RESERVED\n");
d974a072 501 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
5d146f7d 502 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 503}
504
74addf6c 505void
506helperShutdown(helper * hlp)
507{
c68e9c6b 508 dlink_node *link = hlp->servers.head;
2d5d5261 509#ifdef _SQUID_MSWIN_
510
511 HANDLE hIpc;
512 pid_t pid;
513 int no;
514#endif
62e76326 515
c68e9c6b 516 while (link) {
62e76326 517 helper_server *srv;
518 srv = (helper_server *)link->data;
519 link = link->next;
520
8cfc76db 521 if (srv->flags.shutdown) {
bf8fe701 522 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 523 continue;
524 }
525
d8f10d6a 526 hlp->n_active--;
527 assert(hlp->n_active >= 0);
62e76326 528 srv->flags.shutdown = 1; /* request it to shut itself down */
529
d8f10d6a 530 if (srv->flags.closing) {
bf8fe701 531 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 532 continue;
533 }
534
d8f10d6a 535 if (srv->stats.pending) {
bf8fe701 536 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 537 continue;
538 }
539
540 srv->flags.closing = 1;
2d5d5261 541#ifdef _SQUID_MSWIN_
542
543 hIpc = srv->hIpc;
544 pid = srv->pid;
545 no = srv->index + 1;
546 shutdown(srv->wfd, SD_BOTH);
547#endif
548
bf8fe701 549 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
62e76326 550 /* the rest of the details is dealt with in the helperServerFree
551 * close handler
552 */
553 comm_close(srv->rfd);
2d5d5261 554#ifdef _SQUID_MSWIN_
555
556 if (hIpc) {
557 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
558 getCurrentTime();
bf8fe701 559 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
560 " #" << no << " (" << hlp->cmdline->key << "," <<
561 (long int)pid << ") didn't exit in 5 seconds");
562
2d5d5261 563 }
564
565 CloseHandle(hIpc);
566 }
567
568#endif
569
74addf6c 570 }
571}
572
94439e4e 573void
574helperStatefulShutdown(statefulhelper * hlp)
575{
576 dlink_node *link = hlp->servers.head;
577 helper_stateful_server *srv;
2d5d5261 578#ifdef _SQUID_MSWIN_
579
580 HANDLE hIpc;
581 pid_t pid;
582 int no;
583#endif
62e76326 584
94439e4e 585 while (link) {
62e76326 586 srv = (helper_stateful_server *)link->data;
587 link = link->next;
588
8cfc76db 589 if (srv->flags.shutdown) {
bf8fe701 590 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
62e76326 591 continue;
592 }
593
d8f10d6a 594 hlp->n_active--;
595 assert(hlp->n_active >= 0);
62e76326 596 srv->flags.shutdown = 1; /* request it to shut itself down */
597
598 if (srv->flags.busy) {
bf8fe701 599 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
62e76326 600 continue;
601 }
602
603 if (srv->flags.closing) {
bf8fe701 604 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
62e76326 605 continue;
606 }
607
360d26ea 608 if (srv->flags.reserved) {
d7e0f901
AJ
609 if (shutting_down) {
610 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
e1381638 611 } else {
d7e0f901
AJ
612 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
613 continue;
614 }
62e76326 615 }
616
62e76326 617 srv->flags.closing = 1;
2d5d5261 618#ifdef _SQUID_MSWIN_
619
620 hIpc = srv->hIpc;
621 pid = srv->pid;
622 no = srv->index + 1;
623 shutdown(srv->wfd, SD_BOTH);
624#endif
625
bf8fe701 626 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
627
62e76326 628 /* the rest of the details is dealt with in the helperStatefulServerFree
629 * close handler
630 */
631 comm_close(srv->rfd);
2d5d5261 632#ifdef _SQUID_MSWIN_
633
634 if (hIpc) {
635 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
636 getCurrentTime();
bf8fe701 637 debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
638 " #" << no << " (" << hlp->cmdline->key << "," <<
639 (long int)pid << ") didn't exit in 5 seconds");
2d5d5261 640 }
641
642 CloseHandle(hIpc);
643 }
644
645#endif
646
94439e4e 647 }
648}
649
650
1f5f60dd 651helper *
652helperCreate(const char *name)
653{
28c60158 654 helper *hlp;
aa839030 655 CBDATA_INIT_TYPE(helper);
72711e31 656 hlp = cbdataAlloc(helper);
1f5f60dd 657 hlp->id_name = name;
658 return hlp;
659}
660
94439e4e 661statefulhelper *
662helperStatefulCreate(const char *name)
663{
664 statefulhelper *hlp;
aa839030 665 CBDATA_INIT_TYPE(statefulhelper);
72711e31 666 hlp = cbdataAlloc(statefulhelper);
94439e4e 667 hlp->id_name = name;
668 return hlp;
669}
670
671
1f5f60dd 672void
673helperFree(helper * hlp)
674{
5dae8514 675 if (!hlp)
62e76326 676 return;
677
1f5f60dd 678 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 679 if (hlp->queue.head)
bf8fe701 680 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
681 hlp->stats.queue_size << " requests queued");
62e76326 682
1f5f60dd 683 cbdataFree(hlp);
684}
685
94439e4e 686void
687helperStatefulFree(statefulhelper * hlp)
688{
5dae8514 689 if (!hlp)
62e76326 690 return;
691
94439e4e 692 /* note, don't free hlp->name, it probably points to static memory */
693 if (hlp->queue.head)
bf8fe701 694 debugs(84, 0, "WARNING: freeing " << hlp->id_name << " helper with " <<
695 hlp->stats.queue_size << " requests queued");
62e76326 696
94439e4e 697 cbdataFree(hlp);
698}
699
700
74addf6c 701/* ====================================================================== */
702/* LOCAL FUNCTIONS */
703/* ====================================================================== */
704
/**
 * Close handler for a stateless helper server; registered on the server's
 * read FD by helperOpenServers().
 *
 * Releases the server's buffers, answers every outstanding request with a
 * NULL reply, unlinks the server from its parent and frees the record.
 * When the server went away without having been asked to shut down, the
 * parent may start replacement processes.
 *
 * \param fd    the descriptor being closed (only used in the log message)
 * \param data  the helper_server record
 */
static void
helperServerFree(int fd, void *data)
{
    helper_server *srv = (helper_server *)data;
    helper *hlp = srv->parent;
    helper_request *r;
    int i, concurrency = hlp->concurrency;

    /* a concurrency of 0 still uses one request slot */
    if (!concurrency)
        concurrency = 1;

    if (srv->rbuf) {
        memFreeBuf(srv->rbuf_sz, srv->rbuf);
        srv->rbuf = NULL;
    }

    srv->wqueue->clean();
    delete srv->wqueue;

    if (srv->writebuf) {
        srv->writebuf->clean();
        delete srv->writebuf;
        srv->writebuf = NULL;
    }

    /* fail every request still waiting on this server */
    for (i = 0; i < concurrency; i++) {
        if ((r = srv->requests[i])) {
            void *cbdata;

            /* only call back when the requestor's data is still valid */
            if (cbdataReferenceValidDone(r->data, &cbdata))
                r->callback(cbdata, NULL);

            helperRequestFree(r);

            srv->requests[i] = NULL;
        }
    }

    safe_free(srv->requests);

    /* a separate write descriptor has to be closed too */
    if (srv->wfd != srv->rfd && srv->wfd != -1)
        comm_close(srv->wfd);

    dlinkDelete(&srv->link, &hlp->servers);

    hlp->n_running--;

    assert(hlp->n_running >= 0);

    /* servers flagged for shutdown were already removed from n_active */
    if (!srv->flags.shutdown) {
        hlp->n_active--;
        assert(hlp->n_active >= 0);
        debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 <<
               " (FD " << fd << ") exited");

        /* NOTE(review): helperStatefulServerFree uses '<=' for this
         * threshold, and these messages use debug section 80 while the
         * rest of the file uses 84 - confirm which is intended. */
        if (hlp->n_active < hlp->n_to_start / 2) {
            debugs(80, 0, "Too few " << hlp->id_name << " processes are running");

            /* give up when the previous (re)start was under 30 seconds ago */
            if (hlp->last_restart > squid_curtime - 30)
                fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);

            debugs(80, 0, "Starting new helpers");

            helperOpenServers(hlp);
        }
    }

    cbdataReferenceDone(srv->parent);
    cbdataFree(srv);
}
775
/**
 * Close handler for a stateful helper server; registered on the server's
 * read FD by helperStatefulOpenServers().
 *
 * Releases the read buffer, answers the outstanding request (if any) with
 * a NULL reply, unlinks the server from its parent, returns its data area
 * to the parent's pool and frees the record. When the server went away
 * without having been asked to shut down, the parent may start
 * replacement processes.
 *
 * \param fd    the descriptor being closed (only used in the log message)
 * \param data  the helper_stateful_server record
 */
static void
helperStatefulServerFree(int fd, void *data)
{
    helper_stateful_server *srv = (helper_stateful_server *)data;
    statefulhelper *hlp = srv->parent;
    helper_stateful_request *r;

    if (srv->rbuf) {
        memFreeBuf(srv->rbuf_sz, srv->rbuf);
        srv->rbuf = NULL;
    }

    /* disabled: wqueue is not allocated for stateful servers in helperStatefulOpenServers() */
#if 0
    srv->wqueue->clean();

    delete srv->wqueue;

#endif

    /* fail the request still waiting on this server */
    if ((r = srv->request)) {
        void *cbdata;

        /* only call back when the requestor's data is still valid */
        if (cbdataReferenceValidDone(r->data, &cbdata))
            r->callback(cbdata, srv, NULL);

        helperStatefulRequestFree(r);

        srv->request = NULL;
    }

    /* TODO: walk the local queue of requests and carry them all out */
    if (srv->wfd != srv->rfd && srv->wfd != -1)
        comm_close(srv->wfd);

    dlinkDelete(&srv->link, &hlp->servers);

    hlp->n_running--;

    assert(hlp->n_running >= 0);

    /* servers flagged for shutdown were already removed from n_active */
    if (!srv->flags.shutdown) {
        hlp->n_active--;
        assert( hlp->n_active >= 0);
        debugs(84, 0, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " (FD " << fd << ") exited");

        /* NOTE(review): helperServerFree uses '<' for this threshold, and
         * these messages use debug section 80 while the rest of the file
         * uses 84 - confirm which is intended. */
        if (hlp->n_active <= hlp->n_to_start / 2) {
            debugs(80, 0, "Too few " << hlp->id_name << " processes are running");

            /* give up when the previous (re)start was under 30 seconds ago */
            if (hlp->last_restart > squid_curtime - 30)
                fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);

            debugs(80, 0, "Starting new helpers");

            helperStatefulOpenServers(hlp);
        }
    }

    /* hand the per-server data area back to the pool it came from */
    if (srv->data != NULL)
        hlp->datapool->free(srv->data);

    cbdataReferenceDone(srv->parent);

    cbdataFree(srv);
}
840
841
74addf6c 842static void
c4b7a5a9 843helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
74addf6c 844{
74addf6c 845 char *t = NULL;
e6ccf245 846 helper_server *srv = (helper_server *)data;
74addf6c 847 helper *hlp = srv->parent;
fa80a8ef 848 assert(cbdataReferenceValid(data));
c4b7a5a9 849
850 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 851
c4b7a5a9 852 if (flag == COMM_ERR_CLOSING) {
853 return;
854 }
855
420e3e30
HN
856 assert(fd == srv->rfd);
857
4f0ef8e8 858 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
62e76326 859
c4b7a5a9 860 if (flag != COMM_OK || len <= 0) {
62e76326 861 if (len < 0)
bf8fe701 862 debugs(84, 1, "helperHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 863
864 comm_close(fd);
865
866 return;
74addf6c 867 }
62e76326 868
07eca7e0 869 srv->roffset += len;
870 srv->rbuf[srv->roffset] = '\0';
bf8fe701 871 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
62e76326 872
07eca7e0 873 if (!srv->stats.pending) {
62e76326 874 /* someone spoke without being spoken to */
bf8fe701 875 debugs(84, 1, "helperHandleRead: unexpected read from " <<
876 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
877 " bytes '" << srv->rbuf << "'");
878
07eca7e0 879 srv->roffset = 0;
880 srv->rbuf[0] = '\0';
881 }
882
883 while ((t = strchr(srv->rbuf, '\n'))) {
62e76326 884 /* end of reply found */
07eca7e0 885 helper_request *r;
886 char *msg = srv->rbuf;
887 int i = 0;
bf8fe701 888 debugs(84, 3, "helperHandleRead: end of reply found");
6bf4f823 889
890 if (t > srv->rbuf && t[-1] == '\r')
891 t[-1] = '\0';
892
07eca7e0 893 *t++ = '\0';
62e76326 894
07eca7e0 895 if (hlp->concurrency) {
896 i = strtol(msg, &msg, 10);
62e76326 897
e4755e29 898 while (*msg && xisspace(*msg))
07eca7e0 899 msg++;
900 }
62e76326 901
07eca7e0 902 r = srv->requests[i];
62e76326 903
07eca7e0 904 if (r) {
905 HLPCB *callback = r->callback;
906 void *cbdata;
62e76326 907
07eca7e0 908 srv->requests[i] = NULL;
62e76326 909
07eca7e0 910 r->callback = NULL;
62e76326 911
07eca7e0 912 if (cbdataReferenceValidDone(r->data, &cbdata))
913 callback(cbdata, msg);
62e76326 914
07eca7e0 915 srv->stats.pending--;
916
917 hlp->stats.replies++;
918
f9598528 919 srv->answer_time = current_time;
920
921 srv->dispatch_time = r->dispatch_time;
922
07eca7e0 923 hlp->stats.avg_svc_time =
924 intAverage(hlp->stats.avg_svc_time,
925 tvSubMsec(r->dispatch_time, current_time),
926 hlp->stats.replies, REDIRECT_AV_FACTOR);
927
928 helperRequestFree(r);
929 } else {
bf8fe701 930 debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
931 i << " from " << hlp->id_name << " #" << srv->index + 1 <<
932 " '" << srv->rbuf << "'");
933
07eca7e0 934 }
935
936 srv->roffset -= (t - srv->rbuf);
937 memmove(srv->rbuf, t, srv->roffset + 1);
62e76326 938
420e3e30
HN
939 if (!srv->flags.shutdown) {
940 helperKickQueue(hlp);
941 } else if (!srv->flags.closing && !srv->stats.pending) {
62e76326 942 int wfd = srv->wfd;
943 srv->wfd = -1;
420e3e30
HN
944 if (srv->rfd == wfd)
945 srv->rfd = -1;
d8f10d6a 946 srv->flags.closing=1;
62e76326 947 comm_close(wfd);
26ac0430 948 return;
420e3e30 949 }
74addf6c 950 }
07eca7e0 951
420e3e30
HN
952 if (srv->rfd != -1)
953 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
74addf6c 954}
955
94439e4e 956static void
c4b7a5a9 957helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
94439e4e 958{
94439e4e 959 char *t = NULL;
e6ccf245 960 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 961 helper_stateful_request *r;
962 statefulhelper *hlp = srv->parent;
fa80a8ef 963 assert(cbdataReferenceValid(data));
c4b7a5a9 964
965 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 966
c4b7a5a9 967 if (flag == COMM_ERR_CLOSING) {
968 return;
969 }
970
420e3e30
HN
971 assert(fd == srv->rfd);
972
4a7a3d56 973 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
4f0ef8e8 974 hlp->id_name << " #" << srv->index + 1);
bf8fe701 975
62e76326 976
c4b7a5a9 977 if (flag != COMM_OK || len <= 0) {
62e76326 978 if (len < 0)
bf8fe701 979 debugs(84, 1, "helperStatefulHandleRead: FD " << fd << " read: " << xstrerror());
62e76326 980
981 comm_close(fd);
982
983 return;
94439e4e 984 }
62e76326 985
07eca7e0 986 srv->roffset += len;
987 srv->rbuf[srv->roffset] = '\0';
94439e4e 988 r = srv->request;
62e76326 989
94439e4e 990 if (r == NULL) {
62e76326 991 /* someone spoke without being spoken to */
bf8fe701 992 debugs(84, 1, "helperStatefulHandleRead: unexpected read from " <<
993 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
994 " bytes '" << srv->rbuf << "'");
995
07eca7e0 996 srv->roffset = 0;
997 }
998
999 if ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1000 /* end of reply found */
e1381638 1001 int called = 1;
bf8fe701 1002 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1003
1004 if (t > srv->rbuf && t[-1] == '\r')
1005 t[-1] = '\0';
1006
62e76326 1007 *t = '\0';
1008
2734fd37 1009 if (r && cbdataReferenceValid(r->data)) {
dcb802aa 1010 r->callback(r->data, srv, srv->rbuf);
62e76326 1011 } else {
bf8fe701 1012 debugs(84, 1, "StatefulHandleRead: no callback data registered");
e1381638 1013 called = 0;
62e76326 1014 }
1015
1016 srv->flags.busy = 0;
07eca7e0 1017 srv->roffset = 0;
62e76326 1018 helperStatefulRequestFree(r);
1019 srv->request = NULL;
1020 hlp->stats.replies++;
f9598528 1021 srv->answer_time = current_time;
62e76326 1022 hlp->stats.avg_svc_time =
1023 intAverage(hlp->stats.avg_svc_time,
1024 tvSubMsec(srv->dispatch_time, current_time),
1025 hlp->stats.replies, REDIRECT_AV_FACTOR);
1026
e1381638
AJ
1027 if (called)
1028 helperStatefulServerDone(srv);
1029 else
1030 helperStatefulReleaseServer(srv);
94439e4e 1031 }
07eca7e0 1032
420e3e30
HN
1033 if (srv->rfd != -1)
1034 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
e1381638 1035 helperStatefulHandleRead, srv);
94439e4e 1036}
1037
74addf6c 1038static void
1039Enqueue(helper * hlp, helper_request * r)
1040{
e6ccf245 1041 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
74addf6c 1042 dlinkAddTail(r, link, &hlp->queue);
1043 hlp->stats.queue_size++;
62e76326 1044
74addf6c 1045 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1046 return;
1047
74addf6c 1048 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1049 return;
1050
fe73896c 1051 if (shutting_down || reconfiguring)
62e76326 1052 return;
1053
74addf6c 1054 hlp->last_queue_warn = squid_curtime;
62e76326 1055
bf8fe701 1056 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
1057 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
62e76326 1058
62e76326 1059
74addf6c 1060 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1061 fatalf("Too many queued %s requests", hlp->id_name);
1062
bf8fe701 1063 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1064
74addf6c 1065}
1066
94439e4e 1067static void
1068StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1069{
e6ccf245 1070 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1071 dlinkAddTail(r, link, &hlp->queue);
1072 hlp->stats.queue_size++;
62e76326 1073
94439e4e 1074 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1075 return;
1076
893cbac6 1077 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1078 fatalf("Too many queued %s requests", hlp->id_name);
1079
94439e4e 1080 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1081 return;
1082
94439e4e 1083 if (shutting_down || reconfiguring)
62e76326 1084 return;
1085
94439e4e 1086 hlp->last_queue_warn = squid_curtime;
62e76326 1087
bf8fe701 1088 debugs(84, 0, "WARNING: All " << hlp->id_name << " processes are busy.");
62e76326 1089
bf8fe701 1090 debugs(84, 0, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1091 debugs(84, 1, "Consider increasing the number of " << hlp->id_name << " processes in your config file.");
62e76326 1092
94439e4e 1093}
1094
74addf6c 1095static helper_request *
1096Dequeue(helper * hlp)
1097{
1098 dlink_node *link;
1099 helper_request *r = NULL;
62e76326 1100
74addf6c 1101 if ((link = hlp->queue.head)) {
62e76326 1102 r = (helper_request *)link->data;
1103 dlinkDelete(link, &hlp->queue);
1104 memFree(link, MEM_DLINK_NODE);
1105 hlp->stats.queue_size--;
74addf6c 1106 }
62e76326 1107
74addf6c 1108 return r;
1109}
1110
94439e4e 1111static helper_stateful_request *
1112StatefulDequeue(statefulhelper * hlp)
1113{
1114 dlink_node *link;
1115 helper_stateful_request *r = NULL;
62e76326 1116
94439e4e 1117 if ((link = hlp->queue.head)) {
62e76326 1118 r = (helper_stateful_request *)link->data;
1119 dlinkDelete(link, &hlp->queue);
1120 memFree(link, MEM_DLINK_NODE);
1121 hlp->stats.queue_size--;
94439e4e 1122 }
62e76326 1123
94439e4e 1124 return r;
1125}
1126
74addf6c 1127static helper_server *
1128GetFirstAvailable(helper * hlp)
1129{
1130 dlink_node *n;
07eca7e0 1131 helper_server *srv;
1132 helper_server *selected = NULL;
62e76326 1133
fe73896c 1134 if (hlp->n_running == 0)
62e76326 1135 return NULL;
1136
07eca7e0 1137 /* Find "least" loaded helper (approx) */
74addf6c 1138 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1139 srv = (helper_server *)n->data;
1140
07eca7e0 1141 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1142 continue;
1143
d8f10d6a 1144 if (srv->flags.shutdown)
62e76326 1145 continue;
1146
07eca7e0 1147 if (!srv->stats.pending)
1148 return srv;
1149
1150 if (selected) {
1151 selected = srv;
1152 break;
1153 }
1154
1155 selected = srv;
74addf6c 1156 }
62e76326 1157
07eca7e0 1158 /* Check for overload */
1159 if (!selected)
1160 return NULL;
1161
1162 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1163 return NULL;
1164
1165 return selected;
74addf6c 1166}
1167
94439e4e 1168static helper_stateful_server *
1169StatefulGetFirstAvailable(statefulhelper * hlp)
1170{
1171 dlink_node *n;
1172 helper_stateful_server *srv = NULL;
4f0ef8e8 1173 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->n_running);
62e76326 1174
94439e4e 1175 if (hlp->n_running == 0)
62e76326 1176 return NULL;
1177
94439e4e 1178 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1179 srv = (helper_stateful_server *)n->data;
1180
1181 if (srv->flags.busy)
1182 continue;
1183
360d26ea 1184 if (srv->flags.reserved)
62e76326 1185 continue;
1186
d8f10d6a 1187 if (srv->flags.shutdown)
62e76326 1188 continue;
1189
1190 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1191 continue;
1192
26ac0430 1193 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
62e76326 1194 return srv;
94439e4e 1195 }
62e76326 1196
bf8fe701 1197 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
94439e4e 1198 return NULL;
1199}
1200
1201
42679bd6 1202static void
1203helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1204{
07eca7e0 1205 helper_server *srv = (helper_server *)data;
1206
2fe7eff9 1207 srv->writebuf->clean();
032785bf 1208 delete srv->writebuf;
1209 srv->writebuf = NULL;
07eca7e0 1210 srv->flags.writing = 0;
1211
1212 if (flag != COMM_OK) {
1213 /* Helper server has crashed */
bf8fe701 1214 debugs(84, 0, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
07eca7e0 1215 return;
1216 }
1217
2fe7eff9 1218 if (!srv->wqueue->isNull()) {
07eca7e0 1219 srv->writebuf = srv->wqueue;
032785bf 1220 srv->wqueue = new MemBuf;
07eca7e0 1221 srv->flags.writing = 1;
1222 comm_write(srv->wfd,
032785bf 1223 srv->writebuf->content(),
1224 srv->writebuf->contentSize(),
07eca7e0 1225 helperDispatchWriteDone, /* Handler */
2b663917 1226 srv, NULL); /* Handler-data, freefunc */
07eca7e0 1227 }
42679bd6 1228}
1229
74addf6c 1230static void
1231helperDispatch(helper_server * srv, helper_request * r)
1232{
1233 helper *hlp = srv->parent;
07eca7e0 1234 helper_request **ptr = NULL;
1235 unsigned int slot;
62e76326 1236
fa80a8ef 1237 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1238 debugs(84, 1, "helperDispatch: invalid callback data");
62e76326 1239 helperRequestFree(r);
1240 return;
74addf6c 1241 }
62e76326 1242
07eca7e0 1243 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1244 if (!srv->requests[slot]) {
1245 ptr = &srv->requests[slot];
1246 break;
1247 }
1248 }
1249
1250 assert(ptr);
1251 *ptr = r;
1252 srv->stats.pending += 1;
1253 r->dispatch_time = current_time;
1254
2fe7eff9 1255 if (srv->wqueue->isNull())
1256 srv->wqueue->init();
07eca7e0 1257
1258 if (hlp->concurrency)
2fe7eff9 1259 srv->wqueue->Printf("%d %s", slot, r->buf);
07eca7e0 1260 else
2fe7eff9 1261 srv->wqueue->append(r->buf, strlen(r->buf));
07eca7e0 1262
1263 if (!srv->flags.writing) {
032785bf 1264 assert(NULL == srv->writebuf);
07eca7e0 1265 srv->writebuf = srv->wqueue;
032785bf 1266 srv->wqueue = new MemBuf;
07eca7e0 1267 srv->flags.writing = 1;
1268 comm_write(srv->wfd,
032785bf 1269 srv->writebuf->content(),
1270 srv->writebuf->contentSize(),
07eca7e0 1271 helperDispatchWriteDone, /* Handler */
2b663917 1272 srv, NULL); /* Handler-data, free func */
07eca7e0 1273 }
1274
4a7a3d56 1275 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
bf8fe701 1276
74addf6c 1277 srv->stats.uses++;
1278 hlp->stats.requests++;
1279}
1280
42679bd6 1281static void
1282helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
62e76326 1283 int xerrno, void *data)
42679bd6 1284{
62e76326 1285 /* nothing! */
42679bd6 1286}
1287
1288
94439e4e 1289static void
1290helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1291{
1292 statefulhelper *hlp = srv->parent;
62e76326 1293
fa80a8ef 1294 if (!cbdataReferenceValid(r->data)) {
bf8fe701 1295 debugs(84, 1, "helperStatefulDispatch: invalid callback data");
62e76326 1296 helperStatefulRequestFree(r);
e1381638 1297 helperStatefulReleaseServer(srv);
62e76326 1298 return;
94439e4e 1299 }
62e76326 1300
bf8fe701 1301 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
62e76326 1302
94439e4e 1303 if (r->placeholder == 1) {
62e76326 1304 /* a callback is needed before this request can _use_ a helper. */
d20ce97d 1305 /* we don't care about releasing this helper. The request NEVER
62e76326 1306 * gets to the helper. So we throw away the return code */
1307 r->callback(r->data, srv, NULL);
1308 /* throw away the placeholder */
1309 helperStatefulRequestFree(r);
1310 /* and push the queue. Note that the callback may have submitted a new
1311 * request to the helper which is why we test for the request*/
1312
420e3e30 1313 if (srv->request == NULL)
a8c4f8d6 1314 helperStatefulServerDone(srv);
62e76326 1315
1316 return;
94439e4e 1317 }
62e76326 1318
94439e4e 1319 srv->flags.busy = 1;
dcb802aa 1320 srv->flags.reserved = 1;
94439e4e 1321 srv->request = r;
1322 srv->dispatch_time = current_time;
42679bd6 1323 comm_write(srv->wfd,
62e76326 1324 r->buf,
1325 strlen(r->buf),
1326 helperStatefulDispatchWriteDone, /* Handler */
2b663917 1327 hlp, NULL); /* Handler-data, free func */
bf8fe701 1328 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1329 hlp->id_name << " #" << srv->index + 1 << ", " <<
1330 (int) strlen(r->buf) << " bytes");
1331
94439e4e 1332 srv->stats.uses++;
1333 hlp->stats.requests++;
1334}
1335
1336
74addf6c 1337static void
1338helperKickQueue(helper * hlp)
1339{
1340 helper_request *r;
1341 helper_server *srv;
62e76326 1342
74addf6c 1343 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
62e76326 1344 helperDispatch(srv, r);
74addf6c 1345}
1346
94439e4e 1347static void
1348helperStatefulKickQueue(statefulhelper * hlp)
1349{
1350 helper_stateful_request *r;
1351 helper_stateful_server *srv;
62e76326 1352
94439e4e 1353 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
62e76326 1354 helperStatefulDispatch(srv, r);
94439e4e 1355}
1356
1357static void
a8c4f8d6 1358helperStatefulServerDone(helper_stateful_server * srv)
94439e4e 1359{
420e3e30
HN
1360 if (!srv->flags.shutdown) {
1361 helperStatefulKickQueue(srv->parent);
a8db3389 1362 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
420e3e30
HN
1363 int wfd = srv->wfd;
1364 srv->wfd = -1;
1365 if (srv->rfd == wfd)
1366 srv->rfd = -1;
1367 srv->flags.closing=1;
1368 comm_close(wfd);
1369 return;
1370 }
94439e4e 1371}
1372
74addf6c 1373static void
1374helperRequestFree(helper_request * r)
1375{
fa80a8ef 1376 cbdataReferenceDone(r->data);
74addf6c 1377 xfree(r->buf);
00d77d6b 1378 delete r;
51ee7c82 1379}
1380
94439e4e 1381static void
1382helperStatefulRequestFree(helper_stateful_request * r)
1383{
26ac0430 1384 if (r) {
b1da7838 1385 cbdataReferenceDone(r->data);
1386 xfree(r->buf);
1387 delete r;
1388 }
51ee7c82 1389}
9522b380 1390
1391// TODO: should helper_ and helper_stateful_ have a common parent?
1392static bool
1393helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1394{
1395 if (!hlp) {
1396 if (label)
1397 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1398 return false;
1399 }
1400
1401 if (label)
1402 storeAppendPrintf(sentry, "%s:\n", label);
1403
1404 return true;
1405}