]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Fix concurrency support in stateless helpers: Parse multiple replies correctly.
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * DEBUG: section 84 Helper process maintenance
3 * AUTHOR: Harvest Derived?
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 #include "squid.h"
34 #include "base/AsyncCbdataCalls.h"
35 #include "comm.h"
36 #include "comm/Connection.h"
37 #include "comm/Write.h"
38 #include "fd.h"
39 #include "format/Quoting.h"
40 #include "helper.h"
41 #include "Mem.h"
42 #include "MemBuf.h"
43 #include "SquidIpc.h"
44 #include "SquidMath.h"
45 #include "SquidTime.h"
46 #include "Store.h"
47 #include "wordlist.h"
48
/// Maximum number of argv[] words handed to a helper process. Callers size
/// their argument arrays one slot larger to hold the terminating NULL.
#define HELPER_MAX_ARGS 64

/**
 * Size of the input buffer first allocated for each helper.
 * A helper response may be larger than this; the buffer is then grown
 * on demand, but never beyond ReadBufMaxSize.
 */
const size_t ReadBufMinSize = 4 * 1024;

/**
 * Largest helper-to-Squid response that is considered safe, plus one.
 * A helper that sends more than this gets a warning and its stream closed.
 * The ssl_crtd helper is known to emit responses of at least 10KB, and
 * some undocumented helpers are known to emit responses above 8KB.
 */
const size_t ReadBufMaxSize = 32 * 1024;
62
63 static IOCB helperHandleRead;
64 static IOCB helperStatefulHandleRead;
65 static void helperServerFree(helper_server *srv);
66 static void helperStatefulServerFree(helper_stateful_server *srv);
67 static void Enqueue(helper * hlp, helper_request *);
68 static helper_request *Dequeue(helper * hlp);
69 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
70 static helper_server *GetFirstAvailable(helper * hlp);
71 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
72 static void helperDispatch(helper_server * srv, helper_request * r);
73 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74 static void helperKickQueue(helper * hlp);
75 static void helperStatefulKickQueue(statefulhelper * hlp);
76 static void helperStatefulServerDone(helper_stateful_server * srv);
77 static void helperRequestFree(helper_request * r);
78 static void helperStatefulRequestFree(helper_stateful_request * r);
79 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
80 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
81
82 CBDATA_CLASS_INIT(helper);
83 CBDATA_TYPE(helper_server);
84 CBDATA_CLASS_INIT(statefulhelper);
85 CBDATA_TYPE(helper_stateful_server);
86
87 void
88 HelperServerBase::initStats()
89 {
90 stats.uses=0;
91 stats.replies=0;
92 stats.pending=0;
93 stats.releases=0;
94 }
95
96 void
97 HelperServerBase::closePipesSafely()
98 {
99 #if _SQUID_WINDOWS_
100 int no = index + 1;
101
102 shutdown(writePipe->fd, SD_BOTH);
103 #endif
104
105 flags.closing = true;
106 if (readPipe->fd == writePipe->fd)
107 readPipe->fd = -1;
108 else
109 readPipe->close();
110 writePipe->close();
111
112 #if _SQUID_WINDOWS_
113 if (hIpc) {
114 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
115 getCurrentTime();
116 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
117 " #" << no << " (" << hlp->cmdline->key << "," <<
118 (long int)pid << ") didn't exit in 5 seconds");
119 }
120 CloseHandle(hIpc);
121 }
122 #endif
123 }
124
125 void
126 HelperServerBase::closeWritePipeSafely()
127 {
128 #if _SQUID_WINDOWS_
129 int no = index + 1;
130
131 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
132 #endif
133
134 flags.closing = true;
135 if (readPipe->fd == writePipe->fd)
136 readPipe->fd = -1;
137 writePipe->close();
138
139 #if _SQUID_WINDOWS_
140 if (hIpc) {
141 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
142 getCurrentTime();
143 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
144 " #" << no << " (" << hlp->cmdline->key << "," <<
145 (long int)pid << ") didn't exit in 5 seconds");
146 }
147 CloseHandle(hIpc);
148 }
149 #endif
150 }
151
152 void
153 helperOpenServers(helper * hlp)
154 {
155 char *s;
156 char *progname;
157 char *shortname;
158 char *procname;
159 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
160 char fd_note_buf[FD_DESC_SZ];
161 helper_server *srv;
162 int nargs = 0;
163 int k;
164 pid_t pid;
165 int rfd;
166 int wfd;
167 void * hIpc;
168 wordlist *w;
169
170 if (hlp->cmdline == NULL)
171 return;
172
173 progname = hlp->cmdline->key;
174
175 if ((s = strrchr(progname, '/')))
176 shortname = xstrdup(s + 1);
177 else
178 shortname = xstrdup(progname);
179
180 /* figure out how many new child are actually needed. */
181 int need_new = hlp->childs.needNew();
182
183 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
184
185 if (need_new < 1) {
186 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
187 }
188
189 procname = (char *)xmalloc(strlen(shortname) + 3);
190
191 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
192
193 args[nargs] = procname;
194 ++nargs;
195
196 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
197 args[nargs] = w->key;
198 ++nargs;
199 }
200
201 args[nargs] = NULL;
202 ++nargs;
203
204 assert(nargs <= HELPER_MAX_ARGS);
205
206 for (k = 0; k < need_new; ++k) {
207 getCurrentTime();
208 rfd = wfd = -1;
209 pid = ipcCreate(hlp->ipc_type,
210 progname,
211 args,
212 shortname,
213 hlp->addr,
214 &rfd,
215 &wfd,
216 &hIpc);
217
218 if (pid < 0) {
219 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
220 continue;
221 }
222
223 ++ hlp->childs.n_running;
224 ++ hlp->childs.n_active;
225 CBDATA_INIT_TYPE(helper_server);
226 srv = cbdataAlloc(helper_server);
227 srv->hIpc = hIpc;
228 srv->pid = pid;
229 srv->initStats();
230 srv->index = k;
231 srv->addr = hlp->addr;
232 srv->readPipe = new Comm::Connection;
233 srv->readPipe->fd = rfd;
234 srv->writePipe = new Comm::Connection;
235 srv->writePipe->fd = wfd;
236 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
237 srv->wqueue = new MemBuf;
238 srv->roffset = 0;
239 srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
240 srv->parent = cbdataReference(hlp);
241 dlinkAddTail(srv, &srv->link, &hlp->servers);
242
243 if (rfd == wfd) {
244 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
245 fd_note(rfd, fd_note_buf);
246 } else {
247 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
248 fd_note(rfd, fd_note_buf);
249 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
250 fd_note(wfd, fd_note_buf);
251 }
252
253 commSetNonBlocking(rfd);
254
255 if (wfd != rfd)
256 commSetNonBlocking(wfd);
257
258 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
259 comm_add_close_handler(rfd, closeCall);
260
261 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
262 CommIoCbPtrFun(helperHandleRead, srv));
263 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
264 }
265
266 hlp->last_restart = squid_curtime;
267 safe_free(shortname);
268 safe_free(procname);
269 helperKickQueue(hlp);
270 }
271
272 /**
273 * DPW 2007-05-08
274 *
275 * helperStatefulOpenServers: create the stateful child helper processes
276 */
277 void
278 helperStatefulOpenServers(statefulhelper * hlp)
279 {
280 char *shortname;
281 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
282 char fd_note_buf[FD_DESC_SZ];
283 int nargs = 0;
284
285 if (hlp->cmdline == NULL)
286 return;
287
288 if (hlp->childs.concurrency)
289 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
290
291 char *progname = hlp->cmdline->key;
292
293 char *s;
294 if ((s = strrchr(progname, '/')))
295 shortname = xstrdup(s + 1);
296 else
297 shortname = xstrdup(progname);
298
299 /* figure out haw mant new helpers are needed. */
300 int need_new = hlp->childs.needNew();
301
302 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
303
304 if (need_new < 1) {
305 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
306 }
307
308 char *procname = (char *)xmalloc(strlen(shortname) + 3);
309
310 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
311
312 args[nargs] = procname;
313 ++nargs;
314
315 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
316 args[nargs] = w->key;
317 ++nargs;
318 }
319
320 args[nargs] = NULL;
321 ++nargs;
322
323 assert(nargs <= HELPER_MAX_ARGS);
324
325 for (int k = 0; k < need_new; ++k) {
326 getCurrentTime();
327 int rfd = -1;
328 int wfd = -1;
329 void * hIpc;
330 pid_t pid = ipcCreate(hlp->ipc_type,
331 progname,
332 args,
333 shortname,
334 hlp->addr,
335 &rfd,
336 &wfd,
337 &hIpc);
338
339 if (pid < 0) {
340 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
341 continue;
342 }
343
344 ++ hlp->childs.n_running;
345 ++ hlp->childs.n_active;
346 CBDATA_INIT_TYPE(helper_stateful_server);
347 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
348 srv->hIpc = hIpc;
349 srv->pid = pid;
350 srv->flags.reserved = false;
351 srv->initStats();
352 srv->index = k;
353 srv->addr = hlp->addr;
354 srv->readPipe = new Comm::Connection;
355 srv->readPipe->fd = rfd;
356 srv->writePipe = new Comm::Connection;
357 srv->writePipe->fd = wfd;
358 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
359 srv->roffset = 0;
360 srv->parent = cbdataReference(hlp);
361
362 if (hlp->datapool != NULL)
363 srv->data = hlp->datapool->alloc();
364
365 dlinkAddTail(srv, &srv->link, &hlp->servers);
366
367 if (rfd == wfd) {
368 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
369 fd_note(rfd, fd_note_buf);
370 } else {
371 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
372 fd_note(rfd, fd_note_buf);
373 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
374 fd_note(wfd, fd_note_buf);
375 }
376
377 commSetNonBlocking(rfd);
378
379 if (wfd != rfd)
380 commSetNonBlocking(wfd);
381
382 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
383 comm_add_close_handler(rfd, closeCall);
384
385 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
386 CommIoCbPtrFun(helperStatefulHandleRead, srv));
387 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
388 }
389
390 hlp->last_restart = squid_curtime;
391 safe_free(shortname);
392 safe_free(procname);
393 helperStatefulKickQueue(hlp);
394 }
395
396 void
397 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
398 {
399 if (hlp == NULL) {
400 debugs(84, 3, "helperSubmit: hlp == NULL");
401 HelperReply nilReply;
402 callback(data, nilReply);
403 return;
404 }
405
406 helper_request *r = new helper_request;
407 helper_server *srv;
408
409 r->callback = callback;
410 r->data = cbdataReference(data);
411 r->buf = xstrdup(buf);
412
413 if ((srv = GetFirstAvailable(hlp)))
414 helperDispatch(srv, r);
415 else
416 Enqueue(hlp, r);
417
418 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
419 }
420
421 /// lastserver = "server last used as part of a reserved request sequence"
422 void
423 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
424 {
425 if (hlp == NULL) {
426 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
427 HelperReply nilReply;
428 callback(data, nilReply);
429 return;
430 }
431
432 helper_stateful_request *r = new helper_stateful_request;
433
434 r->callback = callback;
435 r->data = cbdataReference(data);
436
437 if (buf != NULL) {
438 r->buf = xstrdup(buf);
439 r->placeholder = 0;
440 } else {
441 r->buf = NULL;
442 r->placeholder = 1;
443 }
444
445 if ((buf != NULL) && lastserver) {
446 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
447 assert(lastserver->flags.reserved);
448 assert(!(lastserver->request));
449
450 debugs(84, 5, "StatefulSubmit dispatching");
451 helperStatefulDispatch(lastserver, r);
452 } else {
453 helper_stateful_server *srv;
454 if ((srv = StatefulGetFirstAvailable(hlp))) {
455 helperStatefulDispatch(srv, r);
456 } else
457 StatefulEnqueue(hlp, r);
458 }
459
460 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
461 "', " << Raw("buf", buf, strlen(buf)));
462 }
463
464 /**
465 * DPW 2007-05-08
466 *
467 * helperStatefulReleaseServer tells the helper that whoever was
468 * using it no longer needs its services.
469 */
470 void
471 helperStatefulReleaseServer(helper_stateful_server * srv)
472 {
473 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
474 if (!srv->flags.reserved)
475 return;
476
477 ++ srv->stats.releases;
478
479 srv->flags.reserved = false;
480 if (srv->parent->OnEmptyQueue != NULL && srv->data)
481 srv->parent->OnEmptyQueue(srv->data);
482
483 helperStatefulServerDone(srv);
484 }
485
486 /** return a pointer to the stateful routines data area */
487 void *
488 helperStatefulServerGetData(helper_stateful_server * srv)
489 {
490 return srv->data;
491 }
492
493 /**
494 * Dump some stats about the helper states to a StoreEntry
495 */
496 void
497 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
498 {
499 if (!helperStartStats(sentry, hlp, label))
500 return;
501
502 storeAppendPrintf(sentry, "program: %s\n",
503 hlp->cmdline->key);
504 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
505 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
506 storeAppendPrintf(sentry, "requests sent: %d\n",
507 hlp->stats.requests);
508 storeAppendPrintf(sentry, "replies received: %d\n",
509 hlp->stats.replies);
510 storeAppendPrintf(sentry, "queue length: %d\n",
511 hlp->stats.queue_size);
512 storeAppendPrintf(sentry, "avg service time: %d msec\n",
513 hlp->stats.avg_svc_time);
514 storeAppendPrintf(sentry, "\n");
515 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
516 "#",
517 "FD",
518 "PID",
519 "# Requests",
520 "# Replies",
521 "Flags",
522 "Time",
523 "Offset",
524 "Request");
525
526 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
527 helper_server *srv = (helper_server*)link->data;
528 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
529 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "%c%c%c%c\t%7.3f\t%7d\t%s\n",
530 srv->index + 1,
531 srv->readPipe->fd,
532 srv->pid,
533 srv->stats.uses,
534 srv->stats.replies,
535 srv->stats.pending ? 'B' : ' ',
536 srv->flags.writing ? 'W' : ' ',
537 srv->flags.closing ? 'C' : ' ',
538 srv->flags.shutdown ? 'S' : ' ',
539 tt < 0.0 ? 0.0 : tt,
540 (int) srv->roffset,
541 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
542 }
543
544 storeAppendPrintf(sentry, "\nFlags key:\n\n");
545 storeAppendPrintf(sentry, " B = BUSY\n");
546 storeAppendPrintf(sentry, " W = WRITING\n");
547 storeAppendPrintf(sentry, " C = CLOSING\n");
548 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
549 }
550
551 void
552 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
553 {
554 if (!helperStartStats(sentry, hlp, label))
555 return;
556
557 storeAppendPrintf(sentry, "program: %s\n",
558 hlp->cmdline->key);
559 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
560 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
561 storeAppendPrintf(sentry, "requests sent: %d\n",
562 hlp->stats.requests);
563 storeAppendPrintf(sentry, "replies received: %d\n",
564 hlp->stats.replies);
565 storeAppendPrintf(sentry, "queue length: %d\n",
566 hlp->stats.queue_size);
567 storeAppendPrintf(sentry, "avg service time: %d msec\n",
568 hlp->stats.avg_svc_time);
569 storeAppendPrintf(sentry, "\n");
570 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
571 "#",
572 "FD",
573 "PID",
574 "# Requests",
575 "# Replies",
576 "Flags",
577 "Time",
578 "Offset",
579 "Request");
580
581 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
582 helper_stateful_server *srv = (helper_stateful_server *)link->data;
583 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
584 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
585 srv->index + 1,
586 srv->readPipe->fd,
587 srv->pid,
588 srv->stats.uses,
589 srv->stats.replies,
590 srv->flags.busy ? 'B' : ' ',
591 srv->flags.closing ? 'C' : ' ',
592 srv->flags.reserved ? 'R' : ' ',
593 srv->flags.shutdown ? 'S' : ' ',
594 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
595 tt < 0.0 ? 0.0 : tt,
596 (int) srv->roffset,
597 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
598 }
599
600 storeAppendPrintf(sentry, "\nFlags key:\n\n");
601 storeAppendPrintf(sentry, " B = BUSY\n");
602 storeAppendPrintf(sentry, " C = CLOSING\n");
603 storeAppendPrintf(sentry, " R = RESERVED\n");
604 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
605 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
606 }
607
608 void
609 helperShutdown(helper * hlp)
610 {
611 dlink_node *link = hlp->servers.head;
612
613 while (link) {
614 helper_server *srv;
615 srv = (helper_server *)link->data;
616 link = link->next;
617
618 if (srv->flags.shutdown) {
619 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
620 continue;
621 }
622
623 assert(hlp->childs.n_active > 0);
624 -- hlp->childs.n_active;
625 srv->flags.shutdown = true; /* request it to shut itself down */
626
627 if (srv->flags.closing) {
628 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
629 continue;
630 }
631
632 if (srv->stats.pending) {
633 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
634 continue;
635 }
636
637 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
638 /* the rest of the details is dealt with in the helperServerFree
639 * close handler
640 */
641 srv->closePipesSafely();
642 }
643 }
644
645 void
646 helperStatefulShutdown(statefulhelper * hlp)
647 {
648 dlink_node *link = hlp->servers.head;
649 helper_stateful_server *srv;
650
651 while (link) {
652 srv = (helper_stateful_server *)link->data;
653 link = link->next;
654
655 if (srv->flags.shutdown) {
656 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
657 continue;
658 }
659
660 assert(hlp->childs.n_active > 0);
661 -- hlp->childs.n_active;
662 srv->flags.shutdown = true; /* request it to shut itself down */
663
664 if (srv->flags.busy) {
665 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
666 continue;
667 }
668
669 if (srv->flags.closing) {
670 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
671 continue;
672 }
673
674 if (srv->flags.reserved) {
675 if (shutting_down) {
676 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
677 } else {
678 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
679 continue;
680 }
681 }
682
683 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
684
685 /* the rest of the details is dealt with in the helperStatefulServerFree
686 * close handler
687 */
688 srv->closePipesSafely();
689 }
690 }
691
692 helper::~helper()
693 {
694 /* note, don't free id_name, it probably points to static memory */
695
696 if (queue.head)
697 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
698 }
699
700 /* ====================================================================== */
701 /* LOCAL FUNCTIONS */
702 /* ====================================================================== */
703
704 static void
705 helperServerFree(helper_server *srv)
706 {
707 helper *hlp = srv->parent;
708 helper_request *r;
709 int i, concurrency = hlp->childs.concurrency;
710
711 if (!concurrency)
712 concurrency = 1;
713
714 if (srv->rbuf) {
715 memFreeBuf(srv->rbuf_sz, srv->rbuf);
716 srv->rbuf = NULL;
717 }
718
719 srv->wqueue->clean();
720 delete srv->wqueue;
721
722 if (srv->writebuf) {
723 srv->writebuf->clean();
724 delete srv->writebuf;
725 srv->writebuf = NULL;
726 }
727
728 if (Comm::IsConnOpen(srv->writePipe))
729 srv->closeWritePipeSafely();
730
731 dlinkDelete(&srv->link, &hlp->servers);
732
733 assert(hlp->childs.n_running > 0);
734 -- hlp->childs.n_running;
735
736 if (!srv->flags.shutdown) {
737 assert(hlp->childs.n_active > 0);
738 -- hlp->childs.n_active;
739 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
740
741 if (hlp->childs.needNew() > 0) {
742 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
743
744 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
745 if (srv->stats.replies < 1)
746 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
747 else
748 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
749 }
750
751 debugs(80, DBG_IMPORTANT, "Starting new helpers");
752 helperOpenServers(hlp);
753 }
754 }
755
756 for (i = 0; i < concurrency; ++i) {
757 // XXX: re-schedule these on another helper?
758 if ((r = srv->requests[i])) {
759 void *cbdata;
760
761 if (cbdataReferenceValidDone(r->data, &cbdata)) {
762 HelperReply nilReply;
763 r->callback(cbdata, nilReply);
764 }
765
766 helperRequestFree(r);
767
768 srv->requests[i] = NULL;
769 }
770 }
771 safe_free(srv->requests);
772
773 cbdataReferenceDone(srv->parent);
774 cbdataFree(srv);
775 }
776
777 static void
778 helperStatefulServerFree(helper_stateful_server *srv)
779 {
780 statefulhelper *hlp = srv->parent;
781 helper_stateful_request *r;
782
783 if (srv->rbuf) {
784 memFreeBuf(srv->rbuf_sz, srv->rbuf);
785 srv->rbuf = NULL;
786 }
787
788 #if 0
789 srv->wqueue->clean();
790
791 delete srv->wqueue;
792
793 #endif
794
795 /* TODO: walk the local queue of requests and carry them all out */
796 if (Comm::IsConnOpen(srv->writePipe))
797 srv->closeWritePipeSafely();
798
799 dlinkDelete(&srv->link, &hlp->servers);
800
801 assert(hlp->childs.n_running > 0);
802 -- hlp->childs.n_running;
803
804 if (!srv->flags.shutdown) {
805 assert( hlp->childs.n_active > 0);
806 -- hlp->childs.n_active;
807 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
808
809 if (hlp->childs.needNew() > 0) {
810 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
811
812 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
813 if (srv->stats.replies < 1)
814 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
815 else
816 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
817 }
818
819 debugs(80, DBG_IMPORTANT, "Starting new helpers");
820 helperStatefulOpenServers(hlp);
821 }
822 }
823
824 if ((r = srv->request)) {
825 void *cbdata;
826
827 if (cbdataReferenceValidDone(r->data, &cbdata)) {
828 HelperReply nilReply;
829 nilReply.whichServer = srv;
830 r->callback(cbdata, nilReply);
831 }
832
833 helperStatefulRequestFree(r);
834
835 srv->request = NULL;
836 }
837
838 if (srv->data != NULL)
839 hlp->datapool->freeOne(srv->data);
840
841 cbdataReferenceDone(srv->parent);
842
843 cbdataFree(srv);
844 }
845
846 /// Calls back with a pointer to the buffer with the helper output
847 static void
848 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
849 {
850 helper_request *r = srv->requests[request_number];
851 if (r) {
852 HLPCB *callback = r->callback;
853
854 srv->requests[request_number] = NULL;
855
856 r->callback = NULL;
857
858 void *cbdata = NULL;
859 if (cbdataReferenceValidDone(r->data, &cbdata)) {
860 HelperReply response(msg, (msg_end-msg));
861 callback(cbdata, response);
862 }
863
864 -- srv->stats.pending;
865 ++ srv->stats.replies;
866
867 ++ hlp->stats.replies;
868
869 srv->answer_time = current_time;
870
871 srv->dispatch_time = r->dispatch_time;
872
873 hlp->stats.avg_svc_time =
874 Math::intAverage(hlp->stats.avg_svc_time,
875 tvSubMsec(r->dispatch_time, current_time),
876 hlp->stats.replies, REDIRECT_AV_FACTOR);
877
878 helperRequestFree(r);
879 } else {
880 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
881 request_number << " from " << hlp->id_name << " #" << srv->index + 1 <<
882 " '" << srv->rbuf << "'");
883 }
884
885 if (!srv->flags.shutdown) {
886 helperKickQueue(hlp);
887 } else if (!srv->flags.closing && !srv->stats.pending) {
888 srv->flags.closing=true;
889 srv->writePipe->close();
890 }
891 }
892
893 static void
894 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
895 {
896 char *t = NULL;
897 helper_server *srv = (helper_server *)data;
898 helper *hlp = srv->parent;
899 assert(cbdataReferenceValid(data));
900
901 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
902
903 if (flag == COMM_ERR_CLOSING) {
904 return;
905 }
906
907 assert(conn->fd == srv->readPipe->fd);
908
909 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
910
911 if (flag != COMM_OK || len == 0) {
912 srv->closePipesSafely();
913 return;
914 }
915
916 srv->roffset += len;
917 srv->rbuf[srv->roffset] = '\0';
918 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
919
920 if (!srv->stats.pending) {
921 /* someone spoke without being spoken to */
922 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
923 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
924 " bytes '" << srv->rbuf << "'");
925
926 srv->roffset = 0;
927 srv->rbuf[0] = '\0';
928 }
929
930 while ((t = strchr(srv->rbuf, hlp->eom))) {
931 /* end of reply found */
932 char *msg = srv->rbuf;
933 int i = 0;
934 int skip = 1;
935 debugs(84, 3, "helperHandleRead: end of reply found");
936
937 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
938 *t = '\0';
939 // rewind to the \r octet which is the real terminal now
940 // and remember that we have to skip forward 2 places now.
941 skip = 2;
942 --t;
943 }
944
945 *t = '\0';
946
947 if (hlp->childs.concurrency) {
948 i = strtol(msg, &msg, 10);
949
950 while (*msg && xisspace(*msg))
951 ++msg;
952 }
953
954 helperReturnBuffer(i, srv, hlp, msg, t);
955 srv->roffset -= (t - srv->rbuf) + skip;
956 memmove(srv->rbuf, t + skip, srv->roffset);
957 srv->rbuf[srv->roffset] = '\0';
958 }
959
960 if (Comm::IsConnOpen(srv->readPipe)) {
961 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
962 assert(spaceSize >= 0);
963
964 // grow the input buffer if needed and possible
965 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
966 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
967 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
968 spaceSize = srv->rbuf_sz - srv->roffset - 1;
969 assert(spaceSize >= 0);
970 }
971
972 // quit reading if there is no space left
973 if (!spaceSize) {
974 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
975 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
976 "Squid input buffer: " << hlp->id_name << " #" <<
977 (srv->index + 1));
978 srv->closePipesSafely();
979 return;
980 }
981
982 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
983 CommIoCbPtrFun(helperHandleRead, srv));
984 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
985 }
986 }
987
988 static void
989 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
990 {
991 char *t = NULL;
992 helper_stateful_server *srv = (helper_stateful_server *)data;
993 helper_stateful_request *r;
994 statefulhelper *hlp = srv->parent;
995 assert(cbdataReferenceValid(data));
996
997 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
998
999 if (flag == COMM_ERR_CLOSING) {
1000 return;
1001 }
1002
1003 assert(conn->fd == srv->readPipe->fd);
1004
1005 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1006 hlp->id_name << " #" << srv->index + 1);
1007
1008 if (flag != COMM_OK || len == 0) {
1009 srv->closePipesSafely();
1010 return;
1011 }
1012
1013 srv->roffset += len;
1014 srv->rbuf[srv->roffset] = '\0';
1015 r = srv->request;
1016 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1017
1018 if (r == NULL) {
1019 /* someone spoke without being spoken to */
1020 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1021 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1022 " bytes '" << srv->rbuf << "'");
1023
1024 srv->roffset = 0;
1025 }
1026
1027 if ((t = strchr(srv->rbuf, hlp->eom))) {
1028 /* end of reply found */
1029 int called = 1;
1030 int skip = 1;
1031 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1032
1033 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1034 *t = '\0';
1035 // rewind to the \r octet which is the real terminal now
1036 // and remember that we have to skip forward 2 places now.
1037 skip = 2;
1038 --t;
1039 }
1040
1041 *t = '\0';
1042
1043 if (r && cbdataReferenceValid(r->data)) {
1044 HelperReply res(srv->rbuf, (t - srv->rbuf));
1045 res.whichServer = srv;
1046 r->callback(r->data, res);
1047 } else {
1048 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1049 called = 0;
1050 }
1051 // only skip off the \0's _after_ passing its location in HelperReply above
1052 t += skip;
1053
1054 srv->flags.busy = false;
1055 /**
1056 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1057 * Doing this prohibits concurrency support with multiple replies per read().
1058 * TODO: check that read() setup on these buffers pays attention to roffset!=0
1059 * TODO: check that replies bigger than the buffer are discarded and do not affect future replies
1060 */
1061 srv->roffset = 0;
1062 helperStatefulRequestFree(r);
1063 srv->request = NULL;
1064
1065 -- srv->stats.pending;
1066 ++ srv->stats.replies;
1067
1068 ++ hlp->stats.replies;
1069 srv->answer_time = current_time;
1070 hlp->stats.avg_svc_time =
1071 Math::intAverage(hlp->stats.avg_svc_time,
1072 tvSubMsec(srv->dispatch_time, current_time),
1073 hlp->stats.replies, REDIRECT_AV_FACTOR);
1074
1075 if (called)
1076 helperStatefulServerDone(srv);
1077 else
1078 helperStatefulReleaseServer(srv);
1079 }
1080
1081 if (Comm::IsConnOpen(srv->readPipe)) {
1082 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1083 assert(spaceSize >= 0);
1084
1085 // grow the input buffer if needed and possible
1086 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1087 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1088 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1089 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1090 assert(spaceSize >= 0);
1091 }
1092
1093 // quit reading if there is no space left
1094 if (!spaceSize) {
1095 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1096 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1097 "Squid input buffer: " << hlp->id_name << " #" <<
1098 (srv->index + 1));
1099 srv->closePipesSafely();
1100 return;
1101 }
1102
1103 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1104 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1105 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1106 }
1107 }
1108
1109 static void
1110 Enqueue(helper * hlp, helper_request * r)
1111 {
1112 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1113 dlinkAddTail(r, link, &hlp->queue);
1114 ++ hlp->stats.queue_size;
1115
1116 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1117 if (hlp->childs.needNew() > 0) {
1118 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1119 helperOpenServers(hlp);
1120 return;
1121 }
1122
1123 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1124 return;
1125
1126 if (squid_curtime - hlp->last_queue_warn < 600)
1127 return;
1128
1129 if (shutting_down || reconfiguring)
1130 return;
1131
1132 hlp->last_queue_warn = squid_curtime;
1133
1134 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1135 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1136 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1137
1138 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1139 fatalf("Too many queued %s requests", hlp->id_name);
1140 }
1141
1142 static void
1143 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1144 {
1145 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1146 dlinkAddTail(r, link, &hlp->queue);
1147 ++ hlp->stats.queue_size;
1148
1149 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1150 if (hlp->childs.needNew() > 0) {
1151 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1152 helperStatefulOpenServers(hlp);
1153 return;
1154 }
1155
1156 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1157 return;
1158
1159 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1160 fatalf("Too many queued %s requests", hlp->id_name);
1161
1162 if (squid_curtime - hlp->last_queue_warn < 600)
1163 return;
1164
1165 if (shutting_down || reconfiguring)
1166 return;
1167
1168 hlp->last_queue_warn = squid_curtime;
1169
1170 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1171 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1172 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1173 }
1174
1175 static helper_request *
1176 Dequeue(helper * hlp)
1177 {
1178 dlink_node *link;
1179 helper_request *r = NULL;
1180
1181 if ((link = hlp->queue.head)) {
1182 r = (helper_request *)link->data;
1183 dlinkDelete(link, &hlp->queue);
1184 memFree(link, MEM_DLINK_NODE);
1185 -- hlp->stats.queue_size;
1186 }
1187
1188 return r;
1189 }
1190
1191 static helper_stateful_request *
1192 StatefulDequeue(statefulhelper * hlp)
1193 {
1194 dlink_node *link;
1195 helper_stateful_request *r = NULL;
1196
1197 if ((link = hlp->queue.head)) {
1198 r = (helper_stateful_request *)link->data;
1199 dlinkDelete(link, &hlp->queue);
1200 memFree(link, MEM_DLINK_NODE);
1201 -- hlp->stats.queue_size;
1202 }
1203
1204 return r;
1205 }
1206
1207 static helper_server *
1208 GetFirstAvailable(helper * hlp)
1209 {
1210 dlink_node *n;
1211 helper_server *srv;
1212 helper_server *selected = NULL;
1213 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1214
1215 if (hlp->childs.n_running == 0)
1216 return NULL;
1217
1218 /* Find "least" loaded helper (approx) */
1219 for (n = hlp->servers.head; n != NULL; n = n->next) {
1220 srv = (helper_server *)n->data;
1221
1222 if (selected && selected->stats.pending <= srv->stats.pending)
1223 continue;
1224
1225 if (srv->flags.shutdown)
1226 continue;
1227
1228 if (!srv->stats.pending)
1229 return srv;
1230
1231 if (selected) {
1232 selected = srv;
1233 break;
1234 }
1235
1236 selected = srv;
1237 }
1238
1239 /* Check for overload */
1240 if (!selected) {
1241 debugs(84, 5, "GetFirstAvailable: None available.");
1242 return NULL;
1243 }
1244
1245 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1246 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1247 return NULL;
1248 }
1249
1250 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1251 return selected;
1252 }
1253
1254 static helper_stateful_server *
1255 StatefulGetFirstAvailable(statefulhelper * hlp)
1256 {
1257 dlink_node *n;
1258 helper_stateful_server *srv = NULL;
1259 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1260
1261 if (hlp->childs.n_running == 0)
1262 return NULL;
1263
1264 for (n = hlp->servers.head; n != NULL; n = n->next) {
1265 srv = (helper_stateful_server *)n->data;
1266
1267 if (srv->flags.busy)
1268 continue;
1269
1270 if (srv->flags.reserved)
1271 continue;
1272
1273 if (srv->flags.shutdown)
1274 continue;
1275
1276 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1277 continue;
1278
1279 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1280 return srv;
1281 }
1282
1283 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1284 return NULL;
1285 }
1286
1287 static void
1288 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1289 {
1290 helper_server *srv = (helper_server *)data;
1291
1292 srv->writebuf->clean();
1293 delete srv->writebuf;
1294 srv->writebuf = NULL;
1295 srv->flags.writing = false;
1296
1297 if (flag != COMM_OK) {
1298 /* Helper server has crashed */
1299 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
1300 return;
1301 }
1302
1303 if (!srv->wqueue->isNull()) {
1304 srv->writebuf = srv->wqueue;
1305 srv->wqueue = new MemBuf;
1306 srv->flags.writing = true;
1307 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1308 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1309 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1310 }
1311 }
1312
1313 static void
1314 helperDispatch(helper_server * srv, helper_request * r)
1315 {
1316 helper *hlp = srv->parent;
1317 helper_request **ptr = NULL;
1318 unsigned int slot;
1319
1320 if (!cbdataReferenceValid(r->data)) {
1321 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1322 helperRequestFree(r);
1323 return;
1324 }
1325
1326 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1327 if (!srv->requests[slot]) {
1328 ptr = &srv->requests[slot];
1329 break;
1330 }
1331 }
1332
1333 assert(ptr);
1334 *ptr = r;
1335 r->dispatch_time = current_time;
1336
1337 if (srv->wqueue->isNull())
1338 srv->wqueue->init();
1339
1340 if (hlp->childs.concurrency)
1341 srv->wqueue->Printf("%d %s", slot, r->buf);
1342 else
1343 srv->wqueue->append(r->buf, strlen(r->buf));
1344
1345 if (!srv->flags.writing) {
1346 assert(NULL == srv->writebuf);
1347 srv->writebuf = srv->wqueue;
1348 srv->wqueue = new MemBuf;
1349 srv->flags.writing = true;
1350 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1351 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1352 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1353 }
1354
1355 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
1356
1357 ++ srv->stats.uses;
1358 ++ srv->stats.pending;
1359 ++ hlp->stats.requests;
1360 }
1361
1362 static void
1363 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag,
1364 int xerrno, void *data)
1365 {
1366 /* nothing! */
1367 }
1368
1369 static void
1370 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1371 {
1372 statefulhelper *hlp = srv->parent;
1373
1374 if (!cbdataReferenceValid(r->data)) {
1375 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1376 helperStatefulRequestFree(r);
1377 helperStatefulReleaseServer(srv);
1378 return;
1379 }
1380
1381 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
1382
1383 if (r->placeholder == 1) {
1384 /* a callback is needed before this request can _use_ a helper. */
1385 /* we don't care about releasing this helper. The request NEVER
1386 * gets to the helper. So we throw away the return code */
1387 HelperReply nilReply;
1388 nilReply.whichServer = srv;
1389 r->callback(r->data, nilReply);
1390 /* throw away the placeholder */
1391 helperStatefulRequestFree(r);
1392 /* and push the queue. Note that the callback may have submitted a new
1393 * request to the helper which is why we test for the request */
1394
1395 if (srv->request == NULL)
1396 helperStatefulServerDone(srv);
1397
1398 return;
1399 }
1400
1401 srv->flags.busy = true;
1402 srv->flags.reserved = true;
1403 srv->request = r;
1404 srv->dispatch_time = current_time;
1405 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1406 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1407 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1408 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1409 hlp->id_name << " #" << srv->index + 1 << ", " <<
1410 (int) strlen(r->buf) << " bytes");
1411
1412 ++ srv->stats.uses;
1413 ++ srv->stats.pending;
1414 ++ hlp->stats.requests;
1415 }
1416
1417 static void
1418 helperKickQueue(helper * hlp)
1419 {
1420 helper_request *r;
1421 helper_server *srv;
1422
1423 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1424 helperDispatch(srv, r);
1425 }
1426
1427 static void
1428 helperStatefulKickQueue(statefulhelper * hlp)
1429 {
1430 helper_stateful_request *r;
1431 helper_stateful_server *srv;
1432
1433 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1434 helperStatefulDispatch(srv, r);
1435 }
1436
1437 static void
1438 helperStatefulServerDone(helper_stateful_server * srv)
1439 {
1440 if (!srv->flags.shutdown) {
1441 helperStatefulKickQueue(srv->parent);
1442 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1443 srv->closeWritePipeSafely();
1444 return;
1445 }
1446 }
1447
1448 static void
1449 helperRequestFree(helper_request * r)
1450 {
1451 cbdataReferenceDone(r->data);
1452 xfree(r->buf);
1453 delete r;
1454 }
1455
1456 static void
1457 helperStatefulRequestFree(helper_stateful_request * r)
1458 {
1459 if (r) {
1460 cbdataReferenceDone(r->data);
1461 xfree(r->buf);
1462 delete r;
1463 }
1464 }
1465
1466 // TODO: should helper_ and helper_stateful_ have a common parent?
1467 static bool
1468 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1469 {
1470 if (!hlp) {
1471 if (label)
1472 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1473 return false;
1474 }
1475
1476 if (label)
1477 storeAppendPrintf(sentry, "%s:\n", label);
1478
1479 return true;
1480 }