]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Make helper queue size configurable, with consistent defaults and better overflow...
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "comm.h"
14 #include "comm/Connection.h"
15 #include "comm/Read.h"
16 #include "comm/Write.h"
17 #include "fd.h"
18 #include "fde.h"
19 #include "format/Quoting.h"
20 #include "helper.h"
21 #include "helper/Reply.h"
22 #include "helper/Request.h"
23 #include "Mem.h"
24 #include "MemBuf.h"
25 #include "SquidIpc.h"
26 #include "SquidMath.h"
27 #include "SquidTime.h"
28 #include "Store.h"
29 #include "wordlist.h"
30
31 #define HELPER_MAX_ARGS 64
32
33 /** Initial Squid input buffer size. Helper responses may exceed this, and
34 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
35 */
36 const size_t ReadBufMinSize(4*1024);
37
38 /** Maximum safe size of a helper-to-Squid response message plus one.
39 * Squid will warn and close the stream if a helper sends a too-big response.
40 * ssl_crtd helper is known to produce responses of at least 10KB in size.
41 * Some undocumented helpers are known to produce responses exceeding 8KB.
42 */
43 const size_t ReadBufMaxSize(32*1024);
44
45 static IOCB helperHandleRead;
46 static IOCB helperStatefulHandleRead;
47 static void helperServerFree(helper_server *srv);
48 static void helperStatefulServerFree(helper_stateful_server *srv);
49 static void Enqueue(helper * hlp, Helper::Request *);
50 static Helper::Request *Dequeue(helper * hlp);
51 static Helper::Request *StatefulDequeue(statefulhelper * hlp);
52 static helper_server *GetFirstAvailable(helper * hlp);
53 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
54 static void helperDispatch(helper_server * srv, Helper::Request * r);
55 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r);
56 static void helperKickQueue(helper * hlp);
57 static void helperStatefulKickQueue(statefulhelper * hlp);
58 static void helperStatefulServerDone(helper_stateful_server * srv);
59 static void StatefulEnqueue(statefulhelper * hlp, Helper::Request * r);
60 static bool helperStartStats(StoreEntry *sentry, void *hlp, const char *label);
61
62 CBDATA_CLASS_INIT(helper);
63 CBDATA_CLASS_INIT(helper_server);
64 CBDATA_CLASS_INIT(statefulhelper);
65 CBDATA_CLASS_INIT(helper_stateful_server);
66
67 InstanceIdDefinitions(HelperServerBase, "Hlpr");
68
69 void
70 HelperServerBase::initStats()
71 {
72 stats.uses=0;
73 stats.replies=0;
74 stats.pending=0;
75 stats.releases=0;
76 }
77
78 void
79 HelperServerBase::closePipesSafely(const char *id_name)
80 {
81 #if _SQUID_WINDOWS_
82 shutdown(writePipe->fd, SD_BOTH);
83 #endif
84
85 flags.closing = true;
86 if (readPipe->fd == writePipe->fd)
87 readPipe->fd = -1;
88 else
89 readPipe->close();
90 writePipe->close();
91
92 #if _SQUID_WINDOWS_
93 if (hIpc) {
94 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
95 getCurrentTime();
96 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
97 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
98 }
99 CloseHandle(hIpc);
100 }
101 #endif
102 }
103
104 void
105 HelperServerBase::closeWritePipeSafely(const char *id_name)
106 {
107 #if _SQUID_WINDOWS_
108 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
109 #endif
110
111 flags.closing = true;
112 if (readPipe->fd == writePipe->fd)
113 readPipe->fd = -1;
114 writePipe->close();
115
116 #if _SQUID_WINDOWS_
117 if (hIpc) {
118 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
119 getCurrentTime();
120 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
121 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
122 }
123 CloseHandle(hIpc);
124 }
125 #endif
126 }
127
128 void
129 helperOpenServers(helper * hlp)
130 {
131 char *s;
132 char *progname;
133 char *shortname;
134 char *procname;
135 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
136 char fd_note_buf[FD_DESC_SZ];
137 helper_server *srv;
138 int nargs = 0;
139 int k;
140 pid_t pid;
141 int rfd;
142 int wfd;
143 void * hIpc;
144 wordlist *w;
145
146 if (hlp->cmdline == NULL)
147 return;
148
149 progname = hlp->cmdline->key;
150
151 if ((s = strrchr(progname, '/')))
152 shortname = xstrdup(s + 1);
153 else
154 shortname = xstrdup(progname);
155
156 /* figure out how many new child are actually needed. */
157 int need_new = hlp->childs.needNew();
158
159 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
160
161 if (need_new < 1) {
162 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
163 }
164
165 procname = (char *)xmalloc(strlen(shortname) + 3);
166
167 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
168
169 args[nargs] = procname;
170 ++nargs;
171
172 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
173 args[nargs] = w->key;
174 ++nargs;
175 }
176
177 args[nargs] = NULL;
178 ++nargs;
179
180 assert(nargs <= HELPER_MAX_ARGS);
181
182 for (k = 0; k < need_new; ++k) {
183 getCurrentTime();
184 rfd = wfd = -1;
185 pid = ipcCreate(hlp->ipc_type,
186 progname,
187 args,
188 shortname,
189 hlp->addr,
190 &rfd,
191 &wfd,
192 &hIpc);
193
194 if (pid < 0) {
195 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
196 continue;
197 }
198
199 ++ hlp->childs.n_running;
200 ++ hlp->childs.n_active;
201 srv = new helper_server;
202 srv->hIpc = hIpc;
203 srv->pid = pid;
204 srv->initStats();
205 srv->addr = hlp->addr;
206 srv->readPipe = new Comm::Connection;
207 srv->readPipe->fd = rfd;
208 srv->writePipe = new Comm::Connection;
209 srv->writePipe->fd = wfd;
210 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
211 srv->wqueue = new MemBuf;
212 srv->roffset = 0;
213 srv->requests = (Helper::Request **)xcalloc(hlp->childs.concurrency ? hlp->childs.concurrency : 1, sizeof(*srv->requests));
214 srv->parent = cbdataReference(hlp);
215 dlinkAddTail(srv, &srv->link, &hlp->servers);
216
217 if (rfd == wfd) {
218 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
219 fd_note(rfd, fd_note_buf);
220 } else {
221 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
222 fd_note(rfd, fd_note_buf);
223 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
224 fd_note(wfd, fd_note_buf);
225 }
226
227 commSetNonBlocking(rfd);
228
229 if (wfd != rfd)
230 commSetNonBlocking(wfd);
231
232 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
233 comm_add_close_handler(rfd, closeCall);
234
235 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
236 CommIoCbPtrFun(helperHandleRead, srv));
237 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
238 }
239
240 hlp->last_restart = squid_curtime;
241 safe_free(shortname);
242 safe_free(procname);
243 helperKickQueue(hlp);
244 }
245
246 /**
247 * DPW 2007-05-08
248 *
249 * helperStatefulOpenServers: create the stateful child helper processes
250 */
251 void
252 helperStatefulOpenServers(statefulhelper * hlp)
253 {
254 char *shortname;
255 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
256 char fd_note_buf[FD_DESC_SZ];
257 int nargs = 0;
258
259 if (hlp->cmdline == NULL)
260 return;
261
262 if (hlp->childs.concurrency)
263 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
264
265 char *progname = hlp->cmdline->key;
266
267 char *s;
268 if ((s = strrchr(progname, '/')))
269 shortname = xstrdup(s + 1);
270 else
271 shortname = xstrdup(progname);
272
273 /* figure out haw mant new helpers are needed. */
274 int need_new = hlp->childs.needNew();
275
276 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
277
278 if (need_new < 1) {
279 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
280 }
281
282 char *procname = (char *)xmalloc(strlen(shortname) + 3);
283
284 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
285
286 args[nargs] = procname;
287 ++nargs;
288
289 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
290 args[nargs] = w->key;
291 ++nargs;
292 }
293
294 args[nargs] = NULL;
295 ++nargs;
296
297 assert(nargs <= HELPER_MAX_ARGS);
298
299 for (int k = 0; k < need_new; ++k) {
300 getCurrentTime();
301 int rfd = -1;
302 int wfd = -1;
303 void * hIpc;
304 pid_t pid = ipcCreate(hlp->ipc_type,
305 progname,
306 args,
307 shortname,
308 hlp->addr,
309 &rfd,
310 &wfd,
311 &hIpc);
312
313 if (pid < 0) {
314 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
315 continue;
316 }
317
318 ++ hlp->childs.n_running;
319 ++ hlp->childs.n_active;
320 helper_stateful_server *srv = new helper_stateful_server;
321 srv->hIpc = hIpc;
322 srv->pid = pid;
323 srv->flags.reserved = false;
324 srv->initStats();
325 srv->addr = hlp->addr;
326 srv->readPipe = new Comm::Connection;
327 srv->readPipe->fd = rfd;
328 srv->writePipe = new Comm::Connection;
329 srv->writePipe->fd = wfd;
330 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
331 srv->roffset = 0;
332 srv->parent = cbdataReference(hlp);
333
334 if (hlp->datapool != NULL)
335 srv->data = hlp->datapool->alloc();
336
337 dlinkAddTail(srv, &srv->link, &hlp->servers);
338
339 if (rfd == wfd) {
340 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
341 fd_note(rfd, fd_note_buf);
342 } else {
343 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
344 fd_note(rfd, fd_note_buf);
345 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
346 fd_note(wfd, fd_note_buf);
347 }
348
349 commSetNonBlocking(rfd);
350
351 if (wfd != rfd)
352 commSetNonBlocking(wfd);
353
354 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
355 comm_add_close_handler(rfd, closeCall);
356
357 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
358 CommIoCbPtrFun(helperStatefulHandleRead, srv));
359 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
360 }
361
362 hlp->last_restart = squid_curtime;
363 safe_free(shortname);
364 safe_free(procname);
365 helperStatefulKickQueue(hlp);
366 }
367
368 void
369 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
370 {
371 if (hlp == NULL) {
372 debugs(84, 3, "helperSubmit: hlp == NULL");
373 Helper::Reply nilReply;
374 callback(data, nilReply);
375 return;
376 }
377 hlp->prepSubmit();
378 hlp->submit(buf, callback, data);
379 }
380
381 bool
382 helper::queueFull() const {
383 return stats.queue_size > static_cast<int>(childs.queue_size);
384 }
385
386 /// prepares the helper for request submission via trySubmit() or helperSubmit()
387 /// currently maintains full_time and kills Squid if the helper remains full for too long
388 void
389 helper::prepSubmit()
390 {
391 if (!queueFull())
392 full_time = 0;
393 else if (!full_time) // may happen here if reconfigure decreases capacity
394 full_time = squid_curtime;
395 else if (squid_curtime - full_time > 180)
396 fatalf("Too many queued %s requests", id_name);
397 }
398
399 bool
400 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
401 {
402 prepSubmit();
403
404 if (queueFull()) {
405 debugs(84, DBG_IMPORTANT, id_name << " drops request due to a full queue");
406 return false; // request was ignored
407 }
408
409 submit(buf, callback, data); // will send or queue
410 return true; // request submitted or queued
411 }
412
413 /// dispatches or enqueues a helper requests; does not enforce queue limits
414 void
415 helper::submit(const char *buf, HLPCB * callback, void *data)
416 {
417 Helper::Request *r = new Helper::Request(callback, data, buf);
418 helper_server *srv;
419
420 if ((srv = GetFirstAvailable(this)))
421 helperDispatch(srv, r);
422 else
423 Enqueue(this, r);
424
425 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
426
427 if (!queueFull()) {
428 full_time = 0;
429 } else if (!full_time) {
430 debugs(84, 3, id_name << " queue became full");
431 full_time = squid_curtime;
432 }
433 }
434
435 /// lastserver = "server last used as part of a reserved request sequence"
436 void
437 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
438 {
439 if (hlp == NULL) {
440 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
441 Helper::Reply nilReply;
442 callback(data, nilReply);
443 return;
444 }
445 hlp->prepSubmit();
446 hlp->submit(buf, callback, data, lastserver);
447 }
448
449 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
450 {
451 Helper::Request *r = new Helper::Request(callback, data, buf);
452
453 if ((buf != NULL) && lastserver) {
454 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
455 assert(lastserver->flags.reserved);
456 assert(!(lastserver->request));
457
458 debugs(84, 5, "StatefulSubmit dispatching");
459 helperStatefulDispatch(lastserver, r);
460 } else {
461 helper_stateful_server *srv;
462 if ((srv = StatefulGetFirstAvailable(this))) {
463 helperStatefulDispatch(srv, r);
464 } else
465 StatefulEnqueue(this, r);
466 }
467
468 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
469 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
470
471 if (!queueFull()) {
472 full_time = 0;
473 } else if (!full_time) {
474 debugs(84, 3, id_name << " queue became full");
475 full_time = squid_curtime;
476 }
477 }
478
479 /**
480 * DPW 2007-05-08
481 *
482 * helperStatefulReleaseServer tells the helper that whoever was
483 * using it no longer needs its services.
484 */
485 void
486 helperStatefulReleaseServer(helper_stateful_server * srv)
487 {
488 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
489 if (!srv->flags.reserved)
490 return;
491
492 ++ srv->stats.releases;
493
494 srv->flags.reserved = false;
495 if (srv->parent->OnEmptyQueue != NULL && srv->data)
496 srv->parent->OnEmptyQueue(srv->data);
497
498 helperStatefulServerDone(srv);
499 }
500
501 /** return a pointer to the stateful routines data area */
502 void *
503 helperStatefulServerGetData(helper_stateful_server * srv)
504 {
505 return srv->data;
506 }
507
508 /**
509 * Dump some stats about the helper states to a StoreEntry
510 */
511 void
512 helperStats(StoreEntry * sentry, helper * hlp, const char *label)
513 {
514 if (!helperStartStats(sentry, hlp, label))
515 return;
516
517 storeAppendPrintf(sentry, "program: %s\n",
518 hlp->cmdline->key);
519 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
520 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
521 storeAppendPrintf(sentry, "requests sent: %d\n",
522 hlp->stats.requests);
523 storeAppendPrintf(sentry, "replies received: %d\n",
524 hlp->stats.replies);
525 storeAppendPrintf(sentry, "queue length: %d\n",
526 hlp->stats.queue_size);
527 storeAppendPrintf(sentry, "avg service time: %d msec\n",
528 hlp->stats.avg_svc_time);
529 storeAppendPrintf(sentry, "\n");
530 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
531 "ID #",
532 "FD",
533 "PID",
534 "# Requests",
535 "# Replies",
536 "Flags",
537 "Time",
538 "Offset",
539 "Request");
540
541 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
542 helper_server *srv = (helper_server*)link->data;
543 double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
544 storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
545 srv->index.value,
546 srv->readPipe->fd,
547 srv->pid,
548 srv->stats.uses,
549 srv->stats.replies,
550 srv->stats.pending ? 'B' : ' ',
551 srv->flags.writing ? 'W' : ' ',
552 srv->flags.closing ? 'C' : ' ',
553 srv->flags.shutdown ? 'S' : ' ',
554 tt < 0.0 ? 0.0 : tt,
555 (int) srv->roffset,
556 srv->requests[0] ? Format::QuoteMimeBlob(srv->requests[0]->buf) : "(none)");
557 }
558
559 storeAppendPrintf(sentry, "\nFlags key:\n\n");
560 storeAppendPrintf(sentry, " B = BUSY\n");
561 storeAppendPrintf(sentry, " W = WRITING\n");
562 storeAppendPrintf(sentry, " C = CLOSING\n");
563 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
564 }
565
566 void
567 helperStatefulStats(StoreEntry * sentry, statefulhelper * hlp, const char *label)
568 {
569 if (!helperStartStats(sentry, hlp, label))
570 return;
571
572 storeAppendPrintf(sentry, "program: %s\n",
573 hlp->cmdline->key);
574 storeAppendPrintf(sentry, "number active: %d of %d (%d shutting down)\n",
575 hlp->childs.n_active, hlp->childs.n_max, (hlp->childs.n_running - hlp->childs.n_active) );
576 storeAppendPrintf(sentry, "requests sent: %d\n",
577 hlp->stats.requests);
578 storeAppendPrintf(sentry, "replies received: %d\n",
579 hlp->stats.replies);
580 storeAppendPrintf(sentry, "queue length: %d\n",
581 hlp->stats.queue_size);
582 storeAppendPrintf(sentry, "avg service time: %d msec\n",
583 hlp->stats.avg_svc_time);
584 storeAppendPrintf(sentry, "\n");
585 storeAppendPrintf(sentry, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
586 "ID #",
587 "FD",
588 "PID",
589 "# Requests",
590 "# Replies",
591 "Flags",
592 "Time",
593 "Offset",
594 "Request");
595
596 for (dlink_node *link = hlp->servers.head; link; link = link->next) {
597 helper_stateful_server *srv = (helper_stateful_server *)link->data;
598 double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->stats.pending ? current_time : srv->answer_time);
599 storeAppendPrintf(sentry, "%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
600 srv->index.value,
601 srv->readPipe->fd,
602 srv->pid,
603 srv->stats.uses,
604 srv->stats.replies,
605 srv->stats.pending ? 'B' : ' ',
606 srv->flags.closing ? 'C' : ' ',
607 srv->flags.reserved ? 'R' : ' ',
608 srv->flags.shutdown ? 'S' : ' ',
609 srv->request ? (srv->request->placeholder ? 'P' : ' ') : ' ',
610 tt < 0.0 ? 0.0 : tt,
611 (int) srv->roffset,
612 srv->request ? Format::QuoteMimeBlob(srv->request->buf) : "(none)");
613 }
614
615 storeAppendPrintf(sentry, "\nFlags key:\n\n");
616 storeAppendPrintf(sentry, " B = BUSY\n");
617 storeAppendPrintf(sentry, " C = CLOSING\n");
618 storeAppendPrintf(sentry, " R = RESERVED\n");
619 storeAppendPrintf(sentry, " S = SHUTDOWN PENDING\n");
620 storeAppendPrintf(sentry, " P = PLACEHOLDER\n");
621 }
622
623 void
624 helperShutdown(helper * hlp)
625 {
626 dlink_node *link = hlp->servers.head;
627
628 while (link) {
629 helper_server *srv;
630 srv = (helper_server *)link->data;
631 link = link->next;
632
633 if (srv->flags.shutdown) {
634 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
635 continue;
636 }
637
638 assert(hlp->childs.n_active > 0);
639 -- hlp->childs.n_active;
640 srv->flags.shutdown = true; /* request it to shut itself down */
641
642 if (srv->flags.closing) {
643 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
644 continue;
645 }
646
647 if (srv->stats.pending) {
648 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
649 continue;
650 }
651
652 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
653 /* the rest of the details is dealt with in the helperServerFree
654 * close handler
655 */
656 srv->closePipesSafely(hlp->id_name);
657 }
658 }
659
660 void
661 helperStatefulShutdown(statefulhelper * hlp)
662 {
663 dlink_node *link = hlp->servers.head;
664 helper_stateful_server *srv;
665
666 while (link) {
667 srv = (helper_stateful_server *)link->data;
668 link = link->next;
669
670 if (srv->flags.shutdown) {
671 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
672 continue;
673 }
674
675 assert(hlp->childs.n_active > 0);
676 -- hlp->childs.n_active;
677 srv->flags.shutdown = true; /* request it to shut itself down */
678
679 if (srv->stats.pending) {
680 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
681 continue;
682 }
683
684 if (srv->flags.closing) {
685 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
686 continue;
687 }
688
689 if (srv->flags.reserved) {
690 if (shutting_down) {
691 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
692 } else {
693 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
694 continue;
695 }
696 }
697
698 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
699
700 /* the rest of the details is dealt with in the helperStatefulServerFree
701 * close handler
702 */
703 srv->closePipesSafely(hlp->id_name);
704 }
705 }
706
707 helper::~helper()
708 {
709 /* note, don't free id_name, it probably points to static memory */
710
711 if (queue.head)
712 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
713 }
714
715 /* ====================================================================== */
716 /* LOCAL FUNCTIONS */
717 /* ====================================================================== */
718
719 static void
720 helperServerFree(helper_server *srv)
721 {
722 helper *hlp = srv->parent;
723 Helper::Request *r;
724 int i, concurrency = hlp->childs.concurrency;
725
726 if (!concurrency)
727 concurrency = 1;
728
729 if (srv->rbuf) {
730 memFreeBuf(srv->rbuf_sz, srv->rbuf);
731 srv->rbuf = NULL;
732 }
733
734 srv->wqueue->clean();
735 delete srv->wqueue;
736
737 if (srv->writebuf) {
738 srv->writebuf->clean();
739 delete srv->writebuf;
740 srv->writebuf = NULL;
741 }
742
743 if (Comm::IsConnOpen(srv->writePipe))
744 srv->closeWritePipeSafely(hlp->id_name);
745
746 dlinkDelete(&srv->link, &hlp->servers);
747
748 assert(hlp->childs.n_running > 0);
749 -- hlp->childs.n_running;
750
751 if (!srv->flags.shutdown) {
752 assert(hlp->childs.n_active > 0);
753 -- hlp->childs.n_active;
754 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
755
756 if (hlp->childs.needNew() > 0) {
757 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
758
759 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
760 if (srv->stats.replies < 1)
761 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
762 else
763 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
764 }
765
766 debugs(80, DBG_IMPORTANT, "Starting new helpers");
767 helperOpenServers(hlp);
768 }
769 }
770
771 for (i = 0; i < concurrency; ++i) {
772 // XXX: re-schedule these on another helper?
773 if ((r = srv->requests[i])) {
774 void *cbdata;
775
776 if (cbdataReferenceValidDone(r->data, &cbdata)) {
777 Helper::Reply nilReply;
778 r->callback(cbdata, nilReply);
779 }
780
781 delete r;
782
783 srv->requests[i] = NULL;
784 }
785 }
786 safe_free(srv->requests);
787
788 cbdataReferenceDone(srv->parent);
789 delete srv;
790 }
791
792 static void
793 helperStatefulServerFree(helper_stateful_server *srv)
794 {
795 statefulhelper *hlp = srv->parent;
796 Helper::Request *r;
797
798 if (srv->rbuf) {
799 memFreeBuf(srv->rbuf_sz, srv->rbuf);
800 srv->rbuf = NULL;
801 }
802
803 #if 0
804 srv->wqueue->clean();
805
806 delete srv->wqueue;
807
808 #endif
809
810 /* TODO: walk the local queue of requests and carry them all out */
811 if (Comm::IsConnOpen(srv->writePipe))
812 srv->closeWritePipeSafely(hlp->id_name);
813
814 dlinkDelete(&srv->link, &hlp->servers);
815
816 assert(hlp->childs.n_running > 0);
817 -- hlp->childs.n_running;
818
819 if (!srv->flags.shutdown) {
820 assert( hlp->childs.n_active > 0);
821 -- hlp->childs.n_active;
822 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
823
824 if (hlp->childs.needNew() > 0) {
825 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
826
827 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
828 if (srv->stats.replies < 1)
829 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
830 else
831 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
832 }
833
834 debugs(80, DBG_IMPORTANT, "Starting new helpers");
835 helperStatefulOpenServers(hlp);
836 }
837 }
838
839 if ((r = srv->request)) {
840 void *cbdata;
841
842 if (cbdataReferenceValidDone(r->data, &cbdata)) {
843 Helper::Reply nilReply;
844 nilReply.whichServer = srv;
845 r->callback(cbdata, nilReply);
846 }
847
848 delete r;
849
850 srv->request = NULL;
851 }
852
853 if (srv->data != NULL)
854 hlp->datapool->freeOne(srv->data);
855
856 cbdataReferenceDone(srv->parent);
857
858 delete srv;
859 }
860
861 /// Calls back with a pointer to the buffer with the helper output
862 static void
863 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
864 {
865 Helper::Request *r = srv->requests[request_number];
866 if (r) {
867 HLPCB *callback = r->callback;
868
869 srv->requests[request_number] = NULL;
870
871 r->callback = NULL;
872
873 void *cbdata = NULL;
874 if (cbdataReferenceValidDone(r->data, &cbdata)) {
875 Helper::Reply response(msg, (msg_end-msg));
876 callback(cbdata, response);
877 }
878
879 -- srv->stats.pending;
880 ++ srv->stats.replies;
881
882 ++ hlp->stats.replies;
883
884 srv->answer_time = current_time;
885
886 srv->dispatch_time = r->dispatch_time;
887
888 hlp->stats.avg_svc_time =
889 Math::intAverage(hlp->stats.avg_svc_time,
890 tvSubMsec(r->dispatch_time, current_time),
891 hlp->stats.replies, REDIRECT_AV_FACTOR);
892
893 delete r;
894 } else {
895 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
896 request_number << " from " << hlp->id_name << " #" << srv->index <<
897 " '" << srv->rbuf << "'");
898 }
899
900 if (!srv->flags.shutdown) {
901 helperKickQueue(hlp);
902 } else if (!srv->flags.closing && !srv->stats.pending) {
903 srv->flags.closing=true;
904 srv->writePipe->close();
905 }
906 }
907
908 static void
909 helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
910 {
911 char *t = NULL;
912 helper_server *srv = (helper_server *)data;
913 helper *hlp = srv->parent;
914 assert(cbdataReferenceValid(data));
915
916 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
917
918 if (flag == Comm::ERR_CLOSING) {
919 return;
920 }
921
922 assert(conn->fd == srv->readPipe->fd);
923
924 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
925
926 if (flag != Comm::OK || len == 0) {
927 srv->closePipesSafely(hlp->id_name);
928 return;
929 }
930
931 srv->roffset += len;
932 srv->rbuf[srv->roffset] = '\0';
933 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
934
935 if (!srv->stats.pending) {
936 /* someone spoke without being spoken to */
937 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
938 hlp->id_name << " #" << srv->index << ", " << (int)len <<
939 " bytes '" << srv->rbuf << "'");
940
941 srv->roffset = 0;
942 srv->rbuf[0] = '\0';
943 }
944
945 while ((t = strchr(srv->rbuf, hlp->eom))) {
946 /* end of reply found */
947 char *msg = srv->rbuf;
948 int i = 0;
949 int skip = 1;
950 debugs(84, 3, "helperHandleRead: end of reply found");
951
952 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
953 *t = '\0';
954 // rewind to the \r octet which is the real terminal now
955 // and remember that we have to skip forward 2 places now.
956 skip = 2;
957 --t;
958 }
959
960 *t = '\0';
961
962 if (hlp->childs.concurrency) {
963 i = strtol(msg, &msg, 10);
964
965 while (*msg && xisspace(*msg))
966 ++msg;
967 }
968
969 helperReturnBuffer(i, srv, hlp, msg, t);
970 srv->roffset -= (t - srv->rbuf) + skip;
971 memmove(srv->rbuf, t + skip, srv->roffset);
972 srv->rbuf[srv->roffset] = '\0';
973 }
974
975 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
976 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
977 assert(spaceSize >= 0);
978
979 // grow the input buffer if needed and possible
980 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
981 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
982 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
983 spaceSize = srv->rbuf_sz - srv->roffset - 1;
984 assert(spaceSize >= 0);
985 }
986
987 // quit reading if there is no space left
988 if (!spaceSize) {
989 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
990 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
991 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
992 srv->closePipesSafely(hlp->id_name);
993 return;
994 }
995
996 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
997 CommIoCbPtrFun(helperHandleRead, srv));
998 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
999 }
1000 }
1001
1002 static void
1003 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
1004 {
1005 char *t = NULL;
1006 helper_stateful_server *srv = (helper_stateful_server *)data;
1007 Helper::Request *r;
1008 statefulhelper *hlp = srv->parent;
1009 assert(cbdataReferenceValid(data));
1010
1011 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1012
1013 if (flag == Comm::ERR_CLOSING) {
1014 return;
1015 }
1016
1017 assert(conn->fd == srv->readPipe->fd);
1018
1019 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1020 hlp->id_name << " #" << srv->index);
1021
1022 if (flag != Comm::OK || len == 0) {
1023 srv->closePipesSafely(hlp->id_name);
1024 return;
1025 }
1026
1027 srv->roffset += len;
1028 srv->rbuf[srv->roffset] = '\0';
1029 r = srv->request;
1030 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1031
1032 if (r == NULL) {
1033 /* someone spoke without being spoken to */
1034 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1035 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1036 " bytes '" << srv->rbuf << "'");
1037
1038 srv->roffset = 0;
1039 }
1040
1041 if ((t = strchr(srv->rbuf, hlp->eom))) {
1042 /* end of reply found */
1043 int called = 1;
1044 int skip = 1;
1045 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1046
1047 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1048 *t = '\0';
1049 // rewind to the \r octet which is the real terminal now
1050 // and remember that we have to skip forward 2 places now.
1051 skip = 2;
1052 --t;
1053 }
1054
1055 *t = '\0';
1056
1057 if (r && cbdataReferenceValid(r->data)) {
1058 Helper::Reply res(srv->rbuf, (t - srv->rbuf));
1059 res.whichServer = srv;
1060 r->callback(r->data, res);
1061 } else {
1062 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1063 called = 0;
1064 }
1065 // only skip off the \0's _after_ passing its location in Helper::Reply above
1066 t += skip;
1067
1068 /**
1069 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1070 * Doing this prohibits concurrency support with multiple replies per read().
1071 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1072 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1073 */
1074 srv->roffset = 0;
1075 delete r;
1076 srv->request = NULL;
1077
1078 -- srv->stats.pending;
1079 ++ srv->stats.replies;
1080
1081 ++ hlp->stats.replies;
1082 srv->answer_time = current_time;
1083 hlp->stats.avg_svc_time =
1084 Math::intAverage(hlp->stats.avg_svc_time,
1085 tvSubMsec(srv->dispatch_time, current_time),
1086 hlp->stats.replies, REDIRECT_AV_FACTOR);
1087
1088 if (called)
1089 helperStatefulServerDone(srv);
1090 else
1091 helperStatefulReleaseServer(srv);
1092 }
1093
1094 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1095 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1096 assert(spaceSize >= 0);
1097
1098 // grow the input buffer if needed and possible
1099 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1100 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1101 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1102 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1103 assert(spaceSize >= 0);
1104 }
1105
1106 // quit reading if there is no space left
1107 if (!spaceSize) {
1108 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1109 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1110 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1111 srv->closePipesSafely(hlp->id_name);
1112 return;
1113 }
1114
1115 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1116 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1117 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1118 }
1119 }
1120
1121 /// Handles a request when all running helpers, if any, are busy.
1122 static void
1123 Enqueue(helper * hlp, Helper::Request * r)
1124 {
1125 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1126 dlinkAddTail(r, link, &hlp->queue);
1127 ++ hlp->stats.queue_size;
1128
1129 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1130 if (hlp->childs.needNew() > 0) {
1131 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1132 helperOpenServers(hlp);
1133 return;
1134 }
1135
1136 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1137 return;
1138
1139 if (squid_curtime - hlp->last_queue_warn < 600)
1140 return;
1141
1142 if (shutting_down || reconfiguring)
1143 return;
1144
1145 hlp->last_queue_warn = squid_curtime;
1146
1147 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1148 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1149 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1150 }
1151
1152 static void
1153 StatefulEnqueue(statefulhelper * hlp, Helper::Request * r)
1154 {
1155 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1156 dlinkAddTail(r, link, &hlp->queue);
1157 ++ hlp->stats.queue_size;
1158
1159 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1160 if (hlp->childs.needNew() > 0) {
1161 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1162 helperStatefulOpenServers(hlp);
1163 return;
1164 }
1165
1166 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1167 return;
1168
1169 if (squid_curtime - hlp->last_queue_warn < 600)
1170 return;
1171
1172 if (shutting_down || reconfiguring)
1173 return;
1174
1175 hlp->last_queue_warn = squid_curtime;
1176
1177 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1178 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1179 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1180 }
1181
1182 static Helper::Request *
1183 Dequeue(helper * hlp)
1184 {
1185 dlink_node *link;
1186 Helper::Request *r = NULL;
1187
1188 if ((link = hlp->queue.head)) {
1189 r = (Helper::Request *)link->data;
1190 dlinkDelete(link, &hlp->queue);
1191 memFree(link, MEM_DLINK_NODE);
1192 -- hlp->stats.queue_size;
1193 }
1194
1195 return r;
1196 }
1197
1198 static Helper::Request *
1199 StatefulDequeue(statefulhelper * hlp)
1200 {
1201 dlink_node *link;
1202 Helper::Request *r = NULL;
1203
1204 if ((link = hlp->queue.head)) {
1205 r = (Helper::Request *)link->data;
1206 dlinkDelete(link, &hlp->queue);
1207 memFree(link, MEM_DLINK_NODE);
1208 -- hlp->stats.queue_size;
1209 }
1210
1211 return r;
1212 }
1213
1214 static helper_server *
1215 GetFirstAvailable(helper * hlp)
1216 {
1217 dlink_node *n;
1218 helper_server *srv;
1219 helper_server *selected = NULL;
1220 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1221
1222 if (hlp->childs.n_running == 0)
1223 return NULL;
1224
1225 /* Find "least" loaded helper (approx) */
1226 for (n = hlp->servers.head; n != NULL; n = n->next) {
1227 srv = (helper_server *)n->data;
1228
1229 if (selected && selected->stats.pending <= srv->stats.pending)
1230 continue;
1231
1232 if (srv->flags.shutdown)
1233 continue;
1234
1235 if (!srv->stats.pending)
1236 return srv;
1237
1238 if (selected) {
1239 selected = srv;
1240 break;
1241 }
1242
1243 selected = srv;
1244 }
1245
1246 /* Check for overload */
1247 if (!selected) {
1248 debugs(84, 5, "GetFirstAvailable: None available.");
1249 return NULL;
1250 }
1251
1252 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1253 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1254 return NULL;
1255 }
1256
1257 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1258 return selected;
1259 }
1260
1261 static helper_stateful_server *
1262 StatefulGetFirstAvailable(statefulhelper * hlp)
1263 {
1264 dlink_node *n;
1265 helper_stateful_server *srv = NULL;
1266 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1267
1268 if (hlp->childs.n_running == 0)
1269 return NULL;
1270
1271 for (n = hlp->servers.head; n != NULL; n = n->next) {
1272 srv = (helper_stateful_server *)n->data;
1273
1274 if (srv->stats.pending)
1275 continue;
1276
1277 if (srv->flags.reserved)
1278 continue;
1279
1280 if (srv->flags.shutdown)
1281 continue;
1282
1283 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1284 continue;
1285
1286 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1287 return srv;
1288 }
1289
1290 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1291 return NULL;
1292 }
1293
1294 static void
1295 helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag, int xerrno, void *data)
1296 {
1297 helper_server *srv = (helper_server *)data;
1298
1299 srv->writebuf->clean();
1300 delete srv->writebuf;
1301 srv->writebuf = NULL;
1302 srv->flags.writing = false;
1303
1304 if (flag != Comm::OK) {
1305 /* Helper server has crashed */
1306 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1307 return;
1308 }
1309
1310 if (!srv->wqueue->isNull()) {
1311 srv->writebuf = srv->wqueue;
1312 srv->wqueue = new MemBuf;
1313 srv->flags.writing = true;
1314 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1315 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1316 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1317 }
1318 }
1319
1320 static void
1321 helperDispatch(helper_server * srv, Helper::Request * r)
1322 {
1323 helper *hlp = srv->parent;
1324 Helper::Request **ptr = NULL;
1325 unsigned int slot;
1326
1327 if (!cbdataReferenceValid(r->data)) {
1328 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1329 delete r;
1330 return;
1331 }
1332
1333 for (slot = 0; slot < (hlp->childs.concurrency ? hlp->childs.concurrency : 1); ++slot) {
1334 if (!srv->requests[slot]) {
1335 ptr = &srv->requests[slot];
1336 break;
1337 }
1338 }
1339
1340 assert(ptr);
1341 *ptr = r;
1342 r->dispatch_time = current_time;
1343
1344 if (srv->wqueue->isNull())
1345 srv->wqueue->init();
1346
1347 if (hlp->childs.concurrency)
1348 srv->wqueue->Printf("%d %s", slot, r->buf);
1349 else
1350 srv->wqueue->append(r->buf, strlen(r->buf));
1351
1352 if (!srv->flags.writing) {
1353 assert(NULL == srv->writebuf);
1354 srv->writebuf = srv->wqueue;
1355 srv->wqueue = new MemBuf;
1356 srv->flags.writing = true;
1357 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1358 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1359 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1360 }
1361
1362 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->buf) << " bytes");
1363
1364 ++ srv->stats.uses;
1365 ++ srv->stats.pending;
1366 ++ hlp->stats.requests;
1367 }
1368
1369 static void
1370 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm::Flag flag,
1371 int xerrno, void *data)
1372 {
1373 /* nothing! */
1374 }
1375
1376 static void
1377 helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r)
1378 {
1379 statefulhelper *hlp = srv->parent;
1380
1381 if (!cbdataReferenceValid(r->data)) {
1382 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1383 delete r;
1384 helperStatefulReleaseServer(srv);
1385 return;
1386 }
1387
1388 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1389
1390 if (r->placeholder == 1) {
1391 /* a callback is needed before this request can _use_ a helper. */
1392 /* we don't care about releasing this helper. The request NEVER
1393 * gets to the helper. So we throw away the return code */
1394 Helper::Reply nilReply;
1395 nilReply.whichServer = srv;
1396 r->callback(r->data, nilReply);
1397 /* throw away the placeholder */
1398 delete r;
1399 /* and push the queue. Note that the callback may have submitted a new
1400 * request to the helper which is why we test for the request */
1401
1402 if (srv->request == NULL)
1403 helperStatefulServerDone(srv);
1404
1405 return;
1406 }
1407
1408 srv->flags.reserved = true;
1409 srv->request = r;
1410 srv->dispatch_time = current_time;
1411 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1412 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1413 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1414 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1415 hlp->id_name << " #" << srv->index << ", " <<
1416 (int) strlen(r->buf) << " bytes");
1417
1418 ++ srv->stats.uses;
1419 ++ srv->stats.pending;
1420 ++ hlp->stats.requests;
1421 }
1422
1423 static void
1424 helperKickQueue(helper * hlp)
1425 {
1426 Helper::Request *r;
1427 helper_server *srv;
1428
1429 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1430 helperDispatch(srv, r);
1431 }
1432
1433 static void
1434 helperStatefulKickQueue(statefulhelper * hlp)
1435 {
1436 Helper::Request *r;
1437 helper_stateful_server *srv;
1438
1439 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1440 helperStatefulDispatch(srv, r);
1441 }
1442
1443 static void
1444 helperStatefulServerDone(helper_stateful_server * srv)
1445 {
1446 if (!srv->flags.shutdown) {
1447 helperStatefulKickQueue(srv->parent);
1448 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1449 srv->closeWritePipeSafely(srv->parent->id_name);
1450 return;
1451 }
1452 }
1453
1454 // TODO: should helper_ and helper_stateful_ have a common parent?
1455 static bool
1456 helperStartStats(StoreEntry *sentry, void *hlp, const char *label)
1457 {
1458 if (!hlp) {
1459 if (label)
1460 storeAppendPrintf(sentry, "%s: unavailable\n", label);
1461 return false;
1462 }
1463
1464 if (label)
1465 storeAppendPrintf(sentry, "%s:\n", label);
1466
1467 return true;
1468 }