]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
NoNewGlobals for MapLabel (#1746)
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1/*
b8ae064d 2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
f740a279 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
f740a279 7 */
8
bbc27441
AJ
9/* DEBUG: section 84 Helper process maintenance */
10
582c2af2 11#include "squid.h"
37dedc58 12#include "base/AsyncCbdataCalls.h"
bf3e8d5a 13#include "base/Packable.h"
675b8408 14#include "base/Raw.h"
e0d28505
AJ
15#include "comm.h"
16#include "comm/Connection.h"
7e66d5e2 17#include "comm/Read.h"
ec41b64c 18#include "comm/Write.h"
675b8408 19#include "debug/Messages.h"
c4ad1349 20#include "fd.h"
b0bb5517 21#include "fde.h"
38e16f92 22#include "format/Quoting.h"
582c2af2 23#include "helper.h"
24438ec5
AJ
24#include "helper/Reply.h"
25#include "helper/Request.h"
e0d28505 26#include "MemBuf.h"
32fd6d8a 27#include "SquidConfig.h"
96097880 28#include "SquidIpc.h"
a98bcbee 29#include "SquidMath.h"
e6ccf245 30#include "Store.h"
d295d770 31#include "wordlist.h"
74addf6c 32
ed6e9fb9
AJ
33// helper_stateful_server::data uses explicit alloc()/freeOne() */
34#include "mem/Pool.h"
35
74addf6c 36#define HELPER_MAX_ARGS 64
37
32fd6d8a
CT
38/// The maximum allowed request retries.
39#define MAX_RETRIES 2
40
ddc77a2e
CT
41/// Helpers input buffer size.
42const size_t ReadBufSize(32*1024);
a78f2175 43
c4b7a5a9 44static IOCB helperHandleRead;
45static IOCB helperStatefulHandleRead;
e05a9d51
EB
46static void Enqueue(Helper::Client *, Helper::Xaction *);
47static Helper::Session *GetFirstAvailable(const Helper::Client::Pointer &);
3bd118d6 48static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper::Pointer &);
e05a9d51 49static void helperDispatch(Helper::Session *, Helper::Xaction *);
ddc77a2e 50static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
e05a9d51 51static void helperKickQueue(const Helper::Client::Pointer &);
3bd118d6 52static void helperStatefulKickQueue(const statefulhelper::Pointer &);
a8c4f8d6 53static void helperStatefulServerDone(helper_stateful_server * srv);
ddc77a2e 54static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
9522b380 55
e05a9d51 56CBDATA_NAMESPACED_CLASS_INIT(Helper, Session);
f3831c93 57CBDATA_CLASS_INIT(helper_stateful_server);
aa839030 58
e05a9d51 59InstanceIdDefinitions(Helper::SessionBase, "Hlpr");
e237d339 60
895dba0a 61void
e05a9d51 62Helper::SessionBase::initStats()
895dba0a
CT
63{
64 stats.uses=0;
65 stats.replies=0;
66 stats.pending=0;
67 stats.releases=0;
68 stats.timedout = 0;
69}
70
e0d28505 71void
0a958e42 72Helper::SessionBase::closePipesSafely()
e0d28505 73{
7aa9bb3e 74#if _SQUID_WINDOWS_
e0d28505
AJ
75 shutdown(writePipe->fd, SD_BOTH);
76#endif
77
be4d35dc 78 flags.closing = true;
e0d28505
AJ
79 if (readPipe->fd == writePipe->fd)
80 readPipe->fd = -1;
81 else
82 readPipe->close();
83 writePipe->close();
84
7aa9bb3e 85#if _SQUID_WINDOWS_
dc49061a
A
86 if (hIpc) {
87 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
88 getCurrentTime();
0a958e42 89 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
41060bef 90 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
e0d28505 91 }
dc49061a
A
92 CloseHandle(hIpc);
93 }
e0d28505
AJ
94#endif
95}
96
97void
0a958e42 98Helper::SessionBase::closeWritePipeSafely()
e0d28505 99{
7aa9bb3e 100#if _SQUID_WINDOWS_
e0d28505
AJ
101 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
102#endif
103
be4d35dc 104 flags.closing = true;
e0d28505
AJ
105 if (readPipe->fd == writePipe->fd)
106 readPipe->fd = -1;
107 writePipe->close();
108
7aa9bb3e 109#if _SQUID_WINDOWS_
dc49061a
A
110 if (hIpc) {
111 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
112 getCurrentTime();
0a958e42 113 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
41060bef 114 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
e0d28505 115 }
dc49061a
A
116 CloseHandle(hIpc);
117 }
e0d28505
AJ
118#endif
119}
120
a56fcf0b 121void
0a958e42 122Helper::SessionBase::dropQueued()
a56fcf0b
CT
123{
124 while (!requests.empty()) {
125 // XXX: re-schedule these on another helper?
e05a9d51 126 const auto r = requests.front();
a56fcf0b 127 requests.pop_front();
26b6afaf 128 r->reply.result = Helper::Unknown;
0a958e42 129 helper().callBack(*r);
a56fcf0b
CT
130 delete r;
131 }
132}
133
e05a9d51 134Helper::SessionBase::~SessionBase()
a56fcf0b
CT
135{
136 if (rbuf) {
137 memFreeBuf(rbuf_sz, rbuf);
aee3523a 138 rbuf = nullptr;
a56fcf0b
CT
139 }
140}
141
e05a9d51 142Helper::Session::~Session()
a56fcf0b
CT
143{
144 wqueue->clean();
145 delete wqueue;
146
147 if (writebuf) {
148 writebuf->clean();
149 delete writebuf;
aee3523a 150 writebuf = nullptr;
a56fcf0b
CT
151 }
152
153 if (Comm::IsConnOpen(writePipe))
0a958e42 154 closeWritePipeSafely();
a56fcf0b
CT
155
156 dlinkDelete(&link, &parent->servers);
157
158 assert(parent->childs.n_running > 0);
159 -- parent->childs.n_running;
160
161 assert(requests.empty());
a56fcf0b
CT
162}
163
164void
0a958e42 165Helper::Session::dropQueued()
a56fcf0b 166{
0a958e42 167 SessionBase::dropQueued();
a56fcf0b
CT
168 requestsIndex.clear();
169}
170
171helper_stateful_server::~helper_stateful_server()
172{
173 /* TODO: walk the local queue of requests and carry them all out */
174 if (Comm::IsConnOpen(writePipe))
0a958e42 175 closeWritePipeSafely();
a56fcf0b
CT
176
177 parent->cancelReservation(reservationId);
178
179 dlinkDelete(&link, &parent->servers);
180
181 assert(parent->childs.n_running > 0);
182 -- parent->childs.n_running;
183
184 assert(requests.empty());
a56fcf0b
CT
185}
186
74addf6c 187void
bd71920d 188Helper::Client::openSessions()
74addf6c 189{
190 char *s;
191 char *progname;
192 char *shortname;
193 char *procname;
1b8af627 194 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
74addf6c 195 char fd_note_buf[FD_DESC_SZ];
74addf6c 196 int nargs = 0;
197 int k;
b5d712b5 198 pid_t pid;
74addf6c 199 int rfd;
200 int wfd;
b5d712b5 201 void * hIpc;
74addf6c 202 wordlist *w;
bd71920d
EB
203 // Helps reducing diff. TODO: remove
204 const auto hlp = this;
62e76326 205
aee3523a 206 if (hlp->cmdline == nullptr)
62e76326 207 return;
208
74addf6c 209 progname = hlp->cmdline->key;
62e76326 210
74addf6c 211 if ((s = strrchr(progname, '/')))
62e76326 212 shortname = xstrdup(s + 1);
74addf6c 213 else
62e76326 214 shortname = xstrdup(progname);
215
48d54e4d
AJ
216 /* figure out how many new child are actually needed. */
217 int need_new = hlp->childs.needNew();
5a5b2c56 218
c59baaa8 219 debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
5a5b2c56 220
04f7fd38 221 if (need_new < 1) {
c59baaa8 222 debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
5a5b2c56 223 }
62e76326 224
e6ccf245 225 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 226
74addf6c 227 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 228
a38ec4b1
FC
229 args[nargs] = procname;
230 ++nargs;
62e76326 231
a38ec4b1
FC
232 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
233 args[nargs] = w->key;
234 ++nargs;
235 }
62e76326 236
aee3523a 237 args[nargs] = nullptr;
a38ec4b1 238 ++nargs;
62e76326 239
74addf6c 240 assert(nargs <= HELPER_MAX_ARGS);
62e76326 241
654835f0
EB
242 int successfullyStarted = 0;
243
95dc7ff4 244 for (k = 0; k < need_new; ++k) {
62e76326 245 getCurrentTime();
246 rfd = wfd = -1;
b5d712b5 247 pid = ipcCreate(hlp->ipc_type,
248 progname,
249 args,
250 shortname,
cc192b50 251 hlp->addr,
b5d712b5 252 &rfd,
253 &wfd,
254 &hIpc);
255
256 if (pid < 0) {
e0236918 257 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
62e76326 258 continue;
259 }
260
654835f0 261 ++successfullyStarted;
95dc7ff4
FC
262 ++ hlp->childs.n_running;
263 ++ hlp->childs.n_active;
e05a9d51 264 const auto srv = new Helper::Session;
895dba0a
CT
265 srv->hIpc = hIpc;
266 srv->pid = pid;
267 srv->initStats();
268 srv->addr = hlp->addr;
269 srv->readPipe = new Comm::Connection;
270 srv->readPipe->fd = rfd;
271 srv->writePipe = new Comm::Connection;
272 srv->writePipe->fd = wfd;
273 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
274 srv->wqueue = new MemBuf;
275 srv->roffset = 0;
276 srv->nextRequestId = 0;
aee3523a 277 srv->replyXaction = nullptr;
895dba0a 278 srv->ignoreToEom = false;
3bd118d6 279 srv->parent = hlp;
62e76326 280 dlinkAddTail(srv, &srv->link, &hlp->servers);
281
282 if (rfd == wfd) {
283 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
284 fd_note(rfd, fd_note_buf);
285 } else {
286 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
287 fd_note(rfd, fd_note_buf);
288 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
289 fd_note(wfd, fd_note_buf);
290 }
291
292 commSetNonBlocking(rfd);
293
294 if (wfd != rfd)
295 commSetNonBlocking(wfd);
296
0a958e42 297 AsyncCall::Pointer closeCall = asyncCall(5, 4, "Helper::Session::HelperServerClosed", cbdataDialer(SessionBase::HelperServerClosed,
ab2acef6 298 static_cast<Helper::SessionBase *>(srv)));
0a958e42 299
37dedc58 300 comm_add_close_handler(rfd, closeCall);
07eca7e0 301
f53969cc 302 if (hlp->timeout && hlp->childs.concurrency) {
e05a9d51
EB
303 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
304 CommTimeoutCbPtrFun(Helper::Session::requestTimeout, srv));
32fd6d8a
CT
305 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
306 }
307
abd8f140
AJ
308 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
309 CommIoCbPtrFun(helperHandleRead, srv));
310 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
74addf6c 311 }
62e76326 312
654835f0
EB
313 // Call handleFewerServers() before hlp->last_restart is updated because
314 // that method uses last_restart to measure the delay since previous start.
315 // TODO: Refactor last_restart code to measure failure frequency rather than
316 // detecting a helper #X failure that is being close to the helper #Y start.
317 if (successfullyStarted < need_new)
318 hlp->handleFewerServers(false);
319
5ea33fce 320 hlp->last_restart = squid_curtime;
74addf6c 321 safe_free(shortname);
322 safe_free(procname);
838b993c 323 helperKickQueue(hlp);
74addf6c 324}
325
94439e4e 326void
bd71920d 327statefulhelper::openSessions()
94439e4e 328{
94439e4e 329 char *shortname;
1b8af627 330 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
94439e4e 331 char fd_note_buf[FD_DESC_SZ];
94439e4e 332 int nargs = 0;
bd71920d
EB
333 // Helps reducing diff. TODO: remove
334 const auto hlp = this;
62e76326 335
aee3523a 336 if (hlp->cmdline == nullptr)
62e76326 337 return;
338
7353861b 339 if (hlp->childs.concurrency)
fa84c01d 340 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
7353861b 341
4f0ef8e8 342 char *progname = hlp->cmdline->key;
62e76326 343
4f0ef8e8 344 char *s;
94439e4e 345 if ((s = strrchr(progname, '/')))
62e76326 346 shortname = xstrdup(s + 1);
94439e4e 347 else
62e76326 348 shortname = xstrdup(progname);
349
48d54e4d
AJ
350 /* figure out haw mant new helpers are needed. */
351 int need_new = hlp->childs.needNew();
5a5b2c56 352
e0236918 353 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
5a5b2c56 354
04f7fd38 355 if (need_new < 1) {
e0236918 356 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
5a5b2c56 357 }
62e76326 358
4f0ef8e8 359 char *procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 360
94439e4e 361 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 362
a38ec4b1
FC
363 args[nargs] = procname;
364 ++nargs;
62e76326 365
a38ec4b1
FC
366 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
367 args[nargs] = w->key;
368 ++nargs;
369 }
62e76326 370
aee3523a 371 args[nargs] = nullptr;
a38ec4b1 372 ++nargs;
62e76326 373
94439e4e 374 assert(nargs <= HELPER_MAX_ARGS);
62e76326 375
654835f0
EB
376 int successfullyStarted = 0;
377
95dc7ff4 378 for (int k = 0; k < need_new; ++k) {
62e76326 379 getCurrentTime();
26ac0430
AJ
380 int rfd = -1;
381 int wfd = -1;
382 void * hIpc;
4f0ef8e8 383 pid_t pid = ipcCreate(hlp->ipc_type,
26ac0430
AJ
384 progname,
385 args,
386 shortname,
387 hlp->addr,
388 &rfd,
389 &wfd,
390 &hIpc);
b5d712b5 391
392 if (pid < 0) {
e0236918 393 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
62e76326 394 continue;
395 }
396
654835f0 397 ++successfullyStarted;
95dc7ff4
FC
398 ++ hlp->childs.n_running;
399 ++ hlp->childs.n_active;
895dba0a
CT
400 helper_stateful_server *srv = new helper_stateful_server;
401 srv->hIpc = hIpc;
402 srv->pid = pid;
403 srv->initStats();
404 srv->addr = hlp->addr;
405 srv->readPipe = new Comm::Connection;
406 srv->readPipe->fd = rfd;
407 srv->writePipe = new Comm::Connection;
408 srv->writePipe->fd = wfd;
409 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
410 srv->roffset = 0;
3bd118d6 411 srv->parent = hlp;
895dba0a
CT
412 srv->reservationStart = 0;
413
62e76326 414 dlinkAddTail(srv, &srv->link, &hlp->servers);
415
416 if (rfd == wfd) {
417 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
418 fd_note(rfd, fd_note_buf);
419 } else {
420 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
421 fd_note(rfd, fd_note_buf);
422 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
423 fd_note(wfd, fd_note_buf);
424 }
425
426 commSetNonBlocking(rfd);
427
428 if (wfd != rfd)
429 commSetNonBlocking(wfd);
430
0a958e42 431 AsyncCall::Pointer closeCall = asyncCall(5, 4, "helper_stateful_server::HelperServerClosed", cbdataDialer(Helper::SessionBase::HelperServerClosed,
ab2acef6 432 static_cast<Helper::SessionBase *>(srv)));
0a958e42 433
37dedc58 434 comm_add_close_handler(rfd, closeCall);
07eca7e0 435
abd8f140
AJ
436 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
437 CommIoCbPtrFun(helperStatefulHandleRead, srv));
438 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
94439e4e 439 }
62e76326 440
654835f0
EB
441 // Call handleFewerServers() before hlp->last_restart is updated because
442 // that method uses last_restart to measure the delay since previous start.
443 // TODO: Refactor last_restart code to measure failure frequency rather than
444 // detecting a helper #X failure that is being close to the helper #Y start.
445 if (successfullyStarted < need_new)
446 hlp->handleFewerServers(false);
447
5ea33fce 448 hlp->last_restart = squid_curtime;
94439e4e 449 safe_free(shortname);
450 safe_free(procname);
451 helperStatefulKickQueue(hlp);
452}
453
32fd6d8a 454void
e05a9d51 455Helper::Client::submitRequest(Helper::Xaction * const r)
32fd6d8a 456{
e05a9d51 457 if (const auto srv = GetFirstAvailable(this))
32fd6d8a
CT
458 helperDispatch(srv, r);
459 else
460 Enqueue(this, r);
461
6082a0e2
EB
462 syncQueueStats();
463}
464
465/// handles helperSubmit() and helperStatefulSubmit() failures
466static void
e05a9d51 467SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
6082a0e2
EB
468{
469 auto result = Helper::Error;
470 if (!hlp) {
471 debugs(84, 3, "no helper");
472 result = Helper::Unknown;
32fd6d8a 473 }
6082a0e2
EB
474 // else pretend the helper has responded with ERR
475
476 callback(data, Helper::Reply(result));
32fd6d8a
CT
477}
478
74addf6c 479void
e05a9d51 480helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
74addf6c 481{
6082a0e2
EB
482 if (!hlp || !hlp->trySubmit(buf, callback, data))
483 SubmissionFailure(hlp, callback, data);
6825b101
CT
484}
485
6082a0e2 486/// whether queuing an additional request would overload the helper
6825b101 487bool
e05a9d51 488Helper::Client::queueFull() const {
6082a0e2
EB
489 return stats.queue_size >= static_cast<int>(childs.queue_size);
490}
491
492bool
e05a9d51 493Helper::Client::overloaded() const {
6825b101
CT
494 return stats.queue_size > static_cast<int>(childs.queue_size);
495}
496
6082a0e2 497/// synchronizes queue-dependent measurements with the current queue state
6825b101 498void
e05a9d51 499Helper::Client::syncQueueStats()
6825b101 500{
6082a0e2
EB
501 if (overloaded()) {
502 if (overloadStart) {
503 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
504 } else {
505 overloadStart = squid_curtime;
506 debugs(84, 3, id_name << " became overloaded");
507 }
508 } else {
509 if (overloadStart) {
510 debugs(84, 5, id_name << " is no longer overloaded");
511 if (droppedRequests) {
512 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
513 " is no longer overloaded after dropping " << droppedRequests <<
514 " requests in " << (squid_curtime - overloadStart) << " seconds");
515 droppedRequests = 0;
516 }
517 overloadStart = 0;
518 }
519 }
6825b101
CT
520}
521
6082a0e2
EB
522/// prepares the helper for request submission
523/// returns true if and only if the submission should proceed
524/// may kill Squid if the helper remains overloaded for too long
6825b101 525bool
e05a9d51 526Helper::Client::prepSubmit()
6825b101 527{
6082a0e2
EB
528 // re-sync for the configuration may have changed since the last submission
529 syncQueueStats();
530
531 // Nothing special to do if the new request does not overload (i.e., the
532 // queue is not even full yet) or only _starts_ overloading this helper
533 // (i.e., the queue is currently at its limit).
534 if (!overloaded())
535 return true;
62e76326 536
6082a0e2
EB
537 if (squid_curtime - overloadStart <= 180)
538 return true; // also OK: overload has not persisted long enough to panic
539
e05a9d51 540 if (childs.onPersistentOverload == ChildConfig::actDie)
6082a0e2
EB
541 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
542
543 if (!droppedRequests) {
544 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
545 id_name << " helper configured with on-persistent-overload=err");
6825b101 546 }
6082a0e2
EB
547 ++droppedRequests;
548 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
549 return false;
550}
551
552bool
e05a9d51 553Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
6082a0e2
EB
554{
555 if (!prepSubmit())
556 return false; // request was dropped
6825b101
CT
557
558 submit(buf, callback, data); // will send or queue
559 return true; // request submitted or queued
560}
561
562/// dispatches or enqueues a helper requests; does not enforce queue limits
563void
e05a9d51 564Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
6825b101 565{
e05a9d51 566 const auto r = new Xaction(callback, data, buf);
32fd6d8a 567 submitRequest(r);
8289d5a9 568 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
94439e4e 569}
570
26b6afaf
EB
571void
572Helper::Client::callBack(Xaction &r)
573{
574 const auto callback = r.request.callback;
575 Assure(callback);
576
577 r.request.callback = nullptr;
578 void *cbdata = nullptr;
579 if (cbdataReferenceValidDone(r.request.data, &cbdata))
580 callback(cbdata, r.reply);
581}
582
a56fcf0b
CT
583/// Submit request or callback the caller with a Helper::Error error.
584/// If the reservation is not set then reserves a new helper.
94439e4e 585void
3bd118d6 586helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
94439e4e 587{
a56fcf0b 588 if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
6082a0e2
EB
589 SubmissionFailure(hlp, callback, data);
590}
591
592/// If possible, submit request. Otherwise, either kill Squid or return false.
593bool
a56fcf0b 594statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
6082a0e2
EB
595{
596 if (!prepSubmit())
597 return false; // request was dropped
598
a56fcf0b 599 submit(buf, callback, data, reservation); // will send or queue
6082a0e2 600 return true; // request submitted or queued
6825b101 601}
62e76326 602
a56fcf0b
CT
603void
604statefulhelper::reserveServer(helper_stateful_server * srv)
6825b101 605{
a56fcf0b
CT
606 // clear any old reservation
607 if (srv->reserved()) {
608 reservations.erase(srv->reservationId);
609 srv->clearReservation();
610 }
611
612 srv->reserve();
613 reservations.insert(Reservations::value_type(srv->reservationId, srv));
614}
615
616void
617statefulhelper::cancelReservation(const Helper::ReservationId reservation)
618{
619 const auto it = reservations.find(reservation);
620 if (it == reservations.end())
621 return;
622
623 helper_stateful_server *srv = it->second;
624 reservations.erase(it);
625 srv->clearReservation();
626
627 // schedule a queue kick
628 AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
629 ScheduleCallHere(call);
630}
631
632helper_stateful_server *
633statefulhelper::findServer(const Helper::ReservationId & reservation)
634{
635 const auto it = reservations.find(reservation);
636 if (it == reservations.end())
637 return nullptr;
638 return it->second;
639}
640
641void
642helper_stateful_server::reserve()
643{
644 assert(!reservationId);
645 reservationStart = squid_curtime;
646 reservationId = Helper::ReservationId::Next();
647 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
648}
649
650void
651helper_stateful_server::clearReservation()
652{
653 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
654 if (!reservationId)
655 return;
62e76326 656
a56fcf0b
CT
657 ++stats.releases;
658
659 reservationId.clear();
660 reservationStart = 0;
661}
662
663void
664statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
665{
666 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
62e76326 667
a56fcf0b
CT
668 if (buf && reservation) {
669 debugs(84, 5, reservation);
670 helper_stateful_server *lastServer = findServer(reservation);
671 if (!lastServer) {
672 debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
673 r->reply.result = Helper::TimedOut;
26b6afaf 674 callBack(*r);
a56fcf0b
CT
675 delete r;
676 return;
677 }
d20ce97d 678 debugs(84, 5, "StatefulSubmit dispatching");
a56fcf0b 679 helperStatefulDispatch(lastServer, r);
94439e4e 680 } else {
26ac0430 681 helper_stateful_server *srv;
6825b101 682 if ((srv = StatefulGetFirstAvailable(this))) {
a56fcf0b 683 reserveServer(srv);
62e76326 684 helperStatefulDispatch(srv, r);
685 } else
6825b101 686 StatefulEnqueue(this, r);
94439e4e 687 }
62e76326 688
ddc77a2e 689 debugs(84, DBG_DATA, "placeholder: '" << r->request.placeholder <<
3d01c5ab 690 "', " << Raw("buf", buf, (!buf?0:strlen(buf))));
6825b101 691
6082a0e2 692 syncQueueStats();
94439e4e 693}
694
74addf6c 695void
e05a9d51 696Helper::Client::packStatsInto(Packable * const p, const char * const label) const
74addf6c 697{
bf3e8d5a
AJ
698 if (label)
699 p->appendf("%s:\n", label);
700
701 p->appendf(" program: %s\n", cmdline->key);
702 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
703 p->appendf(" requests sent: %d\n", stats.requests);
704 p->appendf(" replies received: %d\n", stats.replies);
705 p->appendf(" requests timedout: %d\n", stats.timedout);
706 p->appendf(" queue length: %d\n", stats.queue_size);
707 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
708 p->append("\n",1);
709 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
99b59e51
SM
710 "ID #",
711 "FD",
712 "PID",
713 "# Requests",
714 "# Replies",
715 "# Timed-out",
716 "Flags",
717 "Time",
718 "Offset",
719 "Request");
62e76326 720
bf3e8d5a 721 for (dlink_node *link = servers.head; link; link = link->next) {
e05a9d51 722 const auto srv = static_cast<SessionBase *>(link->data);
bf3e8d5a 723 assert(srv);
e05a9d51 724 const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
ddc77a2e 725 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
bf3e8d5a 726 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
99b59e51
SM
727 srv->index.value,
728 srv->readPipe->fd,
729 srv->pid,
730 srv->stats.uses,
731 srv->stats.replies,
732 srv->stats.timedout,
733 srv->stats.pending ? 'B' : ' ',
734 srv->flags.writing ? 'W' : ' ',
735 srv->flags.closing ? 'C' : ' ',
a56fcf0b 736 srv->reserved() ? 'R' : ' ',
99b59e51 737 srv->flags.shutdown ? 'S' : ' ',
ddc77a2e 738 xaction && xaction->request.placeholder ? 'P' : ' ',
99b59e51
SM
739 tt < 0.0 ? 0.0 : tt,
740 (int) srv->roffset,
ddc77a2e 741 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
94439e4e 742 }
62e76326 743
bf3e8d5a
AJ
744 p->append("\nFlags key:\n"
745 " B\tBUSY\n"
746 " W\tWRITING\n"
747 " C\tCLOSING\n"
748 " R\tRESERVED\n"
749 " S\tSHUTDOWN PENDING\n"
750 " P\tPLACEHOLDER\n", 101);
94439e4e 751}
752
6082a0e2 753bool
e05a9d51 754Helper::Client::willOverload() const {
6082a0e2
EB
755 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
756}
757
e05a9d51
EB
758Helper::Client::Pointer
759Helper::Client::Make(const char * const name)
3bd118d6 760{
e05a9d51 761 return new Client(name);
3bd118d6
EB
762}
763
764statefulhelper::Pointer
765statefulhelper::Make(const char *name)
766{
767 return new statefulhelper(name);
768}
769
74addf6c 770void
e05a9d51 771helperShutdown(const Helper::Client::Pointer &hlp)
74addf6c 772{
c68e9c6b 773 dlink_node *link = hlp->servers.head;
62e76326 774
c68e9c6b 775 while (link) {
e05a9d51 776 const auto srv = static_cast<Helper::Session *>(link->data);
62e76326 777 link = link->next;
778
8cfc76db 779 if (srv->flags.shutdown) {
e237d339 780 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
62e76326 781 continue;
782 }
783
48d54e4d 784 assert(hlp->childs.n_active > 0);
5e263176 785 -- hlp->childs.n_active;
f53969cc 786 srv->flags.shutdown = true; /* request it to shut itself down */
62e76326 787
d8f10d6a 788 if (srv->flags.closing) {
e237d339 789 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
62e76326 790 continue;
791 }
792
d8f10d6a 793 if (srv->stats.pending) {
e237d339 794 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
62e76326 795 continue;
796 }
797
e237d339 798 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
62e76326 799 /* the rest of the details is dealt with in the helperServerFree
800 * close handler
801 */
0a958e42 802 srv->closePipesSafely();
74addf6c 803 }
569605b9
EB
804
805 Assure(!hlp->childs.n_active);
806 hlp->dropQueued();
74addf6c 807}
808
94439e4e 809void
3bd118d6 810helperStatefulShutdown(const statefulhelper::Pointer &hlp)
94439e4e 811{
812 dlink_node *link = hlp->servers.head;
813 helper_stateful_server *srv;
62e76326 814
94439e4e 815 while (link) {
62e76326 816 srv = (helper_stateful_server *)link->data;
817 link = link->next;
818
8cfc76db 819 if (srv->flags.shutdown) {
e237d339 820 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
62e76326 821 continue;
822 }
823
48d54e4d 824 assert(hlp->childs.n_active > 0);
5e263176 825 -- hlp->childs.n_active;
f53969cc 826 srv->flags.shutdown = true; /* request it to shut itself down */
62e76326 827
a1267972 828 if (srv->stats.pending) {
e237d339 829 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
62e76326 830 continue;
831 }
832
833 if (srv->flags.closing) {
e237d339 834 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
62e76326 835 continue;
836 }
837
a56fcf0b 838 if (srv->reserved()) {
d7e0f901 839 if (shutting_down) {
e237d339 840 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
e1381638 841 } else {
e237d339 842 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
d7e0f901
AJ
843 continue;
844 }
62e76326 845 }
846
e237d339 847 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
bf8fe701 848
62e76326 849 /* the rest of the details is dealt with in the helperStatefulServerFree
850 * close handler
851 */
0a958e42 852 srv->closePipesSafely();
94439e4e 853 }
854}
855
e05a9d51 856Helper::Client::~Client()
1f5f60dd 857{
48d54e4d 858 /* note, don't free id_name, it probably points to static memory */
62e76326 859
569605b9
EB
860 // A non-empty queue would leak Helper::Xaction objects, stalling any
861 // pending (and even future collapsed) transactions. To avoid stalling
862 // transactions, we must dropQueued(). We ought to do that when we
863 // discover that no progress is possible rather than here because
864 // reference counting may keep this object alive for a long time.
865 assert(queue.empty());
94439e4e 866}
867
a56fcf0b 868void
569605b9 869Helper::Client::handleKilledServer(SessionBase * const srv)
74addf6c 870{
1f5f60dd 871 if (!srv->flags.shutdown) {
a56fcf0b
CT
872 assert(childs.n_active > 0);
873 --childs.n_active;
874 debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
62e76326 875
654835f0 876 handleFewerServers(srv->stats.replies >= 1);
5ea33fce 877
654835f0 878 if (childs.needNew() > 0) {
a56fcf0b 879 srv->flags.shutdown = true;
569605b9 880 openSessions();
4aca092a
HN
881 }
882 }
569605b9
EB
883
884 if (!childs.n_active)
885 dropQueued();
886}
887
888void
889Helper::Client::dropQueued()
890{
891 if (queue.empty())
892 return;
893
894 Assure(!childs.n_active);
895 Assure(!GetFirstAvailable(this));
896
897 // no helper servers means nobody can advance our queued transactions
898
899 debugs(80, DBG_CRITICAL, "ERROR: Dropping " << queue.size() << ' ' <<
900 id_name << " helper requests due to lack of helper processes");
901 // similar to SessionBase::dropQueued()
902 while (const auto r = nextRequest()) {
903 r->reply.result = Helper::Unknown;
904 callBack(*r);
905 delete r;
906 }
74addf6c 907}
908
654835f0 909void
e05a9d51 910Helper::Client::handleFewerServers(const bool madeProgress)
654835f0
EB
911{
912 const auto needNew = childs.needNew();
913
914 if (!needNew)
915 return; // some server(s) have died, but we still have enough
916
917 debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
918 Debug::Extra << "active processes: " << childs.n_active <<
919 Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
920
921 if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
922 if (madeProgress)
923 debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
924 else
925 fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
926 }
927}
928
bd71920d 929void
0a958e42 930Helper::SessionBase::HelperServerClosed(SessionBase * const srv)
a56fcf0b 931{
0a958e42
EB
932 srv->helper().handleKilledServer(srv);
933 srv->dropQueued();
f7ebc3fd 934 delete srv;
94439e4e 935}
936
ddc77a2e 937Helper::Xaction *
e05a9d51 938Helper::Session::popRequest(const int request_number)
95d2589c 939{
e05a9d51 940 Xaction *r = nullptr;
ddc77a2e 941 if (parent->childs.concurrency) {
2f8abb64 942 // If concurrency supported retrieve request from ID
e05a9d51 943 const auto it = requestsIndex.find(request_number);
ddc77a2e 944 if (it != requestsIndex.end()) {
32fd6d8a 945 r = *(it->second);
ddc77a2e
CT
946 requests.erase(it->second);
947 requestsIndex.erase(it);
32fd6d8a 948 }
ddc77a2e 949 } else if(!requests.empty()) {
32fd6d8a 950 // Else get the first request from queue, if any
ddc77a2e
CT
951 r = requests.front();
952 requests.pop_front();
32fd6d8a
CT
953 }
954
ddc77a2e
CT
955 return r;
956}
957
958/// Calls back with a pointer to the buffer with the helper output
959static void
e05a9d51 960helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
ddc77a2e
CT
961{
962 if (Helper::Xaction *r = srv->replyXaction) {
963 const bool hasSpace = r->reply.accumulate(msg, msgSize);
964 if (!hasSpace) {
965 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
966 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
967 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
0a958e42 968 srv->closePipesSafely();
ddc77a2e
CT
969 return;
970 }
971
972 if (!msgEnd)
973 return; // We are waiting for more data.
974
32fd6d8a 975 bool retry = false;
194ccc9c 976 if (cbdataReferenceValid(r->request.data)) {
ddc77a2e
CT
977 r->reply.finalize();
978 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
979 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
32fd6d8a 980 retry = true;
194ccc9c 981 } else {
26b6afaf 982 hlp->callBack(*r);
194ccc9c 983 }
12f7e09b 984 }
95d2589c 985
5e263176 986 -- srv->stats.pending;
1f7ba0b4 987 ++ srv->stats.replies;
95d2589c 988
95dc7ff4 989 ++ hlp->stats.replies;
95d2589c
CT
990
991 srv->answer_time = current_time;
992
ddc77a2e 993 srv->dispatch_time = r->request.dispatch_time;
95d2589c
CT
994
995 hlp->stats.avg_svc_time =
996 Math::intAverage(hlp->stats.avg_svc_time,
ddc77a2e 997 tvSubMsec(r->request.dispatch_time, current_time),
31bc1fa6 998 hlp->stats.replies, REDIRECT_AV_FACTOR);
95d2589c 999
ddc77a2e
CT
1000 // release or re-submit parsedRequestXaction object
1001 srv->replyXaction = nullptr;
32fd6d8a 1002 if (retry) {
ddc77a2e 1003 ++r->request.retries;
32fd6d8a
CT
1004 hlp->submitRequest(r);
1005 } else
1006 delete r;
95d2589c 1007 }
95d2589c 1008
32fd6d8a
CT
1009 if (hlp->timeout && hlp->childs.concurrency)
1010 srv->checkForTimedOutRequests(hlp->retryTimedOut);
1011
95d2589c
CT
1012 if (!srv->flags.shutdown) {
1013 helperKickQueue(hlp);
1014 } else if (!srv->flags.closing && !srv->stats.pending) {
0a958e42 1015 srv->closeWritePipeSafely();
95d2589c
CT
1016 }
1017}
94439e4e 1018
74addf6c 1019static void
ced8def3 1020helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
74addf6c 1021{
e05a9d51 1022 const auto srv = static_cast<Helper::Session *>(data);
3bd118d6 1023 const auto hlp = srv->parent;
fa80a8ef 1024 assert(cbdataReferenceValid(data));
c4b7a5a9 1025
c8407295 1026 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
62e76326 1027
c8407295 1028 if (flag == Comm::ERR_CLOSING) {
c4b7a5a9 1029 return;
1030 }
1031
e0d28505 1032 assert(conn->fd == srv->readPipe->fd);
420e3e30 1033
e237d339 1034 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
62e76326 1035
c8407295 1036 if (flag != Comm::OK || len == 0) {
0a958e42 1037 srv->closePipesSafely();
62e76326 1038 return;
74addf6c 1039 }
62e76326 1040
07eca7e0 1041 srv->roffset += len;
1042 srv->rbuf[srv->roffset] = '\0';
8289d5a9 1043 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
62e76326 1044
32fd6d8a 1045 if (!srv->stats.pending && !srv->stats.timedout) {
62e76326 1046 /* someone spoke without being spoken to */
dd8b7fd7 1047 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
e237d339 1048 hlp->id_name << " #" << srv->index << ", " << (int)len <<
bf8fe701 1049 " bytes '" << srv->rbuf << "'");
1050
07eca7e0 1051 srv->roffset = 0;
1052 srv->rbuf[0] = '\0';
0a958e42 1053 srv->closePipesSafely();
dd8b7fd7 1054 return;
07eca7e0 1055 }
1056
ddc77a2e
CT
1057 bool needsMore = false;
1058 char *msg = srv->rbuf;
1059 while (*msg && !needsMore) {
1060 int skip = 0;
1061 char *eom = strchr(msg, hlp->eom);
1062 if (eom) {
1063 skip = 1;
1064 debugs(84, 3, "helperHandleRead: end of reply found");
1065 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1066 *eom = '\0';
1067 // rewind to the \r octet which is the real terminal now
1068 // and remember that we have to skip forward 2 places now.
1069 skip = 2;
1070 --eom;
1071 }
1072 *eom = '\0';
ec682a26 1073 }
bf8fe701 1074
1176e456 1075 if (!srv->ignoreToEom && !srv->replyXaction) {
ddc77a2e
CT
1076 int i = 0;
1077 if (hlp->childs.concurrency) {
aee3523a 1078 char *e = nullptr;
ddc77a2e
CT
1079 i = strtol(msg, &e, 10);
1080 // Do we need to check for e == msg? Means wrong response from helper.
2f8abb64 1081 // Will be dropped as "unexpected reply on channel 0"
ddc77a2e
CT
1082 needsMore = !(xisspace(*e) || (eom && e == eom));
1083 if (!needsMore) {
1084 msg = e;
1085 while (*msg && xisspace(*msg))
1086 ++msg;
1087 } // else not enough data to compute request number
1088 }
1089 if (!(srv->replyXaction = srv->popRequest(i))) {
1090 if (srv->stats.timedout) {
1091 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1092 } else {
d816f28d 1093 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
ddc77a2e
CT
1094 i << " from " << hlp->id_name << " #" << srv->index <<
1095 " '" << srv->rbuf << "'");
1096 }
1176e456 1097 srv->ignoreToEom = true;
ddc77a2e
CT
1098 }
1099 } // else we need to just append reply data to the current Xaction
1100
1101 if (!needsMore) {
1102 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1103 assert(msgSize <= srv->rbuf_sz);
1104 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1105 msg += msgSize + skip;
1106 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1176e456
CT
1107
1108 // The next message should not ignored.
1109 if (eom && srv->ignoreToEom)
1110 srv->ignoreToEom = false;
ddc77a2e 1111 } else
aee3523a 1112 assert(skip == 0 && eom == nullptr);
ddc77a2e 1113 }
985bfe6b 1114
ddc77a2e
CT
1115 if (needsMore) {
1116 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1117 assert(msgSize <= srv->rbuf_sz);
1118 memmove(srv->rbuf, msg, msgSize);
1119 srv->roffset = msgSize;
7bef981e 1120 srv->rbuf[srv->roffset] = '\0';
ddc77a2e
CT
1121 } else {
1122 // All of the responses parsed and msg points at the end of read data
1123 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1124 srv->roffset = 0;
74addf6c 1125 }
07eca7e0 1126
b0bb5517 1127 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
a78f2175
AR
1128 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1129 assert(spaceSize >= 0);
1130
abd8f140
AJ
1131 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1132 CommIoCbPtrFun(helperHandleRead, srv));
a78f2175 1133 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
abd8f140 1134 }
74addf6c 1135}
1136
94439e4e 1137static void
ced8def3 1138helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
94439e4e 1139{
aee3523a 1140 char *t = nullptr;
e6ccf245 1141 helper_stateful_server *srv = (helper_stateful_server *)data;
3bd118d6 1142 const auto hlp = srv->parent;
fa80a8ef 1143 assert(cbdataReferenceValid(data));
c4b7a5a9 1144
c8407295 1145 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
62e76326 1146
c8407295 1147 if (flag == Comm::ERR_CLOSING) {
c4b7a5a9 1148 return;
1149 }
1150
e0d28505 1151 assert(conn->fd == srv->readPipe->fd);
420e3e30 1152
4a7a3d56 1153 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
e237d339 1154 hlp->id_name << " #" << srv->index);
bf8fe701 1155
c8407295 1156 if (flag != Comm::OK || len == 0) {
0a958e42 1157 srv->closePipesSafely();
62e76326 1158 return;
94439e4e 1159 }
62e76326 1160
07eca7e0 1161 srv->roffset += len;
1162 srv->rbuf[srv->roffset] = '\0';
8289d5a9 1163 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
62e76326 1164
dd8b7fd7 1165 if (srv->requests.empty()) {
62e76326 1166 /* someone spoke without being spoken to */
dd8b7fd7 1167 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
e237d339 1168 hlp->id_name << " #" << srv->index << ", " << (int)len <<
bf8fe701 1169 " bytes '" << srv->rbuf << "'");
1170
07eca7e0 1171 srv->roffset = 0;
0a958e42 1172 srv->closePipesSafely();
dd8b7fd7 1173 return;
07eca7e0 1174 }
1175
0af9303a 1176 if ((t = strchr(srv->rbuf, hlp->eom))) {
bf8fe701 1177 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1178
36cb7d3e
AJ
1179 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1180 *t = '\0';
1181 // rewind to the \r octet which is the real terminal now
36cb7d3e
AJ
1182 --t;
1183 }
6bf4f823 1184
62e76326 1185 *t = '\0';
ddc77a2e
CT
1186 }
1187
dd8b7fd7
EB
1188 const auto r = srv->requests.front();
1189
1190 if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
ddc77a2e
CT
1191 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1192 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1193 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
0a958e42 1194 srv->closePipesSafely();
ddc77a2e
CT
1195 return;
1196 }
1197 /**
1198 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1199 * Doing this prohibits concurrency support with multiple replies per read().
1200 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1201 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1202 */
1203 srv->roffset = 0;
1204
1205 if (t) {
1206 /* end of reply found */
1207 srv->requests.pop_front(); // we already have it in 'r'
1208 int called = 1;
62e76326 1209
dd8b7fd7 1210 if (cbdataReferenceValid(r->request.data)) {
ddc77a2e 1211 r->reply.finalize();
a56fcf0b 1212 r->reply.reservationId = srv->reservationId;
26b6afaf 1213 hlp->callBack(*r);
62e76326 1214 } else {
e0236918 1215 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
e1381638 1216 called = 0;
62e76326 1217 }
ddc77a2e 1218
002df7ca 1219 delete r;
1f7ba0b4
AJ
1220
1221 -- srv->stats.pending;
1222 ++ srv->stats.replies;
1223
95dc7ff4 1224 ++ hlp->stats.replies;
f9598528 1225 srv->answer_time = current_time;
62e76326 1226 hlp->stats.avg_svc_time =
a98bcbee
AJ
1227 Math::intAverage(hlp->stats.avg_svc_time,
1228 tvSubMsec(srv->dispatch_time, current_time),
1229 hlp->stats.replies, REDIRECT_AV_FACTOR);
62e76326 1230
e1381638
AJ
1231 if (called)
1232 helperStatefulServerDone(srv);
1233 else
a56fcf0b 1234 hlp->cancelReservation(srv->reservationId);
94439e4e 1235 }
07eca7e0 1236
b0bb5517 1237 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
ddc77a2e 1238 int spaceSize = srv->rbuf_sz - 1;
a78f2175 1239
abd8f140
AJ
1240 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1241 CommIoCbPtrFun(helperStatefulHandleRead, srv));
ddc77a2e 1242 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
abd8f140 1243 }
94439e4e 1244}
1245
6825b101 1246/// Handles a request when all running helpers, if any, are busy.
74addf6c 1247static void
e05a9d51 1248Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
74addf6c 1249{
6215dc18 1250 hlp->queue.push(r);
95dc7ff4 1251 ++ hlp->stats.queue_size;
62e76326 1252
48d54e4d
AJ
1253 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1254 if (hlp->childs.needNew() > 0) {
bd71920d 1255 hlp->openSessions();
48d54e4d
AJ
1256 return;
1257 }
1258
6825b101 1259 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
62e76326 1260 return;
1261
74addf6c 1262 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1263 return;
1264
fe73896c 1265 if (shutting_down || reconfiguring)
62e76326 1266 return;
1267
74addf6c 1268 hlp->last_queue_warn = squid_curtime;
62e76326 1269
fa84c01d
FC
1270 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1271 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1272 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
74addf6c 1273}
1274
94439e4e 1275static void
ddc77a2e 1276StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
94439e4e 1277{
6215dc18 1278 hlp->queue.push(r);
95dc7ff4 1279 ++ hlp->stats.queue_size;
62e76326 1280
48d54e4d
AJ
1281 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1282 if (hlp->childs.needNew() > 0) {
bd71920d 1283 hlp->openSessions();
48d54e4d
AJ
1284 return;
1285 }
1286
6825b101 1287 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
62e76326 1288 return;
1289
94439e4e 1290 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1291 return;
1292
94439e4e 1293 if (shutting_down || reconfiguring)
62e76326 1294 return;
1295
94439e4e 1296 hlp->last_queue_warn = squid_curtime;
62e76326 1297
fa84c01d
FC
1298 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1299 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1300 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
94439e4e 1301}
1302
ddc77a2e 1303Helper::Xaction *
e05a9d51 1304Helper::Client::nextRequest()
74addf6c 1305{
6215dc18
AJ
1306 if (queue.empty())
1307 return nullptr;
62e76326 1308
6215dc18
AJ
1309 auto *r = queue.front();
1310 queue.pop();
1311 --stats.queue_size;
94439e4e 1312 return r;
1313}
1314
e05a9d51
EB
1315static Helper::Session *
1316GetFirstAvailable(const Helper::Client::Pointer &hlp)
74addf6c 1317{
1318 dlink_node *n;
e05a9d51 1319 Helper::Session *selected = nullptr;
48e7baac 1320 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
62e76326 1321
48d54e4d 1322 if (hlp->childs.n_running == 0)
aee3523a 1323 return nullptr;
62e76326 1324
07eca7e0 1325 /* Find "least" loaded helper (approx) */
aee3523a 1326 for (n = hlp->servers.head; n != nullptr; n = n->next) {
e05a9d51 1327 const auto srv = static_cast<Helper::Session *>(n->data);
62e76326 1328
07eca7e0 1329 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1330 continue;
1331
d8f10d6a 1332 if (srv->flags.shutdown)
62e76326 1333 continue;
1334
07eca7e0 1335 if (!srv->stats.pending)
1336 return srv;
1337
1338 if (selected) {
1339 selected = srv;
1340 break;
1341 }
1342
1343 selected = srv;
74addf6c 1344 }
62e76326 1345
48e7baac
AJ
1346 if (!selected) {
1347 debugs(84, 5, "GetFirstAvailable: None available.");
aee3523a 1348 return nullptr;
48e7baac 1349 }
07eca7e0 1350
48e7baac 1351 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
6082a0e2 1352 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
aee3523a 1353 return nullptr;
48e7baac 1354 }
07eca7e0 1355
48e7baac 1356 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
07eca7e0 1357 return selected;
74addf6c 1358}
1359
94439e4e 1360static helper_stateful_server *
3bd118d6 1361StatefulGetFirstAvailable(const statefulhelper::Pointer &hlp)
94439e4e 1362{
1363 dlink_node *n;
aee3523a 1364 helper_stateful_server *srv = nullptr;
a56fcf0b 1365 helper_stateful_server *oldestReservedServer = nullptr;
48d54e4d 1366 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
62e76326 1367
48d54e4d 1368 if (hlp->childs.n_running == 0)
aee3523a 1369 return nullptr;
62e76326 1370
aee3523a 1371 for (n = hlp->servers.head; n != nullptr; n = n->next) {
62e76326 1372 srv = (helper_stateful_server *)n->data;
1373
a1267972 1374 if (srv->stats.pending)
62e76326 1375 continue;
1376
a56fcf0b
CT
1377 if (srv->reserved()) {
1378 if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
1379 if (!oldestReservedServer)
1380 oldestReservedServer = srv;
1381 else if (oldestReservedServer->reservationStart < srv->reservationStart)
1382 oldestReservedServer = srv;
1383 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1384 }
62e76326 1385 continue;
a56fcf0b 1386 }
62e76326 1387
d8f10d6a 1388 if (srv->flags.shutdown)
62e76326 1389 continue;
1390
26ac0430 1391 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
62e76326 1392 return srv;
94439e4e 1393 }
62e76326 1394
a56fcf0b
CT
1395 if (oldestReservedServer) {
1396 debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1397 return oldestReservedServer;
1398 }
1399
bf8fe701 1400 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
a56fcf0b 1401 return nullptr;
94439e4e 1402}
1403
42679bd6 1404static void
ced8def3 1405helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
42679bd6 1406{
e05a9d51 1407 const auto srv = static_cast<Helper::Session *>(data);
07eca7e0 1408
2fe7eff9 1409 srv->writebuf->clean();
032785bf 1410 delete srv->writebuf;
aee3523a 1411 srv->writebuf = nullptr;
be4d35dc 1412 srv->flags.writing = false;
07eca7e0 1413
c8407295 1414 if (flag != Comm::OK) {
07eca7e0 1415 /* Helper server has crashed */
e237d339 1416 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
07eca7e0 1417 return;
1418 }
1419
2fe7eff9 1420 if (!srv->wqueue->isNull()) {
07eca7e0 1421 srv->writebuf = srv->wqueue;
032785bf 1422 srv->wqueue = new MemBuf;
be4d35dc 1423 srv->flags.writing = true;
ec41b64c
AJ
1424 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1425 CommIoCbPtrFun(helperDispatchWriteDone, srv));
aee3523a 1426 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
07eca7e0 1427 }
42679bd6 1428}
1429
74addf6c 1430static void
e05a9d51 1431helperDispatch(Helper::Session * const srv, Helper::Xaction * const r)
74addf6c 1432{
3bd118d6 1433 const auto hlp = srv->parent;
32fd6d8a 1434 const uint64_t reqId = ++srv->nextRequestId;
62e76326 1435
ddc77a2e 1436 if (!cbdataReferenceValid(r->request.data)) {
d816f28d 1437 debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
002df7ca 1438 delete r;
62e76326 1439 return;
74addf6c 1440 }
62e76326 1441
ddc77a2e 1442 r->request.Id = reqId;
e05a9d51 1443 const auto it = srv->requests.insert(srv->requests.end(), r);
ddc77a2e 1444 r->request.dispatch_time = current_time;
07eca7e0 1445
2fe7eff9 1446 if (srv->wqueue->isNull())
1447 srv->wqueue->init();
07eca7e0 1448
32fd6d8a 1449 if (hlp->childs.concurrency) {
e05a9d51 1450 srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
32fd6d8a 1451 assert(srv->requestsIndex.size() == srv->requests.size());
ddc77a2e 1452 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
32fd6d8a 1453 } else
ddc77a2e 1454 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
07eca7e0 1455
1456 if (!srv->flags.writing) {
aee3523a 1457 assert(nullptr == srv->writebuf);
07eca7e0 1458 srv->writebuf = srv->wqueue;
032785bf 1459 srv->wqueue = new MemBuf;
be4d35dc 1460 srv->flags.writing = true;
ec41b64c
AJ
1461 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1462 CommIoCbPtrFun(helperDispatchWriteDone, srv));
aee3523a 1463 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
07eca7e0 1464 }
1465
ddc77a2e 1466 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
bf8fe701 1467
95dc7ff4 1468 ++ srv->stats.uses;
1f7ba0b4 1469 ++ srv->stats.pending;
95dc7ff4 1470 ++ hlp->stats.requests;
74addf6c 1471}
1472
42679bd6 1473static void
ced8def3
AJ
1474helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1475{}
42679bd6 1476
94439e4e 1477static void
ddc77a2e 1478helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
94439e4e 1479{
3bd118d6 1480 const auto hlp = srv->parent;
62e76326 1481
ddc77a2e 1482 if (!cbdataReferenceValid(r->request.data)) {
d816f28d 1483 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
002df7ca 1484 delete r;
a56fcf0b 1485 hlp->cancelReservation(srv->reservationId);
62e76326 1486 return;
94439e4e 1487 }
62e76326 1488
e237d339 1489 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
62e76326 1490
a56fcf0b
CT
1491 assert(srv->reservationId);
1492 r->reply.reservationId = srv->reservationId;
1493
ddc77a2e 1494 if (r->request.placeholder == 1) {
62e76326 1495 /* a callback is needed before this request can _use_ a helper. */
d20ce97d 1496 /* we don't care about releasing this helper. The request NEVER
62e76326 1497 * gets to the helper. So we throw away the return code */
ddc77a2e 1498 r->reply.result = Helper::Unknown;
26b6afaf 1499 hlp->callBack(*r);
62e76326 1500 /* throw away the placeholder */
002df7ca 1501 delete r;
62e76326 1502 /* and push the queue. Note that the callback may have submitted a new
e166785a 1503 * request to the helper which is why we test for the request */
62e76326 1504
bf3e8d5a 1505 if (!srv->requests.size())
a8c4f8d6 1506 helperStatefulServerDone(srv);
62e76326 1507
1508 return;
94439e4e 1509 }
62e76326 1510
bf3e8d5a 1511 srv->requests.push_back(r);
94439e4e 1512 srv->dispatch_time = current_time;
ec41b64c 1513 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
3bd118d6 1514 CommIoCbPtrFun(helperStatefulDispatchWriteDone, srv));
aee3523a 1515 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
bf8fe701 1516 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
e237d339 1517 hlp->id_name << " #" << srv->index << ", " <<
ddc77a2e 1518 (int) strlen(r->request.buf) << " bytes");
bf8fe701 1519
95dc7ff4 1520 ++ srv->stats.uses;
1f7ba0b4 1521 ++ srv->stats.pending;
95dc7ff4 1522 ++ hlp->stats.requests;
94439e4e 1523}
1524
74addf6c 1525static void
e05a9d51 1526helperKickQueue(const Helper::Client::Pointer &hlp)
74addf6c 1527{
e05a9d51
EB
1528 Helper::Xaction *r = nullptr;
1529 Helper::Session *srv = nullptr;
62e76326 1530
6215dc18 1531 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
62e76326 1532 helperDispatch(srv, r);
569605b9
EB
1533
1534 if (!hlp->childs.n_active)
1535 hlp->dropQueued();
74addf6c 1536}
1537
94439e4e 1538static void
3bd118d6 1539helperStatefulKickQueue(const statefulhelper::Pointer &hlp)
94439e4e 1540{
ddc77a2e 1541 Helper::Xaction *r;
94439e4e 1542 helper_stateful_server *srv;
a56fcf0b
CT
1543 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1544 debugs(84, 5, "found srv-" << srv->index);
1545 hlp->reserveServer(srv);
62e76326 1546 helperStatefulDispatch(srv, r);
a56fcf0b 1547 }
569605b9
EB
1548
1549 if (!hlp->childs.n_active)
1550 hlp->dropQueued();
94439e4e 1551}
1552
1553static void
a8c4f8d6 1554helperStatefulServerDone(helper_stateful_server * srv)
94439e4e 1555{
420e3e30
HN
1556 if (!srv->flags.shutdown) {
1557 helperStatefulKickQueue(srv->parent);
a56fcf0b 1558 } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
0a958e42 1559 srv->closeWritePipeSafely();
420e3e30
HN
1560 return;
1561 }
94439e4e 1562}
1563
32fd6d8a 1564void
e05a9d51 1565Helper::Session::checkForTimedOutRequests(bool const retry)
32fd6d8a
CT
1566{
1567 assert(parent->childs.concurrency);
ddc77a2e 1568 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
e05a9d51 1569 const auto r = requests.front();
32fd6d8a 1570 RequestIndex::iterator it;
ddc77a2e 1571 it = requestsIndex.find(r->request.Id);
32fd6d8a
CT
1572 assert(it != requestsIndex.end());
1573 requestsIndex.erase(it);
1574 requests.pop_front();
ddc77a2e 1575 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
32fd6d8a 1576 bool retried = false;
ddc77a2e
CT
1577 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1578 debugs(84, 2, "Retry request " << r->request.Id);
1579 ++r->request.retries;
32fd6d8a
CT
1580 parent->submitRequest(r);
1581 retried = true;
26b6afaf 1582 } else if (cbdataReferenceValid(r->request.data)) {
32fd6d8a 1583 if (!parent->onTimedOutResponse.isEmpty()) {
ddc77a2e
CT
1584 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1585 r->reply.finalize();
1586 else
1587 r->reply.result = Helper::TimedOut;
26b6afaf 1588 parent->callBack(*r);
ddc77a2e
CT
1589 } else {
1590 r->reply.result = Helper::TimedOut;
26b6afaf 1591 parent->callBack(*r);
ddc77a2e 1592 }
32fd6d8a
CT
1593 }
1594 --stats.pending;
1595 ++stats.timedout;
1596 ++parent->stats.timedout;
1597 if (!retried)
1598 delete r;
1599 }
1600}
1601
1602void
e05a9d51 1603Helper::Session::requestTimeout(const CommTimeoutCbParams &io)
32fd6d8a 1604{
bf95c10a 1605 debugs(26, 3, io.conn);
e05a9d51 1606 const auto srv = static_cast<Session *>(io.data);
32fd6d8a 1607
32fd6d8a
CT
1608 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1609
e05a9d51
EB
1610 debugs(84, 3, io.conn << " establish a new timeout");
1611 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
1612 CommTimeoutCbPtrFun(Session::requestTimeout, srv));
32fd6d8a 1613
4650a4fa
FC
1614 const time_t timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1615 const time_t minimumNewTimeout = 1; // second
1616 const auto timeLeft = max(minimumNewTimeout, srv->parent->timeout - timeSpent);
32fd6d8a
CT
1617
1618 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1619}
f53969cc 1620