]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
cf_gen should use xcalloc() instead of calloc(), if for no other reason
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1
2/*
90bed1c4 3 * $Id: helper.cc,v 1.67 2005/09/11 10:19:41 serassio Exp $
f740a279 4 *
17bb3486 5 * DEBUG: section 84 Helper process maintenance
f740a279 6 * AUTHOR: Harvest Derived?
7 *
2b6662ba 8 * SQUID Web Proxy Cache http://www.squid-cache.org/
f740a279 9 * ----------------------------------------------------------
10 *
2b6662ba 11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
f740a279 19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 */
35
74addf6c 36#include "squid.h"
51ee7c82 37#include "helper.h"
e6ccf245 38#include "Store.h"
42679bd6 39#include "comm.h"
74addf6c 40
41#define HELPER_MAX_ARGS 64
42
c4b7a5a9 43static IOCB helperHandleRead;
44static IOCB helperStatefulHandleRead;
1f5f60dd 45static PF helperServerFree;
94439e4e 46static PF helperStatefulServerFree;
74addf6c 47static void Enqueue(helper * hlp, helper_request *);
48static helper_request *Dequeue(helper * hlp);
94439e4e 49static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
74addf6c 50static helper_server *GetFirstAvailable(helper * hlp);
94439e4e 51static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74addf6c 52static void helperDispatch(helper_server * srv, helper_request * r);
94439e4e 53static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74addf6c 54static void helperKickQueue(helper * hlp);
94439e4e 55static void helperStatefulKickQueue(statefulhelper * hlp);
74addf6c 56static void helperRequestFree(helper_request * r);
94439e4e 57static void helperStatefulRequestFree(helper_stateful_request * r);
58static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
59static helper_stateful_request *StatefulServerDequeue(helper_stateful_server * srv);
60static void StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r);
61static void helperStatefulServerKickQueue(helper_stateful_server * srv);
74addf6c 62
63void
64helperOpenServers(helper * hlp)
65{
66 char *s;
67 char *progname;
68 char *shortname;
69 char *procname;
a2c963ae 70 const char *args[HELPER_MAX_ARGS];
74addf6c 71 char fd_note_buf[FD_DESC_SZ];
72 helper_server *srv;
73 int nargs = 0;
74 int k;
75 int x;
76 int rfd;
77 int wfd;
78 wordlist *w;
62e76326 79
74addf6c 80 if (hlp->cmdline == NULL)
62e76326 81 return;
82
74addf6c 83 progname = hlp->cmdline->key;
62e76326 84
74addf6c 85 if ((s = strrchr(progname, '/')))
62e76326 86 shortname = xstrdup(s + 1);
74addf6c 87 else
62e76326 88 shortname = xstrdup(progname);
89
17bb3486 90 debug(84, 1) ("helperOpenServers: Starting %d '%s' processes\n",
62e76326 91 hlp->n_to_start, shortname);
92
e6ccf245 93 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 94
74addf6c 95 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 96
74addf6c 97 args[nargs++] = procname;
62e76326 98
74addf6c 99 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 100 args[nargs++] = w->key;
101
74addf6c 102 args[nargs++] = NULL;
62e76326 103
74addf6c 104 assert(nargs <= HELPER_MAX_ARGS);
62e76326 105
74addf6c 106 for (k = 0; k < hlp->n_to_start; k++) {
62e76326 107 getCurrentTime();
108 rfd = wfd = -1;
109 x = ipcCreate(hlp->ipc_type,
110 progname,
111 args,
112 shortname,
113 &rfd,
114 &wfd);
115
116 if (x < 0) {
117 debug(84, 1) ("WARNING: Cannot run '%s' process.\n", progname);
118 continue;
119 }
120
121 hlp->n_running++;
d8f10d6a 122 hlp->n_active++;
62e76326 123 srv = cbdataAlloc(helper_server);
124 srv->pid = x;
62e76326 125 srv->index = k;
126 srv->rfd = rfd;
127 srv->wfd = wfd;
07eca7e0 128 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
90bed1c4 129 srv->wqueue = new MemBuf;
07eca7e0 130 srv->roffset = 0;
131 srv->requests = (helper_request **)xcalloc(hlp->concurrency ? hlp->concurrency : 1, sizeof(*srv->requests));
62e76326 132 srv->parent = cbdataReference(hlp);
133 dlinkAddTail(srv, &srv->link, &hlp->servers);
134
135 if (rfd == wfd) {
136 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
137 fd_note(rfd, fd_note_buf);
138 } else {
139 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
140 fd_note(rfd, fd_note_buf);
141 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
142 fd_note(wfd, fd_note_buf);
143 }
144
145 commSetNonBlocking(rfd);
146
147 if (wfd != rfd)
148 commSetNonBlocking(wfd);
149
150 comm_add_close_handler(rfd, helperServerFree, srv);
07eca7e0 151
152 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
74addf6c 153 }
62e76326 154
5ea33fce 155 hlp->last_restart = squid_curtime;
74addf6c 156 safe_free(shortname);
157 safe_free(procname);
838b993c 158 helperKickQueue(hlp);
74addf6c 159}
160
94439e4e 161void
162helperStatefulOpenServers(statefulhelper * hlp)
163{
164 char *s;
165 char *progname;
166 char *shortname;
167 char *procname;
a2c963ae 168 const char *args[HELPER_MAX_ARGS];
94439e4e 169 char fd_note_buf[FD_DESC_SZ];
170 helper_stateful_server *srv;
171 int nargs = 0;
172 int k;
173 int x;
174 int rfd;
175 int wfd;
176 wordlist *w;
62e76326 177
94439e4e 178 if (hlp->cmdline == NULL)
62e76326 179 return;
180
94439e4e 181 progname = hlp->cmdline->key;
62e76326 182
94439e4e 183 if ((s = strrchr(progname, '/')))
62e76326 184 shortname = xstrdup(s + 1);
94439e4e 185 else
62e76326 186 shortname = xstrdup(progname);
187
17bb3486 188 debug(84, 1) ("helperStatefulOpenServers: Starting %d '%s' processes\n",
62e76326 189 hlp->n_to_start, shortname);
190
e6ccf245 191 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 192
94439e4e 193 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 194
94439e4e 195 args[nargs++] = procname;
62e76326 196
94439e4e 197 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
62e76326 198 args[nargs++] = w->key;
199
94439e4e 200 args[nargs++] = NULL;
62e76326 201
94439e4e 202 assert(nargs <= HELPER_MAX_ARGS);
62e76326 203
94439e4e 204 for (k = 0; k < hlp->n_to_start; k++) {
62e76326 205 getCurrentTime();
206 rfd = wfd = -1;
207 x = ipcCreate(hlp->ipc_type,
208 progname,
209 args,
210 shortname,
211 &rfd,
212 &wfd);
213
214 if (x < 0) {
215 debug(84, 1) ("WARNING: Cannot run '%s' process.\n", progname);
216 continue;
217 }
218
219 hlp->n_running++;
d8f10d6a 220 hlp->n_active++;
62e76326 221 srv = cbdataAlloc(helper_stateful_server);
222 srv->pid = x;
62e76326 223 srv->flags.reserved = S_HELPER_FREE;
224 srv->deferred_requests = 0;
225 srv->stats.deferbyfunc = 0;
226 srv->stats.deferbycb = 0;
227 srv->stats.submits = 0;
228 srv->stats.releases = 0;
229 srv->index = k;
230 srv->rfd = rfd;
231 srv->wfd = wfd;
07eca7e0 232 srv->rbuf = (char *)memAllocBuf(8192, &srv->rbuf_sz);
233 srv->roffset = 0;
62e76326 234 srv->parent = cbdataReference(hlp);
235
236 if (hlp->datapool != NULL)
b001e822 237 srv->data = hlp->datapool->alloc();
62e76326 238
239 dlinkAddTail(srv, &srv->link, &hlp->servers);
240
241 if (rfd == wfd) {
242 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
243 fd_note(rfd, fd_note_buf);
244 } else {
245 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
246 fd_note(rfd, fd_note_buf);
247 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
248 fd_note(wfd, fd_note_buf);
249 }
250
251 commSetNonBlocking(rfd);
252
253 if (wfd != rfd)
254 commSetNonBlocking(wfd);
255
256 comm_add_close_handler(rfd, helperStatefulServerFree, srv);
07eca7e0 257
258 comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
259
94439e4e 260 }
62e76326 261
5ea33fce 262 hlp->last_restart = squid_curtime;
94439e4e 263 safe_free(shortname);
264 safe_free(procname);
265 helperStatefulKickQueue(hlp);
266}
267
268
74addf6c 269void
270helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
271{
51ee7c82 272 helper_request *r = new helper_request;
74addf6c 273 helper_server *srv;
62e76326 274
5b5f9257 275 if (hlp == NULL) {
62e76326 276 debug(84, 3) ("helperSubmit: hlp == NULL\n");
277 callback(data, NULL);
278 return;
5b5f9257 279 }
62e76326 280
74addf6c 281 r->callback = callback;
fa80a8ef 282 r->data = cbdataReference(data);
74addf6c 283 r->buf = xstrdup(buf);
62e76326 284
74addf6c 285 if ((srv = GetFirstAvailable(hlp)))
62e76326 286 helperDispatch(srv, r);
74addf6c 287 else
62e76326 288 Enqueue(hlp, r);
289
17bb3486 290 debug(84, 9) ("helperSubmit: %s\n", buf);
94439e4e 291}
292
721b0310 293/* lastserver = "server last used as part of a deferred or reserved
294 * request sequence"
295 */
94439e4e 296void
297helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
298{
51ee7c82 299 helper_stateful_request *r = new helper_stateful_request;
94439e4e 300 helper_stateful_server *srv;
62e76326 301
94439e4e 302 if (hlp == NULL) {
62e76326 303 debug(84, 3) ("helperStatefulSubmit: hlp == NULL\n");
304 callback(data, 0, NULL);
305 return;
94439e4e 306 }
62e76326 307
94439e4e 308 r->callback = callback;
fa80a8ef 309 r->data = cbdataReference(data);
62e76326 310
721b0310 311 if (buf != NULL) {
62e76326 312 r->buf = xstrdup(buf);
313 r->placeholder = 0;
721b0310 314 } else {
62e76326 315 r->buf = NULL;
316 r->placeholder = 1;
721b0310 317 }
62e76326 318
94439e4e 319 if ((buf != NULL) && lastserver) {
62e76326 320 debug(84, 5) ("StatefulSubmit with lastserver %p\n", lastserver);
321 /* the queue doesn't count for this assert because queued requests
322 * have already gone through here and been tested.
323 * It's legal to have deferred_requests == 0 and queue entries
324 * and status of S_HELPEER_DEFERRED.
325 * BUT: It's not legal to submit a new request w/lastserver in
326 * that state.
327 */
328 assert(!(lastserver->deferred_requests == 0 &&
329 lastserver->flags.reserved == S_HELPER_DEFERRED));
330
331 if (lastserver->flags.reserved != S_HELPER_RESERVED) {
332 lastserver->stats.submits++;
333 lastserver->deferred_requests--;
334 }
335
336 if (!(lastserver->request)) {
337 debug(84, 5) ("StatefulSubmit dispatching\n");
338 helperStatefulDispatch(lastserver, r);
339 } else {
340 debug(84, 5) ("StatefulSubmit queuing\n");
341 StatefulServerEnqueue(lastserver, r);
342 }
94439e4e 343 } else {
62e76326 344 if ((srv = StatefulGetFirstAvailable(hlp))) {
345 helperStatefulDispatch(srv, r);
346 } else
347 StatefulEnqueue(hlp, r);
94439e4e 348 }
62e76326 349
17bb3486 350 debug(84, 9) ("helperStatefulSubmit: placeholder: '%d', buf '%s'.\n", r->placeholder, buf);
94439e4e 351}
352
353helper_stateful_server *
354helperStatefulDefer(statefulhelper * hlp)
355/* find and add a deferred request to a server */
356{
357 dlink_node *n;
358 helper_stateful_server *srv = NULL, *rv = NULL;
62e76326 359
360 if (hlp == NULL)
361 {
362 debug(84, 3) ("helperStatefulDefer: hlp == NULL\n");
363 return NULL;
94439e4e 364 }
62e76326 365
17bb3486 366 debug(84, 5) ("helperStatefulDefer: Running servers %d.\n", hlp->n_running);
62e76326 367
368 if (hlp->n_running == 0)
369 {
370 debug(84, 1) ("helperStatefulDefer: No running servers!. \n");
371 return NULL;
94439e4e 372 }
62e76326 373
0a706c69 374 rv = srv = StatefulGetFirstAvailable(hlp);
62e76326 375
376 if (rv == NULL)
377 {
378 /*
379 * all currently busy; loop through servers and find server
380 * with the shortest queue
381 */
382
383 for (n = hlp->servers.head; n != NULL; n = n->next) {
384 srv = (helper_stateful_server *)n->data;
385
386 if (srv->flags.reserved == S_HELPER_RESERVED)
387 continue;
388
b902a58c 389 if (!srv->flags.shutdown)
62e76326 390 continue;
391
392 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) &&
393 !(hlp->IsAvailable(srv->data)))
394 continue;
395
396 if ((rv != NULL) && (rv->deferred_requests < srv->deferred_requests))
397 continue;
398
399 rv = srv;
400 }
0a706c69 401 }
62e76326 402
403 if (rv == NULL)
404 {
405 debug(84, 1) ("helperStatefulDefer: None available.\n");
406 return NULL;
94439e4e 407 }
62e76326 408
60d096f4 409 /* consistency check:
410 * when the deferred count is 0,
411 * submits + releases == deferbyfunc + deferbycb
412 * Or in english, when there are no deferred requests, the amount
413 * we have submitted to the queue or cancelled must equal the amount
414 * we have said we wanted to be able to submit or cancel
415 */
416 if (rv->deferred_requests == 0)
62e76326 417 assert(rv->stats.submits + rv->stats.releases ==
418 rv->stats.deferbyfunc + rv->stats.deferbycb);
60d096f4 419
94439e4e 420 rv->flags.reserved = S_HELPER_DEFERRED;
62e76326 421
94439e4e 422 rv->deferred_requests++;
62e76326 423
60d096f4 424 rv->stats.deferbyfunc++;
62e76326 425
94439e4e 426 return rv;
427}
428
429void
430helperStatefulReset(helper_stateful_server * srv)
62e76326 431/* puts this helper back in the queue. the calling app is required to
94439e4e 432 * manage the state in the helper.
433 */
434{
435 statefulhelper *hlp = srv->parent;
436 helper_stateful_request *r;
437 r = srv->request;
62e76326 438
439 if (r != NULL)
440 {
441 /* reset attempt DURING an outstaning request */
442 debug(84, 1) ("helperStatefulReset: RESET During request %s \n",
443 hlp->id_name);
444 srv->flags.busy = 0;
07eca7e0 445 srv->roffset = 0;
62e76326 446 helperStatefulRequestFree(r);
447 srv->request = NULL;
94439e4e 448 }
62e76326 449
94439e4e 450 srv->flags.busy = 0;
62e76326 451
452 if (srv->queue.head)
453 {
454 srv->flags.reserved = S_HELPER_DEFERRED;
455 helperStatefulServerKickQueue(srv);
456 } else
457 {
458 srv->flags.reserved = S_HELPER_FREE;
459
460 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
461 srv->parent->OnEmptyQueue(srv->data);
462
463 helperStatefulKickQueue(hlp);
94439e4e 464 }
465}
466
467void
468helperStatefulReleaseServer(helper_stateful_server * srv)
469/*decrease the number of 'waiting' clients that set the helper to be DEFERRED */
470{
60d096f4 471 srv->stats.releases++;
62e76326 472
473 if (srv->flags.reserved == S_HELPER_DEFERRED)
474 {
475 assert(srv->deferred_requests);
476 srv->deferred_requests--;
60d096f4 477 }
62e76326 478
479 if (!(srv->deferred_requests) && (srv->flags.reserved == S_HELPER_DEFERRED) && !(srv->queue.head))
480 {
481 srv->flags.reserved = S_HELPER_FREE;
482
483 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
484 srv->parent->OnEmptyQueue(srv->data);
94439e4e 485 }
486}
487
488void *
489helperStatefulServerGetData(helper_stateful_server * srv)
490/* return a pointer to the stateful routines data area */
491{
492 return srv->data;
74addf6c 493}
494
495void
496helperStats(StoreEntry * sentry, helper * hlp)
497{
74addf6c 498 dlink_node *link;
0a706c69 499 storeAppendPrintf(sentry, "program: %s\n",
62e76326 500 hlp->cmdline->key);
74addf6c 501 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 502 hlp->n_running, hlp->n_to_start);
74addf6c 503 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 504 hlp->stats.requests);
74addf6c 505 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 506 hlp->stats.replies);
74addf6c 507 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 508 hlp->stats.queue_size);
a15d85ea 509 storeAppendPrintf(sentry, "avg service time: %.2f msec\n",
62e76326 510 (double) hlp->stats.avg_svc_time / 1000.0);
74addf6c 511 storeAppendPrintf(sentry, "\n");
09c1ece1 512 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
62e76326 513 "#",
514 "FD",
515 "PID",
516 "# Requests",
517 "Flags",
518 "Time",
519 "Offset",
520 "Request");
521
74addf6c 522 for (link = hlp->servers.head; link; link = link->next) {
07eca7e0 523 helper_server *srv = (helper_server*)link->data;
524 double tt = srv->requests[0] ? 0.001 * tvSubMsec(srv->requests[0]->dispatch_time, current_time) : 0.0;
d8f10d6a 525 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 526 srv->index + 1,
527 srv->rfd,
528 srv->pid,
529 srv->stats.uses,
07eca7e0 530 srv->stats.pending ? 'B' : ' ',
531 srv->flags.writing ? 'W' : ' ',
62e76326 532 srv->flags.closing ? 'C' : ' ',
533 srv->flags.shutdown ? 'S' : ' ',
534 tt < 0.0 ? 0.0 : tt,
07eca7e0 535 (int) srv->roffset,
536 srv->requests[0] ? log_quote(srv->requests[0]->buf) : "(none)");
74addf6c 537 }
62e76326 538
74addf6c 539 storeAppendPrintf(sentry, "\nFlags key:\n\n");
74addf6c 540 storeAppendPrintf(sentry, " B = BUSY\n");
07eca7e0 541 storeAppendPrintf(sentry, " W = BUSY\n");
74addf6c 542 storeAppendPrintf(sentry, " C = CLOSING\n");
543 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
544}
545
94439e4e 546void
547helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp)
548{
549 helper_stateful_server *srv;
550 dlink_node *link;
551 double tt;
552 storeAppendPrintf(sentry, "number running: %d of %d\n",
62e76326 553 hlp->n_running, hlp->n_to_start);
94439e4e 554 storeAppendPrintf(sentry, "requests sent: %d\n",
62e76326 555 hlp->stats.requests);
94439e4e 556 storeAppendPrintf(sentry, "replies received: %d\n",
62e76326 557 hlp->stats.replies);
94439e4e 558 storeAppendPrintf(sentry, "queue length: %d\n",
62e76326 559 hlp->stats.queue_size);
94439e4e 560 storeAppendPrintf(sentry, "avg service time: %d msec\n",
62e76326 561 hlp->stats.avg_svc_time);
94439e4e 562 storeAppendPrintf(sentry, "\n");
0a706c69 563 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%20s\t%s\t%7s\t%7s\t%7s\n",
62e76326 564 "#",
565 "FD",
566 "PID",
567 "# Requests",
568 "# Deferred Requests",
569 "Flags",
570 "Time",
571 "Offset",
572 "Request");
573
94439e4e 574 for (link = hlp->servers.head; link; link = link->next) {
62e76326 575 srv = (helper_stateful_server *)link->data;
576 tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
b902a58c 577 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%20d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
62e76326 578 srv->index + 1,
579 srv->rfd,
580 srv->pid,
581 srv->stats.uses,
582 (int) srv->deferred_requests,
62e76326 583 srv->flags.busy ? 'B' : ' ',
584 srv->flags.closing ? 'C' : ' ',
585 srv->flags.reserved != S_HELPER_FREE ? 'R' : ' ',
586 srv->flags.shutdown ? 'S' : ' ',
587 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
588 tt < 0.0 ? 0.0 : tt,
07eca7e0 589 (int) srv->roffset,
62e76326 590 srv->request ? log_quote(srv->request->buf) : "(none)");
94439e4e 591 }
62e76326 592
94439e4e 593 storeAppendPrintf(sentry, "\nFlags key:\n\n");
94439e4e 594 storeAppendPrintf(sentry, " B = BUSY\n");
595 storeAppendPrintf(sentry, " C = CLOSING\n");
596 storeAppendPrintf(sentry, " R = RESERVED or DEFERRED\n");
597 storeAppendPrintf(sentry, " S = SHUTDOWN\n");
5d146f7d 598 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
94439e4e 599}
600
74addf6c 601void
602helperShutdown(helper * hlp)
603{
c68e9c6b 604 dlink_node *link = hlp->servers.head;
62e76326 605
c68e9c6b 606 while (link) {
62e76326 607 helper_server *srv;
608 srv = (helper_server *)link->data;
609 link = link->next;
610
8cfc76db 611 if (srv->flags.shutdown) {
d8f10d6a 612 debug(84, 3) ("helperShutdown: %s #%d has already SHUT DOWN.\n",
62e76326 613 hlp->id_name, srv->index + 1);
614 continue;
615 }
616
d8f10d6a 617 hlp->n_active--;
618 assert(hlp->n_active >= 0);
619
62e76326 620 srv->flags.shutdown = 1; /* request it to shut itself down */
621
d8f10d6a 622 if (srv->flags.closing) {
623 debug(84, 3) ("helperShutdown: %s #%d is CLOSING.\n",
62e76326 624 hlp->id_name, srv->index + 1);
625 continue;
626 }
627
d8f10d6a 628 if (srv->stats.pending) {
629 debug(84, 3) ("helperShutdown: %s #%d is BUSY.\n",
62e76326 630 hlp->id_name, srv->index + 1);
631 continue;
632 }
633
634 srv->flags.closing = 1;
8cfc76db 635 debug(84, 3) ("helperShutdown: %s #%d shutting down.\n",
636 hlp->id_name, srv->index + 1);
62e76326 637 /* the rest of the details is dealt with in the helperServerFree
638 * close handler
639 */
640 comm_close(srv->rfd);
74addf6c 641 }
642}
643
94439e4e 644void
645helperStatefulShutdown(statefulhelper * hlp)
646{
647 dlink_node *link = hlp->servers.head;
648 helper_stateful_server *srv;
62e76326 649
94439e4e 650 while (link) {
62e76326 651 srv = (helper_stateful_server *)link->data;
652 link = link->next;
653
8cfc76db 654 if (srv->flags.shutdown) {
d8f10d6a 655 debug(84, 3) ("helperStatefulShutdown: %s #%d has already SHUT DOWN.\n",
62e76326 656 hlp->id_name, srv->index + 1);
657 continue;
658 }
659
d8f10d6a 660 hlp->n_active--;
661 assert(hlp->n_active >= 0);
62e76326 662 srv->flags.shutdown = 1; /* request it to shut itself down */
663
664 if (srv->flags.busy) {
665 debug(84, 3) ("helperStatefulShutdown: %s #%d is BUSY.\n",
666 hlp->id_name, srv->index + 1);
667 continue;
668 }
669
670 if (srv->flags.closing) {
671 debug(84, 3) ("helperStatefulShutdown: %s #%d is CLOSING.\n",
672 hlp->id_name, srv->index + 1);
673 continue;
674 }
675
676 if (srv->flags.reserved != S_HELPER_FREE) {
677 debug(84, 3) ("helperStatefulShutdown: %s #%d is RESERVED.\n",
678 hlp->id_name, srv->index + 1);
679 continue;
680 }
681
682 if (srv->deferred_requests) {
683 debug(84, 3) ("helperStatefulShutdown: %s #%d has DEFERRED requests.\n",
684 hlp->id_name, srv->index + 1);
685 continue;
686 }
687
688 srv->flags.closing = 1;
8cfc76db 689 debug(84, 3) ("helperStatefulShutdown: %s #%d shutting down.\n",
690 hlp->id_name, srv->index + 1);
62e76326 691 /* the rest of the details is dealt with in the helperStatefulServerFree
692 * close handler
693 */
694 comm_close(srv->rfd);
94439e4e 695 }
696}
697
698
1f5f60dd 699helper *
700helperCreate(const char *name)
701{
28c60158 702 helper *hlp;
72711e31 703 hlp = cbdataAlloc(helper);
1f5f60dd 704 hlp->id_name = name;
705 return hlp;
706}
707
94439e4e 708statefulhelper *
709helperStatefulCreate(const char *name)
710{
711 statefulhelper *hlp;
72711e31 712 hlp = cbdataAlloc(statefulhelper);
94439e4e 713 hlp->id_name = name;
714 return hlp;
715}
716
717
1f5f60dd 718void
719helperFree(helper * hlp)
720{
5dae8514 721 if (!hlp)
62e76326 722 return;
723
1f5f60dd 724 /* note, don't free hlp->name, it probably points to static memory */
fe73896c 725 if (hlp->queue.head)
62e76326 726 debug(84, 0) ("WARNING: freeing %s helper with %d requests queued\n",
727 hlp->id_name, hlp->stats.queue_size);
728
1f5f60dd 729 cbdataFree(hlp);
730}
731
94439e4e 732void
733helperStatefulFree(statefulhelper * hlp)
734{
5dae8514 735 if (!hlp)
62e76326 736 return;
737
94439e4e 738 /* note, don't free hlp->name, it probably points to static memory */
739 if (hlp->queue.head)
62e76326 740 debug(84, 0) ("WARNING: freeing %s helper with %d requests queued\n",
741 hlp->id_name, hlp->stats.queue_size);
742
94439e4e 743 cbdataFree(hlp);
744}
745
746
74addf6c 747/* ====================================================================== */
748/* LOCAL FUNCTIONS */
749/* ====================================================================== */
750
751static void
1f5f60dd 752helperServerFree(int fd, void *data)
74addf6c 753{
e6ccf245 754 helper_server *srv = (helper_server *)data;
74addf6c 755 helper *hlp = srv->parent;
ac750329 756 helper_request *r;
07eca7e0 757 int i, concurrency = hlp->concurrency;
758
759 if (!concurrency)
760 concurrency = 1;
761
74addf6c 762 assert(srv->rfd == fd);
62e76326 763
07eca7e0 764 if (srv->rbuf) {
765 memFreeBuf(srv->rbuf_sz, srv->rbuf);
766 srv->rbuf = NULL;
74addf6c 767 }
62e76326 768
032785bf 769 memBufClean(srv->wqueue);
770 delete srv->wqueue;
771
772 if (srv->writebuf) {
773 memBufClean(srv->writebuf);
774 delete srv->writebuf;
775 srv->writebuf = NULL;
776 }
62e76326 777
07eca7e0 778 for (i = 0; i < concurrency; i++) {
779 if ((r = srv->requests[i])) {
780 void *cbdata;
62e76326 781
07eca7e0 782 if (cbdataReferenceValidDone(r->data, &cbdata))
783 r->callback(cbdata, NULL);
62e76326 784
07eca7e0 785 helperRequestFree(r);
786
787 srv->requests[i] = NULL;
788 }
ac750329 789 }
62e76326 790
07eca7e0 791 safe_free(srv->requests);
792
3cdb7cd0 793 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 794 comm_close(srv->wfd);
795
74addf6c 796 dlinkDelete(&srv->link, &hlp->servers);
62e76326 797
74addf6c 798 hlp->n_running--;
62e76326 799
74addf6c 800 assert(hlp->n_running >= 0);
62e76326 801
1f5f60dd 802 if (!srv->flags.shutdown) {
d8f10d6a 803 hlp->n_active--;
804 assert(hlp->n_active >= 0);
62e76326 805 debug(84, 0) ("WARNING: %s #%d (FD %d) exited\n",
806 hlp->id_name, srv->index + 1, fd);
807
5ea33fce 808 if (hlp->n_active < hlp->n_to_start / 2) {
809 debug(80, 0) ("Too few %s processes are running\n", hlp->id_name);
810
811 if (hlp->last_restart > squid_curtime - 30)
812 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
813
814 debug(80, 0) ("Starting new helpers\n");
815
816 helperOpenServers(hlp);
817 }
14e87a44 818 }
62e76326 819
fa80a8ef 820 cbdataReferenceDone(srv->parent);
14e87a44 821 cbdataFree(srv);
74addf6c 822}
823
94439e4e 824static void
825helperStatefulServerFree(int fd, void *data)
826{
e6ccf245 827 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 828 statefulhelper *hlp = srv->parent;
829 helper_stateful_request *r;
830 assert(srv->rfd == fd);
62e76326 831
07eca7e0 832 if (srv->rbuf) {
833 memFreeBuf(srv->rbuf_sz, srv->rbuf);
834 srv->rbuf = NULL;
94439e4e 835 }
62e76326 836
07eca7e0 837#if 0
032785bf 838 memBufClean(srv->wqueue);
839
840 delete srv->wqueue;
07eca7e0 841
842#endif
843
94439e4e 844 if ((r = srv->request)) {
62e76326 845 void *cbdata;
846
847 if (cbdataReferenceValidDone(r->data, &cbdata))
07eca7e0 848 r->callback(cbdata, srv, NULL);
62e76326 849
850 helperStatefulRequestFree(r);
851
852 srv->request = NULL;
94439e4e 853 }
62e76326 854
3c641669 855 /* TODO: walk the local queue of requests and carry them all out */
94439e4e 856 if (srv->wfd != srv->rfd && srv->wfd != -1)
62e76326 857 comm_close(srv->wfd);
858
94439e4e 859 dlinkDelete(&srv->link, &hlp->servers);
62e76326 860
94439e4e 861 hlp->n_running--;
62e76326 862
94439e4e 863 assert(hlp->n_running >= 0);
62e76326 864
94439e4e 865 if (!srv->flags.shutdown) {
d8f10d6a 866 hlp->n_active--;
867 assert( hlp->n_active >= 0);
62e76326 868 debug(84, 0) ("WARNING: %s #%d (FD %d) exited\n",
869 hlp->id_name, srv->index + 1, fd);
870
5ea33fce 871 if (hlp->n_active <= hlp->n_to_start / 2) {
872 debug(80, 0) ("Too few %s processes are running\n", hlp->id_name);
873
874 if (hlp->last_restart > squid_curtime - 30)
875 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
876
877 debug(80, 0) ("Starting new helpers\n");
878
879 helperStatefulOpenServers(hlp);
880 }
94439e4e 881 }
62e76326 882
94439e4e 883 if (srv->data != NULL)
b001e822 884 hlp->datapool->free(srv->data);
62e76326 885
fa80a8ef 886 cbdataReferenceDone(srv->parent);
62e76326 887
94439e4e 888 cbdataFree(srv);
889}
890
891
74addf6c 892static void
c4b7a5a9 893helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
74addf6c 894{
74addf6c 895 char *t = NULL;
e6ccf245 896 helper_server *srv = (helper_server *)data;
74addf6c 897 helper *hlp = srv->parent;
898 assert(fd == srv->rfd);
fa80a8ef 899 assert(cbdataReferenceValid(data));
c4b7a5a9 900
901 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 902
c4b7a5a9 903 if (flag == COMM_ERR_CLOSING) {
904 return;
905 }
906
17bb3486 907 debug(84, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
62e76326 908 (int)len, hlp->id_name, srv->index + 1);
909
c4b7a5a9 910 if (flag != COMM_OK || len <= 0) {
62e76326 911 if (len < 0)
912 debug(84, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
913
914 comm_close(fd);
915
916 return;
74addf6c 917 }
62e76326 918
07eca7e0 919 srv->roffset += len;
920 srv->rbuf[srv->roffset] = '\0';
921 debug(84, 9) ("helperHandleRead: '%s'\n", srv->rbuf);
62e76326 922
07eca7e0 923 if (!srv->stats.pending) {
62e76326 924 /* someone spoke without being spoken to */
07eca7e0 925 debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes '%s'\n",
926 hlp->id_name, srv->index + 1, (int)len, srv->rbuf);
927 srv->roffset = 0;
928 srv->rbuf[0] = '\0';
929 }
930
931 while ((t = strchr(srv->rbuf, '\n'))) {
62e76326 932 /* end of reply found */
07eca7e0 933 helper_request *r;
934 char *msg = srv->rbuf;
935 int i = 0;
62e76326 936 debug(84, 3) ("helperHandleRead: end of reply found\n");
07eca7e0 937 *t++ = '\0';
62e76326 938
07eca7e0 939 if (hlp->concurrency) {
940 i = strtol(msg, &msg, 10);
62e76326 941
07eca7e0 942 while (*msg && isspace(*msg))
943 msg++;
944 }
62e76326 945
07eca7e0 946 r = srv->requests[i];
62e76326 947
07eca7e0 948 if (r) {
949 HLPCB *callback = r->callback;
950 void *cbdata;
62e76326 951
07eca7e0 952 srv->requests[i] = NULL;
62e76326 953
07eca7e0 954 r->callback = NULL;
62e76326 955
07eca7e0 956 if (cbdataReferenceValidDone(r->data, &cbdata))
957 callback(cbdata, msg);
62e76326 958
07eca7e0 959 srv->stats.pending--;
960
961 hlp->stats.replies++;
962
963 hlp->stats.avg_svc_time =
964 intAverage(hlp->stats.avg_svc_time,
965 tvSubMsec(r->dispatch_time, current_time),
966 hlp->stats.replies, REDIRECT_AV_FACTOR);
967
968 helperRequestFree(r);
969 } else {
970 debug(84, 1) ("helperHandleRead: unexpected reply on channel %d from %s #%d '%s'\n",
971 i, hlp->id_name, srv->index + 1, srv->rbuf);
972 }
973
974 srv->roffset -= (t - srv->rbuf);
975 memmove(srv->rbuf, t, srv->roffset + 1);
62e76326 976
977 if (srv->flags.shutdown) {
978 int wfd = srv->wfd;
979 srv->wfd = -1;
d8f10d6a 980 srv->flags.closing=1;
62e76326 981 comm_close(wfd);
982 } else
983 helperKickQueue(hlp);
74addf6c 984 }
07eca7e0 985
986 comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
74addf6c 987}
988
94439e4e 989static void
c4b7a5a9 990helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
94439e4e 991{
94439e4e 992 char *t = NULL;
e6ccf245 993 helper_stateful_server *srv = (helper_stateful_server *)data;
94439e4e 994 helper_stateful_request *r;
995 statefulhelper *hlp = srv->parent;
996 assert(fd == srv->rfd);
fa80a8ef 997 assert(cbdataReferenceValid(data));
c4b7a5a9 998
999 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
62e76326 1000
c4b7a5a9 1001 if (flag == COMM_ERR_CLOSING) {
1002 return;
1003 }
1004
17bb3486 1005 debug(84, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
62e76326 1006 (int)len, hlp->id_name, srv->index + 1);
1007
c4b7a5a9 1008 if (flag != COMM_OK || len <= 0) {
62e76326 1009 if (len < 0)
1010 debug(84, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
1011
1012 comm_close(fd);
1013
1014 return;
94439e4e 1015 }
62e76326 1016
07eca7e0 1017 srv->roffset += len;
1018 srv->rbuf[srv->roffset] = '\0';
94439e4e 1019 r = srv->request;
62e76326 1020
94439e4e 1021 if (r == NULL) {
62e76326 1022 /* someone spoke without being spoken to */
07eca7e0 1023 debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes '%s'\n",
1024 hlp->id_name, srv->index + 1, (int)len, srv->rbuf);
1025 srv->roffset = 0;
1026 }
1027
1028 if ((t = strchr(srv->rbuf, '\n'))) {
62e76326 1029 /* end of reply found */
1030 debug(84, 3) ("helperStatefulHandleRead: end of reply found\n");
1031 *t = '\0';
1032
1033 if (cbdataReferenceValid(r->data)) {
07eca7e0 1034 switch ((r->callback(r->data, srv, srv->rbuf))) { /*if non-zero reserve helper */
62e76326 1035
1036 case S_HELPER_UNKNOWN:
1037 fatal("helperStatefulHandleRead: either a non-state aware callback was give to the stateful helper routines, or an uninitialised callback response was recieved.\n");
1038 break;
1039
1040 case S_HELPER_RELEASE: /* helper finished with */
1041
1042 if (!srv->deferred_requests && !srv->queue.head) {
1043 srv->flags.reserved = S_HELPER_FREE;
1044
1045 if ((srv->parent->OnEmptyQueue != NULL) && (srv->data))
1046 srv->parent->OnEmptyQueue(srv->data);
1047
1048 debug(84, 5) ("StatefulHandleRead: releasing %s #%d\n", hlp->id_name, srv->index + 1);
1049 } else {
1050 srv->flags.reserved = S_HELPER_DEFERRED;
1051 debug(84, 5) ("StatefulHandleRead: outstanding deferred requests on %s #%d. reserving for deferred requests.\n", hlp->id_name, srv->index + 1);
1052 }
1053
1054 break;
1055
1056 case S_HELPER_RESERVE: /* 'pin' this helper for the caller */
1057
1058 if (!srv->queue.head) {
1059 assert(srv->deferred_requests == 0);
1060 srv->flags.reserved = S_HELPER_RESERVED;
1061 debug(84, 5) ("StatefulHandleRead: reserving %s #%d\n", hlp->id_name, srv->index + 1);
1062 } else {
1063 fatal("StatefulHandleRead: Callback routine attempted to reserve a stateful helper with deferred requests. This can lead to deadlock.\n");
1064 }
1065
1066 break;
1067
1068 case S_HELPER_DEFER:
1069 /* the helper is still needed, but can
1070 * be used for other requests in the meantime.
1071 */
1072 srv->flags.reserved = S_HELPER_DEFERRED;
1073 srv->deferred_requests++;
1074 srv->stats.deferbycb++;
1075 debug(84, 5) ("StatefulHandleRead: reserving %s #%d for deferred requests.\n", hlp->id_name, srv->index + 1);
1076 break;
1077
1078 default:
1079 fatal("helperStatefulHandleRead: unknown stateful helper callback result.\n");
1080 }
1081
1082 } else {
1083 debug(84, 1) ("StatefulHandleRead: no callback data registered\n");
1084 }
1085
1086 srv->flags.busy = 0;
07eca7e0 1087 srv->roffset = 0;
62e76326 1088 helperStatefulRequestFree(r);
1089 srv->request = NULL;
1090 hlp->stats.replies++;
1091 hlp->stats.avg_svc_time =
1092 intAverage(hlp->stats.avg_svc_time,
1093 tvSubMsec(srv->dispatch_time, current_time),
1094 hlp->stats.replies, REDIRECT_AV_FACTOR);
1095
1096 if (srv->flags.shutdown
1097 && srv->flags.reserved == S_HELPER_FREE
1098 && !srv->deferred_requests) {
1099 int wfd = srv->wfd;
1100 srv->wfd = -1;
d8f10d6a 1101 srv->flags.closing=1;
62e76326 1102 comm_close(wfd);
1103 } else {
1104 if (srv->queue.head)
1105 helperStatefulServerKickQueue(srv);
1106 else
1107 helperStatefulKickQueue(hlp);
1108 }
94439e4e 1109 }
07eca7e0 1110
1111 comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
1112 helperStatefulHandleRead, srv);
94439e4e 1113}
1114
74addf6c 1115static void
1116Enqueue(helper * hlp, helper_request * r)
1117{
e6ccf245 1118 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
74addf6c 1119 dlinkAddTail(r, link, &hlp->queue);
1120 hlp->stats.queue_size++;
62e76326 1121
74addf6c 1122 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1123 return;
1124
74addf6c 1125 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1126 return;
1127
fe73896c 1128 if (shutting_down || reconfiguring)
62e76326 1129 return;
1130
74addf6c 1131 hlp->last_queue_warn = squid_curtime;
62e76326 1132
c916b75b 1133 debug(84, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
62e76326 1134
c916b75b 1135 debug(84, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
62e76326 1136
74addf6c 1137 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1138 fatalf("Too many queued %s requests", hlp->id_name);
1139
c916b75b 1140 debug(84, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
74addf6c 1141}
1142
94439e4e 1143static void
1144StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1145{
e6ccf245 1146 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1147 dlinkAddTail(r, link, &hlp->queue);
1148 hlp->stats.queue_size++;
62e76326 1149
94439e4e 1150 if (hlp->stats.queue_size < hlp->n_running)
62e76326 1151 return;
1152
893cbac6 1153 if (hlp->stats.queue_size > hlp->n_running * 2)
62e76326 1154 fatalf("Too many queued %s requests", hlp->id_name);
1155
94439e4e 1156 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1157 return;
1158
94439e4e 1159 if (shutting_down || reconfiguring)
62e76326 1160 return;
1161
94439e4e 1162 hlp->last_queue_warn = squid_curtime;
62e76326 1163
c916b75b 1164 debug(84, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
62e76326 1165
c916b75b 1166 debug(84, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
62e76326 1167
c916b75b 1168 debug(84, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name);
94439e4e 1169}
1170
1171static void
1172StatefulServerEnqueue(helper_stateful_server * srv, helper_stateful_request * r)
1173{
e6ccf245 1174 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
94439e4e 1175 dlinkAddTail(r, link, &srv->queue);
62e76326 1176 /* TODO: warning if the queue on this server is more than X
1177 * We don't check the queue size at the moment, because
1178 * requests hitting here are deferrable
1179 */
1180 /* hlp->stats.queue_size++;
1181 * if (hlp->stats.queue_size < hlp->n_running)
1182 * return;
1183 * if (squid_curtime - hlp->last_queue_warn < 600)
1184 * return;
1185 * if (shutting_down || reconfiguring)
1186 * return;
1187 * hlp->last_queue_warn = squid_curtime;
1188 * debug(84, 0) ("WARNING: All %s processes are busy.\n", hlp->id_name);
1189 * debug(84, 0) ("WARNING: %d pending requests queued\n", hlp->stats.queue_size);
1190 * if (hlp->stats.queue_size > hlp->n_running * 2)
1191 * fatalf("Too many queued %s requests", hlp->id_name);
1192 * debug(84, 1) ("Consider increasing the number of %s processes in your config file.\n", hlp->id_name); */
94439e4e 1193}
1194
1195
74addf6c 1196static helper_request *
1197Dequeue(helper * hlp)
1198{
1199 dlink_node *link;
1200 helper_request *r = NULL;
62e76326 1201
74addf6c 1202 if ((link = hlp->queue.head)) {
62e76326 1203 r = (helper_request *)link->data;
1204 dlinkDelete(link, &hlp->queue);
1205 memFree(link, MEM_DLINK_NODE);
1206 hlp->stats.queue_size--;
74addf6c 1207 }
62e76326 1208
74addf6c 1209 return r;
1210}
1211
94439e4e 1212static helper_stateful_request *
1213StatefulServerDequeue(helper_stateful_server * srv)
1214{
1215 dlink_node *link;
1216 helper_stateful_request *r = NULL;
62e76326 1217
94439e4e 1218 if ((link = srv->queue.head)) {
62e76326 1219 r = (helper_stateful_request *)link->data;
1220 dlinkDelete(link, &srv->queue);
1221 memFree(link, MEM_DLINK_NODE);
94439e4e 1222 }
62e76326 1223
94439e4e 1224 return r;
1225}
1226
1227static helper_stateful_request *
1228StatefulDequeue(statefulhelper * hlp)
1229{
1230 dlink_node *link;
1231 helper_stateful_request *r = NULL;
62e76326 1232
94439e4e 1233 if ((link = hlp->queue.head)) {
62e76326 1234 r = (helper_stateful_request *)link->data;
1235 dlinkDelete(link, &hlp->queue);
1236 memFree(link, MEM_DLINK_NODE);
1237 hlp->stats.queue_size--;
94439e4e 1238 }
62e76326 1239
94439e4e 1240 return r;
1241}
1242
74addf6c 1243static helper_server *
1244GetFirstAvailable(helper * hlp)
1245{
1246 dlink_node *n;
07eca7e0 1247 helper_server *srv;
1248 helper_server *selected = NULL;
62e76326 1249
fe73896c 1250 if (hlp->n_running == 0)
62e76326 1251 return NULL;
1252
07eca7e0 1253 /* Find "least" loaded helper (approx) */
74addf6c 1254 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1255 srv = (helper_server *)n->data;
1256
07eca7e0 1257 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1258 continue;
1259
d8f10d6a 1260 if (srv->flags.shutdown)
62e76326 1261 continue;
1262
07eca7e0 1263 if (!srv->stats.pending)
1264 return srv;
1265
1266 if (selected) {
1267 selected = srv;
1268 break;
1269 }
1270
1271 selected = srv;
74addf6c 1272 }
62e76326 1273
07eca7e0 1274 /* Check for overload */
1275 if (!selected)
1276 return NULL;
1277
1278 if (selected->stats.pending >= (hlp->concurrency ? hlp->concurrency : 1))
1279 return NULL;
1280
1281 return selected;
74addf6c 1282}
1283
94439e4e 1284static helper_stateful_server *
1285StatefulGetFirstAvailable(statefulhelper * hlp)
1286{
1287 dlink_node *n;
1288 helper_stateful_server *srv = NULL;
17bb3486 1289 debug(84, 5) ("StatefulGetFirstAvailable: Running servers %d.\n", hlp->n_running);
62e76326 1290
94439e4e 1291 if (hlp->n_running == 0)
62e76326 1292 return NULL;
1293
94439e4e 1294 for (n = hlp->servers.head; n != NULL; n = n->next) {
62e76326 1295 srv = (helper_stateful_server *)n->data;
1296
1297 if (srv->flags.busy)
1298 continue;
1299
1300 if (srv->flags.reserved == S_HELPER_RESERVED)
1301 continue;
1302
d8f10d6a 1303 if (srv->flags.shutdown)
62e76326 1304 continue;
1305
1306 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1307 continue;
1308
1309 return srv;
94439e4e 1310 }
62e76326 1311
17bb3486 1312 debug(84, 5) ("StatefulGetFirstAvailable: None available.\n");
94439e4e 1313 return NULL;
1314}
1315
1316
42679bd6 1317static void
1318helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1319{
07eca7e0 1320 helper_server *srv = (helper_server *)data;
1321
032785bf 1322 memBufClean(srv->writebuf);
1323 delete srv->writebuf;
1324 srv->writebuf = NULL;
07eca7e0 1325 srv->flags.writing = 0;
1326
1327 if (flag != COMM_OK) {
1328 /* Helper server has crashed */
1329 debug(84, 0) ("helperDispatch: Helper %s #%d has crashed\n",
1330 srv->parent->id_name, srv->index + 1);
1331 return;
1332 }
1333
032785bf 1334 if (!memBufIsNull(srv->wqueue)) {
07eca7e0 1335 srv->writebuf = srv->wqueue;
032785bf 1336 srv->wqueue = new MemBuf;
07eca7e0 1337 srv->flags.writing = 1;
1338 comm_write(srv->wfd,
032785bf 1339 srv->writebuf->content(),
1340 srv->writebuf->contentSize(),
07eca7e0 1341 helperDispatchWriteDone, /* Handler */
1342 srv); /* Handler-data */
1343 }
42679bd6 1344}
1345
74addf6c 1346static void
1347helperDispatch(helper_server * srv, helper_request * r)
1348{
1349 helper *hlp = srv->parent;
07eca7e0 1350 helper_request **ptr = NULL;
1351 unsigned int slot;
62e76326 1352
fa80a8ef 1353 if (!cbdataReferenceValid(r->data)) {
62e76326 1354 debug(84, 1) ("helperDispatch: invalid callback data\n");
1355 helperRequestFree(r);
1356 return;
74addf6c 1357 }
62e76326 1358
07eca7e0 1359 for (slot = 0; slot < (hlp->concurrency ? hlp->concurrency : 1); slot++) {
1360 if (!srv->requests[slot]) {
1361 ptr = &srv->requests[slot];
1362 break;
1363 }
1364 }
1365
1366 assert(ptr);
1367 *ptr = r;
1368 srv->stats.pending += 1;
1369 r->dispatch_time = current_time;
1370
032785bf 1371 if (memBufIsNull(srv->wqueue))
1372 memBufDefInit(srv->wqueue);
07eca7e0 1373
1374 if (hlp->concurrency)
032785bf 1375 memBufPrintf(srv->wqueue, "%d %s", slot, r->buf);
07eca7e0 1376 else
032785bf 1377 memBufAppend(srv->wqueue, r->buf, strlen(r->buf));
07eca7e0 1378
1379 if (!srv->flags.writing) {
032785bf 1380 assert(NULL == srv->writebuf);
07eca7e0 1381 srv->writebuf = srv->wqueue;
032785bf 1382 srv->wqueue = new MemBuf;
07eca7e0 1383 srv->flags.writing = 1;
1384 comm_write(srv->wfd,
032785bf 1385 srv->writebuf->content(),
1386 srv->writebuf->contentSize(),
07eca7e0 1387 helperDispatchWriteDone, /* Handler */
1388 srv); /* Handler-data */
1389 }
1390
17bb3486 1391 debug(84, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
62e76326 1392 hlp->id_name, srv->index + 1, (int) strlen(r->buf));
74addf6c 1393 srv->stats.uses++;
1394 hlp->stats.requests++;
1395}
1396
42679bd6 1397static void
1398helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
62e76326 1399 int xerrno, void *data)
42679bd6 1400{
62e76326 1401 /* nothing! */
42679bd6 1402}
1403
1404
94439e4e 1405static void
1406helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1407{
1408 statefulhelper *hlp = srv->parent;
62e76326 1409
fa80a8ef 1410 if (!cbdataReferenceValid(r->data)) {
62e76326 1411 debug(84, 1) ("helperStatefulDispatch: invalid callback data\n");
1412 helperStatefulRequestFree(r);
1413 return;
94439e4e 1414 }
62e76326 1415
17bb3486 1416 debug(84, 9) ("helperStatefulDispatch busying helper %s #%d\n", hlp->id_name, srv->index + 1);
62e76326 1417
94439e4e 1418 if (r->placeholder == 1) {
62e76326 1419 /* a callback is needed before this request can _use_ a helper. */
1420 /* we don't care about releasing/deferring this helper. The request NEVER
1421 * gets to the helper. So we throw away the return code */
1422 r->callback(r->data, srv, NULL);
1423 /* throw away the placeholder */
1424 helperStatefulRequestFree(r);
1425 /* and push the queue. Note that the callback may have submitted a new
1426 * request to the helper which is why we test for the request*/
1427
1428 if (srv->request == NULL) {
1429 if (srv->flags.shutdown
1430 && srv->flags.reserved == S_HELPER_FREE
1431 && !srv->deferred_requests) {
1432 int wfd = srv->wfd;
1433 srv->wfd = -1;
1434 comm_close(wfd);
1435 } else {
1436 if (srv->queue.head)
1437 helperStatefulServerKickQueue(srv);
1438 else
1439 helperStatefulKickQueue(hlp);
1440 }
1441 }
1442
1443 return;
94439e4e 1444 }
62e76326 1445
94439e4e 1446 srv->flags.busy = 1;
1447 srv->request = r;
1448 srv->dispatch_time = current_time;
42679bd6 1449 comm_write(srv->wfd,
62e76326 1450 r->buf,
1451 strlen(r->buf),
1452 helperStatefulDispatchWriteDone, /* Handler */
1453 hlp); /* Handler-data */
17bb3486 1454 debug(84, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
62e76326 1455 hlp->id_name, srv->index + 1, (int) strlen(r->buf));
94439e4e 1456 srv->stats.uses++;
1457 hlp->stats.requests++;
1458}
1459
1460
74addf6c 1461static void
1462helperKickQueue(helper * hlp)
1463{
1464 helper_request *r;
1465 helper_server *srv;
62e76326 1466
74addf6c 1467 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
62e76326 1468 helperDispatch(srv, r);
74addf6c 1469}
1470
94439e4e 1471static void
1472helperStatefulKickQueue(statefulhelper * hlp)
1473{
1474 helper_stateful_request *r;
1475 helper_stateful_server *srv;
62e76326 1476
94439e4e 1477 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
62e76326 1478 helperStatefulDispatch(srv, r);
94439e4e 1479}
1480
1481static void
1482helperStatefulServerKickQueue(helper_stateful_server * srv)
1483{
1484 helper_stateful_request *r;
62e76326 1485
94439e4e 1486 if ((r = StatefulServerDequeue(srv)))
62e76326 1487 helperStatefulDispatch(srv, r);
94439e4e 1488}
1489
74addf6c 1490static void
1491helperRequestFree(helper_request * r)
1492{
fa80a8ef 1493 cbdataReferenceDone(r->data);
74addf6c 1494 xfree(r->buf);
00d77d6b 1495 delete r;
51ee7c82 1496}
1497
94439e4e 1498static void
1499helperStatefulRequestFree(helper_stateful_request * r)
1500{
fa80a8ef 1501 cbdataReferenceDone(r->data);
94439e4e 1502 xfree(r->buf);
00d77d6b 1503 delete r;
51ee7c82 1504}