]> git.ipfire.org Git - thirdparty/squid.git/blob - src/helper.cc
77aaab79c18b43b116001579717c0290a0bb784c
[thirdparty/squid.git] / src / helper.cc
1 /*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 84 Helper process maintenance */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "base/Raw.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "comm/Read.h"
18 #include "comm/Write.h"
19 #include "debug/Messages.h"
20 #include "fd.h"
21 #include "fde.h"
22 #include "format/Quoting.h"
23 #include "helper.h"
24 #include "helper/Reply.h"
25 #include "helper/Request.h"
26 #include "MemBuf.h"
27 #include "SquidConfig.h"
28 #include "SquidIpc.h"
29 #include "SquidMath.h"
30 #include "Store.h"
31 #include "wordlist.h"
32
// helper_stateful_server::data uses explicit alloc()/freeOne()
34 #include "mem/Pool.h"
35
/// Bound on the number of helper command-line entries that the
/// openSessions() methods in this file copy into their local args[] arrays.
#define HELPER_MAX_ARGS 64

/// The maximum allowed request retries.
#define MAX_RETRIES 2

/// Helpers input buffer size.
/// Passed to memAllocBuf() when a session read buffer is allocated.
const size_t ReadBufSize(32*1024);

// Forward declarations of file-local read handlers and queue/dispatch
// routines used before their definitions.
static IOCB helperHandleRead;
static IOCB helperStatefulHandleRead;
static void Enqueue(Helper::Client *, Helper::Xaction *);
static Helper::Session *GetFirstAvailable(const Helper::Client::Pointer &);
static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper::Pointer &);
static void helperDispatch(Helper::Session *, Helper::Xaction *);
static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
static void helperKickQueue(const Helper::Client::Pointer &);
static void helperStatefulKickQueue(const statefulhelper::Pointer &);
static void helperStatefulServerDone(helper_stateful_server * srv);
static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);

// NOTE(review): presumably these macros emit the cbdata bookkeeping
// definitions for the two session classes -- confirm against the cbdata API.
CBDATA_NAMESPACED_CLASS_INIT(Helper, Session);
CBDATA_CLASS_INIT(helper_stateful_server);

// NOTE(review): "Hlpr" looks like the prefix used when printing session
// instance IDs -- confirm against the InstanceId API.
InstanceIdDefinitions(Helper::SessionBase, "Hlpr");
60
61 void
62 Helper::SessionBase::initStats()
63 {
64 stats.uses=0;
65 stats.replies=0;
66 stats.pending=0;
67 stats.releases=0;
68 stats.timedout = 0;
69 }
70
71 void
72 Helper::SessionBase::closePipesSafely()
73 {
74 #if _SQUID_WINDOWS_
75 shutdown(writePipe->fd, SD_BOTH);
76 #endif
77
78 flags.closing = true;
79 if (readPipe->fd == writePipe->fd)
80 readPipe->fd = -1;
81 else
82 readPipe->close();
83 writePipe->close();
84
85 #if _SQUID_WINDOWS_
86 if (hIpc) {
87 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
88 getCurrentTime();
89 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
90 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
91 }
92 CloseHandle(hIpc);
93 }
94 #endif
95 }
96
97 void
98 Helper::SessionBase::closeWritePipeSafely()
99 {
100 #if _SQUID_WINDOWS_
101 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
102 #endif
103
104 flags.closing = true;
105 if (readPipe->fd == writePipe->fd)
106 readPipe->fd = -1;
107 writePipe->close();
108
109 #if _SQUID_WINDOWS_
110 if (hIpc) {
111 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
112 getCurrentTime();
113 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
114 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
115 }
116 CloseHandle(hIpc);
117 }
118 #endif
119 }
120
121 void
122 Helper::SessionBase::dropQueued()
123 {
124 while (!requests.empty()) {
125 // XXX: re-schedule these on another helper?
126 const auto r = requests.front();
127 requests.pop_front();
128 r->reply.result = Helper::Unknown;
129 helper().callBack(*r);
130 delete r;
131 }
132 }
133
134 Helper::SessionBase::~SessionBase()
135 {
136 if (rbuf) {
137 memFreeBuf(rbuf_sz, rbuf);
138 rbuf = nullptr;
139 }
140 }
141
142 Helper::Session::~Session()
143 {
144 wqueue->clean();
145 delete wqueue;
146
147 if (writebuf) {
148 writebuf->clean();
149 delete writebuf;
150 writebuf = nullptr;
151 }
152
153 if (Comm::IsConnOpen(writePipe))
154 closeWritePipeSafely();
155
156 dlinkDelete(&link, &parent->servers);
157
158 assert(parent->childs.n_running > 0);
159 -- parent->childs.n_running;
160
161 assert(requests.empty());
162 }
163
164 void
165 Helper::Session::dropQueued()
166 {
167 SessionBase::dropQueued();
168 requestsIndex.clear();
169 }
170
171 helper_stateful_server::~helper_stateful_server()
172 {
173 /* TODO: walk the local queue of requests and carry them all out */
174 if (Comm::IsConnOpen(writePipe))
175 closeWritePipeSafely();
176
177 parent->cancelReservation(reservationId);
178
179 dlinkDelete(&link, &parent->servers);
180
181 assert(parent->childs.n_running > 0);
182 -- parent->childs.n_running;
183
184 assert(requests.empty());
185 }
186
187 void
188 Helper::Client::openSessions()
189 {
190 char *s;
191 char *progname;
192 char *shortname;
193 char *procname;
194 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
195 char fd_note_buf[FD_DESC_SZ];
196 int nargs = 0;
197 int k;
198 pid_t pid;
199 int rfd;
200 int wfd;
201 void * hIpc;
202 wordlist *w;
203 // Helps reducing diff. TODO: remove
204 const auto hlp = this;
205
206 if (hlp->cmdline == nullptr)
207 return;
208
209 progname = hlp->cmdline->key;
210
211 if ((s = strrchr(progname, '/')))
212 shortname = xstrdup(s + 1);
213 else
214 shortname = xstrdup(progname);
215
216 /* figure out how many new child are actually needed. */
217 int need_new = hlp->childs.needNew();
218
219 debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
220
221 if (need_new < 1) {
222 debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
223 }
224
225 procname = (char *)xmalloc(strlen(shortname) + 3);
226
227 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
228
229 args[nargs] = procname;
230 ++nargs;
231
232 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
233 args[nargs] = w->key;
234 ++nargs;
235 }
236
237 args[nargs] = nullptr;
238 ++nargs;
239
240 assert(nargs <= HELPER_MAX_ARGS);
241
242 int successfullyStarted = 0;
243
244 for (k = 0; k < need_new; ++k) {
245 getCurrentTime();
246 rfd = wfd = -1;
247 pid = ipcCreate(hlp->ipc_type,
248 progname,
249 args,
250 shortname,
251 hlp->addr,
252 &rfd,
253 &wfd,
254 &hIpc);
255
256 if (pid < 0) {
257 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
258 continue;
259 }
260
261 ++successfullyStarted;
262 ++ hlp->childs.n_running;
263 ++ hlp->childs.n_active;
264 const auto srv = new Helper::Session;
265 srv->hIpc = hIpc;
266 srv->pid = pid;
267 srv->initStats();
268 srv->addr = hlp->addr;
269 srv->readPipe = new Comm::Connection;
270 srv->readPipe->fd = rfd;
271 srv->writePipe = new Comm::Connection;
272 srv->writePipe->fd = wfd;
273 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
274 srv->wqueue = new MemBuf;
275 srv->roffset = 0;
276 srv->nextRequestId = 0;
277 srv->replyXaction = nullptr;
278 srv->ignoreToEom = false;
279 srv->parent = hlp;
280 dlinkAddTail(srv, &srv->link, &hlp->servers);
281
282 if (rfd == wfd) {
283 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
284 fd_note(rfd, fd_note_buf);
285 } else {
286 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
287 fd_note(rfd, fd_note_buf);
288 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
289 fd_note(wfd, fd_note_buf);
290 }
291
292 commSetNonBlocking(rfd);
293
294 if (wfd != rfd)
295 commSetNonBlocking(wfd);
296
297 AsyncCall::Pointer closeCall = asyncCall(5, 4, "Helper::Session::HelperServerClosed", cbdataDialer(SessionBase::HelperServerClosed,
298 static_cast<Helper::SessionBase *>(srv)));
299
300 comm_add_close_handler(rfd, closeCall);
301
302 if (hlp->timeout && hlp->childs.concurrency) {
303 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
304 CommTimeoutCbPtrFun(Helper::Session::requestTimeout, srv));
305 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
306 }
307
308 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
309 CommIoCbPtrFun(helperHandleRead, srv));
310 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
311 }
312
313 // Call handleFewerServers() before hlp->last_restart is updated because
314 // that method uses last_restart to measure the delay since previous start.
315 // TODO: Refactor last_restart code to measure failure frequency rather than
316 // detecting a helper #X failure that is being close to the helper #Y start.
317 if (successfullyStarted < need_new)
318 hlp->handleFewerServers(false);
319
320 hlp->last_restart = squid_curtime;
321 safe_free(shortname);
322 safe_free(procname);
323 helperKickQueue(hlp);
324 }
325
326 void
327 statefulhelper::openSessions()
328 {
329 char *shortname;
330 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
331 char fd_note_buf[FD_DESC_SZ];
332 int nargs = 0;
333 // Helps reducing diff. TODO: remove
334 const auto hlp = this;
335
336 if (hlp->cmdline == nullptr)
337 return;
338
339 if (hlp->childs.concurrency)
340 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
341
342 char *progname = hlp->cmdline->key;
343
344 char *s;
345 if ((s = strrchr(progname, '/')))
346 shortname = xstrdup(s + 1);
347 else
348 shortname = xstrdup(progname);
349
350 /* figure out haw mant new helpers are needed. */
351 int need_new = hlp->childs.needNew();
352
353 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
354
355 if (need_new < 1) {
356 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
357 }
358
359 char *procname = (char *)xmalloc(strlen(shortname) + 3);
360
361 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
362
363 args[nargs] = procname;
364 ++nargs;
365
366 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
367 args[nargs] = w->key;
368 ++nargs;
369 }
370
371 args[nargs] = nullptr;
372 ++nargs;
373
374 assert(nargs <= HELPER_MAX_ARGS);
375
376 int successfullyStarted = 0;
377
378 for (int k = 0; k < need_new; ++k) {
379 getCurrentTime();
380 int rfd = -1;
381 int wfd = -1;
382 void * hIpc;
383 pid_t pid = ipcCreate(hlp->ipc_type,
384 progname,
385 args,
386 shortname,
387 hlp->addr,
388 &rfd,
389 &wfd,
390 &hIpc);
391
392 if (pid < 0) {
393 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
394 continue;
395 }
396
397 ++successfullyStarted;
398 ++ hlp->childs.n_running;
399 ++ hlp->childs.n_active;
400 helper_stateful_server *srv = new helper_stateful_server;
401 srv->hIpc = hIpc;
402 srv->pid = pid;
403 srv->initStats();
404 srv->addr = hlp->addr;
405 srv->readPipe = new Comm::Connection;
406 srv->readPipe->fd = rfd;
407 srv->writePipe = new Comm::Connection;
408 srv->writePipe->fd = wfd;
409 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
410 srv->roffset = 0;
411 srv->parent = hlp;
412 srv->reservationStart = 0;
413
414 dlinkAddTail(srv, &srv->link, &hlp->servers);
415
416 if (rfd == wfd) {
417 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
418 fd_note(rfd, fd_note_buf);
419 } else {
420 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
421 fd_note(rfd, fd_note_buf);
422 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
423 fd_note(wfd, fd_note_buf);
424 }
425
426 commSetNonBlocking(rfd);
427
428 if (wfd != rfd)
429 commSetNonBlocking(wfd);
430
431 AsyncCall::Pointer closeCall = asyncCall(5, 4, "helper_stateful_server::HelperServerClosed", cbdataDialer(Helper::SessionBase::HelperServerClosed,
432 static_cast<Helper::SessionBase *>(srv)));
433
434 comm_add_close_handler(rfd, closeCall);
435
436 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
437 CommIoCbPtrFun(helperStatefulHandleRead, srv));
438 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
439 }
440
441 // Call handleFewerServers() before hlp->last_restart is updated because
442 // that method uses last_restart to measure the delay since previous start.
443 // TODO: Refactor last_restart code to measure failure frequency rather than
444 // detecting a helper #X failure that is being close to the helper #Y start.
445 if (successfullyStarted < need_new)
446 hlp->handleFewerServers(false);
447
448 hlp->last_restart = squid_curtime;
449 safe_free(shortname);
450 safe_free(procname);
451 helperStatefulKickQueue(hlp);
452 }
453
454 void
455 Helper::Client::submitRequest(Helper::Xaction * const r)
456 {
457 if (const auto srv = GetFirstAvailable(this))
458 helperDispatch(srv, r);
459 else
460 Enqueue(this, r);
461
462 syncQueueStats();
463 }
464
465 /// handles helperSubmit() and helperStatefulSubmit() failures
466 static void
467 SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
468 {
469 auto result = Helper::Error;
470 if (!hlp) {
471 debugs(84, 3, "no helper");
472 result = Helper::Unknown;
473 }
474 // else pretend the helper has responded with ERR
475
476 callback(data, Helper::Reply(result));
477 }
478
479 void
480 helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
481 {
482 if (!hlp || !hlp->trySubmit(buf, callback, data))
483 SubmissionFailure(hlp, callback, data);
484 }
485
486 /// whether queuing an additional request would overload the helper
487 bool
488 Helper::Client::queueFull() const {
489 return stats.queue_size >= static_cast<int>(childs.queue_size);
490 }
491
492 bool
493 Helper::Client::overloaded() const {
494 return stats.queue_size > static_cast<int>(childs.queue_size);
495 }
496
497 /// synchronizes queue-dependent measurements with the current queue state
498 void
499 Helper::Client::syncQueueStats()
500 {
501 if (overloaded()) {
502 if (overloadStart) {
503 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
504 } else {
505 overloadStart = squid_curtime;
506 debugs(84, 3, id_name << " became overloaded");
507 }
508 } else {
509 if (overloadStart) {
510 debugs(84, 5, id_name << " is no longer overloaded");
511 if (droppedRequests) {
512 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
513 " is no longer overloaded after dropping " << droppedRequests <<
514 " requests in " << (squid_curtime - overloadStart) << " seconds");
515 droppedRequests = 0;
516 }
517 overloadStart = 0;
518 }
519 }
520 }
521
522 /// prepares the helper for request submission
523 /// returns true if and only if the submission should proceed
524 /// may kill Squid if the helper remains overloaded for too long
525 bool
526 Helper::Client::prepSubmit()
527 {
528 // re-sync for the configuration may have changed since the last submission
529 syncQueueStats();
530
531 // Nothing special to do if the new request does not overload (i.e., the
532 // queue is not even full yet) or only _starts_ overloading this helper
533 // (i.e., the queue is currently at its limit).
534 if (!overloaded())
535 return true;
536
537 if (squid_curtime - overloadStart <= 180)
538 return true; // also OK: overload has not persisted long enough to panic
539
540 if (childs.onPersistentOverload == ChildConfig::actDie)
541 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
542
543 if (!droppedRequests) {
544 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
545 id_name << " helper configured with on-persistent-overload=err");
546 }
547 ++droppedRequests;
548 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
549 return false;
550 }
551
552 bool
553 Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
554 {
555 if (!prepSubmit())
556 return false; // request was dropped
557
558 submit(buf, callback, data); // will send or queue
559 return true; // request submitted or queued
560 }
561
562 /// dispatches or enqueues a helper requests; does not enforce queue limits
563 void
564 Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
565 {
566 const auto r = new Xaction(callback, data, buf);
567 submitRequest(r);
568 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
569 }
570
571 void
572 Helper::Client::callBack(Xaction &r)
573 {
574 const auto callback = r.request.callback;
575 Assure(callback);
576
577 r.request.callback = nullptr;
578 void *cbdata = nullptr;
579 if (cbdataReferenceValidDone(r.request.data, &cbdata))
580 callback(cbdata, r.reply);
581 }
582
583 /// Submit request or callback the caller with a Helper::Error error.
584 /// If the reservation is not set then reserves a new helper.
585 void
586 helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
587 {
588 if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
589 SubmissionFailure(hlp, callback, data);
590 }
591
592 /// If possible, submit request. Otherwise, either kill Squid or return false.
593 bool
594 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
595 {
596 if (!prepSubmit())
597 return false; // request was dropped
598
599 submit(buf, callback, data, reservation); // will send or queue
600 return true; // request submitted or queued
601 }
602
603 void
604 statefulhelper::reserveServer(helper_stateful_server * srv)
605 {
606 // clear any old reservation
607 if (srv->reserved()) {
608 reservations.erase(srv->reservationId);
609 srv->clearReservation();
610 }
611
612 srv->reserve();
613 reservations.insert(Reservations::value_type(srv->reservationId, srv));
614 }
615
616 void
617 statefulhelper::cancelReservation(const Helper::ReservationId reservation)
618 {
619 const auto it = reservations.find(reservation);
620 if (it == reservations.end())
621 return;
622
623 helper_stateful_server *srv = it->second;
624 reservations.erase(it);
625 srv->clearReservation();
626
627 // schedule a queue kick
628 AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
629 ScheduleCallHere(call);
630 }
631
632 helper_stateful_server *
633 statefulhelper::findServer(const Helper::ReservationId & reservation)
634 {
635 const auto it = reservations.find(reservation);
636 if (it == reservations.end())
637 return nullptr;
638 return it->second;
639 }
640
641 void
642 helper_stateful_server::reserve()
643 {
644 assert(!reservationId);
645 reservationStart = squid_curtime;
646 reservationId = Helper::ReservationId::Next();
647 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
648 }
649
650 void
651 helper_stateful_server::clearReservation()
652 {
653 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
654 if (!reservationId)
655 return;
656
657 ++stats.releases;
658
659 reservationId.clear();
660 reservationStart = 0;
661 }
662
663 void
664 statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
665 {
666 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
667
668 if (buf && reservation) {
669 debugs(84, 5, reservation);
670 helper_stateful_server *lastServer = findServer(reservation);
671 if (!lastServer) {
672 debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
673 r->reply.result = Helper::TimedOut;
674 callBack(*r);
675 delete r;
676 return;
677 }
678 debugs(84, 5, "StatefulSubmit dispatching");
679 helperStatefulDispatch(lastServer, r);
680 } else {
681 helper_stateful_server *srv;
682 if ((srv = StatefulGetFirstAvailable(this))) {
683 reserveServer(srv);
684 helperStatefulDispatch(srv, r); // may delete r
685 } else
686 StatefulEnqueue(this, r);
687 }
688
689 // r may be dangling here
690 syncQueueStats();
691 }
692
693 void
694 Helper::Client::packStatsInto(Packable * const p, const char * const label) const
695 {
696 if (label)
697 p->appendf("%s:\n", label);
698
699 p->appendf(" program: %s\n", cmdline->key);
700 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
701 p->appendf(" requests sent: %d\n", stats.requests);
702 p->appendf(" replies received: %d\n", stats.replies);
703 p->appendf(" requests timedout: %d\n", stats.timedout);
704 p->appendf(" queue length: %d\n", stats.queue_size);
705 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
706 p->append("\n",1);
707 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
708 "ID #",
709 "FD",
710 "PID",
711 "# Requests",
712 "# Replies",
713 "# Timed-out",
714 "Flags",
715 "Time",
716 "Offset",
717 "Request");
718
719 for (dlink_node *link = servers.head; link; link = link->next) {
720 const auto srv = static_cast<SessionBase *>(link->data);
721 assert(srv);
722 const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
723 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
724 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
725 srv->index.value,
726 srv->readPipe->fd,
727 srv->pid,
728 srv->stats.uses,
729 srv->stats.replies,
730 srv->stats.timedout,
731 srv->stats.pending ? 'B' : ' ',
732 srv->flags.writing ? 'W' : ' ',
733 srv->flags.closing ? 'C' : ' ',
734 srv->reserved() ? 'R' : ' ',
735 srv->flags.shutdown ? 'S' : ' ',
736 xaction && xaction->request.placeholder ? 'P' : ' ',
737 tt < 0.0 ? 0.0 : tt,
738 (int) srv->roffset,
739 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
740 }
741
742 p->append("\nFlags key:\n"
743 " B\tBUSY\n"
744 " W\tWRITING\n"
745 " C\tCLOSING\n"
746 " R\tRESERVED\n"
747 " S\tSHUTDOWN PENDING\n"
748 " P\tPLACEHOLDER\n", 101);
749 }
750
751 bool
752 Helper::Client::willOverload() const {
753 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
754 }
755
756 Helper::Client::Pointer
757 Helper::Client::Make(const char * const name)
758 {
759 return new Client(name);
760 }
761
762 statefulhelper::Pointer
763 statefulhelper::Make(const char *name)
764 {
765 return new statefulhelper(name);
766 }
767
768 void
769 helperShutdown(const Helper::Client::Pointer &hlp)
770 {
771 dlink_node *link = hlp->servers.head;
772
773 while (link) {
774 const auto srv = static_cast<Helper::Session *>(link->data);
775 link = link->next;
776
777 if (srv->flags.shutdown) {
778 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
779 continue;
780 }
781
782 assert(hlp->childs.n_active > 0);
783 -- hlp->childs.n_active;
784 srv->flags.shutdown = true; /* request it to shut itself down */
785
786 if (srv->flags.closing) {
787 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
788 continue;
789 }
790
791 if (srv->stats.pending) {
792 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
793 continue;
794 }
795
796 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
797 /* the rest of the details is dealt with in the helperServerFree
798 * close handler
799 */
800 srv->closePipesSafely();
801 }
802
803 Assure(!hlp->childs.n_active);
804 hlp->dropQueued();
805 }
806
807 void
808 helperStatefulShutdown(const statefulhelper::Pointer &hlp)
809 {
810 dlink_node *link = hlp->servers.head;
811 helper_stateful_server *srv;
812
813 while (link) {
814 srv = (helper_stateful_server *)link->data;
815 link = link->next;
816
817 if (srv->flags.shutdown) {
818 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
819 continue;
820 }
821
822 assert(hlp->childs.n_active > 0);
823 -- hlp->childs.n_active;
824 srv->flags.shutdown = true; /* request it to shut itself down */
825
826 if (srv->stats.pending) {
827 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
828 continue;
829 }
830
831 if (srv->flags.closing) {
832 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
833 continue;
834 }
835
836 if (srv->reserved()) {
837 if (shutting_down) {
838 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
839 } else {
840 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
841 continue;
842 }
843 }
844
845 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
846
847 /* the rest of the details is dealt with in the helperStatefulServerFree
848 * close handler
849 */
850 srv->closePipesSafely();
851 }
852 }
853
854 Helper::Client::~Client()
855 {
856 /* note, don't free id_name, it probably points to static memory */
857
858 // A non-empty queue would leak Helper::Xaction objects, stalling any
859 // pending (and even future collapsed) transactions. To avoid stalling
860 // transactions, we must dropQueued(). We ought to do that when we
861 // discover that no progress is possible rather than here because
862 // reference counting may keep this object alive for a long time.
863 assert(queue.empty());
864 }
865
866 void
867 Helper::Client::handleKilledServer(SessionBase * const srv)
868 {
869 if (!srv->flags.shutdown) {
870 assert(childs.n_active > 0);
871 --childs.n_active;
872 debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
873
874 handleFewerServers(srv->stats.replies >= 1);
875
876 if (childs.needNew() > 0) {
877 srv->flags.shutdown = true;
878 openSessions();
879 }
880 }
881
882 if (!childs.n_active)
883 dropQueued();
884 }
885
886 void
887 Helper::Client::dropQueued()
888 {
889 if (queue.empty())
890 return;
891
892 Assure(!childs.n_active);
893 Assure(!GetFirstAvailable(this));
894
895 // no helper servers means nobody can advance our queued transactions
896
897 debugs(80, DBG_CRITICAL, "ERROR: Dropping " << queue.size() << ' ' <<
898 id_name << " helper requests due to lack of helper processes");
899 // similar to SessionBase::dropQueued()
900 while (const auto r = nextRequest()) {
901 r->reply.result = Helper::Unknown;
902 callBack(*r);
903 delete r;
904 }
905 }
906
907 void
908 Helper::Client::handleFewerServers(const bool madeProgress)
909 {
910 const auto needNew = childs.needNew();
911
912 if (!needNew)
913 return; // some server(s) have died, but we still have enough
914
915 debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
916 Debug::Extra << "active processes: " << childs.n_active <<
917 Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
918
919 if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
920 if (madeProgress)
921 debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
922 else
923 fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
924 }
925 }
926
927 void
928 Helper::SessionBase::HelperServerClosed(SessionBase * const srv)
929 {
930 srv->helper().handleKilledServer(srv);
931 srv->dropQueued();
932 delete srv;
933 }
934
935 Helper::Xaction *
936 Helper::Session::popRequest(const int request_number)
937 {
938 Xaction *r = nullptr;
939 if (parent->childs.concurrency) {
940 // If concurrency supported retrieve request from ID
941 const auto it = requestsIndex.find(request_number);
942 if (it != requestsIndex.end()) {
943 r = *(it->second);
944 requests.erase(it->second);
945 requestsIndex.erase(it);
946 }
947 } else if(!requests.empty()) {
948 // Else get the first request from queue, if any
949 r = requests.front();
950 requests.pop_front();
951 }
952
953 return r;
954 }
955
956 /// Calls back with a pointer to the buffer with the helper output
957 static void
958 helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
959 {
960 if (Helper::Xaction *r = srv->replyXaction) {
961 const bool hasSpace = r->reply.accumulate(msg, msgSize);
962 if (!hasSpace) {
963 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
964 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
965 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
966 srv->closePipesSafely();
967 return;
968 }
969
970 if (!msgEnd)
971 return; // We are waiting for more data.
972
973 bool retry = false;
974 if (cbdataReferenceValid(r->request.data)) {
975 r->reply.finalize();
976 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
977 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
978 retry = true;
979 } else {
980 hlp->callBack(*r);
981 }
982 }
983
984 -- srv->stats.pending;
985 ++ srv->stats.replies;
986
987 ++ hlp->stats.replies;
988
989 srv->answer_time = current_time;
990
991 srv->dispatch_time = r->request.dispatch_time;
992
993 hlp->stats.avg_svc_time =
994 Math::intAverage(hlp->stats.avg_svc_time,
995 tvSubMsec(r->request.dispatch_time, current_time),
996 hlp->stats.replies, REDIRECT_AV_FACTOR);
997
998 // release or re-submit parsedRequestXaction object
999 srv->replyXaction = nullptr;
1000 if (retry) {
1001 ++r->request.retries;
1002 hlp->submitRequest(r);
1003 } else
1004 delete r;
1005 }
1006
1007 if (hlp->timeout && hlp->childs.concurrency)
1008 srv->checkForTimedOutRequests(hlp->retryTimedOut);
1009
1010 if (!srv->flags.shutdown) {
1011 helperKickQueue(hlp);
1012 } else if (!srv->flags.closing && !srv->stats.pending) {
1013 srv->closeWritePipeSafely();
1014 }
1015 }
1016
1017 static void
1018 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1019 {
1020 const auto srv = static_cast<Helper::Session *>(data);
1021 const auto hlp = srv->parent;
1022 assert(cbdataReferenceValid(data));
1023
1024 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1025
1026 if (flag == Comm::ERR_CLOSING) {
1027 return;
1028 }
1029
1030 assert(conn->fd == srv->readPipe->fd);
1031
1032 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
1033
1034 if (flag != Comm::OK || len == 0) {
1035 srv->closePipesSafely();
1036 return;
1037 }
1038
1039 srv->roffset += len;
1040 srv->rbuf[srv->roffset] = '\0';
1041 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1042
1043 if (!srv->stats.pending && !srv->stats.timedout) {
1044 /* someone spoke without being spoken to */
1045 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1046 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1047 " bytes '" << srv->rbuf << "'");
1048
1049 srv->roffset = 0;
1050 srv->rbuf[0] = '\0';
1051 srv->closePipesSafely();
1052 return;
1053 }
1054
1055 bool needsMore = false;
1056 char *msg = srv->rbuf;
1057 while (*msg && !needsMore) {
1058 int skip = 0;
1059 char *eom = strchr(msg, hlp->eom);
1060 if (eom) {
1061 skip = 1;
1062 debugs(84, 3, "helperHandleRead: end of reply found");
1063 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1064 *eom = '\0';
1065 // rewind to the \r octet which is the real terminal now
1066 // and remember that we have to skip forward 2 places now.
1067 skip = 2;
1068 --eom;
1069 }
1070 *eom = '\0';
1071 }
1072
1073 if (!srv->ignoreToEom && !srv->replyXaction) {
1074 int i = 0;
1075 if (hlp->childs.concurrency) {
1076 char *e = nullptr;
1077 i = strtol(msg, &e, 10);
1078 // Do we need to check for e == msg? Means wrong response from helper.
1079 // Will be dropped as "unexpected reply on channel 0"
1080 needsMore = !(xisspace(*e) || (eom && e == eom));
1081 if (!needsMore) {
1082 msg = e;
1083 while (*msg && xisspace(*msg))
1084 ++msg;
1085 } // else not enough data to compute request number
1086 }
1087 if (!(srv->replyXaction = srv->popRequest(i))) {
1088 if (srv->stats.timedout) {
1089 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1090 } else {
1091 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
1092 i << " from " << hlp->id_name << " #" << srv->index <<
1093 " '" << srv->rbuf << "'");
1094 }
1095 srv->ignoreToEom = true;
1096 }
1097 } // else we need to just append reply data to the current Xaction
1098
1099 if (!needsMore) {
1100 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1101 assert(msgSize <= srv->rbuf_sz);
1102 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1103 msg += msgSize + skip;
1104 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1105
1106 // The next message should not ignored.
1107 if (eom && srv->ignoreToEom)
1108 srv->ignoreToEom = false;
1109 } else
1110 assert(skip == 0 && eom == nullptr);
1111 }
1112
1113 if (needsMore) {
1114 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1115 assert(msgSize <= srv->rbuf_sz);
1116 memmove(srv->rbuf, msg, msgSize);
1117 srv->roffset = msgSize;
1118 srv->rbuf[srv->roffset] = '\0';
1119 } else {
1120 // All of the responses parsed and msg points at the end of read data
1121 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1122 srv->roffset = 0;
1123 }
1124
1125 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1126 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1127 assert(spaceSize >= 0);
1128
1129 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1130 CommIoCbPtrFun(helperHandleRead, srv));
1131 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1132 }
1133 }
1134
1135 static void
1136 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1137 {
1138 char *t = nullptr;
1139 helper_stateful_server *srv = (helper_stateful_server *)data;
1140 const auto hlp = srv->parent;
1141 assert(cbdataReferenceValid(data));
1142
1143 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1144
1145 if (flag == Comm::ERR_CLOSING) {
1146 return;
1147 }
1148
1149 assert(conn->fd == srv->readPipe->fd);
1150
1151 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1152 hlp->id_name << " #" << srv->index);
1153
1154 if (flag != Comm::OK || len == 0) {
1155 srv->closePipesSafely();
1156 return;
1157 }
1158
1159 srv->roffset += len;
1160 srv->rbuf[srv->roffset] = '\0';
1161 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1162
1163 if (srv->requests.empty()) {
1164 /* someone spoke without being spoken to */
1165 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1166 hlp->id_name << " #" << srv->index << ", " << (int)len <<
1167 " bytes '" << srv->rbuf << "'");
1168
1169 srv->roffset = 0;
1170 srv->closePipesSafely();
1171 return;
1172 }
1173
1174 if ((t = strchr(srv->rbuf, hlp->eom))) {
1175 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1176
1177 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1178 *t = '\0';
1179 // rewind to the \r octet which is the real terminal now
1180 --t;
1181 }
1182
1183 *t = '\0';
1184 }
1185
1186 const auto r = srv->requests.front();
1187
1188 if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1189 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1190 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1191 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1192 srv->closePipesSafely();
1193 return;
1194 }
1195 /**
1196 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1197 * Doing this prohibits concurrency support with multiple replies per read().
1198 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1199 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1200 */
1201 srv->roffset = 0;
1202
1203 if (t) {
1204 /* end of reply found */
1205 srv->requests.pop_front(); // we already have it in 'r'
1206 int called = 1;
1207
1208 if (cbdataReferenceValid(r->request.data)) {
1209 r->reply.finalize();
1210 r->reply.reservationId = srv->reservationId;
1211 hlp->callBack(*r);
1212 } else {
1213 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1214 called = 0;
1215 }
1216
1217 delete r;
1218
1219 -- srv->stats.pending;
1220 ++ srv->stats.replies;
1221
1222 ++ hlp->stats.replies;
1223 srv->answer_time = current_time;
1224 hlp->stats.avg_svc_time =
1225 Math::intAverage(hlp->stats.avg_svc_time,
1226 tvSubMsec(srv->dispatch_time, current_time),
1227 hlp->stats.replies, REDIRECT_AV_FACTOR);
1228
1229 if (called)
1230 helperStatefulServerDone(srv);
1231 else
1232 hlp->cancelReservation(srv->reservationId);
1233 }
1234
1235 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1236 int spaceSize = srv->rbuf_sz - 1;
1237
1238 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1239 CommIoCbPtrFun(helperStatefulHandleRead, srv));
1240 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1241 }
1242 }
1243
1244 /// Handles a request when all running helpers, if any, are busy.
1245 static void
1246 Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
1247 {
1248 hlp->queue.push(r);
1249 ++ hlp->stats.queue_size;
1250
1251 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1252 if (hlp->childs.needNew() > 0) {
1253 hlp->openSessions();
1254 return;
1255 }
1256
1257 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1258 return;
1259
1260 if (squid_curtime - hlp->last_queue_warn < 600)
1261 return;
1262
1263 if (shutting_down || reconfiguring)
1264 return;
1265
1266 hlp->last_queue_warn = squid_curtime;
1267
1268 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1269 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1270 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1271 }
1272
1273 static void
1274 StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
1275 {
1276 hlp->queue.push(r);
1277 ++ hlp->stats.queue_size;
1278
1279 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1280 if (hlp->childs.needNew() > 0) {
1281 hlp->openSessions();
1282 return;
1283 }
1284
1285 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1286 return;
1287
1288 if (squid_curtime - hlp->last_queue_warn < 600)
1289 return;
1290
1291 if (shutting_down || reconfiguring)
1292 return;
1293
1294 hlp->last_queue_warn = squid_curtime;
1295
1296 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1297 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1298 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1299 }
1300
1301 Helper::Xaction *
1302 Helper::Client::nextRequest()
1303 {
1304 if (queue.empty())
1305 return nullptr;
1306
1307 auto *r = queue.front();
1308 queue.pop();
1309 --stats.queue_size;
1310 return r;
1311 }
1312
1313 static Helper::Session *
1314 GetFirstAvailable(const Helper::Client::Pointer &hlp)
1315 {
1316 dlink_node *n;
1317 Helper::Session *selected = nullptr;
1318 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1319
1320 if (hlp->childs.n_running == 0)
1321 return nullptr;
1322
1323 /* Find "least" loaded helper (approx) */
1324 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1325 const auto srv = static_cast<Helper::Session *>(n->data);
1326
1327 if (selected && selected->stats.pending <= srv->stats.pending)
1328 continue;
1329
1330 if (srv->flags.shutdown)
1331 continue;
1332
1333 if (!srv->stats.pending)
1334 return srv;
1335
1336 if (selected) {
1337 selected = srv;
1338 break;
1339 }
1340
1341 selected = srv;
1342 }
1343
1344 if (!selected) {
1345 debugs(84, 5, "GetFirstAvailable: None available.");
1346 return nullptr;
1347 }
1348
1349 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1350 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1351 return nullptr;
1352 }
1353
1354 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1355 return selected;
1356 }
1357
1358 static helper_stateful_server *
1359 StatefulGetFirstAvailable(const statefulhelper::Pointer &hlp)
1360 {
1361 dlink_node *n;
1362 helper_stateful_server *srv = nullptr;
1363 helper_stateful_server *oldestReservedServer = nullptr;
1364 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1365
1366 if (hlp->childs.n_running == 0)
1367 return nullptr;
1368
1369 for (n = hlp->servers.head; n != nullptr; n = n->next) {
1370 srv = (helper_stateful_server *)n->data;
1371
1372 if (srv->stats.pending)
1373 continue;
1374
1375 if (srv->reserved()) {
1376 if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
1377 if (!oldestReservedServer)
1378 oldestReservedServer = srv;
1379 else if (oldestReservedServer->reservationStart < srv->reservationStart)
1380 oldestReservedServer = srv;
1381 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1382 }
1383 continue;
1384 }
1385
1386 if (srv->flags.shutdown)
1387 continue;
1388
1389 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1390 return srv;
1391 }
1392
1393 if (oldestReservedServer) {
1394 debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1395 return oldestReservedServer;
1396 }
1397
1398 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1399 return nullptr;
1400 }
1401
1402 static void
1403 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1404 {
1405 const auto srv = static_cast<Helper::Session *>(data);
1406
1407 srv->writebuf->clean();
1408 delete srv->writebuf;
1409 srv->writebuf = nullptr;
1410 srv->flags.writing = false;
1411
1412 if (flag != Comm::OK) {
1413 /* Helper server has crashed */
1414 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1415 return;
1416 }
1417
1418 if (!srv->wqueue->isNull()) {
1419 srv->writebuf = srv->wqueue;
1420 srv->wqueue = new MemBuf;
1421 srv->flags.writing = true;
1422 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1423 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1424 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1425 }
1426 }
1427
1428 static void
1429 helperDispatch(Helper::Session * const srv, Helper::Xaction * const r)
1430 {
1431 const auto hlp = srv->parent;
1432 const uint64_t reqId = ++srv->nextRequestId;
1433
1434 if (!cbdataReferenceValid(r->request.data)) {
1435 debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
1436 delete r;
1437 return;
1438 }
1439
1440 r->request.Id = reqId;
1441 const auto it = srv->requests.insert(srv->requests.end(), r);
1442 r->request.dispatch_time = current_time;
1443
1444 if (srv->wqueue->isNull())
1445 srv->wqueue->init();
1446
1447 if (hlp->childs.concurrency) {
1448 srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
1449 assert(srv->requestsIndex.size() == srv->requests.size());
1450 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1451 } else
1452 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1453
1454 if (!srv->flags.writing) {
1455 assert(nullptr == srv->writebuf);
1456 srv->writebuf = srv->wqueue;
1457 srv->wqueue = new MemBuf;
1458 srv->flags.writing = true;
1459 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1460 CommIoCbPtrFun(helperDispatchWriteDone, srv));
1461 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1462 }
1463
1464 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1465
1466 ++ srv->stats.uses;
1467 ++ srv->stats.pending;
1468 ++ hlp->stats.requests;
1469 }
1470
1471 static void
1472 helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1473 {}
1474
1475 static void
1476 helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
1477 {
1478 const auto hlp = srv->parent;
1479
1480 if (!cbdataReferenceValid(r->request.data)) {
1481 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
1482 delete r;
1483 hlp->cancelReservation(srv->reservationId);
1484 return;
1485 }
1486
1487 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1488
1489 assert(srv->reservationId);
1490 r->reply.reservationId = srv->reservationId;
1491
1492 if (r->request.placeholder == 1) {
1493 /* a callback is needed before this request can _use_ a helper. */
1494 /* we don't care about releasing this helper. The request NEVER
1495 * gets to the helper. So we throw away the return code */
1496 r->reply.result = Helper::Unknown;
1497 hlp->callBack(*r);
1498 /* throw away the placeholder */
1499 delete r;
1500 /* and push the queue. Note that the callback may have submitted a new
1501 * request to the helper which is why we test for the request */
1502
1503 if (!srv->requests.size())
1504 helperStatefulServerDone(srv);
1505
1506 return;
1507 }
1508
1509 srv->requests.push_back(r);
1510 srv->dispatch_time = current_time;
1511 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1512 CommIoCbPtrFun(helperStatefulDispatchWriteDone, srv));
1513 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
1514 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1515 hlp->id_name << " #" << srv->index << ", " <<
1516 (int) strlen(r->request.buf) << " bytes");
1517
1518 ++ srv->stats.uses;
1519 ++ srv->stats.pending;
1520 ++ hlp->stats.requests;
1521 }
1522
1523 static void
1524 helperKickQueue(const Helper::Client::Pointer &hlp)
1525 {
1526 Helper::Xaction *r = nullptr;
1527 Helper::Session *srv = nullptr;
1528
1529 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1530 helperDispatch(srv, r);
1531
1532 if (!hlp->childs.n_active)
1533 hlp->dropQueued();
1534 }
1535
1536 static void
1537 helperStatefulKickQueue(const statefulhelper::Pointer &hlp)
1538 {
1539 Helper::Xaction *r;
1540 helper_stateful_server *srv;
1541 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1542 debugs(84, 5, "found srv-" << srv->index);
1543 hlp->reserveServer(srv);
1544 helperStatefulDispatch(srv, r);
1545 }
1546
1547 if (!hlp->childs.n_active)
1548 hlp->dropQueued();
1549 }
1550
1551 static void
1552 helperStatefulServerDone(helper_stateful_server * srv)
1553 {
1554 if (!srv->flags.shutdown) {
1555 helperStatefulKickQueue(srv->parent);
1556 } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1557 srv->closeWritePipeSafely();
1558 return;
1559 }
1560 }
1561
1562 void
1563 Helper::Session::checkForTimedOutRequests(bool const retry)
1564 {
1565 assert(parent->childs.concurrency);
1566 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1567 const auto r = requests.front();
1568 RequestIndex::iterator it;
1569 it = requestsIndex.find(r->request.Id);
1570 assert(it != requestsIndex.end());
1571 requestsIndex.erase(it);
1572 requests.pop_front();
1573 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1574 bool retried = false;
1575 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1576 debugs(84, 2, "Retry request " << r->request.Id);
1577 ++r->request.retries;
1578 parent->submitRequest(r);
1579 retried = true;
1580 } else if (cbdataReferenceValid(r->request.data)) {
1581 if (!parent->onTimedOutResponse.isEmpty()) {
1582 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1583 r->reply.finalize();
1584 else
1585 r->reply.result = Helper::TimedOut;
1586 parent->callBack(*r);
1587 } else {
1588 r->reply.result = Helper::TimedOut;
1589 parent->callBack(*r);
1590 }
1591 }
1592 --stats.pending;
1593 ++stats.timedout;
1594 ++parent->stats.timedout;
1595 if (!retried)
1596 delete r;
1597 }
1598 }
1599
1600 void
1601 Helper::Session::requestTimeout(const CommTimeoutCbParams &io)
1602 {
1603 debugs(26, 3, io.conn);
1604 const auto srv = static_cast<Session *>(io.data);
1605
1606 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1607
1608 debugs(84, 3, io.conn << " establish a new timeout");
1609 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
1610 CommTimeoutCbPtrFun(Session::requestTimeout, srv));
1611
1612 const time_t timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1613 const time_t minimumNewTimeout = 1; // second
1614 const auto timeLeft = max(minimumNewTimeout, srv->parent->timeout - timeSpent);
1615
1616 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1617 }
1618