]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31
32 // helper_stateful_server::data uses explicit alloc()/freeOne() */
33 #include "mem/Pool.h"
34
35 #define HELPER_MAX_ARGS 64
36
37 /// The maximum allowed request retries.
38 #define MAX_RETRIES 2
39
40 /** Initial Squid input buffer size. Helper responses may exceed this, and
41 * Squid will grow the input buffer as needed, up to ReadBufMaxSize.
42 */
43 const size_t ReadBufMinSize(4*1024);
44
45 /** Maximum safe size of a helper-to-Squid response message plus one.
46 * Squid will warn and close the stream if a helper sends a too-big response.
47 * ssl_crtd helper is known to produce responses of at least 10KB in size.
48 * Some undocumented helpers are known to produce responses exceeding 8KB.
49 */
50 const size_t ReadBufMaxSize(32*1024);
51
52 static IOCB helperHandleRead;
53 static IOCB helperStatefulHandleRead;
54 static void helperServerFree(helper_server *srv);
55 static void helperStatefulServerFree(helper_stateful_server *srv);
56 static void Enqueue(helper * hlp, Helper::Request *);
57 static Helper::Request *Dequeue(helper * hlp);
58 static Helper::Request *StatefulDequeue(statefulhelper * hlp);
59 static helper_server *GetFirstAvailable(helper * hlp);
60 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
61 static void helperDispatch(helper_server * srv, Helper::Request * r);
62 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r);
63 static void helperKickQueue(helper * hlp);
64 static void helperStatefulKickQueue(statefulhelper * hlp);
65 static void helperStatefulServerDone(helper_stateful_server * srv);
66 static void StatefulEnqueue(statefulhelper * hlp, Helper::Request * r);
67
68 CBDATA_CLASS_INIT(helper);
69 CBDATA_CLASS_INIT(helper_server);
70 CBDATA_CLASS_INIT(statefulhelper);
71 CBDATA_CLASS_INIT(helper_stateful_server);
72
73 InstanceIdDefinitions(HelperServerBase, "Hlpr");
74
75 void
76 HelperServerBase::initStats()
77 {
78 stats.uses=0;
79 stats.replies=0;
80 stats.pending=0;
81 stats.releases=0;
82 stats.timedout = 0;
83 }
84
85 void
86 HelperServerBase::closePipesSafely(const char *id_name)
87 {
88 #if _SQUID_WINDOWS_
89 shutdown(writePipe->fd, SD_BOTH);
90 #endif
91
92 flags.closing = true;
93 if (readPipe->fd == writePipe->fd)
94 readPipe->fd = -1;
95 else
96 readPipe->close();
97 writePipe->close();
98
99 #if _SQUID_WINDOWS_
100 if (hIpc) {
101 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
102 getCurrentTime();
103 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
104 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
105 }
106 CloseHandle(hIpc);
107 }
108 #endif
109 }
110
111 void
112 HelperServerBase::closeWritePipeSafely(const char *id_name)
113 {
114 #if _SQUID_WINDOWS_
115 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
116 #endif
117
118 flags.closing = true;
119 if (readPipe->fd == writePipe->fd)
120 readPipe->fd = -1;
121 writePipe->close();
122
123 #if _SQUID_WINDOWS_
124 if (hIpc) {
125 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
126 getCurrentTime();
127 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
128 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
129 }
130 CloseHandle(hIpc);
131 }
132 #endif
133 }
134
135 void
136 helperOpenServers(helper * hlp)
137 {
138 char *s;
139 char *progname;
140 char *shortname;
141 char *procname;
142 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
143 char fd_note_buf[FD_DESC_SZ];
144 helper_server *srv;
145 int nargs = 0;
146 int k;
147 pid_t pid;
148 int rfd;
149 int wfd;
150 void * hIpc;
151 wordlist *w;
152
153 if (hlp->cmdline == NULL)
154 return;
155
156 progname = hlp->cmdline->key;
157
158 if ((s = strrchr(progname, '/')))
159 shortname = xstrdup(s + 1);
160 else
161 shortname = xstrdup(progname);
162
163 /* figure out how many new child are actually needed. */
164 int need_new = hlp->childs.needNew();
165
166 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
167
168 if (need_new < 1) {
169 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
170 }
171
172 procname = (char *)xmalloc(strlen(shortname) + 3);
173
174 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
175
176 args[nargs] = procname;
177 ++nargs;
178
179 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
180 args[nargs] = w->key;
181 ++nargs;
182 }
183
184 args[nargs] = NULL;
185 ++nargs;
186
187 assert(nargs <= HELPER_MAX_ARGS);
188
189 for (k = 0; k < need_new; ++k) {
190 getCurrentTime();
191 rfd = wfd = -1;
192 pid = ipcCreate(hlp->ipc_type,
193 progname,
194 args,
195 shortname,
196 hlp->addr,
197 &rfd,
198 &wfd,
199 &hIpc);
200
201 if (pid < 0) {
202 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
203 continue;
204 }
205
206 ++ hlp->childs.n_running;
207 ++ hlp->childs.n_active;
208 srv = new helper_server;
209 srv->hIpc = hIpc;
210 srv->pid = pid;
211 srv->initStats();
212 srv->addr = hlp->addr;
213 srv->readPipe = new Comm::Connection;
214 srv->readPipe->fd = rfd;
215 srv->writePipe = new Comm::Connection;
216 srv->writePipe->fd = wfd;
217 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
218 srv->wqueue = new MemBuf;
219 srv->roffset = 0;
220 srv->nextRequestId = 0;
221 srv->parent = cbdataReference(hlp);
222 dlinkAddTail(srv, &srv->link, &hlp->servers);
223
224 if (rfd == wfd) {
225 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
226 fd_note(rfd, fd_note_buf);
227 } else {
228 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
229 fd_note(rfd, fd_note_buf);
230 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
231 fd_note(wfd, fd_note_buf);
232 }
233
234 commSetNonBlocking(rfd);
235
236 if (wfd != rfd)
237 commSetNonBlocking(wfd);
238
239 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
240 comm_add_close_handler(rfd, closeCall);
241
242 if (hlp->timeout && hlp->childs.concurrency) {
243 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
244 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
245 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
246 }
247
248 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
249 CommIoCbPtrFun(helperHandleRead, srv));
250 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
251 }
252
253 hlp->last_restart = squid_curtime;
254 safe_free(shortname);
255 safe_free(procname);
256 helperKickQueue(hlp);
257 }
258
259 /**
260 * DPW 2007-05-08
261 *
262 * helperStatefulOpenServers: create the stateful child helper processes
263 */
264 void
265 helperStatefulOpenServers(statefulhelper * hlp)
266 {
267 char *shortname;
268 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
269 char fd_note_buf[FD_DESC_SZ];
270 int nargs = 0;
271
272 if (hlp->cmdline == NULL)
273 return;
274
275 if (hlp->childs.concurrency)
276 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
277
278 char *progname = hlp->cmdline->key;
279
280 char *s;
281 if ((s = strrchr(progname, '/')))
282 shortname = xstrdup(s + 1);
283 else
284 shortname = xstrdup(progname);
285
286 /* figure out haw mant new helpers are needed. */
287 int need_new = hlp->childs.needNew();
288
289 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
290
291 if (need_new < 1) {
292 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
293 }
294
295 char *procname = (char *)xmalloc(strlen(shortname) + 3);
296
297 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
298
299 args[nargs] = procname;
300 ++nargs;
301
302 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
303 args[nargs] = w->key;
304 ++nargs;
305 }
306
307 args[nargs] = NULL;
308 ++nargs;
309
310 assert(nargs <= HELPER_MAX_ARGS);
311
312 for (int k = 0; k < need_new; ++k) {
313 getCurrentTime();
314 int rfd = -1;
315 int wfd = -1;
316 void * hIpc;
317 pid_t pid = ipcCreate(hlp->ipc_type,
318 progname,
319 args,
320 shortname,
321 hlp->addr,
322 &rfd,
323 &wfd,
324 &hIpc);
325
326 if (pid < 0) {
327 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
328 continue;
329 }
330
331 ++ hlp->childs.n_running;
332 ++ hlp->childs.n_active;
333 helper_stateful_server *srv = new helper_stateful_server;
334 srv->hIpc = hIpc;
335 srv->pid = pid;
336 srv->flags.reserved = false;
337 srv->initStats();
338 srv->addr = hlp->addr;
339 srv->readPipe = new Comm::Connection;
340 srv->readPipe->fd = rfd;
341 srv->writePipe = new Comm::Connection;
342 srv->writePipe->fd = wfd;
343 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
344 srv->roffset = 0;
345 srv->parent = cbdataReference(hlp);
346
347 if (hlp->datapool != NULL)
348 srv->data = hlp->datapool->alloc();
349
350 dlinkAddTail(srv, &srv->link, &hlp->servers);
351
352 if (rfd == wfd) {
353 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
354 fd_note(rfd, fd_note_buf);
355 } else {
356 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
357 fd_note(rfd, fd_note_buf);
358 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
359 fd_note(wfd, fd_note_buf);
360 }
361
362 commSetNonBlocking(rfd);
363
364 if (wfd != rfd)
365 commSetNonBlocking(wfd);
366
367 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
368 comm_add_close_handler(rfd, closeCall);
369
370 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
371 CommIoCbPtrFun(helperStatefulHandleRead, srv));
372 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
373 }
374
375 hlp->last_restart = squid_curtime;
376 safe_free(shortname);
377 safe_free(procname);
378 helperStatefulKickQueue(hlp);
379 }
380
381 void
382 helper::submitRequest(Helper::Request *r)
383 {
384 helper_server *srv;
385
386 if ((srv = GetFirstAvailable(this)))
387 helperDispatch(srv, r);
388 else
389 Enqueue(this, r);
390
391 if (!queueFull()) {
392 full_time = 0;
393 } else if (!full_time) {
394 debugs(84, 3, id_name << " queue became full");
395 full_time = squid_curtime;
396 }
397 }
398
399 void
400 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
401 {
402 if (hlp == NULL) {
403 debugs(84, 3, "helperSubmit: hlp == NULL");
404 Helper::Reply nilReply;
405 callback(data, nilReply);
406 return;
407 }
408 hlp->prepSubmit();
409 hlp->submit(buf, callback, data);
410 }
411
412 bool
413 helper::queueFull() const {
414 return stats.queue_size > static_cast<int>(childs.queue_size);
415 }
416
417 /// prepares the helper for request submission via trySubmit() or helperSubmit()
418 /// currently maintains full_time and kills Squid if the helper remains full for too long
419 void
420 helper::prepSubmit()
421 {
422 if (!queueFull())
423 full_time = 0;
424 else if (!full_time) // may happen here if reconfigure decreases capacity
425 full_time = squid_curtime;
426 else if (squid_curtime - full_time > 180)
427 fatalf("Too many queued %s requests", id_name);
428 }
429
430 bool
431 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
432 {
433 prepSubmit();
434
435 if (queueFull()) {
436 debugs(84, DBG_IMPORTANT, id_name << " drops request due to a full queue");
437 return false; // request was ignored
438 }
439
440 submit(buf, callback, data); // will send or queue
441 return true; // request submitted or queued
442 }
443
444 /// dispatches or enqueues a helper requests; does not enforce queue limits
445 void
446 helper::submit(const char *buf, HLPCB * callback, void *data)
447 {
448 Helper::Request *r = new Helper::Request(callback, data, buf);
449 submitRequest(r);
450 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
451 }
452
453 /// lastserver = "server last used as part of a reserved request sequence"
454 void
455 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
456 {
457 if (hlp == NULL) {
458 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
459 Helper::Reply nilReply;
460 callback(data, nilReply);
461 return;
462 }
463 hlp->prepSubmit();
464 hlp->submit(buf, callback, data, lastserver);
465 }
466
467 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
468 {
469 Helper::Request *r = new Helper::Request(callback, data, buf);
470
471 if ((buf != NULL) && lastserver) {
472 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
473 assert(lastserver->flags.reserved);
474 assert(!lastserver->requests.size());
475
476 debugs(84, 5, "StatefulSubmit dispatching");
477 helperStatefulDispatch(lastserver, r);
478 } else {
479 helper_stateful_server *srv;
480 if ((srv = StatefulGetFirstAvailable(this))) {
481 helperStatefulDispatch(srv, r);
482 } else
483 StatefulEnqueue(this, r);
484 }
485
486 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
487 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
488
489 if (!queueFull()) {
490 full_time = 0;
491 } else if (!full_time) {
492 debugs(84, 3, id_name << " queue became full");
493 full_time = squid_curtime;
494 }
495 }
496
497 /**
498 * DPW 2007-05-08
499 *
500 * helperStatefulReleaseServer tells the helper that whoever was
501 * using it no longer needs its services.
502 */
503 void
504 helperStatefulReleaseServer(helper_stateful_server * srv)
505 {
506 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
507 if (!srv->flags.reserved)
508 return;
509
510 ++ srv->stats.releases;
511
512 srv->flags.reserved = false;
513
514 helperStatefulServerDone(srv);
515 }
516
517 /** return a pointer to the stateful routines data area */
518 void *
519 helperStatefulServerGetData(helper_stateful_server * srv)
520 {
521 return srv->data;
522 }
523
524 void
525 helper::packStatsInto(Packable *p, const char *label) const
526 {
527 if (label)
528 p->appendf("%s:\n", label);
529
530 p->appendf(" program: %s\n", cmdline->key);
531 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
532 p->appendf(" requests sent: %d\n", stats.requests);
533 p->appendf(" replies received: %d\n", stats.replies);
534 p->appendf(" requests timedout: %d\n", stats.timedout);
535 p->appendf(" queue length: %d\n", stats.queue_size);
536 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
537 p->append("\n",1);
538 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
539 "ID #",
540 "FD",
541 "PID",
542 "# Requests",
543 "# Replies",
544 "# Timed-out",
545 "Flags",
546 "Time",
547 "Offset",
548 "Request");
549
550 for (dlink_node *link = servers.head; link; link = link->next) {
551 HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
552 assert(srv);
553 Helper::Request *request = srv->requests.empty() ? NULL : srv->requests.front();
554 double tt = 0.001 * (request ? tvSubMsec(request->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
555 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
556 srv->index.value,
557 srv->readPipe->fd,
558 srv->pid,
559 srv->stats.uses,
560 srv->stats.replies,
561 srv->stats.timedout,
562 srv->stats.pending ? 'B' : ' ',
563 srv->flags.writing ? 'W' : ' ',
564 srv->flags.closing ? 'C' : ' ',
565 srv->flags.reserved ? 'R' : ' ',
566 srv->flags.shutdown ? 'S' : ' ',
567 request && request->placeholder ? 'P' : ' ',
568 tt < 0.0 ? 0.0 : tt,
569 (int) srv->roffset,
570 request ? Format::QuoteMimeBlob(request->buf) : "(none)");
571 }
572
573 p->append("\nFlags key:\n"
574 " B\tBUSY\n"
575 " W\tWRITING\n"
576 " C\tCLOSING\n"
577 " R\tRESERVED\n"
578 " S\tSHUTDOWN PENDING\n"
579 " P\tPLACEHOLDER\n", 101);
580 }
581
582 void
583 helperShutdown(helper * hlp)
584 {
585 dlink_node *link = hlp->servers.head;
586
587 while (link) {
588 helper_server *srv;
589 srv = (helper_server *)link->data;
590 link = link->next;
591
592 if (srv->flags.shutdown) {
593 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
594 continue;
595 }
596
597 assert(hlp->childs.n_active > 0);
598 -- hlp->childs.n_active;
599 srv->flags.shutdown = true; /* request it to shut itself down */
600
601 if (srv->flags.closing) {
602 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
603 continue;
604 }
605
606 if (srv->stats.pending) {
607 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
608 continue;
609 }
610
611 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
612 /* the rest of the details is dealt with in the helperServerFree
613 * close handler
614 */
615 srv->closePipesSafely(hlp->id_name);
616 }
617 }
618
619 void
620 helperStatefulShutdown(statefulhelper * hlp)
621 {
622 dlink_node *link = hlp->servers.head;
623 helper_stateful_server *srv;
624
625 while (link) {
626 srv = (helper_stateful_server *)link->data;
627 link = link->next;
628
629 if (srv->flags.shutdown) {
630 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
631 continue;
632 }
633
634 assert(hlp->childs.n_active > 0);
635 -- hlp->childs.n_active;
636 srv->flags.shutdown = true; /* request it to shut itself down */
637
638 if (srv->stats.pending) {
639 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
640 continue;
641 }
642
643 if (srv->flags.closing) {
644 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
645 continue;
646 }
647
648 if (srv->flags.reserved) {
649 if (shutting_down) {
650 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
651 } else {
652 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
653 continue;
654 }
655 }
656
657 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
658
659 /* the rest of the details is dealt with in the helperStatefulServerFree
660 * close handler
661 */
662 srv->closePipesSafely(hlp->id_name);
663 }
664 }
665
666 helper::~helper()
667 {
668 /* note, don't free id_name, it probably points to static memory */
669
670 if (queue.head)
671 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
672 }
673
674 /* ====================================================================== */
675 /* LOCAL FUNCTIONS */
676 /* ====================================================================== */
677
678 static void
679 helperServerFree(helper_server *srv)
680 {
681 helper *hlp = srv->parent;
682 int concurrency = hlp->childs.concurrency;
683
684 if (!concurrency)
685 concurrency = 1;
686
687 if (srv->rbuf) {
688 memFreeBuf(srv->rbuf_sz, srv->rbuf);
689 srv->rbuf = NULL;
690 }
691
692 srv->wqueue->clean();
693 delete srv->wqueue;
694
695 if (srv->writebuf) {
696 srv->writebuf->clean();
697 delete srv->writebuf;
698 srv->writebuf = NULL;
699 }
700
701 if (Comm::IsConnOpen(srv->writePipe))
702 srv->closeWritePipeSafely(hlp->id_name);
703
704 dlinkDelete(&srv->link, &hlp->servers);
705
706 assert(hlp->childs.n_running > 0);
707 -- hlp->childs.n_running;
708
709 if (!srv->flags.shutdown) {
710 assert(hlp->childs.n_active > 0);
711 -- hlp->childs.n_active;
712 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
713
714 if (hlp->childs.needNew() > 0) {
715 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
716
717 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
718 if (srv->stats.replies < 1)
719 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
720 else
721 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
722 }
723
724 debugs(80, DBG_IMPORTANT, "Starting new helpers");
725 helperOpenServers(hlp);
726 }
727 }
728
729 while (!srv->requests.empty()) {
730 // XXX: re-schedule these on another helper?
731 Helper::Request *r = srv->requests.front();
732 srv->requests.pop_front();
733 void *cbdata;
734
735 if (cbdataReferenceValidDone(r->data, &cbdata)) {
736 Helper::Reply nilReply;
737 r->callback(cbdata, nilReply);
738 }
739
740 delete r;
741 }
742 srv->requestsIndex.clear();
743
744 cbdataReferenceDone(srv->parent);
745 delete srv;
746 }
747
748 static void
749 helperStatefulServerFree(helper_stateful_server *srv)
750 {
751 statefulhelper *hlp = srv->parent;
752
753 if (srv->rbuf) {
754 memFreeBuf(srv->rbuf_sz, srv->rbuf);
755 srv->rbuf = NULL;
756 }
757
758 #if 0
759 srv->wqueue->clean();
760
761 delete srv->wqueue;
762
763 #endif
764
765 /* TODO: walk the local queue of requests and carry them all out */
766 if (Comm::IsConnOpen(srv->writePipe))
767 srv->closeWritePipeSafely(hlp->id_name);
768
769 dlinkDelete(&srv->link, &hlp->servers);
770
771 assert(hlp->childs.n_running > 0);
772 -- hlp->childs.n_running;
773
774 if (!srv->flags.shutdown) {
775 assert( hlp->childs.n_active > 0);
776 -- hlp->childs.n_active;
777 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
778
779 if (hlp->childs.needNew() > 0) {
780 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
781
782 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
783 if (srv->stats.replies < 1)
784 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
785 else
786 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
787 }
788
789 debugs(80, DBG_IMPORTANT, "Starting new helpers");
790 helperStatefulOpenServers(hlp);
791 }
792 }
793
794 while (!srv->requests.empty()) {
795 // XXX: re-schedule these on another helper?
796 Helper::Request *r = srv->requests.front();
797 srv->requests.pop_front();
798 void *cbdata;
799
800 if (cbdataReferenceValidDone(r->data, &cbdata)) {
801 Helper::Reply nilReply;
802 r->callback(cbdata, nilReply);
803 }
804
805 delete r;
806 }
807
808 if (srv->data != NULL)
809 hlp->datapool->freeOne(srv->data);
810
811 cbdataReferenceDone(srv->parent);
812
813 delete srv;
814 }
815
816 /// Calls back with a pointer to the buffer with the helper output
817 static void
818 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
819 {
820 Helper::Request *r = NULL;
821 helper_server::RequestIndex::iterator it;
822 if (hlp->childs.concurrency) {
823 // If concurency supported retrieve request from ID
824 it = srv->requestsIndex.find(request_number);
825 if (it != srv->requestsIndex.end()) {
826 r = *(it->second);
827 srv->requests.erase(it->second);
828 srv->requestsIndex.erase(it);
829 }
830 } else if(!srv->requests.empty()) {
831 // Else get the first request from queue, if any
832 r = srv->requests.front();
833 srv->requests.pop_front();
834 }
835
836 if (r) {
837 HLPCB *callback = r->callback;
838 r->callback = NULL;
839
840 void *cbdata = NULL;
841 bool retry = false;
842 if (cbdataReferenceValidDone(r->data, &cbdata)) {
843 Helper::Reply response(msg, (msg_end-msg));
844 if (response.result == Helper::BrokenHelper && r->retries < MAX_RETRIES) {
845 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << response << ", attempt #" << (r->retries + 1) << " of 2");
846 retry = true;
847 } else
848 callback(cbdata, response);
849 }
850
851 -- srv->stats.pending;
852 ++ srv->stats.replies;
853
854 ++ hlp->stats.replies;
855
856 srv->answer_time = current_time;
857
858 srv->dispatch_time = r->dispatch_time;
859
860 hlp->stats.avg_svc_time =
861 Math::intAverage(hlp->stats.avg_svc_time,
862 tvSubMsec(r->dispatch_time, current_time),
863 hlp->stats.replies, REDIRECT_AV_FACTOR);
864
865 if (retry) {
866 ++r->retries;
867 hlp->submitRequest(r);
868 } else
869 delete r;
870 } else if (srv->stats.timedout) {
871 debugs(84, 3, "Timedout reply received for request-ID: " << request_number << " , ignore");
872 } else {
873 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
874 request_number << " from " << hlp->id_name << " #" << srv->index <<
875 " '" << srv->rbuf << "'");
876 }
877
878 if (hlp->timeout && hlp->childs.concurrency)
879 srv->checkForTimedOutRequests(hlp->retryTimedOut);
880
881 if (!srv->flags.shutdown) {
882 helperKickQueue(hlp);
883 } else if (!srv->flags.closing && !srv->stats.pending) {
884 srv->flags.closing=true;
885 srv->writePipe->close();
886 }
887 }
888
889 static void
890 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
891 {
892 char *t = NULL;
893 helper_server *srv = (helper_server *)data;
894 helper *hlp = srv->parent;
895 assert(cbdataReferenceValid(data));
896
897 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
898
899 if (flag == Comm::ERR_CLOSING) {
900 return;
901 }
902
903 assert(conn->fd == srv->readPipe->fd);
904
905 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
906
907 if (flag != Comm::OK || len == 0) {
908 srv->closePipesSafely(hlp->id_name);
909 return;
910 }
911
912 srv->roffset += len;
913 srv->rbuf[srv->roffset] = '\0';
914 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
915
916 if (!srv->stats.pending && !srv->stats.timedout) {
917 /* someone spoke without being spoken to */
918 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
919 hlp->id_name << " #" << srv->index << ", " << (int)len <<
920 " bytes '" << srv->rbuf << "'");
921
922 srv->roffset = 0;
923 srv->rbuf[0] = '\0';
924 }
925
926 while ((t = strchr(srv->rbuf, hlp->eom))) {
927 /* end of reply found */
928 char *msg = srv->rbuf;
929 int i = 0;
930 int skip = 1;
931 debugs(84, 3, "helperHandleRead: end of reply found");
932
933 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
934 *t = '\0';
935 // rewind to the \r octet which is the real terminal now
936 // and remember that we have to skip forward 2 places now.
937 skip = 2;
938 --t;
939 }
940
941 *t = '\0';
942
943 if (hlp->childs.concurrency) {
944 i = strtol(msg, &msg, 10);
945
946 while (*msg && xisspace(*msg))
947 ++msg;
948 }
949
950 helperReturnBuffer(i, srv, hlp, msg, t);
951 srv->roffset -= (t - srv->rbuf) + skip;
952 memmove(srv->rbuf, t + skip, srv->roffset);
953 srv->rbuf[srv->roffset] = '\0';
954 }
955
956 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
957 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
958 assert(spaceSize >= 0);
959
960 // grow the input buffer if needed and possible
961 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
962 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
963 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
964 spaceSize = srv->rbuf_sz - srv->roffset - 1;
965 assert(spaceSize >= 0);
966 }
967
968 // quit reading if there is no space left
969 if (!spaceSize) {
970 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
971 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
972 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
973 srv->closePipesSafely(hlp->id_name);
974 return;
975 }
976
977 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
978 CommIoCbPtrFun(helperHandleRead, srv));
979 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
980 }
981 }
982
983 static void
984 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
985 {
986 char *t = NULL;
987 helper_stateful_server *srv = (helper_stateful_server *)data;
988 statefulhelper *hlp = srv->parent;
989 assert(cbdataReferenceValid(data));
990
991 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
992
993 if (flag == Comm::ERR_CLOSING) {
994 return;
995 }
996
997 assert(conn->fd == srv->readPipe->fd);
998
999 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1000 hlp->id_name << " #" << srv->index);
1001
1002 if (flag != Comm::OK || len == 0) {
1003 srv->closePipesSafely(hlp->id_name);
1004 return;
1005 }
1006
1007 srv->roffset += len;
1008 srv->rbuf[srv->roffset] = '\0';
1009 Helper::Request *r = srv->requests.front();
1010 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1011
1012 if (r == NULL) {
1013 /* someone spoke without being spoken to */
1014 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1015 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1016 " bytes '" << srv->rbuf << "'");
1017
1018 srv->roffset = 0;
1019 }
1020
1021 if ((t = strchr(srv->rbuf, hlp->eom))) {
1022 /* end of reply found */
1023 srv->requests.pop_front(); // we already have it in 'r'
1024 int called = 1;
1025 int skip = 1;
1026 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1027
1028 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1029 *t = '\0';
1030 // rewind to the \r octet which is the real terminal now
1031 // and remember that we have to skip forward 2 places now.
1032 skip = 2;
1033 --t;
1034 }
1035
1036 *t = '\0';
1037
1038 if (r && cbdataReferenceValid(r->data)) {
1039 Helper::Reply res(srv->rbuf, (t - srv->rbuf));
1040 res.whichServer = srv;
1041 r->callback(r->data, res);
1042 } else {
1043 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1044 called = 0;
1045 }
1046 // only skip off the \0's _after_ passing its location in Helper::Reply above
1047 t += skip;
1048
1049 /**
1050 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1051 * Doing this prohibits concurrency support with multiple replies per read().
1052 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1053 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1054 */
1055 srv->roffset = 0;
1056 delete r;
1057
1058 -- srv->stats.pending;
1059 ++ srv->stats.replies;
1060
1061 ++ hlp->stats.replies;
1062 srv->answer_time = current_time;
1063 hlp->stats.avg_svc_time =
1064 Math::intAverage(hlp->stats.avg_svc_time,
1065 tvSubMsec(srv->dispatch_time, current_time),
1066 hlp->stats.replies, REDIRECT_AV_FACTOR);
1067
1068 if (called)
1069 helperStatefulServerDone(srv);
1070 else
1071 helperStatefulReleaseServer(srv);
1072 }
1073
1074 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1075 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1076 assert(spaceSize >= 0);
1077
1078 // grow the input buffer if needed and possible
1079 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1080 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1081 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1082 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1083 assert(spaceSize >= 0);
1084 }
1085
1086 // quit reading if there is no space left
1087 if (!spaceSize) {
1088 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1089 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1090 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1091 srv->closePipesSafely(hlp->id_name);
1092 return;
1093 }
1094
1095 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1096 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1097 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1098 }
1099 }
1100
1101 /// Handles a request when all running helpers, if any, are busy.
1102 static void
1103 Enqueue(helper * hlp, Helper::Request * r)
1104 {
1105 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1106 dlinkAddTail(r, link, &hlp->queue);
1107 ++ hlp->stats.queue_size;
1108
1109 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1110 if (hlp->childs.needNew() > 0) {
1111 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1112 helperOpenServers(hlp);
1113 return;
1114 }
1115
1116 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1117 return;
1118
1119 if (squid_curtime - hlp->last_queue_warn < 600)
1120 return;
1121
1122 if (shutting_down || reconfiguring)
1123 return;
1124
1125 hlp->last_queue_warn = squid_curtime;
1126
1127 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1128 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1129 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1130 }
1131
1132 static void
1133 StatefulEnqueue(statefulhelper * hlp, Helper::Request * r)
1134 {
1135 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1136 dlinkAddTail(r, link, &hlp->queue);
1137 ++ hlp->stats.queue_size;
1138
1139 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1140 if (hlp->childs.needNew() > 0) {
1141 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1142 helperStatefulOpenServers(hlp);
1143 return;
1144 }
1145
1146 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1147 return;
1148
1149 if (squid_curtime - hlp->last_queue_warn < 600)
1150 return;
1151
1152 if (shutting_down || reconfiguring)
1153 return;
1154
1155 hlp->last_queue_warn = squid_curtime;
1156
1157 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1158 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1159 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1160 }
1161
1162 static Helper::Request *
1163 Dequeue(helper * hlp)
1164 {
1165 dlink_node *link;
1166 Helper::Request *r = NULL;
1167
1168 if ((link = hlp->queue.head)) {
1169 r = (Helper::Request *)link->data;
1170 dlinkDelete(link, &hlp->queue);
1171 memFree(link, MEM_DLINK_NODE);
1172 -- hlp->stats.queue_size;
1173 }
1174
1175 return r;
1176 }
1177
1178 static Helper::Request *
1179 StatefulDequeue(statefulhelper * hlp)
1180 {
1181 dlink_node *link;
1182 Helper::Request *r = NULL;
1183
1184 if ((link = hlp->queue.head)) {
1185 r = (Helper::Request *)link->data;
1186 dlinkDelete(link, &hlp->queue);
1187 memFree(link, MEM_DLINK_NODE);
1188 -- hlp->stats.queue_size;
1189 }
1190
1191 return r;
1192 }
1193
1194 static helper_server *
1195 GetFirstAvailable(helper * hlp)
1196 {
1197 dlink_node *n;
1198 helper_server *srv;
1199 helper_server *selected = NULL;
1200 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1201
1202 if (hlp->childs.n_running == 0)
1203 return NULL;
1204
1205 /* Find "least" loaded helper (approx) */
1206 for (n = hlp->servers.head; n != NULL; n = n->next) {
1207 srv = (helper_server *)n->data;
1208
1209 if (selected && selected->stats.pending <= srv->stats.pending)
1210 continue;
1211
1212 if (srv->flags.shutdown)
1213 continue;
1214
1215 if (!srv->stats.pending)
1216 return srv;
1217
1218 if (selected) {
1219 selected = srv;
1220 break;
1221 }
1222
1223 selected = srv;
1224 }
1225
1226 /* Check for overload */
1227 if (!selected) {
1228 debugs(84, 5, "GetFirstAvailable: None available.");
1229 return NULL;
1230 }
1231
1232 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1233 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1234 return NULL;
1235 }
1236
1237 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1238 return selected;
1239 }
1240
1241 static helper_stateful_server *
1242 StatefulGetFirstAvailable(statefulhelper * hlp)
1243 {
1244 dlink_node *n;
1245 helper_stateful_server *srv = NULL;
1246 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1247
1248 if (hlp->childs.n_running == 0)
1249 return NULL;
1250
1251 for (n = hlp->servers.head; n != NULL; n = n->next) {
1252 srv = (helper_stateful_server *)n->data;
1253
1254 if (srv->stats.pending)
1255 continue;
1256
1257 if (srv->flags.reserved)
1258 continue;
1259
1260 if (srv->flags.shutdown)
1261 continue;
1262
1263 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1264 return srv;
1265 }
1266
1267 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1268 return NULL;
1269 }
1270
1271 static void
1272 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1273 {
1274 helper_server *srv = (helper_server *)data;
1275
1276 srv->writebuf->clean();
1277 delete srv->writebuf;
1278 srv->writebuf = NULL;
1279 srv->flags.writing = false;
1280
1281 if (flag != Comm::OK) {
1282 /* Helper server has crashed */
1283 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1284 return;
1285 }
1286
1287 if (!srv->wqueue->isNull()) {
1288 srv->writebuf = srv->wqueue;
1289 srv->wqueue = new MemBuf;
1290 srv->flags.writing = true;
1291 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1292 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1293 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1294 }
1295 }
1296
1297 static void
1298 helperDispatch(helper_server * srv, Helper::Request * r)
1299 {
1300 helper *hlp = srv->parent;
1301 const uint64_t reqId = ++srv->nextRequestId;
1302
1303 if (!cbdataReferenceValid(r->data)) {
1304 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1305 delete r;
1306 return;
1307 }
1308
1309 r->Id = reqId;
1310 helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1311 r->dispatch_time = current_time;
1312
1313 if (srv->wqueue->isNull())
1314 srv->wqueue->init();
1315
1316 if (hlp->childs.concurrency) {
1317 srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1318 assert(srv->requestsIndex.size() == srv->requests.size());
1319 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->buf);
1320 } else
1321 srv->wqueue->append(r->buf, strlen(r->buf));
1322
1323 if (!srv->flags.writing) {
1324 assert(NULL == srv->writebuf);
1325 srv->writebuf = srv->wqueue;
1326 srv->wqueue = new MemBuf;
1327 srv->flags.writing = true;
1328 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1329 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1330 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1331 }
1332
1333 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->buf) << " bytes");
1334
1335 ++ srv->stats.uses;
1336 ++ srv->stats.pending;
1337 ++ hlp->stats.requests;
1338 }
1339
1340 static void
1341 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1342 {}
1343
1344 static void
1345 helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r)
1346 {
1347 statefulhelper *hlp = srv->parent;
1348
1349 if (!cbdataReferenceValid(r->data)) {
1350 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1351 delete r;
1352 helperStatefulReleaseServer(srv);
1353 return;
1354 }
1355
1356 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1357
1358 if (r->placeholder == 1) {
1359 /* a callback is needed before this request can _use_ a helper. */
1360 /* we don't care about releasing this helper. The request NEVER
1361 * gets to the helper. So we throw away the return code */
1362 Helper::Reply nilReply;
1363 nilReply.whichServer = srv;
1364 r->callback(r->data, nilReply);
1365 /* throw away the placeholder */
1366 delete r;
1367 /* and push the queue. Note that the callback may have submitted a new
1368 * request to the helper which is why we test for the request */
1369
1370 if (!srv->requests.size())
1371 helperStatefulServerDone(srv);
1372
1373 return;
1374 }
1375
1376 srv->flags.reserved = true;
1377 srv->requests.push_back(r);
1378 srv->dispatch_time = current_time;
1379 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1380 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1381 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1382 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1383 hlp->id_name << " #" << srv->index << ", " <<
1384 (int) strlen(r->buf) << " bytes");
1385
1386 ++ srv->stats.uses;
1387 ++ srv->stats.pending;
1388 ++ hlp->stats.requests;
1389 }
1390
1391 static void
1392 helperKickQueue(helper * hlp)
1393 {
1394 Helper::Request *r;
1395 helper_server *srv;
1396
1397 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1398 helperDispatch(srv, r);
1399 }
1400
1401 static void
1402 helperStatefulKickQueue(statefulhelper * hlp)
1403 {
1404 Helper::Request *r;
1405 helper_stateful_server *srv;
1406
1407 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1408 helperStatefulDispatch(srv, r);
1409 }
1410
1411 static void
1412 helperStatefulServerDone(helper_stateful_server * srv)
1413 {
1414 if (!srv->flags.shutdown) {
1415 helperStatefulKickQueue(srv->parent);
1416 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1417 srv->closeWritePipeSafely(srv->parent->id_name);
1418 return;
1419 }
1420 }
1421
1422 void
1423 helper_server::checkForTimedOutRequests(bool const retry)
1424 {
1425 assert(parent->childs.concurrency);
1426 while(!requests.empty() && requests.front()->timedOut(parent->timeout)) {
1427 Helper::Request *r = requests.front();
1428 RequestIndex::iterator it;
1429 it = requestsIndex.find(r->Id);
1430 assert(it != requestsIndex.end());
1431 requestsIndex.erase(it);
1432 requests.pop_front();
1433 debugs(84, 2, "Request " << r->Id << " timed-out, remove it from queue");
1434 void *cbdata;
1435 bool retried = false;
1436 if (retry && r->retries < MAX_RETRIES && cbdataReferenceValid(r->data)) {
1437 debugs(84, 2, "Retry request " << r->Id);
1438 ++r->retries;
1439 parent->submitRequest(r);
1440 retried = true;
1441 } else if (cbdataReferenceValidDone(r->data, &cbdata)) {
1442 if (!parent->onTimedOutResponse.isEmpty()) {
1443 // Helper::Reply needs a non const buffer
1444 char *replyMsg = xstrdup(parent->onTimedOutResponse.c_str());
1445 r->callback(cbdata, Helper::Reply(replyMsg, strlen(replyMsg)));
1446 xfree(replyMsg);
1447 } else
1448 r->callback(cbdata, Helper::Reply(Helper::TimedOut));
1449 }
1450 --stats.pending;
1451 ++stats.timedout;
1452 ++parent->stats.timedout;
1453 if (!retried)
1454 delete r;
1455 }
1456 }
1457
1458 void
1459 helper_server::requestTimeout(const CommTimeoutCbParams &io)
1460 {
1461 debugs(26, 3, HERE << io.conn);
1462 helper_server *srv = static_cast<helper_server *>(io.data);
1463
1464 if (!cbdataReferenceValid(srv))
1465 return;
1466
1467 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1468
1469 debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1470 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1471 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
1472
1473 const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->dispatch_time.tv_sec);
1474 const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1475
1476 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1477 }
1478