]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Helper protocol upgrade: add optional kv-pair field to responses
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * DEBUG: section 84 Helper process maintenance
3 * AUTHOR: Harvest Derived?
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 #include "squid.h"
34 #include "base/AsyncCbdataCalls.h"
35 #include "comm.h"
36 #include "comm/Connection.h"
37 #include "comm/Write.h"
38 #include "fd.h"
39 #include "format/Quoting.h"
40 #include "helper.h"
41 #include "Mem.h"
42 #include "MemBuf.h"
43 #include "SquidIpc.h"
44 #include "SquidMath.h"
45 #include "SquidTime.h"
46 #include "Store.h"
47 #include "wordlist.h"
48
49 #define HELPER_MAX_ARGS 64
50
51 /** Initial Squid input buffer size. Helper responses may exceed this, and
52 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
53 */
54 const size_t ReadBufMinSize(4*1024);
55
56 /** Maximum safe size of a helper-to-Squid response message plus one.
57 * Squid will warn and close the stream if a helper sends a too-big response.
58 * ssl_crtd helper is known to produce responses of at least 10KB in size.
59 * Some undocumented helpers are known to produce responses exceeding 8KB.
60 */
61 const size_t ReadBufMaxSize(32*1024);
62
63 static IOCB helperHandleRead;
64 static IOCB helperStatefulHandleRead;
65 static void helperServerFree(helper_server *srv);
66 static void helperStatefulServerFree(helper_stateful_server *srv);
67 static void Enqueue(helper * hlp, helper_request *);
68 static helper_request *Dequeue(helper * hlp);
69 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
70 static helper_server *GetFirstAvailable(helper * hlp);
71 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
72 static void helperDispatch(helper_server * srv, helper_request * r);
73 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74 static void helperKickQueue(helper * hlp);
75 static void helperStatefulKickQueue(statefulhelper * hlp);
76 static void helperStatefulServerDone(helper_stateful_server * srv);
77 static void helperRequestFree(helper_request * r);
78 static void helperStatefulRequestFree(helper_stateful_request * r);
79 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
80 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
81
82 CBDATA_CLASS_INIT(helper);
83 CBDATA_TYPE(helper_server);
84 CBDATA_CLASS_INIT(statefulhelper);
85 CBDATA_TYPE(helper_stateful_server);
86
87 void
88 HelperServerBase::initStats()
89 {
90 stats.uses=0;
91 stats.replies=0;
92 stats.pending=0;
93 stats.releases=0;
94 }
95
96 void
97 HelperServerBase::closePipesSafely()
98 {
99 #if _SQUID_WINDOWS_
100 int no = index + 1;
101
102 shutdown(writePipe->fd, SD_BOTH);
103 #endif
104
105 flags.closing = 1;
106 if (readPipe->fd == writePipe->fd)
107 readPipe->fd = -1;
108 else
109 readPipe->close();
110 writePipe->close();
111
112 #if _SQUID_WINDOWS_
113 if (hIpc) {
114 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
115 getCurrentTime();
116 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
117 " #" << no << " (" << hlp->cmdline->key << "," <<
118 (long int)pid << ") didn't exit in 5 seconds");
119 }
120 CloseHandle(hIpc);
121 }
122 #endif
123 }
124
125 void
126 HelperServerBase::closeWritePipeSafely()
127 {
128 #if _SQUID_WINDOWS_
129 int no = index + 1;
130
131 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
132 #endif
133
134 flags.closing = 1;
135 if (readPipe->fd == writePipe->fd)
136 readPipe->fd = -1;
137 writePipe->close();
138
139 #if _SQUID_WINDOWS_
140 if (hIpc) {
141 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
142 getCurrentTime();
143 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
144 " #" << no << " (" << hlp->cmdline->key << "," <<
145 (long int)pid << ") didn't exit in 5 seconds");
146 }
147 CloseHandle(hIpc);
148 }
149 #endif
150 }
151
152 void
153 helperOpenServers(helper * hlp)
154 {
155 char *s;
156 char *progname;
157 char *shortname;
158 char *procname;
159 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
160 char fd_note_buf[FD_DESC_SZ];
161 helper_server *srv;
162 int nargs = 0;
163 int k;
164 pid_t pid;
165 int rfd;
166 int wfd;
167 void * hIpc;
168 wordlist *w;
169
170 if (hlp->cmdline == NULL)
171 return;
172
173 progname = hlp->cmdline->key;
174
175 if ((s = strrchr(progname, '/')))
176 shortname = xstrdup(s + 1);
177 else
178 shortname = xstrdup(progname);
179
180 /* figure out how many new child are actually needed. */
181 int need_new = hlp->childs.needNew();
182
183 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
184
185 if (need_new < 1) {
186 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
187 }
188
189 procname = (char *)xmalloc(strlen(shortname) + 3);
190
191 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
192
193 args[nargs] = procname;
194 ++nargs;
195
196 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
197 args[nargs] = w->key;
198 ++nargs;
199 }
200
201 args[nargs] = NULL;
202 ++nargs;
203
204 assert(nargs <= HELPER_MAX_ARGS);
205
206 for (k = 0; k < need_new; ++k) {
207 getCurrentTime();
208 rfd = wfd = -1;
209 pid = ipcCreate(hlp->ipc_type,
210 progname,
211 args,
212 shortname,
213 hlp->addr,
214 &rfd,
215 &wfd,
216 &hIpc);
217
218 if (pid < 0) {
219 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
220 continue;
221 }
222
223 ++ hlp->childs.n_running;
224 ++ hlp->childs.n_active;
225 CBDATA_INIT_TYPE(helper_server);
226 srv = cbdataAlloc(helper_server);
227 srv->hIpc = hIpc;
228 srv->pid = pid;
229 srv->initStats();
230 srv->index = k;
231 srv->addr = hlp->addr;
232 srv->readPipe = new Comm::Connection;
233 srv->readPipe->fd = rfd;
234 srv->writePipe = new Comm::Connection;
235 srv->writePipe->fd = wfd;
236 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
237 srv->wqueue = new MemBuf;
238 srv->roffset = 0;
239 srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
240 srv->parent = cbdataReference(hlp);
241 dlinkAddTail(srv, &srv->link, &hlp->servers);
242
243 if (rfd == wfd) {
244 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
245 fd_note(rfd, fd_note_buf);
246 } else {
247 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
248 fd_note(rfd, fd_note_buf);
249 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
250 fd_note(wfd, fd_note_buf);
251 }
252
253 commSetNonBlocking(rfd);
254
255 if (wfd != rfd)
256 commSetNonBlocking(wfd);
257
258 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
259 comm_add_close_handler(rfd, closeCall);
260
261 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
262 CommIoCbPtrFun(helperHandleRead, srv));
263 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
264 }
265
266 hlp->last_restart = squid_curtime;
267 safe_free(shortname);
268 safe_free(procname);
269 helperKickQueue(hlp);
270 }
271
272 /**
273 * DPW 2007-05-08
274 *
275 * helperStatefulOpenServers: create the stateful child helper processes
276 */
277 void
278 helperStatefulOpenServers(statefulhelper * hlp)
279 {
280 char *shortname;
281 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
282 char fd_note_buf[FD_DESC_SZ];
283 int nargs = 0;
284
285 if (hlp->cmdline == NULL)
286 return;
287
288 if (hlp->childs.concurrency)
289 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
290
291 char *progname = hlp->cmdline->key;
292
293 char *s;
294 if ((s = strrchr(progname, '/')))
295 shortname = xstrdup(s + 1);
296 else
297 shortname = xstrdup(progname);
298
299 /* figure out haw mant new helpers are needed. */
300 int need_new = hlp->childs.needNew();
301
302 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
303
304 if (need_new < 1) {
305 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
306 }
307
308 char *procname = (char *)xmalloc(strlen(shortname) + 3);
309
310 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
311
312 args[nargs] = procname;
313 ++nargs;
314
315 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
316 args[nargs] = w->key;
317 ++nargs;
318 }
319
320 args[nargs] = NULL;
321 ++nargs;
322
323 assert(nargs <= HELPER_MAX_ARGS);
324
325 for (int k = 0; k < need_new; ++k) {
326 getCurrentTime();
327 int rfd = -1;
328 int wfd = -1;
329 void * hIpc;
330 pid_t pid = ipcCreate(hlp->ipc_type,
331 progname,
332 args,
333 shortname,
334 hlp->addr,
335 &rfd,
336 &wfd,
337 &hIpc);
338
339 if (pid < 0) {
340 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
341 continue;
342 }
343
344 ++ hlp->childs.n_running;
345 ++ hlp->childs.n_active;
346 CBDATA_INIT_TYPE(helper_stateful_server);
347 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
348 srv->hIpc = hIpc;
349 srv->pid = pid;
350 srv->flags.reserved = 0;
351 srv->initStats();
352 srv->index = k;
353 srv->addr = hlp->addr;
354 srv->readPipe = new Comm::Connection;
355 srv->readPipe->fd = rfd;
356 srv->writePipe = new Comm::Connection;
357 srv->writePipe->fd = wfd;
358 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
359 srv->roffset = 0;
360 srv->parent = cbdataReference(hlp);
361
362 if (hlp->datapool != NULL)
363 srv->data = hlp->datapool->alloc();
364
365 dlinkAddTail(srv, &srv->link, &hlp->servers);
366
367 if (rfd == wfd) {
368 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
369 fd_note(rfd, fd_note_buf);
370 } else {
371 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
372 fd_note(rfd, fd_note_buf);
373 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
374 fd_note(wfd, fd_note_buf);
375 }
376
377 commSetNonBlocking(rfd);
378
379 if (wfd != rfd)
380 commSetNonBlocking(wfd);
381
382 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
383 comm_add_close_handler(rfd, closeCall);
384
385 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
386 CommIoCbPtrFun(helperStatefulHandleRead, srv));
387 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
388 }
389
390 hlp->last_restart = squid_curtime;
391 safe_free(shortname);
392 safe_free(procname);
393 helperStatefulKickQueue(hlp);
394 }
395
396 void
397 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
398 {
399 if (hlp == NULL) {
400 debugs(84, 3, "helperSubmit: hlp == NULL");
401 HelperReply nilReply;
402 callback(data, nilReply);
403 return;
404 }
405
406 helper_request *r = new helper_request;
407 helper_server *srv;
408
409 r->callback = callback;
410 r->data = cbdataReference(data);
411 r->buf = xstrdup(buf);
412
413 if ((srv = GetFirstAvailable(hlp)))
414 helperDispatch(srv, r);
415 else
416 Enqueue(hlp, r);
417
418 debugs(84, 9, "helperSubmit: " << buf);
419 }
420
421 /// lastserver = "server last used as part of a reserved request sequence"
422 void
423 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
424 {
425 if (hlp == NULL) {
426 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
427 HelperReply nilReply;
428 callback(data, nilReply);
429 return;
430 }
431
432 helper_stateful_request *r = new helper_stateful_request;
433
434 r->callback = callback;
435 r->data = cbdataReference(data);
436
437 if (buf != NULL) {
438 r->buf = xstrdup(buf);
439 r->placeholder = 0;
440 } else {
441 r->buf = NULL;
442 r->placeholder = 1;
443 }
444
445 if ((buf != NULL) && lastserver) {
446 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
447 assert(lastserver->flags.reserved);
448 assert(!(lastserver->request));
449
450 debugs(84, 5, "StatefulSubmit dispatching");
451 helperStatefulDispatch(lastserver, r);
452 } else {
453 helper_stateful_server *srv;
454 if ((srv = StatefulGetFirstAvailable(hlp))) {
455 helperStatefulDispatch(srv, r);
456 } else
457 StatefulEnqueue(hlp, r);
458 }
459
460 debugs(84, 9, "helperStatefulSubmit: placeholder: '" << r->placeholder << "', buf '" << buf << "'.");
461 }
462
463 /**
464 * DPW 2007-05-08
465 *
466 * helperStatefulReleaseServer tells the helper that whoever was
467 * using it no longer needs its services.
468 */
469 void
470 helperStatefulReleaseServer(helper_stateful_server * srv)
471 {
472 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
473 if (!srv->flags.reserved)
474 return;
475
476 ++ srv->stats.releases;
477
478 srv->flags.reserved = 0;
479 if (srv->parent->OnEmptyQueue != NULL && srv->data)
480 srv->parent->OnEmptyQueue(srv->data);
481
482 helperStatefulServerDone(srv);
483 }
484
485 /** return a pointer to the stateful routines data area */
486 void *
487 helperStatefulServerGetData(helper_stateful_server * srv)
488 {
489 return srv->data;
490 }
491
492 /**
493 * Dump some stats about the helper states to a StoreEntry
494 */
495 void
496 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
497 {
498 if (!helperStartStats(sentry, hlp, label))
499 return;
500
501 storeAppendPrintf(sentry, "program: %s\n",
502 hlp->cmdline->key);
503 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
504 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
505 storeAppendPrintf(sentry, "requests sent: %d\n",
506 hlp->stats.requests);
507 storeAppendPrintf(sentry, "replies received: %d\n",
508 hlp->stats.replies);
509 storeAppendPrintf(sentry, "queue length: %d\n",
510 hlp->stats.queue_size);
511 storeAppendPrintf(sentry, "avg service time: %d msec\n",
512 hlp->stats.avg_svc_time);
513 storeAppendPrintf(sentry, "\n");
514 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
515 "#",
516 "FD",
517 "PID",
518 "# Requests",
519 "# Replies",
520 "Flags",
521 "Time",
522 "Offset",
523 "Request");
524
525 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
526 helper_server *srv = (helper_server*)link->data;
527 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
528 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "%c%c%c%c\t%7.3f\t%7d\t%s\n",
529 srv->index + 1,
530 srv->readPipe->fd,
531 srv->pid,
532 srv->stats.uses,
533 srv->stats.replies,
534 srv->stats.pending ? 'B' : ' ',
535 srv->flags.writing ? 'W' : ' ',
536 srv->flags.closing ? 'C' : ' ',
537 srv->flags.shutdown ? 'S' : ' ',
538 tt < 0.0 ? 0.0 : tt,
539 (int) srv->roffset,
540 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
541 }
542
543 storeAppendPrintf(sentry, "\nFlags key:\n\n");
544 storeAppendPrintf(sentry, " B = BUSY\n");
545 storeAppendPrintf(sentry, " W = WRITING\n");
546 storeAppendPrintf(sentry, " C = CLOSING\n");
547 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
548 }
549
550 void
551 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
552 {
553 if (!helperStartStats(sentry, hlp, label))
554 return;
555
556 storeAppendPrintf(sentry, "program: %s\n",
557 hlp->cmdline->key);
558 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
559 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
560 storeAppendPrintf(sentry, "requests sent: %d\n",
561 hlp->stats.requests);
562 storeAppendPrintf(sentry, "replies received: %d\n",
563 hlp->stats.replies);
564 storeAppendPrintf(sentry, "queue length: %d\n",
565 hlp->stats.queue_size);
566 storeAppendPrintf(sentry, "avg service time: %d msec\n",
567 hlp->stats.avg_svc_time);
568 storeAppendPrintf(sentry, "\n");
569 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
570 "#",
571 "FD",
572 "PID",
573 "# Requests",
574 "# Replies",
575 "Flags",
576 "Time",
577 "Offset",
578 "Request");
579
580 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
581 helper_stateful_server *srv = (helper_stateful_server *)link->data;
582 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
583 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
584 srv->index + 1,
585 srv->readPipe->fd,
586 srv->pid,
587 srv->stats.uses,
588 srv->stats.replies,
589 srv->flags.busy ? 'B' : ' ',
590 srv->flags.closing ? 'C' : ' ',
591 srv->flags.reserved ? 'R' : ' ',
592 srv->flags.shutdown ? 'S' : ' ',
593 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
594 tt < 0.0 ? 0.0 : tt,
595 (int) srv->roffset,
596 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
597 }
598
599 storeAppendPrintf(sentry, "\nFlags key:\n\n");
600 storeAppendPrintf(sentry, " B = BUSY\n");
601 storeAppendPrintf(sentry, " C = CLOSING\n");
602 storeAppendPrintf(sentry, " R = RESERVED\n");
603 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
604 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
605 }
606
607 void
608 helperShutdown(helper * hlp)
609 {
610 dlink_node *link = hlp->servers.head;
611
612 while (link) {
613 helper_server *srv;
614 srv = (helper_server *)link->data;
615 link = link->next;
616
617 if (srv->flags.shutdown) {
618 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
619 continue;
620 }
621
622 assert(hlp->childs.n_active > 0);
623 -- hlp->childs.n_active;
624 srv->flags.shutdown = 1; /* request it to shut itself down */
625
626 if (srv->flags.closing) {
627 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
628 continue;
629 }
630
631 if (srv->stats.pending) {
632 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
633 continue;
634 }
635
636 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
637 /* the rest of the details is dealt with in the helperServerFree
638 * close handler
639 */
640 srv->closePipesSafely();
641 }
642 }
643
644 void
645 helperStatefulShutdown(statefulhelper * hlp)
646 {
647 dlink_node *link = hlp->servers.head;
648 helper_stateful_server *srv;
649
650 while (link) {
651 srv = (helper_stateful_server *)link->data;
652 link = link->next;
653
654 if (srv->flags.shutdown) {
655 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
656 continue;
657 }
658
659 assert(hlp->childs.n_active > 0);
660 -- hlp->childs.n_active;
661 srv->flags.shutdown = 1; /* request it to shut itself down */
662
663 if (srv->flags.busy) {
664 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
665 continue;
666 }
667
668 if (srv->flags.closing) {
669 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
670 continue;
671 }
672
673 if (srv->flags.reserved) {
674 if (shutting_down) {
675 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
676 } else {
677 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
678 continue;
679 }
680 }
681
682 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
683
684 /* the rest of the details is dealt with in the helperStatefulServerFree
685 * close handler
686 */
687 srv->closePipesSafely();
688 }
689 }
690
691 helper::~helper()
692 {
693 /* note, don't free id_name, it probably points to static memory */
694
695 if (queue.head)
696 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
697 }
698
699 /* ====================================================================== */
700 /* LOCAL FUNCTIONS */
701 /* ====================================================================== */
702
703 static void
704 helperServerFree(helper_server *srv)
705 {
706 helper *hlp = srv->parent;
707 helper_request *r;
708 int i, concurrency = hlp->childs.concurrency;
709
710 if (!concurrency)
711 concurrency = 1;
712
713 if (srv->rbuf) {
714 memFreeBuf(srv->rbuf_sz, srv->rbuf);
715 srv->rbuf = NULL;
716 }
717
718 srv->wqueue->clean();
719 delete srv->wqueue;
720
721 if (srv->writebuf) {
722 srv->writebuf->clean();
723 delete srv->writebuf;
724 srv->writebuf = NULL;
725 }
726
727 if (Comm::IsConnOpen(srv->writePipe))
728 srv->closeWritePipeSafely();
729
730 dlinkDelete(&srv->link, &hlp->servers);
731
732 assert(hlp->childs.n_running > 0);
733 -- hlp->childs.n_running;
734
735 if (!srv->flags.shutdown) {
736 assert(hlp->childs.n_active > 0);
737 -- hlp->childs.n_active;
738 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
739
740 if (hlp->childs.needNew() > 0) {
741 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
742
743 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
744 if (srv->stats.replies < 1)
745 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
746 else
747 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
748 }
749
750 debugs(80, DBG_IMPORTANT, "Starting new helpers");
751 helperOpenServers(hlp);
752 }
753 }
754
755 for (i = 0; i < concurrency; ++i) {
756 // XXX: re-schedule these on another helper?
757 if ((r = srv->requests[i])) {
758 void *cbdata;
759
760 if (cbdataReferenceValidDone(r->data, &cbdata)) {
761 HelperReply nilReply;
762 r->callback(cbdata, nilReply);
763 }
764
765 helperRequestFree(r);
766
767 srv->requests[i] = NULL;
768 }
769 }
770 safe_free(srv->requests);
771
772 cbdataReferenceDone(srv->parent);
773 cbdataFree(srv);
774 }
775
776 static void
777 helperStatefulServerFree(helper_stateful_server *srv)
778 {
779 statefulhelper *hlp = srv->parent;
780 helper_stateful_request *r;
781
782 if (srv->rbuf) {
783 memFreeBuf(srv->rbuf_sz, srv->rbuf);
784 srv->rbuf = NULL;
785 }
786
787 #if 0
788 srv->wqueue->clean();
789
790 delete srv->wqueue;
791
792 #endif
793
794 /* TODO: walk the local queue of requests and carry them all out */
795 if (Comm::IsConnOpen(srv->writePipe))
796 srv->closeWritePipeSafely();
797
798 dlinkDelete(&srv->link, &hlp->servers);
799
800 assert(hlp->childs.n_running > 0);
801 -- hlp->childs.n_running;
802
803 if (!srv->flags.shutdown) {
804 assert( hlp->childs.n_active > 0);
805 -- hlp->childs.n_active;
806 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
807
808 if (hlp->childs.needNew() > 0) {
809 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
810
811 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
812 if (srv->stats.replies < 1)
813 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
814 else
815 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
816 }
817
818 debugs(80, DBG_IMPORTANT, "Starting new helpers");
819 helperStatefulOpenServers(hlp);
820 }
821 }
822
823 if ((r = srv->request)) {
824 void *cbdata;
825
826 if (cbdataReferenceValidDone(r->data, &cbdata)) {
827 HelperReply nilReply;
828 nilReply.whichServer = srv;
829 r->callback(cbdata, nilReply);
830 }
831
832 helperStatefulRequestFree(r);
833
834 srv->request = NULL;
835 }
836
837 if (srv->data != NULL)
838 hlp->datapool->freeOne(srv->data);
839
840 cbdataReferenceDone(srv->parent);
841
842 cbdataFree(srv);
843 }
844
845 /// Calls back with a pointer to the buffer with the helper output
846 static void
847 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
848 {
849 helper_request *r = srv->requests[request_number];
850 if (r) {
851 // TODO: parse the reply into new helper reply object
852 // pass that to the callback instead of msg
853
854 HLPCB *callback = r->callback;
855
856 srv->requests[request_number] = NULL;
857
858 r->callback = NULL;
859
860 void *cbdata = NULL;
861 if (cbdataReferenceValidDone(r->data, &cbdata)) {
862 HelperReply response(msg, (msg_end-msg));
863 callback(cbdata, response);
864 }
865
866 -- srv->stats.pending;
867 ++ srv->stats.replies;
868
869 ++ hlp->stats.replies;
870
871 srv->answer_time = current_time;
872
873 srv->dispatch_time = r->dispatch_time;
874
875 hlp->stats.avg_svc_time =
876 Math::intAverage(hlp->stats.avg_svc_time,
877 tvSubMsec(r->dispatch_time, current_time),
878 hlp->stats.replies, REDIRECT_AV_FACTOR);
879
880 helperRequestFree(r);
881 } else {
882 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
883 request_number << " from " << hlp->id_name << " #" << srv->index + 1 <<
884 " '" << srv->rbuf << "'");
885 }
886 srv->roffset -= (msg_end - srv->rbuf);
887 memmove(srv->rbuf, msg_end, srv->roffset + 1);
888
889 if (!srv->flags.shutdown) {
890 helperKickQueue(hlp);
891 } else if (!srv->flags.closing && !srv->stats.pending) {
892 srv->flags.closing=1;
893 srv->writePipe->close();
894 return;
895 }
896 }
897
898 static void
899 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
900 {
901 char *t = NULL;
902 helper_server *srv = (helper_server *)data;
903 helper *hlp = srv->parent;
904 assert(cbdataReferenceValid(data));
905
906 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
907
908 if (flag == COMM_ERR_CLOSING) {
909 return;
910 }
911
912 assert(conn->fd == srv->readPipe->fd);
913
914 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
915
916 if (flag != COMM_OK || len == 0) {
917 srv->closePipesSafely();
918 return;
919 }
920
921 srv->roffset += len;
922 srv->rbuf[srv->roffset] = '\0';
923 debugs(84, 9, "helperHandleRead: '" << srv->rbuf << "'");
924
925 if (!srv->stats.pending) {
926 /* someone spoke without being spoken to */
927 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
928 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
929 " bytes '" << srv->rbuf << "'");
930
931 srv->roffset = 0;
932 srv->rbuf[0] = '\0';
933 }
934
935 while ((t = strchr(srv->rbuf, hlp->eom))) {
936 /* end of reply found */
937 char *msg = srv->rbuf;
938 int i = 0;
939 debugs(84, 3, "helperHandleRead: end of reply found");
940
941 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n')
942 t[-1] = '\0';
943
944 *t = '\0';
945
946 if (hlp->childs.concurrency) {
947 i = strtol(msg, &msg, 10);
948
949 while (*msg && xisspace(*msg))
950 ++msg;
951 }
952
953 helperReturnBuffer(i, srv, hlp, msg, t);
954 // only skip off the \0 _after_ passing its location to helperReturnBuffer
955 ++t;
956 }
957
958 if (Comm::IsConnOpen(srv->readPipe)) {
959 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
960 assert(spaceSize >= 0);
961
962 // grow the input buffer if needed and possible
963 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
964 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
965 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
966 spaceSize = srv->rbuf_sz - srv->roffset - 1;
967 assert(spaceSize >= 0);
968 }
969
970 // quit reading if there is no space left
971 if (!spaceSize) {
972 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
973 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
974 "Squid input buffer: " << hlp->id_name << " #" <<
975 (srv->index + 1));
976 srv->closePipesSafely();
977 return;
978 }
979
980 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
981 CommIoCbPtrFun(helperHandleRead, srv));
982 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
983 }
984 }
985
986 static void
987 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
988 {
989 char *t = NULL;
990 helper_stateful_server *srv = (helper_stateful_server *)data;
991 helper_stateful_request *r;
992 statefulhelper *hlp = srv->parent;
993 assert(cbdataReferenceValid(data));
994
995 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
996
997 if (flag == COMM_ERR_CLOSING) {
998 return;
999 }
1000
1001 assert(conn->fd == srv->readPipe->fd);
1002
1003 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1004 hlp->id_name << " #" << srv->index + 1);
1005
1006 if (flag != COMM_OK || len == 0) {
1007 srv->closePipesSafely();
1008 return;
1009 }
1010
1011 srv->roffset += len;
1012 srv->rbuf[srv->roffset] = '\0';
1013 r = srv->request;
1014
1015 if (r == NULL) {
1016 /* someone spoke without being spoken to */
1017 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1018 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1019 " bytes '" << srv->rbuf << "'");
1020
1021 srv->roffset = 0;
1022 }
1023
1024 if ((t = strchr(srv->rbuf, hlp->eom))) {
1025 /* end of reply found */
1026 int called = 1;
1027 int skip = 1;
1028 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1029
1030 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1031 *t = '\0';
1032 // rewind to the \r octet which is the real terminal now
1033 // and remember that we have to skip forward 2 places now.
1034 skip = 2;
1035 --t;
1036 }
1037
1038 *t = '\0';
1039
1040 if (r && cbdataReferenceValid(r->data)) {
1041 HelperReply res(srv->rbuf, (t - srv->rbuf));
1042 res.whichServer = srv;
1043 r->callback(r->data, res);
1044 } else {
1045 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1046 called = 0;
1047 }
1048 // only skip off the \0's _after_ passing its location in HelperReply above
1049 t += skip;
1050
1051 srv->flags.busy = 0;
1052 srv->roffset = 0;
1053 helperStatefulRequestFree(r);
1054 srv->request = NULL;
1055
1056 -- srv->stats.pending;
1057 ++ srv->stats.replies;
1058
1059 ++ hlp->stats.replies;
1060 srv->answer_time = current_time;
1061 hlp->stats.avg_svc_time =
1062 Math::intAverage(hlp->stats.avg_svc_time,
1063 tvSubMsec(srv->dispatch_time, current_time),
1064 hlp->stats.replies, REDIRECT_AV_FACTOR);
1065
1066 if (called)
1067 helperStatefulServerDone(srv);
1068 else
1069 helperStatefulReleaseServer(srv);
1070 }
1071
1072 if (Comm::IsConnOpen(srv->readPipe)) {
1073 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1074 assert(spaceSize >= 0);
1075
1076 // grow the input buffer if needed and possible
1077 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1078 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1079 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1080 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1081 assert(spaceSize >= 0);
1082 }
1083
1084 // quit reading if there is no space left
1085 if (!spaceSize) {
1086 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1087 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1088 "Squid input buffer: " << hlp->id_name << " #" <<
1089 (srv->index + 1));
1090 srv->closePipesSafely();
1091 return;
1092 }
1093
1094 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1095 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1096 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1097 }
1098 }
1099
1100 static void
1101 Enqueue(helper * hlp, helper_request * r)
1102 {
1103 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1104 dlinkAddTail(r, link, &hlp->queue);
1105 ++ hlp->stats.queue_size;
1106
1107 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1108 if (hlp->childs.needNew() > 0) {
1109 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1110 helperOpenServers(hlp);
1111 return;
1112 }
1113
1114 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1115 return;
1116
1117 if (squid_curtime - hlp->last_queue_warn < 600)
1118 return;
1119
1120 if (shutting_down || reconfiguring)
1121 return;
1122
1123 hlp->last_queue_warn = squid_curtime;
1124
1125 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1126 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1127 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1128
1129 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1130 fatalf("Too many queued %s requests", hlp->id_name);
1131 }
1132
1133 static void
1134 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1135 {
1136 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1137 dlinkAddTail(r, link, &hlp->queue);
1138 ++ hlp->stats.queue_size;
1139
1140 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1141 if (hlp->childs.needNew() > 0) {
1142 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1143 helperStatefulOpenServers(hlp);
1144 return;
1145 }
1146
1147 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1148 return;
1149
1150 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1151 fatalf("Too many queued %s requests", hlp->id_name);
1152
1153 if (squid_curtime - hlp->last_queue_warn < 600)
1154 return;
1155
1156 if (shutting_down || reconfiguring)
1157 return;
1158
1159 hlp->last_queue_warn = squid_curtime;
1160
1161 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1162 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1163 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1164 }
1165
1166 static helper_request *
1167 Dequeue(helper * hlp)
1168 {
1169 dlink_node *link;
1170 helper_request *r = NULL;
1171
1172 if ((link = hlp->queue.head)) {
1173 r = (helper_request *)link->data;
1174 dlinkDelete(link, &hlp->queue);
1175 memFree(link, MEM_DLINK_NODE);
1176 -- hlp->stats.queue_size;
1177 }
1178
1179 return r;
1180 }
1181
1182 static helper_stateful_request *
1183 StatefulDequeue(statefulhelper * hlp)
1184 {
1185 dlink_node *link;
1186 helper_stateful_request *r = NULL;
1187
1188 if ((link = hlp->queue.head)) {
1189 r = (helper_stateful_request *)link->data;
1190 dlinkDelete(link, &hlp->queue);
1191 memFree(link, MEM_DLINK_NODE);
1192 -- hlp->stats.queue_size;
1193 }
1194
1195 return r;
1196 }
1197
1198 static helper_server *
1199 GetFirstAvailable(helper * hlp)
1200 {
1201 dlink_node *n;
1202 helper_server *srv;
1203 helper_server *selected = NULL;
1204 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1205
1206 if (hlp->childs.n_running == 0)
1207 return NULL;
1208
1209 /* Find "least" loaded helper (approx) */
1210 for (n = hlp->servers.head; n != NULL; n = n->next) {
1211 srv = (helper_server *)n->data;
1212
1213 if (selected && selected->stats.pending <= srv->stats.pending)
1214 continue;
1215
1216 if (srv->flags.shutdown)
1217 continue;
1218
1219 if (!srv->stats.pending)
1220 return srv;
1221
1222 if (selected) {
1223 selected = srv;
1224 break;
1225 }
1226
1227 selected = srv;
1228 }
1229
1230 /* Check for overload */
1231 if (!selected) {
1232 debugs(84, 5, "GetFirstAvailable: None available.");
1233 return NULL;
1234 }
1235
1236 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1237 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1238 return NULL;
1239 }
1240
1241 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1242 return selected;
1243 }
1244
1245 static helper_stateful_server *
1246 StatefulGetFirstAvailable(statefulhelper * hlp)
1247 {
1248 dlink_node *n;
1249 helper_stateful_server *srv = NULL;
1250 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1251
1252 if (hlp->childs.n_running == 0)
1253 return NULL;
1254
1255 for (n = hlp->servers.head; n != NULL; n = n->next) {
1256 srv = (helper_stateful_server *)n->data;
1257
1258 if (srv->flags.busy)
1259 continue;
1260
1261 if (srv->flags.reserved)
1262 continue;
1263
1264 if (srv->flags.shutdown)
1265 continue;
1266
1267 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1268 continue;
1269
1270 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1271 return srv;
1272 }
1273
1274 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1275 return NULL;
1276 }
1277
1278 static void
1279 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1280 {
1281 helper_server *srv = (helper_server *)data;
1282
1283 srv->writebuf->clean();
1284 delete srv->writebuf;
1285 srv->writebuf = NULL;
1286 srv->flags.writing = 0;
1287
1288 if (flag != COMM_OK) {
1289 /* Helper server has crashed */
1290 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
1291 return;
1292 }
1293
1294 if (!srv->wqueue->isNull()) {
1295 srv->writebuf = srv->wqueue;
1296 srv->wqueue = new MemBuf;
1297 srv->flags.writing = 1;
1298 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1299 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1300 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1301 }
1302 }
1303
1304 static void
1305 helperDispatch(helper_server * srv, helper_request * r)
1306 {
1307 helper *hlp = srv->parent;
1308 helper_request **ptr = NULL;
1309 unsigned int slot;
1310
1311 if (!cbdataReferenceValid(r->data)) {
1312 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1313 helperRequestFree(r);
1314 return;
1315 }
1316
1317 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1318 if (!srv->requests[slot]) {
1319 ptr = &srv->requests[slot];
1320 break;
1321 }
1322 }
1323
1324 assert(ptr);
1325 *ptr = r;
1326 r->dispatch_time = current_time;
1327
1328 if (srv->wqueue->isNull())
1329 srv->wqueue->init();
1330
1331 if (hlp->childs.concurrency)
1332 srv->wqueue->Printf("%d %s", slot, r->buf);
1333 else
1334 srv->wqueue->append(r->buf, strlen(r->buf));
1335
1336 if (!srv->flags.writing) {
1337 assert(NULL == srv->writebuf);
1338 srv->writebuf = srv->wqueue;
1339 srv->wqueue = new MemBuf;
1340 srv->flags.writing = 1;
1341 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1342 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1343 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1344 }
1345
1346 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
1347
1348 ++ srv->stats.uses;
1349 ++ srv->stats.pending;
1350 ++ hlp->stats.requests;
1351 }
1352
1353 static void
1354 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag,
1355 int xerrno, void *data)
1356 {
1357 /* nothing! */
1358 }
1359
1360 static void
1361 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1362 {
1363 statefulhelper *hlp = srv->parent;
1364
1365 if (!cbdataReferenceValid(r->data)) {
1366 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1367 helperStatefulRequestFree(r);
1368 helperStatefulReleaseServer(srv);
1369 return;
1370 }
1371
1372 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
1373
1374 if (r->placeholder == 1) {
1375 /* a callback is needed before this request can _use_ a helper. */
1376 /* we don't care about releasing this helper. The request NEVER
1377 * gets to the helper. So we throw away the return code */
1378 HelperReply nilReply;
1379 nilReply.whichServer = srv;
1380 r->callback(r->data, nilReply);
1381 /* throw away the placeholder */
1382 helperStatefulRequestFree(r);
1383 /* and push the queue. Note that the callback may have submitted a new
1384 * request to the helper which is why we test for the request */
1385
1386 if (srv->request == NULL)
1387 helperStatefulServerDone(srv);
1388
1389 return;
1390 }
1391
1392 srv->flags.busy = 1;
1393 srv->flags.reserved = 1;
1394 srv->request = r;
1395 srv->dispatch_time = current_time;
1396 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1397 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1398 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1399 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1400 hlp->id_name << " #" << srv->index + 1 << ", " <<
1401 (int) strlen(r->buf) << " bytes");
1402
1403 ++ srv->stats.uses;
1404 ++ srv->stats.pending;
1405 ++ hlp->stats.requests;
1406 }
1407
1408 static void
1409 helperKickQueue(helper * hlp)
1410 {
1411 helper_request *r;
1412 helper_server *srv;
1413
1414 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1415 helperDispatch(srv, r);
1416 }
1417
1418 static void
1419 helperStatefulKickQueue(statefulhelper * hlp)
1420 {
1421 helper_stateful_request *r;
1422 helper_stateful_server *srv;
1423
1424 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1425 helperStatefulDispatch(srv, r);
1426 }
1427
1428 static void
1429 helperStatefulServerDone(helper_stateful_server * srv)
1430 {
1431 if (!srv->flags.shutdown) {
1432 helperStatefulKickQueue(srv->parent);
1433 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1434 srv->closeWritePipeSafely();
1435 return;
1436 }
1437 }
1438
1439 static void
1440 helperRequestFree(helper_request * r)
1441 {
1442 cbdataReferenceDone(r->data);
1443 xfree(r->buf);
1444 delete r;
1445 }
1446
1447 static void
1448 helperStatefulRequestFree(helper_stateful_request * r)
1449 {
1450 if (r) {
1451 cbdataReferenceDone(r->data);
1452 xfree(r->buf);
1453 delete r;
1454 }
1455 }
1456
1457 // TODO: should helper_ and helper_stateful_ have a common parent?
1458 static bool
1459 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1460 {
1461 if (!hlp) {
1462 if (label)
1463 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1464 return false;
1465 }
1466
1467 if (label)
1468 storeAppendPrintf(sentry, "%s:\n", label);
1469
1470 return true;
1471 }