]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Removed squid-old.h
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 84 Helper process maintenance
5 * AUTHOR: Harvest Derived?
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
35 #include "squid.h"
36 #include "base/AsyncCbdataCalls.h"
37 #include "comm.h"
38 #include "comm/Connection.h"
39 #include "comm/Write.h"
40 #include "format/Quoting.h"
41 #include "helper.h"
42 #include "MemBuf.h"
43 #include "protos.h"
44 #include "SquidMath.h"
45 #include "SquidTime.h"
46 #include "Store.h"
47 #include "wordlist.h"
48
/// Maximum number of argv entries handed to a helper process, not counting
/// the terminating NULL pointer.
#define HELPER_MAX_ARGS 64

/**
 * Size of the input buffer first allocated for every helper.
 * A helper response may be larger than this; the buffer is then grown
 * on demand until it reaches ReadBufMaxSize.
 */
const size_t ReadBufMinSize = 4 * 1024;

/**
 * Largest helper-to-Squid response that is considered safe, plus one byte.
 * A helper sending more than this gets a warning and its stream is closed.
 * The ssl_crtd helper is known to emit responses of 10KB or more, and some
 * undocumented helpers are known to exceed 8KB.
 */
const size_t ReadBufMaxSize = 32 * 1024;
63
64 static IOCB helperHandleRead;
65 static IOCB helperStatefulHandleRead;
66 static void helperServerFree(helper_server *srv);
67 static void helperStatefulServerFree(helper_stateful_server *srv);
68 static void Enqueue(helper * hlp, helper_request *);
69 static helper_request *Dequeue(helper * hlp);
70 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
71 static helper_server *GetFirstAvailable(helper * hlp);
72 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
73 static void helperDispatch(helper_server * srv, helper_request * r);
74 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
75 static void helperKickQueue(helper * hlp);
76 static void helperStatefulKickQueue(statefulhelper * hlp);
77 static void helperStatefulServerDone(helper_stateful_server * srv);
78 static void helperRequestFree(helper_request * r);
79 static void helperStatefulRequestFree(helper_stateful_request * r);
80 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
81 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
82
83
84 CBDATA_CLASS_INIT(helper);
85 CBDATA_TYPE(helper_server);
86 CBDATA_CLASS_INIT(statefulhelper);
87 CBDATA_TYPE(helper_stateful_server);
88
89 void
90 HelperServerBase::closePipesSafely()
91 {
92 #if _SQUID_MSWIN_
93 int no = index + 1;
94
95 shutdown(writePipe->fd, SD_BOTH);
96 #endif
97
98 flags.closing = 1;
99 if (readPipe->fd == writePipe->fd)
100 readPipe->fd = -1;
101 else
102 readPipe->close();
103 writePipe->close();
104
105 #if _SQUID_MSWIN_
106 if (hIpc) {
107 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
108 getCurrentTime();
109 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
110 " #" << no << " (" << hlp->cmdline->key << "," <<
111 (long int)pid << ") didn't exit in 5 seconds");
112 }
113 CloseHandle(hIpc);
114 }
115 #endif
116 }
117
118 void
119 HelperServerBase::closeWritePipeSafely()
120 {
121 #if _SQUID_MSWIN_
122 int no = index + 1;
123
124 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
125 #endif
126
127 flags.closing = 1;
128 if (readPipe->fd == writePipe->fd)
129 readPipe->fd = -1;
130 writePipe->close();
131
132 #if _SQUID_MSWIN_
133 if (hIpc) {
134 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
135 getCurrentTime();
136 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
137 " #" << no << " (" << hlp->cmdline->key << "," <<
138 (long int)pid << ") didn't exit in 5 seconds");
139 }
140 CloseHandle(hIpc);
141 }
142 #endif
143 }
144
145 void
146 helperOpenServers(helper * hlp)
147 {
148 char *s;
149 char *progname;
150 char *shortname;
151 char *procname;
152 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
153 char fd_note_buf[FD_DESC_SZ];
154 helper_server *srv;
155 int nargs = 0;
156 int k;
157 pid_t pid;
158 int rfd;
159 int wfd;
160 void * hIpc;
161 wordlist *w;
162
163 if (hlp->cmdline == NULL)
164 return;
165
166 progname = hlp->cmdline->key;
167
168 if ((s = strrchr(progname, '/')))
169 shortname = xstrdup(s + 1);
170 else
171 shortname = xstrdup(progname);
172
173 /* figure out how many new child are actually needed. */
174 int need_new = hlp->childs.needNew();
175
176 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
177
178 if (need_new < 1) {
179 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
180 }
181
182 procname = (char *)xmalloc(strlen(shortname) + 3);
183
184 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
185
186 args[nargs] = procname;
187 ++nargs;
188
189 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
190 args[nargs] = w->key;
191 ++nargs;
192 }
193
194 args[nargs] = NULL;
195 ++nargs;
196
197 assert(nargs <= HELPER_MAX_ARGS);
198
199 for (k = 0; k < need_new; ++k) {
200 getCurrentTime();
201 rfd = wfd = -1;
202 pid = ipcCreate(hlp->ipc_type,
203 progname,
204 args,
205 shortname,
206 hlp->addr,
207 &rfd,
208 &wfd,
209 &hIpc);
210
211 if (pid < 0) {
212 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
213 continue;
214 }
215
216 ++ hlp->childs.n_running;
217 ++ hlp->childs.n_active;
218 CBDATA_INIT_TYPE(helper_server);
219 srv = cbdataAlloc(helper_server);
220 srv->hIpc = hIpc;
221 srv->pid = pid;
222 srv->index = k;
223 srv->addr = hlp->addr;
224 srv->readPipe = new Comm::Connection;
225 srv->readPipe->fd = rfd;
226 srv->writePipe = new Comm::Connection;
227 srv->writePipe->fd = wfd;
228 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
229 srv->wqueue = new MemBuf;
230 srv->roffset = 0;
231 srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
232 srv->parent = cbdataReference(hlp);
233 dlinkAddTail(srv, &srv->link, &hlp->servers);
234
235 if (rfd == wfd) {
236 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
237 fd_note(rfd, fd_note_buf);
238 } else {
239 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
240 fd_note(rfd, fd_note_buf);
241 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
242 fd_note(wfd, fd_note_buf);
243 }
244
245 commSetNonBlocking(rfd);
246
247 if (wfd != rfd)
248 commSetNonBlocking(wfd);
249
250 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
251 comm_add_close_handler(rfd, closeCall);
252
253 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
254 CommIoCbPtrFun(helperHandleRead, srv));
255 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
256 }
257
258 hlp->last_restart = squid_curtime;
259 safe_free(shortname);
260 safe_free(procname);
261 helperKickQueue(hlp);
262 }
263
264 /**
265 * DPW 2007-05-08
266 *
267 * helperStatefulOpenServers: create the stateful child helper processes
268 */
269 void
270 helperStatefulOpenServers(statefulhelper * hlp)
271 {
272 char *shortname;
273 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
274 char fd_note_buf[FD_DESC_SZ];
275 int nargs = 0;
276
277 if (hlp->cmdline == NULL)
278 return;
279
280 if (hlp->childs.concurrency)
281 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
282
283 char *progname = hlp->cmdline->key;
284
285 char *s;
286 if ((s = strrchr(progname, '/')))
287 shortname = xstrdup(s + 1);
288 else
289 shortname = xstrdup(progname);
290
291 /* figure out haw mant new helpers are needed. */
292 int need_new = hlp->childs.needNew();
293
294 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
295
296 if (need_new < 1) {
297 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
298 }
299
300 char *procname = (char *)xmalloc(strlen(shortname) + 3);
301
302 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
303
304 args[nargs] = procname;
305 ++nargs;
306
307 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
308 args[nargs] = w->key;
309 ++nargs;
310 }
311
312 args[nargs] = NULL;
313 ++nargs;
314
315 assert(nargs <= HELPER_MAX_ARGS);
316
317 for (int k = 0; k < need_new; ++k) {
318 getCurrentTime();
319 int rfd = -1;
320 int wfd = -1;
321 void * hIpc;
322 pid_t pid = ipcCreate(hlp->ipc_type,
323 progname,
324 args,
325 shortname,
326 hlp->addr,
327 &rfd,
328 &wfd,
329 &hIpc);
330
331 if (pid < 0) {
332 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
333 continue;
334 }
335
336 ++ hlp->childs.n_running;
337 ++ hlp->childs.n_active;
338 CBDATA_INIT_TYPE(helper_stateful_server);
339 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
340 srv->hIpc = hIpc;
341 srv->pid = pid;
342 srv->flags.reserved = 0;
343 srv->stats.submits = 0;
344 srv->stats.releases = 0;
345 srv->index = k;
346 srv->addr = hlp->addr;
347 srv->readPipe = new Comm::Connection;
348 srv->readPipe->fd = rfd;
349 srv->writePipe = new Comm::Connection;
350 srv->writePipe->fd = wfd;
351 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
352 srv->roffset = 0;
353 srv->parent = cbdataReference(hlp);
354
355 if (hlp->datapool != NULL)
356 srv->data = hlp->datapool->alloc();
357
358 dlinkAddTail(srv, &srv->link, &hlp->servers);
359
360 if (rfd == wfd) {
361 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
362 fd_note(rfd, fd_note_buf);
363 } else {
364 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
365 fd_note(rfd, fd_note_buf);
366 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
367 fd_note(wfd, fd_note_buf);
368 }
369
370 commSetNonBlocking(rfd);
371
372 if (wfd != rfd)
373 commSetNonBlocking(wfd);
374
375 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
376 comm_add_close_handler(rfd, closeCall);
377
378 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
379 CommIoCbPtrFun(helperStatefulHandleRead, srv));
380 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
381 }
382
383 hlp->last_restart = squid_curtime;
384 safe_free(shortname);
385 safe_free(procname);
386 helperStatefulKickQueue(hlp);
387 }
388
389
390 void
391 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
392 {
393 if (hlp == NULL) {
394 debugs(84, 3, "helperSubmit: hlp == NULL");
395 callback(data, NULL);
396 return;
397 }
398
399 helper_request *r = new helper_request;
400 helper_server *srv;
401
402 r->callback = callback;
403 r->data = cbdataReference(data);
404 r->buf = xstrdup(buf);
405
406 if ((srv = GetFirstAvailable(hlp)))
407 helperDispatch(srv, r);
408 else
409 Enqueue(hlp, r);
410
411 debugs(84, 9, "helperSubmit: " << buf);
412 }
413
414 /// lastserver = "server last used as part of a reserved request sequence"
415 void
416 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPSCB * callback, void *data, helper_stateful_server * lastserver)
417 {
418 if (hlp == NULL) {
419 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
420 callback(data, 0, NULL);
421 return;
422 }
423
424 helper_stateful_request *r = new helper_stateful_request;
425
426 r->callback = callback;
427 r->data = cbdataReference(data);
428
429 if (buf != NULL) {
430 r->buf = xstrdup(buf);
431 r->placeholder = 0;
432 } else {
433 r->buf = NULL;
434 r->placeholder = 1;
435 }
436
437 if ((buf != NULL) && lastserver) {
438 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
439 assert(lastserver->flags.reserved);
440 assert(!(lastserver->request));
441
442 debugs(84, 5, "StatefulSubmit dispatching");
443 helperStatefulDispatch(lastserver, r);
444 } else {
445 helper_stateful_server *srv;
446 if ((srv = StatefulGetFirstAvailable(hlp))) {
447 helperStatefulDispatch(srv, r);
448 } else
449 StatefulEnqueue(hlp, r);
450 }
451
452 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
453 }
454
455 /**
456 * DPW 2007-05-08
457 *
458 * helperStatefulReleaseServer tells the helper that whoever was
459 * using it no longer needs its services.
460 */
461 void
462 helperStatefulReleaseServer(helper_stateful_server * srv)
463 {
464 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
465 if (!srv->flags.reserved)
466 return;
467
468 ++ srv->stats.releases;
469
470 srv->flags.reserved = 0;
471 if (srv->parent->OnEmptyQueue != NULL && srv->data)
472 srv->parent->OnEmptyQueue(srv->data);
473
474 helperStatefulServerDone(srv);
475 }
476
477 /** return a pointer to the stateful routines data area */
478 void *
479 helperStatefulServerGetData(helper_stateful_server * srv)
480 {
481 return srv->data;
482 }
483
484 /**
485 * Dump some stats about the helper states to a StoreEntry
486 */
487 void
488 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
489 {
490 if (!helperStartStats(sentry, hlp, label))
491 return;
492
493 storeAppendPrintf(sentry, "program: %s\n",
494 hlp->cmdline->key);
495 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
496 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
497 storeAppendPrintf(sentry, "requests sent: %d\n",
498 hlp->stats.requests);
499 storeAppendPrintf(sentry, "replies received: %d\n",
500 hlp->stats.replies);
501 storeAppendPrintf(sentry, "queue length: %d\n",
502 hlp->stats.queue_size);
503 storeAppendPrintf(sentry, "avg service time: %d msec\n",
504 hlp->stats.avg_svc_time);
505 storeAppendPrintf(sentry, "\n");
506 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%s\t%7s\t%7s\t%7s\n",
507 "#",
508 "FD",
509 "PID",
510 "# Requests",
511 "Flags",
512 "Time",
513 "Offset",
514 "Request");
515
516 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
517 helper_server *srv = (helper_server*)link->data;
518 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
519 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
520 srv->index + 1,
521 srv->readPipe->fd,
522 srv->pid,
523 srv->stats.uses,
524 srv->stats.pending ? 'B' : ' ',
525 srv->flags.writing ? 'W' : ' ',
526 srv->flags.closing ? 'C' : ' ',
527 srv->flags.shutdown ? 'S' : ' ',
528 tt < 0.0 ? 0.0 : tt,
529 (int) srv->roffset,
530 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
531 }
532
533 storeAppendPrintf(sentry, "\nFlags key:\n\n");
534 storeAppendPrintf(sentry, " B = BUSY\n");
535 storeAppendPrintf(sentry, " W = WRITING\n");
536 storeAppendPrintf(sentry, " C = CLOSING\n");
537 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
538 }
539
540 void
541 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
542 {
543 if (!helperStartStats(sentry, hlp, label))
544 return;
545
546 storeAppendPrintf(sentry, "program: %s\n",
547 hlp->cmdline->key);
548 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
549 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
550 storeAppendPrintf(sentry, "requests sent: %d\n",
551 hlp->stats.requests);
552 storeAppendPrintf(sentry, "replies received: %d\n",
553 hlp->stats.replies);
554 storeAppendPrintf(sentry, "queue length: %d\n",
555 hlp->stats.queue_size);
556 storeAppendPrintf(sentry, "avg service time: %d msec\n",
557 hlp->stats.avg_svc_time);
558 storeAppendPrintf(sentry, "\n");
559 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
560 "#",
561 "FD",
562 "PID",
563 "# Requests",
564 "Flags",
565 "Time",
566 "Offset",
567 "Request");
568
569 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
570 helper_stateful_server *srv = (helper_stateful_server *)link->data;
571 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
572 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
573 srv->index + 1,
574 srv->readPipe->fd,
575 srv->pid,
576 srv->stats.uses,
577 srv->flags.busy ? 'B' : ' ',
578 srv->flags.closing ? 'C' : ' ',
579 srv->flags.reserved ? 'R' : ' ',
580 srv->flags.shutdown ? 'S' : ' ',
581 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
582 tt < 0.0 ? 0.0 : tt,
583 (int) srv->roffset,
584 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
585 }
586
587 storeAppendPrintf(sentry, "\nFlags key:\n\n");
588 storeAppendPrintf(sentry, " B = BUSY\n");
589 storeAppendPrintf(sentry, " C = CLOSING\n");
590 storeAppendPrintf(sentry, " R = RESERVED\n");
591 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
592 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
593 }
594
595 void
596 helperShutdown(helper * hlp)
597 {
598 dlink_node *link = hlp->servers.head;
599
600 while (link) {
601 helper_server *srv;
602 srv = (helper_server *)link->data;
603 link = link->next;
604
605 if (srv->flags.shutdown) {
606 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
607 continue;
608 }
609
610 assert(hlp->childs.n_active > 0);
611 -- hlp->childs.n_active;
612 srv->flags.shutdown = 1; /* request it to shut itself down */
613
614 if (srv->flags.closing) {
615 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
616 continue;
617 }
618
619 if (srv->stats.pending) {
620 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
621 continue;
622 }
623
624 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
625 /* the rest of the details is dealt with in the helperServerFree
626 * close handler
627 */
628 srv->closePipesSafely();
629 }
630 }
631
632 void
633 helperStatefulShutdown(statefulhelper * hlp)
634 {
635 dlink_node *link = hlp->servers.head;
636 helper_stateful_server *srv;
637
638 while (link) {
639 srv = (helper_stateful_server *)link->data;
640 link = link->next;
641
642 if (srv->flags.shutdown) {
643 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
644 continue;
645 }
646
647 assert(hlp->childs.n_active > 0);
648 -- hlp->childs.n_active;
649 srv->flags.shutdown = 1; /* request it to shut itself down */
650
651 if (srv->flags.busy) {
652 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
653 continue;
654 }
655
656 if (srv->flags.closing) {
657 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
658 continue;
659 }
660
661 if (srv->flags.reserved) {
662 if (shutting_down) {
663 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
664 } else {
665 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
666 continue;
667 }
668 }
669
670 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
671
672 /* the rest of the details is dealt with in the helperStatefulServerFree
673 * close handler
674 */
675 srv->closePipesSafely();
676 }
677 }
678
679 helper::~helper()
680 {
681 /* note, don't free id_name, it probably points to static memory */
682
683 if (queue.head)
684 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
685 }
686
687 /* ====================================================================== */
688 /* LOCAL FUNCTIONS */
689 /* ====================================================================== */
690
691 static void
692 helperServerFree(helper_server *srv)
693 {
694 helper *hlp = srv->parent;
695 helper_request *r;
696 int i, concurrency = hlp->childs.concurrency;
697
698 if (!concurrency)
699 concurrency = 1;
700
701 if (srv->rbuf) {
702 memFreeBuf(srv->rbuf_sz, srv->rbuf);
703 srv->rbuf = NULL;
704 }
705
706 srv->wqueue->clean();
707 delete srv->wqueue;
708
709 if (srv->writebuf) {
710 srv->writebuf->clean();
711 delete srv->writebuf;
712 srv->writebuf = NULL;
713 }
714
715 if (Comm::IsConnOpen(srv->writePipe))
716 srv->closeWritePipeSafely();
717
718 dlinkDelete(&srv->link, &hlp->servers);
719
720 assert(hlp->childs.n_running > 0);
721 -- hlp->childs.n_running;
722
723 if (!srv->flags.shutdown) {
724 assert(hlp->childs.n_active > 0);
725 -- hlp->childs.n_active;
726 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
727
728 if (hlp->childs.needNew() > 0) {
729 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
730
731 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30)
732 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
733
734 debugs(80, DBG_IMPORTANT, "Starting new helpers");
735 helperOpenServers(hlp);
736 }
737 }
738
739 for (i = 0; i < concurrency; ++i) {
740 if ((r = srv->requests[i])) {
741 void *cbdata;
742
743 if (cbdataReferenceValidDone(r->data, &cbdata))
744 r->callback(cbdata, NULL);
745
746 helperRequestFree(r);
747
748 srv->requests[i] = NULL;
749 }
750 }
751 safe_free(srv->requests);
752
753 cbdataReferenceDone(srv->parent);
754 cbdataFree(srv);
755 }
756
757 static void
758 helperStatefulServerFree(helper_stateful_server *srv)
759 {
760 statefulhelper *hlp = srv->parent;
761 helper_stateful_request *r;
762
763 if (srv->rbuf) {
764 memFreeBuf(srv->rbuf_sz, srv->rbuf);
765 srv->rbuf = NULL;
766 }
767
768 #if 0
769 srv->wqueue->clean();
770
771 delete srv->wqueue;
772
773 #endif
774
775 /* TODO: walk the local queue of requests and carry them all out */
776 if (Comm::IsConnOpen(srv->writePipe))
777 srv->closeWritePipeSafely();
778
779 dlinkDelete(&srv->link, &hlp->servers);
780
781 assert(hlp->childs.n_running > 0);
782 -- hlp->childs.n_running;
783
784 if (!srv->flags.shutdown) {
785 assert( hlp->childs.n_active > 0);
786 -- hlp->childs.n_active;
787 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
788
789 if (hlp->childs.needNew() > 0) {
790 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
791
792 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30)
793 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
794
795 debugs(80, DBG_IMPORTANT, "Starting new helpers");
796 helperStatefulOpenServers(hlp);
797 }
798 }
799
800 if ((r = srv->request)) {
801 void *cbdata;
802
803 if (cbdataReferenceValidDone(r->data, &cbdata))
804 r->callback(cbdata, srv, NULL);
805
806 helperStatefulRequestFree(r);
807
808 srv->request = NULL;
809 }
810
811 if (srv->data != NULL)
812 hlp->datapool->freeOne(srv->data);
813
814 cbdataReferenceDone(srv->parent);
815
816 cbdataFree(srv);
817 }
818
819 /// Calls back with a pointer to the buffer with the helper output
820 static void helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
821 {
822 helper_request *r = srv->requests[request_number];
823 if (r) {
824 HLPCB *callback = r->callback;
825
826 srv->requests[request_number] = NULL;
827
828 r->callback = NULL;
829
830 void *cbdata = NULL;
831 if (cbdataReferenceValidDone(r->data, &cbdata))
832 callback(cbdata, msg);
833
834 -- srv->stats.pending;
835
836 ++ hlp->stats.replies;
837
838 srv->answer_time = current_time;
839
840 srv->dispatch_time = r->dispatch_time;
841
842 hlp->stats.avg_svc_time =
843 Math::intAverage(hlp->stats.avg_svc_time,
844 tvSubMsec(r->dispatch_time, current_time),
845 hlp->stats.replies, REDIRECT_AV_FACTOR);
846
847 helperRequestFree(r);
848 } else {
849 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
850 request_number << " from " << hlp->id_name << " #" << srv->index + 1 <<
851 " '" << srv->rbuf << "'");
852 }
853 srv->roffset -= (msg_end - srv->rbuf);
854 memmove(srv->rbuf, msg_end, srv->roffset + 1);
855
856 if (!srv->flags.shutdown) {
857 helperKickQueue(hlp);
858 } else if (!srv->flags.closing && !srv->stats.pending) {
859 srv->flags.closing=1;
860 srv->writePipe->close();
861 return;
862 }
863 }
864
865 static void
866 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
867 {
868 char *t = NULL;
869 helper_server *srv = (helper_server *)data;
870 helper *hlp = srv->parent;
871 assert(cbdataReferenceValid(data));
872
873 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
874
875 if (flag == COMM_ERR_CLOSING) {
876 return;
877 }
878
879 assert(conn->fd == srv->readPipe->fd);
880
881 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
882
883 if (flag != COMM_OK || len == 0) {
884 srv->closePipesSafely();
885 return;
886 }
887
888 srv->roffset += len;
889 srv->rbuf[srv->roffset] = '\0';
890 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
891
892 if (!srv->stats.pending) {
893 /* someone spoke without being spoken to */
894 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
895 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
896 " bytes '" << srv->rbuf << "'");
897
898 srv->roffset = 0;
899 srv->rbuf[0] = '\0';
900 }
901
902 while ((t = strchr(srv->rbuf, hlp->eom))) {
903 /* end of reply found */
904 char *msg = srv->rbuf;
905 int i = 0;
906 debugs(84, 3, "helperHandleRead: end of reply found");
907
908 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n')
909 t[-1] = '\0';
910
911 *t = '\0';
912 ++t;
913
914 if (hlp->childs.concurrency) {
915 i = strtol(msg, &msg, 10);
916
917 while (*msg && xisspace(*msg))
918 ++msg;
919 }
920
921 helperReturnBuffer(i, srv, hlp, msg, t);
922 }
923
924 if (Comm::IsConnOpen(srv->readPipe)) {
925 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
926 assert(spaceSize >= 0);
927
928 // grow the input buffer if needed and possible
929 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
930 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
931 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
932 spaceSize = srv->rbuf_sz - srv->roffset - 1;
933 assert(spaceSize >= 0);
934 }
935
936 // quit reading if there is no space left
937 if (!spaceSize) {
938 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
939 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
940 "Squid input buffer: " << hlp->id_name << " #" <<
941 (srv->index + 1));
942 srv->closePipesSafely();
943 return;
944 }
945
946 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
947 CommIoCbPtrFun(helperHandleRead, srv));
948 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
949 }
950 }
951
952 static void
953 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
954 {
955 char *t = NULL;
956 helper_stateful_server *srv = (helper_stateful_server *)data;
957 helper_stateful_request *r;
958 statefulhelper *hlp = srv->parent;
959 assert(cbdataReferenceValid(data));
960
961 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
962
963 if (flag == COMM_ERR_CLOSING) {
964 return;
965 }
966
967 assert(conn->fd == srv->readPipe->fd);
968
969 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
970 hlp->id_name << " #" << srv->index + 1);
971
972
973 if (flag != COMM_OK || len == 0) {
974 srv->closePipesSafely();
975 return;
976 }
977
978 srv->roffset += len;
979 srv->rbuf[srv->roffset] = '\0';
980 r = srv->request;
981
982 if (r == NULL) {
983 /* someone spoke without being spoken to */
984 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
985 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
986 " bytes '" << srv->rbuf << "'");
987
988 srv->roffset = 0;
989 }
990
991 if ((t = strchr(srv->rbuf, hlp->eom))) {
992 /* end of reply found */
993 int called = 1;
994 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
995
996 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n')
997 t[-1] = '\0';
998
999 *t = '\0';
1000
1001 if (r && cbdataReferenceValid(r->data)) {
1002 r->callback(r->data, srv, srv->rbuf);
1003 } else {
1004 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1005 called = 0;
1006 }
1007
1008 srv->flags.busy = 0;
1009 srv->roffset = 0;
1010 helperStatefulRequestFree(r);
1011 srv->request = NULL;
1012 ++ hlp->stats.replies;
1013 srv->answer_time = current_time;
1014 hlp->stats.avg_svc_time =
1015 Math::intAverage(hlp->stats.avg_svc_time,
1016 tvSubMsec(srv->dispatch_time, current_time),
1017 hlp->stats.replies, REDIRECT_AV_FACTOR);
1018
1019 if (called)
1020 helperStatefulServerDone(srv);
1021 else
1022 helperStatefulReleaseServer(srv);
1023 }
1024
1025 if (Comm::IsConnOpen(srv->readPipe)) {
1026 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1027 assert(spaceSize >= 0);
1028
1029 // grow the input buffer if needed and possible
1030 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1031 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1032 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1033 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1034 assert(spaceSize >= 0);
1035 }
1036
1037 // quit reading if there is no space left
1038 if (!spaceSize) {
1039 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1040 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1041 "Squid input buffer: " << hlp->id_name << " #" <<
1042 (srv->index + 1));
1043 srv->closePipesSafely();
1044 return;
1045 }
1046
1047 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1048 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1049 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1050 }
1051 }
1052
1053 static void
1054 Enqueue(helper * hlp, helper_request * r)
1055 {
1056 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1057 dlinkAddTail(r, link, &hlp->queue);
1058 ++ hlp->stats.queue_size;
1059
1060 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1061 if (hlp->childs.needNew() > 0) {
1062 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1063 helperOpenServers(hlp);
1064 return;
1065 }
1066
1067 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1068 return;
1069
1070 if (squid_curtime - hlp->last_queue_warn < 600)
1071 return;
1072
1073 if (shutting_down || reconfiguring)
1074 return;
1075
1076 hlp->last_queue_warn = squid_curtime;
1077
1078 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1079 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1080 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1081
1082 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1083 fatalf("Too many queued %s requests", hlp->id_name);
1084 }
1085
1086 static void
1087 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1088 {
1089 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1090 dlinkAddTail(r, link, &hlp->queue);
1091 ++ hlp->stats.queue_size;
1092
1093 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1094 if (hlp->childs.needNew() > 0) {
1095 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1096 helperStatefulOpenServers(hlp);
1097 return;
1098 }
1099
1100 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1101 return;
1102
1103 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1104 fatalf("Too many queued %s requests", hlp->id_name);
1105
1106 if (squid_curtime - hlp->last_queue_warn < 600)
1107 return;
1108
1109 if (shutting_down || reconfiguring)
1110 return;
1111
1112 hlp->last_queue_warn = squid_curtime;
1113
1114 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1115 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1116 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1117 }
1118
1119 static helper_request *
1120 Dequeue(helper * hlp)
1121 {
1122 dlink_node *link;
1123 helper_request *r = NULL;
1124
1125 if ((link = hlp->queue.head)) {
1126 r = (helper_request *)link->data;
1127 dlinkDelete(link, &hlp->queue);
1128 memFree(link, MEM_DLINK_NODE);
1129 -- hlp->stats.queue_size;
1130 }
1131
1132 return r;
1133 }
1134
1135 static helper_stateful_request *
1136 StatefulDequeue(statefulhelper * hlp)
1137 {
1138 dlink_node *link;
1139 helper_stateful_request *r = NULL;
1140
1141 if ((link = hlp->queue.head)) {
1142 r = (helper_stateful_request *)link->data;
1143 dlinkDelete(link, &hlp->queue);
1144 memFree(link, MEM_DLINK_NODE);
1145 -- hlp->stats.queue_size;
1146 }
1147
1148 return r;
1149 }
1150
1151 static helper_server *
1152 GetFirstAvailable(helper * hlp)
1153 {
1154 dlink_node *n;
1155 helper_server *srv;
1156 helper_server *selected = NULL;
1157 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1158
1159 if (hlp->childs.n_running == 0)
1160 return NULL;
1161
1162 /* Find "least" loaded helper (approx) */
1163 for (n = hlp->servers.head; n != NULL; n = n->next) {
1164 srv = (helper_server *)n->data;
1165
1166 if (selected && selected->stats.pending <= srv->stats.pending)
1167 continue;
1168
1169 if (srv->flags.shutdown)
1170 continue;
1171
1172 if (!srv->stats.pending)
1173 return srv;
1174
1175 if (selected) {
1176 selected = srv;
1177 break;
1178 }
1179
1180 selected = srv;
1181 }
1182
1183 /* Check for overload */
1184 if (!selected) {
1185 debugs(84, 5, "GetFirstAvailable: None available.");
1186 return NULL;
1187 }
1188
1189 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1190 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1191 return NULL;
1192 }
1193
1194 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1195 return selected;
1196 }
1197
1198 static helper_stateful_server *
1199 StatefulGetFirstAvailable(statefulhelper * hlp)
1200 {
1201 dlink_node *n;
1202 helper_stateful_server *srv = NULL;
1203 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1204
1205 if (hlp->childs.n_running == 0)
1206 return NULL;
1207
1208 for (n = hlp->servers.head; n != NULL; n = n->next) {
1209 srv = (helper_stateful_server *)n->data;
1210
1211 if (srv->flags.busy)
1212 continue;
1213
1214 if (srv->flags.reserved)
1215 continue;
1216
1217 if (srv->flags.shutdown)
1218 continue;
1219
1220 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1221 continue;
1222
1223 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1224 return srv;
1225 }
1226
1227 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1228 return NULL;
1229 }
1230
1231
1232 static void
1233 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1234 {
1235 helper_server *srv = (helper_server *)data;
1236
1237 srv->writebuf->clean();
1238 delete srv->writebuf;
1239 srv->writebuf = NULL;
1240 srv->flags.writing = 0;
1241
1242 if (flag != COMM_OK) {
1243 /* Helper server has crashed */
1244 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
1245 return;
1246 }
1247
1248 if (!srv->wqueue->isNull()) {
1249 srv->writebuf = srv->wqueue;
1250 srv->wqueue = new MemBuf;
1251 srv->flags.writing = 1;
1252 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1253 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1254 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1255 }
1256 }
1257
1258 static void
1259 helperDispatch(helper_server * srv, helper_request * r)
1260 {
1261 helper *hlp = srv->parent;
1262 helper_request **ptr = NULL;
1263 unsigned int slot;
1264
1265 if (!cbdataReferenceValid(r->data)) {
1266 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1267 helperRequestFree(r);
1268 return;
1269 }
1270
1271 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1272 if (!srv->requests[slot]) {
1273 ptr = &srv->requests[slot];
1274 break;
1275 }
1276 }
1277
1278 assert(ptr);
1279 *ptr = r;
1280 srv->stats.pending += 1;
1281 r->dispatch_time = current_time;
1282
1283 if (srv->wqueue->isNull())
1284 srv->wqueue->init();
1285
1286 if (hlp->childs.concurrency)
1287 srv->wqueue->Printf("%d %s", slot, r->buf);
1288 else
1289 srv->wqueue->append(r->buf, strlen(r->buf));
1290
1291 if (!srv->flags.writing) {
1292 assert(NULL == srv->writebuf);
1293 srv->writebuf = srv->wqueue;
1294 srv->wqueue = new MemBuf;
1295 srv->flags.writing = 1;
1296 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1297 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1298 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1299 }
1300
1301 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
1302
1303 ++ srv->stats.uses;
1304 ++ hlp->stats.requests;
1305 }
1306
1307 static void
1308 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag,
1309 int xerrno, void *data)
1310 {
1311 /* nothing! */
1312 }
1313
1314
1315 static void
1316 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1317 {
1318 statefulhelper *hlp = srv->parent;
1319
1320 if (!cbdataReferenceValid(r->data)) {
1321 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1322 helperStatefulRequestFree(r);
1323 helperStatefulReleaseServer(srv);
1324 return;
1325 }
1326
1327 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
1328
1329 if (r->placeholder == 1) {
1330 /* a callback is needed before this request can _use_ a helper. */
1331 /* we don't care about releasing this helper. The request NEVER
1332 * gets to the helper. So we throw away the return code */
1333 r->callback(r->data, srv, NULL);
1334 /* throw away the placeholder */
1335 helperStatefulRequestFree(r);
1336 /* and push the queue. Note that the callback may have submitted a new
1337 * request to the helper which is why we test for the request*/
1338
1339 if (srv->request == NULL)
1340 helperStatefulServerDone(srv);
1341
1342 return;
1343 }
1344
1345 srv->flags.busy = 1;
1346 srv->flags.reserved = 1;
1347 srv->request = r;
1348 srv->dispatch_time = current_time;
1349 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1350 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1351 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1352 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1353 hlp->id_name << " #" << srv->index + 1 << ", " <<
1354 (int) strlen(r->buf) << " bytes");
1355
1356 ++ srv->stats.uses;
1357 ++ hlp->stats.requests;
1358 }
1359
1360
1361 static void
1362 helperKickQueue(helper * hlp)
1363 {
1364 helper_request *r;
1365 helper_server *srv;
1366
1367 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1368 helperDispatch(srv, r);
1369 }
1370
1371 static void
1372 helperStatefulKickQueue(statefulhelper * hlp)
1373 {
1374 helper_stateful_request *r;
1375 helper_stateful_server *srv;
1376
1377 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1378 helperStatefulDispatch(srv, r);
1379 }
1380
1381 static void
1382 helperStatefulServerDone(helper_stateful_server * srv)
1383 {
1384 if (!srv->flags.shutdown) {
1385 helperStatefulKickQueue(srv->parent);
1386 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1387 srv->closeWritePipeSafely();
1388 return;
1389 }
1390 }
1391
1392 static void
1393 helperRequestFree(helper_request * r)
1394 {
1395 cbdataReferenceDone(r->data);
1396 xfree(r->buf);
1397 delete r;
1398 }
1399
1400 static void
1401 helperStatefulRequestFree(helper_stateful_request * r)
1402 {
1403 if (r) {
1404 cbdataReferenceDone(r->data);
1405 xfree(r->buf);
1406 delete r;
1407 }
1408 }
1409
1410 // TODO: should helper_ and helper_stateful_ have a common parent?
1411 static bool
1412 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1413 {
1414 if (!hlp) {
1415 if (label)
1416 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1417 return false;
1418 }
1419
1420 if (label)
1421 storeAppendPrintf(sentry, "%s:\n", label);
1422
1423 return true;
1424 }