]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Parser-NG: Transfer-Encoding:chunked Parser
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Read.h"
17 #include "comm/Write.h"
18 #include "fd.h"
19 #include "fde.h"
20 #include "format/Quoting.h"
21 #include "helper.h"
22 #include "helper/Reply.h"
23 #include "helper/Request.h"
24 #include "MemBuf.h"
25 #include "SquidConfig.h"
26 #include "SquidIpc.h"
27 #include "SquidMath.h"
28 #include "SquidTime.h"
29 #include "Store.h"
30 #include "wordlist.h"
31
// helper_stateful_server::data uses explicit alloc()/freeOne()
33 #include "mem/Pool.h"
34
/// Capacity limit for the argv[] arrays built when starting helper processes.
#define HELPER_MAX_ARGS 64

/// The maximum allowed request retries.
#define MAX_RETRIES 2

/**
 * Size of the input buffer initially allocated for each helper server.
 * A helper response may be larger than this; the buffer is then grown
 * step by step, but never beyond ReadBufMaxSize.
 */
const size_t ReadBufMinSize = 4 * 1024;

/**
 * Maximum safe size of a helper-to-Squid response message plus one.
 * Squid will warn and close the stream if a helper sends a too-big response.
 * ssl_crtd helper is known to produce responses of at least 10KB in size.
 * Some undocumented helpers are known to produce responses exceeding 8KB.
 */
const size_t ReadBufMaxSize = 32 * 1024;
51
52 static IOCB helperHandleRead;
53 static IOCB helperStatefulHandleRead;
54 static void helperServerFree(helper_server *srv);
55 static void helperStatefulServerFree(helper_stateful_server *srv);
56 static void Enqueue(helper * hlp, Helper::Request *);
57 static Helper::Request *Dequeue(helper * hlp);
58 static Helper::Request *StatefulDequeue(statefulhelper * hlp);
59 static helper_server *GetFirstAvailable(helper * hlp);
60 static helper_stateful_server *StatefulGetFirstAvailable(statefulhelper * hlp);
61 static void helperDispatch(helper_server * srv, Helper::Request * r);
62 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r);
63 static void helperKickQueue(helper * hlp);
64 static void helperStatefulKickQueue(statefulhelper * hlp);
65 static void helperStatefulServerDone(helper_stateful_server * srv);
66 static void StatefulEnqueue(statefulhelper * hlp, Helper::Request * r);
67
68 CBDATA_CLASS_INIT(helper);
69 CBDATA_CLASS_INIT(helper_server);
70 CBDATA_CLASS_INIT(statefulhelper);
71 CBDATA_CLASS_INIT(helper_stateful_server);
72
73 InstanceIdDefinitions(HelperServerBase, "Hlpr");
74
75 void
76 HelperServerBase::initStats()
77 {
78 stats.uses=0;
79 stats.replies=0;
80 stats.pending=0;
81 stats.releases=0;
82 stats.timedout = 0;
83 }
84
85 void
86 HelperServerBase::closePipesSafely(const char *id_name)
87 {
88 #if _SQUID_WINDOWS_
89 shutdown(writePipe->fd, SD_BOTH);
90 #endif
91
92 flags.closing = true;
93 if (readPipe->fd == writePipe->fd)
94 readPipe->fd = -1;
95 else
96 readPipe->close();
97 writePipe->close();
98
99 #if _SQUID_WINDOWS_
100 if (hIpc) {
101 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
102 getCurrentTime();
103 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
104 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
105 }
106 CloseHandle(hIpc);
107 }
108 #endif
109 }
110
111 void
112 HelperServerBase::closeWritePipeSafely(const char *id_name)
113 {
114 #if _SQUID_WINDOWS_
115 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
116 #endif
117
118 flags.closing = true;
119 if (readPipe->fd == writePipe->fd)
120 readPipe->fd = -1;
121 writePipe->close();
122
123 #if _SQUID_WINDOWS_
124 if (hIpc) {
125 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
126 getCurrentTime();
127 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
128 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
129 }
130 CloseHandle(hIpc);
131 }
132 #endif
133 }
134
135 void
136 helperOpenServers(helper * hlp)
137 {
138 char *s;
139 char *progname;
140 char *shortname;
141 char *procname;
142 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
143 char fd_note_buf[FD_DESC_SZ];
144 helper_server *srv;
145 int nargs = 0;
146 int k;
147 pid_t pid;
148 int rfd;
149 int wfd;
150 void * hIpc;
151 wordlist *w;
152
153 if (hlp->cmdline == NULL)
154 return;
155
156 progname = hlp->cmdline->key;
157
158 if ((s = strrchr(progname, '/')))
159 shortname = xstrdup(s + 1);
160 else
161 shortname = xstrdup(progname);
162
163 /* figure out how many new child are actually needed. */
164 int need_new = hlp->childs.needNew();
165
166 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
167
168 if (need_new < 1) {
169 debugs(84, DBG_IMPORTANT, "helperOpenServers: No '" << shortname << "' processes needed.");
170 }
171
172 procname = (char *)xmalloc(strlen(shortname) + 3);
173
174 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
175
176 args[nargs] = procname;
177 ++nargs;
178
179 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
180 args[nargs] = w->key;
181 ++nargs;
182 }
183
184 args[nargs] = NULL;
185 ++nargs;
186
187 assert(nargs <= HELPER_MAX_ARGS);
188
189 for (k = 0; k < need_new; ++k) {
190 getCurrentTime();
191 rfd = wfd = -1;
192 pid = ipcCreate(hlp->ipc_type,
193 progname,
194 args,
195 shortname,
196 hlp->addr,
197 &rfd,
198 &wfd,
199 &hIpc);
200
201 if (pid < 0) {
202 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
203 continue;
204 }
205
206 ++ hlp->childs.n_running;
207 ++ hlp->childs.n_active;
208 srv = new helper_server;
209 srv->hIpc = hIpc;
210 srv->pid = pid;
211 srv->initStats();
212 srv->addr = hlp->addr;
213 srv->readPipe = new Comm::Connection;
214 srv->readPipe->fd = rfd;
215 srv->writePipe = new Comm::Connection;
216 srv->writePipe->fd = wfd;
217 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
218 srv->wqueue = new MemBuf;
219 srv->roffset = 0;
220 srv->nextRequestId = 0;
221 srv->parent = cbdataReference(hlp);
222 dlinkAddTail(srv, &srv->link, &hlp->servers);
223
224 if (rfd == wfd) {
225 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
226 fd_note(rfd, fd_note_buf);
227 } else {
228 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
229 fd_note(rfd, fd_note_buf);
230 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
231 fd_note(wfd, fd_note_buf);
232 }
233
234 commSetNonBlocking(rfd);
235
236 if (wfd != rfd)
237 commSetNonBlocking(wfd);
238
239 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperServerFree", cbdataDialer(helperServerFree, srv));
240 comm_add_close_handler(rfd, closeCall);
241
242 if (hlp->timeout && hlp->childs.concurrency) {
243 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
244 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
245 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
246 }
247
248 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
249 CommIoCbPtrFun(helperHandleRead, srv));
250 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
251 }
252
253 hlp->last_restart = squid_curtime;
254 safe_free(shortname);
255 safe_free(procname);
256 helperKickQueue(hlp);
257 }
258
259 /**
260 * DPW 2007-05-08
261 *
262 * helperStatefulOpenServers: create the stateful child helper processes
263 */
264 void
265 helperStatefulOpenServers(statefulhelper * hlp)
266 {
267 char *shortname;
268 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
269 char fd_note_buf[FD_DESC_SZ];
270 int nargs = 0;
271
272 if (hlp->cmdline == NULL)
273 return;
274
275 if (hlp->childs.concurrency)
276 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
277
278 char *progname = hlp->cmdline->key;
279
280 char *s;
281 if ((s = strrchr(progname, '/')))
282 shortname = xstrdup(s + 1);
283 else
284 shortname = xstrdup(progname);
285
286 /* figure out haw mant new helpers are needed. */
287 int need_new = hlp->childs.needNew();
288
289 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
290
291 if (need_new < 1) {
292 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
293 }
294
295 char *procname = (char *)xmalloc(strlen(shortname) + 3);
296
297 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
298
299 args[nargs] = procname;
300 ++nargs;
301
302 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
303 args[nargs] = w->key;
304 ++nargs;
305 }
306
307 args[nargs] = NULL;
308 ++nargs;
309
310 assert(nargs <= HELPER_MAX_ARGS);
311
312 for (int k = 0; k < need_new; ++k) {
313 getCurrentTime();
314 int rfd = -1;
315 int wfd = -1;
316 void * hIpc;
317 pid_t pid = ipcCreate(hlp->ipc_type,
318 progname,
319 args,
320 shortname,
321 hlp->addr,
322 &rfd,
323 &wfd,
324 &hIpc);
325
326 if (pid < 0) {
327 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
328 continue;
329 }
330
331 ++ hlp->childs.n_running;
332 ++ hlp->childs.n_active;
333 helper_stateful_server *srv = new helper_stateful_server;
334 srv->hIpc = hIpc;
335 srv->pid = pid;
336 srv->flags.reserved = false;
337 srv->initStats();
338 srv->addr = hlp->addr;
339 srv->readPipe = new Comm::Connection;
340 srv->readPipe->fd = rfd;
341 srv->writePipe = new Comm::Connection;
342 srv->writePipe->fd = wfd;
343 srv->rbuf = (char *)memAllocBuf(ReadBufMinSize, &srv->rbuf_sz);
344 srv->roffset = 0;
345 srv->parent = cbdataReference(hlp);
346
347 if (hlp->datapool != NULL)
348 srv->data = hlp->datapool->alloc();
349
350 dlinkAddTail(srv, &srv->link, &hlp->servers);
351
352 if (rfd == wfd) {
353 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
354 fd_note(rfd, fd_note_buf);
355 } else {
356 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
357 fd_note(rfd, fd_note_buf);
358 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
359 fd_note(wfd, fd_note_buf);
360 }
361
362 commSetNonBlocking(rfd);
363
364 if (wfd != rfd)
365 commSetNonBlocking(wfd);
366
367 AsyncCall::Pointer closeCall = asyncCall(5,4, "helperStatefulServerFree", cbdataDialer(helperStatefulServerFree, srv));
368 comm_add_close_handler(rfd, closeCall);
369
370 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
371 CommIoCbPtrFun(helperStatefulHandleRead, srv));
372 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
373 }
374
375 hlp->last_restart = squid_curtime;
376 safe_free(shortname);
377 safe_free(procname);
378 helperStatefulKickQueue(hlp);
379 }
380
381 void
382 helper::submitRequest(Helper::Request *r)
383 {
384 helper_server *srv;
385
386 if ((srv = GetFirstAvailable(this)))
387 helperDispatch(srv, r);
388 else
389 Enqueue(this, r);
390
391 if (!queueFull()) {
392 full_time = 0;
393 } else if (!full_time) {
394 debugs(84, 3, id_name << " queue became full");
395 full_time = squid_curtime;
396 }
397 }
398
399 void
400 helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
401 {
402 if (hlp == NULL) {
403 debugs(84, 3, "helperSubmit: hlp == NULL");
404 Helper::Reply nilReply;
405 callback(data, nilReply);
406 return;
407 }
408 hlp->prepSubmit();
409 hlp->submit(buf, callback, data);
410 }
411
412 bool
413 helper::queueFull() const {
414 return stats.queue_size > static_cast<int>(childs.queue_size);
415 }
416
417 /// prepares the helper for request submission via trySubmit() or helperSubmit()
418 /// currently maintains full_time and kills Squid if the helper remains full for too long
419 void
420 helper::prepSubmit()
421 {
422 if (!queueFull())
423 full_time = 0;
424 else if (!full_time) // may happen here if reconfigure decreases capacity
425 full_time = squid_curtime;
426 else if (squid_curtime - full_time > 180)
427 fatalf("Too many queued %s requests", id_name);
428 }
429
430 bool
431 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
432 {
433 prepSubmit();
434
435 if (queueFull()) {
436 debugs(84, DBG_IMPORTANT, id_name << " drops request due to a full queue");
437 return false; // request was ignored
438 }
439
440 submit(buf, callback, data); // will send or queue
441 return true; // request submitted or queued
442 }
443
444 /// dispatches or enqueues a helper requests; does not enforce queue limits
445 void
446 helper::submit(const char *buf, HLPCB * callback, void *data)
447 {
448 Helper::Request *r = new Helper::Request(callback, data, buf);
449 submitRequest(r);
450 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
451 }
452
453 /// lastserver = "server last used as part of a reserved request sequence"
454 void
455 helperStatefulSubmit(statefulhelper * hlp, const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
456 {
457 if (hlp == NULL) {
458 debugs(84, 3, "helperStatefulSubmit: hlp == NULL");
459 Helper::Reply nilReply;
460 callback(data, nilReply);
461 return;
462 }
463 hlp->prepSubmit();
464 hlp->submit(buf, callback, data, lastserver);
465 }
466
467 void statefulhelper::submit(const char *buf, HLPCB * callback, void *data, helper_stateful_server * lastserver)
468 {
469 Helper::Request *r = new Helper::Request(callback, data, buf);
470
471 if ((buf != NULL) && lastserver) {
472 debugs(84, 5, "StatefulSubmit with lastserver " << lastserver);
473 assert(lastserver->flags.reserved);
474 assert(!lastserver->requests.size());
475
476 debugs(84, 5, "StatefulSubmit dispatching");
477 helperStatefulDispatch(lastserver, r);
478 } else {
479 helper_stateful_server *srv;
480 if ((srv = StatefulGetFirstAvailable(this))) {
481 helperStatefulDispatch(srv, r);
482 } else
483 StatefulEnqueue(this, r);
484 }
485
486 debugs(84, DBG_DATA, "placeholder: '" << r->placeholder <<
487 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
488
489 if (!queueFull()) {
490 full_time = 0;
491 } else if (!full_time) {
492 debugs(84, 3, id_name << " queue became full");
493 full_time = squid_curtime;
494 }
495 }
496
497 /**
498 * DPW 2007-05-08
499 *
500 * helperStatefulReleaseServer tells the helper that whoever was
501 * using it no longer needs its services.
502 */
503 void
504 helperStatefulReleaseServer(helper_stateful_server * srv)
505 {
506 debugs(84, 3, HERE << "srv-" << srv->index << " flags.reserved = " << srv->flags.reserved);
507 if (!srv->flags.reserved)
508 return;
509
510 ++ srv->stats.releases;
511
512 srv->flags.reserved = false;
513 if (srv->parent->OnEmptyQueue != NULL && srv->data)
514 srv->parent->OnEmptyQueue(srv->data);
515
516 helperStatefulServerDone(srv);
517 }
518
519 /** return a pointer to the stateful routines data area */
520 void *
521 helperStatefulServerGetData(helper_stateful_server * srv)
522 {
523 return srv->data;
524 }
525
526 void
527 helper::packStatsInto(Packable *p, const char *label) const
528 {
529 if (label)
530 p->appendf("%s:\n", label);
531
532 p->appendf(" program: %s\n", cmdline->key);
533 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
534 p->appendf(" requests sent: %d\n", stats.requests);
535 p->appendf(" replies received: %d\n", stats.replies);
536 p->appendf(" requests timedout: %d\n", stats.timedout);
537 p->appendf(" queue length: %d\n", stats.queue_size);
538 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
539 p->append("\n",1);
540 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
541 "ID #",
542 "FD",
543 "PID",
544 "# Requests",
545 "# Replies",
546 "# Timed-out",
547 "Flags",
548 "Time",
549 "Offset",
550 "Request");
551
552 for (dlink_node *link = servers.head; link; link = link->next) {
553 HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
554 assert(srv);
555 Helper::Request *request = srv->requests.empty() ? NULL : srv->requests.front();
556 double tt = 0.001 * (request ? tvSubMsec(request->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
557 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
558 srv->index.value,
559 srv->readPipe->fd,
560 srv->pid,
561 srv->stats.uses,
562 srv->stats.replies,
563 srv->stats.timedout,
564 srv->stats.pending ? 'B' : ' ',
565 srv->flags.writing ? 'W' : ' ',
566 srv->flags.closing ? 'C' : ' ',
567 srv->flags.reserved ? 'R' : ' ',
568 srv->flags.shutdown ? 'S' : ' ',
569 request && request->placeholder ? 'P' : ' ',
570 tt < 0.0 ? 0.0 : tt,
571 (int) srv->roffset,
572 request ? Format::QuoteMimeBlob(request->buf) : "(none)");
573 }
574
575 p->append("\nFlags key:\n"
576 " B\tBUSY\n"
577 " W\tWRITING\n"
578 " C\tCLOSING\n"
579 " R\tRESERVED\n"
580 " S\tSHUTDOWN PENDING\n"
581 " P\tPLACEHOLDER\n", 101);
582 }
583
584 void
585 helperShutdown(helper * hlp)
586 {
587 dlink_node *link = hlp->servers.head;
588
589 while (link) {
590 helper_server *srv;
591 srv = (helper_server *)link->data;
592 link = link->next;
593
594 if (srv->flags.shutdown) {
595 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
596 continue;
597 }
598
599 assert(hlp->childs.n_active > 0);
600 -- hlp->childs.n_active;
601 srv->flags.shutdown = true; /* request it to shut itself down */
602
603 if (srv->flags.closing) {
604 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
605 continue;
606 }
607
608 if (srv->stats.pending) {
609 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
610 continue;
611 }
612
613 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
614 /* the rest of the details is dealt with in the helperServerFree
615 * close handler
616 */
617 srv->closePipesSafely(hlp->id_name);
618 }
619 }
620
621 void
622 helperStatefulShutdown(statefulhelper * hlp)
623 {
624 dlink_node *link = hlp->servers.head;
625 helper_stateful_server *srv;
626
627 while (link) {
628 srv = (helper_stateful_server *)link->data;
629 link = link->next;
630
631 if (srv->flags.shutdown) {
632 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
633 continue;
634 }
635
636 assert(hlp->childs.n_active > 0);
637 -- hlp->childs.n_active;
638 srv->flags.shutdown = true; /* request it to shut itself down */
639
640 if (srv->stats.pending) {
641 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
642 continue;
643 }
644
645 if (srv->flags.closing) {
646 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
647 continue;
648 }
649
650 if (srv->flags.reserved) {
651 if (shutting_down) {
652 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
653 } else {
654 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
655 continue;
656 }
657 }
658
659 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
660
661 /* the rest of the details is dealt with in the helperStatefulServerFree
662 * close handler
663 */
664 srv->closePipesSafely(hlp->id_name);
665 }
666 }
667
668 helper::~helper()
669 {
670 /* note, don't free id_name, it probably points to static memory */
671
672 if (queue.head)
673 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
674 }
675
676 /* ====================================================================== */
677 /* LOCAL FUNCTIONS */
678 /* ====================================================================== */
679
680 static void
681 helperServerFree(helper_server *srv)
682 {
683 helper *hlp = srv->parent;
684 int concurrency = hlp->childs.concurrency;
685
686 if (!concurrency)
687 concurrency = 1;
688
689 if (srv->rbuf) {
690 memFreeBuf(srv->rbuf_sz, srv->rbuf);
691 srv->rbuf = NULL;
692 }
693
694 srv->wqueue->clean();
695 delete srv->wqueue;
696
697 if (srv->writebuf) {
698 srv->writebuf->clean();
699 delete srv->writebuf;
700 srv->writebuf = NULL;
701 }
702
703 if (Comm::IsConnOpen(srv->writePipe))
704 srv->closeWritePipeSafely(hlp->id_name);
705
706 dlinkDelete(&srv->link, &hlp->servers);
707
708 assert(hlp->childs.n_running > 0);
709 -- hlp->childs.n_running;
710
711 if (!srv->flags.shutdown) {
712 assert(hlp->childs.n_active > 0);
713 -- hlp->childs.n_active;
714 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
715
716 if (hlp->childs.needNew() > 0) {
717 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
718
719 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
720 if (srv->stats.replies < 1)
721 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
722 else
723 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
724 }
725
726 debugs(80, DBG_IMPORTANT, "Starting new helpers");
727 helperOpenServers(hlp);
728 }
729 }
730
731 while (!srv->requests.empty()) {
732 // XXX: re-schedule these on another helper?
733 Helper::Request *r = srv->requests.front();
734 srv->requests.pop_front();
735 void *cbdata;
736
737 if (cbdataReferenceValidDone(r->data, &cbdata)) {
738 Helper::Reply nilReply;
739 r->callback(cbdata, nilReply);
740 }
741
742 delete r;
743 }
744 srv->requestsIndex.clear();
745
746 cbdataReferenceDone(srv->parent);
747 delete srv;
748 }
749
750 static void
751 helperStatefulServerFree(helper_stateful_server *srv)
752 {
753 statefulhelper *hlp = srv->parent;
754
755 if (srv->rbuf) {
756 memFreeBuf(srv->rbuf_sz, srv->rbuf);
757 srv->rbuf = NULL;
758 }
759
760 #if 0
761 srv->wqueue->clean();
762
763 delete srv->wqueue;
764
765 #endif
766
767 /* TODO: walk the local queue of requests and carry them all out */
768 if (Comm::IsConnOpen(srv->writePipe))
769 srv->closeWritePipeSafely(hlp->id_name);
770
771 dlinkDelete(&srv->link, &hlp->servers);
772
773 assert(hlp->childs.n_running > 0);
774 -- hlp->childs.n_running;
775
776 if (!srv->flags.shutdown) {
777 assert( hlp->childs.n_active > 0);
778 -- hlp->childs.n_active;
779 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->id_name << " #" << srv->index << " exited");
780
781 if (hlp->childs.needNew() > 0) {
782 debugs(80, DBG_IMPORTANT, "Too few " << hlp->id_name << " processes are running (need " << hlp->childs.needNew() << "/" << hlp->childs.n_max << ")");
783
784 if (hlp->childs.n_active < hlp->childs.n_startup && hlp->last_restart > squid_curtime - 30) {
785 if (srv->stats.replies < 1)
786 fatalf("The %s helpers are crashing too rapidly, need help!\n", hlp->id_name);
787 else
788 debugs(80, DBG_CRITICAL, "ERROR: The " << hlp->id_name << " helpers are crashing too rapidly, need help!");
789 }
790
791 debugs(80, DBG_IMPORTANT, "Starting new helpers");
792 helperStatefulOpenServers(hlp);
793 }
794 }
795
796 while (!srv->requests.empty()) {
797 // XXX: re-schedule these on another helper?
798 Helper::Request *r = srv->requests.front();
799 srv->requests.pop_front();
800 void *cbdata;
801
802 if (cbdataReferenceValidDone(r->data, &cbdata)) {
803 Helper::Reply nilReply;
804 r->callback(cbdata, nilReply);
805 }
806
807 delete r;
808 }
809
810 if (srv->data != NULL)
811 hlp->datapool->freeOne(srv->data);
812
813 cbdataReferenceDone(srv->parent);
814
815 delete srv;
816 }
817
818 /// Calls back with a pointer to the buffer with the helper output
819 static void
820 helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
821 {
822 Helper::Request *r = NULL;
823 helper_server::RequestIndex::iterator it;
824 if (hlp->childs.concurrency) {
825 // If concurency supported retrieve request from ID
826 it = srv->requestsIndex.find(request_number);
827 if (it != srv->requestsIndex.end()) {
828 r = *(it->second);
829 srv->requests.erase(it->second);
830 srv->requestsIndex.erase(it);
831 }
832 } else if(!srv->requests.empty()) {
833 // Else get the first request from queue, if any
834 r = srv->requests.front();
835 srv->requests.pop_front();
836 }
837
838 if (r) {
839 HLPCB *callback = r->callback;
840 r->callback = NULL;
841
842 void *cbdata = NULL;
843 bool retry = false;
844 if (cbdataReferenceValidDone(r->data, &cbdata)) {
845 Helper::Reply response(msg, (msg_end-msg));
846 if (response.result == Helper::BrokenHelper && r->retries < MAX_RETRIES) {
847 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << response << ", attempt #" << (r->retries + 1) << " of 2");
848 retry = true;
849 } else
850 callback(cbdata, response);
851 }
852
853 -- srv->stats.pending;
854 ++ srv->stats.replies;
855
856 ++ hlp->stats.replies;
857
858 srv->answer_time = current_time;
859
860 srv->dispatch_time = r->dispatch_time;
861
862 hlp->stats.avg_svc_time =
863 Math::intAverage(hlp->stats.avg_svc_time,
864 tvSubMsec(r->dispatch_time, current_time),
865 hlp->stats.replies, REDIRECT_AV_FACTOR);
866
867 if (retry) {
868 ++r->retries;
869 hlp->submitRequest(r);
870 } else
871 delete r;
872 } else if (srv->stats.timedout) {
873 debugs(84, 3, "Timedout reply received for request-ID: " << request_number << " , ignore");
874 } else {
875 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected reply on channel " <<
876 request_number << " from " << hlp->id_name << " #" << srv->index <<
877 " '" << srv->rbuf << "'");
878 }
879
880 if (hlp->timeout && hlp->childs.concurrency)
881 srv->checkForTimedOutRequests(hlp->retryTimedOut);
882
883 if (!srv->flags.shutdown) {
884 helperKickQueue(hlp);
885 } else if (!srv->flags.closing && !srv->stats.pending) {
886 srv->flags.closing=true;
887 srv->writePipe->close();
888 }
889 }
890
891 static void
892 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
893 {
894 char *t = NULL;
895 helper_server *srv = (helper_server *)data;
896 helper *hlp = srv->parent;
897 assert(cbdataReferenceValid(data));
898
899 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
900
901 if (flag == Comm::ERR_CLOSING) {
902 return;
903 }
904
905 assert(conn->fd == srv->readPipe->fd);
906
907 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
908
909 if (flag != Comm::OK || len == 0) {
910 srv->closePipesSafely(hlp->id_name);
911 return;
912 }
913
914 srv->roffset += len;
915 srv->rbuf[srv->roffset] = '\0';
916 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
917
918 if (!srv->stats.pending && !srv->stats.timedout) {
919 /* someone spoke without being spoken to */
920 debugs(84, DBG_IMPORTANT, "helperHandleRead: unexpected read from " <<
921 hlp->id_name << " #" << srv->index << ", " << (int)len <<
922 " bytes '" << srv->rbuf << "'");
923
924 srv->roffset = 0;
925 srv->rbuf[0] = '\0';
926 }
927
928 while ((t = strchr(srv->rbuf, hlp->eom))) {
929 /* end of reply found */
930 char *msg = srv->rbuf;
931 int i = 0;
932 int skip = 1;
933 debugs(84, 3, "helperHandleRead: end of reply found");
934
935 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
936 *t = '\0';
937 // rewind to the \r octet which is the real terminal now
938 // and remember that we have to skip forward 2 places now.
939 skip = 2;
940 --t;
941 }
942
943 *t = '\0';
944
945 if (hlp->childs.concurrency) {
946 i = strtol(msg, &msg, 10);
947
948 while (*msg && xisspace(*msg))
949 ++msg;
950 }
951
952 helperReturnBuffer(i, srv, hlp, msg, t);
953 srv->roffset -= (t - srv->rbuf) + skip;
954 memmove(srv->rbuf, t + skip, srv->roffset);
955 srv->rbuf[srv->roffset] = '\0';
956 }
957
958 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
959 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
960 assert(spaceSize >= 0);
961
962 // grow the input buffer if needed and possible
963 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
964 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
965 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
966 spaceSize = srv->rbuf_sz - srv->roffset - 1;
967 assert(spaceSize >= 0);
968 }
969
970 // quit reading if there is no space left
971 if (!spaceSize) {
972 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
973 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
974 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
975 srv->closePipesSafely(hlp->id_name);
976 return;
977 }
978
979 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
980 CommIoCbPtrFun(helperHandleRead, srv));
981 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
982 }
983 }
984
985 static void
986 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
987 {
988 char *t = NULL;
989 helper_stateful_server *srv = (helper_stateful_server *)data;
990 statefulhelper *hlp = srv->parent;
991 assert(cbdataReferenceValid(data));
992
993 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
994
995 if (flag == Comm::ERR_CLOSING) {
996 return;
997 }
998
999 assert(conn->fd == srv->readPipe->fd);
1000
1001 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1002 hlp->id_name << " #" << srv->index);
1003
1004 if (flag != Comm::OK || len == 0) {
1005 srv->closePipesSafely(hlp->id_name);
1006 return;
1007 }
1008
1009 srv->roffset += len;
1010 srv->rbuf[srv->roffset] = '\0';
1011 Helper::Request *r = srv->requests.front();
1012 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1013
1014 if (r == NULL) {
1015 /* someone spoke without being spoken to */
1016 debugs(84, DBG_IMPORTANT, "helperStatefulHandleRead: unexpected read from " <<
1017 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1018 " bytes '" << srv->rbuf << "'");
1019
1020 srv->roffset = 0;
1021 }
1022
1023 if ((t = strchr(srv->rbuf, hlp->eom))) {
1024 /* end of reply found */
1025 srv->requests.pop_front(); // we already have it in 'r'
1026 int called = 1;
1027 int skip = 1;
1028 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1029
1030 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1031 *t = '\0';
1032 // rewind to the \r octet which is the real terminal now
1033 // and remember that we have to skip forward 2 places now.
1034 skip = 2;
1035 --t;
1036 }
1037
1038 *t = '\0';
1039
1040 if (r && cbdataReferenceValid(r->data)) {
1041 Helper::Reply res(srv->rbuf, (t - srv->rbuf));
1042 res.whichServer = srv;
1043 r->callback(r->data, res);
1044 } else {
1045 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1046 called = 0;
1047 }
1048 // only skip off the \0's _after_ passing its location in Helper::Reply above
1049 t += skip;
1050
1051 /**
1052 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1053 * Doing this prohibits concurrency support with multiple replies per read().
1054 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1055 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1056 */
1057 srv->roffset = 0;
1058 delete r;
1059
1060 -- srv->stats.pending;
1061 ++ srv->stats.replies;
1062
1063 ++ hlp->stats.replies;
1064 srv->answer_time = current_time;
1065 hlp->stats.avg_svc_time =
1066 Math::intAverage(hlp->stats.avg_svc_time,
1067 tvSubMsec(srv->dispatch_time, current_time),
1068 hlp->stats.replies, REDIRECT_AV_FACTOR);
1069
1070 if (called)
1071 helperStatefulServerDone(srv);
1072 else
1073 helperStatefulReleaseServer(srv);
1074 }
1075
1076 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1077 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1078 assert(spaceSize >= 0);
1079
1080 // grow the input buffer if needed and possible
1081 if (!spaceSize && srv->rbuf_sz + 4096 <= ReadBufMaxSize) {
1082 srv->rbuf = (char *)memReallocBuf(srv->rbuf, srv->rbuf_sz + 4096, &srv->rbuf_sz);
1083 debugs(84, 3, HERE << "Grew read buffer to " << srv->rbuf_sz);
1084 spaceSize = srv->rbuf_sz - srv->roffset - 1;
1085 assert(spaceSize >= 0);
1086 }
1087
1088 // quit reading if there is no space left
1089 if (!spaceSize) {
1090 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1091 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1092 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1093 srv->closePipesSafely(hlp->id_name);
1094 return;
1095 }
1096
1097 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1098 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1099 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1100 }
1101 }
1102
1103 /// Handles a request when all running helpers, if any, are busy.
1104 static void
1105 Enqueue(helper * hlp, Helper::Request * r)
1106 {
1107 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1108 dlinkAddTail(r, link, &hlp->queue);
1109 ++ hlp->stats.queue_size;
1110
1111 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1112 if (hlp->childs.needNew() > 0) {
1113 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1114 helperOpenServers(hlp);
1115 return;
1116 }
1117
1118 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1119 return;
1120
1121 if (squid_curtime - hlp->last_queue_warn < 600)
1122 return;
1123
1124 if (shutting_down || reconfiguring)
1125 return;
1126
1127 hlp->last_queue_warn = squid_curtime;
1128
1129 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1130 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1131 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1132 }
1133
1134 static void
1135 StatefulEnqueue(statefulhelper * hlp, Helper::Request * r)
1136 {
1137 dlink_node *link = (dlink_node *)memAllocate(MEM_DLINK_NODE);
1138 dlinkAddTail(r, link, &hlp->queue);
1139 ++ hlp->stats.queue_size;
1140
1141 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1142 if (hlp->childs.needNew() > 0) {
1143 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1144 helperStatefulOpenServers(hlp);
1145 return;
1146 }
1147
1148 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1149 return;
1150
1151 if (squid_curtime - hlp->last_queue_warn < 600)
1152 return;
1153
1154 if (shutting_down || reconfiguring)
1155 return;
1156
1157 hlp->last_queue_warn = squid_curtime;
1158
1159 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1160 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1161 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1162 }
1163
1164 static Helper::Request *
1165 Dequeue(helper * hlp)
1166 {
1167 dlink_node *link;
1168 Helper::Request *r = NULL;
1169
1170 if ((link = hlp->queue.head)) {
1171 r = (Helper::Request *)link->data;
1172 dlinkDelete(link, &hlp->queue);
1173 memFree(link, MEM_DLINK_NODE);
1174 -- hlp->stats.queue_size;
1175 }
1176
1177 return r;
1178 }
1179
1180 static Helper::Request *
1181 StatefulDequeue(statefulhelper * hlp)
1182 {
1183 dlink_node *link;
1184 Helper::Request *r = NULL;
1185
1186 if ((link = hlp->queue.head)) {
1187 r = (Helper::Request *)link->data;
1188 dlinkDelete(link, &hlp->queue);
1189 memFree(link, MEM_DLINK_NODE);
1190 -- hlp->stats.queue_size;
1191 }
1192
1193 return r;
1194 }
1195
1196 static helper_server *
1197 GetFirstAvailable(helper * hlp)
1198 {
1199 dlink_node *n;
1200 helper_server *srv;
1201 helper_server *selected = NULL;
1202 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1203
1204 if (hlp->childs.n_running == 0)
1205 return NULL;
1206
1207 /* Find "least" loaded helper (approx) */
1208 for (n = hlp->servers.head; n != NULL; n = n->next) {
1209 srv = (helper_server *)n->data;
1210
1211 if (selected && selected->stats.pending <= srv->stats.pending)
1212 continue;
1213
1214 if (srv->flags.shutdown)
1215 continue;
1216
1217 if (!srv->stats.pending)
1218 return srv;
1219
1220 if (selected) {
1221 selected = srv;
1222 break;
1223 }
1224
1225 selected = srv;
1226 }
1227
1228 /* Check for overload */
1229 if (!selected) {
1230 debugs(84, 5, "GetFirstAvailable: None available.");
1231 return NULL;
1232 }
1233
1234 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1235 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is overloaded!");
1236 return NULL;
1237 }
1238
1239 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1240 return selected;
1241 }
1242
1243 static helper_stateful_server *
1244 StatefulGetFirstAvailable(statefulhelper * hlp)
1245 {
1246 dlink_node *n;
1247 helper_stateful_server *srv = NULL;
1248 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1249
1250 if (hlp->childs.n_running == 0)
1251 return NULL;
1252
1253 for (n = hlp->servers.head; n != NULL; n = n->next) {
1254 srv = (helper_stateful_server *)n->data;
1255
1256 if (srv->stats.pending)
1257 continue;
1258
1259 if (srv->flags.reserved)
1260 continue;
1261
1262 if (srv->flags.shutdown)
1263 continue;
1264
1265 if ((hlp->IsAvailable != NULL) && (srv->data != NULL) && !(hlp->IsAvailable(srv->data)))
1266 continue;
1267
1268 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1269 return srv;
1270 }
1271
1272 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1273 return NULL;
1274 }
1275
1276 static void
1277 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1278 {
1279 helper_server *srv = (helper_server *)data;
1280
1281 srv->writebuf->clean();
1282 delete srv->writebuf;
1283 srv->writebuf = NULL;
1284 srv->flags.writing = false;
1285
1286 if (flag != Comm::OK) {
1287 /* Helper server has crashed */
1288 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1289 return;
1290 }
1291
1292 if (!srv->wqueue->isNull()) {
1293 srv->writebuf = srv->wqueue;
1294 srv->wqueue = new MemBuf;
1295 srv->flags.writing = true;
1296 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1297 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1298 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1299 }
1300 }
1301
1302 static void
1303 helperDispatch(helper_server * srv, Helper::Request * r)
1304 {
1305 helper *hlp = srv->parent;
1306 const uint64_t reqId = ++srv->nextRequestId;
1307
1308 if (!cbdataReferenceValid(r->data)) {
1309 debugs(84, DBG_IMPORTANT, "helperDispatch: invalid callback data");
1310 delete r;
1311 return;
1312 }
1313
1314 r->Id = reqId;
1315 helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1316 r->dispatch_time = current_time;
1317
1318 if (srv->wqueue->isNull())
1319 srv->wqueue->init();
1320
1321 if (hlp->childs.concurrency) {
1322 srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1323 assert(srv->requestsIndex.size() == srv->requests.size());
1324 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->buf);
1325 } else
1326 srv->wqueue->append(r->buf, strlen(r->buf));
1327
1328 if (!srv->flags.writing) {
1329 assert(NULL == srv->writebuf);
1330 srv->writebuf = srv->wqueue;
1331 srv->wqueue = new MemBuf;
1332 srv->flags.writing = true;
1333 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1334 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1335 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
1336 }
1337
1338 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->buf) << " bytes");
1339
1340 ++ srv->stats.uses;
1341 ++ srv->stats.pending;
1342 ++ hlp->stats.requests;
1343 }
1344
1345 static void
1346 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1347 {}
1348
1349 static void
1350 helperStatefulDispatch(helper_stateful_server * srv, Helper::Request * r)
1351 {
1352 statefulhelper *hlp = srv->parent;
1353
1354 if (!cbdataReferenceValid(r->data)) {
1355 debugs(84, DBG_IMPORTANT, "helperStatefulDispatch: invalid callback data");
1356 delete r;
1357 helperStatefulReleaseServer(srv);
1358 return;
1359 }
1360
1361 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1362
1363 if (r->placeholder == 1) {
1364 /* a callback is needed before this request can _use_ a helper. */
1365 /* we don't care about releasing this helper. The request NEVER
1366 * gets to the helper. So we throw away the return code */
1367 Helper::Reply nilReply;
1368 nilReply.whichServer = srv;
1369 r->callback(r->data, nilReply);
1370 /* throw away the placeholder */
1371 delete r;
1372 /* and push the queue. Note that the callback may have submitted a new
1373 * request to the helper which is why we test for the request */
1374
1375 if (!srv->requests.size())
1376 helperStatefulServerDone(srv);
1377
1378 return;
1379 }
1380
1381 srv->flags.reserved = true;
1382 srv->requests.push_back(r);
1383 srv->dispatch_time = current_time;
1384 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1385 CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
1386 Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
1387 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1388 hlp->id_name << " #" << srv->index << ", " <<
1389 (int) strlen(r->buf) << " bytes");
1390
1391 ++ srv->stats.uses;
1392 ++ srv->stats.pending;
1393 ++ hlp->stats.requests;
1394 }
1395
1396 static void
1397 helperKickQueue(helper * hlp)
1398 {
1399 Helper::Request *r;
1400 helper_server *srv;
1401
1402 while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
1403 helperDispatch(srv, r);
1404 }
1405
1406 static void
1407 helperStatefulKickQueue(statefulhelper * hlp)
1408 {
1409 Helper::Request *r;
1410 helper_stateful_server *srv;
1411
1412 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = StatefulDequeue(hlp)))
1413 helperStatefulDispatch(srv, r);
1414 }
1415
1416 static void
1417 helperStatefulServerDone(helper_stateful_server * srv)
1418 {
1419 if (!srv->flags.shutdown) {
1420 helperStatefulKickQueue(srv->parent);
1421 } else if (!srv->flags.closing && !srv->flags.reserved && !srv->stats.pending) {
1422 srv->closeWritePipeSafely(srv->parent->id_name);
1423 return;
1424 }
1425 }
1426
1427 void
1428 helper_server::checkForTimedOutRequests(bool const retry)
1429 {
1430 assert(parent->childs.concurrency);
1431 while(!requests.empty() && requests.front()->timedOut(parent->timeout)) {
1432 Helper::Request *r = requests.front();
1433 RequestIndex::iterator it;
1434 it = requestsIndex.find(r->Id);
1435 assert(it != requestsIndex.end());
1436 requestsIndex.erase(it);
1437 requests.pop_front();
1438 debugs(84, 2, "Request " << r->Id << " timed-out, remove it from queue");
1439 void *cbdata;
1440 bool retried = false;
1441 if (retry && r->retries < MAX_RETRIES && cbdataReferenceValid(r->data)) {
1442 debugs(84, 2, "Retry request " << r->Id);
1443 ++r->retries;
1444 parent->submitRequest(r);
1445 retried = true;
1446 } else if (cbdataReferenceValidDone(r->data, &cbdata)) {
1447 if (!parent->onTimedOutResponse.isEmpty()) {
1448 // Helper::Reply needs a non const buffer
1449 char *replyMsg = xstrdup(parent->onTimedOutResponse.c_str());
1450 r->callback(cbdata, Helper::Reply(replyMsg, strlen(replyMsg)));
1451 xfree(replyMsg);
1452 } else
1453 r->callback(cbdata, Helper::Reply(Helper::TimedOut));
1454 }
1455 --stats.pending;
1456 ++stats.timedout;
1457 ++parent->stats.timedout;
1458 if (!retried)
1459 delete r;
1460 }
1461 }
1462
1463 void
1464 helper_server::requestTimeout(const CommTimeoutCbParams &io)
1465 {
1466 debugs(26, 3, HERE << io.conn);
1467 helper_server *srv = static_cast<helper_server *>(io.data);
1468
1469 if (!cbdataReferenceValid(srv))
1470 return;
1471
1472 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1473
1474 debugs(84, 3, HERE << io.conn << " establish new helper_server::requestTimeout");
1475 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1476 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
1477
1478 const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->dispatch_time.tv_sec);
1479 const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1480
1481 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1482 }
1483