]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
58a4a2f022e38e163e6cd97f7ea7563e1792dcf5
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "base/Raw.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "comm/Read.h"
18 #include "comm/Write.h"
19 #include "debug/Messages.h"
20 #include "fd.h"
21 #include "fde.h"
22 #include "format/Quoting.h"
23 #include "helper.h"
24 #include "helper/Reply.h"
25 #include "helper/Request.h"
26 #include "MemBuf.h"
27 #include "SquidConfig.h"
28 #include "SquidIpc.h"
29 #include "SquidMath.h"
30 #include "Store.h"
31 #include "wordlist.h"
32
33 // helper_stateful_server::data uses explicit alloc()/freeOne() */
34 #include "mem/Pool.h"
35
36 #define HELPER_MAX_ARGS 64
37
38 /// The maximum allowed request retries.
39 #define MAX_RETRIES 2
40
41 /// Helpers input buffer size.
42 /// Keep in sync with MAX_PAC_GROUP_SIZE until converted to SBuf
43 const size_t ReadBufSize(128*1024);
44
45 static IOCB helperHandleRead;
46 static IOCB helperStatefulHandleRead;
47 static void Enqueue(Helper::Client *, Helper::Xaction *);
48 static Helper::Session *GetFirstAvailable(const Helper::Client::Pointer &);
49 static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper::Pointer &);
50 static void helperDispatch(Helper::Session *, Helper::Xaction *);
51 static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
52 static void helperKickQueue(const Helper::Client::Pointer &);
53 static void helperStatefulKickQueue(const statefulhelper::Pointer &);
54 static void helperStatefulServerDone(helper_stateful_server * srv);
55 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
56
57 CBDATA_NAMESPACED_CLASS_INIT(Helper, Session);
58 CBDATA_CLASS_INIT(helper_stateful_server);
59
60 InstanceIdDefinitions(Helper::SessionBase, "Hlpr");
61
62 void
63 Helper::SessionBase::initStats()
64 {
65 stats.uses=0;
66 stats.replies=0;
67 stats.pending=0;
68 stats.releases=0;
69 stats.timedout = 0;
70 }
71
72 void
73 Helper::SessionBase::closePipesSafely()
74 {
75 #if _SQUID_WINDOWS_
76 shutdown(writePipe->fd, SD_BOTH);
77 #endif
78
79 flags.closing = true;
80 if (readPipe->fd == writePipe->fd)
81 readPipe->fd = -1;
82 else
83 readPipe->close();
84 writePipe->close();
85
86 #if _SQUID_WINDOWS_
87 if (hIpc) {
88 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
89 getCurrentTime();
90 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
91 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
92 }
93 CloseHandle(hIpc);
94 }
95 #endif
96 }
97
98 void
99 Helper::SessionBase::closeWritePipeSafely()
100 {
101 #if _SQUID_WINDOWS_
102 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
103 #endif
104
105 flags.closing = true;
106 if (readPipe->fd == writePipe->fd)
107 readPipe->fd = -1;
108 writePipe->close();
109
110 #if _SQUID_WINDOWS_
111 if (hIpc) {
112 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
113 getCurrentTime();
114 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
115 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
116 }
117 CloseHandle(hIpc);
118 }
119 #endif
120 }
121
122 void
123 Helper::SessionBase::dropQueued()
124 {
125 while (!requests.empty()) {
126 // XXX: re-schedule these on another helper?
127 const auto r = requests.front();
128 requests.pop_front();
129 r->reply.result = Helper::Unknown;
130 helper().callBack(*r);
131 delete r;
132 }
133 }
134
135 Helper::SessionBase::~SessionBase()
136 {
137 if (rbuf) {
138 memFreeBuf(rbuf_sz, rbuf);
139 rbuf = nullptr;
140 }
141 }
142
143 Helper::Session::~Session()
144 {
145 wqueue->clean();
146 delete wqueue;
147
148 if (writebuf) {
149 writebuf->clean();
150 delete writebuf;
151 writebuf = nullptr;
152 }
153
154 if (Comm::IsConnOpen(writePipe))
155 closeWritePipeSafely();
156
157 dlinkDelete(&link, &parent->servers);
158
159 assert(parent->childs.n_running > 0);
160 -- parent->childs.n_running;
161
162 assert(requests.empty());
163 }
164
165 void
166 Helper::Session::dropQueued()
167 {
168 SessionBase::dropQueued();
169 requestsIndex.clear();
170 }
171
172 helper_stateful_server::~helper_stateful_server()
173 {
174 /* TODO: walk the local queue of requests and carry them all out */
175 if (Comm::IsConnOpen(writePipe))
176 closeWritePipeSafely();
177
178 parent->cancelReservation(reservationId);
179
180 dlinkDelete(&link, &parent->servers);
181
182 assert(parent->childs.n_running > 0);
183 -- parent->childs.n_running;
184
185 assert(requests.empty());
186 }
187
188 void
189 Helper::Client::openSessions()
190 {
191 char *s;
192 char *progname;
193 char *shortname;
194 char *procname;
195 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
196 char fd_note_buf[FD_DESC_SZ];
197 int nargs = 0;
198 int k;
199 pid_t pid;
200 int rfd;
201 int wfd;
202 void * hIpc;
203 wordlist *w;
204 // Helps reducing diff. TODO: remove
205 const auto hlp = this;
206
207 if (hlp->cmdline == nullptr)
208 return;
209
210 progname = hlp->cmdline->key;
211
212 if ((s = strrchr(progname, '/')))
213 shortname = xstrdup(s + 1);
214 else
215 shortname = xstrdup(progname);
216
217 /* figure out how many new child are actually needed. */
218 int need_new = hlp->childs.needNew();
219
220 debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
221
222 if (need_new < 1) {
223 debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
224 }
225
226 procname = (char *)xmalloc(strlen(shortname) + 3);
227
228 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
229
230 args[nargs] = procname;
231 ++nargs;
232
233 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
234 args[nargs] = w->key;
235 ++nargs;
236 }
237
238 args[nargs] = nullptr;
239 ++nargs;
240
241 assert(nargs <= HELPER_MAX_ARGS + 1);
242
243 int successfullyStarted = 0;
244
245 for (k = 0; k < need_new; ++k) {
246 getCurrentTime();
247 rfd = wfd = -1;
248 pid = ipcCreate(hlp->ipc_type,
249 progname,
250 args,
251 shortname,
252 hlp->addr,
253 &rfd,
254 &wfd,
255 &hIpc);
256
257 if (pid < 0) {
258 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
259 continue;
260 }
261
262 ++successfullyStarted;
263 ++ hlp->childs.n_running;
264 ++ hlp->childs.n_active;
265 const auto srv = new Helper::Session;
266 srv->hIpc = hIpc;
267 srv->pid = pid;
268 srv->initStats();
269 srv->addr = hlp->addr;
270 srv->readPipe = new Comm::Connection;
271 srv->readPipe->fd = rfd;
272 srv->writePipe = new Comm::Connection;
273 srv->writePipe->fd = wfd;
274 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
275 srv->wqueue = new MemBuf;
276 srv->roffset = 0;
277 srv->nextRequestId = 0;
278 srv->replyXaction = nullptr;
279 srv->ignoreToEom = false;
280 srv->parent = hlp;
281 dlinkAddTail(srv, &srv->link, &hlp->servers);
282
283 if (rfd == wfd) {
284 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
285 fd_note(rfd, fd_note_buf);
286 } else {
287 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
288 fd_note(rfd, fd_note_buf);
289 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
290 fd_note(wfd, fd_note_buf);
291 }
292
293 commSetNonBlocking(rfd);
294
295 if (wfd != rfd)
296 commSetNonBlocking(wfd);
297
298 AsyncCall::Pointer closeCall = asyncCall(5, 4, "Helper::Session::HelperServerClosed", cbdataDialer(SessionBase::HelperServerClosed,
299 static_cast<Helper::SessionBase *>(srv)));
300
301 comm_add_close_handler(rfd, closeCall);
302
303 if (hlp->timeout && hlp->childs.concurrency) {
304 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
305 CommTimeoutCbPtrFun(Helper::Session::requestTimeout, srv));
306 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
307 }
308
309 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
310 CommIoCbPtrFun(helperHandleRead, srv));
311 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
312 }
313
314 // Call handleFewerServers() before hlp->last_restart is updated because
315 // that method uses last_restart to measure the delay since previous start.
316 // TODO: Refactor last_restart code to measure failure frequency rather than
317 // detecting a helper #X failure that is being close to the helper #Y start.
318 if (successfullyStarted < need_new)
319 hlp->handleFewerServers(false);
320
321 hlp->last_restart = squid_curtime;
322 safe_free(shortname);
323 safe_free(procname);
324 helperKickQueue(hlp);
325 }
326
327 void
328 statefulhelper::openSessions()
329 {
330 char *shortname;
331 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
332 char fd_note_buf[FD_DESC_SZ];
333 int nargs = 0;
334 // Helps reducing diff. TODO: remove
335 const auto hlp = this;
336
337 if (hlp->cmdline == nullptr)
338 return;
339
340 if (hlp->childs.concurrency)
341 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
342
343 char *progname = hlp->cmdline->key;
344
345 char *s;
346 if ((s = strrchr(progname, '/')))
347 shortname = xstrdup(s + 1);
348 else
349 shortname = xstrdup(progname);
350
351 /* figure out haw mant new helpers are needed. */
352 int need_new = hlp->childs.needNew();
353
354 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
355
356 if (need_new < 1) {
357 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
358 }
359
360 char *procname = (char *)xmalloc(strlen(shortname) + 3);
361
362 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
363
364 args[nargs] = procname;
365 ++nargs;
366
367 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
368 args[nargs] = w->key;
369 ++nargs;
370 }
371
372 args[nargs] = nullptr;
373 ++nargs;
374
375 assert(nargs <= HELPER_MAX_ARGS + 1);
376
377 int successfullyStarted = 0;
378
379 for (int k = 0; k < need_new; ++k) {
380 getCurrentTime();
381 int rfd = -1;
382 int wfd = -1;
383 void * hIpc;
384 pid_t pid = ipcCreate(hlp->ipc_type,
385 progname,
386 args,
387 shortname,
388 hlp->addr,
389 &rfd,
390 &wfd,
391 &hIpc);
392
393 if (pid < 0) {
394 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
395 continue;
396 }
397
398 ++successfullyStarted;
399 ++ hlp->childs.n_running;
400 ++ hlp->childs.n_active;
401 helper_stateful_server *srv = new helper_stateful_server;
402 srv->hIpc = hIpc;
403 srv->pid = pid;
404 srv->initStats();
405 srv->addr = hlp->addr;
406 srv->readPipe = new Comm::Connection;
407 srv->readPipe->fd = rfd;
408 srv->writePipe = new Comm::Connection;
409 srv->writePipe->fd = wfd;
410 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
411 srv->roffset = 0;
412 srv->parent = hlp;
413 srv->reservationStart = 0;
414
415 dlinkAddTail(srv, &srv->link, &hlp->servers);
416
417 if (rfd == wfd) {
418 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
419 fd_note(rfd, fd_note_buf);
420 } else {
421 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
422 fd_note(rfd, fd_note_buf);
423 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
424 fd_note(wfd, fd_note_buf);
425 }
426
427 commSetNonBlocking(rfd);
428
429 if (wfd != rfd)
430 commSetNonBlocking(wfd);
431
432 AsyncCall::Pointer closeCall = asyncCall(5, 4, "helper_stateful_server::HelperServerClosed", cbdataDialer(Helper::SessionBase::HelperServerClosed,
433 static_cast<Helper::SessionBase *>(srv)));
434
435 comm_add_close_handler(rfd, closeCall);
436
437 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
438 CommIoCbPtrFun(helperStatefulHandleRead, srv));
439 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
440 }
441
442 // Call handleFewerServers() before hlp->last_restart is updated because
443 // that method uses last_restart to measure the delay since previous start.
444 // TODO: Refactor last_restart code to measure failure frequency rather than
445 // detecting a helper #X failure that is being close to the helper #Y start.
446 if (successfullyStarted < need_new)
447 hlp->handleFewerServers(false);
448
449 hlp->last_restart = squid_curtime;
450 safe_free(shortname);
451 safe_free(procname);
452 helperStatefulKickQueue(hlp);
453 }
454
455 void
456 Helper::Client::submitRequest(Helper::Xaction * const r)
457 {
458 if (const auto srv = GetFirstAvailable(this))
459 helperDispatch(srv, r);
460 else
461 Enqueue(this, r);
462
463 syncQueueStats();
464 }
465
466 /// handles helperSubmit() and helperStatefulSubmit() failures
467 static void
468 SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
469 {
470 auto result = Helper::Error;
471 if (!hlp) {
472 debugs(84, 3, "no helper");
473 result = Helper::Unknown;
474 }
475 // else pretend the helper has responded with ERR
476
477 callback(data, Helper::Reply(result));
478 }
479
480 void
481 helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
482 {
483 if (!hlp || !hlp->trySubmit(buf, callback, data))
484 SubmissionFailure(hlp, callback, data);
485 }
486
487 /// whether queuing an additional request would overload the helper
488 bool
489 Helper::Client::queueFull() const {
490 return stats.queue_size >= static_cast<int>(childs.queue_size);
491 }
492
493 bool
494 Helper::Client::overloaded() const {
495 return stats.queue_size > static_cast<int>(childs.queue_size);
496 }
497
498 /// synchronizes queue-dependent measurements with the current queue state
499 void
500 Helper::Client::syncQueueStats()
501 {
502 if (overloaded()) {
503 if (overloadStart) {
504 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
505 } else {
506 overloadStart = squid_curtime;
507 debugs(84, 3, id_name << " became overloaded");
508 }
509 } else {
510 if (overloadStart) {
511 debugs(84, 5, id_name << " is no longer overloaded");
512 if (droppedRequests) {
513 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
514 " is no longer overloaded after dropping " << droppedRequests <<
515 " requests in " << (squid_curtime - overloadStart) << " seconds");
516 droppedRequests = 0;
517 }
518 overloadStart = 0;
519 }
520 }
521 }
522
523 /// prepares the helper for request submission
524 /// returns true if and only if the submission should proceed
525 /// may kill Squid if the helper remains overloaded for too long
526 bool
527 Helper::Client::prepSubmit()
528 {
529 // re-sync for the configuration may have changed since the last submission
530 syncQueueStats();
531
532 // Nothing special to do if the new request does not overload (i.e., the
533 // queue is not even full yet) or only _starts_ overloading this helper
534 // (i.e., the queue is currently at its limit).
535 if (!overloaded())
536 return true;
537
538 if (squid_curtime - overloadStart <= 180)
539 return true; // also OK: overload has not persisted long enough to panic
540
541 if (childs.onPersistentOverload == ChildConfig::actDie)
542 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
543
544 if (!droppedRequests) {
545 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
546 id_name << " helper configured with on-persistent-overload=err");
547 }
548 ++droppedRequests;
549 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
550 return false;
551 }
552
553 bool
554 Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
555 {
556 if (!prepSubmit())
557 return false; // request was dropped
558
559 submit(buf, callback, data); // will send or queue
560 return true; // request submitted or queued
561 }
562
563 /// dispatches or enqueues a helper requests; does not enforce queue limits
564 void
565 Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
566 {
567 const auto r = new Xaction(callback, data, buf);
568 submitRequest(r);
569 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
570 }
571
572 void
573 Helper::Client::callBack(Xaction &r)
574 {
575 const auto callback = r.request.callback;
576 Assure(callback);
577
578 r.request.callback = nullptr;
579 void *cbdata = nullptr;
580 if (cbdataReferenceValidDone(r.request.data, &cbdata))
581 callback(cbdata, r.reply);
582 }
583
584 /// Submit request or callback the caller with a Helper::Error error.
585 /// If the reservation is not set then reserves a new helper.
586 void
587 helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
588 {
589 if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
590 SubmissionFailure(hlp, callback, data);
591 }
592
593 /// If possible, submit request. Otherwise, either kill Squid or return false.
594 bool
595 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
596 {
597 if (!prepSubmit())
598 return false; // request was dropped
599
600 submit(buf, callback, data, reservation); // will send or queue
601 return true; // request submitted or queued
602 }
603
604 void
605 statefulhelper::reserveServer(helper_stateful_server * srv)
606 {
607 // clear any old reservation
608 if (srv->reserved()) {
609 reservations.erase(srv->reservationId);
610 srv->clearReservation();
611 }
612
613 srv->reserve();
614 reservations.insert(Reservations::value_type(srv->reservationId, srv));
615 }
616
617 void
618 statefulhelper::cancelReservation(const Helper::ReservationId reservation)
619 {
620 const auto it = reservations.find(reservation);
621 if (it == reservations.end())
622 return;
623
624 helper_stateful_server *srv = it->second;
625 reservations.erase(it);
626 srv->clearReservation();
627
628 // schedule a queue kick
629 AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
630 ScheduleCallHere(call);
631 }
632
633 helper_stateful_server *
634 statefulhelper::findServer(const Helper::ReservationId & reservation)
635 {
636 const auto it = reservations.find(reservation);
637 if (it == reservations.end())
638 return nullptr;
639 return it->second;
640 }
641
642 void
643 helper_stateful_server::reserve()
644 {
645 assert(!reservationId);
646 reservationStart = squid_curtime;
647 reservationId = Helper::ReservationId::Next();
648 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
649 }
650
651 void
652 helper_stateful_server::clearReservation()
653 {
654 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
655 if (!reservationId)
656 return;
657
658 ++stats.releases;
659
660 reservationId.clear();
661 reservationStart = 0;
662 }
663
664 void
665 statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
666 {
667 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
668
669 if (buf && reservation) {
670 debugs(84, 5, reservation);
671 helper_stateful_server *lastServer = findServer(reservation);
672 if (!lastServer) {
673 debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
674 r->reply.result = Helper::TimedOut;
675 callBack(*r);
676 delete r;
677 return;
678 }
679 debugs(84, 5, "StatefulSubmit dispatching");
680 helperStatefulDispatch(lastServer, r);
681 } else {
682 helper_stateful_server *srv;
683 if ((srv = StatefulGetFirstAvailable(this))) {
684 reserveServer(srv);
685 helperStatefulDispatch(srv, r); // may delete r
686 } else
687 StatefulEnqueue(this, r);
688 }
689
690 // r may be dangling here
691 syncQueueStats();
692 }
693
694 void
695 Helper::Client::packStatsInto(Packable * const p, const char * const label) const
696 {
697 if (label)
698 p->appendf("%s:\n", label);
699
700 p->appendf(" program: %s\n", cmdline->key);
701 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
702 p->appendf(" requests sent: %d\n", stats.requests);
703 p->appendf(" replies received: %d\n", stats.replies);
704 p->appendf(" requests timedout: %d\n", stats.timedout);
705 p->appendf(" queue length: %d\n", stats.queue_size);
706 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
707 p->append("\n",1);
708 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
709 "ID #",
710 "FD",
711 "PID",
712 "# Requests",
713 "# Replies",
714 "# Timed-out",
715 "Flags",
716 "Time",
717 "Offset",
718 "Request");
719
720 for (dlink_node *link = servers.head; link; link = link->next) {
721 const auto srv = static_cast<SessionBase *>(link->data);
722 assert(srv);
723 const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
724 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
725 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
726 srv->index.value,
727 srv->readPipe->fd,
728 srv->pid,
729 srv->stats.uses,
730 srv->stats.replies,
731 srv->stats.timedout,
732 srv->stats.pending ? 'B' : ' ',
733 srv->flags.writing ? 'W' : ' ',
734 srv->flags.closing ? 'C' : ' ',
735 srv->reserved() ? 'R' : ' ',
736 srv->flags.shutdown ? 'S' : ' ',
737 xaction && xaction->request.placeholder ? 'P' : ' ',
738 tt < 0.0 ? 0.0 : tt,
739 (int) srv->roffset,
740 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
741 }
742
743 p->append("\nFlags key:\n"
744 " B\tBUSY\n"
745 " W\tWRITING\n"
746 " C\tCLOSING\n"
747 " R\tRESERVED\n"
748 " S\tSHUTDOWN PENDING\n"
749 " P\tPLACEHOLDER\n", 101);
750 }
751
752 bool
753 Helper::Client::willOverload() const {
754 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
755 }
756
757 Helper::Client::Pointer
758 Helper::Client::Make(const char * const name)
759 {
760 return new Client(name);
761 }
762
763 statefulhelper::Pointer
764 statefulhelper::Make(const char *name)
765 {
766 return new statefulhelper(name);
767 }
768
769 void
770 helperShutdown(const Helper::Client::Pointer &hlp)
771 {
772 dlink_node *link = hlp->servers.head;
773
774 while (link) {
775 const auto srv = static_cast<Helper::Session *>(link->data);
776 link = link->next;
777
778 if (srv->flags.shutdown) {
779 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
780 continue;
781 }
782
783 assert(hlp->childs.n_active > 0);
784 -- hlp->childs.n_active;
785 srv->flags.shutdown = true; /* request it to shut itself down */
786
787 if (srv->flags.closing) {
788 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
789 continue;
790 }
791
792 if (srv->stats.pending) {
793 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
794 continue;
795 }
796
797 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
798 /* the rest of the details is dealt with in the helperServerFree
799 * close handler
800 */
801 srv->closePipesSafely();
802 }
803
804 Assure(!hlp->childs.n_active);
805 hlp->dropQueued();
806 }
807
808 void
809 helperStatefulShutdown(const statefulhelper::Pointer &hlp)
810 {
811 dlink_node *link = hlp->servers.head;
812 helper_stateful_server *srv;
813
814 while (link) {
815 srv = (helper_stateful_server *)link->data;
816 link = link->next;
817
818 if (srv->flags.shutdown) {
819 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
820 continue;
821 }
822
823 assert(hlp->childs.n_active > 0);
824 -- hlp->childs.n_active;
825 srv->flags.shutdown = true; /* request it to shut itself down */
826
827 if (srv->stats.pending) {
828 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
829 continue;
830 }
831
832 if (srv->flags.closing) {
833 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
834 continue;
835 }
836
837 if (srv->reserved()) {
838 if (shutting_down) {
839 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
840 } else {
841 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
842 continue;
843 }
844 }
845
846 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
847
848 /* the rest of the details is dealt with in the helperStatefulServerFree
849 * close handler
850 */
851 srv->closePipesSafely();
852 }
853 }
854
855 Helper::Client::~Client()
856 {
857 /* note, don't free id_name, it probably points to static memory */
858
859 // A non-empty queue would leak Helper::Xaction objects, stalling any
860 // pending (and even future collapsed) transactions. To avoid stalling
861 // transactions, we must dropQueued(). We ought to do that when we
862 // discover that no progress is possible rather than here because
863 // reference counting may keep this object alive for a long time.
864 assert(queue.empty());
865 }
866
867 void
868 Helper::Client::handleKilledServer(SessionBase * const srv)
869 {
870 if (!srv->flags.shutdown) {
871 assert(childs.n_active > 0);
872 --childs.n_active;
873 debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
874
875 handleFewerServers(srv->stats.replies >= 1);
876
877 if (childs.needNew() > 0) {
878 srv->flags.shutdown = true;
879 openSessions();
880 }
881 }
882
883 if (!childs.n_active)
884 dropQueued();
885 }
886
887 void
888 Helper::Client::dropQueued()
889 {
890 if (queue.empty())
891 return;
892
893 Assure(!childs.n_active);
894 Assure(!GetFirstAvailable(this));
895
896 // no helper servers means nobody can advance our queued transactions
897
898 debugs(80, DBG_CRITICAL, "ERROR: Dropping " << queue.size() << ' ' <<
899 id_name << " helper requests due to lack of helper processes");
900 // similar to SessionBase::dropQueued()
901 while (const auto r = nextRequest()) {
902 r->reply.result = Helper::Unknown;
903 callBack(*r);
904 delete r;
905 }
906 }
907
908 void
909 Helper::Client::handleFewerServers(const bool madeProgress)
910 {
911 const auto needNew = childs.needNew();
912
913 if (!needNew)
914 return; // some server(s) have died, but we still have enough
915
916 debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
917 Debug::Extra << "active processes: " << childs.n_active <<
918 Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
919
920 if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
921 if (madeProgress)
922 debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
923 else
924 fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
925 }
926 }
927
928 void
929 Helper::SessionBase::HelperServerClosed(SessionBase * const srv)
930 {
931 srv->helper().handleKilledServer(srv);
932 srv->dropQueued();
933 delete srv;
934 }
935
936 Helper::Xaction *
937 Helper::Session::popRequest(const int request_number)
938 {
939 Xaction *r = nullptr;
940 if (parent->childs.concurrency) {
941 // If concurrency supported retrieve request from ID
942 const auto it = requestsIndex.find(request_number);
943 if (it != requestsIndex.end()) {
944 r = *(it->second);
945 requests.erase(it->second);
946 requestsIndex.erase(it);
947 }
948 } else if(!requests.empty()) {
949 // Else get the first request from queue, if any
950 r = requests.front();
951 requests.pop_front();
952 }
953
954 return r;
955 }
956
957 /// Calls back with a pointer to the buffer with the helper output
958 static void
959 helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
960 {
961 if (Helper::Xaction *r = srv->replyXaction) {
962 const bool hasSpace = r->reply.accumulate(msg, msgSize);
963 if (!hasSpace) {
964 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
965 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
966 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
967 srv->closePipesSafely();
968 return;
969 }
970
971 if (!msgEnd)
972 return; // We are waiting for more data.
973
974 bool retry = false;
975 if (cbdataReferenceValid(r->request.data)) {
976 r->reply.finalize();
977 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
978 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
979 retry = true;
980 } else {
981 hlp->callBack(*r);
982 }
983 }
984
985 -- srv->stats.pending;
986 ++ srv->stats.replies;
987
988 ++ hlp->stats.replies;
989
990 srv->answer_time = current_time;
991
992 srv->dispatch_time = r->request.dispatch_time;
993
994 hlp->stats.avg_svc_time =
995 Math::intAverage(hlp->stats.avg_svc_time,
996 tvSubMsec(r->request.dispatch_time, current_time),
997 hlp->stats.replies, REDIRECT_AV_FACTOR);
998
999 // release or re-submit parsedRequestXaction object
1000 srv->replyXaction = nullptr;
1001 if (retry) {
1002 ++r->request.retries;
1003 hlp->submitRequest(r);
1004 } else
1005 delete r;
1006 }
1007
1008 if (hlp->timeout && hlp->childs.concurrency)
1009 srv->checkForTimedOutRequests(hlp->retryTimedOut);
1010
1011 if (!srv->flags.shutdown) {
1012 helperKickQueue(hlp);
1013 } else if (!srv->flags.closing && !srv->stats.pending) {
1014 srv->closeWritePipeSafely();
1015 }
1016 }
1017
1018 static void
1019 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1020 {
1021 const auto srv = static_cast<Helper::Session *>(data);
1022 const auto hlp = srv->parent;
1023 assert(cbdataReferenceValid(data));
1024
1025 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1026
1027 if (flag == Comm::ERR_CLOSING) {
1028 return;
1029 }
1030
1031 assert(conn->fd == srv->readPipe->fd);
1032
1033 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
1034
1035 if (flag != Comm::OK || len == 0) {
1036 srv->closePipesSafely();
1037 return;
1038 }
1039
1040 srv->roffset += len;
1041 srv->rbuf[srv->roffset] = '\0';
1042 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1043
1044 if (!srv->stats.pending && !srv->stats.timedout) {
1045 /* someone spoke without being spoken to */
1046 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1047 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1048 " bytes '" << srv->rbuf << "'");
1049
1050 srv->roffset = 0;
1051 srv->rbuf[0] = '\0';
1052 srv->closePipesSafely();
1053 return;
1054 }
1055
1056 bool needsMore = false;
1057 char *msg = srv->rbuf;
1058 while (*msg && !needsMore) {
1059 int skip = 0;
1060 char *eom = strchr(msg, hlp->eom);
1061 if (eom) {
1062 skip = 1;
1063 debugs(84, 3, "helperHandleRead: end of reply found");
1064 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1065 *eom = '\0';
1066 // rewind to the \r octet which is the real terminal now
1067 // and remember that we have to skip forward 2 places now.
1068 skip = 2;
1069 --eom;
1070 }
1071 *eom = '\0';
1072 }
1073
1074 if (!srv->ignoreToEom && !srv->replyXaction) {
1075 int i = 0;
1076 if (hlp->childs.concurrency) {
1077 char *e = nullptr;
1078 i = strtol(msg, &e, 10);
1079 // Do we need to check for e == msg? Means wrong response from helper.
1080 // Will be dropped as "unexpected reply on channel 0"
1081 needsMore = !(xisspace(*e) || (eom && e == eom));
1082 if (!needsMore) {
1083 msg = e;
1084 while (*msg && xisspace(*msg))
1085 ++msg;
1086 } // else not enough data to compute request number
1087 }
1088 if (!(srv->replyXaction = srv->popRequest(i))) {
1089 if (srv->stats.timedout) {
1090 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1091 } else {
1092 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
1093 i << " from " << hlp->id_name << " #" << srv->index <<
1094 " '" << srv->rbuf << "'");
1095 }
1096 srv->ignoreToEom = true;
1097 }
1098 } // else we need to just append reply data to the current Xaction
1099
1100 if (!needsMore) {
1101 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1102 assert(msgSize <= srv->rbuf_sz);
1103 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1104 msg += msgSize + skip;
1105 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1106
1107 // The next message should not ignored.
1108 if (eom && srv->ignoreToEom)
1109 srv->ignoreToEom = false;
1110 } else
1111 assert(skip == 0 && eom == nullptr);
1112 }
1113
1114 if (needsMore) {
1115 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1116 assert(msgSize <= srv->rbuf_sz);
1117 memmove(srv->rbuf, msg, msgSize);
1118 srv->roffset = msgSize;
1119 srv->rbuf[srv->roffset] = '\0';
1120 } else {
1121 // All of the responses parsed and msg points at the end of read data
1122 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1123 srv->roffset = 0;
1124 }
1125
1126 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1127 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1128 assert(spaceSize >= 0);
1129
1130 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1131 CommIoCbPtrFun(helperHandleRead, srv));
1132 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1133 }
1134 }
1135
1136 static void
1137 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1138 {
1139 char *t = nullptr;
1140 helper_stateful_server *srv = (helper_stateful_server *)data;
1141 const auto hlp = srv->parent;
1142 assert(cbdataReferenceValid(data));
1143
1144 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1145
1146 if (flag == Comm::ERR_CLOSING) {
1147 return;
1148 }
1149
1150 assert(conn->fd == srv->readPipe->fd);
1151
1152 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1153 hlp->id_name << " #" << srv->index);
1154
1155 if (flag != Comm::OK || len == 0) {
1156 srv->closePipesSafely();
1157 return;
1158 }
1159
1160 srv->roffset += len;
1161 srv->rbuf[srv->roffset] = '\0';
1162 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1163
1164 if (srv->requests.empty()) {
1165 /* someone spoke without being spoken to */
1166 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1167 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1168 " bytes '" << srv->rbuf << "'");
1169
1170 srv->roffset = 0;
1171 srv->closePipesSafely();
1172 return;
1173 }
1174
1175 if ((t = strchr(srv->rbuf, hlp->eom))) {
1176 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1177
1178 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1179 *t = '\0';
1180 // rewind to the \r octet which is the real terminal now
1181 --t;
1182 }
1183
1184 *t = '\0';
1185 }
1186
1187 const auto r = srv->requests.front();
1188
1189 if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1190 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1191 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1192 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1193 srv->closePipesSafely();
1194 return;
1195 }
1196 /**
1197 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1198 * Doing this prohibits concurrency support with multiple replies per read().
1199 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1200 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1201 */
1202 srv->roffset = 0;
1203
1204 if (t) {
1205 /* end of reply found */
1206 srv->requests.pop_front(); // we already have it in 'r'
1207 int called = 1;
1208
1209 if (cbdataReferenceValid(r->request.data)) {
1210 r->reply.finalize();
1211 r->reply.reservationId = srv->reservationId;
1212 hlp->callBack(*r);
1213 } else {
1214 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1215 called = 0;
1216 }
1217
1218 delete r;
1219
1220 -- srv->stats.pending;
1221 ++ srv->stats.replies;
1222
1223 ++ hlp->stats.replies;
1224 srv->answer_time = current_time;
1225 hlp->stats.avg_svc_time =
1226 Math::intAverage(hlp->stats.avg_svc_time,
1227 tvSubMsec(srv->dispatch_time, current_time),
1228 hlp->stats.replies, REDIRECT_AV_FACTOR);
1229
1230 if (called)
1231 helperStatefulServerDone(srv);
1232 else
1233 hlp->cancelReservation(srv->reservationId);
1234 }
1235
1236 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1237 int spaceSize = srv->rbuf_sz - 1;
1238
1239 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1240 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1241 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1242 }
1243 }
1244
1245 /// Handles a request when all running helpers, if any, are busy.
1246 static void
1247 Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
1248 {
1249 hlp->queue.push(r);
1250 ++ hlp->stats.queue_size;
1251
1252 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1253 if (hlp->childs.needNew() > 0) {
1254 hlp->openSessions();
1255 return;
1256 }
1257
1258 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1259 return;
1260
1261 if (squid_curtime - hlp->last_queue_warn < 600)
1262 return;
1263
1264 if (shutting_down || reconfiguring)
1265 return;
1266
1267 hlp->last_queue_warn = squid_curtime;
1268
1269 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1270 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1271 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1272 }
1273
1274 static void
1275 StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
1276 {
1277 hlp->queue.push(r);
1278 ++ hlp->stats.queue_size;
1279
1280 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1281 if (hlp->childs.needNew() > 0) {
1282 hlp->openSessions();
1283 return;
1284 }
1285
1286 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1287 return;
1288
1289 if (squid_curtime - hlp->last_queue_warn < 600)
1290 return;
1291
1292 if (shutting_down || reconfiguring)
1293 return;
1294
1295 hlp->last_queue_warn = squid_curtime;
1296
1297 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1298 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1299 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1300 }
1301
1302 Helper::Xaction *
1303 Helper::Client::nextRequest()
1304 {
1305 if (queue.empty())
1306 return nullptr;
1307
1308 auto *r = queue.front();
1309 queue.pop();
1310 --stats.queue_size;
1311 return r;
1312 }
1313
1314 static Helper::Session *
1315 GetFirstAvailable(const Helper::Client::Pointer &hlp)
1316 {
1317 dlink_node *n;
1318 Helper::Session *selected = nullptr;
1319 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1320
1321 if (hlp->childs.n_running == 0)
1322 return nullptr;
1323
1324 /* Find "least" loaded helper (approx) */
1325 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1326 const auto srv = static_cast<Helper::Session *>(n->data);
1327
1328 if (selected && selected->stats.pending <= srv->stats.pending)
1329 continue;
1330
1331 if (srv->flags.shutdown)
1332 continue;
1333
1334 if (!srv->stats.pending)
1335 return srv;
1336
1337 if (selected) {
1338 selected = srv;
1339 break;
1340 }
1341
1342 selected = srv;
1343 }
1344
1345 if (!selected) {
1346 debugs(84, 5, "GetFirstAvailable: None available.");
1347 return nullptr;
1348 }
1349
1350 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1351 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1352 return nullptr;
1353 }
1354
1355 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1356 return selected;
1357 }
1358
1359 static helper_stateful_server *
1360 StatefulGetFirstAvailable(const statefulhelper::Pointer &hlp)
1361 {
1362 dlink_node *n;
1363 helper_stateful_server *srv = nullptr;
1364 helper_stateful_server *oldestReservedServer = nullptr;
1365 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1366
1367 if (hlp->childs.n_running == 0)
1368 return nullptr;
1369
1370 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1371 srv = (helper_stateful_server *)n->data;
1372
1373 if (srv->stats.pending)
1374 continue;
1375
1376 if (srv->reserved()) {
1377 if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
1378 if (!oldestReservedServer)
1379 oldestReservedServer = srv;
1380 else if (oldestReservedServer->reservationStart < srv->reservationStart)
1381 oldestReservedServer = srv;
1382 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1383 }
1384 continue;
1385 }
1386
1387 if (srv->flags.shutdown)
1388 continue;
1389
1390 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1391 return srv;
1392 }
1393
1394 if (oldestReservedServer) {
1395 debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1396 return oldestReservedServer;
1397 }
1398
1399 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1400 return nullptr;
1401 }
1402
1403 static void
1404 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1405 {
1406 const auto srv = static_cast<Helper::Session *>(data);
1407
1408 srv->writebuf->clean();
1409 delete srv->writebuf;
1410 srv->writebuf = nullptr;
1411 srv->flags.writing = false;
1412
1413 if (flag != Comm::OK) {
1414 /* Helper server has crashed */
1415 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1416 return;
1417 }
1418
1419 if (!srv->wqueue->isNull()) {
1420 srv->writebuf = srv->wqueue;
1421 srv->wqueue = new MemBuf;
1422 srv->flags.writing = true;
1423 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1424 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1425 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1426 }
1427 }
1428
1429 static void
1430 helperDispatch(Helper::Session * const srv, Helper::Xaction * const r)
1431 {
1432 const auto hlp = srv->parent;
1433 const uint64_t reqId = ++srv->nextRequestId;
1434
1435 if (!cbdataReferenceValid(r->request.data)) {
1436 debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
1437 delete r;
1438 return;
1439 }
1440
1441 r->request.Id = reqId;
1442 const auto it = srv->requests.insert(srv->requests.end(), r);
1443 r->request.dispatch_time = current_time;
1444
1445 if (srv->wqueue->isNull())
1446 srv->wqueue->init();
1447
1448 if (hlp->childs.concurrency) {
1449 srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
1450 assert(srv->requestsIndex.size() == srv->requests.size());
1451 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1452 } else
1453 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1454
1455 if (!srv->flags.writing) {
1456 assert(nullptr == srv->writebuf);
1457 srv->writebuf = srv->wqueue;
1458 srv->wqueue = new MemBuf;
1459 srv->flags.writing = true;
1460 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1461 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1462 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1463 }
1464
1465 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1466
1467 ++ srv->stats.uses;
1468 ++ srv->stats.pending;
1469 ++ hlp->stats.requests;
1470 }
1471
1472 static void
1473 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1474 {}
1475
1476 static void
1477 helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
1478 {
1479 const auto hlp = srv->parent;
1480
1481 if (!cbdataReferenceValid(r->request.data)) {
1482 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
1483 delete r;
1484 hlp->cancelReservation(srv->reservationId);
1485 return;
1486 }
1487
1488 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1489
1490 assert(srv->reservationId);
1491 r->reply.reservationId = srv->reservationId;
1492
1493 if (r->request.placeholder == 1) {
1494 /* a callback is needed before this request can _use_ a helper. */
1495 /* we don't care about releasing this helper. The request NEVER
1496 * gets to the helper. So we throw away the return code */
1497 r->reply.result = Helper::Unknown;
1498 hlp->callBack(*r);
1499 /* throw away the placeholder */
1500 delete r;
1501 /* and push the queue. Note that the callback may have submitted a new
1502 * request to the helper which is why we test for the request */
1503
1504 if (!srv->requests.size())
1505 helperStatefulServerDone(srv);
1506
1507 return;
1508 }
1509
1510 srv->requests.push_back(r);
1511 srv->dispatch_time = current_time;
1512 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1513 CommIoCbPtrFun(helperStatefulDispatchWriteDone, srv));
1514 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
1515 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1516 hlp->id_name << " #" << srv->index << ", " <<
1517 (int) strlen(r->request.buf) << " bytes");
1518
1519 ++ srv->stats.uses;
1520 ++ srv->stats.pending;
1521 ++ hlp->stats.requests;
1522 }
1523
1524 static void
1525 helperKickQueue(const Helper::Client::Pointer &hlp)
1526 {
1527 Helper::Xaction *r = nullptr;
1528 Helper::Session *srv = nullptr;
1529
1530 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1531 helperDispatch(srv, r);
1532
1533 if (!hlp->childs.n_active)
1534 hlp->dropQueued();
1535 }
1536
1537 static void
1538 helperStatefulKickQueue(const statefulhelper::Pointer &hlp)
1539 {
1540 Helper::Xaction *r;
1541 helper_stateful_server *srv;
1542 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1543 debugs(84, 5, "found srv-" << srv->index);
1544 hlp->reserveServer(srv);
1545 helperStatefulDispatch(srv, r);
1546 }
1547
1548 if (!hlp->childs.n_active)
1549 hlp->dropQueued();
1550 }
1551
1552 static void
1553 helperStatefulServerDone(helper_stateful_server * srv)
1554 {
1555 if (!srv->flags.shutdown) {
1556 helperStatefulKickQueue(srv->parent);
1557 } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1558 srv->closeWritePipeSafely();
1559 return;
1560 }
1561 }
1562
1563 void
1564 Helper::Session::checkForTimedOutRequests(bool const retry)
1565 {
1566 assert(parent->childs.concurrency);
1567 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1568 const auto r = requests.front();
1569 RequestIndex::iterator it;
1570 it = requestsIndex.find(r->request.Id);
1571 assert(it != requestsIndex.end());
1572 requestsIndex.erase(it);
1573 requests.pop_front();
1574 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1575 bool retried = false;
1576 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1577 debugs(84, 2, "Retry request " << r->request.Id);
1578 ++r->request.retries;
1579 parent->submitRequest(r);
1580 retried = true;
1581 } else if (cbdataReferenceValid(r->request.data)) {
1582 if (!parent->onTimedOutResponse.isEmpty()) {
1583 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1584 r->reply.finalize();
1585 else
1586 r->reply.result = Helper::TimedOut;
1587 parent->callBack(*r);
1588 } else {
1589 r->reply.result = Helper::TimedOut;
1590 parent->callBack(*r);
1591 }
1592 }
1593 --stats.pending;
1594 ++stats.timedout;
1595 ++parent->stats.timedout;
1596 if (!retried)
1597 delete r;
1598 }
1599 }
1600
1601 void
1602 Helper::Session::requestTimeout(const CommTimeoutCbParams &io)
1603 {
1604 debugs(26, 3, io.conn);
1605 const auto srv = static_cast<Session *>(io.data);
1606
1607 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1608
1609 debugs(84, 3, io.conn << " establish a new timeout");
1610 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
1611 CommTimeoutCbPtrFun(Session::requestTimeout, srv));
1612
1613 const time_t timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1614 const time_t minimumNewTimeout = 1; // second
1615 const auto timeLeft = max(minimumNewTimeout, srv->parent->timeout - timeSpent);
1616
1617 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1618 }
1619