]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Make Squid death due to overloaded helpers optional.
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31
32 // helper_stateful_server::data uses explicit alloc()/freeOne() */
33 #include "mem/Pool.h"
34
35 #define HELPER_MAX_ARGS 64
36
37 /// The maximum allowed request retries.
38 #define MAX_RETRIES 2
39
40 /// Helpers input buffer size.
41 const size_t ReadBufSize(32*1024);
42
43 static IOCB helperHandleRead;
44 static IOCB helperStatefulHandleRead;
45 static void helperServerFree(helper_server *srv);
46 static void helperStatefulServerFree(helper_stateful_server *srv);
47 static void Enqueue(helper * hlp, Helper::Xaction *);
48 static helper_server *GetFirstAvailable(const helper * hlp);
49 static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper * hlp);
50 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
51 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
52 static void helperKickQueue(helper * hlp);
53 static void helperStatefulKickQueue(statefulhelper * hlp);
54 static void helperStatefulServerDone(helper_stateful_server * srv);
55 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
56
57 CBDATA_CLASS_INIT(helper);
58 CBDATA_CLASS_INIT(helper_server);
59 CBDATA_CLASS_INIT(statefulhelper);
60 CBDATA_CLASS_INIT(helper_stateful_server);
61
62 InstanceIdDefinitions(HelperServerBase, "Hlpr");
63
64 void
65 HelperServerBase::initStats()
66 {
67 stats.uses=0;
68 stats.replies=0;
69 stats.pending=0;
70 stats.releases=0;
71 stats.timedout = 0;
72 }
73
74 void
75 HelperServerBase::closePipesSafely(const char *id_name)
76 {
77 #if _SQUID_WINDOWS_
78 shutdown(writePipe->fd, SD_BOTH);
79 #endif
80
81 flags.closing = true;
82 if (readPipe->fd == writePipe->fd)
83 readPipe->fd = -1;
84 else
85 readPipe->close();
86 writePipe->close();
87
88 #if _SQUID_WINDOWS_
89 if (hIpc) {
90 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
91 getCurrentTime();
92 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
93 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
94 }
95 CloseHandle(hIpc);
96 }
97 #endif
98 }
99
100 void
101 HelperServerBase::closeWritePipeSafely(const char *id_name)
102 {
103 #if _SQUID_WINDOWS_
104 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
105 #endif
106
107 flags.closing = true;
108 if (readPipe->fd == writePipe->fd)
109 readPipe->fd = -1;
110 writePipe->close();
111
112 #if _SQUID_WINDOWS_
113 if (hIpc) {
114 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
115 getCurrentTime();
116 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
117 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
118 }
119 CloseHandle(hIpc);
120 }
121 #endif
122 }
123
124 void
125 helperOpenServers(helper * hlp)
126 {
127 char *s;
128 char *progname;
129 char *shortname;
130 char *procname;
131 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
132 char fd_note_buf[FD_DESC_SZ];
133 helper_server *srv;
134 int nargs = 0;
135 int k;
136 pid_t pid;
137 int rfd;
138 int wfd;
139 void * hIpc;
140 wordlist *w;
141
142 if (hlp->cmdline == NULL)
143 return;
144
145 progname = hlp->cmdline->key;
146
147 if ((s = strrchr(progname, '/')))
148 shortname = xstrdup(s + 1);
149 else
150 shortname = xstrdup(progname);
151
152 /* figure out how many new child are actually needed. */
153 int need_new = hlp->childs.needNew();
154
155 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
156
157 if (need_new < 1) {
158 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
159 }
160
161 procname = (char *)xmalloc(strlen(shortname) + 3);
162
163 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
164
165 args[nargs] = procname;
166 ++nargs;
167
168 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
169 args[nargs] = w->key;
170 ++nargs;
171 }
172
173 args[nargs] = NULL;
174 ++nargs;
175
176 assert(nargs <= HELPER_MAX_ARGS);
177
178 for (k = 0; k < need_new; ++k) {
179 getCurrentTime();
180 rfd = wfd = -1;
181 pid = ipcCreate(hlp->ipc_type,
182 progname,
183 args,
184 shortname,
185 hlp->addr,
186 &rfd,
187 &wfd,
188 &hIpc);
189
190 if (pid < 0) {
191 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
192 continue;
193 }
194
195 ++ hlp->childs.n_running;
196 ++ hlp->childs.n_active;
197 srv = new helper_server;
198 srv->hIpc = hIpc;
199 srv->pid = pid;
200 srv->initStats();
201 srv->addr = hlp->addr;
202 srv->readPipe = new Comm::Connection;
203 srv->readPipe->fd = rfd;
204 srv->writePipe = new Comm::Connection;
205 srv->writePipe->fd = wfd;
206 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
207 srv->wqueue = new MemBuf;
208 srv->roffset = 0;
209 srv->nextRequestId = 0;
210 srv->replyXaction = NULL;
211 srv->ignoreToEom = false;
212 srv->parent = cbdataReference(hlp);
213 dlinkAddTail(srv, &srv->link, &hlp->servers);
214
215 if (rfd == wfd) {
216 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
217 fd_note(rfd, fd_note_buf);
218 } else {
219 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
220 fd_note(rfd, fd_note_buf);
221 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
222 fd_note(wfd, fd_note_buf);
223 }
224
225 commSetNonBlocking(rfd);
226
227 if (wfd != rfd)
228 commSetNonBlocking(wfd);
229
230 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
231 comm_add_close_handler(rfd, closeCall);
232
233 if (hlp->timeout && hlp->childs.concurrency) {
234 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
235 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
236 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
237 }
238
239 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
240 CommIoCbPtrFun(helperHandleRead, srv));
241 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
242 }
243
244 hlp->last_restart = squid_curtime;
245 safe_free(shortname);
246 safe_free(procname);
247 helperKickQueue(hlp);
248 }
249
250 /**
251 * DPW 2007-05-08
252 *
253 * helperStatefulOpenServers: create the stateful child helper processes
254 */
255 void
256 helperStatefulOpenServers(statefulhelper * hlp)
257 {
258 char *shortname;
259 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
260 char fd_note_buf[FD_DESC_SZ];
261 int nargs = 0;
262
263 if (hlp->cmdline == NULL)
264 return;
265
266 if (hlp->childs.concurrency)
267 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
268
269 char *progname = hlp->cmdline->key;
270
271 char *s;
272 if ((s = strrchr(progname, '/')))
273 shortname = xstrdup(s + 1);
274 else
275 shortname = xstrdup(progname);
276
277 /* figure out haw mant new helpers are needed. */
278 int need_new = hlp->childs.needNew();
279
280 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
281
282 if (need_new < 1) {
283 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
284 }
285
286 char *procname = (char *)xmalloc(strlen(shortname) + 3);
287
288 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
289
290 args[nargs] = procname;
291 ++nargs;
292
293 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
294 args[nargs] = w->key;
295 ++nargs;
296 }
297
298 args[nargs] = NULL;
299 ++nargs;
300
301 assert(nargs <= HELPER_MAX_ARGS);
302
303 for (int k = 0; k < need_new; ++k) {
304 getCurrentTime();
305 int rfd = -1;
306 int wfd = -1;
307 void * hIpc;
308 pid_t pid = ipcCreate(hlp->ipc_type,
309 progname,
310 args,
311 shortname,
312 hlp->addr,
313 &rfd,
314 &wfd,
315 &hIpc);
316
317 if (pid < 0) {
318 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
319 continue;
320 }
321
322 ++ hlp->childs.n_running;
323 ++ hlp->childs.n_active;
324 helper_stateful_server *srv = new helper_stateful_server;
325 srv->hIpc = hIpc;
326 srv->pid = pid;
327 srv->flags.reserved = false;
328 srv->initStats();
329 srv->addr = hlp->addr;
330 srv->readPipe = new Comm::Connection;
331 srv->readPipe->fd = rfd;
332 srv->writePipe = new Comm::Connection;
333 srv->writePipe->fd = wfd;
334 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
335 srv->roffset = 0;
336 srv->parent = cbdataReference(hlp);
337
338 if (hlp->datapool != NULL)
339 srv->data = hlp->datapool->alloc();
340
341 dlinkAddTail(srv, &srv->link, &hlp->servers);
342
343 if (rfd == wfd) {
344 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
345 fd_note(rfd, fd_note_buf);
346 } else {
347 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
348 fd_note(rfd, fd_note_buf);
349 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
350 fd_note(wfd, fd_note_buf);
351 }
352
353 commSetNonBlocking(rfd);
354
355 if (wfd != rfd)
356 commSetNonBlocking(wfd);
357
358 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
359 comm_add_close_handler(rfd, closeCall);
360
361 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
362 CommIoCbPtrFun(helperStatefulHandleRead, srv));
363 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
364 }
365
366 hlp->last_restart = squid_curtime;
367 safe_free(shortname);
368 safe_free(procname);
369 helperStatefulKickQueue(hlp);
370 }
371
372 void
373 helper::submitRequest(Helper::Xaction *r)
374 {
375 helper_server *srv;
376
377 if ((srv = GetFirstAvailable(this)))
378 helperDispatch(srv, r);
379 else
380 Enqueue(this, r);
381
382 syncQueueStats();
383 }
384
385 /// handles helperSubmit() and helperStatefulSubmit() failures
386 static void
387 SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
388 {
389 auto result = Helper::Error;
390 if (!hlp) {
391 debugs(84, 3, "no helper");
392 result = Helper::Unknown;
393 }
394 // else pretend the helper has responded with ERR
395
396 callback(data, Helper::Reply(result));
397 }
398
399 void
400 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
401 {
402 if (!hlp || !hlp->trySubmit(buf, callback, data))
403 SubmissionFailure(hlp, callback, data);
404 }
405
406 /// whether queuing an additional request would overload the helper
407 bool
408 helper::queueFull() const {
409 return stats.queue_size >= static_cast<int>(childs.queue_size);
410 }
411
412 bool
413 helper::overloaded() const {
414 return stats.queue_size > static_cast<int>(childs.queue_size);
415 }
416
417 /// synchronizes queue-dependent measurements with the current queue state
418 void
419 helper::syncQueueStats()
420 {
421 if (overloaded()) {
422 if (overloadStart) {
423 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
424 } else {
425 overloadStart = squid_curtime;
426 debugs(84, 3, id_name << " became overloaded");
427 }
428 } else {
429 if (overloadStart) {
430 debugs(84, 5, id_name << " is no longer overloaded");
431 if (droppedRequests) {
432 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
433 " is no longer overloaded after dropping " << droppedRequests <<
434 " requests in " << (squid_curtime - overloadStart) << " seconds");
435 droppedRequests = 0;
436 }
437 overloadStart = 0;
438 }
439 }
440 }
441
442 /// prepares the helper for request submission
443 /// returns true if and only if the submission should proceed
444 /// may kill Squid if the helper remains overloaded for too long
445 bool
446 helper::prepSubmit()
447 {
448 // re-sync for the configuration may have changed since the last submission
449 syncQueueStats();
450
451 // Nothing special to do if the new request does not overload (i.e., the
452 // queue is not even full yet) or only _starts_ overloading this helper
453 // (i.e., the queue is currently at its limit).
454 if (!overloaded())
455 return true;
456
457 if (squid_curtime - overloadStart <= 180)
458 return true; // also OK: overload has not persisted long enough to panic
459
460 if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
461 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
462
463 if (!droppedRequests) {
464 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
465 id_name << " helper configured with on-persistent-overload=err");
466 }
467 ++droppedRequests;
468 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
469 return false;
470 }
471
472 bool
473 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
474 {
475 if (!prepSubmit())
476 return false; // request was dropped
477
478 submit(buf, callback, data); // will send or queue
479 return true; // request submitted or queued
480 }
481
482 /// dispatches or enqueues a helper requests; does not enforce queue limits
483 void
484 helper::submit(const char *buf, HLPCB * callback, void *data)
485 {
486 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
487 submitRequest(r);
488 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
489 }
490
491 /// lastserver = "server last used as part of a reserved request sequence"
492 void
493 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
494 {
495 if (!hlp || !hlp->trySubmit(buf, callback, data, lastserver))
496 SubmissionFailure(hlp, callback, data);
497 }
498
499 /// If possible, submit request. Otherwise, either kill Squid or return false.
500 bool
501 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver)
502 {
503 if (!prepSubmit())
504 return false; // request was dropped
505
506 submit(buf, callback, data, lastserver); // will send or queue
507 return true; // request submitted or queued
508 }
509
510 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
511 {
512 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
513
514 if ((buf != NULL) && lastserver) {
515 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
516 assert(lastserver->flags.reserved);
517 assert(!lastserver->requests.size());
518
519 debugs(84, 5, "StatefulSubmit dispatching");
520 helperStatefulDispatch(lastserver, r);
521 } else {
522 helper_stateful_server *srv;
523 if ((srv = StatefulGetFirstAvailable(this))) {
524 helperStatefulDispatch(srv, r);
525 } else
526 StatefulEnqueue(this, r);
527 }
528
529 debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
530 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
531
532 syncQueueStats();
533 }
534
535 /**
536 * DPW 2007-05-08
537 *
538 * helperStatefulReleaseServer tells the helper that whoever was
539 * using it no longer needs its services.
540 */
541 void
542 helperStatefulReleaseServer(helper_stateful_server * srv)
543 {
544 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
545 if (!srv->flags.reserved)
546 return;
547
548 ++ srv->stats.releases;
549
550 srv->flags.reserved = false;
551
552 helperStatefulServerDone(srv);
553 }
554
555 /** return a pointer to the stateful routines data area */
556 void *
557 helperStatefulServerGetData(helper_stateful_server * srv)
558 {
559 return srv->data;
560 }
561
562 void
563 helper::packStatsInto(Packable *p, const char *label) const
564 {
565 if (label)
566 p->appendf("%s:\n", label);
567
568 p->appendf(" program: %s\n", cmdline->key);
569 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
570 p->appendf(" requests sent: %d\n", stats.requests);
571 p->appendf(" replies received: %d\n", stats.replies);
572 p->appendf(" requests timedout: %d\n", stats.timedout);
573 p->appendf(" queue length: %d\n", stats.queue_size);
574 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
575 p->append("\n",1);
576 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
577 "ID #",
578 "FD",
579 "PID",
580 "# Requests",
581 "# Replies",
582 "# Timed-out",
583 "Flags",
584 "Time",
585 "Offset",
586 "Request");
587
588 for (dlink_node *link = servers.head; link; link = link->next) {
589 HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
590 assert(srv);
591 Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
592 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
593 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
594 srv->index.value,
595 srv->readPipe->fd,
596 srv->pid,
597 srv->stats.uses,
598 srv->stats.replies,
599 srv->stats.timedout,
600 srv->stats.pending ? 'B' : ' ',
601 srv->flags.writing ? 'W' : ' ',
602 srv->flags.closing ? 'C' : ' ',
603 srv->flags.reserved ? 'R' : ' ',
604 srv->flags.shutdown ? 'S' : ' ',
605 xaction && xaction->request.placeholder ? 'P' : ' ',
606 tt < 0.0 ? 0.0 : tt,
607 (int) srv->roffset,
608 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
609 }
610
611 p->append("\nFlags key:\n"
612 " B\tBUSY\n"
613 " W\tWRITING\n"
614 " C\tCLOSING\n"
615 " R\tRESERVED\n"
616 " S\tSHUTDOWN PENDING\n"
617 " P\tPLACEHOLDER\n", 101);
618 }
619
620 bool
621 helper::willOverload() const {
622 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
623 }
624
625 void
626 helperShutdown(helper * hlp)
627 {
628 dlink_node *link = hlp->servers.head;
629
630 while (link) {
631 helper_server *srv;
632 srv = (helper_server *)link->data;
633 link = link->next;
634
635 if (srv->flags.shutdown) {
636 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
637 continue;
638 }
639
640 assert(hlp->childs.n_active > 0);
641 -- hlp->childs.n_active;
642 srv->flags.shutdown = true; /* request it to shut itself down */
643
644 if (srv->flags.closing) {
645 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
646 continue;
647 }
648
649 if (srv->stats.pending) {
650 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
651 continue;
652 }
653
654 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
655 /* the rest of the details is dealt with in the helperServerFree
656 * close handler
657 */
658 srv->closePipesSafely(hlp->id_name);
659 }
660 }
661
662 void
663 helperStatefulShutdown(statefulhelper * hlp)
664 {
665 dlink_node *link = hlp->servers.head;
666 helper_stateful_server *srv;
667
668 while (link) {
669 srv = (helper_stateful_server *)link->data;
670 link = link->next;
671
672 if (srv->flags.shutdown) {
673 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
674 continue;
675 }
676
677 assert(hlp->childs.n_active > 0);
678 -- hlp->childs.n_active;
679 srv->flags.shutdown = true; /* request it to shut itself down */
680
681 if (srv->stats.pending) {
682 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
683 continue;
684 }
685
686 if (srv->flags.closing) {
687 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
688 continue;
689 }
690
691 if (srv->flags.reserved) {
692 if (shutting_down) {
693 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
694 } else {
695 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
696 continue;
697 }
698 }
699
700 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
701
702 /* the rest of the details is dealt with in the helperStatefulServerFree
703 * close handler
704 */
705 srv->closePipesSafely(hlp->id_name);
706 }
707 }
708
709 helper::~helper()
710 {
711 /* note, don't free id_name, it probably points to static memory */
712
713 // TODO: if the queue is not empty it will leak Helper::Request's
714 if (!queue.empty())
715 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
716 }
717
718 /* ====================================================================== */
719 /* LOCAL FUNCTIONS */
720 /* ====================================================================== */
721
722 static void
723 helperServerFree(helper_server *srv)
724 {
725 helper *hlp = srv->parent;
726 int concurrency = hlp->childs.concurrency;
727
728 if (!concurrency)
729 concurrency = 1;
730
731 if (srv->rbuf) {
732 memFreeBuf(srv->rbuf_sz, srv->rbuf);
733 srv->rbuf = NULL;
734 }
735
736 srv->wqueue->clean();
737 delete srv->wqueue;
738
739 if (srv->writebuf) {
740 srv->writebuf->clean();
741 delete srv->writebuf;
742 srv->writebuf = NULL;
743 }
744
745 if (Comm::IsConnOpen(srv->writePipe))
746 srv->closeWritePipeSafely(hlp->id_name);
747
748 dlinkDelete(&srv->link, &hlp->servers);
749
750 assert(hlp->childs.n_running > 0);
751 -- hlp->childs.n_running;
752
753 if (!srv->flags.shutdown) {
754 assert(hlp->childs.n_active > 0);
755 -- hlp->childs.n_active;
756 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
757
758 if (hlp->childs.needNew() > 0) {
759 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
760
761 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
762 if (srv->stats.replies < 1)
763 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
764 else
765 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
766 }
767
768 debugs(80, DBG_IMPORTANT, "Starting new helpers");
769 helperOpenServers(hlp);
770 }
771 }
772
773 while (!srv->requests.empty()) {
774 // XXX: re-schedule these on another helper?
775 Helper::Xaction *r = srv->requests.front();
776 srv->requests.pop_front();
777 void *cbdata;
778
779 if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
780 r->reply.result = Helper::Unknown;
781 r->request.callback(cbdata, r->reply);
782 }
783
784 delete r;
785 }
786 srv->requestsIndex.clear();
787
788 cbdataReferenceDone(srv->parent);
789 delete srv;
790 }
791
792 static void
793 helperStatefulServerFree(helper_stateful_server *srv)
794 {
795 statefulhelper *hlp = srv->parent;
796
797 if (srv->rbuf) {
798 memFreeBuf(srv->rbuf_sz, srv->rbuf);
799 srv->rbuf = NULL;
800 }
801
802 #if 0
803 srv->wqueue->clean();
804
805 delete srv->wqueue;
806
807 #endif
808
809 /* TODO: walk the local queue of requests and carry them all out */
810 if (Comm::IsConnOpen(srv->writePipe))
811 srv->closeWritePipeSafely(hlp->id_name);
812
813 dlinkDelete(&srv->link, &hlp->servers);
814
815 assert(hlp->childs.n_running > 0);
816 -- hlp->childs.n_running;
817
818 if (!srv->flags.shutdown) {
819 assert( hlp->childs.n_active > 0);
820 -- hlp->childs.n_active;
821 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
822
823 if (hlp->childs.needNew() > 0) {
824 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
825
826 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
827 if (srv->stats.replies < 1)
828 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
829 else
830 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
831 }
832
833 debugs(80, DBG_IMPORTANT, "Starting new helpers");
834 helperStatefulOpenServers(hlp);
835 }
836 }
837
838 while (!srv->requests.empty()) {
839 // XXX: re-schedule these on another helper?
840 Helper::Xaction *r = srv->requests.front();
841 srv->requests.pop_front();
842 void *cbdata;
843
844 if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
845 r->reply.result = Helper::Unknown;
846 r->request.callback(cbdata, r->reply);
847 }
848
849 delete r;
850 }
851
852 if (srv->data != NULL)
853 hlp->datapool->freeOne(srv->data);
854
855 cbdataReferenceDone(srv->parent);
856
857 delete srv;
858 }
859
860 Helper::Xaction *
861 helper_server::popRequest(int request_number)
862 {
863 Helper::Xaction *r = nullptr;
864 helper_server::RequestIndex::iterator it;
865 if (parent->childs.concurrency) {
866 // If concurency supported retrieve request from ID
867 it = requestsIndex.find(request_number);
868 if (it != requestsIndex.end()) {
869 r = *(it->second);
870 requests.erase(it->second);
871 requestsIndex.erase(it);
872 }
873 } else if(!requests.empty()) {
874 // Else get the first request from queue, if any
875 r = requests.front();
876 requests.pop_front();
877 }
878
879 return r;
880 }
881
882 /// Calls back with a pointer to the buffer with the helper output
883 static void
884 helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
885 {
886 if (Helper::Xaction *r = srv->replyXaction) {
887 const bool hasSpace = r->reply.accumulate(msg, msgSize);
888 if (!hasSpace) {
889 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
890 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
891 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
892 srv->closePipesSafely(hlp->id_name);
893 return;
894 }
895
896 if (!msgEnd)
897 return; // We are waiting for more data.
898
899 HLPCB *callback = r->request.callback;
900 r->request.callback = nullptr;
901
902 bool retry = false;
903 void *cbdata = nullptr;
904 if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
905 r->reply.finalize();
906 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
907 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
908 retry = true;
909 } else
910 callback(cbdata, r->reply);
911 }
912
913 -- srv->stats.pending;
914 ++ srv->stats.replies;
915
916 ++ hlp->stats.replies;
917
918 srv->answer_time = current_time;
919
920 srv->dispatch_time = r->request.dispatch_time;
921
922 hlp->stats.avg_svc_time =
923 Math::intAverage(hlp->stats.avg_svc_time,
924 tvSubMsec(r->request.dispatch_time, current_time),
925 hlp->stats.replies, REDIRECT_AV_FACTOR);
926
927 // release or re-submit parsedRequestXaction object
928 srv->replyXaction = nullptr;
929 if (retry) {
930 ++r->request.retries;
931 hlp->submitRequest(r);
932 } else
933 delete r;
934 }
935
936 if (hlp->timeout && hlp->childs.concurrency)
937 srv->checkForTimedOutRequests(hlp->retryTimedOut);
938
939 if (!srv->flags.shutdown) {
940 helperKickQueue(hlp);
941 } else if (!srv->flags.closing && !srv->stats.pending) {
942 srv->flags.closing=true;
943 srv->writePipe->close();
944 }
945 }
946
947 static void
948 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
949 {
950 helper_server *srv = (helper_server *)data;
951 helper *hlp = srv->parent;
952 assert(cbdataReferenceValid(data));
953
954 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
955
956 if (flag == Comm::ERR_CLOSING) {
957 return;
958 }
959
960 assert(conn->fd == srv->readPipe->fd);
961
962 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
963
964 if (flag != Comm::OK || len == 0) {
965 srv->closePipesSafely(hlp->id_name);
966 return;
967 }
968
969 srv->roffset += len;
970 srv->rbuf[srv->roffset] = '\0';
971 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
972
973 if (!srv->stats.pending && !srv->stats.timedout) {
974 /* someone spoke without being spoken to */
975 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
976 hlp->id_name << " #" << srv->index << ", " << (int)len <<
977 " bytes '" << srv->rbuf << "'");
978
979 srv->roffset = 0;
980 srv->rbuf[0] = '\0';
981 }
982
983 bool needsMore = false;
984 char *msg = srv->rbuf;
985 while (*msg && !needsMore) {
986 int skip = 0;
987 char *eom = strchr(msg, hlp->eom);
988 if (eom) {
989 skip = 1;
990 debugs(84, 3, "helperHandleRead: end of reply found");
991 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
992 *eom = '\0';
993 // rewind to the \r octet which is the real terminal now
994 // and remember that we have to skip forward 2 places now.
995 skip = 2;
996 --eom;
997 }
998 *eom = '\0';
999 }
1000
1001 if (!srv->ignoreToEom && !srv->replyXaction) {
1002 int i = 0;
1003 if (hlp->childs.concurrency) {
1004 char *e = NULL;
1005 i = strtol(msg, &e, 10);
1006 // Do we need to check for e == msg? Means wrong response from helper.
1007 // Will be droped as "unexpected reply on channel 0"
1008 needsMore = !(xisspace(*e) || (eom && e == eom));
1009 if (!needsMore) {
1010 msg = e;
1011 while (*msg && xisspace(*msg))
1012 ++msg;
1013 } // else not enough data to compute request number
1014 }
1015 if (!(srv->replyXaction = srv->popRequest(i))) {
1016 if (srv->stats.timedout) {
1017 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1018 } else {
1019 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
1020 i << " from " << hlp->id_name << " #" << srv->index <<
1021 " '" << srv->rbuf << "'");
1022 }
1023 srv->ignoreToEom = true;
1024 }
1025 } // else we need to just append reply data to the current Xaction
1026
1027 if (!needsMore) {
1028 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1029 assert(msgSize <= srv->rbuf_sz);
1030 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1031 msg += msgSize + skip;
1032 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1033
1034 // The next message should not ignored.
1035 if (eom && srv->ignoreToEom)
1036 srv->ignoreToEom = false;
1037 } else
1038 assert(skip == 0 && eom == NULL);
1039 }
1040
1041 if (needsMore) {
1042 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1043 assert(msgSize <= srv->rbuf_sz);
1044 memmove(srv->rbuf, msg, msgSize);
1045 srv->roffset = msgSize;
1046 srv->rbuf[srv->roffset] = '\0';
1047 } else {
1048 // All of the responses parsed and msg points at the end of read data
1049 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1050 srv->roffset = 0;
1051 }
1052
1053 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1054 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1055 assert(spaceSize >= 0);
1056
1057 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1058 CommIoCbPtrFun(helperHandleRead, srv));
1059 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1060 }
1061 }
1062
1063 static void
1064 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1065 {
1066 char *t = NULL;
1067 helper_stateful_server *srv = (helper_stateful_server *)data;
1068 statefulhelper *hlp = srv->parent;
1069 assert(cbdataReferenceValid(data));
1070
1071 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1072
1073 if (flag == Comm::ERR_CLOSING) {
1074 return;
1075 }
1076
1077 assert(conn->fd == srv->readPipe->fd);
1078
1079 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1080 hlp->id_name << " #" << srv->index);
1081
1082 if (flag != Comm::OK || len == 0) {
1083 srv->closePipesSafely(hlp->id_name);
1084 return;
1085 }
1086
1087 srv->roffset += len;
1088 srv->rbuf[srv->roffset] = '\0';
1089 Helper::Xaction *r = srv->requests.front();
1090 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1091
1092 if (r == NULL) {
1093 /* someone spoke without being spoken to */
1094 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1095 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1096 " bytes '" << srv->rbuf << "'");
1097
1098 srv->roffset = 0;
1099 }
1100
1101 if ((t = strchr(srv->rbuf, hlp->eom))) {
1102 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1103
1104 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1105 *t = '\0';
1106 // rewind to the \r octet which is the real terminal now
1107 --t;
1108 }
1109
1110 *t = '\0';
1111 }
1112
1113 if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1114 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1115 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1116 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1117 srv->closePipesSafely(hlp->id_name);
1118 return;
1119 }
1120 /**
1121 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1122 * Doing this prohibits concurrency support with multiple replies per read().
1123 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1124 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1125 */
1126 srv->roffset = 0;
1127
1128 if (t) {
1129 /* end of reply found */
1130 srv->requests.pop_front(); // we already have it in 'r'
1131 int called = 1;
1132
1133 if (r && cbdataReferenceValid(r->request.data)) {
1134 r->reply.finalize();
1135 r->reply.whichServer = srv;
1136 r->request.callback(r->request.data, r->reply);
1137 } else {
1138 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1139 called = 0;
1140 }
1141
1142 delete r;
1143
1144 -- srv->stats.pending;
1145 ++ srv->stats.replies;
1146
1147 ++ hlp->stats.replies;
1148 srv->answer_time = current_time;
1149 hlp->stats.avg_svc_time =
1150 Math::intAverage(hlp->stats.avg_svc_time,
1151 tvSubMsec(srv->dispatch_time, current_time),
1152 hlp->stats.replies, REDIRECT_AV_FACTOR);
1153
1154 if (called)
1155 helperStatefulServerDone(srv);
1156 else
1157 helperStatefulReleaseServer(srv);
1158 }
1159
1160 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1161 int spaceSize = srv->rbuf_sz - 1;
1162
1163 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1164 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1165 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1166 }
1167 }
1168
1169 /// Handles a request when all running helpers, if any, are busy.
1170 static void
1171 Enqueue(helper * hlp, Helper::Xaction * r)
1172 {
1173 hlp->queue.push(r);
1174 ++ hlp->stats.queue_size;
1175
1176 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1177 if (hlp->childs.needNew() > 0) {
1178 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1179 helperOpenServers(hlp);
1180 return;
1181 }
1182
1183 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1184 return;
1185
1186 if (squid_curtime - hlp->last_queue_warn < 600)
1187 return;
1188
1189 if (shutting_down || reconfiguring)
1190 return;
1191
1192 hlp->last_queue_warn = squid_curtime;
1193
1194 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1195 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1196 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1197 }
1198
1199 static void
1200 StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
1201 {
1202 hlp->queue.push(r);
1203 ++ hlp->stats.queue_size;
1204
1205 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1206 if (hlp->childs.needNew() > 0) {
1207 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1208 helperStatefulOpenServers(hlp);
1209 return;
1210 }
1211
1212 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1213 return;
1214
1215 if (squid_curtime - hlp->last_queue_warn < 600)
1216 return;
1217
1218 if (shutting_down || reconfiguring)
1219 return;
1220
1221 hlp->last_queue_warn = squid_curtime;
1222
1223 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1224 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1225 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1226 }
1227
1228 Helper::Xaction *
1229 helper::nextRequest()
1230 {
1231 if (queue.empty())
1232 return nullptr;
1233
1234 auto *r = queue.front();
1235 queue.pop();
1236 --stats.queue_size;
1237 return r;
1238 }
1239
1240 static helper_server *
1241 GetFirstAvailable(const helper * hlp)
1242 {
1243 dlink_node *n;
1244 helper_server *srv;
1245 helper_server *selected = NULL;
1246 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1247
1248 if (hlp->childs.n_running == 0)
1249 return NULL;
1250
1251 /* Find "least" loaded helper (approx) */
1252 for (n = hlp->servers.head; n != NULL; n = n->next) {
1253 srv = (helper_server *)n->data;
1254
1255 if (selected && selected->stats.pending <= srv->stats.pending)
1256 continue;
1257
1258 if (srv->flags.shutdown)
1259 continue;
1260
1261 if (!srv->stats.pending)
1262 return srv;
1263
1264 if (selected) {
1265 selected = srv;
1266 break;
1267 }
1268
1269 selected = srv;
1270 }
1271
1272 if (!selected) {
1273 debugs(84, 5, "GetFirstAvailable: None available.");
1274 return NULL;
1275 }
1276
1277 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1278 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1279 return NULL;
1280 }
1281
1282 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1283 return selected;
1284 }
1285
1286 static helper_stateful_server *
1287 StatefulGetFirstAvailable(const statefulhelper * hlp)
1288 {
1289 dlink_node *n;
1290 helper_stateful_server *srv = NULL;
1291 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1292
1293 if (hlp->childs.n_running == 0)
1294 return NULL;
1295
1296 for (n = hlp->servers.head; n != NULL; n = n->next) {
1297 srv = (helper_stateful_server *)n->data;
1298
1299 if (srv->stats.pending)
1300 continue;
1301
1302 if (srv->flags.reserved)
1303 continue;
1304
1305 if (srv->flags.shutdown)
1306 continue;
1307
1308 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1309 return srv;
1310 }
1311
1312 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1313 return NULL;
1314 }
1315
1316 static void
1317 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1318 {
1319 helper_server *srv = (helper_server *)data;
1320
1321 srv->writebuf->clean();
1322 delete srv->writebuf;
1323 srv->writebuf = NULL;
1324 srv->flags.writing = false;
1325
1326 if (flag != Comm::OK) {
1327 /* Helper server has crashed */
1328 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1329 return;
1330 }
1331
1332 if (!srv->wqueue->isNull()) {
1333 srv->writebuf = srv->wqueue;
1334 srv->wqueue = new MemBuf;
1335 srv->flags.writing = true;
1336 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1337 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1338 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1339 }
1340 }
1341
1342 static void
1343 helperDispatch(helper_server * srv, Helper::Xaction * r)
1344 {
1345 helper *hlp = srv->parent;
1346 const uint64_t reqId = ++srv->nextRequestId;
1347
1348 if (!cbdataReferenceValid(r->request.data)) {
1349 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1350 delete r;
1351 return;
1352 }
1353
1354 r->request.Id = reqId;
1355 helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1356 r->request.dispatch_time = current_time;
1357
1358 if (srv->wqueue->isNull())
1359 srv->wqueue->init();
1360
1361 if (hlp->childs.concurrency) {
1362 srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1363 assert(srv->requestsIndex.size() == srv->requests.size());
1364 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1365 } else
1366 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1367
1368 if (!srv->flags.writing) {
1369 assert(NULL == srv->writebuf);
1370 srv->writebuf = srv->wqueue;
1371 srv->wqueue = new MemBuf;
1372 srv->flags.writing = true;
1373 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1374 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1375 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1376 }
1377
1378 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1379
1380 ++ srv->stats.uses;
1381 ++ srv->stats.pending;
1382 ++ hlp->stats.requests;
1383 }
1384
1385 static void
1386 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1387 {}
1388
1389 static void
1390 helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
1391 {
1392 statefulhelper *hlp = srv->parent;
1393
1394 if (!cbdataReferenceValid(r->request.data)) {
1395 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1396 delete r;
1397 helperStatefulReleaseServer(srv);
1398 return;
1399 }
1400
1401 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1402
1403 if (r->request.placeholder == 1) {
1404 /* a callback is needed before this request can _use_ a helper. */
1405 /* we don't care about releasing this helper. The request NEVER
1406 * gets to the helper. So we throw away the return code */
1407 r->reply.result = Helper::Unknown;
1408 r->reply.whichServer = srv;
1409 r->request.callback(r->request.data, r->reply);
1410 /* throw away the placeholder */
1411 delete r;
1412 /* and push the queue. Note that the callback may have submitted a new
1413 * request to the helper which is why we test for the request */
1414
1415 if (!srv->requests.size())
1416 helperStatefulServerDone(srv);
1417
1418 return;
1419 }
1420
1421 srv->flags.reserved = true;
1422 srv->requests.push_back(r);
1423 srv->dispatch_time = current_time;
1424 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1425 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1426 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
1427 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1428 hlp->id_name << " #" << srv->index << ", " <<
1429 (int) strlen(r->request.buf) << " bytes");
1430
1431 ++ srv->stats.uses;
1432 ++ srv->stats.pending;
1433 ++ hlp->stats.requests;
1434 }
1435
1436 static void
1437 helperKickQueue(helper * hlp)
1438 {
1439 Helper::Xaction *r;
1440 helper_server *srv;
1441
1442 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1443 helperDispatch(srv, r);
1444 }
1445
1446 static void
1447 helperStatefulKickQueue(statefulhelper * hlp)
1448 {
1449 Helper::Xaction *r;
1450 helper_stateful_server *srv;
1451
1452 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1453 helperStatefulDispatch(srv, r);
1454 }
1455
1456 static void
1457 helperStatefulServerDone(helper_stateful_server * srv)
1458 {
1459 if (!srv->flags.shutdown) {
1460 helperStatefulKickQueue(srv->parent);
1461 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1462 srv->closeWritePipeSafely(srv->parent->id_name);
1463 return;
1464 }
1465 }
1466
1467 void
1468 helper_server::checkForTimedOutRequests(bool const retry)
1469 {
1470 assert(parent->childs.concurrency);
1471 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1472 Helper::Xaction *r = requests.front();
1473 RequestIndex::iterator it;
1474 it = requestsIndex.find(r->request.Id);
1475 assert(it != requestsIndex.end());
1476 requestsIndex.erase(it);
1477 requests.pop_front();
1478 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1479 void *cbdata;
1480 bool retried = false;
1481 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1482 debugs(84, 2, "Retry request " << r->request.Id);
1483 ++r->request.retries;
1484 parent->submitRequest(r);
1485 retried = true;
1486 } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1487 if (!parent->onTimedOutResponse.isEmpty()) {
1488 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1489 r->reply.finalize();
1490 else
1491 r->reply.result = Helper::TimedOut;
1492 r->request.callback(cbdata, r->reply);
1493 } else {
1494 r->reply.result = Helper::TimedOut;
1495 r->request.callback(cbdata, r->reply);
1496 }
1497 }
1498 --stats.pending;
1499 ++stats.timedout;
1500 ++parent->stats.timedout;
1501 if (!retried)
1502 delete r;
1503 }
1504 }
1505
1506 void
1507 helper_server::requestTimeout(const CommTimeoutCbParams &io)
1508 {
1509 debugs(26, 3, HERE << io.conn);
1510 helper_server *srv = static_cast<helper_server *>(io.data);
1511
1512 if (!cbdataReferenceValid(srv))
1513 return;
1514
1515 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1516
1517 debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1518 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1519 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
1520
1521 const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1522 const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1523
1524 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1525 }
1526