2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 84 Helper process maintenance */
12 #include "base/AsyncCbdataCalls.h"
14 #include "comm/Connection.h"
15 #include "comm/Read.h"
16 #include "comm/Write.h"
19 #include "format/Quoting.h"
21 #include "helper/Reply.h"
22 #include "helper/Request.h"
26 #include "SquidMath.h"
27 #include "SquidTime.h"
31 #define HELPER_MAX_ARGS 64
33 /** Initial Squid input buffer size. Helper responses may exceed this, and
34 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
36 const size_t ReadBufMinSize(4*1024);
38 /** Maximum safe size of a helper-to-Squid response message plus one.
39 * Squid will warn and close the stream if a helper sends a too-big response.
40 * ssl_crtd helper is known to produce responses of at least 10KB in size.
41 * Some undocumented helpers are known to produce responses exceeding 8KB.
43 const size_t ReadBufMaxSize(32*1024);
45 static IOCB helperHandleRead
;
46 static IOCB helperStatefulHandleRead
;
47 static void helperServerFree(helper_server
*srv
);
48 static void helperStatefulServerFree(helper_stateful_server
*srv
);
49 static void Enqueue(helper
* hlp
, Helper::Request
*);
50 static Helper::Request
*Dequeue(helper
* hlp
);
51 static Helper::Request
*StatefulDequeue(statefulhelper
* hlp
);
52 static helper_server
*GetFirstAvailable(helper
* hlp
);
53 static helper_stateful_server
*StatefulGetFirstAvailable(statefulhelper
* hlp
);
54 static void helperDispatch(helper_server
* srv
, Helper::Request
* r
);
55 static void helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Request
* r
);
56 static void helperKickQueue(helper
* hlp
);
57 static void helperStatefulKickQueue(statefulhelper
* hlp
);
58 static void helperStatefulServerDone(helper_stateful_server
* srv
);
59 static void StatefulEnqueue(statefulhelper
* hlp
, Helper::Request
* r
);
60 static bool helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
);
62 CBDATA_CLASS_INIT(helper
);
63 CBDATA_CLASS_INIT(helper_server
);
64 CBDATA_CLASS_INIT(statefulhelper
);
65 CBDATA_CLASS_INIT(helper_stateful_server
);
67 InstanceIdDefinitions(HelperServerBase
, "Hlpr");
70 HelperServerBase::initStats()
79 HelperServerBase::closePipesSafely(const char *id_name
)
82 shutdown(writePipe
->fd
, SD_BOTH
);
86 if (readPipe
->fd
== writePipe
->fd
)
94 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
96 debugs(84, DBG_IMPORTANT
, "WARNING: " << id_name
<<
97 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
105 HelperServerBase::closeWritePipeSafely(const char *id_name
)
108 shutdown(writePipe
->fd
, (readPipe
->fd
== writePipe
->fd
? SD_BOTH
: SD_SEND
));
111 flags
.closing
= true;
112 if (readPipe
->fd
== writePipe
->fd
)
118 if (WaitForSingleObject(hIpc
, 5000) != WAIT_OBJECT_0
) {
120 debugs(84, DBG_IMPORTANT
, "WARNING: " << id_name
<<
121 " #" << index
<< " (PID " << (long int)pid
<< ") didn't exit in 5 seconds");
129 helperOpenServers(helper
* hlp
)
135 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
136 char fd_note_buf
[FD_DESC_SZ
];
146 if (hlp
->cmdline
== NULL
)
149 progname
= hlp
->cmdline
->key
;
151 if ((s
= strrchr(progname
, '/')))
152 shortname
= xstrdup(s
+ 1);
154 shortname
= xstrdup(progname
);
156 /* figure out how many new child are actually needed. */
157 int need_new
= hlp
->childs
.needNew();
159 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
162 debugs(84, DBG_IMPORTANT
, "helperOpenServers: No '" << shortname
<< "' processes needed.");
165 procname
= (char *)xmalloc(strlen(shortname
) + 3);
167 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
169 args
[nargs
] = procname
;
172 for (w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
173 args
[nargs
] = w
->key
;
180 assert(nargs
<= HELPER_MAX_ARGS
);
182 for (k
= 0; k
< need_new
; ++k
) {
185 pid
= ipcCreate(hlp
->ipc_type
,
195 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
199 ++ hlp
->childs
.n_running
;
200 ++ hlp
->childs
.n_active
;
201 srv
= new helper_server
;
205 srv
->addr
= hlp
->addr
;
206 srv
->readPipe
= new Comm::Connection
;
207 srv
->readPipe
->fd
= rfd
;
208 srv
->writePipe
= new Comm::Connection
;
209 srv
->writePipe
->fd
= wfd
;
210 srv
->rbuf
= (char *)memAllocBuf(ReadBufMinSize
, &srv
->rbuf_sz
);
211 srv
->wqueue
= new MemBuf
;
213 srv
->requests
= (Helper::Request
**)xcalloc(hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1, sizeof(*srv
->requests
));
214 srv
->parent
= cbdataReference(hlp
);
215 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
218 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
219 fd_note(rfd
, fd_note_buf
);
221 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
222 fd_note(rfd
, fd_note_buf
);
223 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
224 fd_note(wfd
, fd_note_buf
);
227 commSetNonBlocking(rfd
);
230 commSetNonBlocking(wfd
);
232 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree
, srv
));
233 comm_add_close_handler(rfd
, closeCall
);
235 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
236 CommIoCbPtrFun(helperHandleRead
, srv
));
237 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
240 hlp
->last_restart
= squid_curtime
;
241 safe_free(shortname
);
243 helperKickQueue(hlp
);
249 * helperStatefulOpenServers: create the stateful child helper processes
252 helperStatefulOpenServers(statefulhelper
* hlp
)
255 const char *args
[HELPER_MAX_ARGS
+1]; // save space for a NULL terminator
256 char fd_note_buf
[FD_DESC_SZ
];
259 if (hlp
->cmdline
== NULL
)
262 if (hlp
->childs
.concurrency
)
263 debugs(84, DBG_CRITICAL
, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp
->cmdline
<< "')");
265 char *progname
= hlp
->cmdline
->key
;
268 if ((s
= strrchr(progname
, '/')))
269 shortname
= xstrdup(s
+ 1);
271 shortname
= xstrdup(progname
);
273 /* figure out haw mant new helpers are needed. */
274 int need_new
= hlp
->childs
.needNew();
276 debugs(84, DBG_IMPORTANT
, "helperOpenServers: Starting " << need_new
<< "/" << hlp
->childs
.n_max
<< " '" << shortname
<< "' processes");
279 debugs(84, DBG_IMPORTANT
, "helperStatefulOpenServers: No '" << shortname
<< "' processes needed.");
282 char *procname
= (char *)xmalloc(strlen(shortname
) + 3);
284 snprintf(procname
, strlen(shortname
) + 3, "(%s)", shortname
);
286 args
[nargs
] = procname
;
289 for (wordlist
*w
= hlp
->cmdline
->next
; w
&& nargs
< HELPER_MAX_ARGS
; w
= w
->next
) {
290 args
[nargs
] = w
->key
;
297 assert(nargs
<= HELPER_MAX_ARGS
);
299 for (int k
= 0; k
< need_new
; ++k
) {
304 pid_t pid
= ipcCreate(hlp
->ipc_type
,
314 debugs(84, DBG_IMPORTANT
, "WARNING: Cannot run '" << progname
<< "' process.");
318 ++ hlp
->childs
.n_running
;
319 ++ hlp
->childs
.n_active
;
320 helper_stateful_server
*srv
= new helper_stateful_server
;
323 srv
->flags
.reserved
= false;
325 srv
->addr
= hlp
->addr
;
326 srv
->readPipe
= new Comm::Connection
;
327 srv
->readPipe
->fd
= rfd
;
328 srv
->writePipe
= new Comm::Connection
;
329 srv
->writePipe
->fd
= wfd
;
330 srv
->rbuf
= (char *)memAllocBuf(ReadBufMinSize
, &srv
->rbuf_sz
);
332 srv
->parent
= cbdataReference(hlp
);
334 if (hlp
->datapool
!= NULL
)
335 srv
->data
= hlp
->datapool
->alloc();
337 dlinkAddTail(srv
, &srv
->link
, &hlp
->servers
);
340 snprintf(fd_note_buf
, FD_DESC_SZ
, "%s #%d", shortname
, k
+ 1);
341 fd_note(rfd
, fd_note_buf
);
343 snprintf(fd_note_buf
, FD_DESC_SZ
, "reading %s #%d", shortname
, k
+ 1);
344 fd_note(rfd
, fd_note_buf
);
345 snprintf(fd_note_buf
, FD_DESC_SZ
, "writing %s #%d", shortname
, k
+ 1);
346 fd_note(wfd
, fd_note_buf
);
349 commSetNonBlocking(rfd
);
352 commSetNonBlocking(wfd
);
354 AsyncCall::Pointer closeCall
= asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree
, srv
));
355 comm_add_close_handler(rfd
, closeCall
);
357 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
358 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
359 comm_read(srv
->readPipe
, srv
->rbuf
, srv
->rbuf_sz
- 1, call
);
362 hlp
->last_restart
= squid_curtime
;
363 safe_free(shortname
);
365 helperStatefulKickQueue(hlp
);
369 helperSubmit(helper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
)
372 debugs(84, 3, "helperSubmit: hlp == NULL");
373 Helper::Reply nilReply
;
374 callback(data
, nilReply
);
378 hlp
->submit(buf
, callback
, data
);
382 helper::queueFull() const {
383 return stats
.queue_size
> static_cast<int>(childs
.queue_size
);
386 /// prepares the helper for request submission via trySubmit() or helperSubmit()
387 /// currently maintains full_time and kills Squid if the helper remains full for too long
393 else if (!full_time
) // may happen here if reconfigure decreases capacity
394 full_time
= squid_curtime
;
395 else if (squid_curtime
- full_time
> 180)
396 fatalf("Too many queued %s requests", id_name
);
400 helper::trySubmit(const char *buf
, HLPCB
* callback
, void *data
)
405 debugs(84, DBG_IMPORTANT
, id_name
<< " drops request due to a full queue");
406 return false; // request was ignored
409 submit(buf
, callback
, data
); // will send or queue
410 return true; // request submitted or queued
413 /// dispatches or enqueues a helper requests; does not enforce queue limits
415 helper::submit(const char *buf
, HLPCB
* callback
, void *data
)
417 Helper::Request
*r
= new Helper::Request(callback
, data
, buf
);
420 if ((srv
= GetFirstAvailable(this)))
421 helperDispatch(srv
, r
);
425 debugs(84, DBG_DATA
, Raw("buf", buf
, strlen(buf
)));
429 } else if (!full_time
) {
430 debugs(84, 3, id_name
<< " queue became full");
431 full_time
= squid_curtime
;
435 /// lastserver = "server last used as part of a reserved request sequence"
437 helperStatefulSubmit(statefulhelper
* hlp
, const char *buf
, HLPCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
440 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
441 Helper::Reply nilReply
;
442 callback(data
, nilReply
);
446 hlp
->submit(buf
, callback
, data
, lastserver
);
449 void statefulhelper::submit(const char *buf
, HLPCB
* callback
, void *data
, helper_stateful_server
* lastserver
)
451 Helper::Request
*r
= new Helper::Request(callback
, data
, buf
);
453 if ((buf
!= NULL
) && lastserver
) {
454 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver
);
455 assert(lastserver
->flags
.reserved
);
456 assert(!(lastserver
->request
));
458 debugs(84, 5, "StatefulSubmit dispatching");
459 helperStatefulDispatch(lastserver
, r
);
461 helper_stateful_server
*srv
;
462 if ((srv
= StatefulGetFirstAvailable(this))) {
463 helperStatefulDispatch(srv
, r
);
465 StatefulEnqueue(this, r
);
468 debugs(84, DBG_DATA
, "placeholder: '" << r
->placeholder
<<
469 "', " << Raw("buf", buf
, (!buf
?0:strlen(buf
))));
473 } else if (!full_time
) {
474 debugs(84, 3, id_name
<< " queue became full");
475 full_time
= squid_curtime
;
482 * helperStatefulReleaseServer tells the helper that whoever was
483 * using it no longer needs its services.
486 helperStatefulReleaseServer(helper_stateful_server
* srv
)
488 debugs(84, 3, HERE
<< "srv-" << srv
->index
<< " flags.reserved = " << srv
->flags
.reserved
);
489 if (!srv
->flags
.reserved
)
492 ++ srv
->stats
.releases
;
494 srv
->flags
.reserved
= false;
495 if (srv
->parent
->OnEmptyQueue
!= NULL
&& srv
->data
)
496 srv
->parent
->OnEmptyQueue(srv
->data
);
498 helperStatefulServerDone(srv
);
501 /** return a pointer to the stateful routines data area */
503 helperStatefulServerGetData(helper_stateful_server
* srv
)
509 * Dump some stats about the helper states to a StoreEntry
512 helperStats(StoreEntry
* sentry
, helper
* hlp
, const char *label
)
514 if (!helperStartStats(sentry
, hlp
, label
))
517 storeAppendPrintf(sentry
, "program: %s\n",
519 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
520 hlp
->childs
.n_active
, hlp
->childs
.n_max
, (hlp
->childs
.n_running
- hlp
->childs
.n_active
) );
521 storeAppendPrintf(sentry
, "requests sent: %d\n",
522 hlp
->stats
.requests
);
523 storeAppendPrintf(sentry
, "replies received: %d\n",
525 storeAppendPrintf(sentry
, "queue length: %d\n",
526 hlp
->stats
.queue_size
);
527 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
528 hlp
->stats
.avg_svc_time
);
529 storeAppendPrintf(sentry
, "\n");
530 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%11s\t%s\t%7s\t%7s\t%7s\n",
541 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
542 helper_server
*srv
= (helper_server
*)link
->data
;
543 double tt
= 0.001 * (srv
->requests
[0] ? tvSubMsec(srv
->requests
[0]->dispatch_time
, current_time
) : tvSubMsec(srv
->dispatch_time
, srv
->answer_time
));
544 storeAppendPrintf(sentry
, "%7u\t%7d\t%7d\t%11" PRIu64
"\t%11" PRIu64
"\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
550 srv
->stats
.pending
? 'B' : ' ',
551 srv
->flags
.writing
? 'W' : ' ',
552 srv
->flags
.closing
? 'C' : ' ',
553 srv
->flags
.shutdown
? 'S' : ' ',
556 srv
->requests
[0] ? Format::QuoteMimeBlob(srv
->requests
[0]->buf
) : "(none)");
559 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
560 storeAppendPrintf(sentry
, " B = BUSY\n");
561 storeAppendPrintf(sentry
, " W = WRITING\n");
562 storeAppendPrintf(sentry
, " C = CLOSING\n");
563 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
567 helperStatefulStats(StoreEntry
* sentry
, statefulhelper
* hlp
, const char *label
)
569 if (!helperStartStats(sentry
, hlp
, label
))
572 storeAppendPrintf(sentry
, "program: %s\n",
574 storeAppendPrintf(sentry
, "number active: %d of %d (%d shutting down)\n",
575 hlp
->childs
.n_active
, hlp
->childs
.n_max
, (hlp
->childs
.n_running
- hlp
->childs
.n_active
) );
576 storeAppendPrintf(sentry
, "requests sent: %d\n",
577 hlp
->stats
.requests
);
578 storeAppendPrintf(sentry
, "replies received: %d\n",
580 storeAppendPrintf(sentry
, "queue length: %d\n",
581 hlp
->stats
.queue_size
);
582 storeAppendPrintf(sentry
, "avg service time: %d msec\n",
583 hlp
->stats
.avg_svc_time
);
584 storeAppendPrintf(sentry
, "\n");
585 storeAppendPrintf(sentry
, "%7s\t%7s\t%7s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
596 for (dlink_node
*link
= hlp
->servers
.head
; link
; link
= link
->next
) {
597 helper_stateful_server
*srv
= (helper_stateful_server
*)link
->data
;
598 double tt
= 0.001 * tvSubMsec(srv
->dispatch_time
, srv
->stats
.pending
? current_time
: srv
->answer_time
);
599 storeAppendPrintf(sentry
, "%7u\t%7d\t%7d\t%11" PRIu64
"\t%11" PRIu64
"\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
605 srv
->stats
.pending
? 'B' : ' ',
606 srv
->flags
.closing
? 'C' : ' ',
607 srv
->flags
.reserved
? 'R' : ' ',
608 srv
->flags
.shutdown
? 'S' : ' ',
609 srv
->request
? (srv
->request
->placeholder
? 'P' : ' ') : ' ',
612 srv
->request
? Format::QuoteMimeBlob(srv
->request
->buf
) : "(none)");
615 storeAppendPrintf(sentry
, "\nFlags key:\n\n");
616 storeAppendPrintf(sentry
, " B = BUSY\n");
617 storeAppendPrintf(sentry
, " C = CLOSING\n");
618 storeAppendPrintf(sentry
, " R = RESERVED\n");
619 storeAppendPrintf(sentry
, " S = SHUTDOWN PENDING\n");
620 storeAppendPrintf(sentry
, " P = PLACEHOLDER\n");
624 helperShutdown(helper
* hlp
)
626 dlink_node
*link
= hlp
->servers
.head
;
630 srv
= (helper_server
*)link
->data
;
633 if (srv
->flags
.shutdown
) {
634 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
638 assert(hlp
->childs
.n_active
> 0);
639 -- hlp
->childs
.n_active
;
640 srv
->flags
.shutdown
= true; /* request it to shut itself down */
642 if (srv
->flags
.closing
) {
643 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
647 if (srv
->stats
.pending
) {
648 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
652 debugs(84, 3, "helperShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
653 /* the rest of the details is dealt with in the helperServerFree
656 srv
->closePipesSafely(hlp
->id_name
);
661 helperStatefulShutdown(statefulhelper
* hlp
)
663 dlink_node
*link
= hlp
->servers
.head
;
664 helper_stateful_server
*srv
;
667 srv
= (helper_stateful_server
*)link
->data
;
670 if (srv
->flags
.shutdown
) {
671 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " has already SHUT DOWN.");
675 assert(hlp
->childs
.n_active
> 0);
676 -- hlp
->childs
.n_active
;
677 srv
->flags
.shutdown
= true; /* request it to shut itself down */
679 if (srv
->stats
.pending
) {
680 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is BUSY.");
684 if (srv
->flags
.closing
) {
685 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is CLOSING.");
689 if (srv
->flags
.reserved
) {
691 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Closing anyway.");
693 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " is RESERVED. Not Shutting Down Yet.");
698 debugs(84, 3, "helperStatefulShutdown: " << hlp
->id_name
<< " #" << srv
->index
<< " shutting down.");
700 /* the rest of the details is dealt with in the helperStatefulServerFree
703 srv
->closePipesSafely(hlp
->id_name
);
709 /* note, don't free id_name, it probably points to static memory */
712 debugs(84, DBG_CRITICAL
, "WARNING: freeing " << id_name
<< " helper with " << stats
.queue_size
<< " requests queued");
715 /* ====================================================================== */
716 /* LOCAL FUNCTIONS */
717 /* ====================================================================== */
720 helperServerFree(helper_server
*srv
)
722 helper
*hlp
= srv
->parent
;
724 int i
, concurrency
= hlp
->childs
.concurrency
;
730 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
734 srv
->wqueue
->clean();
738 srv
->writebuf
->clean();
739 delete srv
->writebuf
;
740 srv
->writebuf
= NULL
;
743 if (Comm::IsConnOpen(srv
->writePipe
))
744 srv
->closeWritePipeSafely(hlp
->id_name
);
746 dlinkDelete(&srv
->link
, &hlp
->servers
);
748 assert(hlp
->childs
.n_running
> 0);
749 -- hlp
->childs
.n_running
;
751 if (!srv
->flags
.shutdown
) {
752 assert(hlp
->childs
.n_active
> 0);
753 -- hlp
->childs
.n_active
;
754 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
<< " exited");
756 if (hlp
->childs
.needNew() > 0) {
757 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
759 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30) {
760 if (srv
->stats
.replies
< 1)
761 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
763 debugs(80, DBG_CRITICAL
, "ERROR: The " << hlp
->id_name
<< " helpers are crashing too rapidly, need help!");
766 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
767 helperOpenServers(hlp
);
771 for (i
= 0; i
< concurrency
; ++i
) {
772 // XXX: re-schedule these on another helper?
773 if ((r
= srv
->requests
[i
])) {
776 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
777 Helper::Reply nilReply
;
778 r
->callback(cbdata
, nilReply
);
783 srv
->requests
[i
] = NULL
;
786 safe_free(srv
->requests
);
788 cbdataReferenceDone(srv
->parent
);
793 helperStatefulServerFree(helper_stateful_server
*srv
)
795 statefulhelper
*hlp
= srv
->parent
;
799 memFreeBuf(srv
->rbuf_sz
, srv
->rbuf
);
804 srv
->wqueue
->clean();
810 /* TODO: walk the local queue of requests and carry them all out */
811 if (Comm::IsConnOpen(srv
->writePipe
))
812 srv
->closeWritePipeSafely(hlp
->id_name
);
814 dlinkDelete(&srv
->link
, &hlp
->servers
);
816 assert(hlp
->childs
.n_running
> 0);
817 -- hlp
->childs
.n_running
;
819 if (!srv
->flags
.shutdown
) {
820 assert( hlp
->childs
.n_active
> 0);
821 -- hlp
->childs
.n_active
;
822 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->id_name
<< " #" << srv
->index
<< " exited");
824 if (hlp
->childs
.needNew() > 0) {
825 debugs(80, DBG_IMPORTANT
, "Too few " << hlp
->id_name
<< " processes are running (need " << hlp
->childs
.needNew() << "/" << hlp
->childs
.n_max
<< ")");
827 if (hlp
->childs
.n_active
< hlp
->childs
.n_startup
&& hlp
->last_restart
> squid_curtime
- 30) {
828 if (srv
->stats
.replies
< 1)
829 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp
->id_name
);
831 debugs(80, DBG_CRITICAL
, "ERROR: The " << hlp
->id_name
<< " helpers are crashing too rapidly, need help!");
834 debugs(80, DBG_IMPORTANT
, "Starting new helpers");
835 helperStatefulOpenServers(hlp
);
839 if ((r
= srv
->request
)) {
842 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
843 Helper::Reply nilReply
;
844 nilReply
.whichServer
= srv
;
845 r
->callback(cbdata
, nilReply
);
853 if (srv
->data
!= NULL
)
854 hlp
->datapool
->freeOne(srv
->data
);
856 cbdataReferenceDone(srv
->parent
);
861 /// Calls back with a pointer to the buffer with the helper output
863 helperReturnBuffer(int request_number
, helper_server
* srv
, helper
* hlp
, char * msg
, char * msg_end
)
865 Helper::Request
*r
= srv
->requests
[request_number
];
867 HLPCB
*callback
= r
->callback
;
869 srv
->requests
[request_number
] = NULL
;
874 if (cbdataReferenceValidDone(r
->data
, &cbdata
)) {
875 Helper::Reply
response(msg
, (msg_end
-msg
));
876 callback(cbdata
, response
);
879 -- srv
->stats
.pending
;
880 ++ srv
->stats
.replies
;
882 ++ hlp
->stats
.replies
;
884 srv
->answer_time
= current_time
;
886 srv
->dispatch_time
= r
->dispatch_time
;
888 hlp
->stats
.avg_svc_time
=
889 Math::intAverage(hlp
->stats
.avg_svc_time
,
890 tvSubMsec(r
->dispatch_time
, current_time
),
891 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
895 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected reply on channel " <<
896 request_number
<< " from " << hlp
->id_name
<< " #" << srv
->index
<<
897 " '" << srv
->rbuf
<< "'");
900 if (!srv
->flags
.shutdown
) {
901 helperKickQueue(hlp
);
902 } else if (!srv
->flags
.closing
&& !srv
->stats
.pending
) {
903 srv
->flags
.closing
=true;
904 srv
->writePipe
->close();
909 helperHandleRead(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, Comm::Flag flag
, int xerrno
, void *data
)
912 helper_server
*srv
= (helper_server
*)data
;
913 helper
*hlp
= srv
->parent
;
914 assert(cbdataReferenceValid(data
));
916 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
918 if (flag
== Comm::ERR_CLOSING
) {
922 assert(conn
->fd
== srv
->readPipe
->fd
);
924 debugs(84, 5, "helperHandleRead: " << len
<< " bytes from " << hlp
->id_name
<< " #" << srv
->index
);
926 if (flag
!= Comm::OK
|| len
== 0) {
927 srv
->closePipesSafely(hlp
->id_name
);
932 srv
->rbuf
[srv
->roffset
] = '\0';
933 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
935 if (!srv
->stats
.pending
) {
936 /* someone spoke without being spoken to */
937 debugs(84, DBG_IMPORTANT
, "helperHandleRead: unexpected read from " <<
938 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
939 " bytes '" << srv
->rbuf
<< "'");
945 while ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
946 /* end of reply found */
947 char *msg
= srv
->rbuf
;
950 debugs(84, 3, "helperHandleRead: end of reply found");
952 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n') {
954 // rewind to the \r octet which is the real terminal now
955 // and remember that we have to skip forward 2 places now.
962 if (hlp
->childs
.concurrency
) {
963 i
= strtol(msg
, &msg
, 10);
965 while (*msg
&& xisspace(*msg
))
969 helperReturnBuffer(i
, srv
, hlp
, msg
, t
);
970 srv
->roffset
-= (t
- srv
->rbuf
) + skip
;
971 memmove(srv
->rbuf
, t
+ skip
, srv
->roffset
);
972 srv
->rbuf
[srv
->roffset
] = '\0';
975 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
976 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
977 assert(spaceSize
>= 0);
979 // grow the input buffer if needed and possible
980 if (!spaceSize
&& srv
->rbuf_sz
+ 4096 <= ReadBufMaxSize
) {
981 srv
->rbuf
= (char *)memReallocBuf(srv
->rbuf
, srv
->rbuf_sz
+ 4096, &srv
->rbuf_sz
);
982 debugs(84, 3, HERE
<< "Grew read buffer to " << srv
->rbuf_sz
);
983 spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
984 assert(spaceSize
>= 0);
987 // quit reading if there is no space left
989 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
990 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
991 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
992 srv
->closePipesSafely(hlp
->id_name
);
996 AsyncCall::Pointer call
= commCbCall(5,4, "helperHandleRead",
997 CommIoCbPtrFun(helperHandleRead
, srv
));
998 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
1003 helperStatefulHandleRead(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, Comm::Flag flag
, int xerrno
, void *data
)
1006 helper_stateful_server
*srv
= (helper_stateful_server
*)data
;
1008 statefulhelper
*hlp
= srv
->parent
;
1009 assert(cbdataReferenceValid(data
));
1011 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1013 if (flag
== Comm::ERR_CLOSING
) {
1017 assert(conn
->fd
== srv
->readPipe
->fd
);
1019 debugs(84, 5, "helperStatefulHandleRead: " << len
<< " bytes from " <<
1020 hlp
->id_name
<< " #" << srv
->index
);
1022 if (flag
!= Comm::OK
|| len
== 0) {
1023 srv
->closePipesSafely(hlp
->id_name
);
1027 srv
->roffset
+= len
;
1028 srv
->rbuf
[srv
->roffset
] = '\0';
1030 debugs(84, DBG_DATA
, Raw("accumulated", srv
->rbuf
, srv
->roffset
));
1033 /* someone spoke without being spoken to */
1034 debugs(84, DBG_IMPORTANT
, "helperStatefulHandleRead: unexpected read from " <<
1035 hlp
->id_name
<< " #" << srv
->index
<< ", " << (int)len
<<
1036 " bytes '" << srv
->rbuf
<< "'");
1041 if ((t
= strchr(srv
->rbuf
, hlp
->eom
))) {
1042 /* end of reply found */
1045 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1047 if (t
> srv
->rbuf
&& t
[-1] == '\r' && hlp
->eom
== '\n') {
1049 // rewind to the \r octet which is the real terminal now
1050 // and remember that we have to skip forward 2 places now.
1057 if (r
&& cbdataReferenceValid(r
->data
)) {
1058 Helper::Reply
res(srv
->rbuf
, (t
- srv
->rbuf
));
1059 res
.whichServer
= srv
;
1060 r
->callback(r
->data
, res
);
1062 debugs(84, DBG_IMPORTANT
, "StatefulHandleRead: no callback data registered");
1065 // only skip off the \0's _after_ passing its location in Helper::Reply above
1069 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1070 * Doing this prohibits concurrency support with multiple replies per read().
1071 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1072 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1076 srv
->request
= NULL
;
1078 -- srv
->stats
.pending
;
1079 ++ srv
->stats
.replies
;
1081 ++ hlp
->stats
.replies
;
1082 srv
->answer_time
= current_time
;
1083 hlp
->stats
.avg_svc_time
=
1084 Math::intAverage(hlp
->stats
.avg_svc_time
,
1085 tvSubMsec(srv
->dispatch_time
, current_time
),
1086 hlp
->stats
.replies
, REDIRECT_AV_FACTOR
);
1089 helperStatefulServerDone(srv
);
1091 helperStatefulReleaseServer(srv
);
1094 if (Comm::IsConnOpen(srv
->readPipe
) && !fd_table
[srv
->readPipe
->fd
].closing()) {
1095 int spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1096 assert(spaceSize
>= 0);
1098 // grow the input buffer if needed and possible
1099 if (!spaceSize
&& srv
->rbuf_sz
+ 4096 <= ReadBufMaxSize
) {
1100 srv
->rbuf
= (char *)memReallocBuf(srv
->rbuf
, srv
->rbuf_sz
+ 4096, &srv
->rbuf_sz
);
1101 debugs(84, 3, HERE
<< "Grew read buffer to " << srv
->rbuf_sz
);
1102 spaceSize
= srv
->rbuf_sz
- srv
->roffset
- 1;
1103 assert(spaceSize
>= 0);
1106 // quit reading if there is no space left
1108 debugs(84, DBG_IMPORTANT
, "ERROR: Disconnecting from a " <<
1109 "helper that overflowed " << srv
->rbuf_sz
<< "-byte " <<
1110 "Squid input buffer: " << hlp
->id_name
<< " #" << srv
->index
);
1111 srv
->closePipesSafely(hlp
->id_name
);
1115 AsyncCall::Pointer call
= commCbCall(5,4, "helperStatefulHandleRead",
1116 CommIoCbPtrFun(helperStatefulHandleRead
, srv
));
1117 comm_read(srv
->readPipe
, srv
->rbuf
+ srv
->roffset
, spaceSize
, call
);
1121 /// Handles a request when all running helpers, if any, are busy.
1123 Enqueue(helper
* hlp
, Helper::Request
* r
)
1125 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1126 dlinkAddTail(r
, link
, &hlp
->queue
);
1127 ++ hlp
->stats
.queue_size
;
1129 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1130 if (hlp
->childs
.needNew() > 0) {
1131 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1132 helperOpenServers(hlp
);
1136 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1139 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1142 if (shutting_down
|| reconfiguring
)
1145 hlp
->last_queue_warn
= squid_curtime
;
1147 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1148 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1149 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1153 StatefulEnqueue(statefulhelper
* hlp
, Helper::Request
* r
)
1155 dlink_node
*link
= (dlink_node
*)memAllocate(MEM_DLINK_NODE
);
1156 dlinkAddTail(r
, link
, &hlp
->queue
);
1157 ++ hlp
->stats
.queue_size
;
1159 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1160 if (hlp
->childs
.needNew() > 0) {
1161 debugs(84, DBG_CRITICAL
, "Starting new " << hlp
->id_name
<< " helpers...");
1162 helperStatefulOpenServers(hlp
);
1166 if (hlp
->stats
.queue_size
< (int)hlp
->childs
.queue_size
)
1169 if (squid_curtime
- hlp
->last_queue_warn
< 600)
1172 if (shutting_down
|| reconfiguring
)
1175 hlp
->last_queue_warn
= squid_curtime
;
1177 debugs(84, DBG_CRITICAL
, "WARNING: All " << hlp
->childs
.n_active
<< "/" << hlp
->childs
.n_max
<< " " << hlp
->id_name
<< " processes are busy.");
1178 debugs(84, DBG_CRITICAL
, "WARNING: " << hlp
->stats
.queue_size
<< " pending requests queued");
1179 debugs(84, DBG_CRITICAL
, "WARNING: Consider increasing the number of " << hlp
->id_name
<< " processes in your config file.");
1182 static Helper::Request
*
1183 Dequeue(helper
* hlp
)
1186 Helper::Request
*r
= NULL
;
1188 if ((link
= hlp
->queue
.head
)) {
1189 r
= (Helper::Request
*)link
->data
;
1190 dlinkDelete(link
, &hlp
->queue
);
1191 memFree(link
, MEM_DLINK_NODE
);
1192 -- hlp
->stats
.queue_size
;
1198 static Helper::Request
*
1199 StatefulDequeue(statefulhelper
* hlp
)
1202 Helper::Request
*r
= NULL
;
1204 if ((link
= hlp
->queue
.head
)) {
1205 r
= (Helper::Request
*)link
->data
;
1206 dlinkDelete(link
, &hlp
->queue
);
1207 memFree(link
, MEM_DLINK_NODE
);
1208 -- hlp
->stats
.queue_size
;
1214 static helper_server
*
1215 GetFirstAvailable(helper
* hlp
)
1219 helper_server
*selected
= NULL
;
1220 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1222 if (hlp
->childs
.n_running
== 0)
1225 /* Find "least" loaded helper (approx) */
1226 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1227 srv
= (helper_server
*)n
->data
;
1229 if (selected
&& selected
->stats
.pending
<= srv
->stats
.pending
)
1232 if (srv
->flags
.shutdown
)
1235 if (!srv
->stats
.pending
)
1246 /* Check for overload */
1248 debugs(84, 5, "GetFirstAvailable: None available.");
1252 if (selected
->stats
.pending
>= (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1)) {
1253 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1257 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected
->index
);
1261 static helper_stateful_server
*
1262 StatefulGetFirstAvailable(statefulhelper
* hlp
)
1265 helper_stateful_server
*srv
= NULL
;
1266 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp
->childs
.n_running
);
1268 if (hlp
->childs
.n_running
== 0)
1271 for (n
= hlp
->servers
.head
; n
!= NULL
; n
= n
->next
) {
1272 srv
= (helper_stateful_server
*)n
->data
;
1274 if (srv
->stats
.pending
)
1277 if (srv
->flags
.reserved
)
1280 if (srv
->flags
.shutdown
)
1283 if ((hlp
->IsAvailable
!= NULL
) && (srv
->data
!= NULL
) && !(hlp
->IsAvailable(srv
->data
)))
1286 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv
->index
);
1290 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1295 helperDispatchWriteDone(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, Comm::Flag flag
, int xerrno
, void *data
)
1297 helper_server
*srv
= (helper_server
*)data
;
1299 srv
->writebuf
->clean();
1300 delete srv
->writebuf
;
1301 srv
->writebuf
= NULL
;
1302 srv
->flags
.writing
= false;
1304 if (flag
!= Comm::OK
) {
1305 /* Helper server has crashed */
1306 debugs(84, DBG_CRITICAL
, "helperDispatch: Helper " << srv
->parent
->id_name
<< " #" << srv
->index
<< " has crashed");
1310 if (!srv
->wqueue
->isNull()) {
1311 srv
->writebuf
= srv
->wqueue
;
1312 srv
->wqueue
= new MemBuf
;
1313 srv
->flags
.writing
= true;
1314 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1315 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1316 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1321 helperDispatch(helper_server
* srv
, Helper::Request
* r
)
1323 helper
*hlp
= srv
->parent
;
1324 Helper::Request
**ptr
= NULL
;
1327 if (!cbdataReferenceValid(r
->data
)) {
1328 debugs(84, DBG_IMPORTANT
, "helperDispatch: invalid callback data");
1333 for (slot
= 0; slot
< (hlp
->childs
.concurrency
? hlp
->childs
.concurrency
: 1); ++slot
) {
1334 if (!srv
->requests
[slot
]) {
1335 ptr
= &srv
->requests
[slot
];
1342 r
->dispatch_time
= current_time
;
1344 if (srv
->wqueue
->isNull())
1345 srv
->wqueue
->init();
1347 if (hlp
->childs
.concurrency
)
1348 srv
->wqueue
->Printf("%d %s", slot
, r
->buf
);
1350 srv
->wqueue
->append(r
->buf
, strlen(r
->buf
));
1352 if (!srv
->flags
.writing
) {
1353 assert(NULL
== srv
->writebuf
);
1354 srv
->writebuf
= srv
->wqueue
;
1355 srv
->wqueue
= new MemBuf
;
1356 srv
->flags
.writing
= true;
1357 AsyncCall::Pointer call
= commCbCall(5,5, "helperDispatchWriteDone",
1358 CommIoCbPtrFun(helperDispatchWriteDone
, srv
));
1359 Comm::Write(srv
->writePipe
, srv
->writebuf
->content(), srv
->writebuf
->contentSize(), call
, NULL
);
1362 debugs(84, 5, "helperDispatch: Request sent to " << hlp
->id_name
<< " #" << srv
->index
<< ", " << strlen(r
->buf
) << " bytes");
1365 ++ srv
->stats
.pending
;
1366 ++ hlp
->stats
.requests
;
1370 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer
&conn
, char *buf
, size_t len
, Comm::Flag flag
,
1371 int xerrno
, void *data
)
1377 helperStatefulDispatch(helper_stateful_server
* srv
, Helper::Request
* r
)
1379 statefulhelper
*hlp
= srv
->parent
;
1381 if (!cbdataReferenceValid(r
->data
)) {
1382 debugs(84, DBG_IMPORTANT
, "helperStatefulDispatch: invalid callback data");
1384 helperStatefulReleaseServer(srv
);
1388 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp
->id_name
<< " #" << srv
->index
);
1390 if (r
->placeholder
== 1) {
1391 /* a callback is needed before this request can _use_ a helper. */
1392 /* we don't care about releasing this helper. The request NEVER
1393 * gets to the helper. So we throw away the return code */
1394 Helper::Reply nilReply
;
1395 nilReply
.whichServer
= srv
;
1396 r
->callback(r
->data
, nilReply
);
1397 /* throw away the placeholder */
1399 /* and push the queue. Note that the callback may have submitted a new
1400 * request to the helper which is why we test for the request */
1402 if (srv
->request
== NULL
)
1403 helperStatefulServerDone(srv
);
1408 srv
->flags
.reserved
= true;
1410 srv
->dispatch_time
= current_time
;
1411 AsyncCall::Pointer call
= commCbCall(5,5, "helperStatefulDispatchWriteDone",
1412 CommIoCbPtrFun(helperStatefulDispatchWriteDone
, hlp
));
1413 Comm::Write(srv
->writePipe
, r
->buf
, strlen(r
->buf
), call
, NULL
);
1414 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1415 hlp
->id_name
<< " #" << srv
->index
<< ", " <<
1416 (int) strlen(r
->buf
) << " bytes");
1419 ++ srv
->stats
.pending
;
1420 ++ hlp
->stats
.requests
;
1424 helperKickQueue(helper
* hlp
)
1429 while ((srv
= GetFirstAvailable(hlp
)) && (r
= Dequeue(hlp
)))
1430 helperDispatch(srv
, r
);
1434 helperStatefulKickQueue(statefulhelper
* hlp
)
1437 helper_stateful_server
*srv
;
1439 while ((srv
= StatefulGetFirstAvailable(hlp
)) && (r
= StatefulDequeue(hlp
)))
1440 helperStatefulDispatch(srv
, r
);
1444 helperStatefulServerDone(helper_stateful_server
* srv
)
1446 if (!srv
->flags
.shutdown
) {
1447 helperStatefulKickQueue(srv
->parent
);
1448 } else if (!srv
->flags
.closing
&& !srv
->flags
.reserved
&& !srv
->stats
.pending
) {
1449 srv
->closeWritePipeSafely(srv
->parent
->id_name
);
1454 // TODO: should helper_ and helper_stateful_ have a common parent?
1456 helperStartStats(StoreEntry
*sentry
, void *hlp
, const char *label
)
1460 storeAppendPrintf(sentry
, "%s: unavailable\n", label
);
1465 storeAppendPrintf(sentry
, "%s:\n", label
);