]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Merge from trunk
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * DEBUG: section 84 Helper process maintenance
3 * AUTHOR: Harvest Derived?
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 #include "squid.h"
34 #include "base/AsyncCbdataCalls.h"
35 #include "comm.h"
36 #include "comm/Connection.h"
37 #include "comm/Write.h"
38 #include "fd.h"
39 #include "fde.h"
40 #include "format/Quoting.h"
41 #include "helper.h"
42 #include "Mem.h"
43 #include "MemBuf.h"
44 #include "SquidIpc.h"
45 #include "SquidMath.h"
46 #include "SquidTime.h"
47 #include "Store.h"
48 #include "wordlist.h"
49
/// Capacity of the argv[] array handed to a helper child, not counting the
/// terminating NULL entry that is stored in one additional slot.
#define HELPER_MAX_ARGS 64

/** Initial Squid input buffer size. Helper responses may exceed this, and
 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
 */
const size_t ReadBufMinSize = 4 * 1024;

/** Maximum safe size of a helper-to-Squid response message plus one.
 * Squid will warn and close the stream if a helper sends a too-big response.
 * ssl_crtd helper is known to produce responses of at least 10KB in size.
 * Some undocumented helpers are known to produce responses exceeding 8KB.
 */
const size_t ReadBufMaxSize = 32 * 1024;
63
64 static IOCB helperHandleRead;
65 static IOCB helperStatefulHandleRead;
66 static void helperServerFree(helper_server *srv);
67 static void helperStatefulServerFree(helper_stateful_server *srv);
68 static void Enqueue(helper * hlp, helper_request *);
69 static helper_request *Dequeue(helper * hlp);
70 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
71 static helper_server *GetFirstAvailable(helper * hlp);
72 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
73 static void helperDispatch(helper_server * srv, helper_request * r);
74 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
75 static void helperKickQueue(helper * hlp);
76 static void helperStatefulKickQueue(statefulhelper * hlp);
77 static void helperStatefulServerDone(helper_stateful_server * srv);
78 static void helperRequestFree(helper_request * r);
79 static void helperStatefulRequestFree(helper_stateful_request * r);
80 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
81 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
82
83 CBDATA_CLASS_INIT(helper);
84 CBDATA_TYPE(helper_server);
85 CBDATA_CLASS_INIT(statefulhelper);
86 CBDATA_TYPE(helper_stateful_server);
87
88 void
89 HelperServerBase::initStats()
90 {
91 stats.uses=0;
92 stats.replies=0;
93 stats.pending=0;
94 stats.releases=0;
95 }
96
97 void
98 HelperServerBase::closePipesSafely()
99 {
100 #if _SQUID_WINDOWS_
101 int no = index + 1;
102
103 shutdown(writePipe->fd, SD_BOTH);
104 #endif
105
106 flags.closing = true;
107 if (readPipe->fd == writePipe->fd)
108 readPipe->fd = -1;
109 else
110 readPipe->close();
111 writePipe->close();
112
113 #if _SQUID_WINDOWS_
114 if (hIpc) {
115 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
116 getCurrentTime();
117 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
118 " #" << no << " (" << hlp->cmdline->key << "," <<
119 (long int)pid << ") didn't exit in 5 seconds");
120 }
121 CloseHandle(hIpc);
122 }
123 #endif
124 }
125
126 void
127 HelperServerBase::closeWritePipeSafely()
128 {
129 #if _SQUID_WINDOWS_
130 int no = index + 1;
131
132 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
133 #endif
134
135 flags.closing = true;
136 if (readPipe->fd == writePipe->fd)
137 readPipe->fd = -1;
138 writePipe->close();
139
140 #if _SQUID_WINDOWS_
141 if (hIpc) {
142 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
143 getCurrentTime();
144 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
145 " #" << no << " (" << hlp->cmdline->key << "," <<
146 (long int)pid << ") didn't exit in 5 seconds");
147 }
148 CloseHandle(hIpc);
149 }
150 #endif
151 }
152
153 void
154 helperOpenServers(helper * hlp)
155 {
156 char *s;
157 char *progname;
158 char *shortname;
159 char *procname;
160 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
161 char fd_note_buf[FD_DESC_SZ];
162 helper_server *srv;
163 int nargs = 0;
164 int k;
165 pid_t pid;
166 int rfd;
167 int wfd;
168 void * hIpc;
169 wordlist *w;
170
171 if (hlp->cmdline == NULL)
172 return;
173
174 progname = hlp->cmdline->key;
175
176 if ((s = strrchr(progname, '/')))
177 shortname = xstrdup(s + 1);
178 else
179 shortname = xstrdup(progname);
180
181 /* figure out how many new child are actually needed. */
182 int need_new = hlp->childs.needNew();
183
184 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
185
186 if (need_new < 1) {
187 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
188 }
189
190 procname = (char *)xmalloc(strlen(shortname) + 3);
191
192 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
193
194 args[nargs] = procname;
195 ++nargs;
196
197 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
198 args[nargs] = w->key;
199 ++nargs;
200 }
201
202 args[nargs] = NULL;
203 ++nargs;
204
205 assert(nargs <= HELPER_MAX_ARGS);
206
207 for (k = 0; k < need_new; ++k) {
208 getCurrentTime();
209 rfd = wfd = -1;
210 pid = ipcCreate(hlp->ipc_type,
211 progname,
212 args,
213 shortname,
214 hlp->addr,
215 &rfd,
216 &wfd,
217 &hIpc);
218
219 if (pid < 0) {
220 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
221 continue;
222 }
223
224 ++ hlp->childs.n_running;
225 ++ hlp->childs.n_active;
226 CBDATA_INIT_TYPE(helper_server);
227 srv = cbdataAlloc(helper_server);
228 srv->hIpc = hIpc;
229 srv->pid = pid;
230 srv->initStats();
231 srv->index = k;
232 srv->addr = hlp->addr;
233 srv->readPipe = new Comm::Connection;
234 srv->readPipe->fd = rfd;
235 srv->writePipe = new Comm::Connection;
236 srv->writePipe->fd = wfd;
237 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
238 srv->wqueue = new MemBuf;
239 srv->roffset = 0;
240 srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
241 srv->parent = cbdataReference(hlp);
242 dlinkAddTail(srv, &srv->link, &hlp->servers);
243
244 if (rfd == wfd) {
245 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
246 fd_note(rfd, fd_note_buf);
247 } else {
248 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
249 fd_note(rfd, fd_note_buf);
250 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
251 fd_note(wfd, fd_note_buf);
252 }
253
254 commSetNonBlocking(rfd);
255
256 if (wfd != rfd)
257 commSetNonBlocking(wfd);
258
259 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
260 comm_add_close_handler(rfd, closeCall);
261
262 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
263 CommIoCbPtrFun(helperHandleRead, srv));
264 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
265 }
266
267 hlp->last_restart = squid_curtime;
268 safe_free(shortname);
269 safe_free(procname);
270 helperKickQueue(hlp);
271 }
272
273 /**
274 * DPW 2007-05-08
275 *
276 * helperStatefulOpenServers: create the stateful child helper processes
277 */
278 void
279 helperStatefulOpenServers(statefulhelper * hlp)
280 {
281 char *shortname;
282 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
283 char fd_note_buf[FD_DESC_SZ];
284 int nargs = 0;
285
286 if (hlp->cmdline == NULL)
287 return;
288
289 if (hlp->childs.concurrency)
290 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
291
292 char *progname = hlp->cmdline->key;
293
294 char *s;
295 if ((s = strrchr(progname, '/')))
296 shortname = xstrdup(s + 1);
297 else
298 shortname = xstrdup(progname);
299
300 /* figure out haw mant new helpers are needed. */
301 int need_new = hlp->childs.needNew();
302
303 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
304
305 if (need_new < 1) {
306 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
307 }
308
309 char *procname = (char *)xmalloc(strlen(shortname) + 3);
310
311 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
312
313 args[nargs] = procname;
314 ++nargs;
315
316 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
317 args[nargs] = w->key;
318 ++nargs;
319 }
320
321 args[nargs] = NULL;
322 ++nargs;
323
324 assert(nargs <= HELPER_MAX_ARGS);
325
326 for (int k = 0; k < need_new; ++k) {
327 getCurrentTime();
328 int rfd = -1;
329 int wfd = -1;
330 void * hIpc;
331 pid_t pid = ipcCreate(hlp->ipc_type,
332 progname,
333 args,
334 shortname,
335 hlp->addr,
336 &rfd,
337 &wfd,
338 &hIpc);
339
340 if (pid < 0) {
341 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
342 continue;
343 }
344
345 ++ hlp->childs.n_running;
346 ++ hlp->childs.n_active;
347 CBDATA_INIT_TYPE(helper_stateful_server);
348 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
349 srv->hIpc = hIpc;
350 srv->pid = pid;
351 srv->flags.reserved = false;
352 srv->initStats();
353 srv->index = k;
354 srv->addr = hlp->addr;
355 srv->readPipe = new Comm::Connection;
356 srv->readPipe->fd = rfd;
357 srv->writePipe = new Comm::Connection;
358 srv->writePipe->fd = wfd;
359 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
360 srv->roffset = 0;
361 srv->parent = cbdataReference(hlp);
362
363 if (hlp->datapool != NULL)
364 srv->data = hlp->datapool->alloc();
365
366 dlinkAddTail(srv, &srv->link, &hlp->servers);
367
368 if (rfd == wfd) {
369 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
370 fd_note(rfd, fd_note_buf);
371 } else {
372 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
373 fd_note(rfd, fd_note_buf);
374 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
375 fd_note(wfd, fd_note_buf);
376 }
377
378 commSetNonBlocking(rfd);
379
380 if (wfd != rfd)
381 commSetNonBlocking(wfd);
382
383 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
384 comm_add_close_handler(rfd, closeCall);
385
386 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
387 CommIoCbPtrFun(helperStatefulHandleRead, srv));
388 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
389 }
390
391 hlp->last_restart = squid_curtime;
392 safe_free(shortname);
393 safe_free(procname);
394 helperStatefulKickQueue(hlp);
395 }
396
397 void
398 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
399 {
400 if (hlp == NULL) {
401 debugs(84, 3, "helperSubmit: hlp == NULL");
402 HelperReply nilReply;
403 callback(data, nilReply);
404 return;
405 }
406
407 helper_request *r = new helper_request;
408 helper_server *srv;
409
410 r->callback = callback;
411 r->data = cbdataReference(data);
412 r->buf = xstrdup(buf);
413
414 if ((srv = GetFirstAvailable(hlp)))
415 helperDispatch(srv, r);
416 else
417 Enqueue(hlp, r);
418
419 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
420 }
421
422 /// lastserver = "server last used as part of a reserved request sequence"
423 void
424 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
425 {
426 if (hlp == NULL) {
427 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
428 HelperReply nilReply;
429 callback(data, nilReply);
430 return;
431 }
432
433 helper_stateful_request *r = new helper_stateful_request;
434
435 r->callback = callback;
436 r->data = cbdataReference(data);
437
438 if (buf != NULL) {
439 r->buf = xstrdup(buf);
440 r->placeholder = 0;
441 } else {
442 r->buf = NULL;
443 r->placeholder = 1;
444 }
445
446 if ((buf != NULL) && lastserver) {
447 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
448 assert(lastserver->flags.reserved);
449 assert(!(lastserver->request));
450
451 debugs(84, 5, "StatefulSubmit dispatching");
452 helperStatefulDispatch(lastserver, r);
453 } else {
454 helper_stateful_server *srv;
455 if ((srv = StatefulGetFirstAvailable(hlp))) {
456 helperStatefulDispatch(srv, r);
457 } else
458 StatefulEnqueue(hlp, r);
459 }
460
461 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
462 "', " << Raw("buf", buf, strlen(buf)));
463 }
464
465 /**
466 * DPW 2007-05-08
467 *
468 * helperStatefulReleaseServer tells the helper that whoever was
469 * using it no longer needs its services.
470 */
471 void
472 helperStatefulReleaseServer(helper_stateful_server * srv)
473 {
474 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
475 if (!srv->flags.reserved)
476 return;
477
478 ++ srv->stats.releases;
479
480 srv->flags.reserved = false;
481 if (srv->parent->OnEmptyQueue != NULL && srv->data)
482 srv->parent->OnEmptyQueue(srv->data);
483
484 helperStatefulServerDone(srv);
485 }
486
487 /** return a pointer to the stateful routines data area */
488 void *
489 helperStatefulServerGetData(helper_stateful_server * srv)
490 {
491 return srv->data;
492 }
493
494 /**
495 * Dump some stats about the helper states to a StoreEntry
496 */
497 void
498 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
499 {
500 if (!helperStartStats(sentry, hlp, label))
501 return;
502
503 storeAppendPrintf(sentry, "program: %s\n",
504 hlp->cmdline->key);
505 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
506 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
507 storeAppendPrintf(sentry, "requests sent: %d\n",
508 hlp->stats.requests);
509 storeAppendPrintf(sentry, "replies received: %d\n",
510 hlp->stats.replies);
511 storeAppendPrintf(sentry, "queue length: %d\n",
512 hlp->stats.queue_size);
513 storeAppendPrintf(sentry, "avg service time: %d msec\n",
514 hlp->stats.avg_svc_time);
515 storeAppendPrintf(sentry, "\n");
516 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
517 "#",
518 "FD",
519 "PID",
520 "# Requests",
521 "# Replies",
522 "Flags",
523 "Time",
524 "Offset",
525 "Request");
526
527 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
528 helper_server *srv = (helper_server*)link->data;
529 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
530 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
531 srv->index + 1,
532 srv->readPipe->fd,
533 srv->pid,
534 srv->stats.uses,
535 srv->stats.replies,
536 srv->stats.pending ? 'B' : ' ',
537 srv->flags.writing ? 'W' : ' ',
538 srv->flags.closing ? 'C' : ' ',
539 srv->flags.shutdown ? 'S' : ' ',
540 tt < 0.0 ? 0.0 : tt,
541 (int) srv->roffset,
542 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
543 }
544
545 storeAppendPrintf(sentry, "\nFlags key:\n\n");
546 storeAppendPrintf(sentry, " B = BUSY\n");
547 storeAppendPrintf(sentry, " W = WRITING\n");
548 storeAppendPrintf(sentry, " C = CLOSING\n");
549 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
550 }
551
552 void
553 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
554 {
555 if (!helperStartStats(sentry, hlp, label))
556 return;
557
558 storeAppendPrintf(sentry, "program: %s\n",
559 hlp->cmdline->key);
560 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
561 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
562 storeAppendPrintf(sentry, "requests sent: %d\n",
563 hlp->stats.requests);
564 storeAppendPrintf(sentry, "replies received: %d\n",
565 hlp->stats.replies);
566 storeAppendPrintf(sentry, "queue length: %d\n",
567 hlp->stats.queue_size);
568 storeAppendPrintf(sentry, "avg service time: %d msec\n",
569 hlp->stats.avg_svc_time);
570 storeAppendPrintf(sentry, "\n");
571 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
572 "#",
573 "FD",
574 "PID",
575 "# Requests",
576 "# Replies",
577 "Flags",
578 "Time",
579 "Offset",
580 "Request");
581
582 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
583 helper_stateful_server *srv = (helper_stateful_server *)link->data;
584 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
585 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
586 srv->index + 1,
587 srv->readPipe->fd,
588 srv->pid,
589 srv->stats.uses,
590 srv->stats.replies,
591 srv->flags.busy ? 'B' : ' ',
592 srv->flags.closing ? 'C' : ' ',
593 srv->flags.reserved ? 'R' : ' ',
594 srv->flags.shutdown ? 'S' : ' ',
595 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
596 tt < 0.0 ? 0.0 : tt,
597 (int) srv->roffset,
598 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
599 }
600
601 storeAppendPrintf(sentry, "\nFlags key:\n\n");
602 storeAppendPrintf(sentry, " B = BUSY\n");
603 storeAppendPrintf(sentry, " C = CLOSING\n");
604 storeAppendPrintf(sentry, " R = RESERVED\n");
605 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
606 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
607 }
608
609 void
610 helperShutdown(helper * hlp)
611 {
612 dlink_node *link = hlp->servers.head;
613
614 while (link) {
615 helper_server *srv;
616 srv = (helper_server *)link->data;
617 link = link->next;
618
619 if (srv->flags.shutdown) {
620 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
621 continue;
622 }
623
624 assert(hlp->childs.n_active > 0);
625 -- hlp->childs.n_active;
626 srv->flags.shutdown = true; /* request it to shut itself down */
627
628 if (srv->flags.closing) {
629 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
630 continue;
631 }
632
633 if (srv->stats.pending) {
634 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
635 continue;
636 }
637
638 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
639 /* the rest of the details is dealt with in the helperServerFree
640 * close handler
641 */
642 srv->closePipesSafely();
643 }
644 }
645
646 void
647 helperStatefulShutdown(statefulhelper * hlp)
648 {
649 dlink_node *link = hlp->servers.head;
650 helper_stateful_server *srv;
651
652 while (link) {
653 srv = (helper_stateful_server *)link->data;
654 link = link->next;
655
656 if (srv->flags.shutdown) {
657 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
658 continue;
659 }
660
661 assert(hlp->childs.n_active > 0);
662 -- hlp->childs.n_active;
663 srv->flags.shutdown = true; /* request it to shut itself down */
664
665 if (srv->flags.busy) {
666 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
667 continue;
668 }
669
670 if (srv->flags.closing) {
671 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
672 continue;
673 }
674
675 if (srv->flags.reserved) {
676 if (shutting_down) {
677 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
678 } else {
679 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
680 continue;
681 }
682 }
683
684 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
685
686 /* the rest of the details is dealt with in the helperStatefulServerFree
687 * close handler
688 */
689 srv->closePipesSafely();
690 }
691 }
692
693 helper::~helper()
694 {
695 /* note, don't free id_name, it probably points to static memory */
696
697 if (queue.head)
698 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
699 }
700
701 /* ====================================================================== */
702 /* LOCAL FUNCTIONS */
703 /* ====================================================================== */
704
705 static void
706 helperServerFree(helper_server *srv)
707 {
708 helper *hlp = srv->parent;
709 helper_request *r;
710 int i, concurrency = hlp->childs.concurrency;
711
712 if (!concurrency)
713 concurrency = 1;
714
715 if (srv->rbuf) {
716 memFreeBuf(srv->rbuf_sz, srv->rbuf);
717 srv->rbuf = NULL;
718 }
719
720 srv->wqueue->clean();
721 delete srv->wqueue;
722
723 if (srv->writebuf) {
724 srv->writebuf->clean();
725 delete srv->writebuf;
726 srv->writebuf = NULL;
727 }
728
729 if (Comm::IsConnOpen(srv->writePipe))
730 srv->closeWritePipeSafely();
731
732 dlinkDelete(&srv->link, &hlp->servers);
733
734 assert(hlp->childs.n_running > 0);
735 -- hlp->childs.n_running;
736
737 if (!srv->flags.shutdown) {
738 assert(hlp->childs.n_active > 0);
739 -- hlp->childs.n_active;
740 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
741
742 if (hlp->childs.needNew() > 0) {
743 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
744
745 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
746 if (srv->stats.replies < 1)
747 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
748 else
749 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
750 }
751
752 debugs(80, DBG_IMPORTANT, "Starting new helpers");
753 helperOpenServers(hlp);
754 }
755 }
756
757 for (i = 0; i < concurrency; ++i) {
758 // XXX: re-schedule these on another helper?
759 if ((r = srv->requests[i])) {
760 void *cbdata;
761
762 if (cbdataReferenceValidDone(r->data, &cbdata)) {
763 HelperReply nilReply;
764 r->callback(cbdata, nilReply);
765 }
766
767 helperRequestFree(r);
768
769 srv->requests[i] = NULL;
770 }
771 }
772 safe_free(srv->requests);
773
774 cbdataReferenceDone(srv->parent);
775 delete srv;
776 }
777
778 static void
779 helperStatefulServerFree(helper_stateful_server *srv)
780 {
781 statefulhelper *hlp = srv->parent;
782 helper_stateful_request *r;
783
784 if (srv->rbuf) {
785 memFreeBuf(srv->rbuf_sz, srv->rbuf);
786 srv->rbuf = NULL;
787 }
788
789 #if 0
790 srv->wqueue->clean();
791
792 delete srv->wqueue;
793
794 #endif
795
796 /* TODO: walk the local queue of requests and carry them all out */
797 if (Comm::IsConnOpen(srv->writePipe))
798 srv->closeWritePipeSafely();
799
800 dlinkDelete(&srv->link, &hlp->servers);
801
802 assert(hlp->childs.n_running > 0);
803 -- hlp->childs.n_running;
804
805 if (!srv->flags.shutdown) {
806 assert( hlp->childs.n_active > 0);
807 -- hlp->childs.n_active;
808 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
809
810 if (hlp->childs.needNew() > 0) {
811 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
812
813 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
814 if (srv->stats.replies < 1)
815 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
816 else
817 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
818 }
819
820 debugs(80, DBG_IMPORTANT, "Starting new helpers");
821 helperStatefulOpenServers(hlp);
822 }
823 }
824
825 if ((r = srv->request)) {
826 void *cbdata;
827
828 if (cbdataReferenceValidDone(r->data, &cbdata)) {
829 HelperReply nilReply;
830 nilReply.whichServer = srv;
831 r->callback(cbdata, nilReply);
832 }
833
834 helperStatefulRequestFree(r);
835
836 srv->request = NULL;
837 }
838
839 if (srv->data != NULL)
840 hlp->datapool->freeOne(srv->data);
841
842 cbdataReferenceDone(srv->parent);
843
844 delete srv;
845 }
846
847 /// Calls back with a pointer to the buffer with the helper output
848 static void
849 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
850 {
851 helper_request *r = srv->requests[request_number];
852 if (r) {
853 HLPCB *callback = r->callback;
854
855 srv->requests[request_number] = NULL;
856
857 r->callback = NULL;
858
859 void *cbdata = NULL;
860 if (cbdataReferenceValidDone(r->data, &cbdata)) {
861 HelperReply response(msg, (msg_end-msg));
862 callback(cbdata, response);
863 }
864
865 -- srv->stats.pending;
866 ++ srv->stats.replies;
867
868 ++ hlp->stats.replies;
869
870 srv->answer_time = current_time;
871
872 srv->dispatch_time = r->dispatch_time;
873
874 hlp->stats.avg_svc_time =
875 Math::intAverage(hlp->stats.avg_svc_time,
876 tvSubMsec(r->dispatch_time, current_time),
877 hlp->stats.replies, REDIRECT_AV_FACTOR);
878
879 helperRequestFree(r);
880 } else {
881 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
882 request_number << " from " << hlp->id_name << " #" << srv->index + 1 <<
883 " '" << srv->rbuf << "'");
884 }
885
886 if (!srv->flags.shutdown) {
887 helperKickQueue(hlp);
888 } else if (!srv->flags.closing && !srv->stats.pending) {
889 srv->flags.closing=true;
890 srv->writePipe->close();
891 }
892 }
893
894 static void
895 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
896 {
897 char *t = NULL;
898 helper_server *srv = (helper_server *)data;
899 helper *hlp = srv->parent;
900 assert(cbdataReferenceValid(data));
901
902 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
903
904 if (flag == COMM_ERR_CLOSING) {
905 return;
906 }
907
908 assert(conn->fd == srv->readPipe->fd);
909
910 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
911
912 if (flag != COMM_OK || len == 0) {
913 srv->closePipesSafely();
914 return;
915 }
916
917 srv->roffset += len;
918 srv->rbuf[srv->roffset] = '\0';
919 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
920
921 if (!srv->stats.pending) {
922 /* someone spoke without being spoken to */
923 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
924 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
925 " bytes '" << srv->rbuf << "'");
926
927 srv->roffset = 0;
928 srv->rbuf[0] = '\0';
929 }
930
931 while ((t = strchr(srv->rbuf, hlp->eom))) {
932 /* end of reply found */
933 char *msg = srv->rbuf;
934 int i = 0;
935 int skip = 1;
936 debugs(84, 3, "helperHandleRead: end of reply found");
937
938 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
939 *t = '\0';
940 // rewind to the \r octet which is the real terminal now
941 // and remember that we have to skip forward 2 places now.
942 skip = 2;
943 --t;
944 }
945
946 *t = '\0';
947
948 if (hlp->childs.concurrency) {
949 i = strtol(msg, &msg, 10);
950
951 while (*msg && xisspace(*msg))
952 ++msg;
953 }
954
955 helperReturnBuffer(i, srv, hlp, msg, t);
956 srv->roffset -= (t - srv->rbuf) + skip;
957 memmove(srv->rbuf, t + skip, srv->roffset);
958 srv->rbuf[srv->roffset] = '\0';
959 }
960
961 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
962 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
963 assert(spaceSize >= 0);
964
965 // grow the input buffer if needed and possible
966 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
967 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
968 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
969 spaceSize = srv->rbuf_sz - srv->roffset - 1;
970 assert(spaceSize >= 0);
971 }
972
973 // quit reading if there is no space left
974 if (!spaceSize) {
975 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
976 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
977 "Squid input buffer: " << hlp->id_name << " #" <<
978 (srv->index + 1));
979 srv->closePipesSafely();
980 return;
981 }
982
983 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
984 CommIoCbPtrFun(helperHandleRead, srv));
985 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
986 }
987 }
988
989 static void
990 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
991 {
992 char *t = NULL;
993 helper_stateful_server *srv = (helper_stateful_server *)data;
994 helper_stateful_request *r;
995 statefulhelper *hlp = srv->parent;
996 assert(cbdataReferenceValid(data));
997
998 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
999
1000 if (flag == COMM_ERR_CLOSING) {
1001 return;
1002 }
1003
1004 assert(conn->fd == srv->readPipe->fd);
1005
1006 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1007 hlp->id_name << " #" << srv->index + 1);
1008
1009 if (flag != COMM_OK || len == 0) {
1010 srv->closePipesSafely();
1011 return;
1012 }
1013
1014 srv->roffset += len;
1015 srv->rbuf[srv->roffset] = '\0';
1016 r = srv->request;
1017 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1018
1019 if (r == NULL) {
1020 /* someone spoke without being spoken to */
1021 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1022 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1023 " bytes '" << srv->rbuf << "'");
1024
1025 srv->roffset = 0;
1026 }
1027
1028 if ((t = strchr(srv->rbuf, hlp->eom))) {
1029 /* end of reply found */
1030 int called = 1;
1031 int skip = 1;
1032 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1033
1034 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1035 *t = '\0';
1036 // rewind to the \r octet which is the real terminal now
1037 // and remember that we have to skip forward 2 places now.
1038 skip = 2;
1039 --t;
1040 }
1041
1042 *t = '\0';
1043
1044 if (r && cbdataReferenceValid(r->data)) {
1045 HelperReply res(srv->rbuf, (t - srv->rbuf));
1046 res.whichServer = srv;
1047 r->callback(r->data, res);
1048 } else {
1049 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1050 called = 0;
1051 }
1052 // only skip off the \0's _after_ passing its location in HelperReply above
1053 t += skip;
1054
1055 srv->flags.busy = false;
1056 /**
1057 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1058 * Doing this prohibits concurrency support with multiple replies per read().
1059 * TODO: check that read() setup on these buffers pays attention to roffset!=0
1060 * TODO: check that replies bigger than the buffer are discarded and do not affect future replies
1061 */
1062 srv->roffset = 0;
1063 helperStatefulRequestFree(r);
1064 srv->request = NULL;
1065
1066 -- srv->stats.pending;
1067 ++ srv->stats.replies;
1068
1069 ++ hlp->stats.replies;
1070 srv->answer_time = current_time;
1071 hlp->stats.avg_svc_time =
1072 Math::intAverage(hlp->stats.avg_svc_time,
1073 tvSubMsec(srv->dispatch_time, current_time),
1074 hlp->stats.replies, REDIRECT_AV_FACTOR);
1075
1076 if (called)
1077 helperStatefulServerDone(srv);
1078 else
1079 helperStatefulReleaseServer(srv);
1080 }
1081
1082 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1083 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1084 assert(spaceSize >= 0);
1085
1086 // grow the input buffer if needed and possible
1087 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1088 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1089 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1090 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1091 assert(spaceSize >= 0);
1092 }
1093
1094 // quit reading if there is no space left
1095 if (!spaceSize) {
1096 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1097 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1098 "Squid input buffer: " << hlp->id_name << " #" <<
1099 (srv->index + 1));
1100 srv->closePipesSafely();
1101 return;
1102 }
1103
1104 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1105 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1106 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1107 }
1108 }
1109
1110 static void
1111 Enqueue(helper * hlp, helper_request * r)
1112 {
1113 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1114 dlinkAddTail(r, link, &hlp->queue);
1115 ++ hlp->stats.queue_size;
1116
1117 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1118 if (hlp->childs.needNew() > 0) {
1119 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1120 helperOpenServers(hlp);
1121 return;
1122 }
1123
1124 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1125 return;
1126
1127 if (squid_curtime - hlp->last_queue_warn < 600)
1128 return;
1129
1130 if (shutting_down || reconfiguring)
1131 return;
1132
1133 hlp->last_queue_warn = squid_curtime;
1134
1135 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1136 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1137 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1138
1139 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1140 fatalf("Too many queued %s requests", hlp->id_name);
1141 }
1142
1143 static void
1144 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1145 {
1146 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1147 dlinkAddTail(r, link, &hlp->queue);
1148 ++ hlp->stats.queue_size;
1149
1150 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1151 if (hlp->childs.needNew() > 0) {
1152 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1153 helperStatefulOpenServers(hlp);
1154 return;
1155 }
1156
1157 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1158 return;
1159
1160 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1161 fatalf("Too many queued %s requests", hlp->id_name);
1162
1163 if (squid_curtime - hlp->last_queue_warn < 600)
1164 return;
1165
1166 if (shutting_down || reconfiguring)
1167 return;
1168
1169 hlp->last_queue_warn = squid_curtime;
1170
1171 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1172 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1173 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1174 }
1175
1176 static helper_request *
1177 Dequeue(helper * hlp)
1178 {
1179 dlink_node *link;
1180 helper_request *r = NULL;
1181
1182 if ((link = hlp->queue.head)) {
1183 r = (helper_request *)link->data;
1184 dlinkDelete(link, &hlp->queue);
1185 memFree(link, MEM_DLINK_NODE);
1186 -- hlp->stats.queue_size;
1187 }
1188
1189 return r;
1190 }
1191
1192 static helper_stateful_request *
1193 StatefulDequeue(statefulhelper * hlp)
1194 {
1195 dlink_node *link;
1196 helper_stateful_request *r = NULL;
1197
1198 if ((link = hlp->queue.head)) {
1199 r = (helper_stateful_request *)link->data;
1200 dlinkDelete(link, &hlp->queue);
1201 memFree(link, MEM_DLINK_NODE);
1202 -- hlp->stats.queue_size;
1203 }
1204
1205 return r;
1206 }
1207
1208 static helper_server *
1209 GetFirstAvailable(helper * hlp)
1210 {
1211 dlink_node *n;
1212 helper_server *srv;
1213 helper_server *selected = NULL;
1214 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1215
1216 if (hlp->childs.n_running == 0)
1217 return NULL;
1218
1219 /* Find "least" loaded helper (approx) */
1220 for (n = hlp->servers.head; n != NULL; n = n->next) {
1221 srv = (helper_server *)n->data;
1222
1223 if (selected && selected->stats.pending <= srv->stats.pending)
1224 continue;
1225
1226 if (srv->flags.shutdown)
1227 continue;
1228
1229 if (!srv->stats.pending)
1230 return srv;
1231
1232 if (selected) {
1233 selected = srv;
1234 break;
1235 }
1236
1237 selected = srv;
1238 }
1239
1240 /* Check for overload */
1241 if (!selected) {
1242 debugs(84, 5, "GetFirstAvailable: None available.");
1243 return NULL;
1244 }
1245
1246 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1247 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1248 return NULL;
1249 }
1250
1251 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1252 return selected;
1253 }
1254
1255 static helper_stateful_server *
1256 StatefulGetFirstAvailable(statefulhelper * hlp)
1257 {
1258 dlink_node *n;
1259 helper_stateful_server *srv = NULL;
1260 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1261
1262 if (hlp->childs.n_running == 0)
1263 return NULL;
1264
1265 for (n = hlp->servers.head; n != NULL; n = n->next) {
1266 srv = (helper_stateful_server *)n->data;
1267
1268 if (srv->flags.busy)
1269 continue;
1270
1271 if (srv->flags.reserved)
1272 continue;
1273
1274 if (srv->flags.shutdown)
1275 continue;
1276
1277 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1278 continue;
1279
1280 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1281 return srv;
1282 }
1283
1284 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1285 return NULL;
1286 }
1287
1288 static void
1289 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1290 {
1291 helper_server *srv = (helper_server *)data;
1292
1293 srv->writebuf->clean();
1294 delete srv->writebuf;
1295 srv->writebuf = NULL;
1296 srv->flags.writing = false;
1297
1298 if (flag != COMM_OK) {
1299 /* Helper server has crashed */
1300 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
1301 return;
1302 }
1303
1304 if (!srv->wqueue->isNull()) {
1305 srv->writebuf = srv->wqueue;
1306 srv->wqueue = new MemBuf;
1307 srv->flags.writing = true;
1308 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1309 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1310 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1311 }
1312 }
1313
1314 static void
1315 helperDispatch(helper_server * srv, helper_request * r)
1316 {
1317 helper *hlp = srv->parent;
1318 helper_request **ptr = NULL;
1319 unsigned int slot;
1320
1321 if (!cbdataReferenceValid(r->data)) {
1322 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1323 helperRequestFree(r);
1324 return;
1325 }
1326
1327 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1328 if (!srv->requests[slot]) {
1329 ptr = &srv->requests[slot];
1330 break;
1331 }
1332 }
1333
1334 assert(ptr);
1335 *ptr = r;
1336 r->dispatch_time = current_time;
1337
1338 if (srv->wqueue->isNull())
1339 srv->wqueue->init();
1340
1341 if (hlp->childs.concurrency)
1342 srv->wqueue->Printf("%d %s", slot, r->buf);
1343 else
1344 srv->wqueue->append(r->buf, strlen(r->buf));
1345
1346 if (!srv->flags.writing) {
1347 assert(NULL == srv->writebuf);
1348 srv->writebuf = srv->wqueue;
1349 srv->wqueue = new MemBuf;
1350 srv->flags.writing = true;
1351 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1352 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1353 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1354 }
1355
1356 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
1357
1358 ++ srv->stats.uses;
1359 ++ srv->stats.pending;
1360 ++ hlp->stats.requests;
1361 }
1362
1363 static void
1364 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag,
1365 int xerrno, void *data)
1366 {
1367 /* nothing! */
1368 }
1369
1370 static void
1371 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1372 {
1373 statefulhelper *hlp = srv->parent;
1374
1375 if (!cbdataReferenceValid(r->data)) {
1376 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1377 helperStatefulRequestFree(r);
1378 helperStatefulReleaseServer(srv);
1379 return;
1380 }
1381
1382 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
1383
1384 if (r->placeholder == 1) {
1385 /* a callback is needed before this request can _use_ a helper. */
1386 /* we don't care about releasing this helper. The request NEVER
1387 * gets to the helper. So we throw away the return code */
1388 HelperReply nilReply;
1389 nilReply.whichServer = srv;
1390 r->callback(r->data, nilReply);
1391 /* throw away the placeholder */
1392 helperStatefulRequestFree(r);
1393 /* and push the queue. Note that the callback may have submitted a new
1394 * request to the helper which is why we test for the request */
1395
1396 if (srv->request == NULL)
1397 helperStatefulServerDone(srv);
1398
1399 return;
1400 }
1401
1402 srv->flags.busy = true;
1403 srv->flags.reserved = true;
1404 srv->request = r;
1405 srv->dispatch_time = current_time;
1406 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1407 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1408 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1409 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1410 hlp->id_name << " #" << srv->index + 1 << ", " <<
1411 (int) strlen(r->buf) << " bytes");
1412
1413 ++ srv->stats.uses;
1414 ++ srv->stats.pending;
1415 ++ hlp->stats.requests;
1416 }
1417
1418 static void
1419 helperKickQueue(helper * hlp)
1420 {
1421 helper_request *r;
1422 helper_server *srv;
1423
1424 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1425 helperDispatch(srv, r);
1426 }
1427
1428 static void
1429 helperStatefulKickQueue(statefulhelper * hlp)
1430 {
1431 helper_stateful_request *r;
1432 helper_stateful_server *srv;
1433
1434 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1435 helperStatefulDispatch(srv, r);
1436 }
1437
1438 static void
1439 helperStatefulServerDone(helper_stateful_server * srv)
1440 {
1441 if (!srv->flags.shutdown) {
1442 helperStatefulKickQueue(srv->parent);
1443 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1444 srv->closeWritePipeSafely();
1445 return;
1446 }
1447 }
1448
1449 static void
1450 helperRequestFree(helper_request * r)
1451 {
1452 cbdataReferenceDone(r->data);
1453 xfree(r->buf);
1454 delete r;
1455 }
1456
1457 static void
1458 helperStatefulRequestFree(helper_stateful_request * r)
1459 {
1460 if (r) {
1461 cbdataReferenceDone(r->data);
1462 xfree(r->buf);
1463 delete r;
1464 }
1465 }
1466
1467 // TODO: should helper_ and helper_stateful_ have a common parent?
1468 static bool
1469 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1470 {
1471 if (!hlp) {
1472 if (label)
1473 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1474 return false;
1475 }
1476
1477 if (label)
1478 storeAppendPrintf(sentry, "%s:\n", label);
1479
1480 return true;
1481 }