]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Merged from trunk 13172.
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * DEBUG: section 84 Helper process maintenance
3 * AUTHOR: Harvest Derived?
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 #include "squid.h"
34 #include "base/AsyncCbdataCalls.h"
35 #include "comm.h"
36 #include "comm/Connection.h"
37 #include "comm/Write.h"
38 #include "fd.h"
39 #include "fde.h"
40 #include "format/Quoting.h"
41 #include "helper.h"
42 #include "Mem.h"
43 #include "MemBuf.h"
44 #include "SquidIpc.h"
45 #include "SquidMath.h"
46 #include "SquidTime.h"
47 #include "Store.h"
48 #include "wordlist.h"
49
50 #define HELPER_MAX_ARGS 64
51
52 /** Initial Squid input buffer size. Helper responses may exceed this, and
53 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
54 */
55 const size_t ReadBufMinSize(4*1024);
56
57 /** Maximum safe size of a helper-to-Squid response message plus one.
58 * Squid will warn and close the stream if a helper sends a too-big response.
59 * ssl_crtd helper is known to produce responses of at least 10KB in size.
60 * Some undocumented helpers are known to produce responses exceeding 8KB.
61 */
62 const size_t ReadBufMaxSize(32*1024);
63
64 static IOCB helperHandleRead;
65 static IOCB helperStatefulHandleRead;
66 static void helperServerFree(helper_server *srv);
67 static void helperStatefulServerFree(helper_stateful_server *srv);
68 static void Enqueue(helper * hlp, helper_request *);
69 static helper_request *Dequeue(helper * hlp);
70 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
71 static helper_server *GetFirstAvailable(helper * hlp);
72 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
73 static void helperDispatch(helper_server * srv, helper_request * r);
74 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
75 static void helperKickQueue(helper * hlp);
76 static void helperStatefulKickQueue(statefulhelper * hlp);
77 static void helperStatefulServerDone(helper_stateful_server * srv);
78 static void helperRequestFree(helper_request * r);
79 static void helperStatefulRequestFree(helper_stateful_request * r);
80 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
81 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
82
83 CBDATA_CLASS_INIT(helper);
84 CBDATA_CLASS_INIT(helper_server);
85 CBDATA_CLASS_INIT(statefulhelper);
86 CBDATA_CLASS_INIT(helper_stateful_server);
87
88 void
89 HelperServerBase::initStats()
90 {
91 stats.uses=0;
92 stats.replies=0;
93 stats.pending=0;
94 stats.releases=0;
95 }
96
97 void
98 HelperServerBase::closePipesSafely()
99 {
100 #if _SQUID_WINDOWS_
101 int no = index + 1;
102
103 shutdown(writePipe->fd, SD_BOTH);
104 #endif
105
106 flags.closing = true;
107 if (readPipe->fd == writePipe->fd)
108 readPipe->fd = -1;
109 else
110 readPipe->close();
111 writePipe->close();
112
113 #if _SQUID_WINDOWS_
114 if (hIpc) {
115 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
116 getCurrentTime();
117 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
118 " #" << no << " (" << hlp->cmdline->key << "," <<
119 (long int)pid << ") didn't exit in 5 seconds");
120 }
121 CloseHandle(hIpc);
122 }
123 #endif
124 }
125
126 void
127 HelperServerBase::closeWritePipeSafely()
128 {
129 #if _SQUID_WINDOWS_
130 int no = index + 1;
131
132 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
133 #endif
134
135 flags.closing = true;
136 if (readPipe->fd == writePipe->fd)
137 readPipe->fd = -1;
138 writePipe->close();
139
140 #if _SQUID_WINDOWS_
141 if (hIpc) {
142 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
143 getCurrentTime();
144 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
145 " #" << no << " (" << hlp->cmdline->key << "," <<
146 (long int)pid << ") didn't exit in 5 seconds");
147 }
148 CloseHandle(hIpc);
149 }
150 #endif
151 }
152
153 void
154 helperOpenServers(helper * hlp)
155 {
156 char *s;
157 char *progname;
158 char *shortname;
159 char *procname;
160 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
161 char fd_note_buf[FD_DESC_SZ];
162 helper_server *srv;
163 int nargs = 0;
164 int k;
165 pid_t pid;
166 int rfd;
167 int wfd;
168 void * hIpc;
169 wordlist *w;
170
171 if (hlp->cmdline == NULL)
172 return;
173
174 progname = hlp->cmdline->key;
175
176 if ((s = strrchr(progname, '/')))
177 shortname = xstrdup(s + 1);
178 else
179 shortname = xstrdup(progname);
180
181 /* figure out how many new child are actually needed. */
182 int need_new = hlp->childs.needNew();
183
184 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
185
186 if (need_new < 1) {
187 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
188 }
189
190 procname = (char *)xmalloc(strlen(shortname) + 3);
191
192 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
193
194 args[nargs] = procname;
195 ++nargs;
196
197 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
198 args[nargs] = w->key;
199 ++nargs;
200 }
201
202 args[nargs] = NULL;
203 ++nargs;
204
205 assert(nargs <= HELPER_MAX_ARGS);
206
207 for (k = 0; k < need_new; ++k) {
208 getCurrentTime();
209 rfd = wfd = -1;
210 pid = ipcCreate(hlp->ipc_type,
211 progname,
212 args,
213 shortname,
214 hlp->addr,
215 &rfd,
216 &wfd,
217 &hIpc);
218
219 if (pid < 0) {
220 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
221 continue;
222 }
223
224 ++ hlp->childs.n_running;
225 ++ hlp->childs.n_active;
226 srv = new helper_server;
227 srv->hIpc = hIpc;
228 srv->pid = pid;
229 srv->initStats();
230 srv->index = k;
231 srv->addr = hlp->addr;
232 srv->readPipe = new Comm::Connection;
233 srv->readPipe->fd = rfd;
234 srv->writePipe = new Comm::Connection;
235 srv->writePipe->fd = wfd;
236 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
237 srv->wqueue = new MemBuf;
238 srv->roffset = 0;
239 srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
240 srv->parent = cbdataReference(hlp);
241 dlinkAddTail(srv, &srv->link, &hlp->servers);
242
243 if (rfd == wfd) {
244 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
245 fd_note(rfd, fd_note_buf);
246 } else {
247 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
248 fd_note(rfd, fd_note_buf);
249 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
250 fd_note(wfd, fd_note_buf);
251 }
252
253 commSetNonBlocking(rfd);
254
255 if (wfd != rfd)
256 commSetNonBlocking(wfd);
257
258 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
259 comm_add_close_handler(rfd, closeCall);
260
261 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
262 CommIoCbPtrFun(helperHandleRead, srv));
263 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
264 }
265
266 hlp->last_restart = squid_curtime;
267 safe_free(shortname);
268 safe_free(procname);
269 helperKickQueue(hlp);
270 }
271
272 /**
273 * DPW 2007-05-08
274 *
275 * helperStatefulOpenServers: create the stateful child helper processes
276 */
277 void
278 helperStatefulOpenServers(statefulhelper * hlp)
279 {
280 char *shortname;
281 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
282 char fd_note_buf[FD_DESC_SZ];
283 int nargs = 0;
284
285 if (hlp->cmdline == NULL)
286 return;
287
288 if (hlp->childs.concurrency)
289 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
290
291 char *progname = hlp->cmdline->key;
292
293 char *s;
294 if ((s = strrchr(progname, '/')))
295 shortname = xstrdup(s + 1);
296 else
297 shortname = xstrdup(progname);
298
299 /* figure out haw mant new helpers are needed. */
300 int need_new = hlp->childs.needNew();
301
302 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
303
304 if (need_new < 1) {
305 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
306 }
307
308 char *procname = (char *)xmalloc(strlen(shortname) + 3);
309
310 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
311
312 args[nargs] = procname;
313 ++nargs;
314
315 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
316 args[nargs] = w->key;
317 ++nargs;
318 }
319
320 args[nargs] = NULL;
321 ++nargs;
322
323 assert(nargs <= HELPER_MAX_ARGS);
324
325 for (int k = 0; k < need_new; ++k) {
326 getCurrentTime();
327 int rfd = -1;
328 int wfd = -1;
329 void * hIpc;
330 pid_t pid = ipcCreate(hlp->ipc_type,
331 progname,
332 args,
333 shortname,
334 hlp->addr,
335 &rfd,
336 &wfd,
337 &hIpc);
338
339 if (pid < 0) {
340 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
341 continue;
342 }
343
344 ++ hlp->childs.n_running;
345 ++ hlp->childs.n_active;
346 helper_stateful_server *srv = new helper_stateful_server;
347 srv->hIpc = hIpc;
348 srv->pid = pid;
349 srv->flags.reserved = false;
350 srv->initStats();
351 srv->index = k;
352 srv->addr = hlp->addr;
353 srv->readPipe = new Comm::Connection;
354 srv->readPipe->fd = rfd;
355 srv->writePipe = new Comm::Connection;
356 srv->writePipe->fd = wfd;
357 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
358 srv->roffset = 0;
359 srv->parent = cbdataReference(hlp);
360
361 if (hlp->datapool != NULL)
362 srv->data = hlp->datapool->alloc();
363
364 dlinkAddTail(srv, &srv->link, &hlp->servers);
365
366 if (rfd == wfd) {
367 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
368 fd_note(rfd, fd_note_buf);
369 } else {
370 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
371 fd_note(rfd, fd_note_buf);
372 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
373 fd_note(wfd, fd_note_buf);
374 }
375
376 commSetNonBlocking(rfd);
377
378 if (wfd != rfd)
379 commSetNonBlocking(wfd);
380
381 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
382 comm_add_close_handler(rfd, closeCall);
383
384 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
385 CommIoCbPtrFun(helperStatefulHandleRead, srv));
386 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
387 }
388
389 hlp->last_restart = squid_curtime;
390 safe_free(shortname);
391 safe_free(procname);
392 helperStatefulKickQueue(hlp);
393 }
394
395 void
396 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
397 {
398 if (hlp == NULL) {
399 debugs(84, 3, "helperSubmit: hlp == NULL");
400 HelperReply nilReply;
401 callback(data, nilReply);
402 return;
403 }
404
405 helper_request *r = new helper_request;
406 helper_server *srv;
407
408 r->callback = callback;
409 r->data = cbdataReference(data);
410 r->buf = xstrdup(buf);
411
412 if ((srv = GetFirstAvailable(hlp)))
413 helperDispatch(srv, r);
414 else
415 Enqueue(hlp, r);
416
417 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
418 }
419
420 /// lastserver = "server last used as part of a reserved request sequence"
421 void
422 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
423 {
424 if (hlp == NULL) {
425 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
426 HelperReply nilReply;
427 callback(data, nilReply);
428 return;
429 }
430
431 helper_stateful_request *r = new helper_stateful_request;
432
433 r->callback = callback;
434 r->data = cbdataReference(data);
435
436 if (buf != NULL) {
437 r->buf = xstrdup(buf);
438 r->placeholder = 0;
439 } else {
440 r->buf = NULL;
441 r->placeholder = 1;
442 }
443
444 if ((buf != NULL) && lastserver) {
445 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
446 assert(lastserver->flags.reserved);
447 assert(!(lastserver->request));
448
449 debugs(84, 5, "StatefulSubmit dispatching");
450 helperStatefulDispatch(lastserver, r);
451 } else {
452 helper_stateful_server *srv;
453 if ((srv = StatefulGetFirstAvailable(hlp))) {
454 helperStatefulDispatch(srv, r);
455 } else
456 StatefulEnqueue(hlp, r);
457 }
458
459 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
460 "', " << Raw("buf", buf, strlen(buf)));
461 }
462
463 /**
464 * DPW 2007-05-08
465 *
466 * helperStatefulReleaseServer tells the helper that whoever was
467 * using it no longer needs its services.
468 */
469 void
470 helperStatefulReleaseServer(helper_stateful_server * srv)
471 {
472 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
473 if (!srv->flags.reserved)
474 return;
475
476 ++ srv->stats.releases;
477
478 srv->flags.reserved = false;
479 if (srv->parent->OnEmptyQueue != NULL && srv->data)
480 srv->parent->OnEmptyQueue(srv->data);
481
482 helperStatefulServerDone(srv);
483 }
484
485 /** return a pointer to the stateful routines data area */
486 void *
487 helperStatefulServerGetData(helper_stateful_server * srv)
488 {
489 return srv->data;
490 }
491
492 /**
493 * Dump some stats about the helper states to a StoreEntry
494 */
495 void
496 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
497 {
498 if (!helperStartStats(sentry, hlp, label))
499 return;
500
501 storeAppendPrintf(sentry, "program: %s\n",
502 hlp->cmdline->key);
503 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
504 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
505 storeAppendPrintf(sentry, "requests sent: %d\n",
506 hlp->stats.requests);
507 storeAppendPrintf(sentry, "replies received: %d\n",
508 hlp->stats.replies);
509 storeAppendPrintf(sentry, "queue length: %d\n",
510 hlp->stats.queue_size);
511 storeAppendPrintf(sentry, "avg service time: %d msec\n",
512 hlp->stats.avg_svc_time);
513 storeAppendPrintf(sentry, "\n");
514 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
515 "#",
516 "FD",
517 "PID",
518 "# Requests",
519 "# Replies",
520 "Flags",
521 "Time",
522 "Offset",
523 "Request");
524
525 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
526 helper_server *srv = (helper_server*)link->data;
527 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
528 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
529 srv->index + 1,
530 srv->readPipe->fd,
531 srv->pid,
532 srv->stats.uses,
533 srv->stats.replies,
534 srv->stats.pending ? 'B' : ' ',
535 srv->flags.writing ? 'W' : ' ',
536 srv->flags.closing ? 'C' : ' ',
537 srv->flags.shutdown ? 'S' : ' ',
538 tt < 0.0 ? 0.0 : tt,
539 (int) srv->roffset,
540 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
541 }
542
543 storeAppendPrintf(sentry, "\nFlags key:\n\n");
544 storeAppendPrintf(sentry, " B = BUSY\n");
545 storeAppendPrintf(sentry, " W = WRITING\n");
546 storeAppendPrintf(sentry, " C = CLOSING\n");
547 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
548 }
549
550 void
551 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
552 {
553 if (!helperStartStats(sentry, hlp, label))
554 return;
555
556 storeAppendPrintf(sentry, "program: %s\n",
557 hlp->cmdline->key);
558 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
559 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
560 storeAppendPrintf(sentry, "requests sent: %d\n",
561 hlp->stats.requests);
562 storeAppendPrintf(sentry, "replies received: %d\n",
563 hlp->stats.replies);
564 storeAppendPrintf(sentry, "queue length: %d\n",
565 hlp->stats.queue_size);
566 storeAppendPrintf(sentry, "avg service time: %d msec\n",
567 hlp->stats.avg_svc_time);
568 storeAppendPrintf(sentry, "\n");
569 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
570 "#",
571 "FD",
572 "PID",
573 "# Requests",
574 "# Replies",
575 "Flags",
576 "Time",
577 "Offset",
578 "Request");
579
580 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
581 helper_stateful_server *srv = (helper_stateful_server *)link->data;
582 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
583 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
584 srv->index + 1,
585 srv->readPipe->fd,
586 srv->pid,
587 srv->stats.uses,
588 srv->stats.replies,
589 srv->flags.busy ? 'B' : ' ',
590 srv->flags.closing ? 'C' : ' ',
591 srv->flags.reserved ? 'R' : ' ',
592 srv->flags.shutdown ? 'S' : ' ',
593 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
594 tt < 0.0 ? 0.0 : tt,
595 (int) srv->roffset,
596 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
597 }
598
599 storeAppendPrintf(sentry, "\nFlags key:\n\n");
600 storeAppendPrintf(sentry, " B = BUSY\n");
601 storeAppendPrintf(sentry, " C = CLOSING\n");
602 storeAppendPrintf(sentry, " R = RESERVED\n");
603 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
604 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
605 }
606
607 void
608 helperShutdown(helper * hlp)
609 {
610 dlink_node *link = hlp->servers.head;
611
612 while (link) {
613 helper_server *srv;
614 srv = (helper_server *)link->data;
615 link = link->next;
616
617 if (srv->flags.shutdown) {
618 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
619 continue;
620 }
621
622 assert(hlp->childs.n_active > 0);
623 -- hlp->childs.n_active;
624 srv->flags.shutdown = true; /* request it to shut itself down */
625
626 if (srv->flags.closing) {
627 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
628 continue;
629 }
630
631 if (srv->stats.pending) {
632 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
633 continue;
634 }
635
636 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
637 /* the rest of the details is dealt with in the helperServerFree
638 * close handler
639 */
640 srv->closePipesSafely();
641 }
642 }
643
644 void
645 helperStatefulShutdown(statefulhelper * hlp)
646 {
647 dlink_node *link = hlp->servers.head;
648 helper_stateful_server *srv;
649
650 while (link) {
651 srv = (helper_stateful_server *)link->data;
652 link = link->next;
653
654 if (srv->flags.shutdown) {
655 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
656 continue;
657 }
658
659 assert(hlp->childs.n_active > 0);
660 -- hlp->childs.n_active;
661 srv->flags.shutdown = true; /* request it to shut itself down */
662
663 if (srv->flags.busy) {
664 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
665 continue;
666 }
667
668 if (srv->flags.closing) {
669 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
670 continue;
671 }
672
673 if (srv->flags.reserved) {
674 if (shutting_down) {
675 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
676 } else {
677 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
678 continue;
679 }
680 }
681
682 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
683
684 /* the rest of the details is dealt with in the helperStatefulServerFree
685 * close handler
686 */
687 srv->closePipesSafely();
688 }
689 }
690
691 helper::~helper()
692 {
693 /* note, don't free id_name, it probably points to static memory */
694
695 if (queue.head)
696 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
697 }
698
699 /* ====================================================================== */
700 /* LOCAL FUNCTIONS */
701 /* ====================================================================== */
702
703 static void
704 helperServerFree(helper_server *srv)
705 {
706 helper *hlp = srv->parent;
707 helper_request *r;
708 int i, concurrency = hlp->childs.concurrency;
709
710 if (!concurrency)
711 concurrency = 1;
712
713 if (srv->rbuf) {
714 memFreeBuf(srv->rbuf_sz, srv->rbuf);
715 srv->rbuf = NULL;
716 }
717
718 srv->wqueue->clean();
719 delete srv->wqueue;
720
721 if (srv->writebuf) {
722 srv->writebuf->clean();
723 delete srv->writebuf;
724 srv->writebuf = NULL;
725 }
726
727 if (Comm::IsConnOpen(srv->writePipe))
728 srv->closeWritePipeSafely();
729
730 dlinkDelete(&srv->link, &hlp->servers);
731
732 assert(hlp->childs.n_running > 0);
733 -- hlp->childs.n_running;
734
735 if (!srv->flags.shutdown) {
736 assert(hlp->childs.n_active > 0);
737 -- hlp->childs.n_active;
738 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
739
740 if (hlp->childs.needNew() > 0) {
741 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
742
743 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
744 if (srv->stats.replies < 1)
745 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
746 else
747 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
748 }
749
750 debugs(80, DBG_IMPORTANT, "Starting new helpers");
751 helperOpenServers(hlp);
752 }
753 }
754
755 for (i = 0; i < concurrency; ++i) {
756 // XXX: re-schedule these on another helper?
757 if ((r = srv->requests[i])) {
758 void *cbdata;
759
760 if (cbdataReferenceValidDone(r->data, &cbdata)) {
761 HelperReply nilReply;
762 r->callback(cbdata, nilReply);
763 }
764
765 helperRequestFree(r);
766
767 srv->requests[i] = NULL;
768 }
769 }
770 safe_free(srv->requests);
771
772 cbdataReferenceDone(srv->parent);
773 delete srv;
774 }
775
776 static void
777 helperStatefulServerFree(helper_stateful_server *srv)
778 {
779 statefulhelper *hlp = srv->parent;
780 helper_stateful_request *r;
781
782 if (srv->rbuf) {
783 memFreeBuf(srv->rbuf_sz, srv->rbuf);
784 srv->rbuf = NULL;
785 }
786
787 #if 0
788 srv->wqueue->clean();
789
790 delete srv->wqueue;
791
792 #endif
793
794 /* TODO: walk the local queue of requests and carry them all out */
795 if (Comm::IsConnOpen(srv->writePipe))
796 srv->closeWritePipeSafely();
797
798 dlinkDelete(&srv->link, &hlp->servers);
799
800 assert(hlp->childs.n_running > 0);
801 -- hlp->childs.n_running;
802
803 if (!srv->flags.shutdown) {
804 assert( hlp->childs.n_active > 0);
805 -- hlp->childs.n_active;
806 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
807
808 if (hlp->childs.needNew() > 0) {
809 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
810
811 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
812 if (srv->stats.replies < 1)
813 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
814 else
815 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
816 }
817
818 debugs(80, DBG_IMPORTANT, "Starting new helpers");
819 helperStatefulOpenServers(hlp);
820 }
821 }
822
823 if ((r = srv->request)) {
824 void *cbdata;
825
826 if (cbdataReferenceValidDone(r->data, &cbdata)) {
827 HelperReply nilReply;
828 nilReply.whichServer = srv;
829 r->callback(cbdata, nilReply);
830 }
831
832 helperStatefulRequestFree(r);
833
834 srv->request = NULL;
835 }
836
837 if (srv->data != NULL)
838 hlp->datapool->freeOne(srv->data);
839
840 cbdataReferenceDone(srv->parent);
841
842 delete srv;
843 }
844
845 /// Calls back with a pointer to the buffer with the helper output
846 static void
847 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
848 {
849 helper_request *r = srv->requests[request_number];
850 if (r) {
851 HLPCB *callback = r->callback;
852
853 srv->requests[request_number] = NULL;
854
855 r->callback = NULL;
856
857 void *cbdata = NULL;
858 if (cbdataReferenceValidDone(r->data, &cbdata)) {
859 HelperReply response(msg, (msg_end-msg));
860 callback(cbdata, response);
861 }
862
863 -- srv->stats.pending;
864 ++ srv->stats.replies;
865
866 ++ hlp->stats.replies;
867
868 srv->answer_time = current_time;
869
870 srv->dispatch_time = r->dispatch_time;
871
872 hlp->stats.avg_svc_time =
873 Math::intAverage(hlp->stats.avg_svc_time,
874 tvSubMsec(r->dispatch_time, current_time),
875 hlp->stats.replies, REDIRECT_AV_FACTOR);
876
877 helperRequestFree(r);
878 } else {
879 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
880 request_number << " from " << hlp->id_name << " #" << srv->index + 1 <<
881 " '" << srv->rbuf << "'");
882 }
883
884 if (!srv->flags.shutdown) {
885 helperKickQueue(hlp);
886 } else if (!srv->flags.closing && !srv->stats.pending) {
887 srv->flags.closing=true;
888 srv->writePipe->close();
889 }
890 }
891
892 static void
893 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
894 {
895 char *t = NULL;
896 helper_server *srv = (helper_server *)data;
897 helper *hlp = srv->parent;
898 assert(cbdataReferenceValid(data));
899
900 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
901
902 if (flag == COMM_ERR_CLOSING) {
903 return;
904 }
905
906 assert(conn->fd == srv->readPipe->fd);
907
908 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
909
910 if (flag != COMM_OK || len == 0) {
911 srv->closePipesSafely();
912 return;
913 }
914
915 srv->roffset += len;
916 srv->rbuf[srv->roffset] = '\0';
917 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
918
919 if (!srv->stats.pending) {
920 /* someone spoke without being spoken to */
921 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
922 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
923 " bytes '" << srv->rbuf << "'");
924
925 srv->roffset = 0;
926 srv->rbuf[0] = '\0';
927 }
928
929 while ((t = strchr(srv->rbuf, hlp->eom))) {
930 /* end of reply found */
931 char *msg = srv->rbuf;
932 int i = 0;
933 int skip = 1;
934 debugs(84, 3, "helperHandleRead: end of reply found");
935
936 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
937 *t = '\0';
938 // rewind to the \r octet which is the real terminal now
939 // and remember that we have to skip forward 2 places now.
940 skip = 2;
941 --t;
942 }
943
944 *t = '\0';
945
946 if (hlp->childs.concurrency) {
947 i = strtol(msg, &msg, 10);
948
949 while (*msg && xisspace(*msg))
950 ++msg;
951 }
952
953 helperReturnBuffer(i, srv, hlp, msg, t);
954 srv->roffset -= (t - srv->rbuf) + skip;
955 memmove(srv->rbuf, t + skip, srv->roffset);
956 srv->rbuf[srv->roffset] = '\0';
957 }
958
959 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
960 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
961 assert(spaceSize >= 0);
962
963 // grow the input buffer if needed and possible
964 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
965 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
966 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
967 spaceSize = srv->rbuf_sz - srv->roffset - 1;
968 assert(spaceSize >= 0);
969 }
970
971 // quit reading if there is no space left
972 if (!spaceSize) {
973 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
974 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
975 "Squid input buffer: " << hlp->id_name << " #" <<
976 (srv->index + 1));
977 srv->closePipesSafely();
978 return;
979 }
980
981 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
982 CommIoCbPtrFun(helperHandleRead, srv));
983 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
984 }
985 }
986
987 static void
988 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
989 {
990 char *t = NULL;
991 helper_stateful_server *srv = (helper_stateful_server *)data;
992 helper_stateful_request *r;
993 statefulhelper *hlp = srv->parent;
994 assert(cbdataReferenceValid(data));
995
996 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
997
998 if (flag == COMM_ERR_CLOSING) {
999 return;
1000 }
1001
1002 assert(conn->fd == srv->readPipe->fd);
1003
1004 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1005 hlp->id_name << " #" << srv->index + 1);
1006
1007 if (flag != COMM_OK || len == 0) {
1008 srv->closePipesSafely();
1009 return;
1010 }
1011
1012 srv->roffset += len;
1013 srv->rbuf[srv->roffset] = '\0';
1014 r = srv->request;
1015 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1016
1017 if (r == NULL) {
1018 /* someone spoke without being spoken to */
1019 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1020 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1021 " bytes '" << srv->rbuf << "'");
1022
1023 srv->roffset = 0;
1024 }
1025
1026 if ((t = strchr(srv->rbuf, hlp->eom))) {
1027 /* end of reply found */
1028 int called = 1;
1029 int skip = 1;
1030 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1031
1032 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1033 *t = '\0';
1034 // rewind to the \r octet which is the real terminal now
1035 // and remember that we have to skip forward 2 places now.
1036 skip = 2;
1037 --t;
1038 }
1039
1040 *t = '\0';
1041
1042 if (r && cbdataReferenceValid(r->data)) {
1043 HelperReply res(srv->rbuf, (t - srv->rbuf));
1044 res.whichServer = srv;
1045 r->callback(r->data, res);
1046 } else {
1047 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1048 called = 0;
1049 }
1050 // only skip off the \0's _after_ passing its location in HelperReply above
1051 t += skip;
1052
1053 srv->flags.busy = false;
1054 /**
1055 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1056 * Doing this prohibits concurrency support with multiple replies per read().
1057 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1058 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1059 */
1060 srv->roffset = 0;
1061 helperStatefulRequestFree(r);
1062 srv->request = NULL;
1063
1064 -- srv->stats.pending;
1065 ++ srv->stats.replies;
1066
1067 ++ hlp->stats.replies;
1068 srv->answer_time = current_time;
1069 hlp->stats.avg_svc_time =
1070 Math::intAverage(hlp->stats.avg_svc_time,
1071 tvSubMsec(srv->dispatch_time, current_time),
1072 hlp->stats.replies, REDIRECT_AV_FACTOR);
1073
1074 if (called)
1075 helperStatefulServerDone(srv);
1076 else
1077 helperStatefulReleaseServer(srv);
1078 }
1079
1080 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1081 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1082 assert(spaceSize >= 0);
1083
1084 // grow the input buffer if needed and possible
1085 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1086 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1087 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1088 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1089 assert(spaceSize >= 0);
1090 }
1091
1092 // quit reading if there is no space left
1093 if (!spaceSize) {
1094 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1095 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1096 "Squid input buffer: " << hlp->id_name << " #" <<
1097 (srv->index + 1));
1098 srv->closePipesSafely();
1099 return;
1100 }
1101
1102 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1103 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1104 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1105 }
1106 }
1107
1108 static void
1109 Enqueue(helper * hlp, helper_request * r)
1110 {
1111 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1112 dlinkAddTail(r, link, &hlp->queue);
1113 ++ hlp->stats.queue_size;
1114
1115 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1116 if (hlp->childs.needNew() > 0) {
1117 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1118 helperOpenServers(hlp);
1119 return;
1120 }
1121
1122 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1123 return;
1124
1125 if (squid_curtime - hlp->last_queue_warn < 600)
1126 return;
1127
1128 if (shutting_down || reconfiguring)
1129 return;
1130
1131 hlp->last_queue_warn = squid_curtime;
1132
1133 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1134 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1135 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1136
1137 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1138 fatalf("Too many queued %s requests", hlp->id_name);
1139 }
1140
1141 static void
1142 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1143 {
1144 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1145 dlinkAddTail(r, link, &hlp->queue);
1146 ++ hlp->stats.queue_size;
1147
1148 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1149 if (hlp->childs.needNew() > 0) {
1150 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1151 helperStatefulOpenServers(hlp);
1152 return;
1153 }
1154
1155 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1156 return;
1157
1158 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1159 fatalf("Too many queued %s requests", hlp->id_name);
1160
1161 if (squid_curtime - hlp->last_queue_warn < 600)
1162 return;
1163
1164 if (shutting_down || reconfiguring)
1165 return;
1166
1167 hlp->last_queue_warn = squid_curtime;
1168
1169 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1170 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1171 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1172 }
1173
1174 static helper_request *
1175 Dequeue(helper * hlp)
1176 {
1177 dlink_node *link;
1178 helper_request *r = NULL;
1179
1180 if ((link = hlp->queue.head)) {
1181 r = (helper_request *)link->data;
1182 dlinkDelete(link, &hlp->queue);
1183 memFree(link, MEM_DLINK_NODE);
1184 -- hlp->stats.queue_size;
1185 }
1186
1187 return r;
1188 }
1189
1190 static helper_stateful_request *
1191 StatefulDequeue(statefulhelper * hlp)
1192 {
1193 dlink_node *link;
1194 helper_stateful_request *r = NULL;
1195
1196 if ((link = hlp->queue.head)) {
1197 r = (helper_stateful_request *)link->data;
1198 dlinkDelete(link, &hlp->queue);
1199 memFree(link, MEM_DLINK_NODE);
1200 -- hlp->stats.queue_size;
1201 }
1202
1203 return r;
1204 }
1205
1206 static helper_server *
1207 GetFirstAvailable(helper * hlp)
1208 {
1209 dlink_node *n;
1210 helper_server *srv;
1211 helper_server *selected = NULL;
1212 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1213
1214 if (hlp->childs.n_running == 0)
1215 return NULL;
1216
1217 /* Find "least" loaded helper (approx) */
1218 for (n = hlp->servers.head; n != NULL; n = n->next) {
1219 srv = (helper_server *)n->data;
1220
1221 if (selected && selected->stats.pending <= srv->stats.pending)
1222 continue;
1223
1224 if (srv->flags.shutdown)
1225 continue;
1226
1227 if (!srv->stats.pending)
1228 return srv;
1229
1230 if (selected) {
1231 selected = srv;
1232 break;
1233 }
1234
1235 selected = srv;
1236 }
1237
1238 /* Check for overload */
1239 if (!selected) {
1240 debugs(84, 5, "GetFirstAvailable: None available.");
1241 return NULL;
1242 }
1243
1244 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1245 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1246 return NULL;
1247 }
1248
1249 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1250 return selected;
1251 }
1252
1253 static helper_stateful_server *
1254 StatefulGetFirstAvailable(statefulhelper * hlp)
1255 {
1256 dlink_node *n;
1257 helper_stateful_server *srv = NULL;
1258 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1259
1260 if (hlp->childs.n_running == 0)
1261 return NULL;
1262
1263 for (n = hlp->servers.head; n != NULL; n = n->next) {
1264 srv = (helper_stateful_server *)n->data;
1265
1266 if (srv->flags.busy)
1267 continue;
1268
1269 if (srv->flags.reserved)
1270 continue;
1271
1272 if (srv->flags.shutdown)
1273 continue;
1274
1275 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1276 continue;
1277
1278 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1279 return srv;
1280 }
1281
1282 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1283 return NULL;
1284 }
1285
1286 static void
1287 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1288 {
1289 helper_server *srv = (helper_server *)data;
1290
1291 srv->writebuf->clean();
1292 delete srv->writebuf;
1293 srv->writebuf = NULL;
1294 srv->flags.writing = false;
1295
1296 if (flag != COMM_OK) {
1297 /* Helper server has crashed */
1298 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
1299 return;
1300 }
1301
1302 if (!srv->wqueue->isNull()) {
1303 srv->writebuf = srv->wqueue;
1304 srv->wqueue = new MemBuf;
1305 srv->flags.writing = true;
1306 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1307 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1308 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1309 }
1310 }
1311
1312 static void
1313 helperDispatch(helper_server * srv, helper_request * r)
1314 {
1315 helper *hlp = srv->parent;
1316 helper_request **ptr = NULL;
1317 unsigned int slot;
1318
1319 if (!cbdataReferenceValid(r->data)) {
1320 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1321 helperRequestFree(r);
1322 return;
1323 }
1324
1325 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1326 if (!srv->requests[slot]) {
1327 ptr = &srv->requests[slot];
1328 break;
1329 }
1330 }
1331
1332 assert(ptr);
1333 *ptr = r;
1334 r->dispatch_time = current_time;
1335
1336 if (srv->wqueue->isNull())
1337 srv->wqueue->init();
1338
1339 if (hlp->childs.concurrency)
1340 srv->wqueue->Printf("%d %s", slot, r->buf);
1341 else
1342 srv->wqueue->append(r->buf, strlen(r->buf));
1343
1344 if (!srv->flags.writing) {
1345 assert(NULL == srv->writebuf);
1346 srv->writebuf = srv->wqueue;
1347 srv->wqueue = new MemBuf;
1348 srv->flags.writing = true;
1349 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1350 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1351 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1352 }
1353
1354 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
1355
1356 ++ srv->stats.uses;
1357 ++ srv->stats.pending;
1358 ++ hlp->stats.requests;
1359 }
1360
1361 static void
1362 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag,
1363 int xerrno, void *data)
1364 {
1365 /* nothing! */
1366 }
1367
1368 static void
1369 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1370 {
1371 statefulhelper *hlp = srv->parent;
1372
1373 if (!cbdataReferenceValid(r->data)) {
1374 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1375 helperStatefulRequestFree(r);
1376 helperStatefulReleaseServer(srv);
1377 return;
1378 }
1379
1380 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
1381
1382 if (r->placeholder == 1) {
1383 /* a callback is needed before this request can _use_ a helper. */
1384 /* we don't care about releasing this helper. The request NEVER
1385 * gets to the helper. So we throw away the return code */
1386 HelperReply nilReply;
1387 nilReply.whichServer = srv;
1388 r->callback(r->data, nilReply);
1389 /* throw away the placeholder */
1390 helperStatefulRequestFree(r);
1391 /* and push the queue. Note that the callback may have submitted a new
1392 * request to the helper which is why we test for the request */
1393
1394 if (srv->request == NULL)
1395 helperStatefulServerDone(srv);
1396
1397 return;
1398 }
1399
1400 srv->flags.busy = true;
1401 srv->flags.reserved = true;
1402 srv->request = r;
1403 srv->dispatch_time = current_time;
1404 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1405 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1406 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1407 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1408 hlp->id_name << " #" << srv->index + 1 << ", " <<
1409 (int) strlen(r->buf) << " bytes");
1410
1411 ++ srv->stats.uses;
1412 ++ srv->stats.pending;
1413 ++ hlp->stats.requests;
1414 }
1415
1416 static void
1417 helperKickQueue(helper * hlp)
1418 {
1419 helper_request *r;
1420 helper_server *srv;
1421
1422 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1423 helperDispatch(srv, r);
1424 }
1425
1426 static void
1427 helperStatefulKickQueue(statefulhelper * hlp)
1428 {
1429 helper_stateful_request *r;
1430 helper_stateful_server *srv;
1431
1432 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1433 helperStatefulDispatch(srv, r);
1434 }
1435
1436 static void
1437 helperStatefulServerDone(helper_stateful_server * srv)
1438 {
1439 if (!srv->flags.shutdown) {
1440 helperStatefulKickQueue(srv->parent);
1441 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1442 srv->closeWritePipeSafely();
1443 return;
1444 }
1445 }
1446
1447 static void
1448 helperRequestFree(helper_request * r)
1449 {
1450 cbdataReferenceDone(r->data);
1451 xfree(r->buf);
1452 delete r;
1453 }
1454
1455 static void
1456 helperStatefulRequestFree(helper_stateful_request * r)
1457 {
1458 if (r) {
1459 cbdataReferenceDone(r->data);
1460 xfree(r->buf);
1461 delete r;
1462 }
1463 }
1464
1465 // TODO: should helper_ and helper_stateful_ have a common parent?
1466 static bool
1467 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1468 {
1469 if (!hlp) {
1470 if (label)
1471 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1472 return false;
1473 }
1474
1475 if (label)
1476 storeAppendPrintf(sentry, "%s:\n", label);
1477
1478 return true;
1479 }