]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Merge from trunk rev.14687
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31
32 // helper_stateful_server::data uses explicit alloc()/freeOne() */
33 #include "mem/Pool.h"
34
35 #define HELPER_MAX_ARGS 64
36
37 /// The maximum allowed request retries.
38 #define MAX_RETRIES 2
39
40 /** Initial Squid input buffer size. Helper responses may exceed this, and
41 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
42 */
43 const size_t ReadBufMinSize(4*1024);
44
45 /** Maximum safe size of a helper-to-Squid response message plus one.
46 * Squid will warn and close the stream if a helper sends a too-big response.
47 * ssl_crtd helper is known to produce responses of at least 10KB in size.
48 * Some undocumented helpers are known to produce responses exceeding 8KB.
49 */
50 const size_t ReadBufMaxSize(32*1024);
51
52 static IOCB helperHandleRead;
53 static IOCB helperStatefulHandleRead;
54 static void helperServerFree(helper_server *srv);
55 static void helperStatefulServerFree(helper_stateful_server *srv);
56 static void Enqueue(helper * hlp, Helper::Request *);
57 static helper_server *GetFirstAvailable(helper * hlp);
58 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
59 static void helperDispatch(helper_server * srv, Helper::Request * r);
60 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r);
61 static void helperKickQueue(helper * hlp);
62 static void helperStatefulKickQueue(statefulhelper * hlp);
63 static void helperStatefulServerDone(helper_stateful_server * srv);
64 static void StatefulEnqueue(statefulhelper * hlp, Helper::Request * r);
65
66 CBDATA_CLASS_INIT(helper);
67 CBDATA_CLASS_INIT(helper_server);
68 CBDATA_CLASS_INIT(statefulhelper);
69 CBDATA_CLASS_INIT(helper_stateful_server);
70
71 InstanceIdDefinitions(HelperServerBase, "Hlpr");
72
73 void
74 HelperServerBase::initStats()
75 {
76 stats.uses=0;
77 stats.replies=0;
78 stats.pending=0;
79 stats.releases=0;
80 stats.timedout = 0;
81 }
82
83 void
84 HelperServerBase::closePipesSafely(const char *id_name)
85 {
86 #if _SQUID_WINDOWS_
87 shutdown(writePipe->fd, SD_BOTH);
88 #endif
89
90 flags.closing = true;
91 if (readPipe->fd == writePipe->fd)
92 readPipe->fd = -1;
93 else
94 readPipe->close();
95 writePipe->close();
96
97 #if _SQUID_WINDOWS_
98 if (hIpc) {
99 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
100 getCurrentTime();
101 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
102 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
103 }
104 CloseHandle(hIpc);
105 }
106 #endif
107 }
108
109 void
110 HelperServerBase::closeWritePipeSafely(const char *id_name)
111 {
112 #if _SQUID_WINDOWS_
113 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
114 #endif
115
116 flags.closing = true;
117 if (readPipe->fd == writePipe->fd)
118 readPipe->fd = -1;
119 writePipe->close();
120
121 #if _SQUID_WINDOWS_
122 if (hIpc) {
123 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
124 getCurrentTime();
125 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
126 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
127 }
128 CloseHandle(hIpc);
129 }
130 #endif
131 }
132
133 void
134 helperOpenServers(helper * hlp)
135 {
136 char *s;
137 char *progname;
138 char *shortname;
139 char *procname;
140 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
141 char fd_note_buf[FD_DESC_SZ];
142 helper_server *srv;
143 int nargs = 0;
144 int k;
145 pid_t pid;
146 int rfd;
147 int wfd;
148 void * hIpc;
149 wordlist *w;
150
151 if (hlp->cmdline == NULL)
152 return;
153
154 progname = hlp->cmdline->key;
155
156 if ((s = strrchr(progname, '/')))
157 shortname = xstrdup(s + 1);
158 else
159 shortname = xstrdup(progname);
160
161 /* figure out how many new child are actually needed. */
162 int need_new = hlp->childs.needNew();
163
164 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
165
166 if (need_new < 1) {
167 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
168 }
169
170 procname = (char *)xmalloc(strlen(shortname) + 3);
171
172 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
173
174 args[nargs] = procname;
175 ++nargs;
176
177 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
178 args[nargs] = w->key;
179 ++nargs;
180 }
181
182 args[nargs] = NULL;
183 ++nargs;
184
185 assert(nargs <= HELPER_MAX_ARGS);
186
187 for (k = 0; k < need_new; ++k) {
188 getCurrentTime();
189 rfd = wfd = -1;
190 pid = ipcCreate(hlp->ipc_type,
191 progname,
192 args,
193 shortname,
194 hlp->addr,
195 &rfd,
196 &wfd,
197 &hIpc);
198
199 if (pid < 0) {
200 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
201 continue;
202 }
203
204 ++ hlp->childs.n_running;
205 ++ hlp->childs.n_active;
206 srv = new helper_server;
207 srv->hIpc = hIpc;
208 srv->pid = pid;
209 srv->initStats();
210 srv->addr = hlp->addr;
211 srv->readPipe = new Comm::Connection;
212 srv->readPipe->fd = rfd;
213 srv->writePipe = new Comm::Connection;
214 srv->writePipe->fd = wfd;
215 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
216 srv->wqueue = new MemBuf;
217 srv->roffset = 0;
218 srv->nextRequestId = 0;
219 srv->parent = cbdataReference(hlp);
220 dlinkAddTail(srv, &srv->link, &hlp->servers);
221
222 if (rfd == wfd) {
223 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
224 fd_note(rfd, fd_note_buf);
225 } else {
226 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
227 fd_note(rfd, fd_note_buf);
228 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
229 fd_note(wfd, fd_note_buf);
230 }
231
232 commSetNonBlocking(rfd);
233
234 if (wfd != rfd)
235 commSetNonBlocking(wfd);
236
237 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
238 comm_add_close_handler(rfd, closeCall);
239
240 if (hlp->timeout && hlp->childs.concurrency) {
241 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
242 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
243 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
244 }
245
246 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
247 CommIoCbPtrFun(helperHandleRead, srv));
248 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
249 }
250
251 hlp->last_restart = squid_curtime;
252 safe_free(shortname);
253 safe_free(procname);
254 helperKickQueue(hlp);
255 }
256
257 /**
258 * DPW 2007-05-08
259 *
260 * helperStatefulOpenServers: create the stateful child helper processes
261 */
262 void
263 helperStatefulOpenServers(statefulhelper * hlp)
264 {
265 char *shortname;
266 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
267 char fd_note_buf[FD_DESC_SZ];
268 int nargs = 0;
269
270 if (hlp->cmdline == NULL)
271 return;
272
273 if (hlp->childs.concurrency)
274 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
275
276 char *progname = hlp->cmdline->key;
277
278 char *s;
279 if ((s = strrchr(progname, '/')))
280 shortname = xstrdup(s + 1);
281 else
282 shortname = xstrdup(progname);
283
284 /* figure out haw mant new helpers are needed. */
285 int need_new = hlp->childs.needNew();
286
287 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
288
289 if (need_new < 1) {
290 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
291 }
292
293 char *procname = (char *)xmalloc(strlen(shortname) + 3);
294
295 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
296
297 args[nargs] = procname;
298 ++nargs;
299
300 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
301 args[nargs] = w->key;
302 ++nargs;
303 }
304
305 args[nargs] = NULL;
306 ++nargs;
307
308 assert(nargs <= HELPER_MAX_ARGS);
309
310 for (int k = 0; k < need_new; ++k) {
311 getCurrentTime();
312 int rfd = -1;
313 int wfd = -1;
314 void * hIpc;
315 pid_t pid = ipcCreate(hlp->ipc_type,
316 progname,
317 args,
318 shortname,
319 hlp->addr,
320 &rfd,
321 &wfd,
322 &hIpc);
323
324 if (pid < 0) {
325 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
326 continue;
327 }
328
329 ++ hlp->childs.n_running;
330 ++ hlp->childs.n_active;
331 helper_stateful_server *srv = new helper_stateful_server;
332 srv->hIpc = hIpc;
333 srv->pid = pid;
334 srv->flags.reserved = false;
335 srv->initStats();
336 srv->addr = hlp->addr;
337 srv->readPipe = new Comm::Connection;
338 srv->readPipe->fd = rfd;
339 srv->writePipe = new Comm::Connection;
340 srv->writePipe->fd = wfd;
341 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
342 srv->roffset = 0;
343 srv->parent = cbdataReference(hlp);
344
345 if (hlp->datapool != NULL)
346 srv->data = hlp->datapool->alloc();
347
348 dlinkAddTail(srv, &srv->link, &hlp->servers);
349
350 if (rfd == wfd) {
351 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
352 fd_note(rfd, fd_note_buf);
353 } else {
354 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
355 fd_note(rfd, fd_note_buf);
356 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
357 fd_note(wfd, fd_note_buf);
358 }
359
360 commSetNonBlocking(rfd);
361
362 if (wfd != rfd)
363 commSetNonBlocking(wfd);
364
365 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
366 comm_add_close_handler(rfd, closeCall);
367
368 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
369 CommIoCbPtrFun(helperStatefulHandleRead, srv));
370 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
371 }
372
373 hlp->last_restart = squid_curtime;
374 safe_free(shortname);
375 safe_free(procname);
376 helperStatefulKickQueue(hlp);
377 }
378
379 void
380 helper::submitRequest(Helper::Request *r)
381 {
382 helper_server *srv;
383
384 if ((srv = GetFirstAvailable(this)))
385 helperDispatch(srv, r);
386 else
387 Enqueue(this, r);
388
389 if (!queueFull()) {
390 full_time = 0;
391 } else if (!full_time) {
392 debugs(84, 3, id_name << " queue became full");
393 full_time = squid_curtime;
394 }
395 }
396
397 void
398 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
399 {
400 if (hlp == NULL) {
401 debugs(84, 3, "helperSubmit: hlp == NULL");
402 Helper::Reply nilReply;
403 callback(data, nilReply);
404 return;
405 }
406 hlp->prepSubmit();
407 hlp->submit(buf, callback, data);
408 }
409
410 bool
411 helper::queueFull() const {
412 return stats.queue_size > static_cast<int>(childs.queue_size);
413 }
414
415 /// prepares the helper for request submission via trySubmit() or helperSubmit()
416 /// currently maintains full_time and kills Squid if the helper remains full for too long
417 void
418 helper::prepSubmit()
419 {
420 if (!queueFull())
421 full_time = 0;
422 else if (!full_time) // may happen here if reconfigure decreases capacity
423 full_time = squid_curtime;
424 else if (squid_curtime - full_time > 180)
425 fatalf("Too many queued %s requests", id_name);
426 }
427
428 bool
429 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
430 {
431 prepSubmit();
432
433 if (queueFull()) {
434 debugs(84, DBG_IMPORTANT, id_name << " drops request due to a full queue");
435 return false; // request was ignored
436 }
437
438 submit(buf, callback, data); // will send or queue
439 return true; // request submitted or queued
440 }
441
442 /// dispatches or enqueues a helper requests; does not enforce queue limits
443 void
444 helper::submit(const char *buf, HLPCB * callback, void *data)
445 {
446 Helper::Request *r = new Helper::Request(callback, data, buf);
447 submitRequest(r);
448 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
449 }
450
451 /// lastserver = "server last used as part of a reserved request sequence"
452 void
453 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
454 {
455 if (hlp == NULL) {
456 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
457 Helper::Reply nilReply;
458 callback(data, nilReply);
459 return;
460 }
461 hlp->prepSubmit();
462 hlp->submit(buf, callback, data, lastserver);
463 }
464
465 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
466 {
467 Helper::Request *r = new Helper::Request(callback, data, buf);
468
469 if ((buf != NULL) && lastserver) {
470 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
471 assert(lastserver->flags.reserved);
472 assert(!lastserver->requests.size());
473
474 debugs(84, 5, "StatefulSubmit dispatching");
475 helperStatefulDispatch(lastserver, r);
476 } else {
477 helper_stateful_server *srv;
478 if ((srv = StatefulGetFirstAvailable(this))) {
479 helperStatefulDispatch(srv, r);
480 } else
481 StatefulEnqueue(this, r);
482 }
483
484 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
485 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
486
487 if (!queueFull()) {
488 full_time = 0;
489 } else if (!full_time) {
490 debugs(84, 3, id_name << " queue became full");
491 full_time = squid_curtime;
492 }
493 }
494
495 /**
496 * DPW 2007-05-08
497 *
498 * helperStatefulReleaseServer tells the helper that whoever was
499 * using it no longer needs its services.
500 */
501 void
502 helperStatefulReleaseServer(helper_stateful_server * srv)
503 {
504 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
505 if (!srv->flags.reserved)
506 return;
507
508 ++ srv->stats.releases;
509
510 srv->flags.reserved = false;
511
512 helperStatefulServerDone(srv);
513 }
514
515 /** return a pointer to the stateful routines data area */
516 void *
517 helperStatefulServerGetData(helper_stateful_server * srv)
518 {
519 return srv->data;
520 }
521
522 void
523 helper::packStatsInto(Packable *p, const char *label) const
524 {
525 if (label)
526 p->appendf("%s:\n", label);
527
528 p->appendf(" program: %s\n", cmdline->key);
529 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
530 p->appendf(" requests sent: %d\n", stats.requests);
531 p->appendf(" replies received: %d\n", stats.replies);
532 p->appendf(" requests timedout: %d\n", stats.timedout);
533 p->appendf(" queue length: %d\n", stats.queue_size);
534 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
535 p->append("\n",1);
536 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
537 "ID #",
538 "FD",
539 "PID",
540 "# Requests",
541 "# Replies",
542 "# Timed-out",
543 "Flags",
544 "Time",
545 "Offset",
546 "Request");
547
548 for (dlink_node *link = servers.head; link; link = link->next) {
549 HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
550 assert(srv);
551 Helper::Request *request = srv->requests.empty() ? NULL : srv->requests.front();
552 double tt = 0.001 * (request ? tvSubMsec(request->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
553 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
554 srv->index.value,
555 srv->readPipe->fd,
556 srv->pid,
557 srv->stats.uses,
558 srv->stats.replies,
559 srv->stats.timedout,
560 srv->stats.pending ? 'B' : ' ',
561 srv->flags.writing ? 'W' : ' ',
562 srv->flags.closing ? 'C' : ' ',
563 srv->flags.reserved ? 'R' : ' ',
564 srv->flags.shutdown ? 'S' : ' ',
565 request && request->placeholder ? 'P' : ' ',
566 tt < 0.0 ? 0.0 : tt,
567 (int) srv->roffset,
568 request ? Format::QuoteMimeBlob(request->buf) : "(none)");
569 }
570
571 p->append("\nFlags key:\n"
572 " B\tBUSY\n"
573 " W\tWRITING\n"
574 " C\tCLOSING\n"
575 " R\tRESERVED\n"
576 " S\tSHUTDOWN PENDING\n"
577 " P\tPLACEHOLDER\n", 101);
578 }
579
580 void
581 helperShutdown(helper * hlp)
582 {
583 dlink_node *link = hlp->servers.head;
584
585 while (link) {
586 helper_server *srv;
587 srv = (helper_server *)link->data;
588 link = link->next;
589
590 if (srv->flags.shutdown) {
591 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
592 continue;
593 }
594
595 assert(hlp->childs.n_active > 0);
596 -- hlp->childs.n_active;
597 srv->flags.shutdown = true; /* request it to shut itself down */
598
599 if (srv->flags.closing) {
600 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
601 continue;
602 }
603
604 if (srv->stats.pending) {
605 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
606 continue;
607 }
608
609 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
610 /* the rest of the details is dealt with in the helperServerFree
611 * close handler
612 */
613 srv->closePipesSafely(hlp->id_name);
614 }
615 }
616
617 void
618 helperStatefulShutdown(statefulhelper * hlp)
619 {
620 dlink_node *link = hlp->servers.head;
621 helper_stateful_server *srv;
622
623 while (link) {
624 srv = (helper_stateful_server *)link->data;
625 link = link->next;
626
627 if (srv->flags.shutdown) {
628 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
629 continue;
630 }
631
632 assert(hlp->childs.n_active > 0);
633 -- hlp->childs.n_active;
634 srv->flags.shutdown = true; /* request it to shut itself down */
635
636 if (srv->stats.pending) {
637 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
638 continue;
639 }
640
641 if (srv->flags.closing) {
642 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
643 continue;
644 }
645
646 if (srv->flags.reserved) {
647 if (shutting_down) {
648 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
649 } else {
650 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
651 continue;
652 }
653 }
654
655 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
656
657 /* the rest of the details is dealt with in the helperStatefulServerFree
658 * close handler
659 */
660 srv->closePipesSafely(hlp->id_name);
661 }
662 }
663
664 helper::~helper()
665 {
666 /* note, don't free id_name, it probably points to static memory */
667
668 // TODO: if the queue is not empty it will leak Helper::Request's
669 if (!queue.empty())
670 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
671 }
672
673 /* ====================================================================== */
674 /* LOCAL FUNCTIONS */
675 /* ====================================================================== */
676
677 static void
678 helperServerFree(helper_server *srv)
679 {
680 helper *hlp = srv->parent;
681 int concurrency = hlp->childs.concurrency;
682
683 if (!concurrency)
684 concurrency = 1;
685
686 if (srv->rbuf) {
687 memFreeBuf(srv->rbuf_sz, srv->rbuf);
688 srv->rbuf = NULL;
689 }
690
691 srv->wqueue->clean();
692 delete srv->wqueue;
693
694 if (srv->writebuf) {
695 srv->writebuf->clean();
696 delete srv->writebuf;
697 srv->writebuf = NULL;
698 }
699
700 if (Comm::IsConnOpen(srv->writePipe))
701 srv->closeWritePipeSafely(hlp->id_name);
702
703 dlinkDelete(&srv->link, &hlp->servers);
704
705 assert(hlp->childs.n_running > 0);
706 -- hlp->childs.n_running;
707
708 if (!srv->flags.shutdown) {
709 assert(hlp->childs.n_active > 0);
710 -- hlp->childs.n_active;
711 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
712
713 if (hlp->childs.needNew() > 0) {
714 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
715
716 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
717 if (srv->stats.replies < 1)
718 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
719 else
720 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
721 }
722
723 debugs(80, DBG_IMPORTANT, "Starting new helpers");
724 helperOpenServers(hlp);
725 }
726 }
727
728 while (!srv->requests.empty()) {
729 // XXX: re-schedule these on another helper?
730 Helper::Request *r = srv->requests.front();
731 srv->requests.pop_front();
732 void *cbdata;
733
734 if (cbdataReferenceValidDone(r->data, &cbdata)) {
735 Helper::Reply nilReply;
736 r->callback(cbdata, nilReply);
737 }
738
739 delete r;
740 }
741 srv->requestsIndex.clear();
742
743 cbdataReferenceDone(srv->parent);
744 delete srv;
745 }
746
747 static void
748 helperStatefulServerFree(helper_stateful_server *srv)
749 {
750 statefulhelper *hlp = srv->parent;
751
752 if (srv->rbuf) {
753 memFreeBuf(srv->rbuf_sz, srv->rbuf);
754 srv->rbuf = NULL;
755 }
756
757 #if 0
758 srv->wqueue->clean();
759
760 delete srv->wqueue;
761
762 #endif
763
764 /* TODO: walk the local queue of requests and carry them all out */
765 if (Comm::IsConnOpen(srv->writePipe))
766 srv->closeWritePipeSafely(hlp->id_name);
767
768 dlinkDelete(&srv->link, &hlp->servers);
769
770 assert(hlp->childs.n_running > 0);
771 -- hlp->childs.n_running;
772
773 if (!srv->flags.shutdown) {
774 assert( hlp->childs.n_active > 0);
775 -- hlp->childs.n_active;
776 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
777
778 if (hlp->childs.needNew() > 0) {
779 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
780
781 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
782 if (srv->stats.replies < 1)
783 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
784 else
785 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
786 }
787
788 debugs(80, DBG_IMPORTANT, "Starting new helpers");
789 helperStatefulOpenServers(hlp);
790 }
791 }
792
793 while (!srv->requests.empty()) {
794 // XXX: re-schedule these on another helper?
795 Helper::Request *r = srv->requests.front();
796 srv->requests.pop_front();
797 void *cbdata;
798
799 if (cbdataReferenceValidDone(r->data, &cbdata)) {
800 Helper::Reply nilReply;
801 r->callback(cbdata, nilReply);
802 }
803
804 delete r;
805 }
806
807 if (srv->data != NULL)
808 hlp->datapool->freeOne(srv->data);
809
810 cbdataReferenceDone(srv->parent);
811
812 delete srv;
813 }
814
815 /// Calls back with a pointer to the buffer with the helper output
816 static void
817 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
818 {
819 Helper::Request *r = NULL;
820 helper_server::RequestIndex::iterator it;
821 if (hlp->childs.concurrency) {
822 // If concurency supported retrieve request from ID
823 it = srv->requestsIndex.find(request_number);
824 if (it != srv->requestsIndex.end()) {
825 r = *(it->second);
826 srv->requests.erase(it->second);
827 srv->requestsIndex.erase(it);
828 }
829 } else if(!srv->requests.empty()) {
830 // Else get the first request from queue, if any
831 r = srv->requests.front();
832 srv->requests.pop_front();
833 }
834
835 if (r) {
836 HLPCB *callback = r->callback;
837 r->callback = NULL;
838
839 void *cbdata = NULL;
840 bool retry = false;
841 if (cbdataReferenceValidDone(r->data, &cbdata)) {
842 Helper::Reply response(msg, (msg_end-msg));
843 if (response.result == Helper::BrokenHelper && r->retries < MAX_RETRIES) {
844 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << response << ", attempt #" << (r->retries + 1) << " of 2");
845 retry = true;
846 } else
847 callback(cbdata, response);
848 }
849
850 -- srv->stats.pending;
851 ++ srv->stats.replies;
852
853 ++ hlp->stats.replies;
854
855 srv->answer_time = current_time;
856
857 srv->dispatch_time = r->dispatch_time;
858
859 hlp->stats.avg_svc_time =
860 Math::intAverage(hlp->stats.avg_svc_time,
861 tvSubMsec(r->dispatch_time, current_time),
862 hlp->stats.replies, REDIRECT_AV_FACTOR);
863
864 if (retry) {
865 ++r->retries;
866 hlp->submitRequest(r);
867 } else
868 delete r;
869 } else if (srv->stats.timedout) {
870 debugs(84, 3, "Timedout reply received for request-ID: " << request_number << " , ignore");
871 } else {
872 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
873 request_number << " from " << hlp->id_name << " #" << srv->index <<
874 " '" << srv->rbuf << "'");
875 }
876
877 if (hlp->timeout && hlp->childs.concurrency)
878 srv->checkForTimedOutRequests(hlp->retryTimedOut);
879
880 if (!srv->flags.shutdown) {
881 helperKickQueue(hlp);
882 } else if (!srv->flags.closing && !srv->stats.pending) {
883 srv->flags.closing=true;
884 srv->writePipe->close();
885 }
886 }
887
888 static void
889 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
890 {
891 char *t = NULL;
892 helper_server *srv = (helper_server *)data;
893 helper *hlp = srv->parent;
894 assert(cbdataReferenceValid(data));
895
896 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
897
898 if (flag == Comm::ERR_CLOSING) {
899 return;
900 }
901
902 assert(conn->fd == srv->readPipe->fd);
903
904 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
905
906 if (flag != Comm::OK || len == 0) {
907 srv->closePipesSafely(hlp->id_name);
908 return;
909 }
910
911 srv->roffset += len;
912 srv->rbuf[srv->roffset] = '\0';
913 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
914
915 if (!srv->stats.pending && !srv->stats.timedout) {
916 /* someone spoke without being spoken to */
917 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
918 hlp->id_name << " #" << srv->index << ", " << (int)len <<
919 " bytes '" << srv->rbuf << "'");
920
921 srv->roffset = 0;
922 srv->rbuf[0] = '\0';
923 }
924
925 while ((t = strchr(srv->rbuf, hlp->eom))) {
926 /* end of reply found */
927 char *msg = srv->rbuf;
928 int i = 0;
929 int skip = 1;
930 debugs(84, 3, "helperHandleRead: end of reply found");
931
932 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
933 *t = '\0';
934 // rewind to the \r octet which is the real terminal now
935 // and remember that we have to skip forward 2 places now.
936 skip = 2;
937 --t;
938 }
939
940 *t = '\0';
941
942 if (hlp->childs.concurrency) {
943 i = strtol(msg, &msg, 10);
944
945 while (*msg && xisspace(*msg))
946 ++msg;
947 }
948
949 helperReturnBuffer(i, srv, hlp, msg, t);
950 srv->roffset -= (t - srv->rbuf) + skip;
951 memmove(srv->rbuf, t + skip, srv->roffset);
952 srv->rbuf[srv->roffset] = '\0';
953 }
954
955 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
956 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
957 assert(spaceSize >= 0);
958
959 // grow the input buffer if needed and possible
960 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
961 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
962 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
963 spaceSize = srv->rbuf_sz - srv->roffset - 1;
964 assert(spaceSize >= 0);
965 }
966
967 // quit reading if there is no space left
968 if (!spaceSize) {
969 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
970 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
971 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
972 srv->closePipesSafely(hlp->id_name);
973 return;
974 }
975
976 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
977 CommIoCbPtrFun(helperHandleRead, srv));
978 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
979 }
980 }
981
982 static void
983 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
984 {
985 char *t = NULL;
986 helper_stateful_server *srv = (helper_stateful_server *)data;
987 statefulhelper *hlp = srv->parent;
988 assert(cbdataReferenceValid(data));
989
990 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
991
992 if (flag == Comm::ERR_CLOSING) {
993 return;
994 }
995
996 assert(conn->fd == srv->readPipe->fd);
997
998 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
999 hlp->id_name << " #" << srv->index);
1000
1001 if (flag != Comm::OK || len == 0) {
1002 srv->closePipesSafely(hlp->id_name);
1003 return;
1004 }
1005
1006 srv->roffset += len;
1007 srv->rbuf[srv->roffset] = '\0';
1008 Helper::Request *r = srv->requests.front();
1009 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1010
1011 if (r == NULL) {
1012 /* someone spoke without being spoken to */
1013 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1014 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1015 " bytes '" << srv->rbuf << "'");
1016
1017 srv->roffset = 0;
1018 }
1019
1020 if ((t = strchr(srv->rbuf, hlp->eom))) {
1021 /* end of reply found */
1022 srv->requests.pop_front(); // we already have it in 'r'
1023 int called = 1;
1024 int skip = 1;
1025 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1026
1027 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1028 *t = '\0';
1029 // rewind to the \r octet which is the real terminal now
1030 // and remember that we have to skip forward 2 places now.
1031 skip = 2;
1032 --t;
1033 }
1034
1035 *t = '\0';
1036
1037 if (r && cbdataReferenceValid(r->data)) {
1038 Helper::Reply res(srv->rbuf, (t - srv->rbuf));
1039 res.whichServer = srv;
1040 r->callback(r->data, res);
1041 } else {
1042 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1043 called = 0;
1044 }
1045 // only skip off the \0's _after_ passing its location in Helper::Reply above
1046 t += skip;
1047
1048 /**
1049 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1050 * Doing this prohibits concurrency support with multiple replies per read().
1051 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1052 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1053 */
1054 srv->roffset = 0;
1055 delete r;
1056
1057 -- srv->stats.pending;
1058 ++ srv->stats.replies;
1059
1060 ++ hlp->stats.replies;
1061 srv->answer_time = current_time;
1062 hlp->stats.avg_svc_time =
1063 Math::intAverage(hlp->stats.avg_svc_time,
1064 tvSubMsec(srv->dispatch_time, current_time),
1065 hlp->stats.replies, REDIRECT_AV_FACTOR);
1066
1067 if (called)
1068 helperStatefulServerDone(srv);
1069 else
1070 helperStatefulReleaseServer(srv);
1071 }
1072
1073 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1074 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1075 assert(spaceSize >= 0);
1076
1077 // grow the input buffer if needed and possible
1078 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1079 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1080 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1081 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1082 assert(spaceSize >= 0);
1083 }
1084
1085 // quit reading if there is no space left
1086 if (!spaceSize) {
1087 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1088 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1089 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1090 srv->closePipesSafely(hlp->id_name);
1091 return;
1092 }
1093
1094 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1095 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1096 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1097 }
1098 }
1099
1100 /// Handles a request when all running helpers, if any, are busy.
1101 static void
1102 Enqueue(helper * hlp, Helper::Request * r)
1103 {
1104 hlp->queue.push(r);
1105 ++ hlp->stats.queue_size;
1106
1107 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1108 if (hlp->childs.needNew() > 0) {
1109 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1110 helperOpenServers(hlp);
1111 return;
1112 }
1113
1114 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1115 return;
1116
1117 if (squid_curtime - hlp->last_queue_warn < 600)
1118 return;
1119
1120 if (shutting_down || reconfiguring)
1121 return;
1122
1123 hlp->last_queue_warn = squid_curtime;
1124
1125 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1126 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1127 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1128 }
1129
1130 static void
1131 StatefulEnqueue(statefulhelper * hlp, Helper::Request * r)
1132 {
1133 hlp->queue.push(r);
1134 ++ hlp->stats.queue_size;
1135
1136 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1137 if (hlp->childs.needNew() > 0) {
1138 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1139 helperStatefulOpenServers(hlp);
1140 return;
1141 }
1142
1143 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1144 return;
1145
1146 if (squid_curtime - hlp->last_queue_warn < 600)
1147 return;
1148
1149 if (shutting_down || reconfiguring)
1150 return;
1151
1152 hlp->last_queue_warn = squid_curtime;
1153
1154 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1155 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1156 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1157 }
1158
1159 Helper::Request *
1160 helper::nextRequest()
1161 {
1162 if (queue.empty())
1163 return nullptr;
1164
1165 auto *r = queue.front();
1166 queue.pop();
1167 --stats.queue_size;
1168 return r;
1169 }
1170
1171 static helper_server *
1172 GetFirstAvailable(helper * hlp)
1173 {
1174 dlink_node *n;
1175 helper_server *srv;
1176 helper_server *selected = NULL;
1177 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1178
1179 if (hlp->childs.n_running == 0)
1180 return NULL;
1181
1182 /* Find "least" loaded helper (approx) */
1183 for (n = hlp->servers.head; n != NULL; n = n->next) {
1184 srv = (helper_server *)n->data;
1185
1186 if (selected && selected->stats.pending <= srv->stats.pending)
1187 continue;
1188
1189 if (srv->flags.shutdown)
1190 continue;
1191
1192 if (!srv->stats.pending)
1193 return srv;
1194
1195 if (selected) {
1196 selected = srv;
1197 break;
1198 }
1199
1200 selected = srv;
1201 }
1202
1203 /* Check for overload */
1204 if (!selected) {
1205 debugs(84, 5, "GetFirstAvailable: None available.");
1206 return NULL;
1207 }
1208
1209 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1210 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1211 return NULL;
1212 }
1213
1214 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1215 return selected;
1216 }
1217
1218 static helper_stateful_server *
1219 StatefulGetFirstAvailable(statefulhelper * hlp)
1220 {
1221 dlink_node *n;
1222 helper_stateful_server *srv = NULL;
1223 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1224
1225 if (hlp->childs.n_running == 0)
1226 return NULL;
1227
1228 for (n = hlp->servers.head; n != NULL; n = n->next) {
1229 srv = (helper_stateful_server *)n->data;
1230
1231 if (srv->stats.pending)
1232 continue;
1233
1234 if (srv->flags.reserved)
1235 continue;
1236
1237 if (srv->flags.shutdown)
1238 continue;
1239
1240 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1241 return srv;
1242 }
1243
1244 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1245 return NULL;
1246 }
1247
1248 static void
1249 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1250 {
1251 helper_server *srv = (helper_server *)data;
1252
1253 srv->writebuf->clean();
1254 delete srv->writebuf;
1255 srv->writebuf = NULL;
1256 srv->flags.writing = false;
1257
1258 if (flag != Comm::OK) {
1259 /* Helper server has crashed */
1260 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1261 return;
1262 }
1263
1264 if (!srv->wqueue->isNull()) {
1265 srv->writebuf = srv->wqueue;
1266 srv->wqueue = new MemBuf;
1267 srv->flags.writing = true;
1268 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1269 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1270 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1271 }
1272 }
1273
1274 static void
1275 helperDispatch(helper_server * srv, Helper::Request * r)
1276 {
1277 helper *hlp = srv->parent;
1278 const uint64_t reqId = ++srv->nextRequestId;
1279
1280 if (!cbdataReferenceValid(r->data)) {
1281 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1282 delete r;
1283 return;
1284 }
1285
1286 r->Id = reqId;
1287 helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1288 r->dispatch_time = current_time;
1289
1290 if (srv->wqueue->isNull())
1291 srv->wqueue->init();
1292
1293 if (hlp->childs.concurrency) {
1294 srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1295 assert(srv->requestsIndex.size() == srv->requests.size());
1296 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->buf);
1297 } else
1298 srv->wqueue->append(r->buf, strlen(r->buf));
1299
1300 if (!srv->flags.writing) {
1301 assert(NULL == srv->writebuf);
1302 srv->writebuf = srv->wqueue;
1303 srv->wqueue = new MemBuf;
1304 srv->flags.writing = true;
1305 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1306 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1307 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1308 }
1309
1310 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->buf) << " bytes");
1311
1312 ++ srv->stats.uses;
1313 ++ srv->stats.pending;
1314 ++ hlp->stats.requests;
1315 }
1316
1317 static void
1318 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1319 {}
1320
1321 static void
1322 helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r)
1323 {
1324 statefulhelper *hlp = srv->parent;
1325
1326 if (!cbdataReferenceValid(r->data)) {
1327 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1328 delete r;
1329 helperStatefulReleaseServer(srv);
1330 return;
1331 }
1332
1333 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1334
1335 if (r->placeholder == 1) {
1336 /* a callback is needed before this request can _use_ a helper. */
1337 /* we don't care about releasing this helper. The request NEVER
1338 * gets to the helper. So we throw away the return code */
1339 Helper::Reply nilReply;
1340 nilReply.whichServer = srv;
1341 r->callback(r->data, nilReply);
1342 /* throw away the placeholder */
1343 delete r;
1344 /* and push the queue. Note that the callback may have submitted a new
1345 * request to the helper which is why we test for the request */
1346
1347 if (!srv->requests.size())
1348 helperStatefulServerDone(srv);
1349
1350 return;
1351 }
1352
1353 srv->flags.reserved = true;
1354 srv->requests.push_back(r);
1355 srv->dispatch_time = current_time;
1356 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1357 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1358 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1359 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1360 hlp->id_name << " #" << srv->index << ", " <<
1361 (int) strlen(r->buf) << " bytes");
1362
1363 ++ srv->stats.uses;
1364 ++ srv->stats.pending;
1365 ++ hlp->stats.requests;
1366 }
1367
1368 static void
1369 helperKickQueue(helper * hlp)
1370 {
1371 Helper::Request *r;
1372 helper_server *srv;
1373
1374 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1375 helperDispatch(srv, r);
1376 }
1377
1378 static void
1379 helperStatefulKickQueue(statefulhelper * hlp)
1380 {
1381 Helper::Request *r;
1382 helper_stateful_server *srv;
1383
1384 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1385 helperStatefulDispatch(srv, r);
1386 }
1387
1388 static void
1389 helperStatefulServerDone(helper_stateful_server * srv)
1390 {
1391 if (!srv->flags.shutdown) {
1392 helperStatefulKickQueue(srv->parent);
1393 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1394 srv->closeWritePipeSafely(srv->parent->id_name);
1395 return;
1396 }
1397 }
1398
1399 void
1400 helper_server::checkForTimedOutRequests(bool const retry)
1401 {
1402 assert(parent->childs.concurrency);
1403 while(!requests.empty() && requests.front()->timedOut(parent->timeout)) {
1404 Helper::Request *r = requests.front();
1405 RequestIndex::iterator it;
1406 it = requestsIndex.find(r->Id);
1407 assert(it != requestsIndex.end());
1408 requestsIndex.erase(it);
1409 requests.pop_front();
1410 debugs(84, 2, "Request " << r->Id << " timed-out, remove it from queue");
1411 void *cbdata;
1412 bool retried = false;
1413 if (retry && r->retries < MAX_RETRIES && cbdataReferenceValid(r->data)) {
1414 debugs(84, 2, "Retry request " << r->Id);
1415 ++r->retries;
1416 parent->submitRequest(r);
1417 retried = true;
1418 } else if (cbdataReferenceValidDone(r->data, &cbdata)) {
1419 if (!parent->onTimedOutResponse.isEmpty()) {
1420 // Helper::Reply needs a non const buffer
1421 char *replyMsg = xstrdup(parent->onTimedOutResponse.c_str());
1422 r->callback(cbdata, Helper::Reply(replyMsg, strlen(replyMsg)));
1423 xfree(replyMsg);
1424 } else
1425 r->callback(cbdata, Helper::Reply(Helper::TimedOut));
1426 }
1427 --stats.pending;
1428 ++stats.timedout;
1429 ++parent->stats.timedout;
1430 if (!retried)
1431 delete r;
1432 }
1433 }
1434
1435 void
1436 helper_server::requestTimeout(const CommTimeoutCbParams &io)
1437 {
1438 debugs(26, 3, HERE << io.conn);
1439 helper_server *srv = static_cast<helper_server *>(io.data);
1440
1441 if (!cbdataReferenceValid(srv))
1442 return;
1443
1444 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1445
1446 debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1447 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1448 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
1449
1450 const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->dispatch_time.tv_sec);
1451 const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1452
1453 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1454 }
1455