]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Merged from trunk rev.13445
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * DEBUG: section 84 Helper process maintenance
3 * AUTHOR: Harvest Derived?
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 #include "squid.h"
34 #include "base/AsyncCbdataCalls.h"
35 #include "comm.h"
36 #include "comm/Connection.h"
37 #include "comm/Read.h"
38 #include "comm/Write.h"
39 #include "fd.h"
40 #include "fde.h"
41 #include "format/Quoting.h"
42 #include "helper.h"
43 #include "Mem.h"
44 #include "MemBuf.h"
45 #include "SquidIpc.h"
46 #include "SquidMath.h"
47 #include "SquidTime.h"
48 #include "Store.h"
49 #include "wordlist.h"
50
51 #define HELPER_MAX_ARGS 64
52
/**
 * Size of the input buffer Squid initially allocates for helper responses.
 * A response may be larger than this: the buffer is grown on demand, but
 * never beyond ReadBufMaxSize.
 */
const size_t ReadBufMinSize = 4 * 1024;
57
/**
 * Upper limit for the helper input buffer: the largest helper-to-Squid
 * response that is still considered safe, plus one byte.
 * A helper sending more than this gets a warning and its stream is closed.
 * The ssl_crtd helper is known to produce responses of at least 10KB, and
 * some undocumented helpers are known to exceed 8KB.
 */
const size_t ReadBufMaxSize = 32 * 1024;
64
65 static IOCB helperHandleRead;
66 static IOCB helperStatefulHandleRead;
67 static void helperServerFree(helper_server *srv);
68 static void helperStatefulServerFree(helper_stateful_server *srv);
69 static void Enqueue(helper * hlp, helper_request *);
70 static helper_request *Dequeue(helper * hlp);
71 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
72 static helper_server *GetFirstAvailable(helper * hlp);
73 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
74 static void helperDispatch(helper_server * srv, helper_request * r);
75 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
76 static void helperKickQueue(helper * hlp);
77 static void helperStatefulKickQueue(statefulhelper * hlp);
78 static void helperStatefulServerDone(helper_stateful_server * srv);
79 static void helperRequestFree(helper_request * r);
80 static void helperStatefulRequestFree(helper_stateful_request * r);
81 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
82 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
83
84 CBDATA_CLASS_INIT(helper);
85 CBDATA_CLASS_INIT(helper_server);
86 CBDATA_CLASS_INIT(statefulhelper);
87 CBDATA_CLASS_INIT(helper_stateful_server);
88
89 InstanceIdDefinitions(HelperServerBase, "Hlpr");
90
91 void
92 HelperServerBase::initStats()
93 {
94 stats.uses=0;
95 stats.replies=0;
96 stats.pending=0;
97 stats.releases=0;
98 }
99
100 void
101 HelperServerBase::closePipesSafely()
102 {
103 #if _SQUID_WINDOWS_
104 shutdown(writePipe->fd, SD_BOTH);
105 #endif
106
107 flags.closing = true;
108 if (readPipe->fd == writePipe->fd)
109 readPipe->fd = -1;
110 else
111 readPipe->close();
112 writePipe->close();
113
114 #if _SQUID_WINDOWS_
115 if (hIpc) {
116 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
117 getCurrentTime();
118 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
119 " #" << index << " (" << hlp->cmdline->key << "," <<
120 (long int)pid << ") didn't exit in 5 seconds");
121 }
122 CloseHandle(hIpc);
123 }
124 #endif
125 }
126
127 void
128 HelperServerBase::closeWritePipeSafely()
129 {
130 #if _SQUID_WINDOWS_
131 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
132 #endif
133
134 flags.closing = true;
135 if (readPipe->fd == writePipe->fd)
136 readPipe->fd = -1;
137 writePipe->close();
138
139 #if _SQUID_WINDOWS_
140 if (hIpc) {
141 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
142 getCurrentTime();
143 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
144 " #" << index << " (" << hlp->cmdline->key << "," <<
145 (long int)pid << ") didn't exit in 5 seconds");
146 }
147 CloseHandle(hIpc);
148 }
149 #endif
150 }
151
152 void
153 helperOpenServers(helper * hlp)
154 {
155 char *s;
156 char *progname;
157 char *shortname;
158 char *procname;
159 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
160 char fd_note_buf[FD_DESC_SZ];
161 helper_server *srv;
162 int nargs = 0;
163 int k;
164 pid_t pid;
165 int rfd;
166 int wfd;
167 void * hIpc;
168 wordlist *w;
169
170 if (hlp->cmdline == NULL)
171 return;
172
173 progname = hlp->cmdline->key;
174
175 if ((s = strrchr(progname, '/')))
176 shortname = xstrdup(s + 1);
177 else
178 shortname = xstrdup(progname);
179
180 /* figure out how many new child are actually needed. */
181 int need_new = hlp->childs.needNew();
182
183 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
184
185 if (need_new < 1) {
186 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
187 }
188
189 procname = (char *)xmalloc(strlen(shortname) + 3);
190
191 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
192
193 args[nargs] = procname;
194 ++nargs;
195
196 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
197 args[nargs] = w->key;
198 ++nargs;
199 }
200
201 args[nargs] = NULL;
202 ++nargs;
203
204 assert(nargs <= HELPER_MAX_ARGS);
205
206 for (k = 0; k < need_new; ++k) {
207 getCurrentTime();
208 rfd = wfd = -1;
209 pid = ipcCreate(hlp->ipc_type,
210 progname,
211 args,
212 shortname,
213 hlp->addr,
214 &rfd,
215 &wfd,
216 &hIpc);
217
218 if (pid < 0) {
219 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
220 continue;
221 }
222
223 ++ hlp->childs.n_running;
224 ++ hlp->childs.n_active;
225 srv = new helper_server;
226 srv->hIpc = hIpc;
227 srv->pid = pid;
228 srv->initStats();
229 srv->addr = hlp->addr;
230 srv->readPipe = new Comm::Connection;
231 srv->readPipe->fd = rfd;
232 srv->writePipe = new Comm::Connection;
233 srv->writePipe->fd = wfd;
234 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
235 srv->wqueue = new MemBuf;
236 srv->roffset = 0;
237 srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
238 srv->parent = cbdataReference(hlp);
239 dlinkAddTail(srv, &srv->link, &hlp->servers);
240
241 if (rfd == wfd) {
242 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
243 fd_note(rfd, fd_note_buf);
244 } else {
245 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
246 fd_note(rfd, fd_note_buf);
247 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
248 fd_note(wfd, fd_note_buf);
249 }
250
251 commSetNonBlocking(rfd);
252
253 if (wfd != rfd)
254 commSetNonBlocking(wfd);
255
256 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
257 comm_add_close_handler(rfd, closeCall);
258
259 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
260 CommIoCbPtrFun(helperHandleRead, srv));
261 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
262 }
263
264 hlp->last_restart = squid_curtime;
265 safe_free(shortname);
266 safe_free(procname);
267 helperKickQueue(hlp);
268 }
269
270 /**
271 * DPW 2007-05-08
272 *
273 * helperStatefulOpenServers: create the stateful child helper processes
274 */
275 void
276 helperStatefulOpenServers(statefulhelper * hlp)
277 {
278 char *shortname;
279 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
280 char fd_note_buf[FD_DESC_SZ];
281 int nargs = 0;
282
283 if (hlp->cmdline == NULL)
284 return;
285
286 if (hlp->childs.concurrency)
287 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
288
289 char *progname = hlp->cmdline->key;
290
291 char *s;
292 if ((s = strrchr(progname, '/')))
293 shortname = xstrdup(s + 1);
294 else
295 shortname = xstrdup(progname);
296
297 /* figure out haw mant new helpers are needed. */
298 int need_new = hlp->childs.needNew();
299
300 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
301
302 if (need_new < 1) {
303 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
304 }
305
306 char *procname = (char *)xmalloc(strlen(shortname) + 3);
307
308 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
309
310 args[nargs] = procname;
311 ++nargs;
312
313 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
314 args[nargs] = w->key;
315 ++nargs;
316 }
317
318 args[nargs] = NULL;
319 ++nargs;
320
321 assert(nargs <= HELPER_MAX_ARGS);
322
323 for (int k = 0; k < need_new; ++k) {
324 getCurrentTime();
325 int rfd = -1;
326 int wfd = -1;
327 void * hIpc;
328 pid_t pid = ipcCreate(hlp->ipc_type,
329 progname,
330 args,
331 shortname,
332 hlp->addr,
333 &rfd,
334 &wfd,
335 &hIpc);
336
337 if (pid < 0) {
338 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
339 continue;
340 }
341
342 ++ hlp->childs.n_running;
343 ++ hlp->childs.n_active;
344 helper_stateful_server *srv = new helper_stateful_server;
345 srv->hIpc = hIpc;
346 srv->pid = pid;
347 srv->flags.reserved = false;
348 srv->initStats();
349 srv->addr = hlp->addr;
350 srv->readPipe = new Comm::Connection;
351 srv->readPipe->fd = rfd;
352 srv->writePipe = new Comm::Connection;
353 srv->writePipe->fd = wfd;
354 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
355 srv->roffset = 0;
356 srv->parent = cbdataReference(hlp);
357
358 if (hlp->datapool != NULL)
359 srv->data = hlp->datapool->alloc();
360
361 dlinkAddTail(srv, &srv->link, &hlp->servers);
362
363 if (rfd == wfd) {
364 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
365 fd_note(rfd, fd_note_buf);
366 } else {
367 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
368 fd_note(rfd, fd_note_buf);
369 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
370 fd_note(wfd, fd_note_buf);
371 }
372
373 commSetNonBlocking(rfd);
374
375 if (wfd != rfd)
376 commSetNonBlocking(wfd);
377
378 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
379 comm_add_close_handler(rfd, closeCall);
380
381 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
382 CommIoCbPtrFun(helperStatefulHandleRead, srv));
383 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
384 }
385
386 hlp->last_restart = squid_curtime;
387 safe_free(shortname);
388 safe_free(procname);
389 helperStatefulKickQueue(hlp);
390 }
391
392 void
393 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
394 {
395 if (hlp == NULL) {
396 debugs(84, 3, "helperSubmit: hlp == NULL");
397 HelperReply nilReply;
398 callback(data, nilReply);
399 return;
400 }
401
402 helper_request *r = new helper_request;
403 helper_server *srv;
404
405 r->callback = callback;
406 r->data = cbdataReference(data);
407 r->buf = xstrdup(buf);
408
409 if ((srv = GetFirstAvailable(hlp)))
410 helperDispatch(srv, r);
411 else
412 Enqueue(hlp, r);
413
414 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
415 }
416
417 /// lastserver = "server last used as part of a reserved request sequence"
418 void
419 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
420 {
421 if (hlp == NULL) {
422 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
423 HelperReply nilReply;
424 callback(data, nilReply);
425 return;
426 }
427
428 helper_stateful_request *r = new helper_stateful_request;
429
430 r->callback = callback;
431 r->data = cbdataReference(data);
432
433 if (buf != NULL) {
434 r->buf = xstrdup(buf);
435 r->placeholder = 0;
436 } else {
437 r->buf = NULL;
438 r->placeholder = 1;
439 }
440
441 if ((buf != NULL) && lastserver) {
442 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
443 assert(lastserver->flags.reserved);
444 assert(!(lastserver->request));
445
446 debugs(84, 5, "StatefulSubmit dispatching");
447 helperStatefulDispatch(lastserver, r);
448 } else {
449 helper_stateful_server *srv;
450 if ((srv = StatefulGetFirstAvailable(hlp))) {
451 helperStatefulDispatch(srv, r);
452 } else
453 StatefulEnqueue(hlp, r);
454 }
455
456 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
457 "', " << Raw("buf", buf, strlen(buf)));
458 }
459
460 /**
461 * DPW 2007-05-08
462 *
463 * helperStatefulReleaseServer tells the helper that whoever was
464 * using it no longer needs its services.
465 */
466 void
467 helperStatefulReleaseServer(helper_stateful_server * srv)
468 {
469 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
470 if (!srv->flags.reserved)
471 return;
472
473 ++ srv->stats.releases;
474
475 srv->flags.reserved = false;
476 if (srv->parent->OnEmptyQueue != NULL && srv->data)
477 srv->parent->OnEmptyQueue(srv->data);
478
479 helperStatefulServerDone(srv);
480 }
481
482 /** return a pointer to the stateful routines data area */
483 void *
484 helperStatefulServerGetData(helper_stateful_server * srv)
485 {
486 return srv->data;
487 }
488
489 /**
490 * Dump some stats about the helper states to a StoreEntry
491 */
492 void
493 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
494 {
495 if (!helperStartStats(sentry, hlp, label))
496 return;
497
498 storeAppendPrintf(sentry, "program: %s\n",
499 hlp->cmdline->key);
500 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
501 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
502 storeAppendPrintf(sentry, "requests sent: %d\n",
503 hlp->stats.requests);
504 storeAppendPrintf(sentry, "replies received: %d\n",
505 hlp->stats.replies);
506 storeAppendPrintf(sentry, "queue length: %d\n",
507 hlp->stats.queue_size);
508 storeAppendPrintf(sentry, "avg service time: %d msec\n",
509 hlp->stats.avg_svc_time);
510 storeAppendPrintf(sentry, "\n");
511 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
512 "ID #",
513 "FD",
514 "PID",
515 "# Requests",
516 "# Replies",
517 "Flags",
518 "Time",
519 "Offset",
520 "Request");
521
522 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
523 helper_server *srv = (helper_server*)link->data;
524 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
525 storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
526 srv->index.value,
527 srv->readPipe->fd,
528 srv->pid,
529 srv->stats.uses,
530 srv->stats.replies,
531 srv->stats.pending ? 'B' : ' ',
532 srv->flags.writing ? 'W' : ' ',
533 srv->flags.closing ? 'C' : ' ',
534 srv->flags.shutdown ? 'S' : ' ',
535 tt < 0.0 ? 0.0 : tt,
536 (int) srv->roffset,
537 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
538 }
539
540 storeAppendPrintf(sentry, "\nFlags key:\n\n");
541 storeAppendPrintf(sentry, " B = BUSY\n");
542 storeAppendPrintf(sentry, " W = WRITING\n");
543 storeAppendPrintf(sentry, " C = CLOSING\n");
544 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
545 }
546
547 void
548 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
549 {
550 if (!helperStartStats(sentry, hlp, label))
551 return;
552
553 storeAppendPrintf(sentry, "program: %s\n",
554 hlp->cmdline->key);
555 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
556 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
557 storeAppendPrintf(sentry, "requests sent: %d\n",
558 hlp->stats.requests);
559 storeAppendPrintf(sentry, "replies received: %d\n",
560 hlp->stats.replies);
561 storeAppendPrintf(sentry, "queue length: %d\n",
562 hlp->stats.queue_size);
563 storeAppendPrintf(sentry, "avg service time: %d msec\n",
564 hlp->stats.avg_svc_time);
565 storeAppendPrintf(sentry, "\n");
566 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
567 "ID #",
568 "FD",
569 "PID",
570 "# Requests",
571 "# Replies",
572 "Flags",
573 "Time",
574 "Offset",
575 "Request");
576
577 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
578 helper_stateful_server *srv = (helper_stateful_server *)link->data;
579 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
580 storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
581 srv->index.value,
582 srv->readPipe->fd,
583 srv->pid,
584 srv->stats.uses,
585 srv->stats.replies,
586 srv->flags.busy ? 'B' : ' ',
587 srv->flags.closing ? 'C' : ' ',
588 srv->flags.reserved ? 'R' : ' ',
589 srv->flags.shutdown ? 'S' : ' ',
590 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
591 tt < 0.0 ? 0.0 : tt,
592 (int) srv->roffset,
593 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
594 }
595
596 storeAppendPrintf(sentry, "\nFlags key:\n\n");
597 storeAppendPrintf(sentry, " B = BUSY\n");
598 storeAppendPrintf(sentry, " C = CLOSING\n");
599 storeAppendPrintf(sentry, " R = RESERVED\n");
600 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
601 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
602 }
603
604 void
605 helperShutdown(helper * hlp)
606 {
607 dlink_node *link = hlp->servers.head;
608
609 while (link) {
610 helper_server *srv;
611 srv = (helper_server *)link->data;
612 link = link->next;
613
614 if (srv->flags.shutdown) {
615 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
616 continue;
617 }
618
619 assert(hlp->childs.n_active > 0);
620 -- hlp->childs.n_active;
621 srv->flags.shutdown = true; /* request it to shut itself down */
622
623 if (srv->flags.closing) {
624 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
625 continue;
626 }
627
628 if (srv->stats.pending) {
629 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
630 continue;
631 }
632
633 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
634 /* the rest of the details is dealt with in the helperServerFree
635 * close handler
636 */
637 srv->closePipesSafely();
638 }
639 }
640
641 void
642 helperStatefulShutdown(statefulhelper * hlp)
643 {
644 dlink_node *link = hlp->servers.head;
645 helper_stateful_server *srv;
646
647 while (link) {
648 srv = (helper_stateful_server *)link->data;
649 link = link->next;
650
651 if (srv->flags.shutdown) {
652 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
653 continue;
654 }
655
656 assert(hlp->childs.n_active > 0);
657 -- hlp->childs.n_active;
658 srv->flags.shutdown = true; /* request it to shut itself down */
659
660 if (srv->flags.busy) {
661 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
662 continue;
663 }
664
665 if (srv->flags.closing) {
666 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
667 continue;
668 }
669
670 if (srv->flags.reserved) {
671 if (shutting_down) {
672 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
673 } else {
674 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
675 continue;
676 }
677 }
678
679 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
680
681 /* the rest of the details is dealt with in the helperStatefulServerFree
682 * close handler
683 */
684 srv->closePipesSafely();
685 }
686 }
687
688 helper::~helper()
689 {
690 /* note, don't free id_name, it probably points to static memory */
691
692 if (queue.head)
693 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
694 }
695
696 /* ====================================================================== */
697 /* LOCAL FUNCTIONS */
698 /* ====================================================================== */
699
700 static void
701 helperServerFree(helper_server *srv)
702 {
703 helper *hlp = srv->parent;
704 helper_request *r;
705 int i, concurrency = hlp->childs.concurrency;
706
707 if (!concurrency)
708 concurrency = 1;
709
710 if (srv->rbuf) {
711 memFreeBuf(srv->rbuf_sz, srv->rbuf);
712 srv->rbuf = NULL;
713 }
714
715 srv->wqueue->clean();
716 delete srv->wqueue;
717
718 if (srv->writebuf) {
719 srv->writebuf->clean();
720 delete srv->writebuf;
721 srv->writebuf = NULL;
722 }
723
724 if (Comm::IsConnOpen(srv->writePipe))
725 srv->closeWritePipeSafely();
726
727 dlinkDelete(&srv->link, &hlp->servers);
728
729 assert(hlp->childs.n_running > 0);
730 -- hlp->childs.n_running;
731
732 if (!srv->flags.shutdown) {
733 assert(hlp->childs.n_active > 0);
734 -- hlp->childs.n_active;
735 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
736
737 if (hlp->childs.needNew() > 0) {
738 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
739
740 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
741 if (srv->stats.replies < 1)
742 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
743 else
744 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
745 }
746
747 debugs(80, DBG_IMPORTANT, "Starting new helpers");
748 helperOpenServers(hlp);
749 }
750 }
751
752 for (i = 0; i < concurrency; ++i) {
753 // XXX: re-schedule these on another helper?
754 if ((r = srv->requests[i])) {
755 void *cbdata;
756
757 if (cbdataReferenceValidDone(r->data, &cbdata)) {
758 HelperReply nilReply;
759 r->callback(cbdata, nilReply);
760 }
761
762 helperRequestFree(r);
763
764 srv->requests[i] = NULL;
765 }
766 }
767 safe_free(srv->requests);
768
769 cbdataReferenceDone(srv->parent);
770 delete srv;
771 }
772
773 static void
774 helperStatefulServerFree(helper_stateful_server *srv)
775 {
776 statefulhelper *hlp = srv->parent;
777 helper_stateful_request *r;
778
779 if (srv->rbuf) {
780 memFreeBuf(srv->rbuf_sz, srv->rbuf);
781 srv->rbuf = NULL;
782 }
783
784 #if 0
785 srv->wqueue->clean();
786
787 delete srv->wqueue;
788
789 #endif
790
791 /* TODO: walk the local queue of requests and carry them all out */
792 if (Comm::IsConnOpen(srv->writePipe))
793 srv->closeWritePipeSafely();
794
795 dlinkDelete(&srv->link, &hlp->servers);
796
797 assert(hlp->childs.n_running > 0);
798 -- hlp->childs.n_running;
799
800 if (!srv->flags.shutdown) {
801 assert( hlp->childs.n_active > 0);
802 -- hlp->childs.n_active;
803 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
804
805 if (hlp->childs.needNew() > 0) {
806 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
807
808 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
809 if (srv->stats.replies < 1)
810 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
811 else
812 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
813 }
814
815 debugs(80, DBG_IMPORTANT, "Starting new helpers");
816 helperStatefulOpenServers(hlp);
817 }
818 }
819
820 if ((r = srv->request)) {
821 void *cbdata;
822
823 if (cbdataReferenceValidDone(r->data, &cbdata)) {
824 HelperReply nilReply;
825 nilReply.whichServer = srv;
826 r->callback(cbdata, nilReply);
827 }
828
829 helperStatefulRequestFree(r);
830
831 srv->request = NULL;
832 }
833
834 if (srv->data != NULL)
835 hlp->datapool->freeOne(srv->data);
836
837 cbdataReferenceDone(srv->parent);
838
839 delete srv;
840 }
841
842 /// Calls back with a pointer to the buffer with the helper output
843 static void
844 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
845 {
846 helper_request *r = srv->requests[request_number];
847 if (r) {
848 HLPCB *callback = r->callback;
849
850 srv->requests[request_number] = NULL;
851
852 r->callback = NULL;
853
854 void *cbdata = NULL;
855 if (cbdataReferenceValidDone(r->data, &cbdata)) {
856 HelperReply response(msg, (msg_end-msg));
857 callback(cbdata, response);
858 }
859
860 -- srv->stats.pending;
861 ++ srv->stats.replies;
862
863 ++ hlp->stats.replies;
864
865 srv->answer_time = current_time;
866
867 srv->dispatch_time = r->dispatch_time;
868
869 hlp->stats.avg_svc_time =
870 Math::intAverage(hlp->stats.avg_svc_time,
871 tvSubMsec(r->dispatch_time, current_time),
872 hlp->stats.replies, REDIRECT_AV_FACTOR);
873
874 helperRequestFree(r);
875 } else {
876 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
877 request_number << " from " << hlp->id_name << " #" << srv->index <<
878 " '" << srv->rbuf << "'");
879 }
880
881 if (!srv->flags.shutdown) {
882 helperKickQueue(hlp);
883 } else if (!srv->flags.closing && !srv->stats.pending) {
884 srv->flags.closing=true;
885 srv->writePipe->close();
886 }
887 }
888
889 static void
890 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
891 {
892 char *t = NULL;
893 helper_server *srv = (helper_server *)data;
894 helper *hlp = srv->parent;
895 assert(cbdataReferenceValid(data));
896
897 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
898
899 if (flag == Comm::ERR_CLOSING) {
900 return;
901 }
902
903 assert(conn->fd == srv->readPipe->fd);
904
905 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
906
907 if (flag != Comm::OK || len == 0) {
908 srv->closePipesSafely();
909 return;
910 }
911
912 srv->roffset += len;
913 srv->rbuf[srv->roffset] = '\0';
914 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
915
916 if (!srv->stats.pending) {
917 /* someone spoke without being spoken to */
918 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
919 hlp->id_name << " #" << srv->index << ", " << (int)len <<
920 " bytes '" << srv->rbuf << "'");
921
922 srv->roffset = 0;
923 srv->rbuf[0] = '\0';
924 }
925
926 while ((t = strchr(srv->rbuf, hlp->eom))) {
927 /* end of reply found */
928 char *msg = srv->rbuf;
929 int i = 0;
930 int skip = 1;
931 debugs(84, 3, "helperHandleRead: end of reply found");
932
933 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
934 *t = '\0';
935 // rewind to the \r octet which is the real terminal now
936 // and remember that we have to skip forward 2 places now.
937 skip = 2;
938 --t;
939 }
940
941 *t = '\0';
942
943 if (hlp->childs.concurrency) {
944 i = strtol(msg, &msg, 10);
945
946 while (*msg && xisspace(*msg))
947 ++msg;
948 }
949
950 helperReturnBuffer(i, srv, hlp, msg, t);
951 srv->roffset -= (t - srv->rbuf) + skip;
952 memmove(srv->rbuf, t + skip, srv->roffset);
953 srv->rbuf[srv->roffset] = '\0';
954 }
955
956 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
957 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
958 assert(spaceSize >= 0);
959
960 // grow the input buffer if needed and possible
961 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
962 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
963 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
964 spaceSize = srv->rbuf_sz - srv->roffset - 1;
965 assert(spaceSize >= 0);
966 }
967
968 // quit reading if there is no space left
969 if (!spaceSize) {
970 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
971 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
972 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
973 srv->closePipesSafely();
974 return;
975 }
976
977 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
978 CommIoCbPtrFun(helperHandleRead, srv));
979 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
980 }
981 }
982
983 static void
984 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
985 {
986 char *t = NULL;
987 helper_stateful_server *srv = (helper_stateful_server *)data;
988 helper_stateful_request *r;
989 statefulhelper *hlp = srv->parent;
990 assert(cbdataReferenceValid(data));
991
992 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
993
994 if (flag == Comm::ERR_CLOSING) {
995 return;
996 }
997
998 assert(conn->fd == srv->readPipe->fd);
999
1000 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1001 hlp->id_name << " #" << srv->index);
1002
1003 if (flag != Comm::OK || len == 0) {
1004 srv->closePipesSafely();
1005 return;
1006 }
1007
1008 srv->roffset += len;
1009 srv->rbuf[srv->roffset] = '\0';
1010 r = srv->request;
1011 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1012
1013 if (r == NULL) {
1014 /* someone spoke without being spoken to */
1015 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1016 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1017 " bytes '" << srv->rbuf << "'");
1018
1019 srv->roffset = 0;
1020 }
1021
1022 if ((t = strchr(srv->rbuf, hlp->eom))) {
1023 /* end of reply found */
1024 int called = 1;
1025 int skip = 1;
1026 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1027
1028 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1029 *t = '\0';
1030 // rewind to the \r octet which is the real terminal now
1031 // and remember that we have to skip forward 2 places now.
1032 skip = 2;
1033 --t;
1034 }
1035
1036 *t = '\0';
1037
1038 if (r && cbdataReferenceValid(r->data)) {
1039 HelperReply res(srv->rbuf, (t - srv->rbuf));
1040 res.whichServer = srv;
1041 r->callback(r->data, res);
1042 } else {
1043 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1044 called = 0;
1045 }
1046 // only skip off the \0's _after_ passing its location in HelperReply above
1047 t += skip;
1048
1049 srv->flags.busy = false;
1050 /**
1051 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1052 * Doing this prohibits concurrency support with multiple replies per read().
1053 * TODO: check that read() setup on these buffers pays attention to roffset != 0
1054 * TODO: check that replies bigger than the buffer are discarded and do not affect future replies
1055 */
1056 srv->roffset = 0;
1057 helperStatefulRequestFree(r);
1058 srv->request = NULL;
1059
1060 -- srv->stats.pending;
1061 ++ srv->stats.replies;
1062
1063 ++ hlp->stats.replies;
1064 srv->answer_time = current_time;
1065 hlp->stats.avg_svc_time =
1066 Math::intAverage(hlp->stats.avg_svc_time,
1067 tvSubMsec(srv->dispatch_time, current_time),
1068 hlp->stats.replies, REDIRECT_AV_FACTOR);
1069
1070 if (called)
1071 helperStatefulServerDone(srv);
1072 else
1073 helperStatefulReleaseServer(srv);
1074 }
1075
1076 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1077 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1078 assert(spaceSize >= 0);
1079
1080 // grow the input buffer if needed and possible
1081 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1082 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1083 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1084 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1085 assert(spaceSize >= 0);
1086 }
1087
1088 // quit reading if there is no space left
1089 if (!spaceSize) {
1090 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1091 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1092 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1093 srv->closePipesSafely();
1094 return;
1095 }
1096
1097 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1098 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1099 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1100 }
1101 }
1102
1103 static void
1104 Enqueue(helper * hlp, helper_request * r)
1105 {
1106 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1107 dlinkAddTail(r, link, &hlp->queue);
1108 ++ hlp->stats.queue_size;
1109
1110 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1111 if (hlp->childs.needNew() > 0) {
1112 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1113 helperOpenServers(hlp);
1114 return;
1115 }
1116
1117 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1118 return;
1119
1120 if (squid_curtime - hlp->last_queue_warn < 600)
1121 return;
1122
1123 if (shutting_down || reconfiguring)
1124 return;
1125
1126 hlp->last_queue_warn = squid_curtime;
1127
1128 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1129 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1130 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1131
1132 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1133 fatalf("Too many queued %s requests", hlp->id_name);
1134 }
1135
1136 static void
1137 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1138 {
1139 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1140 dlinkAddTail(r, link, &hlp->queue);
1141 ++ hlp->stats.queue_size;
1142
1143 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1144 if (hlp->childs.needNew() > 0) {
1145 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1146 helperStatefulOpenServers(hlp);
1147 return;
1148 }
1149
1150 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1151 return;
1152
1153 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1154 fatalf("Too many queued %s requests", hlp->id_name);
1155
1156 if (squid_curtime - hlp->last_queue_warn < 600)
1157 return;
1158
1159 if (shutting_down || reconfiguring)
1160 return;
1161
1162 hlp->last_queue_warn = squid_curtime;
1163
1164 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1165 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1166 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1167 }
1168
1169 static helper_request *
1170 Dequeue(helper * hlp)
1171 {
1172 dlink_node *link;
1173 helper_request *r = NULL;
1174
1175 if ((link = hlp->queue.head)) {
1176 r = (helper_request *)link->data;
1177 dlinkDelete(link, &hlp->queue);
1178 memFree(link, MEM_DLINK_NODE);
1179 -- hlp->stats.queue_size;
1180 }
1181
1182 return r;
1183 }
1184
1185 static helper_stateful_request *
1186 StatefulDequeue(statefulhelper * hlp)
1187 {
1188 dlink_node *link;
1189 helper_stateful_request *r = NULL;
1190
1191 if ((link = hlp->queue.head)) {
1192 r = (helper_stateful_request *)link->data;
1193 dlinkDelete(link, &hlp->queue);
1194 memFree(link, MEM_DLINK_NODE);
1195 -- hlp->stats.queue_size;
1196 }
1197
1198 return r;
1199 }
1200
1201 static helper_server *
1202 GetFirstAvailable(helper * hlp)
1203 {
1204 dlink_node *n;
1205 helper_server *srv;
1206 helper_server *selected = NULL;
1207 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1208
1209 if (hlp->childs.n_running == 0)
1210 return NULL;
1211
1212 /* Find "least" loaded helper (approx) */
1213 for (n = hlp->servers.head; n != NULL; n = n->next) {
1214 srv = (helper_server *)n->data;
1215
1216 if (selected && selected->stats.pending <= srv->stats.pending)
1217 continue;
1218
1219 if (srv->flags.shutdown)
1220 continue;
1221
1222 if (!srv->stats.pending)
1223 return srv;
1224
1225 if (selected) {
1226 selected = srv;
1227 break;
1228 }
1229
1230 selected = srv;
1231 }
1232
1233 /* Check for overload */
1234 if (!selected) {
1235 debugs(84, 5, "GetFirstAvailable: None available.");
1236 return NULL;
1237 }
1238
1239 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1240 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1241 return NULL;
1242 }
1243
1244 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1245 return selected;
1246 }
1247
1248 static helper_stateful_server *
1249 StatefulGetFirstAvailable(statefulhelper * hlp)
1250 {
1251 dlink_node *n;
1252 helper_stateful_server *srv = NULL;
1253 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1254
1255 if (hlp->childs.n_running == 0)
1256 return NULL;
1257
1258 for (n = hlp->servers.head; n != NULL; n = n->next) {
1259 srv = (helper_stateful_server *)n->data;
1260
1261 if (srv->flags.busy)
1262 continue;
1263
1264 if (srv->flags.reserved)
1265 continue;
1266
1267 if (srv->flags.shutdown)
1268 continue;
1269
1270 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1271 continue;
1272
1273 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1274 return srv;
1275 }
1276
1277 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1278 return NULL;
1279 }
1280
1281 static void
1282 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
1283 {
1284 helper_server *srv = (helper_server *)data;
1285
1286 srv->writebuf->clean();
1287 delete srv->writebuf;
1288 srv->writebuf = NULL;
1289 srv->flags.writing = false;
1290
1291 if (flag != Comm::OK) {
1292 /* Helper server has crashed */
1293 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1294 return;
1295 }
1296
1297 if (!srv->wqueue->isNull()) {
1298 srv->writebuf = srv->wqueue;
1299 srv->wqueue = new MemBuf;
1300 srv->flags.writing = true;
1301 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1302 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1303 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1304 }
1305 }
1306
1307 static void
1308 helperDispatch(helper_server * srv, helper_request * r)
1309 {
1310 helper *hlp = srv->parent;
1311 helper_request **ptr = NULL;
1312 unsigned int slot;
1313
1314 if (!cbdataReferenceValid(r->data)) {
1315 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1316 helperRequestFree(r);
1317 return;
1318 }
1319
1320 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1321 if (!srv->requests[slot]) {
1322 ptr = &srv->requests[slot];
1323 break;
1324 }
1325 }
1326
1327 assert(ptr);
1328 *ptr = r;
1329 r->dispatch_time = current_time;
1330
1331 if (srv->wqueue->isNull())
1332 srv->wqueue->init();
1333
1334 if (hlp->childs.concurrency)
1335 srv->wqueue->Printf("%d %s", slot, r->buf);
1336 else
1337 srv->wqueue->append(r->buf, strlen(r->buf));
1338
1339 if (!srv->flags.writing) {
1340 assert(NULL == srv->writebuf);
1341 srv->writebuf = srv->wqueue;
1342 srv->wqueue = new MemBuf;
1343 srv->flags.writing = true;
1344 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1345 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1346 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1347 }
1348
1349 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->buf) << " bytes");
1350
1351 ++ srv->stats.uses;
1352 ++ srv->stats.pending;
1353 ++ hlp->stats.requests;
1354 }
1355
1356 static void
1357 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag,
1358 int xerrno, void *data)
1359 {
1360 /* nothing! */
1361 }
1362
1363 static void
1364 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1365 {
1366 statefulhelper *hlp = srv->parent;
1367
1368 if (!cbdataReferenceValid(r->data)) {
1369 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1370 helperStatefulRequestFree(r);
1371 helperStatefulReleaseServer(srv);
1372 return;
1373 }
1374
1375 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1376
1377 if (r->placeholder == 1) {
1378 /* a callback is needed before this request can _use_ a helper. */
1379 /* we don't care about releasing this helper. The request NEVER
1380 * gets to the helper. So we throw away the return code */
1381 HelperReply nilReply;
1382 nilReply.whichServer = srv;
1383 r->callback(r->data, nilReply);
1384 /* throw away the placeholder */
1385 helperStatefulRequestFree(r);
1386 /* and push the queue. Note that the callback may have submitted a new
1387 * request to the helper which is why we test for the request */
1388
1389 if (srv->request == NULL)
1390 helperStatefulServerDone(srv);
1391
1392 return;
1393 }
1394
1395 srv->flags.busy = true;
1396 srv->flags.reserved = true;
1397 srv->request = r;
1398 srv->dispatch_time = current_time;
1399 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1400 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1401 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1402 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1403 hlp->id_name << " #" << srv->index << ", " <<
1404 (int) strlen(r->buf) << " bytes");
1405
1406 ++ srv->stats.uses;
1407 ++ srv->stats.pending;
1408 ++ hlp->stats.requests;
1409 }
1410
1411 static void
1412 helperKickQueue(helper * hlp)
1413 {
1414 helper_request *r;
1415 helper_server *srv;
1416
1417 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1418 helperDispatch(srv, r);
1419 }
1420
1421 static void
1422 helperStatefulKickQueue(statefulhelper * hlp)
1423 {
1424 helper_stateful_request *r;
1425 helper_stateful_server *srv;
1426
1427 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1428 helperStatefulDispatch(srv, r);
1429 }
1430
1431 static void
1432 helperStatefulServerDone(helper_stateful_server * srv)
1433 {
1434 if (!srv->flags.shutdown) {
1435 helperStatefulKickQueue(srv->parent);
1436 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1437 srv->closeWritePipeSafely();
1438 return;
1439 }
1440 }
1441
1442 static void
1443 helperRequestFree(helper_request * r)
1444 {
1445 cbdataReferenceDone(r->data);
1446 xfree(r->buf);
1447 delete r;
1448 }
1449
1450 static void
1451 helperStatefulRequestFree(helper_stateful_request * r)
1452 {
1453 if (r) {
1454 cbdataReferenceDone(r->data);
1455 xfree(r->buf);
1456 delete r;
1457 }
1458 }
1459
1460 // TODO: should helper_ and helper_stateful_ have a common parent?
1461 static bool
1462 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1463 {
1464 if (!hlp) {
1465 if (label)
1466 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1467 return false;
1468 }
1469
1470 if (label)
1471 storeAppendPrintf(sentry, "%s:\n", label);
1472
1473 return true;
1474 }