]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Fix parsing of malformed quoted squid.conf strings (#2239)
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1/*
1f7b830e 2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
f740a279 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
f740a279 7 */
8
bbc27441
AJ
9/* DEBUG: section 84 Helper process maintenance */
10
582c2af2 11#include "squid.h"
37dedc58 12#include "base/AsyncCbdataCalls.h"
bf3e8d5a 13#include "base/Packable.h"
675b8408 14#include "base/Raw.h"
e0d28505
AJ
15#include "comm.h"
16#include "comm/Connection.h"
7e66d5e2 17#include "comm/Read.h"
ec41b64c 18#include "comm/Write.h"
675b8408 19#include "debug/Messages.h"
c4ad1349 20#include "fd.h"
b0bb5517 21#include "fde.h"
38e16f92 22#include "format/Quoting.h"
582c2af2 23#include "helper.h"
24438ec5
AJ
24#include "helper/Reply.h"
25#include "helper/Request.h"
e0d28505 26#include "MemBuf.h"
32fd6d8a 27#include "SquidConfig.h"
96097880 28#include "SquidIpc.h"
a98bcbee 29#include "SquidMath.h"
e6ccf245 30#include "Store.h"
d295d770 31#include "wordlist.h"
74addf6c 32
ed6e9fb9
AJ
33// helper_stateful_server::data uses explicit alloc()/freeOne() */
34#include "mem/Pool.h"
35
74addf6c 36#define HELPER_MAX_ARGS 64
37
32fd6d8a
CT
38/// The maximum allowed request retries.
39#define MAX_RETRIES 2
40
ddc77a2e 41/// Helpers input buffer size.
bfc9f840
NZ
42/// Keep in sync with MAX_PAC_GROUP_SIZE until converted to SBuf
43const size_t ReadBufSize(128*1024);
a78f2175 44
c4b7a5a9 45static IOCB helperHandleRead;
46static IOCB helperStatefulHandleRead;
e05a9d51
EB
47static void Enqueue(Helper::Client *, Helper::Xaction *);
48static Helper::Session *GetFirstAvailable(const Helper::Client::Pointer &);
3bd118d6 49static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper::Pointer &);
e05a9d51 50static void helperDispatch(Helper::Session *, Helper::Xaction *);
ddc77a2e 51static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
e05a9d51 52static void helperKickQueue(const Helper::Client::Pointer &);
3bd118d6 53static void helperStatefulKickQueue(const statefulhelper::Pointer &);
a8c4f8d6 54static void helperStatefulServerDone(helper_stateful_server * srv);
ddc77a2e 55static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
9522b380 56
e05a9d51 57CBDATA_NAMESPACED_CLASS_INIT(Helper, Session);
f3831c93 58CBDATA_CLASS_INIT(helper_stateful_server);
aa839030 59
e05a9d51 60InstanceIdDefinitions(Helper::SessionBase, "Hlpr");
e237d339 61
895dba0a 62void
e05a9d51 63Helper::SessionBase::initStats()
895dba0a
CT
64{
65 stats.uses=0;
66 stats.replies=0;
67 stats.pending=0;
68 stats.releases=0;
69 stats.timedout = 0;
70}
71
e0d28505 72void
0a958e42 73Helper::SessionBase::closePipesSafely()
e0d28505 74{
7aa9bb3e 75#if _SQUID_WINDOWS_
e0d28505
AJ
76 shutdown(writePipe->fd, SD_BOTH);
77#endif
78
be4d35dc 79 flags.closing = true;
e0d28505
AJ
80 if (readPipe->fd == writePipe->fd)
81 readPipe->fd = -1;
82 else
83 readPipe->close();
84 writePipe->close();
85
7aa9bb3e 86#if _SQUID_WINDOWS_
dc49061a
A
87 if (hIpc) {
88 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
89 getCurrentTime();
0a958e42 90 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
41060bef 91 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
e0d28505 92 }
dc49061a
A
93 CloseHandle(hIpc);
94 }
e0d28505
AJ
95#endif
96}
97
98void
0a958e42 99Helper::SessionBase::closeWritePipeSafely()
e0d28505 100{
7aa9bb3e 101#if _SQUID_WINDOWS_
e0d28505
AJ
102 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
103#endif
104
be4d35dc 105 flags.closing = true;
e0d28505
AJ
106 if (readPipe->fd == writePipe->fd)
107 readPipe->fd = -1;
108 writePipe->close();
109
7aa9bb3e 110#if _SQUID_WINDOWS_
dc49061a
A
111 if (hIpc) {
112 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
113 getCurrentTime();
0a958e42 114 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
41060bef 115 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
e0d28505 116 }
dc49061a
A
117 CloseHandle(hIpc);
118 }
e0d28505
AJ
119#endif
120}
121
a56fcf0b 122void
0a958e42 123Helper::SessionBase::dropQueued()
a56fcf0b
CT
124{
125 while (!requests.empty()) {
126 // XXX: re-schedule these on another helper?
e05a9d51 127 const auto r = requests.front();
a56fcf0b 128 requests.pop_front();
26b6afaf 129 r->reply.result = Helper::Unknown;
0a958e42 130 helper().callBack(*r);
a56fcf0b
CT
131 delete r;
132 }
133}
134
e05a9d51 135Helper::SessionBase::~SessionBase()
a56fcf0b
CT
136{
137 if (rbuf) {
138 memFreeBuf(rbuf_sz, rbuf);
aee3523a 139 rbuf = nullptr;
a56fcf0b
CT
140 }
141}
142
e05a9d51 143Helper::Session::~Session()
a56fcf0b
CT
144{
145 wqueue->clean();
146 delete wqueue;
147
148 if (writebuf) {
149 writebuf->clean();
150 delete writebuf;
aee3523a 151 writebuf = nullptr;
a56fcf0b
CT
152 }
153
154 if (Comm::IsConnOpen(writePipe))
0a958e42 155 closeWritePipeSafely();
a56fcf0b
CT
156
157 dlinkDelete(&link, &parent->servers);
158
159 assert(parent->childs.n_running > 0);
160 -- parent->childs.n_running;
161
162 assert(requests.empty());
a56fcf0b
CT
163}
164
165void
0a958e42 166Helper::Session::dropQueued()
a56fcf0b 167{
0a958e42 168 SessionBase::dropQueued();
a56fcf0b
CT
169 requestsIndex.clear();
170}
171
172helper_stateful_server::~helper_stateful_server()
173{
174 /* TODO: walk the local queue of requests and carry them all out */
175 if (Comm::IsConnOpen(writePipe))
0a958e42 176 closeWritePipeSafely();
a56fcf0b
CT
177
178 parent->cancelReservation(reservationId);
179
180 dlinkDelete(&link, &parent->servers);
181
182 assert(parent->childs.n_running > 0);
183 -- parent->childs.n_running;
184
185 assert(requests.empty());
a56fcf0b
CT
186}
187
74addf6c 188void
bd71920d 189Helper::Client::openSessions()
74addf6c 190{
191 char *s;
192 char *progname;
193 char *shortname;
194 char *procname;
1b8af627 195 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
74addf6c 196 char fd_note_buf[FD_DESC_SZ];
74addf6c 197 int nargs = 0;
198 int k;
b5d712b5 199 pid_t pid;
74addf6c 200 int rfd;
201 int wfd;
b5d712b5 202 void * hIpc;
74addf6c 203 wordlist *w;
bd71920d
EB
204 // Helps reducing diff. TODO: remove
205 const auto hlp = this;
62e76326 206
aee3523a 207 if (hlp->cmdline == nullptr)
62e76326 208 return;
209
74addf6c 210 progname = hlp->cmdline->key;
62e76326 211
74addf6c 212 if ((s = strrchr(progname, '/')))
62e76326 213 shortname = xstrdup(s + 1);
74addf6c 214 else
62e76326 215 shortname = xstrdup(progname);
216
48d54e4d
AJ
217 /* figure out how many new child are actually needed. */
218 int need_new = hlp->childs.needNew();
5a5b2c56 219
c59baaa8 220 debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
5a5b2c56 221
04f7fd38 222 if (need_new < 1) {
c59baaa8 223 debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
5a5b2c56 224 }
62e76326 225
e6ccf245 226 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 227
74addf6c 228 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 229
a38ec4b1
FC
230 args[nargs] = procname;
231 ++nargs;
62e76326 232
a38ec4b1
FC
233 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
234 args[nargs] = w->key;
235 ++nargs;
236 }
62e76326 237
aee3523a 238 args[nargs] = nullptr;
a38ec4b1 239 ++nargs;
62e76326 240
bcfcb68d 241 assert(nargs <= HELPER_MAX_ARGS + 1);
62e76326 242
654835f0
EB
243 int successfullyStarted = 0;
244
95dc7ff4 245 for (k = 0; k < need_new; ++k) {
62e76326 246 getCurrentTime();
247 rfd = wfd = -1;
b5d712b5 248 pid = ipcCreate(hlp->ipc_type,
249 progname,
250 args,
251 shortname,
cc192b50 252 hlp->addr,
b5d712b5 253 &rfd,
254 &wfd,
255 &hIpc);
256
257 if (pid < 0) {
e0236918 258 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
62e76326 259 continue;
260 }
261
654835f0 262 ++successfullyStarted;
95dc7ff4
FC
263 ++ hlp->childs.n_running;
264 ++ hlp->childs.n_active;
e05a9d51 265 const auto srv = new Helper::Session;
895dba0a
CT
266 srv->hIpc = hIpc;
267 srv->pid = pid;
268 srv->initStats();
269 srv->addr = hlp->addr;
270 srv->readPipe = new Comm::Connection;
271 srv->readPipe->fd = rfd;
272 srv->writePipe = new Comm::Connection;
273 srv->writePipe->fd = wfd;
274 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
275 srv->wqueue = new MemBuf;
276 srv->roffset = 0;
277 srv->nextRequestId = 0;
aee3523a 278 srv->replyXaction = nullptr;
895dba0a 279 srv->ignoreToEom = false;
3bd118d6 280 srv->parent = hlp;
62e76326 281 dlinkAddTail(srv, &srv->link, &hlp->servers);
282
283 if (rfd == wfd) {
284 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
285 fd_note(rfd, fd_note_buf);
286 } else {
287 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
288 fd_note(rfd, fd_note_buf);
289 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
290 fd_note(wfd, fd_note_buf);
291 }
292
293 commSetNonBlocking(rfd);
294
295 if (wfd != rfd)
296 commSetNonBlocking(wfd);
297
0a958e42 298 AsyncCall::Pointer closeCall = asyncCall(5, 4, "Helper::Session::HelperServerClosed", cbdataDialer(SessionBase::HelperServerClosed,
ab2acef6 299 static_cast<Helper::SessionBase *>(srv)));
0a958e42 300
37dedc58 301 comm_add_close_handler(rfd, closeCall);
07eca7e0 302
f53969cc 303 if (hlp->timeout && hlp->childs.concurrency) {
e05a9d51
EB
304 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
305 CommTimeoutCbPtrFun(Helper::Session::requestTimeout, srv));
32fd6d8a
CT
306 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
307 }
308
abd8f140
AJ
309 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
310 CommIoCbPtrFun(helperHandleRead, srv));
311 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
74addf6c 312 }
62e76326 313
654835f0
EB
314 // Call handleFewerServers() before hlp->last_restart is updated because
315 // that method uses last_restart to measure the delay since previous start.
316 // TODO: Refactor last_restart code to measure failure frequency rather than
317 // detecting a helper #X failure that is being close to the helper #Y start.
318 if (successfullyStarted < need_new)
319 hlp->handleFewerServers(false);
320
5ea33fce 321 hlp->last_restart = squid_curtime;
74addf6c 322 safe_free(shortname);
323 safe_free(procname);
838b993c 324 helperKickQueue(hlp);
74addf6c 325}
326
94439e4e 327void
bd71920d 328statefulhelper::openSessions()
94439e4e 329{
94439e4e 330 char *shortname;
1b8af627 331 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
94439e4e 332 char fd_note_buf[FD_DESC_SZ];
94439e4e 333 int nargs = 0;
bd71920d
EB
334 // Helps reducing diff. TODO: remove
335 const auto hlp = this;
62e76326 336
aee3523a 337 if (hlp->cmdline == nullptr)
62e76326 338 return;
339
7353861b 340 if (hlp->childs.concurrency)
fa84c01d 341 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
7353861b 342
4f0ef8e8 343 char *progname = hlp->cmdline->key;
62e76326 344
4f0ef8e8 345 char *s;
94439e4e 346 if ((s = strrchr(progname, '/')))
62e76326 347 shortname = xstrdup(s + 1);
94439e4e 348 else
62e76326 349 shortname = xstrdup(progname);
350
48d54e4d
AJ
351 /* figure out haw mant new helpers are needed. */
352 int need_new = hlp->childs.needNew();
5a5b2c56 353
e0236918 354 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
5a5b2c56 355
04f7fd38 356 if (need_new < 1) {
e0236918 357 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
5a5b2c56 358 }
62e76326 359
4f0ef8e8 360 char *procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 361
94439e4e 362 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 363
a38ec4b1
FC
364 args[nargs] = procname;
365 ++nargs;
62e76326 366
a38ec4b1
FC
367 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
368 args[nargs] = w->key;
369 ++nargs;
370 }
62e76326 371
aee3523a 372 args[nargs] = nullptr;
a38ec4b1 373 ++nargs;
62e76326 374
bcfcb68d 375 assert(nargs <= HELPER_MAX_ARGS + 1);
62e76326 376
654835f0
EB
377 int successfullyStarted = 0;
378
95dc7ff4 379 for (int k = 0; k < need_new; ++k) {
62e76326 380 getCurrentTime();
26ac0430
AJ
381 int rfd = -1;
382 int wfd = -1;
383 void * hIpc;
4f0ef8e8 384 pid_t pid = ipcCreate(hlp->ipc_type,
26ac0430
AJ
385 progname,
386 args,
387 shortname,
388 hlp->addr,
389 &rfd,
390 &wfd,
391 &hIpc);
b5d712b5 392
393 if (pid < 0) {
e0236918 394 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
62e76326 395 continue;
396 }
397
654835f0 398 ++successfullyStarted;
95dc7ff4
FC
399 ++ hlp->childs.n_running;
400 ++ hlp->childs.n_active;
895dba0a
CT
401 helper_stateful_server *srv = new helper_stateful_server;
402 srv->hIpc = hIpc;
403 srv->pid = pid;
404 srv->initStats();
405 srv->addr = hlp->addr;
406 srv->readPipe = new Comm::Connection;
407 srv->readPipe->fd = rfd;
408 srv->writePipe = new Comm::Connection;
409 srv->writePipe->fd = wfd;
410 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
411 srv->roffset = 0;
3bd118d6 412 srv->parent = hlp;
895dba0a
CT
413 srv->reservationStart = 0;
414
62e76326 415 dlinkAddTail(srv, &srv->link, &hlp->servers);
416
417 if (rfd == wfd) {
418 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
419 fd_note(rfd, fd_note_buf);
420 } else {
421 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
422 fd_note(rfd, fd_note_buf);
423 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
424 fd_note(wfd, fd_note_buf);
425 }
426
427 commSetNonBlocking(rfd);
428
429 if (wfd != rfd)
430 commSetNonBlocking(wfd);
431
0a958e42 432 AsyncCall::Pointer closeCall = asyncCall(5, 4, "helper_stateful_server::HelperServerClosed", cbdataDialer(Helper::SessionBase::HelperServerClosed,
ab2acef6 433 static_cast<Helper::SessionBase *>(srv)));
0a958e42 434
37dedc58 435 comm_add_close_handler(rfd, closeCall);
07eca7e0 436
abd8f140
AJ
437 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
438 CommIoCbPtrFun(helperStatefulHandleRead, srv));
439 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
94439e4e 440 }
62e76326 441
654835f0
EB
442 // Call handleFewerServers() before hlp->last_restart is updated because
443 // that method uses last_restart to measure the delay since previous start.
444 // TODO: Refactor last_restart code to measure failure frequency rather than
445 // detecting a helper #X failure that is being close to the helper #Y start.
446 if (successfullyStarted < need_new)
447 hlp->handleFewerServers(false);
448
5ea33fce 449 hlp->last_restart = squid_curtime;
94439e4e 450 safe_free(shortname);
451 safe_free(procname);
452 helperStatefulKickQueue(hlp);
453}
454
32fd6d8a 455void
e05a9d51 456Helper::Client::submitRequest(Helper::Xaction * const r)
32fd6d8a 457{
e05a9d51 458 if (const auto srv = GetFirstAvailable(this))
32fd6d8a
CT
459 helperDispatch(srv, r);
460 else
461 Enqueue(this, r);
462
6082a0e2
EB
463 syncQueueStats();
464}
465
466/// handles helperSubmit() and helperStatefulSubmit() failures
467static void
e05a9d51 468SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
6082a0e2
EB
469{
470 auto result = Helper::Error;
471 if (!hlp) {
472 debugs(84, 3, "no helper");
473 result = Helper::Unknown;
32fd6d8a 474 }
6082a0e2
EB
475 // else pretend the helper has responded with ERR
476
477 callback(data, Helper::Reply(result));
32fd6d8a
CT
478}
479
74addf6c 480void
e05a9d51 481helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
74addf6c 482{
6082a0e2
EB
483 if (!hlp || !hlp->trySubmit(buf, callback, data))
484 SubmissionFailure(hlp, callback, data);
6825b101
CT
485}
486
6082a0e2 487/// whether queuing an additional request would overload the helper
6825b101 488bool
e05a9d51 489Helper::Client::queueFull() const {
6082a0e2
EB
490 return stats.queue_size >= static_cast<int>(childs.queue_size);
491}
492
493bool
e05a9d51 494Helper::Client::overloaded() const {
6825b101
CT
495 return stats.queue_size > static_cast<int>(childs.queue_size);
496}
497
6082a0e2 498/// synchronizes queue-dependent measurements with the current queue state
6825b101 499void
e05a9d51 500Helper::Client::syncQueueStats()
6825b101 501{
6082a0e2
EB
502 if (overloaded()) {
503 if (overloadStart) {
504 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
505 } else {
506 overloadStart = squid_curtime;
507 debugs(84, 3, id_name << " became overloaded");
508 }
509 } else {
510 if (overloadStart) {
511 debugs(84, 5, id_name << " is no longer overloaded");
512 if (droppedRequests) {
513 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
514 " is no longer overloaded after dropping " << droppedRequests <<
515 " requests in " << (squid_curtime - overloadStart) << " seconds");
516 droppedRequests = 0;
517 }
518 overloadStart = 0;
519 }
520 }
6825b101
CT
521}
522
6082a0e2
EB
523/// prepares the helper for request submission
524/// returns true if and only if the submission should proceed
525/// may kill Squid if the helper remains overloaded for too long
6825b101 526bool
e05a9d51 527Helper::Client::prepSubmit()
6825b101 528{
6082a0e2
EB
529 // re-sync for the configuration may have changed since the last submission
530 syncQueueStats();
531
532 // Nothing special to do if the new request does not overload (i.e., the
533 // queue is not even full yet) or only _starts_ overloading this helper
534 // (i.e., the queue is currently at its limit).
535 if (!overloaded())
536 return true;
62e76326 537
6082a0e2
EB
538 if (squid_curtime - overloadStart <= 180)
539 return true; // also OK: overload has not persisted long enough to panic
540
e05a9d51 541 if (childs.onPersistentOverload == ChildConfig::actDie)
6082a0e2
EB
542 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
543
544 if (!droppedRequests) {
545 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
546 id_name << " helper configured with on-persistent-overload=err");
6825b101 547 }
6082a0e2
EB
548 ++droppedRequests;
549 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
550 return false;
551}
552
553bool
e05a9d51 554Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
6082a0e2
EB
555{
556 if (!prepSubmit())
557 return false; // request was dropped
6825b101
CT
558
559 submit(buf, callback, data); // will send or queue
560 return true; // request submitted or queued
561}
562
563/// dispatches or enqueues a helper requests; does not enforce queue limits
564void
e05a9d51 565Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
6825b101 566{
e05a9d51 567 const auto r = new Xaction(callback, data, buf);
32fd6d8a 568 submitRequest(r);
8289d5a9 569 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
94439e4e 570}
571
26b6afaf
EB
572void
573Helper::Client::callBack(Xaction &r)
574{
575 const auto callback = r.request.callback;
576 Assure(callback);
577
578 r.request.callback = nullptr;
579 void *cbdata = nullptr;
580 if (cbdataReferenceValidDone(r.request.data, &cbdata))
581 callback(cbdata, r.reply);
582}
583
a56fcf0b
CT
584/// Submit request or callback the caller with a Helper::Error error.
585/// If the reservation is not set then reserves a new helper.
94439e4e 586void
3bd118d6 587helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
94439e4e 588{
a56fcf0b 589 if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
6082a0e2
EB
590 SubmissionFailure(hlp, callback, data);
591}
592
593/// If possible, submit request. Otherwise, either kill Squid or return false.
594bool
a56fcf0b 595statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
6082a0e2
EB
596{
597 if (!prepSubmit())
598 return false; // request was dropped
599
a56fcf0b 600 submit(buf, callback, data, reservation); // will send or queue
6082a0e2 601 return true; // request submitted or queued
6825b101 602}
62e76326 603
a56fcf0b
CT
604void
605statefulhelper::reserveServer(helper_stateful_server * srv)
6825b101 606{
a56fcf0b
CT
607 // clear any old reservation
608 if (srv->reserved()) {
609 reservations.erase(srv->reservationId);
610 srv->clearReservation();
611 }
612
613 srv->reserve();
614 reservations.insert(Reservations::value_type(srv->reservationId, srv));
615}
616
617void
618statefulhelper::cancelReservation(const Helper::ReservationId reservation)
619{
620 const auto it = reservations.find(reservation);
621 if (it == reservations.end())
622 return;
623
624 helper_stateful_server *srv = it->second;
625 reservations.erase(it);
626 srv->clearReservation();
627
628 // schedule a queue kick
629 AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
630 ScheduleCallHere(call);
631}
632
633helper_stateful_server *
634statefulhelper::findServer(const Helper::ReservationId & reservation)
635{
636 const auto it = reservations.find(reservation);
637 if (it == reservations.end())
638 return nullptr;
639 return it->second;
640}
641
642void
643helper_stateful_server::reserve()
644{
645 assert(!reservationId);
646 reservationStart = squid_curtime;
647 reservationId = Helper::ReservationId::Next();
648 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
649}
650
651void
652helper_stateful_server::clearReservation()
653{
654 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
655 if (!reservationId)
656 return;
62e76326 657
a56fcf0b
CT
658 ++stats.releases;
659
660 reservationId.clear();
661 reservationStart = 0;
662}
663
664void
665statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
666{
667 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
62e76326 668
a56fcf0b
CT
669 if (buf && reservation) {
670 debugs(84, 5, reservation);
671 helper_stateful_server *lastServer = findServer(reservation);
672 if (!lastServer) {
673 debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
674 r->reply.result = Helper::TimedOut;
26b6afaf 675 callBack(*r);
a56fcf0b
CT
676 delete r;
677 return;
678 }
d20ce97d 679 debugs(84, 5, "StatefulSubmit dispatching");
a56fcf0b 680 helperStatefulDispatch(lastServer, r);
94439e4e 681 } else {
26ac0430 682 helper_stateful_server *srv;
6825b101 683 if ((srv = StatefulGetFirstAvailable(this))) {
a56fcf0b 684 reserveServer(srv);
334a9819 685 helperStatefulDispatch(srv, r); // may delete r
62e76326 686 } else
6825b101 687 StatefulEnqueue(this, r);
94439e4e 688 }
62e76326 689
334a9819 690 // r may be dangling here
6082a0e2 691 syncQueueStats();
94439e4e 692}
693
74addf6c 694void
e05a9d51 695Helper::Client::packStatsInto(Packable * const p, const char * const label) const
74addf6c 696{
bf3e8d5a
AJ
697 if (label)
698 p->appendf("%s:\n", label);
699
700 p->appendf(" program: %s\n", cmdline->key);
701 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
702 p->appendf(" requests sent: %d\n", stats.requests);
703 p->appendf(" replies received: %d\n", stats.replies);
704 p->appendf(" requests timedout: %d\n", stats.timedout);
705 p->appendf(" queue length: %d\n", stats.queue_size);
706 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
707 p->append("\n",1);
708 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
99b59e51
SM
709 "ID #",
710 "FD",
711 "PID",
712 "# Requests",
713 "# Replies",
714 "# Timed-out",
715 "Flags",
716 "Time",
717 "Offset",
718 "Request");
62e76326 719
bf3e8d5a 720 for (dlink_node *link = servers.head; link; link = link->next) {
e05a9d51 721 const auto srv = static_cast<SessionBase *>(link->data);
bf3e8d5a 722 assert(srv);
e05a9d51 723 const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
ddc77a2e 724 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
bf3e8d5a 725 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
99b59e51
SM
726 srv->index.value,
727 srv->readPipe->fd,
728 srv->pid,
729 srv->stats.uses,
730 srv->stats.replies,
731 srv->stats.timedout,
732 srv->stats.pending ? 'B' : ' ',
733 srv->flags.writing ? 'W' : ' ',
734 srv->flags.closing ? 'C' : ' ',
a56fcf0b 735 srv->reserved() ? 'R' : ' ',
99b59e51 736 srv->flags.shutdown ? 'S' : ' ',
ddc77a2e 737 xaction && xaction->request.placeholder ? 'P' : ' ',
99b59e51
SM
738 tt < 0.0 ? 0.0 : tt,
739 (int) srv->roffset,
ddc77a2e 740 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
94439e4e 741 }
62e76326 742
bf3e8d5a
AJ
743 p->append("\nFlags key:\n"
744 " B\tBUSY\n"
745 " W\tWRITING\n"
746 " C\tCLOSING\n"
747 " R\tRESERVED\n"
748 " S\tSHUTDOWN PENDING\n"
749 " P\tPLACEHOLDER\n", 101);
94439e4e 750}
751
6082a0e2 752bool
e05a9d51 753Helper::Client::willOverload() const {
6082a0e2
EB
754 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
755}
756
e05a9d51
EB
757Helper::Client::Pointer
758Helper::Client::Make(const char * const name)
3bd118d6 759{
e05a9d51 760 return new Client(name);
3bd118d6
EB
761}
762
763statefulhelper::Pointer
764statefulhelper::Make(const char *name)
765{
766 return new statefulhelper(name);
767}
768
74addf6c 769void
e05a9d51 770helperShutdown(const Helper::Client::Pointer &hlp)
74addf6c 771{
c68e9c6b 772 dlink_node *link = hlp->servers.head;
62e76326 773
c68e9c6b 774 while (link) {
e05a9d51 775 const auto srv = static_cast<Helper::Session *>(link->data);
62e76326 776 link = link->next;
777
8cfc76db 778 if (srv->flags.shutdown) {
e237d339 779 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
62e76326 780 continue;
781 }
782
48d54e4d 783 assert(hlp->childs.n_active > 0);
5e263176 784 -- hlp->childs.n_active;
f53969cc 785 srv->flags.shutdown = true; /* request it to shut itself down */
62e76326 786
d8f10d6a 787 if (srv->flags.closing) {
e237d339 788 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
62e76326 789 continue;
790 }
791
d8f10d6a 792 if (srv->stats.pending) {
e237d339 793 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
62e76326 794 continue;
795 }
796
e237d339 797 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
62e76326 798 /* the rest of the details is dealt with in the helperServerFree
799 * close handler
800 */
0a958e42 801 srv->closePipesSafely();
74addf6c 802 }
569605b9
EB
803
804 Assure(!hlp->childs.n_active);
805 hlp->dropQueued();
74addf6c 806}
807
94439e4e 808void
3bd118d6 809helperStatefulShutdown(const statefulhelper::Pointer &hlp)
94439e4e 810{
811 dlink_node *link = hlp->servers.head;
812 helper_stateful_server *srv;
62e76326 813
94439e4e 814 while (link) {
62e76326 815 srv = (helper_stateful_server *)link->data;
816 link = link->next;
817
8cfc76db 818 if (srv->flags.shutdown) {
e237d339 819 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
62e76326 820 continue;
821 }
822
48d54e4d 823 assert(hlp->childs.n_active > 0);
5e263176 824 -- hlp->childs.n_active;
f53969cc 825 srv->flags.shutdown = true; /* request it to shut itself down */
62e76326 826
a1267972 827 if (srv->stats.pending) {
e237d339 828 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
62e76326 829 continue;
830 }
831
832 if (srv->flags.closing) {
e237d339 833 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
62e76326 834 continue;
835 }
836
a56fcf0b 837 if (srv->reserved()) {
d7e0f901 838 if (shutting_down) {
e237d339 839 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
e1381638 840 } else {
e237d339 841 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
d7e0f901
AJ
842 continue;
843 }
62e76326 844 }
845
e237d339 846 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
bf8fe701 847
62e76326 848 /* the rest of the details is dealt with in the helperStatefulServerFree
849 * close handler
850 */
0a958e42 851 srv->closePipesSafely();
94439e4e 852 }
853}
854
e05a9d51 855Helper::Client::~Client()
1f5f60dd 856{
48d54e4d 857 /* note, don't free id_name, it probably points to static memory */
62e76326 858
569605b9
EB
859 // A non-empty queue would leak Helper::Xaction objects, stalling any
860 // pending (and even future collapsed) transactions. To avoid stalling
861 // transactions, we must dropQueued(). We ought to do that when we
862 // discover that no progress is possible rather than here because
863 // reference counting may keep this object alive for a long time.
864 assert(queue.empty());
94439e4e 865}
866
a56fcf0b 867void
569605b9 868Helper::Client::handleKilledServer(SessionBase * const srv)
74addf6c 869{
1f5f60dd 870 if (!srv->flags.shutdown) {
a56fcf0b
CT
871 assert(childs.n_active > 0);
872 --childs.n_active;
873 debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
62e76326 874
654835f0 875 handleFewerServers(srv->stats.replies >= 1);
5ea33fce 876
654835f0 877 if (childs.needNew() > 0) {
a56fcf0b 878 srv->flags.shutdown = true;
569605b9 879 openSessions();
4aca092a
HN
880 }
881 }
569605b9
EB
882
883 if (!childs.n_active)
884 dropQueued();
885}
886
887void
888Helper::Client::dropQueued()
889{
890 if (queue.empty())
891 return;
892
893 Assure(!childs.n_active);
894 Assure(!GetFirstAvailable(this));
895
896 // no helper servers means nobody can advance our queued transactions
897
898 debugs(80, DBG_CRITICAL, "ERROR: Dropping " << queue.size() << ' ' <<
899 id_name << " helper requests due to lack of helper processes");
900 // similar to SessionBase::dropQueued()
901 while (const auto r = nextRequest()) {
902 r->reply.result = Helper::Unknown;
903 callBack(*r);
904 delete r;
905 }
74addf6c 906}
907
654835f0 908void
e05a9d51 909Helper::Client::handleFewerServers(const bool madeProgress)
654835f0
EB
910{
911 const auto needNew = childs.needNew();
912
913 if (!needNew)
914 return; // some server(s) have died, but we still have enough
915
916 debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
917 Debug::Extra << "active processes: " << childs.n_active <<
918 Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
919
920 if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
921 if (madeProgress)
922 debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
923 else
924 fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
925 }
926}
927
bd71920d 928void
0a958e42 929Helper::SessionBase::HelperServerClosed(SessionBase * const srv)
a56fcf0b 930{
0a958e42
EB
931 srv->helper().handleKilledServer(srv);
932 srv->dropQueued();
f7ebc3fd 933 delete srv;
94439e4e 934}
935
ddc77a2e 936Helper::Xaction *
e05a9d51 937Helper::Session::popRequest(const int request_number)
95d2589c 938{
e05a9d51 939 Xaction *r = nullptr;
ddc77a2e 940 if (parent->childs.concurrency) {
2f8abb64 941 // If concurrency supported retrieve request from ID
e05a9d51 942 const auto it = requestsIndex.find(request_number);
ddc77a2e 943 if (it != requestsIndex.end()) {
32fd6d8a 944 r = *(it->second);
ddc77a2e
CT
945 requests.erase(it->second);
946 requestsIndex.erase(it);
32fd6d8a 947 }
ddc77a2e 948 } else if(!requests.empty()) {
32fd6d8a 949 // Else get the first request from queue, if any
ddc77a2e
CT
950 r = requests.front();
951 requests.pop_front();
32fd6d8a
CT
952 }
953
ddc77a2e
CT
954 return r;
955}
956
957/// Calls back with a pointer to the buffer with the helper output
958static void
e05a9d51 959helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
ddc77a2e
CT
960{
961 if (Helper::Xaction *r = srv->replyXaction) {
962 const bool hasSpace = r->reply.accumulate(msg, msgSize);
963 if (!hasSpace) {
964 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
965 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
966 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
0a958e42 967 srv->closePipesSafely();
ddc77a2e
CT
968 return;
969 }
970
971 if (!msgEnd)
972 return; // We are waiting for more data.
973
32fd6d8a 974 bool retry = false;
194ccc9c 975 if (cbdataReferenceValid(r->request.data)) {
ddc77a2e
CT
976 r->reply.finalize();
977 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
978 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
32fd6d8a 979 retry = true;
194ccc9c 980 } else {
26b6afaf 981 hlp->callBack(*r);
194ccc9c 982 }
12f7e09b 983 }
95d2589c 984
5e263176 985 -- srv->stats.pending;
1f7ba0b4 986 ++ srv->stats.replies;
95d2589c 987
95dc7ff4 988 ++ hlp->stats.replies;
95d2589c
CT
989
990 srv->answer_time = current_time;
991
ddc77a2e 992 srv->dispatch_time = r->request.dispatch_time;
95d2589c
CT
993
994 hlp->stats.avg_svc_time =
995 Math::intAverage(hlp->stats.avg_svc_time,
ddc77a2e 996 tvSubMsec(r->request.dispatch_time, current_time),
31bc1fa6 997 hlp->stats.replies, REDIRECT_AV_FACTOR);
95d2589c 998
ddc77a2e
CT
999 // release or re-submit parsedRequestXaction object
1000 srv->replyXaction = nullptr;
32fd6d8a 1001 if (retry) {
ddc77a2e 1002 ++r->request.retries;
32fd6d8a
CT
1003 hlp->submitRequest(r);
1004 } else
1005 delete r;
95d2589c 1006 }
95d2589c 1007
32fd6d8a
CT
1008 if (hlp->timeout && hlp->childs.concurrency)
1009 srv->checkForTimedOutRequests(hlp->retryTimedOut);
1010
95d2589c
CT
1011 if (!srv->flags.shutdown) {
1012 helperKickQueue(hlp);
1013 } else if (!srv->flags.closing && !srv->stats.pending) {
0a958e42 1014 srv->closeWritePipeSafely();
95d2589c
CT
1015 }
1016}
94439e4e 1017
74addf6c 1018static void
ced8def3 1019helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
74addf6c 1020{
e05a9d51 1021 const auto srv = static_cast<Helper::Session *>(data);
3bd118d6 1022 const auto hlp = srv->parent;
fa80a8ef 1023 assert(cbdataReferenceValid(data));
c4b7a5a9 1024
c8407295 1025 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
62e76326 1026
c8407295 1027 if (flag == Comm::ERR_CLOSING) {
c4b7a5a9 1028 return;
1029 }
1030
e0d28505 1031 assert(conn->fd == srv->readPipe->fd);
420e3e30 1032
e237d339 1033 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
62e76326 1034
c8407295 1035 if (flag != Comm::OK || len == 0) {
0a958e42 1036 srv->closePipesSafely();
62e76326 1037 return;
74addf6c 1038 }
62e76326 1039
07eca7e0 1040 srv->roffset += len;
1041 srv->rbuf[srv->roffset] = '\0';
8289d5a9 1042 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
62e76326 1043
32fd6d8a 1044 if (!srv->stats.pending && !srv->stats.timedout) {
62e76326 1045 /* someone spoke without being spoken to */
dd8b7fd7 1046 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
e237d339 1047 hlp->id_name << " #" << srv->index << ", " << (int)len <<
bf8fe701 1048 " bytes '" << srv->rbuf << "'");
1049
07eca7e0 1050 srv->roffset = 0;
1051 srv->rbuf[0] = '\0';
0a958e42 1052 srv->closePipesSafely();
dd8b7fd7 1053 return;
07eca7e0 1054 }
1055
ddc77a2e
CT
1056 bool needsMore = false;
1057 char *msg = srv->rbuf;
1058 while (*msg && !needsMore) {
1059 int skip = 0;
1060 char *eom = strchr(msg, hlp->eom);
1061 if (eom) {
1062 skip = 1;
1063 debugs(84, 3, "helperHandleRead: end of reply found");
1064 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1065 *eom = '\0';
1066 // rewind to the \r octet which is the real terminal now
1067 // and remember that we have to skip forward 2 places now.
1068 skip = 2;
1069 --eom;
1070 }
1071 *eom = '\0';
ec682a26 1072 }
bf8fe701 1073
1176e456 1074 if (!srv->ignoreToEom && !srv->replyXaction) {
ddc77a2e
CT
1075 int i = 0;
1076 if (hlp->childs.concurrency) {
aee3523a 1077 char *e = nullptr;
ddc77a2e
CT
1078 i = strtol(msg, &e, 10);
1079 // Do we need to check for e == msg? Means wrong response from helper.
2f8abb64 1080 // Will be dropped as "unexpected reply on channel 0"
ddc77a2e
CT
1081 needsMore = !(xisspace(*e) || (eom && e == eom));
1082 if (!needsMore) {
1083 msg = e;
1084 while (*msg && xisspace(*msg))
1085 ++msg;
1086 } // else not enough data to compute request number
1087 }
1088 if (!(srv->replyXaction = srv->popRequest(i))) {
1089 if (srv->stats.timedout) {
1090 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1091 } else {
d816f28d 1092 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
ddc77a2e
CT
1093 i << " from " << hlp->id_name << " #" << srv->index <<
1094 " '" << srv->rbuf << "'");
1095 }
1176e456 1096 srv->ignoreToEom = true;
ddc77a2e
CT
1097 }
1098 } // else we need to just append reply data to the current Xaction
1099
1100 if (!needsMore) {
1101 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1102 assert(msgSize <= srv->rbuf_sz);
1103 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1104 msg += msgSize + skip;
1105 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1176e456
CT
1106
1107 // The next message should not ignored.
1108 if (eom && srv->ignoreToEom)
1109 srv->ignoreToEom = false;
ddc77a2e 1110 } else
aee3523a 1111 assert(skip == 0 && eom == nullptr);
ddc77a2e 1112 }
985bfe6b 1113
ddc77a2e
CT
1114 if (needsMore) {
1115 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1116 assert(msgSize <= srv->rbuf_sz);
1117 memmove(srv->rbuf, msg, msgSize);
1118 srv->roffset = msgSize;
7bef981e 1119 srv->rbuf[srv->roffset] = '\0';
ddc77a2e
CT
1120 } else {
1121 // All of the responses parsed and msg points at the end of read data
1122 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1123 srv->roffset = 0;
74addf6c 1124 }
07eca7e0 1125
b0bb5517 1126 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
a78f2175
AR
1127 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1128 assert(spaceSize >= 0);
1129
abd8f140
AJ
1130 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1131 CommIoCbPtrFun(helperHandleRead, srv));
a78f2175 1132 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
abd8f140 1133 }
74addf6c 1134}
1135
94439e4e 1136static void
ced8def3 1137helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
94439e4e 1138{
aee3523a 1139 char *t = nullptr;
e6ccf245 1140 helper_stateful_server *srv = (helper_stateful_server *)data;
3bd118d6 1141 const auto hlp = srv->parent;
fa80a8ef 1142 assert(cbdataReferenceValid(data));
c4b7a5a9 1143
c8407295 1144 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
62e76326 1145
c8407295 1146 if (flag == Comm::ERR_CLOSING) {
c4b7a5a9 1147 return;
1148 }
1149
e0d28505 1150 assert(conn->fd == srv->readPipe->fd);
420e3e30 1151
4a7a3d56 1152 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
e237d339 1153 hlp->id_name << " #" << srv->index);
bf8fe701 1154
c8407295 1155 if (flag != Comm::OK || len == 0) {
0a958e42 1156 srv->closePipesSafely();
62e76326 1157 return;
94439e4e 1158 }
62e76326 1159
07eca7e0 1160 srv->roffset += len;
1161 srv->rbuf[srv->roffset] = '\0';
8289d5a9 1162 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
62e76326 1163
dd8b7fd7 1164 if (srv->requests.empty()) {
62e76326 1165 /* someone spoke without being spoken to */
dd8b7fd7 1166 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
e237d339 1167 hlp->id_name << " #" << srv->index << ", " << (int)len <<
bf8fe701 1168 " bytes '" << srv->rbuf << "'");
1169
07eca7e0 1170 srv->roffset = 0;
0a958e42 1171 srv->closePipesSafely();
dd8b7fd7 1172 return;
07eca7e0 1173 }
1174
0af9303a 1175 if ((t = strchr(srv->rbuf, hlp->eom))) {
bf8fe701 1176 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1177
36cb7d3e
AJ
1178 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1179 *t = '\0';
1180 // rewind to the \r octet which is the real terminal now
36cb7d3e
AJ
1181 --t;
1182 }
6bf4f823 1183
62e76326 1184 *t = '\0';
ddc77a2e
CT
1185 }
1186
dd8b7fd7
EB
1187 const auto r = srv->requests.front();
1188
1189 if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
ddc77a2e
CT
1190 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1191 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1192 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
0a958e42 1193 srv->closePipesSafely();
ddc77a2e
CT
1194 return;
1195 }
1196 /**
1197 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1198 * Doing this prohibits concurrency support with multiple replies per read().
1199 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1200 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1201 */
1202 srv->roffset = 0;
1203
1204 if (t) {
1205 /* end of reply found */
1206 srv->requests.pop_front(); // we already have it in 'r'
1207 int called = 1;
62e76326 1208
dd8b7fd7 1209 if (cbdataReferenceValid(r->request.data)) {
ddc77a2e 1210 r->reply.finalize();
a56fcf0b 1211 r->reply.reservationId = srv->reservationId;
26b6afaf 1212 hlp->callBack(*r);
62e76326 1213 } else {
e0236918 1214 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
e1381638 1215 called = 0;
62e76326 1216 }
ddc77a2e 1217
002df7ca 1218 delete r;
1f7ba0b4
AJ
1219
1220 -- srv->stats.pending;
1221 ++ srv->stats.replies;
1222
95dc7ff4 1223 ++ hlp->stats.replies;
f9598528 1224 srv->answer_time = current_time;
62e76326 1225 hlp->stats.avg_svc_time =
a98bcbee
AJ
1226 Math::intAverage(hlp->stats.avg_svc_time,
1227 tvSubMsec(srv->dispatch_time, current_time),
1228 hlp->stats.replies, REDIRECT_AV_FACTOR);
62e76326 1229
e1381638
AJ
1230 if (called)
1231 helperStatefulServerDone(srv);
1232 else
a56fcf0b 1233 hlp->cancelReservation(srv->reservationId);
94439e4e 1234 }
07eca7e0 1235
b0bb5517 1236 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
ddc77a2e 1237 int spaceSize = srv->rbuf_sz - 1;
a78f2175 1238
abd8f140
AJ
1239 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1240 CommIoCbPtrFun(helperStatefulHandleRead, srv));
ddc77a2e 1241 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
abd8f140 1242 }
94439e4e 1243}
1244
6825b101 1245/// Handles a request when all running helpers, if any, are busy.
74addf6c 1246static void
e05a9d51 1247Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
74addf6c 1248{
6215dc18 1249 hlp->queue.push(r);
95dc7ff4 1250 ++ hlp->stats.queue_size;
62e76326 1251
48d54e4d
AJ
1252 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1253 if (hlp->childs.needNew() > 0) {
bd71920d 1254 hlp->openSessions();
48d54e4d
AJ
1255 return;
1256 }
1257
6825b101 1258 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
62e76326 1259 return;
1260
74addf6c 1261 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1262 return;
1263
fe73896c 1264 if (shutting_down || reconfiguring)
62e76326 1265 return;
1266
74addf6c 1267 hlp->last_queue_warn = squid_curtime;
62e76326 1268
fa84c01d
FC
1269 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1270 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1271 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
74addf6c 1272}
1273
94439e4e 1274static void
ddc77a2e 1275StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
94439e4e 1276{
6215dc18 1277 hlp->queue.push(r);
95dc7ff4 1278 ++ hlp->stats.queue_size;
62e76326 1279
48d54e4d
AJ
1280 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1281 if (hlp->childs.needNew() > 0) {
bd71920d 1282 hlp->openSessions();
48d54e4d
AJ
1283 return;
1284 }
1285
6825b101 1286 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
62e76326 1287 return;
1288
94439e4e 1289 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1290 return;
1291
94439e4e 1292 if (shutting_down || reconfiguring)
62e76326 1293 return;
1294
94439e4e 1295 hlp->last_queue_warn = squid_curtime;
62e76326 1296
fa84c01d
FC
1297 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1298 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1299 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
94439e4e 1300}
1301
ddc77a2e 1302Helper::Xaction *
e05a9d51 1303Helper::Client::nextRequest()
74addf6c 1304{
6215dc18
AJ
1305 if (queue.empty())
1306 return nullptr;
62e76326 1307
6215dc18
AJ
1308 auto *r = queue.front();
1309 queue.pop();
1310 --stats.queue_size;
94439e4e 1311 return r;
1312}
1313
e05a9d51
EB
1314static Helper::Session *
1315GetFirstAvailable(const Helper::Client::Pointer &hlp)
74addf6c 1316{
1317 dlink_node *n;
e05a9d51 1318 Helper::Session *selected = nullptr;
48e7baac 1319 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
62e76326 1320
48d54e4d 1321 if (hlp->childs.n_running == 0)
aee3523a 1322 return nullptr;
62e76326 1323
07eca7e0 1324 /* Find "least" loaded helper (approx) */
aee3523a 1325 for (n = hlp->servers.head; n != nullptr; n = n->next) {
e05a9d51 1326 const auto srv = static_cast<Helper::Session *>(n->data);
62e76326 1327
07eca7e0 1328 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1329 continue;
1330
d8f10d6a 1331 if (srv->flags.shutdown)
62e76326 1332 continue;
1333
07eca7e0 1334 if (!srv->stats.pending)
1335 return srv;
1336
1337 if (selected) {
1338 selected = srv;
1339 break;
1340 }
1341
1342 selected = srv;
74addf6c 1343 }
62e76326 1344
48e7baac
AJ
1345 if (!selected) {
1346 debugs(84, 5, "GetFirstAvailable: None available.");
aee3523a 1347 return nullptr;
48e7baac 1348 }
07eca7e0 1349
48e7baac 1350 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
6082a0e2 1351 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
aee3523a 1352 return nullptr;
48e7baac 1353 }
07eca7e0 1354
48e7baac 1355 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
07eca7e0 1356 return selected;
74addf6c 1357}
1358
94439e4e 1359static helper_stateful_server *
3bd118d6 1360StatefulGetFirstAvailable(const statefulhelper::Pointer &hlp)
94439e4e 1361{
1362 dlink_node *n;
aee3523a 1363 helper_stateful_server *srv = nullptr;
a56fcf0b 1364 helper_stateful_server *oldestReservedServer = nullptr;
48d54e4d 1365 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
62e76326 1366
48d54e4d 1367 if (hlp->childs.n_running == 0)
aee3523a 1368 return nullptr;
62e76326 1369
aee3523a 1370 for (n = hlp->servers.head; n != nullptr; n = n->next) {
62e76326 1371 srv = (helper_stateful_server *)n->data;
1372
a1267972 1373 if (srv->stats.pending)
62e76326 1374 continue;
1375
a56fcf0b
CT
1376 if (srv->reserved()) {
1377 if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
1378 if (!oldestReservedServer)
1379 oldestReservedServer = srv;
1380 else if (oldestReservedServer->reservationStart < srv->reservationStart)
1381 oldestReservedServer = srv;
1382 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1383 }
62e76326 1384 continue;
a56fcf0b 1385 }
62e76326 1386
d8f10d6a 1387 if (srv->flags.shutdown)
62e76326 1388 continue;
1389
26ac0430 1390 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
62e76326 1391 return srv;
94439e4e 1392 }
62e76326 1393
a56fcf0b
CT
1394 if (oldestReservedServer) {
1395 debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1396 return oldestReservedServer;
1397 }
1398
bf8fe701 1399 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
a56fcf0b 1400 return nullptr;
94439e4e 1401}
1402
42679bd6 1403static void
ced8def3 1404helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
42679bd6 1405{
e05a9d51 1406 const auto srv = static_cast<Helper::Session *>(data);
07eca7e0 1407
2fe7eff9 1408 srv->writebuf->clean();
032785bf 1409 delete srv->writebuf;
aee3523a 1410 srv->writebuf = nullptr;
be4d35dc 1411 srv->flags.writing = false;
07eca7e0 1412
c8407295 1413 if (flag != Comm::OK) {
07eca7e0 1414 /* Helper server has crashed */
e237d339 1415 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
07eca7e0 1416 return;
1417 }
1418
2fe7eff9 1419 if (!srv->wqueue->isNull()) {
07eca7e0 1420 srv->writebuf = srv->wqueue;
032785bf 1421 srv->wqueue = new MemBuf;
be4d35dc 1422 srv->flags.writing = true;
ec41b64c
AJ
1423 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1424 CommIoCbPtrFun(helperDispatchWriteDone, srv));
aee3523a 1425 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
07eca7e0 1426 }
42679bd6 1427}
1428
74addf6c 1429static void
e05a9d51 1430helperDispatch(Helper::Session * const srv, Helper::Xaction * const r)
74addf6c 1431{
3bd118d6 1432 const auto hlp = srv->parent;
32fd6d8a 1433 const uint64_t reqId = ++srv->nextRequestId;
62e76326 1434
ddc77a2e 1435 if (!cbdataReferenceValid(r->request.data)) {
d816f28d 1436 debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
002df7ca 1437 delete r;
62e76326 1438 return;
74addf6c 1439 }
62e76326 1440
ddc77a2e 1441 r->request.Id = reqId;
e05a9d51 1442 const auto it = srv->requests.insert(srv->requests.end(), r);
ddc77a2e 1443 r->request.dispatch_time = current_time;
07eca7e0 1444
2fe7eff9 1445 if (srv->wqueue->isNull())
1446 srv->wqueue->init();
07eca7e0 1447
32fd6d8a 1448 if (hlp->childs.concurrency) {
e05a9d51 1449 srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
32fd6d8a 1450 assert(srv->requestsIndex.size() == srv->requests.size());
ddc77a2e 1451 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
32fd6d8a 1452 } else
ddc77a2e 1453 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
07eca7e0 1454
1455 if (!srv->flags.writing) {
aee3523a 1456 assert(nullptr == srv->writebuf);
07eca7e0 1457 srv->writebuf = srv->wqueue;
032785bf 1458 srv->wqueue = new MemBuf;
be4d35dc 1459 srv->flags.writing = true;
ec41b64c
AJ
1460 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1461 CommIoCbPtrFun(helperDispatchWriteDone, srv));
aee3523a 1462 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
07eca7e0 1463 }
1464
ddc77a2e 1465 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
bf8fe701 1466
95dc7ff4 1467 ++ srv->stats.uses;
1f7ba0b4 1468 ++ srv->stats.pending;
95dc7ff4 1469 ++ hlp->stats.requests;
74addf6c 1470}
1471
42679bd6 1472static void
ced8def3
AJ
1473helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1474{}
42679bd6 1475
94439e4e 1476static void
ddc77a2e 1477helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
94439e4e 1478{
3bd118d6 1479 const auto hlp = srv->parent;
62e76326 1480
ddc77a2e 1481 if (!cbdataReferenceValid(r->request.data)) {
d816f28d 1482 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
002df7ca 1483 delete r;
a56fcf0b 1484 hlp->cancelReservation(srv->reservationId);
62e76326 1485 return;
94439e4e 1486 }
62e76326 1487
e237d339 1488 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
62e76326 1489
a56fcf0b
CT
1490 assert(srv->reservationId);
1491 r->reply.reservationId = srv->reservationId;
1492
ddc77a2e 1493 if (r->request.placeholder == 1) {
62e76326 1494 /* a callback is needed before this request can _use_ a helper. */
d20ce97d 1495 /* we don't care about releasing this helper. The request NEVER
62e76326 1496 * gets to the helper. So we throw away the return code */
ddc77a2e 1497 r->reply.result = Helper::Unknown;
26b6afaf 1498 hlp->callBack(*r);
62e76326 1499 /* throw away the placeholder */
002df7ca 1500 delete r;
62e76326 1501 /* and push the queue. Note that the callback may have submitted a new
e166785a 1502 * request to the helper which is why we test for the request */
62e76326 1503
bf3e8d5a 1504 if (!srv->requests.size())
a8c4f8d6 1505 helperStatefulServerDone(srv);
62e76326 1506
1507 return;
94439e4e 1508 }
62e76326 1509
bf3e8d5a 1510 srv->requests.push_back(r);
94439e4e 1511 srv->dispatch_time = current_time;
ec41b64c 1512 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
3bd118d6 1513 CommIoCbPtrFun(helperStatefulDispatchWriteDone, srv));
aee3523a 1514 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
bf8fe701 1515 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
e237d339 1516 hlp->id_name << " #" << srv->index << ", " <<
ddc77a2e 1517 (int) strlen(r->request.buf) << " bytes");
bf8fe701 1518
95dc7ff4 1519 ++ srv->stats.uses;
1f7ba0b4 1520 ++ srv->stats.pending;
95dc7ff4 1521 ++ hlp->stats.requests;
94439e4e 1522}
1523
74addf6c 1524static void
e05a9d51 1525helperKickQueue(const Helper::Client::Pointer &hlp)
74addf6c 1526{
e05a9d51
EB
1527 Helper::Xaction *r = nullptr;
1528 Helper::Session *srv = nullptr;
62e76326 1529
6215dc18 1530 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
62e76326 1531 helperDispatch(srv, r);
569605b9
EB
1532
1533 if (!hlp->childs.n_active)
1534 hlp->dropQueued();
74addf6c 1535}
1536
94439e4e 1537static void
3bd118d6 1538helperStatefulKickQueue(const statefulhelper::Pointer &hlp)
94439e4e 1539{
ddc77a2e 1540 Helper::Xaction *r;
94439e4e 1541 helper_stateful_server *srv;
a56fcf0b
CT
1542 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1543 debugs(84, 5, "found srv-" << srv->index);
1544 hlp->reserveServer(srv);
62e76326 1545 helperStatefulDispatch(srv, r);
a56fcf0b 1546 }
569605b9
EB
1547
1548 if (!hlp->childs.n_active)
1549 hlp->dropQueued();
94439e4e 1550}
1551
1552static void
a8c4f8d6 1553helperStatefulServerDone(helper_stateful_server * srv)
94439e4e 1554{
420e3e30
HN
1555 if (!srv->flags.shutdown) {
1556 helperStatefulKickQueue(srv->parent);
a56fcf0b 1557 } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
0a958e42 1558 srv->closeWritePipeSafely();
420e3e30
HN
1559 return;
1560 }
94439e4e 1561}
1562
32fd6d8a 1563void
e05a9d51 1564Helper::Session::checkForTimedOutRequests(bool const retry)
32fd6d8a
CT
1565{
1566 assert(parent->childs.concurrency);
ddc77a2e 1567 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
e05a9d51 1568 const auto r = requests.front();
32fd6d8a 1569 RequestIndex::iterator it;
ddc77a2e 1570 it = requestsIndex.find(r->request.Id);
32fd6d8a
CT
1571 assert(it != requestsIndex.end());
1572 requestsIndex.erase(it);
1573 requests.pop_front();
ddc77a2e 1574 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
32fd6d8a 1575 bool retried = false;
ddc77a2e
CT
1576 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1577 debugs(84, 2, "Retry request " << r->request.Id);
1578 ++r->request.retries;
32fd6d8a
CT
1579 parent->submitRequest(r);
1580 retried = true;
26b6afaf 1581 } else if (cbdataReferenceValid(r->request.data)) {
32fd6d8a 1582 if (!parent->onTimedOutResponse.isEmpty()) {
ddc77a2e
CT
1583 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1584 r->reply.finalize();
1585 else
1586 r->reply.result = Helper::TimedOut;
26b6afaf 1587 parent->callBack(*r);
ddc77a2e
CT
1588 } else {
1589 r->reply.result = Helper::TimedOut;
26b6afaf 1590 parent->callBack(*r);
ddc77a2e 1591 }
32fd6d8a
CT
1592 }
1593 --stats.pending;
1594 ++stats.timedout;
1595 ++parent->stats.timedout;
1596 if (!retried)
1597 delete r;
1598 }
1599}
1600
1601void
e05a9d51 1602Helper::Session::requestTimeout(const CommTimeoutCbParams &io)
32fd6d8a 1603{
bf95c10a 1604 debugs(26, 3, io.conn);
e05a9d51 1605 const auto srv = static_cast<Session *>(io.data);
32fd6d8a 1606
32fd6d8a
CT
1607 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1608
e05a9d51
EB
1609 debugs(84, 3, io.conn << " establish a new timeout");
1610 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
1611 CommTimeoutCbPtrFun(Session::requestTimeout, srv));
32fd6d8a 1612
4650a4fa
FC
1613 const time_t timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1614 const time_t minimumNewTimeout = 1; // second
1615 const auto timeLeft = max(minimumNewTimeout, srv->parent->timeout - timeSpent);
32fd6d8a
CT
1616
1617 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1618}
f53969cc 1619