]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
Keep ::helper objects alive while in use by helper_servers (#1389)
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "base/Raw.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "comm/Read.h"
18 #include "comm/Write.h"
19 #include "debug/Messages.h"
20 #include "fd.h"
21 #include "fde.h"
22 #include "format/Quoting.h"
23 #include "helper.h"
24 #include "helper/Reply.h"
25 #include "helper/Request.h"
26 #include "MemBuf.h"
27 #include "SquidConfig.h"
28 #include "SquidIpc.h"
29 #include "SquidMath.h"
30 #include "Store.h"
31 #include "wordlist.h"
32
// helper_stateful_server::data uses explicit alloc()/freeOne()
34 #include "mem/Pool.h"
35
36 #define HELPER_MAX_ARGS 64
37
38 /// The maximum allowed request retries.
39 #define MAX_RETRIES 2
40
41 /// Helpers input buffer size.
42 const size_t ReadBufSize(32*1024);
43
44 static IOCB helperHandleRead;
45 static IOCB helperStatefulHandleRead;
46 static void Enqueue(helper * hlp, Helper::Xaction *);
47 static helper_server *GetFirstAvailable(const helper::Pointer &);
48 static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper::Pointer &);
49 static void helperDispatch(helper_server * srv, Helper::Xaction * r);
50 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
51 static void helperKickQueue(const helper::Pointer &);
52 static void helperStatefulKickQueue(const statefulhelper::Pointer &);
53 static void helperStatefulServerDone(helper_stateful_server * srv);
54 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
55
56 CBDATA_CLASS_INIT(helper_server);
57 CBDATA_CLASS_INIT(helper_stateful_server);
58
59 InstanceIdDefinitions(HelperServerBase, "Hlpr");
60
61 void
62 HelperServerBase::initStats()
63 {
64 stats.uses=0;
65 stats.replies=0;
66 stats.pending=0;
67 stats.releases=0;
68 stats.timedout = 0;
69 }
70
71 void
72 HelperServerBase::closePipesSafely(const char *id_name)
73 {
74 #if _SQUID_WINDOWS_
75 shutdown(writePipe->fd, SD_BOTH);
76 #endif
77
78 flags.closing = true;
79 if (readPipe->fd == writePipe->fd)
80 readPipe->fd = -1;
81 else
82 readPipe->close();
83 writePipe->close();
84
85 #if _SQUID_WINDOWS_
86 if (hIpc) {
87 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
88 getCurrentTime();
89 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
90 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
91 }
92 CloseHandle(hIpc);
93 }
94 #else
95 (void)id_name;
96 #endif
97 }
98
99 void
100 HelperServerBase::closeWritePipeSafely(const char *id_name)
101 {
102 #if _SQUID_WINDOWS_
103 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
104 #endif
105
106 flags.closing = true;
107 if (readPipe->fd == writePipe->fd)
108 readPipe->fd = -1;
109 writePipe->close();
110
111 #if _SQUID_WINDOWS_
112 if (hIpc) {
113 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
114 getCurrentTime();
115 debugs(84, DBG_IMPORTANT, "WARNING: " << id_name <<
116 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
117 }
118 CloseHandle(hIpc);
119 }
120 #else
121 (void)id_name;
122 #endif
123 }
124
125 void
126 HelperServerBase::dropQueued()
127 {
128 while (!requests.empty()) {
129 // XXX: re-schedule these on another helper?
130 Helper::Xaction *r = requests.front();
131 requests.pop_front();
132 void *cbdata;
133 if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
134 r->reply.result = Helper::Unknown;
135 r->request.callback(cbdata, r->reply);
136 }
137
138 delete r;
139 }
140 }
141
142 HelperServerBase::~HelperServerBase()
143 {
144 if (rbuf) {
145 memFreeBuf(rbuf_sz, rbuf);
146 rbuf = nullptr;
147 }
148 }
149
150 helper_server::~helper_server()
151 {
152 wqueue->clean();
153 delete wqueue;
154
155 if (writebuf) {
156 writebuf->clean();
157 delete writebuf;
158 writebuf = nullptr;
159 }
160
161 if (Comm::IsConnOpen(writePipe))
162 closeWritePipeSafely(parent->id_name);
163
164 dlinkDelete(&link, &parent->servers);
165
166 assert(parent->childs.n_running > 0);
167 -- parent->childs.n_running;
168
169 assert(requests.empty());
170 }
171
172 void
173 helper_server::dropQueued()
174 {
175 HelperServerBase::dropQueued();
176 requestsIndex.clear();
177 }
178
179 helper_stateful_server::~helper_stateful_server()
180 {
181 /* TODO: walk the local queue of requests and carry them all out */
182 if (Comm::IsConnOpen(writePipe))
183 closeWritePipeSafely(parent->id_name);
184
185 parent->cancelReservation(reservationId);
186
187 dlinkDelete(&link, &parent->servers);
188
189 assert(parent->childs.n_running > 0);
190 -- parent->childs.n_running;
191
192 assert(requests.empty());
193 }
194
195 void
196 helperOpenServers(const helper::Pointer &hlp)
197 {
198 char *s;
199 char *progname;
200 char *shortname;
201 char *procname;
202 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
203 char fd_note_buf[FD_DESC_SZ];
204 helper_server *srv;
205 int nargs = 0;
206 int k;
207 pid_t pid;
208 int rfd;
209 int wfd;
210 void * hIpc;
211 wordlist *w;
212
213 if (hlp->cmdline == nullptr)
214 return;
215
216 progname = hlp->cmdline->key;
217
218 if ((s = strrchr(progname, '/')))
219 shortname = xstrdup(s + 1);
220 else
221 shortname = xstrdup(progname);
222
223 /* figure out how many new child are actually needed. */
224 int need_new = hlp->childs.needNew();
225
226 debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
227
228 if (need_new < 1) {
229 debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
230 }
231
232 procname = (char *)xmalloc(strlen(shortname) + 3);
233
234 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
235
236 args[nargs] = procname;
237 ++nargs;
238
239 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
240 args[nargs] = w->key;
241 ++nargs;
242 }
243
244 args[nargs] = nullptr;
245 ++nargs;
246
247 assert(nargs <= HELPER_MAX_ARGS);
248
249 int successfullyStarted = 0;
250
251 for (k = 0; k < need_new; ++k) {
252 getCurrentTime();
253 rfd = wfd = -1;
254 pid = ipcCreate(hlp->ipc_type,
255 progname,
256 args,
257 shortname,
258 hlp->addr,
259 &rfd,
260 &wfd,
261 &hIpc);
262
263 if (pid < 0) {
264 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
265 continue;
266 }
267
268 ++successfullyStarted;
269 ++ hlp->childs.n_running;
270 ++ hlp->childs.n_active;
271 srv = new helper_server;
272 srv->hIpc = hIpc;
273 srv->pid = pid;
274 srv->initStats();
275 srv->addr = hlp->addr;
276 srv->readPipe = new Comm::Connection;
277 srv->readPipe->fd = rfd;
278 srv->writePipe = new Comm::Connection;
279 srv->writePipe->fd = wfd;
280 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
281 srv->wqueue = new MemBuf;
282 srv->roffset = 0;
283 srv->nextRequestId = 0;
284 srv->replyXaction = nullptr;
285 srv->ignoreToEom = false;
286 srv->parent = hlp;
287 dlinkAddTail(srv, &srv->link, &hlp->servers);
288
289 if (rfd == wfd) {
290 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
291 fd_note(rfd, fd_note_buf);
292 } else {
293 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
294 fd_note(rfd, fd_note_buf);
295 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
296 fd_note(wfd, fd_note_buf);
297 }
298
299 commSetNonBlocking(rfd);
300
301 if (wfd != rfd)
302 commSetNonBlocking(wfd);
303
304 AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_server::HelperServerClosed", cbdataDialer(helper_server::HelperServerClosed, srv));
305 comm_add_close_handler(rfd, closeCall);
306
307 if (hlp->timeout && hlp->childs.concurrency) {
308 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
309 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
310 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
311 }
312
313 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
314 CommIoCbPtrFun(helperHandleRead, srv));
315 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
316 }
317
318 // Call handleFewerServers() before hlp->last_restart is updated because
319 // that method uses last_restart to measure the delay since previous start.
320 // TODO: Refactor last_restart code to measure failure frequency rather than
321 // detecting a helper #X failure that is being close to the helper #Y start.
322 if (successfullyStarted < need_new)
323 hlp->handleFewerServers(false);
324
325 hlp->last_restart = squid_curtime;
326 safe_free(shortname);
327 safe_free(procname);
328 helperKickQueue(hlp);
329 }
330
331 /**
332 * DPW 2007-05-08
333 *
334 * helperStatefulOpenServers: create the stateful child helper processes
335 */
336 void
337 helperStatefulOpenServers(const statefulhelper::Pointer &hlp)
338 {
339 char *shortname;
340 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
341 char fd_note_buf[FD_DESC_SZ];
342 int nargs = 0;
343
344 if (hlp->cmdline == nullptr)
345 return;
346
347 if (hlp->childs.concurrency)
348 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
349
350 char *progname = hlp->cmdline->key;
351
352 char *s;
353 if ((s = strrchr(progname, '/')))
354 shortname = xstrdup(s + 1);
355 else
356 shortname = xstrdup(progname);
357
358 /* figure out haw mant new helpers are needed. */
359 int need_new = hlp->childs.needNew();
360
361 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
362
363 if (need_new < 1) {
364 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
365 }
366
367 char *procname = (char *)xmalloc(strlen(shortname) + 3);
368
369 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
370
371 args[nargs] = procname;
372 ++nargs;
373
374 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
375 args[nargs] = w->key;
376 ++nargs;
377 }
378
379 args[nargs] = nullptr;
380 ++nargs;
381
382 assert(nargs <= HELPER_MAX_ARGS);
383
384 int successfullyStarted = 0;
385
386 for (int k = 0; k < need_new; ++k) {
387 getCurrentTime();
388 int rfd = -1;
389 int wfd = -1;
390 void * hIpc;
391 pid_t pid = ipcCreate(hlp->ipc_type,
392 progname,
393 args,
394 shortname,
395 hlp->addr,
396 &rfd,
397 &wfd,
398 &hIpc);
399
400 if (pid < 0) {
401 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
402 continue;
403 }
404
405 ++successfullyStarted;
406 ++ hlp->childs.n_running;
407 ++ hlp->childs.n_active;
408 helper_stateful_server *srv = new helper_stateful_server;
409 srv->hIpc = hIpc;
410 srv->pid = pid;
411 srv->initStats();
412 srv->addr = hlp->addr;
413 srv->readPipe = new Comm::Connection;
414 srv->readPipe->fd = rfd;
415 srv->writePipe = new Comm::Connection;
416 srv->writePipe->fd = wfd;
417 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
418 srv->roffset = 0;
419 srv->parent = hlp;
420 srv->reservationStart = 0;
421
422 dlinkAddTail(srv, &srv->link, &hlp->servers);
423
424 if (rfd == wfd) {
425 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
426 fd_note(rfd, fd_note_buf);
427 } else {
428 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
429 fd_note(rfd, fd_note_buf);
430 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
431 fd_note(wfd, fd_note_buf);
432 }
433
434 commSetNonBlocking(rfd);
435
436 if (wfd != rfd)
437 commSetNonBlocking(wfd);
438
439 AsyncCall::Pointer closeCall = asyncCall(5,4, "helper_stateful_server::HelperServerClosed", cbdataDialer(helper_stateful_server::HelperServerClosed, srv));
440 comm_add_close_handler(rfd, closeCall);
441
442 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
443 CommIoCbPtrFun(helperStatefulHandleRead, srv));
444 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
445 }
446
447 // Call handleFewerServers() before hlp->last_restart is updated because
448 // that method uses last_restart to measure the delay since previous start.
449 // TODO: Refactor last_restart code to measure failure frequency rather than
450 // detecting a helper #X failure that is being close to the helper #Y start.
451 if (successfullyStarted < need_new)
452 hlp->handleFewerServers(false);
453
454 hlp->last_restart = squid_curtime;
455 safe_free(shortname);
456 safe_free(procname);
457 helperStatefulKickQueue(hlp);
458 }
459
460 void
461 helper::submitRequest(Helper::Xaction *r)
462 {
463 helper_server *srv;
464
465 if ((srv = GetFirstAvailable(this)))
466 helperDispatch(srv, r);
467 else
468 Enqueue(this, r);
469
470 syncQueueStats();
471 }
472
473 /// handles helperSubmit() and helperStatefulSubmit() failures
474 static void
475 SubmissionFailure(const helper::Pointer &hlp, HLPCB *callback, void *data)
476 {
477 auto result = Helper::Error;
478 if (!hlp) {
479 debugs(84, 3, "no helper");
480 result = Helper::Unknown;
481 }
482 // else pretend the helper has responded with ERR
483
484 callback(data, Helper::Reply(result));
485 }
486
487 void
488 helperSubmit(const helper::Pointer &hlp, const char *buf, HLPCB * callback, void *data)
489 {
490 if (!hlp || !hlp->trySubmit(buf, callback, data))
491 SubmissionFailure(hlp, callback, data);
492 }
493
494 /// whether queuing an additional request would overload the helper
495 bool
496 helper::queueFull() const {
497 return stats.queue_size >= static_cast<int>(childs.queue_size);
498 }
499
500 bool
501 helper::overloaded() const {
502 return stats.queue_size > static_cast<int>(childs.queue_size);
503 }
504
505 /// synchronizes queue-dependent measurements with the current queue state
506 void
507 helper::syncQueueStats()
508 {
509 if (overloaded()) {
510 if (overloadStart) {
511 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
512 } else {
513 overloadStart = squid_curtime;
514 debugs(84, 3, id_name << " became overloaded");
515 }
516 } else {
517 if (overloadStart) {
518 debugs(84, 5, id_name << " is no longer overloaded");
519 if (droppedRequests) {
520 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
521 " is no longer overloaded after dropping " << droppedRequests <<
522 " requests in " << (squid_curtime - overloadStart) << " seconds");
523 droppedRequests = 0;
524 }
525 overloadStart = 0;
526 }
527 }
528 }
529
530 /// prepares the helper for request submission
531 /// returns true if and only if the submission should proceed
532 /// may kill Squid if the helper remains overloaded for too long
533 bool
534 helper::prepSubmit()
535 {
536 // re-sync for the configuration may have changed since the last submission
537 syncQueueStats();
538
539 // Nothing special to do if the new request does not overload (i.e., the
540 // queue is not even full yet) or only _starts_ overloading this helper
541 // (i.e., the queue is currently at its limit).
542 if (!overloaded())
543 return true;
544
545 if (squid_curtime - overloadStart <= 180)
546 return true; // also OK: overload has not persisted long enough to panic
547
548 if (childs.onPersistentOverload == Helper::ChildConfig::actDie)
549 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
550
551 if (!droppedRequests) {
552 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
553 id_name << " helper configured with on-persistent-overload=err");
554 }
555 ++droppedRequests;
556 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
557 return false;
558 }
559
560 bool
561 helper::trySubmit(const char *buf, HLPCB * callback, void *data)
562 {
563 if (!prepSubmit())
564 return false; // request was dropped
565
566 submit(buf, callback, data); // will send or queue
567 return true; // request submitted or queued
568 }
569
570 /// dispatches or enqueues a helper requests; does not enforce queue limits
571 void
572 helper::submit(const char *buf, HLPCB * callback, void *data)
573 {
574 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
575 submitRequest(r);
576 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
577 }
578
579 /// Submit request or callback the caller with a Helper::Error error.
580 /// If the reservation is not set then reserves a new helper.
581 void
582 helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
583 {
584 if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
585 SubmissionFailure(hlp, callback, data);
586 }
587
588 /// If possible, submit request. Otherwise, either kill Squid or return false.
589 bool
590 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
591 {
592 if (!prepSubmit())
593 return false; // request was dropped
594
595 submit(buf, callback, data, reservation); // will send or queue
596 return true; // request submitted or queued
597 }
598
599 void
600 statefulhelper::reserveServer(helper_stateful_server * srv)
601 {
602 // clear any old reservation
603 if (srv->reserved()) {
604 reservations.erase(srv->reservationId);
605 srv->clearReservation();
606 }
607
608 srv->reserve();
609 reservations.insert(Reservations::value_type(srv->reservationId, srv));
610 }
611
612 void
613 statefulhelper::cancelReservation(const Helper::ReservationId reservation)
614 {
615 const auto it = reservations.find(reservation);
616 if (it == reservations.end())
617 return;
618
619 helper_stateful_server *srv = it->second;
620 reservations.erase(it);
621 srv->clearReservation();
622
623 // schedule a queue kick
624 AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
625 ScheduleCallHere(call);
626 }
627
628 helper_stateful_server *
629 statefulhelper::findServer(const Helper::ReservationId & reservation)
630 {
631 const auto it = reservations.find(reservation);
632 if (it == reservations.end())
633 return nullptr;
634 return it->second;
635 }
636
637 void
638 helper_stateful_server::reserve()
639 {
640 assert(!reservationId);
641 reservationStart = squid_curtime;
642 reservationId = Helper::ReservationId::Next();
643 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
644 }
645
646 void
647 helper_stateful_server::clearReservation()
648 {
649 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
650 if (!reservationId)
651 return;
652
653 ++stats.releases;
654
655 reservationId.clear();
656 reservationStart = 0;
657 }
658
659 void
660 statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
661 {
662 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
663
664 if (buf && reservation) {
665 debugs(84, 5, reservation);
666 helper_stateful_server *lastServer = findServer(reservation);
667 if (!lastServer) {
668 debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
669 r->reply.result = Helper::TimedOut;
670 r->request.callback(r->request.data, r->reply);
671 delete r;
672 return;
673 }
674 debugs(84, 5, "StatefulSubmit dispatching");
675 helperStatefulDispatch(lastServer, r);
676 } else {
677 helper_stateful_server *srv;
678 if ((srv = StatefulGetFirstAvailable(this))) {
679 reserveServer(srv);
680 helperStatefulDispatch(srv, r);
681 } else
682 StatefulEnqueue(this, r);
683 }
684
685 debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
686 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
687
688 syncQueueStats();
689 }
690
691 void
692 helper::packStatsInto(Packable *p, const char *label) const
693 {
694 if (label)
695 p->appendf("%s:\n", label);
696
697 p->appendf(" program: %s\n", cmdline->key);
698 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
699 p->appendf(" requests sent: %d\n", stats.requests);
700 p->appendf(" replies received: %d\n", stats.replies);
701 p->appendf(" requests timedout: %d\n", stats.timedout);
702 p->appendf(" queue length: %d\n", stats.queue_size);
703 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
704 p->append("\n",1);
705 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
706 "ID #",
707 "FD",
708 "PID",
709 "# Requests",
710 "# Replies",
711 "# Timed-out",
712 "Flags",
713 "Time",
714 "Offset",
715 "Request");
716
717 for (dlink_node *link = servers.head; link; link = link->next) {
718 HelperServerBase *srv = static_cast<HelperServerBase *>(link->data);
719 assert(srv);
720 Helper::Xaction *xaction = srv->requests.empty() ? nullptr : srv->requests.front();
721 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
722 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
723 srv->index.value,
724 srv->readPipe->fd,
725 srv->pid,
726 srv->stats.uses,
727 srv->stats.replies,
728 srv->stats.timedout,
729 srv->stats.pending ? 'B' : ' ',
730 srv->flags.writing ? 'W' : ' ',
731 srv->flags.closing ? 'C' : ' ',
732 srv->reserved() ? 'R' : ' ',
733 srv->flags.shutdown ? 'S' : ' ',
734 xaction && xaction->request.placeholder ? 'P' : ' ',
735 tt < 0.0 ? 0.0 : tt,
736 (int) srv->roffset,
737 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
738 }
739
740 p->append("\nFlags key:\n"
741 " B\tBUSY\n"
742 " W\tWRITING\n"
743 " C\tCLOSING\n"
744 " R\tRESERVED\n"
745 " S\tSHUTDOWN PENDING\n"
746 " P\tPLACEHOLDER\n", 101);
747 }
748
749 bool
750 helper::willOverload() const {
751 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
752 }
753
754 helper::Pointer
755 helper::Make(const char *name)
756 {
757 return new helper(name);
758 }
759
760 statefulhelper::Pointer
761 statefulhelper::Make(const char *name)
762 {
763 return new statefulhelper(name);
764 }
765
766 void
767 helperShutdown(const helper::Pointer &hlp)
768 {
769 dlink_node *link = hlp->servers.head;
770
771 while (link) {
772 helper_server *srv;
773 srv = (helper_server *)link->data;
774 link = link->next;
775
776 if (srv->flags.shutdown) {
777 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
778 continue;
779 }
780
781 assert(hlp->childs.n_active > 0);
782 -- hlp->childs.n_active;
783 srv->flags.shutdown = true; /* request it to shut itself down */
784
785 if (srv->flags.closing) {
786 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
787 continue;
788 }
789
790 if (srv->stats.pending) {
791 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
792 continue;
793 }
794
795 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
796 /* the rest of the details is dealt with in the helperServerFree
797 * close handler
798 */
799 srv->closePipesSafely(hlp->id_name);
800 }
801 }
802
803 void
804 helperStatefulShutdown(const statefulhelper::Pointer &hlp)
805 {
806 dlink_node *link = hlp->servers.head;
807 helper_stateful_server *srv;
808
809 while (link) {
810 srv = (helper_stateful_server *)link->data;
811 link = link->next;
812
813 if (srv->flags.shutdown) {
814 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
815 continue;
816 }
817
818 assert(hlp->childs.n_active > 0);
819 -- hlp->childs.n_active;
820 srv->flags.shutdown = true; /* request it to shut itself down */
821
822 if (srv->stats.pending) {
823 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
824 continue;
825 }
826
827 if (srv->flags.closing) {
828 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
829 continue;
830 }
831
832 if (srv->reserved()) {
833 if (shutting_down) {
834 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
835 } else {
836 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
837 continue;
838 }
839 }
840
841 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
842
843 /* the rest of the details is dealt with in the helperStatefulServerFree
844 * close handler
845 */
846 srv->closePipesSafely(hlp->id_name);
847 }
848 }
849
850 helper::~helper()
851 {
852 /* note, don't free id_name, it probably points to static memory */
853
854 // TODO: if the queue is not empty it will leak Helper::Request's
855 if (!queue.empty())
856 debugs(84, DBG_CRITICAL, "WARNING: freeing " << id_name << " helper with " << stats.queue_size << " requests queued");
857 }
858
859 void
860 helper::handleKilledServer(HelperServerBase *srv, bool &needsNewServers)
861 {
862 needsNewServers = false;
863 if (!srv->flags.shutdown) {
864 assert(childs.n_active > 0);
865 --childs.n_active;
866 debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
867
868 handleFewerServers(srv->stats.replies >= 1);
869
870 if (childs.needNew() > 0) {
871 srv->flags.shutdown = true;
872 needsNewServers = true;
873 }
874 }
875 }
876
877 void
878 helper::handleFewerServers(const bool madeProgress)
879 {
880 const auto needNew = childs.needNew();
881
882 if (!needNew)
883 return; // some server(s) have died, but we still have enough
884
885 debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
886 Debug::Extra << "active processes: " << childs.n_active <<
887 Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
888
889 if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
890 if (madeProgress)
891 debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
892 else
893 fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
894 }
895 }
896
897 void
898 helper_server::HelperServerClosed(helper_server *srv)
899 {
900 const auto hlp = srv->parent;
901
902 bool needsNewServers = false;
903 hlp->handleKilledServer(srv, needsNewServers);
904 if (needsNewServers) {
905 debugs(80, DBG_IMPORTANT, "Starting new helpers");
906 helperOpenServers(hlp);
907 }
908
909 srv->dropQueued();
910
911 delete srv;
912 }
913
914 // XXX: Almost duplicates helper_server::HelperServerClosed() because helperOpenServers() is not a virtual method of the `helper` class
915 // TODO: Fix the `helper` class hierarchy to use virtual functions.
916 void
917 helper_stateful_server::HelperServerClosed(helper_stateful_server *srv)
918 {
919 const auto hlp = srv->parent;
920
921 bool needsNewServers = false;
922 hlp->handleKilledServer(srv, needsNewServers);
923 if (needsNewServers) {
924 debugs(80, DBG_IMPORTANT, "Starting new helpers");
925 helperStatefulOpenServers(hlp);
926 }
927
928 srv->dropQueued();
929
930 delete srv;
931 }
932
933 Helper::Xaction *
934 helper_server::popRequest(int request_number)
935 {
936 Helper::Xaction *r = nullptr;
937 helper_server::RequestIndex::iterator it;
938 if (parent->childs.concurrency) {
939 // If concurrency supported retrieve request from ID
940 it = requestsIndex.find(request_number);
941 if (it != requestsIndex.end()) {
942 r = *(it->second);
943 requests.erase(it->second);
944 requestsIndex.erase(it);
945 }
946 } else if(!requests.empty()) {
947 // Else get the first request from queue, if any
948 r = requests.front();
949 requests.pop_front();
950 }
951
952 return r;
953 }
954
955 /// Calls back with a pointer to the buffer with the helper output
956 static void
957 helperReturnBuffer(helper_server * srv, const helper::Pointer &hlp, char * msg, size_t msgSize, char * msgEnd)
958 {
959 if (Helper::Xaction *r = srv->replyXaction) {
960 const bool hasSpace = r->reply.accumulate(msg, msgSize);
961 if (!hasSpace) {
962 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
963 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
964 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
965 srv->closePipesSafely(hlp->id_name);
966 return;
967 }
968
969 if (!msgEnd)
970 return; // We are waiting for more data.
971
972 bool retry = false;
973 if (cbdataReferenceValid(r->request.data)) {
974 r->reply.finalize();
975 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
976 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
977 retry = true;
978 } else {
979 HLPCB *callback = r->request.callback;
980 r->request.callback = nullptr;
981 void *cbdata = nullptr;
982 if (cbdataReferenceValidDone(r->request.data, &cbdata))
983 callback(cbdata, r->reply);
984 }
985 }
986
987 -- srv->stats.pending;
988 ++ srv->stats.replies;
989
990 ++ hlp->stats.replies;
991
992 srv->answer_time = current_time;
993
994 srv->dispatch_time = r->request.dispatch_time;
995
996 hlp->stats.avg_svc_time =
997 Math::intAverage(hlp->stats.avg_svc_time,
998 tvSubMsec(r->request.dispatch_time, current_time),
999 hlp->stats.replies, REDIRECT_AV_FACTOR);
1000
1001 // release or re-submit parsedRequestXaction object
1002 srv->replyXaction = nullptr;
1003 if (retry) {
1004 ++r->request.retries;
1005 hlp->submitRequest(r);
1006 } else
1007 delete r;
1008 }
1009
1010 if (hlp->timeout && hlp->childs.concurrency)
1011 srv->checkForTimedOutRequests(hlp->retryTimedOut);
1012
1013 if (!srv->flags.shutdown) {
1014 helperKickQueue(hlp);
1015 } else if (!srv->flags.closing && !srv->stats.pending) {
1016 srv->closeWritePipeSafely(srv->parent->id_name);
1017 }
1018 }
1019
1020 static void
1021 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1022 {
1023 helper_server *srv = (helper_server *)data;
1024 const auto hlp = srv->parent;
1025 assert(cbdataReferenceValid(data));
1026
1027 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1028
1029 if (flag == Comm::ERR_CLOSING) {
1030 return;
1031 }
1032
1033 assert(conn->fd == srv->readPipe->fd);
1034
1035 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
1036
1037 if (flag != Comm::OK || len == 0) {
1038 srv->closePipesSafely(hlp->id_name);
1039 return;
1040 }
1041
1042 srv->roffset += len;
1043 srv->rbuf[srv->roffset] = '\0';
1044 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1045
1046 if (!srv->stats.pending && !srv->stats.timedout) {
1047 /* someone spoke without being spoken to */
1048 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected read from " <<
1049 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1050 " bytes '" << srv->rbuf << "'");
1051
1052 srv->roffset = 0;
1053 srv->rbuf[0] = '\0';
1054 }
1055
1056 bool needsMore = false;
1057 char *msg = srv->rbuf;
1058 while (*msg && !needsMore) {
1059 int skip = 0;
1060 char *eom = strchr(msg, hlp->eom);
1061 if (eom) {
1062 skip = 1;
1063 debugs(84, 3, "helperHandleRead: end of reply found");
1064 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1065 *eom = '\0';
1066 // rewind to the \r octet which is the real terminal now
1067 // and remember that we have to skip forward 2 places now.
1068 skip = 2;
1069 --eom;
1070 }
1071 *eom = '\0';
1072 }
1073
1074 if (!srv->ignoreToEom && !srv->replyXaction) {
1075 int i = 0;
1076 if (hlp->childs.concurrency) {
1077 char *e = nullptr;
1078 i = strtol(msg, &e, 10);
1079 // Do we need to check for e == msg? Means wrong response from helper.
1080 // Will be dropped as "unexpected reply on channel 0"
1081 needsMore = !(xisspace(*e) || (eom && e == eom));
1082 if (!needsMore) {
1083 msg = e;
1084 while (*msg && xisspace(*msg))
1085 ++msg;
1086 } // else not enough data to compute request number
1087 }
1088 if (!(srv->replyXaction = srv->popRequest(i))) {
1089 if (srv->stats.timedout) {
1090 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1091 } else {
1092 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
1093 i << " from " << hlp->id_name << " #" << srv->index <<
1094 " '" << srv->rbuf << "'");
1095 }
1096 srv->ignoreToEom = true;
1097 }
1098 } // else we need to just append reply data to the current Xaction
1099
1100 if (!needsMore) {
1101 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1102 assert(msgSize <= srv->rbuf_sz);
1103 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1104 msg += msgSize + skip;
1105 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1106
1107 // The next message should not ignored.
1108 if (eom && srv->ignoreToEom)
1109 srv->ignoreToEom = false;
1110 } else
1111 assert(skip == 0 && eom == nullptr);
1112 }
1113
1114 if (needsMore) {
1115 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1116 assert(msgSize <= srv->rbuf_sz);
1117 memmove(srv->rbuf, msg, msgSize);
1118 srv->roffset = msgSize;
1119 srv->rbuf[srv->roffset] = '\0';
1120 } else {
1121 // All of the responses parsed and msg points at the end of read data
1122 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1123 srv->roffset = 0;
1124 }
1125
1126 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1127 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1128 assert(spaceSize >= 0);
1129
1130 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1131 CommIoCbPtrFun(helperHandleRead, srv));
1132 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1133 }
1134 }
1135
1136 static void
1137 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1138 {
1139 char *t = nullptr;
1140 helper_stateful_server *srv = (helper_stateful_server *)data;
1141 const auto hlp = srv->parent;
1142 assert(cbdataReferenceValid(data));
1143
1144 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1145
1146 if (flag == Comm::ERR_CLOSING) {
1147 return;
1148 }
1149
1150 assert(conn->fd == srv->readPipe->fd);
1151
1152 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1153 hlp->id_name << " #" << srv->index);
1154
1155 if (flag != Comm::OK || len == 0) {
1156 srv->closePipesSafely(hlp->id_name);
1157 return;
1158 }
1159
1160 srv->roffset += len;
1161 srv->rbuf[srv->roffset] = '\0';
1162 Helper::Xaction *r = srv->requests.front();
1163 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1164
1165 if (r == nullptr) {
1166 /* someone spoke without being spoken to */
1167 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulHandleRead: unexpected read from " <<
1168 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1169 " bytes '" << srv->rbuf << "'");
1170
1171 srv->roffset = 0;
1172 }
1173
1174 if ((t = strchr(srv->rbuf, hlp->eom))) {
1175 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1176
1177 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1178 *t = '\0';
1179 // rewind to the \r octet which is the real terminal now
1180 --t;
1181 }
1182
1183 *t = '\0';
1184 }
1185
1186 if (r && !r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1187 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1188 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1189 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1190 srv->closePipesSafely(hlp->id_name);
1191 return;
1192 }
1193 /**
1194 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1195 * Doing this prohibits concurrency support with multiple replies per read().
1196 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1197 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1198 */
1199 srv->roffset = 0;
1200
1201 if (t) {
1202 /* end of reply found */
1203 srv->requests.pop_front(); // we already have it in 'r'
1204 int called = 1;
1205
1206 if (r && cbdataReferenceValid(r->request.data)) {
1207 r->reply.finalize();
1208 r->reply.reservationId = srv->reservationId;
1209 r->request.callback(r->request.data, r->reply);
1210 } else {
1211 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1212 called = 0;
1213 }
1214
1215 delete r;
1216
1217 -- srv->stats.pending;
1218 ++ srv->stats.replies;
1219
1220 ++ hlp->stats.replies;
1221 srv->answer_time = current_time;
1222 hlp->stats.avg_svc_time =
1223 Math::intAverage(hlp->stats.avg_svc_time,
1224 tvSubMsec(srv->dispatch_time, current_time),
1225 hlp->stats.replies, REDIRECT_AV_FACTOR);
1226
1227 if (called)
1228 helperStatefulServerDone(srv);
1229 else
1230 hlp->cancelReservation(srv->reservationId);
1231 }
1232
1233 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1234 int spaceSize = srv->rbuf_sz - 1;
1235
1236 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1237 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1238 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1239 }
1240 }
1241
1242 /// Handles a request when all running helpers, if any, are busy.
1243 static void
1244 Enqueue(helper * hlp, Helper::Xaction * r)
1245 {
1246 hlp->queue.push(r);
1247 ++ hlp->stats.queue_size;
1248
1249 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1250 if (hlp->childs.needNew() > 0) {
1251 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1252 helperOpenServers(hlp);
1253 return;
1254 }
1255
1256 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1257 return;
1258
1259 if (squid_curtime - hlp->last_queue_warn < 600)
1260 return;
1261
1262 if (shutting_down || reconfiguring)
1263 return;
1264
1265 hlp->last_queue_warn = squid_curtime;
1266
1267 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1268 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1269 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1270 }
1271
1272 static void
1273 StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
1274 {
1275 hlp->queue.push(r);
1276 ++ hlp->stats.queue_size;
1277
1278 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1279 if (hlp->childs.needNew() > 0) {
1280 debugs(84, DBG_CRITICAL, "Starting new " << hlp->id_name << " helpers...");
1281 helperStatefulOpenServers(hlp);
1282 return;
1283 }
1284
1285 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1286 return;
1287
1288 if (squid_curtime - hlp->last_queue_warn < 600)
1289 return;
1290
1291 if (shutting_down || reconfiguring)
1292 return;
1293
1294 hlp->last_queue_warn = squid_curtime;
1295
1296 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1297 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1298 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1299 }
1300
1301 Helper::Xaction *
1302 helper::nextRequest()
1303 {
1304 if (queue.empty())
1305 return nullptr;
1306
1307 auto *r = queue.front();
1308 queue.pop();
1309 --stats.queue_size;
1310 return r;
1311 }
1312
1313 static helper_server *
1314 GetFirstAvailable(const helper::Pointer &hlp)
1315 {
1316 dlink_node *n;
1317 helper_server *srv;
1318 helper_server *selected = nullptr;
1319 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1320
1321 if (hlp->childs.n_running == 0)
1322 return nullptr;
1323
1324 /* Find "least" loaded helper (approx) */
1325 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1326 srv = (helper_server *)n->data;
1327
1328 if (selected && selected->stats.pending <= srv->stats.pending)
1329 continue;
1330
1331 if (srv->flags.shutdown)
1332 continue;
1333
1334 if (!srv->stats.pending)
1335 return srv;
1336
1337 if (selected) {
1338 selected = srv;
1339 break;
1340 }
1341
1342 selected = srv;
1343 }
1344
1345 if (!selected) {
1346 debugs(84, 5, "GetFirstAvailable: None available.");
1347 return nullptr;
1348 }
1349
1350 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1351 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1352 return nullptr;
1353 }
1354
1355 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1356 return selected;
1357 }
1358
1359 static helper_stateful_server *
1360 StatefulGetFirstAvailable(const statefulhelper::Pointer &hlp)
1361 {
1362 dlink_node *n;
1363 helper_stateful_server *srv = nullptr;
1364 helper_stateful_server *oldestReservedServer = nullptr;
1365 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1366
1367 if (hlp->childs.n_running == 0)
1368 return nullptr;
1369
1370 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1371 srv = (helper_stateful_server *)n->data;
1372
1373 if (srv->stats.pending)
1374 continue;
1375
1376 if (srv->reserved()) {
1377 if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
1378 if (!oldestReservedServer)
1379 oldestReservedServer = srv;
1380 else if (oldestReservedServer->reservationStart < srv->reservationStart)
1381 oldestReservedServer = srv;
1382 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1383 }
1384 continue;
1385 }
1386
1387 if (srv->flags.shutdown)
1388 continue;
1389
1390 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1391 return srv;
1392 }
1393
1394 if (oldestReservedServer) {
1395 debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1396 return oldestReservedServer;
1397 }
1398
1399 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1400 return nullptr;
1401 }
1402
1403 static void
1404 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1405 {
1406 helper_server *srv = (helper_server *)data;
1407
1408 srv->writebuf->clean();
1409 delete srv->writebuf;
1410 srv->writebuf = nullptr;
1411 srv->flags.writing = false;
1412
1413 if (flag != Comm::OK) {
1414 /* Helper server has crashed */
1415 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1416 return;
1417 }
1418
1419 if (!srv->wqueue->isNull()) {
1420 srv->writebuf = srv->wqueue;
1421 srv->wqueue = new MemBuf;
1422 srv->flags.writing = true;
1423 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1424 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1425 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1426 }
1427 }
1428
1429 static void
1430 helperDispatch(helper_server * srv, Helper::Xaction * r)
1431 {
1432 const auto hlp = srv->parent;
1433 const uint64_t reqId = ++srv->nextRequestId;
1434
1435 if (!cbdataReferenceValid(r->request.data)) {
1436 debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
1437 delete r;
1438 return;
1439 }
1440
1441 r->request.Id = reqId;
1442 helper_server::Requests::iterator it = srv->requests.insert(srv->requests.end(), r);
1443 r->request.dispatch_time = current_time;
1444
1445 if (srv->wqueue->isNull())
1446 srv->wqueue->init();
1447
1448 if (hlp->childs.concurrency) {
1449 srv->requestsIndex.insert(helper_server::RequestIndex::value_type(reqId, it));
1450 assert(srv->requestsIndex.size() == srv->requests.size());
1451 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1452 } else
1453 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1454
1455 if (!srv->flags.writing) {
1456 assert(nullptr == srv->writebuf);
1457 srv->writebuf = srv->wqueue;
1458 srv->wqueue = new MemBuf;
1459 srv->flags.writing = true;
1460 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1461 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1462 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1463 }
1464
1465 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1466
1467 ++ srv->stats.uses;
1468 ++ srv->stats.pending;
1469 ++ hlp->stats.requests;
1470 }
1471
1472 static void
1473 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1474 {}
1475
1476 static void
1477 helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
1478 {
1479 const auto hlp = srv->parent;
1480
1481 if (!cbdataReferenceValid(r->request.data)) {
1482 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
1483 delete r;
1484 hlp->cancelReservation(srv->reservationId);
1485 return;
1486 }
1487
1488 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1489
1490 assert(srv->reservationId);
1491 r->reply.reservationId = srv->reservationId;
1492
1493 if (r->request.placeholder == 1) {
1494 /* a callback is needed before this request can _use_ a helper. */
1495 /* we don't care about releasing this helper. The request NEVER
1496 * gets to the helper. So we throw away the return code */
1497 r->reply.result = Helper::Unknown;
1498 r->request.callback(r->request.data, r->reply);
1499 /* throw away the placeholder */
1500 delete r;
1501 /* and push the queue. Note that the callback may have submitted a new
1502 * request to the helper which is why we test for the request */
1503
1504 if (!srv->requests.size())
1505 helperStatefulServerDone(srv);
1506
1507 return;
1508 }
1509
1510 srv->requests.push_back(r);
1511 srv->dispatch_time = current_time;
1512 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1513 CommIoCbPtrFun(helperStatefulDispatchWriteDone, srv));
1514 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
1515 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1516 hlp->id_name << " #" << srv->index << ", " <<
1517 (int) strlen(r->request.buf) << " bytes");
1518
1519 ++ srv->stats.uses;
1520 ++ srv->stats.pending;
1521 ++ hlp->stats.requests;
1522 }
1523
1524 static void
1525 helperKickQueue(const helper::Pointer &hlp)
1526 {
1527 Helper::Xaction *r;
1528 helper_server *srv;
1529
1530 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1531 helperDispatch(srv, r);
1532 }
1533
1534 static void
1535 helperStatefulKickQueue(const statefulhelper::Pointer &hlp)
1536 {
1537 Helper::Xaction *r;
1538 helper_stateful_server *srv;
1539 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1540 debugs(84, 5, "found srv-" << srv->index);
1541 hlp->reserveServer(srv);
1542 helperStatefulDispatch(srv, r);
1543 }
1544 }
1545
1546 static void
1547 helperStatefulServerDone(helper_stateful_server * srv)
1548 {
1549 if (!srv->flags.shutdown) {
1550 helperStatefulKickQueue(srv->parent);
1551 } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1552 srv->closeWritePipeSafely(srv->parent->id_name);
1553 return;
1554 }
1555 }
1556
1557 void
1558 helper_server::checkForTimedOutRequests(bool const retry)
1559 {
1560 assert(parent->childs.concurrency);
1561 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1562 Helper::Xaction *r = requests.front();
1563 RequestIndex::iterator it;
1564 it = requestsIndex.find(r->request.Id);
1565 assert(it != requestsIndex.end());
1566 requestsIndex.erase(it);
1567 requests.pop_front();
1568 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1569 void *cbdata;
1570 bool retried = false;
1571 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1572 debugs(84, 2, "Retry request " << r->request.Id);
1573 ++r->request.retries;
1574 parent->submitRequest(r);
1575 retried = true;
1576 } else if (cbdataReferenceValidDone(r->request.data, &cbdata)) {
1577 if (!parent->onTimedOutResponse.isEmpty()) {
1578 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1579 r->reply.finalize();
1580 else
1581 r->reply.result = Helper::TimedOut;
1582 r->request.callback(cbdata, r->reply);
1583 } else {
1584 r->reply.result = Helper::TimedOut;
1585 r->request.callback(cbdata, r->reply);
1586 }
1587 }
1588 --stats.pending;
1589 ++stats.timedout;
1590 ++parent->stats.timedout;
1591 if (!retried)
1592 delete r;
1593 }
1594 }
1595
1596 void
1597 helper_server::requestTimeout(const CommTimeoutCbParams &io)
1598 {
1599 debugs(26, 3, io.conn);
1600 helper_server *srv = static_cast<helper_server *>(io.data);
1601
1602 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1603
1604 debugs(84, 3, io.conn << " establish new helper_server::requestTimeout");
1605 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "helper_server::requestTimeout",
1606 CommTimeoutCbPtrFun(helper_server::requestTimeout, srv));
1607
1608 const int timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1609 const int timeLeft = max(1, (static_cast<int>(srv->parent->timeout) - timeSpent));
1610
1611 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1612 }
1613