]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Windows: Fix error displaying helper name on pipe close errors
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "comm.h"
14 #include "comm/Connection.h"
15 #include "comm/Read.h"
16 #include "comm/Write.h"
17 #include "fd.h"
18 #include "fde.h"
19 #include "format/Quoting.h"
20 #include "helper.h"
21 #include "Mem.h"
22 #include "MemBuf.h"
23 #include "SquidIpc.h"
24 #include "SquidMath.h"
25 #include "SquidTime.h"
26 #include "Store.h"
27 #include "wordlist.h"
28
29 #define HELPER_MAX_ARGS 64
30
/**
 * Starting capacity, in bytes, of the buffer Squid reads helper output into.
 * A helper response may be larger than this; the read handlers grow the
 * buffer as needed, but never beyond ReadBufMaxSize.
 */
const size_t ReadBufMinSize = 4 * 1024;

/**
 * Upper bound for the helper input buffer: the maximum safe size of a
 * helper-to-Squid response message plus one.
 * Squid warns and closes the stream when a helper sends a bigger response.
 * The ssl_crtd helper is known to produce responses of at least 10KB in size.
 * Some undocumented helpers are known to produce responses exceeding 8KB.
 */
const size_t ReadBufMaxSize = 32 * 1024;
42
43 static IOCB helperHandleRead;
44 static IOCB helperStatefulHandleRead;
45 static void helperServerFree(helper_server *srv);
46 static void helperStatefulServerFree(helper_stateful_server *srv);
47 static void Enqueue(helper * hlp, helper_request *);
48 static helper_request *Dequeue(helper * hlp);
49 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
50 static helper_server *GetFirstAvailable(helper * hlp);
51 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
52 static void helperDispatch(helper_server * srv, helper_request * r);
53 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
54 static void helperKickQueue(helper * hlp);
55 static void helperStatefulKickQueue(statefulhelper * hlp);
56 static void helperStatefulServerDone(helper_stateful_server * srv);
57 static void helperRequestFree(helper_request * r);
58 static void helperStatefulRequestFree(helper_stateful_request * r);
59 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
60 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
61
62 CBDATA_CLASS_INIT(helper);
63 CBDATA_CLASS_INIT(helper_server);
64 CBDATA_CLASS_INIT(statefulhelper);
65 CBDATA_CLASS_INIT(helper_stateful_server);
66
67 InstanceIdDefinitions(HelperServerBase, "Hlpr");
68
69 void
70 HelperServerBase::initStats()
71 {
72 stats.uses=0;
73 stats.replies=0;
74 stats.pending=0;
75 stats.releases=0;
76 }
77
78 void
79 HelperServerBase::closePipesSafely(const char *id_name)
80 {
81 #if _SQUID_WINDOWS_
82 shutdown(writePipe->fd, SD_BOTH);
83 #endif
84
85 flags.closing = true;
86 if (readPipe->fd == writePipe->fd)
87 readPipe->fd = -1;
88 else
89 readPipe->close();
90 writePipe->close();
91
92 #if _SQUID_WINDOWS_
93 if (hIpc) {
94 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
95 getCurrentTime();
96 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
97 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
98 }
99 CloseHandle(hIpc);
100 }
101 #endif
102 }
103
104 void
105 HelperServerBase::closeWritePipeSafely(const char *id_name)
106 {
107 #if _SQUID_WINDOWS_
108 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
109 #endif
110
111 flags.closing = true;
112 if (readPipe->fd == writePipe->fd)
113 readPipe->fd = -1;
114 writePipe->close();
115
116 #if _SQUID_WINDOWS_
117 if (hIpc) {
118 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
119 getCurrentTime();
120 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
121 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
122 }
123 CloseHandle(hIpc);
124 }
125 #endif
126 }
127
128 void
129 helperOpenServers(helper * hlp)
130 {
131 char *s;
132 char *progname;
133 char *shortname;
134 char *procname;
135 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
136 char fd_note_buf[FD_DESC_SZ];
137 helper_server *srv;
138 int nargs = 0;
139 int k;
140 pid_t pid;
141 int rfd;
142 int wfd;
143 void * hIpc;
144 wordlist *w;
145
146 if (hlp->cmdline == NULL)
147 return;
148
149 progname = hlp->cmdline->key;
150
151 if ((s = strrchr(progname, '/')))
152 shortname = xstrdup(s + 1);
153 else
154 shortname = xstrdup(progname);
155
156 /* figure out how many new child are actually needed. */
157 int need_new = hlp->childs.needNew();
158
159 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
160
161 if (need_new < 1) {
162 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
163 }
164
165 procname = (char *)xmalloc(strlen(shortname) + 3);
166
167 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
168
169 args[nargs] = procname;
170 ++nargs;
171
172 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
173 args[nargs] = w->key;
174 ++nargs;
175 }
176
177 args[nargs] = NULL;
178 ++nargs;
179
180 assert(nargs <= HELPER_MAX_ARGS);
181
182 for (k = 0; k < need_new; ++k) {
183 getCurrentTime();
184 rfd = wfd = -1;
185 pid = ipcCreate(hlp->ipc_type,
186 progname,
187 args,
188 shortname,
189 hlp->addr,
190 &rfd,
191 &wfd,
192 &hIpc);
193
194 if (pid < 0) {
195 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
196 continue;
197 }
198
199 ++ hlp->childs.n_running;
200 ++ hlp->childs.n_active;
201 srv = new helper_server;
202 srv->hIpc = hIpc;
203 srv->pid = pid;
204 srv->initStats();
205 srv->addr = hlp->addr;
206 srv->readPipe = new Comm::Connection;
207 srv->readPipe->fd = rfd;
208 srv->writePipe = new Comm::Connection;
209 srv->writePipe->fd = wfd;
210 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
211 srv->wqueue = new MemBuf;
212 srv->roffset = 0;
213 srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
214 srv->parent = cbdataReference(hlp);
215 dlinkAddTail(srv, &srv->link, &hlp->servers);
216
217 if (rfd == wfd) {
218 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
219 fd_note(rfd, fd_note_buf);
220 } else {
221 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
222 fd_note(rfd, fd_note_buf);
223 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
224 fd_note(wfd, fd_note_buf);
225 }
226
227 commSetNonBlocking(rfd);
228
229 if (wfd != rfd)
230 commSetNonBlocking(wfd);
231
232 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
233 comm_add_close_handler(rfd, closeCall);
234
235 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
236 CommIoCbPtrFun(helperHandleRead, srv));
237 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
238 }
239
240 hlp->last_restart = squid_curtime;
241 safe_free(shortname);
242 safe_free(procname);
243 helperKickQueue(hlp);
244 }
245
246 /**
247 * DPW 2007-05-08
248 *
249 * helperStatefulOpenServers: create the stateful child helper processes
250 */
251 void
252 helperStatefulOpenServers(statefulhelper * hlp)
253 {
254 char *shortname;
255 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
256 char fd_note_buf[FD_DESC_SZ];
257 int nargs = 0;
258
259 if (hlp->cmdline == NULL)
260 return;
261
262 if (hlp->childs.concurrency)
263 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
264
265 char *progname = hlp->cmdline->key;
266
267 char *s;
268 if ((s = strrchr(progname, '/')))
269 shortname = xstrdup(s + 1);
270 else
271 shortname = xstrdup(progname);
272
273 /* figure out haw mant new helpers are needed. */
274 int need_new = hlp->childs.needNew();
275
276 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
277
278 if (need_new < 1) {
279 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
280 }
281
282 char *procname = (char *)xmalloc(strlen(shortname) + 3);
283
284 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
285
286 args[nargs] = procname;
287 ++nargs;
288
289 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
290 args[nargs] = w->key;
291 ++nargs;
292 }
293
294 args[nargs] = NULL;
295 ++nargs;
296
297 assert(nargs <= HELPER_MAX_ARGS);
298
299 for (int k = 0; k < need_new; ++k) {
300 getCurrentTime();
301 int rfd = -1;
302 int wfd = -1;
303 void * hIpc;
304 pid_t pid = ipcCreate(hlp->ipc_type,
305 progname,
306 args,
307 shortname,
308 hlp->addr,
309 &rfd,
310 &wfd,
311 &hIpc);
312
313 if (pid < 0) {
314 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
315 continue;
316 }
317
318 ++ hlp->childs.n_running;
319 ++ hlp->childs.n_active;
320 helper_stateful_server *srv = new helper_stateful_server;
321 srv->hIpc = hIpc;
322 srv->pid = pid;
323 srv->flags.reserved = false;
324 srv->initStats();
325 srv->addr = hlp->addr;
326 srv->readPipe = new Comm::Connection;
327 srv->readPipe->fd = rfd;
328 srv->writePipe = new Comm::Connection;
329 srv->writePipe->fd = wfd;
330 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
331 srv->roffset = 0;
332 srv->parent = cbdataReference(hlp);
333
334 if (hlp->datapool != NULL)
335 srv->data = hlp->datapool->alloc();
336
337 dlinkAddTail(srv, &srv->link, &hlp->servers);
338
339 if (rfd == wfd) {
340 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
341 fd_note(rfd, fd_note_buf);
342 } else {
343 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
344 fd_note(rfd, fd_note_buf);
345 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
346 fd_note(wfd, fd_note_buf);
347 }
348
349 commSetNonBlocking(rfd);
350
351 if (wfd != rfd)
352 commSetNonBlocking(wfd);
353
354 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
355 comm_add_close_handler(rfd, closeCall);
356
357 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
358 CommIoCbPtrFun(helperStatefulHandleRead, srv));
359 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
360 }
361
362 hlp->last_restart = squid_curtime;
363 safe_free(shortname);
364 safe_free(procname);
365 helperStatefulKickQueue(hlp);
366 }
367
368 void
369 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
370 {
371 if (hlp == NULL) {
372 debugs(84, 3, "helperSubmit: hlp == NULL");
373 HelperReply nilReply;
374 callback(data, nilReply);
375 return;
376 }
377
378 helper_request *r = new helper_request;
379 helper_server *srv;
380
381 r->callback = callback;
382 r->data = cbdataReference(data);
383 r->buf = xstrdup(buf);
384
385 if ((srv = GetFirstAvailable(hlp)))
386 helperDispatch(srv, r);
387 else
388 Enqueue(hlp, r);
389
390 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
391 }
392
393 /// lastserver = "server last used as part of a reserved request sequence"
394 void
395 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
396 {
397 if (hlp == NULL) {
398 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
399 HelperReply nilReply;
400 callback(data, nilReply);
401 return;
402 }
403
404 helper_stateful_request *r = new helper_stateful_request;
405
406 r->callback = callback;
407 r->data = cbdataReference(data);
408
409 if (buf != NULL) {
410 r->buf = xstrdup(buf);
411 r->placeholder = 0;
412 } else {
413 r->buf = NULL;
414 r->placeholder = 1;
415 }
416
417 if ((buf != NULL) && lastserver) {
418 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
419 assert(lastserver->flags.reserved);
420 assert(!(lastserver->request));
421
422 debugs(84, 5, "StatefulSubmit dispatching");
423 helperStatefulDispatch(lastserver, r);
424 } else {
425 helper_stateful_server *srv;
426 if ((srv = StatefulGetFirstAvailable(hlp))) {
427 helperStatefulDispatch(srv, r);
428 } else
429 StatefulEnqueue(hlp, r);
430 }
431
432 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
433 "', " << Raw("buf", buf, strlen(buf)));
434 }
435
436 /**
437 * DPW 2007-05-08
438 *
439 * helperStatefulReleaseServer tells the helper that whoever was
440 * using it no longer needs its services.
441 */
442 void
443 helperStatefulReleaseServer(helper_stateful_server * srv)
444 {
445 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
446 if (!srv->flags.reserved)
447 return;
448
449 ++ srv->stats.releases;
450
451 srv->flags.reserved = false;
452 if (srv->parent->OnEmptyQueue != NULL && srv->data)
453 srv->parent->OnEmptyQueue(srv->data);
454
455 helperStatefulServerDone(srv);
456 }
457
458 /** return a pointer to the stateful routines data area */
459 void *
460 helperStatefulServerGetData(helper_stateful_server * srv)
461 {
462 return srv->data;
463 }
464
465 /**
466 * Dump some stats about the helper states to a StoreEntry
467 */
468 void
469 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
470 {
471 if (!helperStartStats(sentry, hlp, label))
472 return;
473
474 storeAppendPrintf(sentry, "program: %s\n",
475 hlp->cmdline->key);
476 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
477 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
478 storeAppendPrintf(sentry, "requests sent: %d\n",
479 hlp->stats.requests);
480 storeAppendPrintf(sentry, "replies received: %d\n",
481 hlp->stats.replies);
482 storeAppendPrintf(sentry, "queue length: %d\n",
483 hlp->stats.queue_size);
484 storeAppendPrintf(sentry, "avg service time: %d msec\n",
485 hlp->stats.avg_svc_time);
486 storeAppendPrintf(sentry, "\n");
487 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
488 "ID #",
489 "FD",
490 "PID",
491 "# Requests",
492 "# Replies",
493 "Flags",
494 "Time",
495 "Offset",
496 "Request");
497
498 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
499 helper_server *srv = (helper_server*)link->data;
500 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
501 storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
502 srv->index.value,
503 srv->readPipe->fd,
504 srv->pid,
505 srv->stats.uses,
506 srv->stats.replies,
507 srv->stats.pending ? 'B' : ' ',
508 srv->flags.writing ? 'W' : ' ',
509 srv->flags.closing ? 'C' : ' ',
510 srv->flags.shutdown ? 'S' : ' ',
511 tt < 0.0 ? 0.0 : tt,
512 (int) srv->roffset,
513 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
514 }
515
516 storeAppendPrintf(sentry, "\nFlags key:\n\n");
517 storeAppendPrintf(sentry, " B = BUSY\n");
518 storeAppendPrintf(sentry, " W = WRITING\n");
519 storeAppendPrintf(sentry, " C = CLOSING\n");
520 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
521 }
522
523 void
524 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
525 {
526 if (!helperStartStats(sentry, hlp, label))
527 return;
528
529 storeAppendPrintf(sentry, "program: %s\n",
530 hlp->cmdline->key);
531 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
532 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
533 storeAppendPrintf(sentry, "requests sent: %d\n",
534 hlp->stats.requests);
535 storeAppendPrintf(sentry, "replies received: %d\n",
536 hlp->stats.replies);
537 storeAppendPrintf(sentry, "queue length: %d\n",
538 hlp->stats.queue_size);
539 storeAppendPrintf(sentry, "avg service time: %d msec\n",
540 hlp->stats.avg_svc_time);
541 storeAppendPrintf(sentry, "\n");
542 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
543 "ID #",
544 "FD",
545 "PID",
546 "# Requests",
547 "# Replies",
548 "Flags",
549 "Time",
550 "Offset",
551 "Request");
552
553 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
554 helper_stateful_server *srv = (helper_stateful_server *)link->data;
555 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
556 storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
557 srv->index.value,
558 srv->readPipe->fd,
559 srv->pid,
560 srv->stats.uses,
561 srv->stats.replies,
562 srv->flags.busy ? 'B' : ' ',
563 srv->flags.closing ? 'C' : ' ',
564 srv->flags.reserved ? 'R' : ' ',
565 srv->flags.shutdown ? 'S' : ' ',
566 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
567 tt < 0.0 ? 0.0 : tt,
568 (int) srv->roffset,
569 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
570 }
571
572 storeAppendPrintf(sentry, "\nFlags key:\n\n");
573 storeAppendPrintf(sentry, " B = BUSY\n");
574 storeAppendPrintf(sentry, " C = CLOSING\n");
575 storeAppendPrintf(sentry, " R = RESERVED\n");
576 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
577 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
578 }
579
580 void
581 helperShutdown(helper * hlp)
582 {
583 dlink_node *link = hlp->servers.head;
584
585 while (link) {
586 helper_server *srv;
587 srv = (helper_server *)link->data;
588 link = link->next;
589
590 if (srv->flags.shutdown) {
591 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
592 continue;
593 }
594
595 assert(hlp->childs.n_active > 0);
596 -- hlp->childs.n_active;
597 srv->flags.shutdown = true; /* request it to shut itself down */
598
599 if (srv->flags.closing) {
600 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
601 continue;
602 }
603
604 if (srv->stats.pending) {
605 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
606 continue;
607 }
608
609 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
610 /* the rest of the details is dealt with in the helperServerFree
611 * close handler
612 */
613 srv->closePipesSafely(hlp->id_name);
614 }
615 }
616
617 void
618 helperStatefulShutdown(statefulhelper * hlp)
619 {
620 dlink_node *link = hlp->servers.head;
621 helper_stateful_server *srv;
622
623 while (link) {
624 srv = (helper_stateful_server *)link->data;
625 link = link->next;
626
627 if (srv->flags.shutdown) {
628 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
629 continue;
630 }
631
632 assert(hlp->childs.n_active > 0);
633 -- hlp->childs.n_active;
634 srv->flags.shutdown = true; /* request it to shut itself down */
635
636 if (srv->flags.busy) {
637 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
638 continue;
639 }
640
641 if (srv->flags.closing) {
642 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
643 continue;
644 }
645
646 if (srv->flags.reserved) {
647 if (shutting_down) {
648 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
649 } else {
650 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
651 continue;
652 }
653 }
654
655 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
656
657 /* the rest of the details is dealt with in the helperStatefulServerFree
658 * close handler
659 */
660 srv->closePipesSafely(hlp->id_name);
661 }
662 }
663
664 helper::~helper()
665 {
666 /* note, don't free id_name, it probably points to static memory */
667
668 if (queue.head)
669 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
670 }
671
672 /* ====================================================================== */
673 /* LOCAL FUNCTIONS */
674 /* ====================================================================== */
675
676 static void
677 helperServerFree(helper_server *srv)
678 {
679 helper *hlp = srv->parent;
680 helper_request *r;
681 int i, concurrency = hlp->childs.concurrency;
682
683 if (!concurrency)
684 concurrency = 1;
685
686 if (srv->rbuf) {
687 memFreeBuf(srv->rbuf_sz, srv->rbuf);
688 srv->rbuf = NULL;
689 }
690
691 srv->wqueue->clean();
692 delete srv->wqueue;
693
694 if (srv->writebuf) {
695 srv->writebuf->clean();
696 delete srv->writebuf;
697 srv->writebuf = NULL;
698 }
699
700 if (Comm::IsConnOpen(srv->writePipe))
701 srv->closeWritePipeSafely(hlp->id_name);
702
703 dlinkDelete(&srv->link, &hlp->servers);
704
705 assert(hlp->childs.n_running > 0);
706 -- hlp->childs.n_running;
707
708 if (!srv->flags.shutdown) {
709 assert(hlp->childs.n_active > 0);
710 -- hlp->childs.n_active;
711 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
712
713 if (hlp->childs.needNew() > 0) {
714 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
715
716 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
717 if (srv->stats.replies < 1)
718 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
719 else
720 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
721 }
722
723 debugs(80, DBG_IMPORTANT, "Starting new helpers");
724 helperOpenServers(hlp);
725 }
726 }
727
728 for (i = 0; i < concurrency; ++i) {
729 // XXX: re-schedule these on another helper?
730 if ((r = srv->requests[i])) {
731 void *cbdata;
732
733 if (cbdataReferenceValidDone(r->data, &cbdata)) {
734 HelperReply nilReply;
735 r->callback(cbdata, nilReply);
736 }
737
738 helperRequestFree(r);
739
740 srv->requests[i] = NULL;
741 }
742 }
743 safe_free(srv->requests);
744
745 cbdataReferenceDone(srv->parent);
746 delete srv;
747 }
748
749 static void
750 helperStatefulServerFree(helper_stateful_server *srv)
751 {
752 statefulhelper *hlp = srv->parent;
753 helper_stateful_request *r;
754
755 if (srv->rbuf) {
756 memFreeBuf(srv->rbuf_sz, srv->rbuf);
757 srv->rbuf = NULL;
758 }
759
760 #if 0
761 srv->wqueue->clean();
762
763 delete srv->wqueue;
764
765 #endif
766
767 /* TODO: walk the local queue of requests and carry them all out */
768 if (Comm::IsConnOpen(srv->writePipe))
769 srv->closeWritePipeSafely(hlp->id_name);
770
771 dlinkDelete(&srv->link, &hlp->servers);
772
773 assert(hlp->childs.n_running > 0);
774 -- hlp->childs.n_running;
775
776 if (!srv->flags.shutdown) {
777 assert( hlp->childs.n_active > 0);
778 -- hlp->childs.n_active;
779 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
780
781 if (hlp->childs.needNew() > 0) {
782 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
783
784 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
785 if (srv->stats.replies < 1)
786 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
787 else
788 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
789 }
790
791 debugs(80, DBG_IMPORTANT, "Starting new helpers");
792 helperStatefulOpenServers(hlp);
793 }
794 }
795
796 if ((r = srv->request)) {
797 void *cbdata;
798
799 if (cbdataReferenceValidDone(r->data, &cbdata)) {
800 HelperReply nilReply;
801 nilReply.whichServer = srv;
802 r->callback(cbdata, nilReply);
803 }
804
805 helperStatefulRequestFree(r);
806
807 srv->request = NULL;
808 }
809
810 if (srv->data != NULL)
811 hlp->datapool->freeOne(srv->data);
812
813 cbdataReferenceDone(srv->parent);
814
815 delete srv;
816 }
817
818 /// Calls back with a pointer to the buffer with the helper output
819 static void
820 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
821 {
822 helper_request *r = srv->requests[request_number];
823 if (r) {
824 HLPCB *callback = r->callback;
825
826 srv->requests[request_number] = NULL;
827
828 r->callback = NULL;
829
830 void *cbdata = NULL;
831 if (cbdataReferenceValidDone(r->data, &cbdata)) {
832 HelperReply response(msg, (msg_end-msg));
833 callback(cbdata, response);
834 }
835
836 -- srv->stats.pending;
837 ++ srv->stats.replies;
838
839 ++ hlp->stats.replies;
840
841 srv->answer_time = current_time;
842
843 srv->dispatch_time = r->dispatch_time;
844
845 hlp->stats.avg_svc_time =
846 Math::intAverage(hlp->stats.avg_svc_time,
847 tvSubMsec(r->dispatch_time, current_time),
848 hlp->stats.replies, REDIRECT_AV_FACTOR);
849
850 helperRequestFree(r);
851 } else {
852 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
853 request_number << " from " << hlp->id_name << " #" << srv->index <<
854 " '" << srv->rbuf << "'");
855 }
856
857 if (!srv->flags.shutdown) {
858 helperKickQueue(hlp);
859 } else if (!srv->flags.closing && !srv->stats.pending) {
860 srv->flags.closing=true;
861 srv->writePipe->close();
862 }
863 }
864
865 static void
866 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
867 {
868 char *t = NULL;
869 helper_server *srv = (helper_server *)data;
870 helper *hlp = srv->parent;
871 assert(cbdataReferenceValid(data));
872
873 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
874
875 if (flag == Comm::ERR_CLOSING) {
876 return;
877 }
878
879 assert(conn->fd == srv->readPipe->fd);
880
881 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
882
883 if (flag != Comm::OK || len == 0) {
884 srv->closePipesSafely(hlp->id_name);
885 return;
886 }
887
888 srv->roffset += len;
889 srv->rbuf[srv->roffset] = '\0';
890 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
891
892 if (!srv->stats.pending) {
893 /* someone spoke without being spoken to */
894 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
895 hlp->id_name << " #" << srv->index << ", " << (int)len <<
896 " bytes '" << srv->rbuf << "'");
897
898 srv->roffset = 0;
899 srv->rbuf[0] = '\0';
900 }
901
902 while ((t = strchr(srv->rbuf, hlp->eom))) {
903 /* end of reply found */
904 char *msg = srv->rbuf;
905 int i = 0;
906 int skip = 1;
907 debugs(84, 3, "helperHandleRead: end of reply found");
908
909 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
910 *t = '\0';
911 // rewind to the \r octet which is the real terminal now
912 // and remember that we have to skip forward 2 places now.
913 skip = 2;
914 --t;
915 }
916
917 *t = '\0';
918
919 if (hlp->childs.concurrency) {
920 i = strtol(msg, &msg, 10);
921
922 while (*msg && xisspace(*msg))
923 ++msg;
924 }
925
926 helperReturnBuffer(i, srv, hlp, msg, t);
927 srv->roffset -= (t - srv->rbuf) + skip;
928 memmove(srv->rbuf, t + skip, srv->roffset);
929 srv->rbuf[srv->roffset] = '\0';
930 }
931
932 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
933 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
934 assert(spaceSize >= 0);
935
936 // grow the input buffer if needed and possible
937 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
938 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
939 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
940 spaceSize = srv->rbuf_sz - srv->roffset - 1;
941 assert(spaceSize >= 0);
942 }
943
944 // quit reading if there is no space left
945 if (!spaceSize) {
946 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
947 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
948 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
949 srv->closePipesSafely(hlp->id_name);
950 return;
951 }
952
953 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
954 CommIoCbPtrFun(helperHandleRead, srv));
955 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
956 }
957 }
958
959 static void
960 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
961 {
962 char *t = NULL;
963 helper_stateful_server *srv = (helper_stateful_server *)data;
964 helper_stateful_request *r;
965 statefulhelper *hlp = srv->parent;
966 assert(cbdataReferenceValid(data));
967
968 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
969
970 if (flag == Comm::ERR_CLOSING) {
971 return;
972 }
973
974 assert(conn->fd == srv->readPipe->fd);
975
976 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
977 hlp->id_name << " #" << srv->index);
978
979 if (flag != Comm::OK || len == 0) {
980 srv->closePipesSafely(hlp->id_name);
981 return;
982 }
983
984 srv->roffset += len;
985 srv->rbuf[srv->roffset] = '\0';
986 r = srv->request;
987 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
988
989 if (r == NULL) {
990 /* someone spoke without being spoken to */
991 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
992 hlp->id_name << " #" << srv->index << ", " << (int)len <<
993 " bytes '" << srv->rbuf << "'");
994
995 srv->roffset = 0;
996 }
997
998 if ((t = strchr(srv->rbuf, hlp->eom))) {
999 /* end of reply found */
1000 int called = 1;
1001 int skip = 1;
1002 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1003
1004 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1005 *t = '\0';
1006 // rewind to the \r octet which is the real terminal now
1007 // and remember that we have to skip forward 2 places now.
1008 skip = 2;
1009 --t;
1010 }
1011
1012 *t = '\0';
1013
1014 if (r && cbdataReferenceValid(r->data)) {
1015 HelperReply res(srv->rbuf, (t - srv->rbuf));
1016 res.whichServer = srv;
1017 r->callback(r->data, res);
1018 } else {
1019 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1020 called = 0;
1021 }
1022 // only skip off the \0's _after_ passing its location in HelperReply above
1023 t += skip;
1024
1025 srv->flags.busy = false;
1026 /**
1027 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1028 * Doing this prohibits concurrency support with multiple replies per read().
1029 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1030 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1031 */
1032 srv->roffset = 0;
1033 helperStatefulRequestFree(r);
1034 srv->request = NULL;
1035
1036 -- srv->stats.pending;
1037 ++ srv->stats.replies;
1038
1039 ++ hlp->stats.replies;
1040 srv->answer_time = current_time;
1041 hlp->stats.avg_svc_time =
1042 Math::intAverage(hlp->stats.avg_svc_time,
1043 tvSubMsec(srv->dispatch_time, current_time),
1044 hlp->stats.replies, REDIRECT_AV_FACTOR);
1045
1046 if (called)
1047 helperStatefulServerDone(srv);
1048 else
1049 helperStatefulReleaseServer(srv);
1050 }
1051
1052 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1053 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1054 assert(spaceSize >= 0);
1055
1056 // grow the input buffer if needed and possible
1057 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1058 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1059 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1060 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1061 assert(spaceSize >= 0);
1062 }
1063
1064 // quit reading if there is no space left
1065 if (!spaceSize) {
1066 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1067 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1068 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1069 srv->closePipesSafely(hlp->id_name);
1070 return;
1071 }
1072
1073 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1074 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1075 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1076 }
1077 }
1078
1079 static void
1080 Enqueue(helper * hlp, helper_request * r)
1081 {
1082 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1083 dlinkAddTail(r, link, &hlp->queue);
1084 ++ hlp->stats.queue_size;
1085
1086 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1087 if (hlp->childs.needNew() > 0) {
1088 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1089 helperOpenServers(hlp);
1090 return;
1091 }
1092
1093 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1094 return;
1095
1096 if (squid_curtime - hlp->last_queue_warn < 600)
1097 return;
1098
1099 if (shutting_down || reconfiguring)
1100 return;
1101
1102 hlp->last_queue_warn = squid_curtime;
1103
1104 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1105 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1106 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1107
1108 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1109 fatalf("Too many queued %s requests", hlp->id_name);
1110 }
1111
1112 static void
1113 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1114 {
1115 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1116 dlinkAddTail(r, link, &hlp->queue);
1117 ++ hlp->stats.queue_size;
1118
1119 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1120 if (hlp->childs.needNew() > 0) {
1121 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1122 helperStatefulOpenServers(hlp);
1123 return;
1124 }
1125
1126 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1127 return;
1128
1129 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1130 fatalf("Too many queued %s requests", hlp->id_name);
1131
1132 if (squid_curtime - hlp->last_queue_warn < 600)
1133 return;
1134
1135 if (shutting_down || reconfiguring)
1136 return;
1137
1138 hlp->last_queue_warn = squid_curtime;
1139
1140 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1141 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1142 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1143 }
1144
1145 static helper_request *
1146 Dequeue(helper * hlp)
1147 {
1148 dlink_node *link;
1149 helper_request *r = NULL;
1150
1151 if ((link = hlp->queue.head)) {
1152 r = (helper_request *)link->data;
1153 dlinkDelete(link, &hlp->queue);
1154 memFree(link, MEM_DLINK_NODE);
1155 -- hlp->stats.queue_size;
1156 }
1157
1158 return r;
1159 }
1160
1161 static helper_stateful_request *
1162 StatefulDequeue(statefulhelper * hlp)
1163 {
1164 dlink_node *link;
1165 helper_stateful_request *r = NULL;
1166
1167 if ((link = hlp->queue.head)) {
1168 r = (helper_stateful_request *)link->data;
1169 dlinkDelete(link, &hlp->queue);
1170 memFree(link, MEM_DLINK_NODE);
1171 -- hlp->stats.queue_size;
1172 }
1173
1174 return r;
1175 }
1176
1177 static helper_server *
1178 GetFirstAvailable(helper * hlp)
1179 {
1180 dlink_node *n;
1181 helper_server *srv;
1182 helper_server *selected = NULL;
1183 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1184
1185 if (hlp->childs.n_running == 0)
1186 return NULL;
1187
1188 /* Find "least" loaded helper (approx) */
1189 for (n = hlp->servers.head; n != NULL; n = n->next) {
1190 srv = (helper_server *)n->data;
1191
1192 if (selected && selected->stats.pending <= srv->stats.pending)
1193 continue;
1194
1195 if (srv->flags.shutdown)
1196 continue;
1197
1198 if (!srv->stats.pending)
1199 return srv;
1200
1201 if (selected) {
1202 selected = srv;
1203 break;
1204 }
1205
1206 selected = srv;
1207 }
1208
1209 /* Check for overload */
1210 if (!selected) {
1211 debugs(84, 5, "GetFirstAvailable: None available.");
1212 return NULL;
1213 }
1214
1215 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1216 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1217 return NULL;
1218 }
1219
1220 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1221 return selected;
1222 }
1223
1224 static helper_stateful_server *
1225 StatefulGetFirstAvailable(statefulhelper * hlp)
1226 {
1227 dlink_node *n;
1228 helper_stateful_server *srv = NULL;
1229 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1230
1231 if (hlp->childs.n_running == 0)
1232 return NULL;
1233
1234 for (n = hlp->servers.head; n != NULL; n = n->next) {
1235 srv = (helper_stateful_server *)n->data;
1236
1237 if (srv->flags.busy)
1238 continue;
1239
1240 if (srv->flags.reserved)
1241 continue;
1242
1243 if (srv->flags.shutdown)
1244 continue;
1245
1246 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1247 continue;
1248
1249 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1250 return srv;
1251 }
1252
1253 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1254 return NULL;
1255 }
1256
1257 static void
1258 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
1259 {
1260 helper_server *srv = (helper_server *)data;
1261
1262 srv->writebuf->clean();
1263 delete srv->writebuf;
1264 srv->writebuf = NULL;
1265 srv->flags.writing = false;
1266
1267 if (flag != Comm::OK) {
1268 /* Helper server has crashed */
1269 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1270 return;
1271 }
1272
1273 if (!srv->wqueue->isNull()) {
1274 srv->writebuf = srv->wqueue;
1275 srv->wqueue = new MemBuf;
1276 srv->flags.writing = true;
1277 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1278 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1279 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1280 }
1281 }
1282
1283 static void
1284 helperDispatch(helper_server * srv, helper_request * r)
1285 {
1286 helper *hlp = srv->parent;
1287 helper_request **ptr = NULL;
1288 unsigned int slot;
1289
1290 if (!cbdataReferenceValid(r->data)) {
1291 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1292 helperRequestFree(r);
1293 return;
1294 }
1295
1296 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1297 if (!srv->requests[slot]) {
1298 ptr = &srv->requests[slot];
1299 break;
1300 }
1301 }
1302
1303 assert(ptr);
1304 *ptr = r;
1305 r->dispatch_time = current_time;
1306
1307 if (srv->wqueue->isNull())
1308 srv->wqueue->init();
1309
1310 if (hlp->childs.concurrency)
1311 srv->wqueue->Printf("%d %s", slot, r->buf);
1312 else
1313 srv->wqueue->append(r->buf, strlen(r->buf));
1314
1315 if (!srv->flags.writing) {
1316 assert(NULL == srv->writebuf);
1317 srv->writebuf = srv->wqueue;
1318 srv->wqueue = new MemBuf;
1319 srv->flags.writing = true;
1320 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1321 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1322 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1323 }
1324
1325 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->buf) << " bytes");
1326
1327 ++ srv->stats.uses;
1328 ++ srv->stats.pending;
1329 ++ hlp->stats.requests;
1330 }
1331
1332 static void
1333 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag,
1334 int xerrno, void *data)
1335 {
1336 /* nothing! */
1337 }
1338
1339 static void
1340 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1341 {
1342 statefulhelper *hlp = srv->parent;
1343
1344 if (!cbdataReferenceValid(r->data)) {
1345 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1346 helperStatefulRequestFree(r);
1347 helperStatefulReleaseServer(srv);
1348 return;
1349 }
1350
1351 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1352
1353 if (r->placeholder == 1) {
1354 /* a callback is needed before this request can _use_ a helper. */
1355 /* we don't care about releasing this helper. The request NEVER
1356 * gets to the helper. So we throw away the return code */
1357 HelperReply nilReply;
1358 nilReply.whichServer = srv;
1359 r->callback(r->data, nilReply);
1360 /* throw away the placeholder */
1361 helperStatefulRequestFree(r);
1362 /* and push the queue. Note that the callback may have submitted a new
1363 * request to the helper which is why we test for the request */
1364
1365 if (srv->request == NULL)
1366 helperStatefulServerDone(srv);
1367
1368 return;
1369 }
1370
1371 srv->flags.busy = true;
1372 srv->flags.reserved = true;
1373 srv->request = r;
1374 srv->dispatch_time = current_time;
1375 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1376 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1377 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1378 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1379 hlp->id_name << " #" << srv->index << ", " <<
1380 (int) strlen(r->buf) << " bytes");
1381
1382 ++ srv->stats.uses;
1383 ++ srv->stats.pending;
1384 ++ hlp->stats.requests;
1385 }
1386
1387 static void
1388 helperKickQueue(helper * hlp)
1389 {
1390 helper_request *r;
1391 helper_server *srv;
1392
1393 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1394 helperDispatch(srv, r);
1395 }
1396
1397 static void
1398 helperStatefulKickQueue(statefulhelper * hlp)
1399 {
1400 helper_stateful_request *r;
1401 helper_stateful_server *srv;
1402
1403 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1404 helperStatefulDispatch(srv, r);
1405 }
1406
1407 static void
1408 helperStatefulServerDone(helper_stateful_server * srv)
1409 {
1410 if (!srv->flags.shutdown) {
1411 helperStatefulKickQueue(srv->parent);
1412 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1413 srv->closeWritePipeSafely(srv->parent->id_name);
1414 return;
1415 }
1416 }
1417
1418 static void
1419 helperRequestFree(helper_request * r)
1420 {
1421 cbdataReferenceDone(r->data);
1422 xfree(r->buf);
1423 delete r;
1424 }
1425
1426 static void
1427 helperStatefulRequestFree(helper_stateful_request * r)
1428 {
1429 if (r) {
1430 cbdataReferenceDone(r->data);
1431 xfree(r->buf);
1432 delete r;
1433 }
1434 }
1435
1436 // TODO: should helper_ and helper_stateful_ have a common parent?
1437 static bool
1438 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1439 {
1440 if (!hlp) {
1441 if (label)
1442 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1443 return false;
1444 }
1445
1446 if (label)
1447 storeAppendPrintf(sentry, "%s:\n", label);
1448
1449 return true;
1450 }