]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Turned flags to bool types for PeerDigest, RefreshPattern,
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * DEBUG: section 84 Helper process maintenance
3 * AUTHOR: Harvest Derived?
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 #include "squid.h"
34 #include "base/AsyncCbdataCalls.h"
35 #include "comm.h"
36 #include "comm/Connection.h"
37 #include "comm/Write.h"
38 #include "fd.h"
39 #include "format/Quoting.h"
40 #include "helper.h"
41 #include "Mem.h"
42 #include "MemBuf.h"
43 #include "SquidIpc.h"
44 #include "SquidMath.h"
45 #include "SquidTime.h"
46 #include "Store.h"
47 #include "wordlist.h"
48
49 #define HELPER_MAX_ARGS 64
50
51 /** Initial Squid input buffer size. Helper responses may exceed this, and
52 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
53 */
54 const size_t ReadBufMinSize(4*1024);
55
56 /** Maximum safe size of a helper-to-Squid response message plus one.
57 * Squid will warn and close the stream if a helper sends a too-big response.
58 * ssl_crtd helper is known to produce responses of at least 10KB in size.
59 * Some undocumented helpers are known to produce responses exceeding 8KB.
60 */
61 const size_t ReadBufMaxSize(32*1024);
62
63 static IOCB helperHandleRead;
64 static IOCB helperStatefulHandleRead;
65 static void helperServerFree(helper_server *srv);
66 static void helperStatefulServerFree(helper_stateful_server *srv);
67 static void Enqueue(helper * hlp, helper_request *);
68 static helper_request *Dequeue(helper * hlp);
69 static helper_stateful_request *StatefulDequeue(statefulhelper * hlp);
70 static helper_server *GetFirstAvailable(helper * hlp);
71 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
72 static void helperDispatch(helper_server * srv, helper_request * r);
73 static void helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r);
74 static void helperKickQueue(helper * hlp);
75 static void helperStatefulKickQueue(statefulhelper * hlp);
76 static void helperStatefulServerDone(helper_stateful_server * srv);
77 static void helperRequestFree(helper_request * r);
78 static void helperStatefulRequestFree(helper_stateful_request * r);
79 static void StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r);
80 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
81
82 CBDATA_CLASS_INIT(helper);
83 CBDATA_TYPE(helper_server);
84 CBDATA_CLASS_INIT(statefulhelper);
85 CBDATA_TYPE(helper_stateful_server);
86
87 void
88 HelperServerBase::initStats()
89 {
90 stats.uses=0;
91 stats.replies=0;
92 stats.pending=0;
93 stats.releases=0;
94 }
95
96 void
97 HelperServerBase::closePipesSafely()
98 {
99 #if _SQUID_WINDOWS_
100 int no = index + 1;
101
102 shutdown(writePipe->fd, SD_BOTH);
103 #endif
104
105 flags.closing = true;
106 if (readPipe->fd == writePipe->fd)
107 readPipe->fd = -1;
108 else
109 readPipe->close();
110 writePipe->close();
111
112 #if _SQUID_WINDOWS_
113 if (hIpc) {
114 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
115 getCurrentTime();
116 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
117 " #" << no << " (" << hlp->cmdline->key << "," <<
118 (long int)pid << ") didn't exit in 5 seconds");
119 }
120 CloseHandle(hIpc);
121 }
122 #endif
123 }
124
125 void
126 HelperServerBase::closeWritePipeSafely()
127 {
128 #if _SQUID_WINDOWS_
129 int no = index + 1;
130
131 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
132 #endif
133
134 flags.closing = true;
135 if (readPipe->fd == writePipe->fd)
136 readPipe->fd = -1;
137 writePipe->close();
138
139 #if _SQUID_WINDOWS_
140 if (hIpc) {
141 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
142 getCurrentTime();
143 debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
144 " #" << no << " (" << hlp->cmdline->key << "," <<
145 (long int)pid << ") didn't exit in 5 seconds");
146 }
147 CloseHandle(hIpc);
148 }
149 #endif
150 }
151
152 void
153 helperOpenServers(helper * hlp)
154 {
155 char *s;
156 char *progname;
157 char *shortname;
158 char *procname;
159 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
160 char fd_note_buf[FD_DESC_SZ];
161 helper_server *srv;
162 int nargs = 0;
163 int k;
164 pid_t pid;
165 int rfd;
166 int wfd;
167 void * hIpc;
168 wordlist *w;
169
170 if (hlp->cmdline == NULL)
171 return;
172
173 progname = hlp->cmdline->key;
174
175 if ((s = strrchr(progname, '/')))
176 shortname = xstrdup(s + 1);
177 else
178 shortname = xstrdup(progname);
179
180 /* figure out how many new child are actually needed. */
181 int need_new = hlp->childs.needNew();
182
183 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
184
185 if (need_new < 1) {
186 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
187 }
188
189 procname = (char *)xmalloc(strlen(shortname) + 3);
190
191 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
192
193 args[nargs] = procname;
194 ++nargs;
195
196 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
197 args[nargs] = w->key;
198 ++nargs;
199 }
200
201 args[nargs] = NULL;
202 ++nargs;
203
204 assert(nargs <= HELPER_MAX_ARGS);
205
206 for (k = 0; k < need_new; ++k) {
207 getCurrentTime();
208 rfd = wfd = -1;
209 pid = ipcCreate(hlp->ipc_type,
210 progname,
211 args,
212 shortname,
213 hlp->addr,
214 &rfd,
215 &wfd,
216 &hIpc);
217
218 if (pid < 0) {
219 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
220 continue;
221 }
222
223 ++ hlp->childs.n_running;
224 ++ hlp->childs.n_active;
225 CBDATA_INIT_TYPE(helper_server);
226 srv = cbdataAlloc(helper_server);
227 srv->hIpc = hIpc;
228 srv->pid = pid;
229 srv->initStats();
230 srv->index = k;
231 srv->addr = hlp->addr;
232 srv->readPipe = new Comm::Connection;
233 srv->readPipe->fd = rfd;
234 srv->writePipe = new Comm::Connection;
235 srv->writePipe->fd = wfd;
236 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
237 srv->wqueue = new MemBuf;
238 srv->roffset = 0;
239 srv->requests = (helper_request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
240 srv->parent = cbdataReference(hlp);
241 dlinkAddTail(srv, &srv->link, &hlp->servers);
242
243 if (rfd == wfd) {
244 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
245 fd_note(rfd, fd_note_buf);
246 } else {
247 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
248 fd_note(rfd, fd_note_buf);
249 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
250 fd_note(wfd, fd_note_buf);
251 }
252
253 commSetNonBlocking(rfd);
254
255 if (wfd != rfd)
256 commSetNonBlocking(wfd);
257
258 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
259 comm_add_close_handler(rfd, closeCall);
260
261 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
262 CommIoCbPtrFun(helperHandleRead, srv));
263 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
264 }
265
266 hlp->last_restart = squid_curtime;
267 safe_free(shortname);
268 safe_free(procname);
269 helperKickQueue(hlp);
270 }
271
272 /**
273 * DPW 2007-05-08
274 *
275 * helperStatefulOpenServers: create the stateful child helper processes
276 */
277 void
278 helperStatefulOpenServers(statefulhelper * hlp)
279 {
280 char *shortname;
281 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
282 char fd_note_buf[FD_DESC_SZ];
283 int nargs = 0;
284
285 if (hlp->cmdline == NULL)
286 return;
287
288 if (hlp->childs.concurrency)
289 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
290
291 char *progname = hlp->cmdline->key;
292
293 char *s;
294 if ((s = strrchr(progname, '/')))
295 shortname = xstrdup(s + 1);
296 else
297 shortname = xstrdup(progname);
298
299 /* figure out haw mant new helpers are needed. */
300 int need_new = hlp->childs.needNew();
301
302 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
303
304 if (need_new < 1) {
305 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
306 }
307
308 char *procname = (char *)xmalloc(strlen(shortname) + 3);
309
310 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
311
312 args[nargs] = procname;
313 ++nargs;
314
315 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
316 args[nargs] = w->key;
317 ++nargs;
318 }
319
320 args[nargs] = NULL;
321 ++nargs;
322
323 assert(nargs <= HELPER_MAX_ARGS);
324
325 for (int k = 0; k < need_new; ++k) {
326 getCurrentTime();
327 int rfd = -1;
328 int wfd = -1;
329 void * hIpc;
330 pid_t pid = ipcCreate(hlp->ipc_type,
331 progname,
332 args,
333 shortname,
334 hlp->addr,
335 &rfd,
336 &wfd,
337 &hIpc);
338
339 if (pid < 0) {
340 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
341 continue;
342 }
343
344 ++ hlp->childs.n_running;
345 ++ hlp->childs.n_active;
346 CBDATA_INIT_TYPE(helper_stateful_server);
347 helper_stateful_server *srv = cbdataAlloc(helper_stateful_server);
348 srv->hIpc = hIpc;
349 srv->pid = pid;
350 srv->flags.reserved = false;
351 srv->initStats();
352 srv->index = k;
353 srv->addr = hlp->addr;
354 srv->readPipe = new Comm::Connection;
355 srv->readPipe->fd = rfd;
356 srv->writePipe = new Comm::Connection;
357 srv->writePipe->fd = wfd;
358 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
359 srv->roffset = 0;
360 srv->parent = cbdataReference(hlp);
361
362 if (hlp->datapool != NULL)
363 srv->data = hlp->datapool->alloc();
364
365 dlinkAddTail(srv, &srv->link, &hlp->servers);
366
367 if (rfd == wfd) {
368 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
369 fd_note(rfd, fd_note_buf);
370 } else {
371 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
372 fd_note(rfd, fd_note_buf);
373 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
374 fd_note(wfd, fd_note_buf);
375 }
376
377 commSetNonBlocking(rfd);
378
379 if (wfd != rfd)
380 commSetNonBlocking(wfd);
381
382 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
383 comm_add_close_handler(rfd, closeCall);
384
385 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
386 CommIoCbPtrFun(helperStatefulHandleRead, srv));
387 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
388 }
389
390 hlp->last_restart = squid_curtime;
391 safe_free(shortname);
392 safe_free(procname);
393 helperStatefulKickQueue(hlp);
394 }
395
396 void
397 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
398 {
399 if (hlp == NULL) {
400 debugs(84, 3, "helperSubmit: hlp == NULL");
401 HelperReply nilReply;
402 callback(data, nilReply);
403 return;
404 }
405
406 helper_request *r = new helper_request;
407 helper_server *srv;
408
409 r->callback = callback;
410 r->data = cbdataReference(data);
411 r->buf = xstrdup(buf);
412
413 if ((srv = GetFirstAvailable(hlp)))
414 helperDispatch(srv, r);
415 else
416 Enqueue(hlp, r);
417
418 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
419 }
420
421 /// lastserver = "server last used as part of a reserved request sequence"
422 void
423 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
424 {
425 if (hlp == NULL) {
426 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
427 HelperReply nilReply;
428 callback(data, nilReply);
429 return;
430 }
431
432 helper_stateful_request *r = new helper_stateful_request;
433
434 r->callback = callback;
435 r->data = cbdataReference(data);
436
437 if (buf != NULL) {
438 r->buf = xstrdup(buf);
439 r->placeholder = 0;
440 } else {
441 r->buf = NULL;
442 r->placeholder = 1;
443 }
444
445 if ((buf != NULL) && lastserver) {
446 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
447 assert(lastserver->flags.reserved);
448 assert(!(lastserver->request));
449
450 debugs(84, 5, "StatefulSubmit dispatching");
451 helperStatefulDispatch(lastserver, r);
452 } else {
453 helper_stateful_server *srv;
454 if ((srv = StatefulGetFirstAvailable(hlp))) {
455 helperStatefulDispatch(srv, r);
456 } else
457 StatefulEnqueue(hlp, r);
458 }
459
460 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
461 "', " << Raw("buf", buf, strlen(buf)));
462 }
463
464 /**
465 * DPW 2007-05-08
466 *
467 * helperStatefulReleaseServer tells the helper that whoever was
468 * using it no longer needs its services.
469 */
470 void
471 helperStatefulReleaseServer(helper_stateful_server * srv)
472 {
473 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
474 if (!srv->flags.reserved)
475 return;
476
477 ++ srv->stats.releases;
478
479 srv->flags.reserved = false;
480 if (srv->parent->OnEmptyQueue != NULL && srv->data)
481 srv->parent->OnEmptyQueue(srv->data);
482
483 helperStatefulServerDone(srv);
484 }
485
486 /** return a pointer to the stateful routines data area */
487 void *
488 helperStatefulServerGetData(helper_stateful_server * srv)
489 {
490 return srv->data;
491 }
492
493 /**
494 * Dump some stats about the helper states to a StoreEntry
495 */
496 void
497 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
498 {
499 if (!helperStartStats(sentry, hlp, label))
500 return;
501
502 storeAppendPrintf(sentry, "program: %s\n",
503 hlp->cmdline->key);
504 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
505 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
506 storeAppendPrintf(sentry, "requests sent: %d\n",
507 hlp->stats.requests);
508 storeAppendPrintf(sentry, "replies received: %d\n",
509 hlp->stats.replies);
510 storeAppendPrintf(sentry, "queue length: %d\n",
511 hlp->stats.queue_size);
512 storeAppendPrintf(sentry, "avg service time: %d msec\n",
513 hlp->stats.avg_svc_time);
514 storeAppendPrintf(sentry, "\n");
515 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
516 "#",
517 "FD",
518 "PID",
519 "# Requests",
520 "# Replies",
521 "Flags",
522 "Time",
523 "Offset",
524 "Request");
525
526 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
527 helper_server *srv = (helper_server*)link->data;
528 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
529 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "%c%c%c%c\t%7.3f\t%7d\t%s\n",
530 srv->index + 1,
531 srv->readPipe->fd,
532 srv->pid,
533 srv->stats.uses,
534 srv->stats.replies,
535 srv->stats.pending ? 'B' : ' ',
536 srv->flags.writing ? 'W' : ' ',
537 srv->flags.closing ? 'C' : ' ',
538 srv->flags.shutdown ? 'S' : ' ',
539 tt < 0.0 ? 0.0 : tt,
540 (int) srv->roffset,
541 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
542 }
543
544 storeAppendPrintf(sentry, "\nFlags key:\n\n");
545 storeAppendPrintf(sentry, " B = BUSY\n");
546 storeAppendPrintf(sentry, " W = WRITING\n");
547 storeAppendPrintf(sentry, " C = CLOSING\n");
548 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
549 }
550
551 void
552 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
553 {
554 if (!helperStartStats(sentry, hlp, label))
555 return;
556
557 storeAppendPrintf(sentry, "program: %s\n",
558 hlp->cmdline->key);
559 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
560 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
561 storeAppendPrintf(sentry, "requests sent: %d\n",
562 hlp->stats.requests);
563 storeAppendPrintf(sentry, "replies received: %d\n",
564 hlp->stats.replies);
565 storeAppendPrintf(sentry, "queue length: %d\n",
566 hlp->stats.queue_size);
567 storeAppendPrintf(sentry, "avg service time: %d msec\n",
568 hlp->stats.avg_svc_time);
569 storeAppendPrintf(sentry, "\n");
570 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
571 "#",
572 "FD",
573 "PID",
574 "# Requests",
575 "# Replies",
576 "Flags",
577 "Time",
578 "Offset",
579 "Request");
580
581 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
582 helper_stateful_server *srv = (helper_stateful_server *)link->data;
583 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
584 storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
585 srv->index + 1,
586 srv->readPipe->fd,
587 srv->pid,
588 srv->stats.uses,
589 srv->stats.replies,
590 srv->flags.busy ? 'B' : ' ',
591 srv->flags.closing ? 'C' : ' ',
592 srv->flags.reserved ? 'R' : ' ',
593 srv->flags.shutdown ? 'S' : ' ',
594 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
595 tt < 0.0 ? 0.0 : tt,
596 (int) srv->roffset,
597 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
598 }
599
600 storeAppendPrintf(sentry, "\nFlags key:\n\n");
601 storeAppendPrintf(sentry, " B = BUSY\n");
602 storeAppendPrintf(sentry, " C = CLOSING\n");
603 storeAppendPrintf(sentry, " R = RESERVED\n");
604 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
605 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
606 }
607
608 void
609 helperShutdown(helper * hlp)
610 {
611 dlink_node *link = hlp->servers.head;
612
613 while (link) {
614 helper_server *srv;
615 srv = (helper_server *)link->data;
616 link = link->next;
617
618 if (srv->flags.shutdown) {
619 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
620 continue;
621 }
622
623 assert(hlp->childs.n_active > 0);
624 -- hlp->childs.n_active;
625 srv->flags.shutdown = true; /* request it to shut itself down */
626
627 if (srv->flags.closing) {
628 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
629 continue;
630 }
631
632 if (srv->stats.pending) {
633 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
634 continue;
635 }
636
637 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
638 /* the rest of the details is dealt with in the helperServerFree
639 * close handler
640 */
641 srv->closePipesSafely();
642 }
643 }
644
645 void
646 helperStatefulShutdown(statefulhelper * hlp)
647 {
648 dlink_node *link = hlp->servers.head;
649 helper_stateful_server *srv;
650
651 while (link) {
652 srv = (helper_stateful_server *)link->data;
653 link = link->next;
654
655 if (srv->flags.shutdown) {
656 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " has already SHUT DOWN.");
657 continue;
658 }
659
660 assert(hlp->childs.n_active > 0);
661 -- hlp->childs.n_active;
662 srv->flags.shutdown = true; /* request it to shut itself down */
663
664 if (srv->flags.busy) {
665 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is BUSY.");
666 continue;
667 }
668
669 if (srv->flags.closing) {
670 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is CLOSING.");
671 continue;
672 }
673
674 if (srv->flags.reserved) {
675 if (shutting_down) {
676 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Closing anyway.");
677 } else {
678 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " is RESERVED. Not Shutting Down Yet.");
679 continue;
680 }
681 }
682
683 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
684
685 /* the rest of the details is dealt with in the helperStatefulServerFree
686 * close handler
687 */
688 srv->closePipesSafely();
689 }
690 }
691
692 helper::~helper()
693 {
694 /* note, don't free id_name, it probably points to static memory */
695
696 if (queue.head)
697 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
698 }
699
700 /* ====================================================================== */
701 /* LOCAL FUNCTIONS */
702 /* ====================================================================== */
703
704 static void
705 helperServerFree(helper_server *srv)
706 {
707 helper *hlp = srv->parent;
708 helper_request *r;
709 int i, concurrency = hlp->childs.concurrency;
710
711 if (!concurrency)
712 concurrency = 1;
713
714 if (srv->rbuf) {
715 memFreeBuf(srv->rbuf_sz, srv->rbuf);
716 srv->rbuf = NULL;
717 }
718
719 srv->wqueue->clean();
720 delete srv->wqueue;
721
722 if (srv->writebuf) {
723 srv->writebuf->clean();
724 delete srv->writebuf;
725 srv->writebuf = NULL;
726 }
727
728 if (Comm::IsConnOpen(srv->writePipe))
729 srv->closeWritePipeSafely();
730
731 dlinkDelete(&srv->link, &hlp->servers);
732
733 assert(hlp->childs.n_running > 0);
734 -- hlp->childs.n_running;
735
736 if (!srv->flags.shutdown) {
737 assert(hlp->childs.n_active > 0);
738 -- hlp->childs.n_active;
739 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
740
741 if (hlp->childs.needNew() > 0) {
742 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
743
744 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
745 if (srv->stats.replies < 1)
746 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
747 else
748 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
749 }
750
751 debugs(80, DBG_IMPORTANT, "Starting new helpers");
752 helperOpenServers(hlp);
753 }
754 }
755
756 for (i = 0; i < concurrency; ++i) {
757 // XXX: re-schedule these on another helper?
758 if ((r = srv->requests[i])) {
759 void *cbdata;
760
761 if (cbdataReferenceValidDone(r->data, &cbdata)) {
762 HelperReply nilReply;
763 r->callback(cbdata, nilReply);
764 }
765
766 helperRequestFree(r);
767
768 srv->requests[i] = NULL;
769 }
770 }
771 safe_free(srv->requests);
772
773 cbdataReferenceDone(srv->parent);
774 cbdataFree(srv);
775 }
776
777 static void
778 helperStatefulServerFree(helper_stateful_server *srv)
779 {
780 statefulhelper *hlp = srv->parent;
781 helper_stateful_request *r;
782
783 if (srv->rbuf) {
784 memFreeBuf(srv->rbuf_sz, srv->rbuf);
785 srv->rbuf = NULL;
786 }
787
788 #if 0
789 srv->wqueue->clean();
790
791 delete srv->wqueue;
792
793 #endif
794
795 /* TODO: walk the local queue of requests and carry them all out */
796 if (Comm::IsConnOpen(srv->writePipe))
797 srv->closeWritePipeSafely();
798
799 dlinkDelete(&srv->link, &hlp->servers);
800
801 assert(hlp->childs.n_running > 0);
802 -- hlp->childs.n_running;
803
804 if (!srv->flags.shutdown) {
805 assert( hlp->childs.n_active > 0);
806 -- hlp->childs.n_active;
807 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index + 1 << " exited");
808
809 if (hlp->childs.needNew() > 0) {
810 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
811
812 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
813 if (srv->stats.replies < 1)
814 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
815 else
816 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
817 }
818
819 debugs(80, DBG_IMPORTANT, "Starting new helpers");
820 helperStatefulOpenServers(hlp);
821 }
822 }
823
824 if ((r = srv->request)) {
825 void *cbdata;
826
827 if (cbdataReferenceValidDone(r->data, &cbdata)) {
828 HelperReply nilReply;
829 nilReply.whichServer = srv;
830 r->callback(cbdata, nilReply);
831 }
832
833 helperStatefulRequestFree(r);
834
835 srv->request = NULL;
836 }
837
838 if (srv->data != NULL)
839 hlp->datapool->freeOne(srv->data);
840
841 cbdataReferenceDone(srv->parent);
842
843 cbdataFree(srv);
844 }
845
846 /// Calls back with a pointer to the buffer with the helper output
847 static void
848 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
849 {
850 helper_request *r = srv->requests[request_number];
851 if (r) {
852 HLPCB *callback = r->callback;
853
854 srv->requests[request_number] = NULL;
855
856 r->callback = NULL;
857
858 void *cbdata = NULL;
859 if (cbdataReferenceValidDone(r->data, &cbdata)) {
860 HelperReply response(msg, (msg_end-msg));
861 callback(cbdata, response);
862 }
863
864 -- srv->stats.pending;
865 ++ srv->stats.replies;
866
867 ++ hlp->stats.replies;
868
869 srv->answer_time = current_time;
870
871 srv->dispatch_time = r->dispatch_time;
872
873 hlp->stats.avg_svc_time =
874 Math::intAverage(hlp->stats.avg_svc_time,
875 tvSubMsec(r->dispatch_time, current_time),
876 hlp->stats.replies, REDIRECT_AV_FACTOR);
877
878 helperRequestFree(r);
879 } else {
880 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
881 request_number << " from " << hlp->id_name << " #" << srv->index + 1 <<
882 " '" << srv->rbuf << "'");
883 }
884
885 if (!srv->flags.shutdown) {
886 helperKickQueue(hlp);
887 } else if (!srv->flags.closing && !srv->stats.pending) {
888 srv->flags.closing=true;
889 srv->writePipe->close();
890 }
891 }
892
893 static void
894 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
895 {
896 char *t = NULL;
897 helper_server *srv = (helper_server *)data;
898 helper *hlp = srv->parent;
899 assert(cbdataReferenceValid(data));
900
901 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
902
903 if (flag == COMM_ERR_CLOSING) {
904 return;
905 }
906
907 assert(conn->fd == srv->readPipe->fd);
908
909 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
910
911 if (flag != COMM_OK || len == 0) {
912 srv->closePipesSafely();
913 return;
914 }
915
916 srv->roffset += len;
917 srv->rbuf[srv->roffset] = '\0';
918 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
919
920 if (!srv->stats.pending) {
921 /* someone spoke without being spoken to */
922 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
923 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
924 " bytes '" << srv->rbuf << "'");
925
926 srv->roffset = 0;
927 srv->rbuf[0] = '\0';
928 }
929
930 while ((t = strchr(srv->rbuf, hlp->eom))) {
931 /* end of reply found */
932 char *msg = srv->rbuf;
933 int i = 0;
934 int skip = 1;
935 debugs(84, 3, "helperHandleRead: end of reply found");
936
937 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
938 *t = '\0';
939 // rewind to the \r octet which is the real terminal now
940 // and remember that we have to skip forward 2 places now.
941 skip = 2;
942 --t;
943 }
944
945 *t = '\0';
946
947 if (hlp->childs.concurrency) {
948 i = strtol(msg, &msg, 10);
949
950 while (*msg && xisspace(*msg))
951 ++msg;
952 }
953
954 helperReturnBuffer(i, srv, hlp, msg, t);
955 srv->roffset -= (t - srv->rbuf) + skip;
956 memmove(srv->rbuf, t, srv->roffset);
957 }
958
959 if (Comm::IsConnOpen(srv->readPipe)) {
960 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
961 assert(spaceSize >= 0);
962
963 // grow the input buffer if needed and possible
964 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
965 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
966 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
967 spaceSize = srv->rbuf_sz - srv->roffset - 1;
968 assert(spaceSize >= 0);
969 }
970
971 // quit reading if there is no space left
972 if (!spaceSize) {
973 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
974 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
975 "Squid input buffer: " << hlp->id_name << " #" <<
976 (srv->index + 1));
977 srv->closePipesSafely();
978 return;
979 }
980
981 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
982 CommIoCbPtrFun(helperHandleRead, srv));
983 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
984 }
985 }
986
987 static void
988 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
989 {
990 char *t = NULL;
991 helper_stateful_server *srv = (helper_stateful_server *)data;
992 helper_stateful_request *r;
993 statefulhelper *hlp = srv->parent;
994 assert(cbdataReferenceValid(data));
995
996 /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
997
998 if (flag == COMM_ERR_CLOSING) {
999 return;
1000 }
1001
1002 assert(conn->fd == srv->readPipe->fd);
1003
1004 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1005 hlp->id_name << " #" << srv->index + 1);
1006
1007 if (flag != COMM_OK || len == 0) {
1008 srv->closePipesSafely();
1009 return;
1010 }
1011
1012 srv->roffset += len;
1013 srv->rbuf[srv->roffset] = '\0';
1014 r = srv->request;
1015 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1016
1017 if (r == NULL) {
1018 /* someone spoke without being spoken to */
1019 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1020 hlp->id_name << " #" << srv->index + 1 << ", " << (int)len <<
1021 " bytes '" << srv->rbuf << "'");
1022
1023 srv->roffset = 0;
1024 }
1025
1026 if ((t = strchr(srv->rbuf, hlp->eom))) {
1027 /* end of reply found */
1028 int called = 1;
1029 int skip = 1;
1030 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1031
1032 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1033 *t = '\0';
1034 // rewind to the \r octet which is the real terminal now
1035 // and remember that we have to skip forward 2 places now.
1036 skip = 2;
1037 --t;
1038 }
1039
1040 *t = '\0';
1041
1042 if (r && cbdataReferenceValid(r->data)) {
1043 HelperReply res(srv->rbuf, (t - srv->rbuf));
1044 res.whichServer = srv;
1045 r->callback(r->data, res);
1046 } else {
1047 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1048 called = 0;
1049 }
1050 // only skip off the \0's _after_ passing its location in HelperReply above
1051 t += skip;
1052
1053 srv->flags.busy = false;
1054 /**
1055 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1056 * Doing this prohibits concurrency support with multiple replies per read().
1057 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1058 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1059 */
1060 srv->roffset = 0;
1061 helperStatefulRequestFree(r);
1062 srv->request = NULL;
1063
1064 -- srv->stats.pending;
1065 ++ srv->stats.replies;
1066
1067 ++ hlp->stats.replies;
1068 srv->answer_time = current_time;
1069 hlp->stats.avg_svc_time =
1070 Math::intAverage(hlp->stats.avg_svc_time,
1071 tvSubMsec(srv->dispatch_time, current_time),
1072 hlp->stats.replies, REDIRECT_AV_FACTOR);
1073
1074 if (called)
1075 helperStatefulServerDone(srv);
1076 else
1077 helperStatefulReleaseServer(srv);
1078 }
1079
1080 if (Comm::IsConnOpen(srv->readPipe)) {
1081 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1082 assert(spaceSize >= 0);
1083
1084 // grow the input buffer if needed and possible
1085 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1086 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1087 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1088 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1089 assert(spaceSize >= 0);
1090 }
1091
1092 // quit reading if there is no space left
1093 if (!spaceSize) {
1094 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1095 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1096 "Squid input buffer: " << hlp->id_name << " #" <<
1097 (srv->index + 1));
1098 srv->closePipesSafely();
1099 return;
1100 }
1101
1102 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1103 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1104 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1105 }
1106 }
1107
1108 static void
1109 Enqueue(helper * hlp, helper_request * r)
1110 {
1111 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1112 dlinkAddTail(r, link, &hlp->queue);
1113 ++ hlp->stats.queue_size;
1114
1115 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1116 if (hlp->childs.needNew() > 0) {
1117 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1118 helperOpenServers(hlp);
1119 return;
1120 }
1121
1122 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1123 return;
1124
1125 if (squid_curtime - hlp->last_queue_warn < 600)
1126 return;
1127
1128 if (shutting_down || reconfiguring)
1129 return;
1130
1131 hlp->last_queue_warn = squid_curtime;
1132
1133 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1134 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1135 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1136
1137 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1138 fatalf("Too many queued %s requests", hlp->id_name);
1139 }
1140
1141 static void
1142 StatefulEnqueue(statefulhelper * hlp, helper_stateful_request * r)
1143 {
1144 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1145 dlinkAddTail(r, link, &hlp->queue);
1146 ++ hlp->stats.queue_size;
1147
1148 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1149 if (hlp->childs.needNew() > 0) {
1150 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1151 helperStatefulOpenServers(hlp);
1152 return;
1153 }
1154
1155 if (hlp->stats.queue_size < (int)hlp->childs.n_running)
1156 return;
1157
1158 if (hlp->stats.queue_size > (int)hlp->childs.n_running * 2)
1159 fatalf("Too many queued %s requests", hlp->id_name);
1160
1161 if (squid_curtime - hlp->last_queue_warn < 600)
1162 return;
1163
1164 if (shutting_down || reconfiguring)
1165 return;
1166
1167 hlp->last_queue_warn = squid_curtime;
1168
1169 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1170 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1171 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1172 }
1173
1174 static helper_request *
1175 Dequeue(helper * hlp)
1176 {
1177 dlink_node *link;
1178 helper_request *r = NULL;
1179
1180 if ((link = hlp->queue.head)) {
1181 r = (helper_request *)link->data;
1182 dlinkDelete(link, &hlp->queue);
1183 memFree(link, MEM_DLINK_NODE);
1184 -- hlp->stats.queue_size;
1185 }
1186
1187 return r;
1188 }
1189
1190 static helper_stateful_request *
1191 StatefulDequeue(statefulhelper * hlp)
1192 {
1193 dlink_node *link;
1194 helper_stateful_request *r = NULL;
1195
1196 if ((link = hlp->queue.head)) {
1197 r = (helper_stateful_request *)link->data;
1198 dlinkDelete(link, &hlp->queue);
1199 memFree(link, MEM_DLINK_NODE);
1200 -- hlp->stats.queue_size;
1201 }
1202
1203 return r;
1204 }
1205
1206 static helper_server *
1207 GetFirstAvailable(helper * hlp)
1208 {
1209 dlink_node *n;
1210 helper_server *srv;
1211 helper_server *selected = NULL;
1212 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1213
1214 if (hlp->childs.n_running == 0)
1215 return NULL;
1216
1217 /* Find "least" loaded helper (approx) */
1218 for (n = hlp->servers.head; n != NULL; n = n->next) {
1219 srv = (helper_server *)n->data;
1220
1221 if (selected && selected->stats.pending <= srv->stats.pending)
1222 continue;
1223
1224 if (srv->flags.shutdown)
1225 continue;
1226
1227 if (!srv->stats.pending)
1228 return srv;
1229
1230 if (selected) {
1231 selected = srv;
1232 break;
1233 }
1234
1235 selected = srv;
1236 }
1237
1238 /* Check for overload */
1239 if (!selected) {
1240 debugs(84, 5, "GetFirstAvailable: None available.");
1241 return NULL;
1242 }
1243
1244 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1245 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1246 return NULL;
1247 }
1248
1249 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1250 return selected;
1251 }
1252
1253 static helper_stateful_server *
1254 StatefulGetFirstAvailable(statefulhelper * hlp)
1255 {
1256 dlink_node *n;
1257 helper_stateful_server *srv = NULL;
1258 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1259
1260 if (hlp->childs.n_running == 0)
1261 return NULL;
1262
1263 for (n = hlp->servers.head; n != NULL; n = n->next) {
1264 srv = (helper_stateful_server *)n->data;
1265
1266 if (srv->flags.busy)
1267 continue;
1268
1269 if (srv->flags.reserved)
1270 continue;
1271
1272 if (srv->flags.shutdown)
1273 continue;
1274
1275 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1276 continue;
1277
1278 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1279 return srv;
1280 }
1281
1282 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1283 return NULL;
1284 }
1285
1286 static void
1287 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
1288 {
1289 helper_server *srv = (helper_server *)data;
1290
1291 srv->writebuf->clean();
1292 delete srv->writebuf;
1293 srv->writebuf = NULL;
1294 srv->flags.writing = false;
1295
1296 if (flag != COMM_OK) {
1297 /* Helper server has crashed */
1298 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index + 1 << " has crashed");
1299 return;
1300 }
1301
1302 if (!srv->wqueue->isNull()) {
1303 srv->writebuf = srv->wqueue;
1304 srv->wqueue = new MemBuf;
1305 srv->flags.writing = true;
1306 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1307 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1308 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1309 }
1310 }
1311
1312 static void
1313 helperDispatch(helper_server * srv, helper_request * r)
1314 {
1315 helper *hlp = srv->parent;
1316 helper_request **ptr = NULL;
1317 unsigned int slot;
1318
1319 if (!cbdataReferenceValid(r->data)) {
1320 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1321 helperRequestFree(r);
1322 return;
1323 }
1324
1325 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1326 if (!srv->requests[slot]) {
1327 ptr = &srv->requests[slot];
1328 break;
1329 }
1330 }
1331
1332 assert(ptr);
1333 *ptr = r;
1334 r->dispatch_time = current_time;
1335
1336 if (srv->wqueue->isNull())
1337 srv->wqueue->init();
1338
1339 if (hlp->childs.concurrency)
1340 srv->wqueue->Printf("%d %s", slot, r->buf);
1341 else
1342 srv->wqueue->append(r->buf, strlen(r->buf));
1343
1344 if (!srv->flags.writing) {
1345 assert(NULL == srv->writebuf);
1346 srv->writebuf = srv->wqueue;
1347 srv->wqueue = new MemBuf;
1348 srv->flags.writing = true;
1349 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1350 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1351 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1352 }
1353
1354 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
1355
1356 ++ srv->stats.uses;
1357 ++ srv->stats.pending;
1358 ++ hlp->stats.requests;
1359 }
1360
1361 static void
1362 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag,
1363 int xerrno, void *data)
1364 {
1365 /* nothing! */
1366 }
1367
1368 static void
1369 helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r)
1370 {
1371 statefulhelper *hlp = srv->parent;
1372
1373 if (!cbdataReferenceValid(r->data)) {
1374 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1375 helperStatefulRequestFree(r);
1376 helperStatefulReleaseServer(srv);
1377 return;
1378 }
1379
1380 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index + 1);
1381
1382 if (r->placeholder == 1) {
1383 /* a callback is needed before this request can _use_ a helper. */
1384 /* we don't care about releasing this helper. The request NEVER
1385 * gets to the helper. So we throw away the return code */
1386 HelperReply nilReply;
1387 nilReply.whichServer = srv;
1388 r->callback(r->data, nilReply);
1389 /* throw away the placeholder */
1390 helperStatefulRequestFree(r);
1391 /* and push the queue. Note that the callback may have submitted a new
1392 * request to the helper which is why we test for the request */
1393
1394 if (srv->request == NULL)
1395 helperStatefulServerDone(srv);
1396
1397 return;
1398 }
1399
1400 srv->flags.busy = true;
1401 srv->flags.reserved = true;
1402 srv->request = r;
1403 srv->dispatch_time = current_time;
1404 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1405 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1406 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1407 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1408 hlp->id_name << " #" << srv->index + 1 << ", " <<
1409 (int) strlen(r->buf) << " bytes");
1410
1411 ++ srv->stats.uses;
1412 ++ srv->stats.pending;
1413 ++ hlp->stats.requests;
1414 }
1415
1416 static void
1417 helperKickQueue(helper * hlp)
1418 {
1419 helper_request *r;
1420 helper_server *srv;
1421
1422 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1423 helperDispatch(srv, r);
1424 }
1425
1426 static void
1427 helperStatefulKickQueue(statefulhelper * hlp)
1428 {
1429 helper_stateful_request *r;
1430 helper_stateful_server *srv;
1431
1432 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1433 helperStatefulDispatch(srv, r);
1434 }
1435
1436 static void
1437 helperStatefulServerDone(helper_stateful_server * srv)
1438 {
1439 if (!srv->flags.shutdown) {
1440 helperStatefulKickQueue(srv->parent);
1441 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
1442 srv->closeWritePipeSafely();
1443 return;
1444 }
1445 }
1446
1447 static void
1448 helperRequestFree(helper_request * r)
1449 {
1450 cbdataReferenceDone(r->data);
1451 xfree(r->buf);
1452 delete r;
1453 }
1454
1455 static void
1456 helperStatefulRequestFree(helper_stateful_request * r)
1457 {
1458 if (r) {
1459 cbdataReferenceDone(r->data);
1460 xfree(r->buf);
1461 delete r;
1462 }
1463 }
1464
1465 // TODO: should helper_ and helper_stateful_ have a common parent?
1466 static bool
1467 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1468 {
1469 if (!hlp) {
1470 if (label)
1471 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1472 return false;
1473 }
1474
1475 if (label)
1476 storeAppendPrintf(sentry, "%s:\n", label);
1477
1478 return true;
1479 }