]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31
32 // helper_stateful_server::data uses explicit alloc()/freeOne()
33 #include "mem/Pool.h"
34
35 #define HELPER_MAX_ARGS 64
36
37 /// The maximum allowed request retries.
38 #define MAX_RETRIES 2
39
40 /// Helpers input buffer size.
41 const size_t ReadBufSize(32*1024);
42
43 static IOCB helperHandleRead;
44 static IOCB helperStatefulHandleRead;
45 static void helperServerFree(helper_server *srv);
46 static void helperStatefulServerFree(helper_stateful_server *srv);
47 static void Enqueue(helper * hlp, Helper::Xaction *);
48 static helper_server *GetFirstAvailable(const helper * hlp);
49 static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper * hlp);
50 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
51 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
52 static void helperKickQueue(helper * hlp);
53 static void helperStatefulKickQueue(statefulhelper * hlp);
54 static void helperStatefulServerDone(helper_stateful_server * srv);
55 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
56
57 CBDATA_CLASS_INIT(helper);
58 CBDATA_CLASS_INIT(helper_server);
59 CBDATA_CLASS_INIT(statefulhelper);
60 CBDATA_CLASS_INIT(helper_stateful_server);
61
62 InstanceIdDefinitions(HelperServerBase, "Hlpr");
63
64 void
65 HelperServerBase::initStats()
66 {
67 stats.uses=0;
68 stats.replies=0;
69 stats.pending=0;
70 stats.releases=0;
71 stats.timedout = 0;
72 }
73
74 void
75 HelperServerBase::closePipesSafely(const char *id_name)
76 {
77 #if _SQUID_WINDOWS_
78 shutdown(writePipe->fd, SD_BOTH);
79 #endif
80
81 flags.closing = true;
82 if (readPipe->fd == writePipe->fd)
83 readPipe->fd = -1;
84 else
85 readPipe->close();
86 writePipe->close();
87
88 #if _SQUID_WINDOWS_
89 if (hIpc) {
90 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
91 getCurrentTime();
92 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
93 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
94 }
95 CloseHandle(hIpc);
96 }
97 #endif
98 }
99
100 void
101 HelperServerBase::closeWritePipeSafely(const char *id_name)
102 {
103 #if _SQUID_WINDOWS_
104 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
105 #endif
106
107 flags.closing = true;
108 if (readPipe->fd == writePipe->fd)
109 readPipe->fd = -1;
110 writePipe->close();
111
112 #if _SQUID_WINDOWS_
113 if (hIpc) {
114 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
115 getCurrentTime();
116 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
117 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
118 }
119 CloseHandle(hIpc);
120 }
121 #endif
122 }
123
124 void
125 helperOpenServers(helper * hlp)
126 {
127 char *s;
128 char *progname;
129 char *shortname;
130 char *procname;
131 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
132 char fd_note_buf[FD_DESC_SZ];
133 helper_server *srv;
134 int nargs = 0;
135 int k;
136 pid_t pid;
137 int rfd;
138 int wfd;
139 void * hIpc;
140 wordlist *w;
141
142 if (hlp->cmdline == NULL)
143 return;
144
145 progname = hlp->cmdline->key;
146
147 if ((s = strrchr(progname, '/')))
148 shortname = xstrdup(s + 1);
149 else
150 shortname = xstrdup(progname);
151
152 /* figure out how many new child are actually needed. */
153 int need_new = hlp->childs.needNew();
154
155 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
156
157 if (need_new < 1) {
158 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
159 }
160
161 procname = (char *)xmalloc(strlen(shortname) + 3);
162
163 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
164
165 args[nargs] = procname;
166 ++nargs;
167
168 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
169 args[nargs] = w->key;
170 ++nargs;
171 }
172
173 args[nargs] = NULL;
174 ++nargs;
175
176 assert(nargs <= HELPER_MAX_ARGS);
177
178 for (k = 0; k < need_new; ++k) {
179 getCurrentTime();
180 rfd = wfd = -1;
181 pid = ipcCreate(hlp->ipc_type,
182 progname,
183 args,
184 shortname,
185 hlp->addr,
186 &rfd,
187 &wfd,
188 &hIpc);
189
190 if (pid < 0) {
191 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
192 continue;
193 }
194
195 ++ hlp->childs.n_running;
196 ++ hlp->childs.n_active;
197 srv = new helper_server;
198 srv->hIpc = hIpc;
199 srv->pid = pid;
200 srv->initStats();
201 srv->addr = hlp->addr;
202 srv->readPipe = new Comm::Connection;
203 srv->readPipe->fd = rfd;
204 srv->writePipe = new Comm::Connection;
205 srv->writePipe->fd = wfd;
206 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
207 srv->wqueue = new MemBuf;
208 srv->roffset = 0;
209 srv->nextRequestId = 0;
210 srv->replyXaction = NULL;
211 srv->ignoreToEom = false;
212 srv->parent = cbdataReference(hlp);
213 dlinkAddTail(srv, &srv->link, &hlp->servers);
214
215 if (rfd == wfd) {
216 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
217 fd_note(rfd, fd_note_buf);
218 } else {
219 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
220 fd_note(rfd, fd_note_buf);
221 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
222 fd_note(wfd, fd_note_buf);
223 }
224
225 commSetNonBlocking(rfd);
226
227 if (wfd != rfd)
228 commSetNonBlocking(wfd);
229
230 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
231 comm_add_close_handler(rfd, closeCall);
232
233 if (hlp->timeout && hlp->childs.concurrency) {
234 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
235 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
236 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
237 }
238
239 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
240 CommIoCbPtrFun(helperHandleRead, srv));
241 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
242 }
243
244 hlp->last_restart = squid_curtime;
245 safe_free(shortname);
246 safe_free(procname);
247 helperKickQueue(hlp);
248 }
249
250 /**
251 * DPW 2007-05-08
252 *
253 * helperStatefulOpenServers: create the stateful child helper processes
254 */
255 void
256 helperStatefulOpenServers(statefulhelper * hlp)
257 {
258 char *shortname;
259 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
260 char fd_note_buf[FD_DESC_SZ];
261 int nargs = 0;
262
263 if (hlp->cmdline == NULL)
264 return;
265
266 if (hlp->childs.concurrency)
267 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
268
269 char *progname = hlp->cmdline->key;
270
271 char *s;
272 if ((s = strrchr(progname, '/')))
273 shortname = xstrdup(s + 1);
274 else
275 shortname = xstrdup(progname);
276
277 /* figure out haw mant new helpers are needed. */
278 int need_new = hlp->childs.needNew();
279
280 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
281
282 if (need_new < 1) {
283 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
284 }
285
286 char *procname = (char *)xmalloc(strlen(shortname) + 3);
287
288 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
289
290 args[nargs] = procname;
291 ++nargs;
292
293 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
294 args[nargs] = w->key;
295 ++nargs;
296 }
297
298 args[nargs] = NULL;
299 ++nargs;
300
301 assert(nargs <= HELPER_MAX_ARGS);
302
303 for (int k = 0; k < need_new; ++k) {
304 getCurrentTime();
305 int rfd = -1;
306 int wfd = -1;
307 void * hIpc;
308 pid_t pid = ipcCreate(hlp->ipc_type,
309 progname,
310 args,
311 shortname,
312 hlp->addr,
313 &rfd,
314 &wfd,
315 &hIpc);
316
317 if (pid < 0) {
318 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
319 continue;
320 }
321
322 ++ hlp->childs.n_running;
323 ++ hlp->childs.n_active;
324 helper_stateful_server *srv = new helper_stateful_server;
325 srv->hIpc = hIpc;
326 srv->pid = pid;
327 srv->flags.reserved = false;
328 srv->initStats();
329 srv->addr = hlp->addr;
330 srv->readPipe = new Comm::Connection;
331 srv->readPipe->fd = rfd;
332 srv->writePipe = new Comm::Connection;
333 srv->writePipe->fd = wfd;
334 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
335 srv->roffset = 0;
336 srv->parent = cbdataReference(hlp);
337
338 dlinkAddTail(srv, &srv->link, &hlp->servers);
339
340 if (rfd == wfd) {
341 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
342 fd_note(rfd, fd_note_buf);
343 } else {
344 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
345 fd_note(rfd, fd_note_buf);
346 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
347 fd_note(wfd, fd_note_buf);
348 }
349
350 commSetNonBlocking(rfd);
351
352 if (wfd != rfd)
353 commSetNonBlocking(wfd);
354
355 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
356 comm_add_close_handler(rfd, closeCall);
357
358 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
359 CommIoCbPtrFun(helperStatefulHandleRead, srv));
360 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
361 }
362
363 hlp->last_restart = squid_curtime;
364 safe_free(shortname);
365 safe_free(procname);
366 helperStatefulKickQueue(hlp);
367 }
368
369 void
370 helper::submitRequest(Helper::Xaction *r)
371 {
372 helper_server *srv;
373
374 if ((srv = GetFirstAvailable(this)))
375 helperDispatch(srv, r);
376 else
377 Enqueue(this, r);
378
379 syncQueueStats();
380 }
381
382 /// handles helperSubmit() and helperStatefulSubmit() failures
383 static void
384 SubmissionFailure(helper *hlp, HLPCB *callback, void *data)
385 {
386 auto result = Helper::Error;
387 if (!hlp) {
388 debugs(84, 3, "no helper");
389 result = Helper::Unknown;
390 }
391 // else pretend the helper has responded with ERR
392
393 callback(data, Helper::Reply(result));
394 }
395
396 void
397 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
398 {
399 if (!hlp || !hlp->trySubmit(buf, callback, data))
400 SubmissionFailure(hlp, callback, data);
401 }
402
403 /// whether queuing an additional request would overload the helper
404 bool
405 helper::queueFull() const {
406 return stats.queue_size >= static_cast<int>(childs.queue_size);
407 }
408
409 bool
410 helper::overloaded() const {
411 return stats.queue_size > static_cast<int>(childs.queue_size);
412 }
413
414 /// synchronizes queue-dependent measurements with the current queue state
415 void
416 helper::syncQueueStats()
417 {
418 if (overloaded()) {
419 if (overloadStart) {
420 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
421 } else {
422 overloadStart = squid_curtime;
423 debugs(84, 3, id_name << " became overloaded");
424 }
425 } else {
426 if (overloadStart) {
427 debugs(84, 5, id_name << " is no longer overloaded");
428 if (droppedRequests) {
429 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
430 " is no longer overloaded after dropping " << droppedRequests <<
431 " requests in " << (squid_curtime - overloadStart) << " seconds");
432 droppedRequests = 0;
433 }
434 overloadStart = 0;
435 }
436 }
437 }
438
439 /// prepares the helper for request submission
440 /// returns true if and only if the submission should proceed
441 /// may kill Squid if the helper remains overloaded for too long
442 bool
443 helper::prepSubmit()
444 {
445 // re-sync for the configuration may have changed since the last submission
446 syncQueueStats();
447
448 // Nothing special to do if the new request does not overload (i.e., the
449 // queue is not even full yet) or only _starts_ overloading this helper
450 // (i.e., the queue is currently at its limit).
451 if (!overloaded())
452 return true;
453
454 if (squid_curtime - overloadStart <= 180)
455 return true; // also OK: overload has not persisted long enough to panic
456
457 if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
458 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
459
460 if (!droppedRequests) {
461 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
462 id_name << " helper configured with on-persistent-overload=err");
463 }
464 ++droppedRequests;
465 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
466 return false;
467 }
468
469 bool
470 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
471 {
472 if (!prepSubmit())
473 return false; // request was dropped
474
475 submit(buf, callback, data); // will send or queue
476 return true; // request submitted or queued
477 }
478
479 /// dispatches or enqueues a helper requests; does not enforce queue limits
480 void
481 helper::submit(const char *buf, HLPCB * callback, void *data)
482 {
483 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
484 submitRequest(r);
485 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
486 }
487
488 /// lastserver = "server last used as part of a reserved request sequence"
489 void
490 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
491 {
492 if (!hlp || !hlp->trySubmit(buf, callback, data, lastserver))
493 SubmissionFailure(hlp, callback, data);
494 }
495
496 /// If possible, submit request. Otherwise, either kill Squid or return false.
497 bool
498 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, helper_stateful_server *lastserver)
499 {
500 if (!prepSubmit())
501 return false; // request was dropped
502
503 submit(buf, callback, data, lastserver); // will send or queue
504 return true; // request submitted or queued
505 }
506
507 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
508 {
509 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
510
511 if ((buf != NULL) && lastserver) {
512 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
513 assert(lastserver->flags.reserved);
514 assert(!lastserver->requests.size());
515
516 debugs(84, 5, "StatefulSubmit dispatching");
517 helperStatefulDispatch(lastserver, r);
518 } else {
519 helper_stateful_server *srv;
520 if ((srv = StatefulGetFirstAvailable(this))) {
521 helperStatefulDispatch(srv, r);
522 } else
523 StatefulEnqueue(this, r);
524 }
525
526 debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
527 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
528
529 syncQueueStats();
530 }
531
532 /**
533 * DPW 2007-05-08
534 *
535 * helperStatefulReleaseServer tells the helper that whoever was
536 * using it no longer needs its services.
537 */
538 void
539 helperStatefulReleaseServer(helper_stateful_server * srv)
540 {
541 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
542 if (!srv->flags.reserved)
543 return;
544
545 ++ srv->stats.releases;
546
547 srv->flags.reserved = false;
548
549 helperStatefulServerDone(srv);
550 }
551
552 void
553 helper::packStatsInto(Packable *p, const char *label) const
554 {
555 if (label)
556 p->appendf("%s:\n", label);
557
558 p->appendf(" program: %s\n", cmdline->key);
559 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
560 p->appendf(" requests sent: %d\n", stats.requests);
561 p->appendf(" replies received: %d\n", stats.replies);
562 p->appendf(" requests timedout: %d\n", stats.timedout);
563 p->appendf(" queue length: %d\n", stats.queue_size);
564 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
565 p->append("\n",1);
566 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
567 "ID #",
568 "FD",
569 "PID",
570 "# Requests",
571 "# Replies",
572 "# Timed-out",
573 "Flags",
574 "Time",
575 "Offset",
576 "Request");
577
578 for (dlink_node *link = servers.head; link; link = link->next) {
579 HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
580 assert(srv);
581 Helper::Xaction *xaction = srv->requests.empty() ? NULL : srv->requests.front();
582 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
583 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
584 srv->index.value,
585 srv->readPipe->fd,
586 srv->pid,
587 srv->stats.uses,
588 srv->stats.replies,
589 srv->stats.timedout,
590 srv->stats.pending ? 'B' : ' ',
591 srv->flags.writing ? 'W' : ' ',
592 srv->flags.closing ? 'C' : ' ',
593 srv->flags.reserved ? 'R' : ' ',
594 srv->flags.shutdown ? 'S' : ' ',
595 xaction && xaction->request.placeholder ? 'P' : ' ',
596 tt < 0.0 ? 0.0 : tt,
597 (int) srv->roffset,
598 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
599 }
600
601 p->append("\nFlags key:\n"
602 " B\tBUSY\n"
603 " W\tWRITING\n"
604 " C\tCLOSING\n"
605 " R\tRESERVED\n"
606 " S\tSHUTDOWN PENDING\n"
607 " P\tPLACEHOLDER\n", 101);
608 }
609
610 bool
611 helper::willOverload() const {
612 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
613 }
614
615 void
616 helperShutdown(helper * hlp)
617 {
618 dlink_node *link = hlp->servers.head;
619
620 while (link) {
621 helper_server *srv;
622 srv = (helper_server *)link->data;
623 link = link->next;
624
625 if (srv->flags.shutdown) {
626 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
627 continue;
628 }
629
630 assert(hlp->childs.n_active > 0);
631 -- hlp->childs.n_active;
632 srv->flags.shutdown = true; /* request it to shut itself down */
633
634 if (srv->flags.closing) {
635 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
636 continue;
637 }
638
639 if (srv->stats.pending) {
640 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
641 continue;
642 }
643
644 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
645 /* the rest of the details is dealt with in the helperServerFree
646 * close handler
647 */
648 srv->closePipesSafely(hlp->id_name);
649 }
650 }
651
652 void
653 helperStatefulShutdown(statefulhelper * hlp)
654 {
655 dlink_node *link = hlp->servers.head;
656 helper_stateful_server *srv;
657
658 while (link) {
659 srv = (helper_stateful_server *)link->data;
660 link = link->next;
661
662 if (srv->flags.shutdown) {
663 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
664 continue;
665 }
666
667 assert(hlp->childs.n_active > 0);
668 -- hlp->childs.n_active;
669 srv->flags.shutdown = true; /* request it to shut itself down */
670
671 if (srv->stats.pending) {
672 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
673 continue;
674 }
675
676 if (srv->flags.closing) {
677 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
678 continue;
679 }
680
681 if (srv->flags.reserved) {
682 if (shutting_down) {
683 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
684 } else {
685 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
686 continue;
687 }
688 }
689
690 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
691
692 /* the rest of the details is dealt with in the helperStatefulServerFree
693 * close handler
694 */
695 srv->closePipesSafely(hlp->id_name);
696 }
697 }
698
699 helper::~helper()
700 {
701 /* note, don't free id_name, it probably points to static memory */
702
703 // TODO: if the queue is not empty it will leak Helper::Request's
704 if (!queue.empty())
705 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
706 }
707
708 /* ====================================================================== */
709 /* LOCAL FUNCTIONS */
710 /* ====================================================================== */
711
712 static void
713 helperServerFree(helper_server *srv)
714 {
715 helper *hlp = srv->parent;
716 int concurrency = hlp->childs.concurrency;
717
718 if (!concurrency)
719 concurrency = 1;
720
721 if (srv->rbuf) {
722 memFreeBuf(srv->rbuf_sz, srv->rbuf);
723 srv->rbuf = NULL;
724 }
725
726 srv->wqueue->clean();
727 delete srv->wqueue;
728
729 if (srv->writebuf) {
730 srv->writebuf->clean();
731 delete srv->writebuf;
732 srv->writebuf = NULL;
733 }
734
735 if (Comm::IsConnOpen(srv->writePipe))
736 srv->closeWritePipeSafely(hlp->id_name);
737
738 dlinkDelete(&srv->link, &hlp->servers);
739
740 assert(hlp->childs.n_running > 0);
741 -- hlp->childs.n_running;
742
743 if (!srv->flags.shutdown) {
744 assert(hlp->childs.n_active > 0);
745 -- hlp->childs.n_active;
746 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
747
748 if (hlp->childs.needNew() > 0) {
749 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
750
751 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
752 if (srv->stats.replies < 1)
753 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
754 else
755 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
756 }
757
758 debugs(80, DBG_IMPORTANT, "Starting new helpers");
759 helperOpenServers(hlp);
760 }
761 }
762
763 while (!srv->requests.empty()) {
764 // XXX: re-schedule these on another helper?
765 Helper::Xaction *r = srv->requests.front();
766 srv->requests.pop_front();
767 void *cbdata;
768
769 if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
770 r->reply.result = Helper::Unknown;
771 r->request.callback(cbdata, r->reply);
772 }
773
774 delete r;
775 }
776 srv->requestsIndex.clear();
777
778 cbdataReferenceDone(srv->parent);
779 delete srv;
780 }
781
782 static void
783 helperStatefulServerFree(helper_stateful_server *srv)
784 {
785 statefulhelper *hlp = srv->parent;
786
787 if (srv->rbuf) {
788 memFreeBuf(srv->rbuf_sz, srv->rbuf);
789 srv->rbuf = NULL;
790 }
791
792 #if 0
793 srv->wqueue->clean();
794
795 delete srv->wqueue;
796
797 #endif
798
799 /* TODO: walk the local queue of requests and carry them all out */
800 if (Comm::IsConnOpen(srv->writePipe))
801 srv->closeWritePipeSafely(hlp->id_name);
802
803 dlinkDelete(&srv->link, &hlp->servers);
804
805 assert(hlp->childs.n_running > 0);
806 -- hlp->childs.n_running;
807
808 if (!srv->flags.shutdown) {
809 assert( hlp->childs.n_active > 0);
810 -- hlp->childs.n_active;
811 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
812
813 if (hlp->childs.needNew() > 0) {
814 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
815
816 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
817 if (srv->stats.replies < 1)
818 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
819 else
820 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
821 }
822
823 debugs(80, DBG_IMPORTANT, "Starting new helpers");
824 helperStatefulOpenServers(hlp);
825 }
826 }
827
828 while (!srv->requests.empty()) {
829 // XXX: re-schedule these on another helper?
830 Helper::Xaction *r = srv->requests.front();
831 srv->requests.pop_front();
832 void *cbdata;
833
834 if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
835 r->reply.result = Helper::Unknown;
836 r->request.callback(cbdata, r->reply);
837 }
838
839 delete r;
840 }
841
842 cbdataReferenceDone(srv->parent);
843
844 delete srv;
845 }
846
847 Helper::Xaction *
848 helper_server::popRequest(int request_number)
849 {
850 Helper::Xaction *r = nullptr;
851 helper_server::RequestIndex::iterator it;
852 if (parent->childs.concurrency) {
853 // If concurency supported retrieve request from ID
854 it = requestsIndex.find(request_number);
855 if (it != requestsIndex.end()) {
856 r = *(it->second);
857 requests.erase(it->second);
858 requestsIndex.erase(it);
859 }
860 } else if(!requests.empty()) {
861 // Else get the first request from queue, if any
862 r = requests.front();
863 requests.pop_front();
864 }
865
866 return r;
867 }
868
869 /// Calls back with a pointer to the buffer with the helper output
870 static void
871 helperReturnBuffer(helper_server * srv, helper * hlp, char * msg, size_t msgSize, char * msgEnd)
872 {
873 if (Helper::Xaction *r = srv->replyXaction) {
874 const bool hasSpace = r->reply.accumulate(msg, msgSize);
875 if (!hasSpace) {
876 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
877 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
878 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
879 srv->closePipesSafely(hlp->id_name);
880 return;
881 }
882
883 if (!msgEnd)
884 return; // We are waiting for more data.
885
886 bool retry = false;
887 if (cbdataReferenceValid(r->request.data)) {
888 r->reply.finalize();
889 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
890 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
891 retry = true;
892 } else {
893 HLPCB *callback = r->request.callback;
894 r->request.callback = nullptr;
895 void *cbdata = nullptr;
896 if (cbdataReferenceValidDone(r->request.data, &cbdata))
897 callback(cbdata, r->reply);
898 }
899 }
900
901 -- srv->stats.pending;
902 ++ srv->stats.replies;
903
904 ++ hlp->stats.replies;
905
906 srv->answer_time = current_time;
907
908 srv->dispatch_time = r->request.dispatch_time;
909
910 hlp->stats.avg_svc_time =
911 Math::intAverage(hlp->stats.avg_svc_time,
912 tvSubMsec(r->request.dispatch_time, current_time),
913 hlp->stats.replies, REDIRECT_AV_FACTOR);
914
915 // release or re-submit parsedRequestXaction object
916 srv->replyXaction = nullptr;
917 if (retry) {
918 ++r->request.retries;
919 hlp->submitRequest(r);
920 } else
921 delete r;
922 }
923
924 if (hlp->timeout && hlp->childs.concurrency)
925 srv->checkForTimedOutRequests(hlp->retryTimedOut);
926
927 if (!srv->flags.shutdown) {
928 helperKickQueue(hlp);
929 } else if (!srv->flags.closing && !srv->stats.pending) {
930 srv->flags.closing=true;
931 srv->writePipe->close();
932 }
933 }
934
935 static void
936 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
937 {
938 helper_server *srv = (helper_server *)data;
939 helper *hlp = srv->parent;
940 assert(cbdataReferenceValid(data));
941
942 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
943
944 if (flag == Comm::ERR_CLOSING) {
945 return;
946 }
947
948 assert(conn->fd == srv->readPipe->fd);
949
950 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
951
952 if (flag != Comm::OK || len == 0) {
953 srv->closePipesSafely(hlp->id_name);
954 return;
955 }
956
957 srv->roffset += len;
958 srv->rbuf[srv->roffset] = '\0';
959 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
960
961 if (!srv->stats.pending && !srv->stats.timedout) {
962 /* someone spoke without being spoken to */
963 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
964 hlp->id_name << " #" << srv->index << ", " << (int)len <<
965 " bytes '" << srv->rbuf << "'");
966
967 srv->roffset = 0;
968 srv->rbuf[0] = '\0';
969 }
970
971 bool needsMore = false;
972 char *msg = srv->rbuf;
973 while (*msg && !needsMore) {
974 int skip = 0;
975 char *eom = strchr(msg, hlp->eom);
976 if (eom) {
977 skip = 1;
978 debugs(84, 3, "helperHandleRead: end of reply found");
979 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
980 *eom = '\0';
981 // rewind to the \r octet which is the real terminal now
982 // and remember that we have to skip forward 2 places now.
983 skip = 2;
984 --eom;
985 }
986 *eom = '\0';
987 }
988
989 if (!srv->ignoreToEom && !srv->replyXaction) {
990 int i = 0;
991 if (hlp->childs.concurrency) {
992 char *e = NULL;
993 i = strtol(msg, &e, 10);
994 // Do we need to check for e == msg? Means wrong response from helper.
995 // Will be droped as "unexpected reply on channel 0"
996 needsMore = !(xisspace(*e) || (eom && e == eom));
997 if (!needsMore) {
998 msg = e;
999 while (*msg && xisspace(*msg))
1000 ++msg;
1001 } // else not enough data to compute request number
1002 }
1003 if (!(srv->replyXaction = srv->popRequest(i))) {
1004 if (srv->stats.timedout) {
1005 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1006 } else {
1007 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
1008 i << " from " << hlp->id_name << " #" << srv->index <<
1009 " '" << srv->rbuf << "'");
1010 }
1011 srv->ignoreToEom = true;
1012 }
1013 } // else we need to just append reply data to the current Xaction
1014
1015 if (!needsMore) {
1016 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1017 assert(msgSize <= srv->rbuf_sz);
1018 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1019 msg += msgSize + skip;
1020 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1021
1022 // The next message should not ignored.
1023 if (eom && srv->ignoreToEom)
1024 srv->ignoreToEom = false;
1025 } else
1026 assert(skip == 0 && eom == NULL);
1027 }
1028
1029 if (needsMore) {
1030 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1031 assert(msgSize <= srv->rbuf_sz);
1032 memmove(srv->rbuf, msg, msgSize);
1033 srv->roffset = msgSize;
1034 srv->rbuf[srv->roffset] = '\0';
1035 } else {
1036 // All of the responses parsed and msg points at the end of read data
1037 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1038 srv->roffset = 0;
1039 }
1040
1041 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1042 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1043 assert(spaceSize >= 0);
1044
1045 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1046 CommIoCbPtrFun(helperHandleRead, srv));
1047 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1048 }
1049 }
1050
1051 static void
1052 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1053 {
1054 char *t = NULL;
1055 helper_stateful_server *srv = (helper_stateful_server *)data;
1056 statefulhelper *hlp = srv->parent;
1057 assert(cbdataReferenceValid(data));
1058
1059 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1060
1061 if (flag == Comm::ERR_CLOSING) {
1062 return;
1063 }
1064
1065 assert(conn->fd == srv->readPipe->fd);
1066
1067 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1068 hlp->id_name << " #" << srv->index);
1069
1070 if (flag != Comm::OK || len == 0) {
1071 srv->closePipesSafely(hlp->id_name);
1072 return;
1073 }
1074
1075 srv->roffset += len;
1076 srv->rbuf[srv->roffset] = '\0';
1077 Helper::Xaction *r = srv->requests.front();
1078 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1079
1080 if (r == NULL) {
1081 /* someone spoke without being spoken to */
1082 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1083 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1084 " bytes '" << srv->rbuf << "'");
1085
1086 srv->roffset = 0;
1087 }
1088
1089 if ((t = strchr(srv->rbuf, hlp->eom))) {
1090 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1091
1092 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1093 *t = '\0';
1094 // rewind to the \r octet which is the real terminal now
1095 --t;
1096 }
1097
1098 *t = '\0';
1099 }
1100
1101 if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1102 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1103 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1104 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1105 srv->closePipesSafely(hlp->id_name);
1106 return;
1107 }
1108 /**
1109 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1110 * Doing this prohibits concurrency support with multiple replies per read().
1111 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1112 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1113 */
1114 srv->roffset = 0;
1115
1116 if (t) {
1117 /* end of reply found */
1118 srv->requests.pop_front(); // we already have it in 'r'
1119 int called = 1;
1120
1121 if (r && cbdataReferenceValid(r->request.data)) {
1122 r->reply.finalize();
1123 r->reply.whichServer = srv;
1124 r->request.callback(r->request.data, r->reply);
1125 } else {
1126 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1127 called = 0;
1128 }
1129
1130 delete r;
1131
1132 -- srv->stats.pending;
1133 ++ srv->stats.replies;
1134
1135 ++ hlp->stats.replies;
1136 srv->answer_time = current_time;
1137 hlp->stats.avg_svc_time =
1138 Math::intAverage(hlp->stats.avg_svc_time,
1139 tvSubMsec(srv->dispatch_time, current_time),
1140 hlp->stats.replies, REDIRECT_AV_FACTOR);
1141
1142 if (called)
1143 helperStatefulServerDone(srv);
1144 else
1145 helperStatefulReleaseServer(srv);
1146 }
1147
1148 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1149 int spaceSize = srv->rbuf_sz - 1;
1150
1151 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1152 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1153 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1154 }
1155 }
1156
1157 /// Handles a request when all running helpers, if any, are busy.
1158 static void
1159 Enqueue(helper * hlp, Helper::Xaction * r)
1160 {
1161 hlp->queue.push(r);
1162 ++ hlp->stats.queue_size;
1163
1164 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1165 if (hlp->childs.needNew() > 0) {
1166 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1167 helperOpenServers(hlp);
1168 return;
1169 }
1170
1171 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1172 return;
1173
1174 if (squid_curtime - hlp->last_queue_warn < 600)
1175 return;
1176
1177 if (shutting_down || reconfiguring)
1178 return;
1179
1180 hlp->last_queue_warn = squid_curtime;
1181
1182 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1183 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1184 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1185 }
1186
1187 static void
1188 StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
1189 {
1190 hlp->queue.push(r);
1191 ++ hlp->stats.queue_size;
1192
1193 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1194 if (hlp->childs.needNew() > 0) {
1195 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1196 helperStatefulOpenServers(hlp);
1197 return;
1198 }
1199
1200 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1201 return;
1202
1203 if (squid_curtime - hlp->last_queue_warn < 600)
1204 return;
1205
1206 if (shutting_down || reconfiguring)
1207 return;
1208
1209 hlp->last_queue_warn = squid_curtime;
1210
1211 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1212 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1213 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1214 }
1215
1216 Helper::Xaction *
1217 helper::nextRequest()
1218 {
1219 if (queue.empty())
1220 return nullptr;
1221
1222 auto *r = queue.front();
1223 queue.pop();
1224 --stats.queue_size;
1225 return r;
1226 }
1227
1228 static helper_server *
1229 GetFirstAvailable(const helper * hlp)
1230 {
1231 dlink_node *n;
1232 helper_server *srv;
1233 helper_server *selected = NULL;
1234 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1235
1236 if (hlp->childs.n_running == 0)
1237 return NULL;
1238
1239 /* Find "least" loaded helper (approx) */
1240 for (n = hlp->servers.head; n != NULL; n = n->next) {
1241 srv = (helper_server *)n->data;
1242
1243 if (selected && selected->stats.pending <= srv->stats.pending)
1244 continue;
1245
1246 if (srv->flags.shutdown)
1247 continue;
1248
1249 if (!srv->stats.pending)
1250 return srv;
1251
1252 if (selected) {
1253 selected = srv;
1254 break;
1255 }
1256
1257 selected = srv;
1258 }
1259
1260 if (!selected) {
1261 debugs(84, 5, "GetFirstAvailable: None available.");
1262 return NULL;
1263 }
1264
1265 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1266 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1267 return NULL;
1268 }
1269
1270 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1271 return selected;
1272 }
1273
1274 static helper_stateful_server *
1275 StatefulGetFirstAvailable(const statefulhelper * hlp)
1276 {
1277 dlink_node *n;
1278 helper_stateful_server *srv = NULL;
1279 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1280
1281 if (hlp->childs.n_running == 0)
1282 return NULL;
1283
1284 for (n = hlp->servers.head; n != NULL; n = n->next) {
1285 srv = (helper_stateful_server *)n->data;
1286
1287 if (srv->stats.pending)
1288 continue;
1289
1290 if (srv->flags.reserved)
1291 continue;
1292
1293 if (srv->flags.shutdown)
1294 continue;
1295
1296 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1297 return srv;
1298 }
1299
1300 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1301 return NULL;
1302 }
1303
1304 static void
1305 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1306 {
1307 helper_server *srv = (helper_server *)data;
1308
1309 srv->writebuf->clean();
1310 delete srv->writebuf;
1311 srv->writebuf = NULL;
1312 srv->flags.writing = false;
1313
1314 if (flag != Comm::OK) {
1315 /* Helper server has crashed */
1316 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1317 return;
1318 }
1319
1320 if (!srv->wqueue->isNull()) {
1321 srv->writebuf = srv->wqueue;
1322 srv->wqueue = new MemBuf;
1323 srv->flags.writing = true;
1324 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1325 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1326 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1327 }
1328 }
1329
1330 static void
1331 helperDispatch(helper_server * srv, Helper::Xaction * r)
1332 {
1333 helper *hlp = srv->parent;
1334 const uint64_t reqId = ++srv->nextRequestId;
1335
1336 if (!cbdataReferenceValid(r->request.data)) {
1337 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1338 delete r;
1339 return;
1340 }
1341
1342 r->request.Id = reqId;
1343 helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1344 r->request.dispatch_time = current_time;
1345
1346 if (srv->wqueue->isNull())
1347 srv->wqueue->init();
1348
1349 if (hlp->childs.concurrency) {
1350 srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1351 assert(srv->requestsIndex.size() == srv->requests.size());
1352 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1353 } else
1354 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1355
1356 if (!srv->flags.writing) {
1357 assert(NULL == srv->writebuf);
1358 srv->writebuf = srv->wqueue;
1359 srv->wqueue = new MemBuf;
1360 srv->flags.writing = true;
1361 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1362 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1363 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1364 }
1365
1366 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1367
1368 ++ srv->stats.uses;
1369 ++ srv->stats.pending;
1370 ++ hlp->stats.requests;
1371 }
1372
1373 static void
1374 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1375 {}
1376
1377 static void
1378 helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
1379 {
1380 statefulhelper *hlp = srv->parent;
1381
1382 if (!cbdataReferenceValid(r->request.data)) {
1383 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1384 delete r;
1385 helperStatefulReleaseServer(srv);
1386 return;
1387 }
1388
1389 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1390
1391 if (r->request.placeholder == 1) {
1392 /* a callback is needed before this request can _use_ a helper. */
1393 /* we don't care about releasing this helper. The request NEVER
1394 * gets to the helper. So we throw away the return code */
1395 r->reply.result = Helper::Unknown;
1396 r->reply.whichServer = srv;
1397 r->request.callback(r->request.data, r->reply);
1398 /* throw away the placeholder */
1399 delete r;
1400 /* and push the queue. Note that the callback may have submitted a new
1401 * request to the helper which is why we test for the request */
1402
1403 if (!srv->requests.size())
1404 helperStatefulServerDone(srv);
1405
1406 return;
1407 }
1408
1409 srv->flags.reserved = true;
1410 srv->requests.push_back(r);
1411 srv->dispatch_time = current_time;
1412 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1413 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1414 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, NULL);
1415 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1416 hlp->id_name << " #" << srv->index << ", " <<
1417 (int) strlen(r->request.buf) << " bytes");
1418
1419 ++ srv->stats.uses;
1420 ++ srv->stats.pending;
1421 ++ hlp->stats.requests;
1422 }
1423
1424 static void
1425 helperKickQueue(helper * hlp)
1426 {
1427 Helper::Xaction *r;
1428 helper_server *srv;
1429
1430 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1431 helperDispatch(srv, r);
1432 }
1433
1434 static void
1435 helperStatefulKickQueue(statefulhelper * hlp)
1436 {
1437 Helper::Xaction *r;
1438 helper_stateful_server *srv;
1439
1440 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1441 helperStatefulDispatch(srv, r);
1442 }
1443
1444 static void
1445 helperStatefulServerDone(helper_stateful_server * srv)
1446 {
1447 if (!srv->flags.shutdown) {
1448 helperStatefulKickQueue(srv->parent);
1449 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1450 srv->closeWritePipeSafely(srv->parent->id_name);
1451 return;
1452 }
1453 }
1454
1455 void
1456 helper_server::checkForTimedOutRequests(bool const retry)
1457 {
1458 assert(parent->childs.concurrency);
1459 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1460 Helper::Xaction *r = requests.front();
1461 RequestIndex::iterator it;
1462 it = requestsIndex.find(r->request.Id);
1463 assert(it != requestsIndex.end());
1464 requestsIndex.erase(it);
1465 requests.pop_front();
1466 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1467 void *cbdata;
1468 bool retried = false;
1469 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1470 debugs(84, 2, "Retry request " << r->request.Id);
1471 ++r->request.retries;
1472 parent->submitRequest(r);
1473 retried = true;
1474 } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1475 if (!parent->onTimedOutResponse.isEmpty()) {
1476 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1477 r->reply.finalize();
1478 else
1479 r->reply.result = Helper::TimedOut;
1480 r->request.callback(cbdata, r->reply);
1481 } else {
1482 r->reply.result = Helper::TimedOut;
1483 r->request.callback(cbdata, r->reply);
1484 }
1485 }
1486 --stats.pending;
1487 ++stats.timedout;
1488 ++parent->stats.timedout;
1489 if (!retried)
1490 delete r;
1491 }
1492 }
1493
1494 void
1495 helper_server::requestTimeout(const CommTimeoutCbParams &io)
1496 {
1497 debugs(26, 3, HERE << io.conn);
1498 helper_server *srv = static_cast<helper_server *>(io.data);
1499
1500 if (!cbdataReferenceValid(srv))
1501 return;
1502
1503 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1504
1505 debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1506 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1507 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
1508
1509 const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1510 const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1511
1512 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1513 }
1514