]> git.ipfire.org Git - thirdparty/squid.git/blame - src/helper.cc
Use ERR_ACCESS_DENIED for HTTP 403 (Forbidden) errors (#1899)
[thirdparty/squid.git] / src / helper.cc
CommitLineData
f740a279 1/*
b8ae064d 2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
f740a279 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
f740a279 7 */
8
bbc27441
AJ
9/* DEBUG: section 84 Helper process maintenance */
10
582c2af2 11#include "squid.h"
37dedc58 12#include "base/AsyncCbdataCalls.h"
bf3e8d5a 13#include "base/Packable.h"
675b8408 14#include "base/Raw.h"
e0d28505
AJ
15#include "comm.h"
16#include "comm/Connection.h"
7e66d5e2 17#include "comm/Read.h"
ec41b64c 18#include "comm/Write.h"
675b8408 19#include "debug/Messages.h"
c4ad1349 20#include "fd.h"
b0bb5517 21#include "fde.h"
38e16f92 22#include "format/Quoting.h"
582c2af2 23#include "helper.h"
24438ec5
AJ
24#include "helper/Reply.h"
25#include "helper/Request.h"
e0d28505 26#include "MemBuf.h"
32fd6d8a 27#include "SquidConfig.h"
96097880 28#include "SquidIpc.h"
a98bcbee 29#include "SquidMath.h"
e6ccf245 30#include "Store.h"
d295d770 31#include "wordlist.h"
74addf6c 32
ed6e9fb9
AJ
33// helper_stateful_server::data uses explicit alloc()/freeOne() */
34#include "mem/Pool.h"
35
74addf6c 36#define HELPER_MAX_ARGS 64
37
32fd6d8a
CT
38/// The maximum allowed request retries.
39#define MAX_RETRIES 2
40
ddc77a2e
CT
41/// Helpers input buffer size.
42const size_t ReadBufSize(32*1024);
a78f2175 43
c4b7a5a9 44static IOCB helperHandleRead;
45static IOCB helperStatefulHandleRead;
e05a9d51
EB
46static void Enqueue(Helper::Client *, Helper::Xaction *);
47static Helper::Session *GetFirstAvailable(const Helper::Client::Pointer &);
3bd118d6 48static helper_stateful_server *StatefulGetFirstAvailable(const statefulhelper::Pointer &);
e05a9d51 49static void helperDispatch(Helper::Session *, Helper::Xaction *);
ddc77a2e 50static void helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r);
e05a9d51 51static void helperKickQueue(const Helper::Client::Pointer &);
3bd118d6 52static void helperStatefulKickQueue(const statefulhelper::Pointer &);
a8c4f8d6 53static void helperStatefulServerDone(helper_stateful_server * srv);
ddc77a2e 54static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
9522b380 55
e05a9d51 56CBDATA_NAMESPACED_CLASS_INIT(Helper, Session);
f3831c93 57CBDATA_CLASS_INIT(helper_stateful_server);
aa839030 58
e05a9d51 59InstanceIdDefinitions(Helper::SessionBase, "Hlpr");
e237d339 60
895dba0a 61void
e05a9d51 62Helper::SessionBase::initStats()
895dba0a
CT
63{
64 stats.uses=0;
65 stats.replies=0;
66 stats.pending=0;
67 stats.releases=0;
68 stats.timedout = 0;
69}
70
e0d28505 71void
0a958e42 72Helper::SessionBase::closePipesSafely()
e0d28505 73{
7aa9bb3e 74#if _SQUID_WINDOWS_
e0d28505
AJ
75 shutdown(writePipe->fd, SD_BOTH);
76#endif
77
be4d35dc 78 flags.closing = true;
e0d28505
AJ
79 if (readPipe->fd == writePipe->fd)
80 readPipe->fd = -1;
81 else
82 readPipe->close();
83 writePipe->close();
84
7aa9bb3e 85#if _SQUID_WINDOWS_
dc49061a
A
86 if (hIpc) {
87 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
88 getCurrentTime();
0a958e42 89 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
41060bef 90 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
e0d28505 91 }
dc49061a
A
92 CloseHandle(hIpc);
93 }
e0d28505
AJ
94#endif
95}
96
97void
0a958e42 98Helper::SessionBase::closeWritePipeSafely()
e0d28505 99{
7aa9bb3e 100#if _SQUID_WINDOWS_
e0d28505
AJ
101 shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
102#endif
103
be4d35dc 104 flags.closing = true;
e0d28505
AJ
105 if (readPipe->fd == writePipe->fd)
106 readPipe->fd = -1;
107 writePipe->close();
108
7aa9bb3e 109#if _SQUID_WINDOWS_
dc49061a
A
110 if (hIpc) {
111 if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
112 getCurrentTime();
0a958e42 113 debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
41060bef 114 " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
e0d28505 115 }
dc49061a
A
116 CloseHandle(hIpc);
117 }
e0d28505
AJ
118#endif
119}
120
a56fcf0b 121void
0a958e42 122Helper::SessionBase::dropQueued()
a56fcf0b
CT
123{
124 while (!requests.empty()) {
125 // XXX: re-schedule these on another helper?
e05a9d51 126 const auto r = requests.front();
a56fcf0b 127 requests.pop_front();
26b6afaf 128 r->reply.result = Helper::Unknown;
0a958e42 129 helper().callBack(*r);
a56fcf0b
CT
130 delete r;
131 }
132}
133
e05a9d51 134Helper::SessionBase::~SessionBase()
a56fcf0b
CT
135{
136 if (rbuf) {
137 memFreeBuf(rbuf_sz, rbuf);
aee3523a 138 rbuf = nullptr;
a56fcf0b
CT
139 }
140}
141
e05a9d51 142Helper::Session::~Session()
a56fcf0b
CT
143{
144 wqueue->clean();
145 delete wqueue;
146
147 if (writebuf) {
148 writebuf->clean();
149 delete writebuf;
aee3523a 150 writebuf = nullptr;
a56fcf0b
CT
151 }
152
153 if (Comm::IsConnOpen(writePipe))
0a958e42 154 closeWritePipeSafely();
a56fcf0b
CT
155
156 dlinkDelete(&link, &parent->servers);
157
158 assert(parent->childs.n_running > 0);
159 -- parent->childs.n_running;
160
161 assert(requests.empty());
a56fcf0b
CT
162}
163
164void
0a958e42 165Helper::Session::dropQueued()
a56fcf0b 166{
0a958e42 167 SessionBase::dropQueued();
a56fcf0b
CT
168 requestsIndex.clear();
169}
170
171helper_stateful_server::~helper_stateful_server()
172{
173 /* TODO: walk the local queue of requests and carry them all out */
174 if (Comm::IsConnOpen(writePipe))
0a958e42 175 closeWritePipeSafely();
a56fcf0b
CT
176
177 parent->cancelReservation(reservationId);
178
179 dlinkDelete(&link, &parent->servers);
180
181 assert(parent->childs.n_running > 0);
182 -- parent->childs.n_running;
183
184 assert(requests.empty());
a56fcf0b
CT
185}
186
74addf6c 187void
bd71920d 188Helper::Client::openSessions()
74addf6c 189{
190 char *s;
191 char *progname;
192 char *shortname;
193 char *procname;
1b8af627 194 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
74addf6c 195 char fd_note_buf[FD_DESC_SZ];
74addf6c 196 int nargs = 0;
197 int k;
b5d712b5 198 pid_t pid;
74addf6c 199 int rfd;
200 int wfd;
b5d712b5 201 void * hIpc;
74addf6c 202 wordlist *w;
bd71920d
EB
203 // Helps reducing diff. TODO: remove
204 const auto hlp = this;
62e76326 205
aee3523a 206 if (hlp->cmdline == nullptr)
62e76326 207 return;
208
74addf6c 209 progname = hlp->cmdline->key;
62e76326 210
74addf6c 211 if ((s = strrchr(progname, '/')))
62e76326 212 shortname = xstrdup(s + 1);
74addf6c 213 else
62e76326 214 shortname = xstrdup(progname);
215
48d54e4d
AJ
216 /* figure out how many new child are actually needed. */
217 int need_new = hlp->childs.needNew();
5a5b2c56 218
c59baaa8 219 debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
5a5b2c56 220
04f7fd38 221 if (need_new < 1) {
c59baaa8 222 debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
5a5b2c56 223 }
62e76326 224
e6ccf245 225 procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 226
74addf6c 227 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 228
a38ec4b1
FC
229 args[nargs] = procname;
230 ++nargs;
62e76326 231
a38ec4b1
FC
232 for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
233 args[nargs] = w->key;
234 ++nargs;
235 }
62e76326 236
aee3523a 237 args[nargs] = nullptr;
a38ec4b1 238 ++nargs;
62e76326 239
74addf6c 240 assert(nargs <= HELPER_MAX_ARGS);
62e76326 241
654835f0
EB
242 int successfullyStarted = 0;
243
95dc7ff4 244 for (k = 0; k < need_new; ++k) {
62e76326 245 getCurrentTime();
246 rfd = wfd = -1;
b5d712b5 247 pid = ipcCreate(hlp->ipc_type,
248 progname,
249 args,
250 shortname,
cc192b50 251 hlp->addr,
b5d712b5 252 &rfd,
253 &wfd,
254 &hIpc);
255
256 if (pid < 0) {
e0236918 257 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
62e76326 258 continue;
259 }
260
654835f0 261 ++successfullyStarted;
95dc7ff4
FC
262 ++ hlp->childs.n_running;
263 ++ hlp->childs.n_active;
e05a9d51 264 const auto srv = new Helper::Session;
895dba0a
CT
265 srv->hIpc = hIpc;
266 srv->pid = pid;
267 srv->initStats();
268 srv->addr = hlp->addr;
269 srv->readPipe = new Comm::Connection;
270 srv->readPipe->fd = rfd;
271 srv->writePipe = new Comm::Connection;
272 srv->writePipe->fd = wfd;
273 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
274 srv->wqueue = new MemBuf;
275 srv->roffset = 0;
276 srv->nextRequestId = 0;
aee3523a 277 srv->replyXaction = nullptr;
895dba0a 278 srv->ignoreToEom = false;
3bd118d6 279 srv->parent = hlp;
62e76326 280 dlinkAddTail(srv, &srv->link, &hlp->servers);
281
282 if (rfd == wfd) {
283 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
284 fd_note(rfd, fd_note_buf);
285 } else {
286 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
287 fd_note(rfd, fd_note_buf);
288 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
289 fd_note(wfd, fd_note_buf);
290 }
291
292 commSetNonBlocking(rfd);
293
294 if (wfd != rfd)
295 commSetNonBlocking(wfd);
296
0a958e42 297 AsyncCall::Pointer closeCall = asyncCall(5, 4, "Helper::Session::HelperServerClosed", cbdataDialer(SessionBase::HelperServerClosed,
ab2acef6 298 static_cast<Helper::SessionBase *>(srv)));
0a958e42 299
37dedc58 300 comm_add_close_handler(rfd, closeCall);
07eca7e0 301
f53969cc 302 if (hlp->timeout && hlp->childs.concurrency) {
e05a9d51
EB
303 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
304 CommTimeoutCbPtrFun(Helper::Session::requestTimeout, srv));
32fd6d8a
CT
305 commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
306 }
307
abd8f140
AJ
308 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
309 CommIoCbPtrFun(helperHandleRead, srv));
310 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
74addf6c 311 }
62e76326 312
654835f0
EB
313 // Call handleFewerServers() before hlp->last_restart is updated because
314 // that method uses last_restart to measure the delay since previous start.
315 // TODO: Refactor last_restart code to measure failure frequency rather than
316 // detecting a helper #X failure that is being close to the helper #Y start.
317 if (successfullyStarted < need_new)
318 hlp->handleFewerServers(false);
319
5ea33fce 320 hlp->last_restart = squid_curtime;
74addf6c 321 safe_free(shortname);
322 safe_free(procname);
838b993c 323 helperKickQueue(hlp);
74addf6c 324}
325
94439e4e 326void
bd71920d 327statefulhelper::openSessions()
94439e4e 328{
94439e4e 329 char *shortname;
1b8af627 330 const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
94439e4e 331 char fd_note_buf[FD_DESC_SZ];
94439e4e 332 int nargs = 0;
bd71920d
EB
333 // Helps reducing diff. TODO: remove
334 const auto hlp = this;
62e76326 335
aee3523a 336 if (hlp->cmdline == nullptr)
62e76326 337 return;
338
7353861b 339 if (hlp->childs.concurrency)
fa84c01d 340 debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
7353861b 341
4f0ef8e8 342 char *progname = hlp->cmdline->key;
62e76326 343
4f0ef8e8 344 char *s;
94439e4e 345 if ((s = strrchr(progname, '/')))
62e76326 346 shortname = xstrdup(s + 1);
94439e4e 347 else
62e76326 348 shortname = xstrdup(progname);
349
48d54e4d
AJ
350 /* figure out haw mant new helpers are needed. */
351 int need_new = hlp->childs.needNew();
5a5b2c56 352
e0236918 353 debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
5a5b2c56 354
04f7fd38 355 if (need_new < 1) {
e0236918 356 debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
5a5b2c56 357 }
62e76326 358
4f0ef8e8 359 char *procname = (char *)xmalloc(strlen(shortname) + 3);
62e76326 360
94439e4e 361 snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
62e76326 362
a38ec4b1
FC
363 args[nargs] = procname;
364 ++nargs;
62e76326 365
a38ec4b1
FC
366 for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
367 args[nargs] = w->key;
368 ++nargs;
369 }
62e76326 370
aee3523a 371 args[nargs] = nullptr;
a38ec4b1 372 ++nargs;
62e76326 373
94439e4e 374 assert(nargs <= HELPER_MAX_ARGS);
62e76326 375
654835f0
EB
376 int successfullyStarted = 0;
377
95dc7ff4 378 for (int k = 0; k < need_new; ++k) {
62e76326 379 getCurrentTime();
26ac0430
AJ
380 int rfd = -1;
381 int wfd = -1;
382 void * hIpc;
4f0ef8e8 383 pid_t pid = ipcCreate(hlp->ipc_type,
26ac0430
AJ
384 progname,
385 args,
386 shortname,
387 hlp->addr,
388 &rfd,
389 &wfd,
390 &hIpc);
b5d712b5 391
392 if (pid < 0) {
e0236918 393 debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
62e76326 394 continue;
395 }
396
654835f0 397 ++successfullyStarted;
95dc7ff4
FC
398 ++ hlp->childs.n_running;
399 ++ hlp->childs.n_active;
895dba0a
CT
400 helper_stateful_server *srv = new helper_stateful_server;
401 srv->hIpc = hIpc;
402 srv->pid = pid;
403 srv->initStats();
404 srv->addr = hlp->addr;
405 srv->readPipe = new Comm::Connection;
406 srv->readPipe->fd = rfd;
407 srv->writePipe = new Comm::Connection;
408 srv->writePipe->fd = wfd;
409 srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
410 srv->roffset = 0;
3bd118d6 411 srv->parent = hlp;
895dba0a
CT
412 srv->reservationStart = 0;
413
62e76326 414 dlinkAddTail(srv, &srv->link, &hlp->servers);
415
416 if (rfd == wfd) {
417 snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
418 fd_note(rfd, fd_note_buf);
419 } else {
420 snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
421 fd_note(rfd, fd_note_buf);
422 snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
423 fd_note(wfd, fd_note_buf);
424 }
425
426 commSetNonBlocking(rfd);
427
428 if (wfd != rfd)
429 commSetNonBlocking(wfd);
430
0a958e42 431 AsyncCall::Pointer closeCall = asyncCall(5, 4, "helper_stateful_server::HelperServerClosed", cbdataDialer(Helper::SessionBase::HelperServerClosed,
ab2acef6 432 static_cast<Helper::SessionBase *>(srv)));
0a958e42 433
37dedc58 434 comm_add_close_handler(rfd, closeCall);
07eca7e0 435
abd8f140
AJ
436 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
437 CommIoCbPtrFun(helperStatefulHandleRead, srv));
438 comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
94439e4e 439 }
62e76326 440
654835f0
EB
441 // Call handleFewerServers() before hlp->last_restart is updated because
442 // that method uses last_restart to measure the delay since previous start.
443 // TODO: Refactor last_restart code to measure failure frequency rather than
444 // detecting a helper #X failure that is being close to the helper #Y start.
445 if (successfullyStarted < need_new)
446 hlp->handleFewerServers(false);
447
5ea33fce 448 hlp->last_restart = squid_curtime;
94439e4e 449 safe_free(shortname);
450 safe_free(procname);
451 helperStatefulKickQueue(hlp);
452}
453
32fd6d8a 454void
e05a9d51 455Helper::Client::submitRequest(Helper::Xaction * const r)
32fd6d8a 456{
e05a9d51 457 if (const auto srv = GetFirstAvailable(this))
32fd6d8a
CT
458 helperDispatch(srv, r);
459 else
460 Enqueue(this, r);
461
6082a0e2
EB
462 syncQueueStats();
463}
464
465/// handles helperSubmit() and helperStatefulSubmit() failures
466static void
e05a9d51 467SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
6082a0e2
EB
468{
469 auto result = Helper::Error;
470 if (!hlp) {
471 debugs(84, 3, "no helper");
472 result = Helper::Unknown;
32fd6d8a 473 }
6082a0e2
EB
474 // else pretend the helper has responded with ERR
475
476 callback(data, Helper::Reply(result));
32fd6d8a
CT
477}
478
74addf6c 479void
e05a9d51 480helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
74addf6c 481{
6082a0e2
EB
482 if (!hlp || !hlp->trySubmit(buf, callback, data))
483 SubmissionFailure(hlp, callback, data);
6825b101
CT
484}
485
6082a0e2 486/// whether queuing an additional request would overload the helper
6825b101 487bool
e05a9d51 488Helper::Client::queueFull() const {
6082a0e2
EB
489 return stats.queue_size >= static_cast<int>(childs.queue_size);
490}
491
492bool
e05a9d51 493Helper::Client::overloaded() const {
6825b101
CT
494 return stats.queue_size > static_cast<int>(childs.queue_size);
495}
496
6082a0e2 497/// synchronizes queue-dependent measurements with the current queue state
6825b101 498void
e05a9d51 499Helper::Client::syncQueueStats()
6825b101 500{
6082a0e2
EB
501 if (overloaded()) {
502 if (overloadStart) {
503 debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
504 } else {
505 overloadStart = squid_curtime;
506 debugs(84, 3, id_name << " became overloaded");
507 }
508 } else {
509 if (overloadStart) {
510 debugs(84, 5, id_name << " is no longer overloaded");
511 if (droppedRequests) {
512 debugs(84, DBG_IMPORTANT, "helper " << id_name <<
513 " is no longer overloaded after dropping " << droppedRequests <<
514 " requests in " << (squid_curtime - overloadStart) << " seconds");
515 droppedRequests = 0;
516 }
517 overloadStart = 0;
518 }
519 }
6825b101
CT
520}
521
6082a0e2
EB
522/// prepares the helper for request submission
523/// returns true if and only if the submission should proceed
524/// may kill Squid if the helper remains overloaded for too long
6825b101 525bool
e05a9d51 526Helper::Client::prepSubmit()
6825b101 527{
6082a0e2
EB
528 // re-sync for the configuration may have changed since the last submission
529 syncQueueStats();
530
531 // Nothing special to do if the new request does not overload (i.e., the
532 // queue is not even full yet) or only _starts_ overloading this helper
533 // (i.e., the queue is currently at its limit).
534 if (!overloaded())
535 return true;
62e76326 536
6082a0e2
EB
537 if (squid_curtime - overloadStart <= 180)
538 return true; // also OK: overload has not persisted long enough to panic
539
e05a9d51 540 if (childs.onPersistentOverload == ChildConfig::actDie)
6082a0e2
EB
541 fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
542
543 if (!droppedRequests) {
544 debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
545 id_name << " helper configured with on-persistent-overload=err");
6825b101 546 }
6082a0e2
EB
547 ++droppedRequests;
548 debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
549 return false;
550}
551
552bool
e05a9d51 553Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
6082a0e2
EB
554{
555 if (!prepSubmit())
556 return false; // request was dropped
6825b101
CT
557
558 submit(buf, callback, data); // will send or queue
559 return true; // request submitted or queued
560}
561
562/// dispatches or enqueues a helper requests; does not enforce queue limits
563void
e05a9d51 564Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
6825b101 565{
e05a9d51 566 const auto r = new Xaction(callback, data, buf);
32fd6d8a 567 submitRequest(r);
8289d5a9 568 debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
94439e4e 569}
570
26b6afaf
EB
571void
572Helper::Client::callBack(Xaction &r)
573{
574 const auto callback = r.request.callback;
575 Assure(callback);
576
577 r.request.callback = nullptr;
578 void *cbdata = nullptr;
579 if (cbdataReferenceValidDone(r.request.data, &cbdata))
580 callback(cbdata, r.reply);
581}
582
a56fcf0b
CT
583/// Submit request or callback the caller with a Helper::Error error.
584/// If the reservation is not set then reserves a new helper.
94439e4e 585void
3bd118d6 586helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
94439e4e 587{
a56fcf0b 588 if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
6082a0e2
EB
589 SubmissionFailure(hlp, callback, data);
590}
591
592/// If possible, submit request. Otherwise, either kill Squid or return false.
593bool
a56fcf0b 594statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
6082a0e2
EB
595{
596 if (!prepSubmit())
597 return false; // request was dropped
598
a56fcf0b 599 submit(buf, callback, data, reservation); // will send or queue
6082a0e2 600 return true; // request submitted or queued
6825b101 601}
62e76326 602
a56fcf0b
CT
603void
604statefulhelper::reserveServer(helper_stateful_server * srv)
6825b101 605{
a56fcf0b
CT
606 // clear any old reservation
607 if (srv->reserved()) {
608 reservations.erase(srv->reservationId);
609 srv->clearReservation();
610 }
611
612 srv->reserve();
613 reservations.insert(Reservations::value_type(srv->reservationId, srv));
614}
615
616void
617statefulhelper::cancelReservation(const Helper::ReservationId reservation)
618{
619 const auto it = reservations.find(reservation);
620 if (it == reservations.end())
621 return;
622
623 helper_stateful_server *srv = it->second;
624 reservations.erase(it);
625 srv->clearReservation();
626
627 // schedule a queue kick
628 AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
629 ScheduleCallHere(call);
630}
631
632helper_stateful_server *
633statefulhelper::findServer(const Helper::ReservationId & reservation)
634{
635 const auto it = reservations.find(reservation);
636 if (it == reservations.end())
637 return nullptr;
638 return it->second;
639}
640
641void
642helper_stateful_server::reserve()
643{
644 assert(!reservationId);
645 reservationStart = squid_curtime;
646 reservationId = Helper::ReservationId::Next();
647 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
648}
649
650void
651helper_stateful_server::clearReservation()
652{
653 debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
654 if (!reservationId)
655 return;
62e76326 656
a56fcf0b
CT
657 ++stats.releases;
658
659 reservationId.clear();
660 reservationStart = 0;
661}
662
663void
664statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
665{
666 Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
62e76326 667
a56fcf0b
CT
668 if (buf && reservation) {
669 debugs(84, 5, reservation);
670 helper_stateful_server *lastServer = findServer(reservation);
671 if (!lastServer) {
672 debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
673 r->reply.result = Helper::TimedOut;
26b6afaf 674 callBack(*r);
a56fcf0b
CT
675 delete r;
676 return;
677 }
d20ce97d 678 debugs(84, 5, "StatefulSubmit dispatching");
a56fcf0b 679 helperStatefulDispatch(lastServer, r);
94439e4e 680 } else {
26ac0430 681 helper_stateful_server *srv;
6825b101 682 if ((srv = StatefulGetFirstAvailable(this))) {
a56fcf0b 683 reserveServer(srv);
334a9819 684 helperStatefulDispatch(srv, r); // may delete r
62e76326 685 } else
6825b101 686 StatefulEnqueue(this, r);
94439e4e 687 }
62e76326 688
334a9819 689 // r may be dangling here
6082a0e2 690 syncQueueStats();
94439e4e 691}
692
74addf6c 693void
e05a9d51 694Helper::Client::packStatsInto(Packable * const p, const char * const label) const
74addf6c 695{
bf3e8d5a
AJ
696 if (label)
697 p->appendf("%s:\n", label);
698
699 p->appendf(" program: %s\n", cmdline->key);
700 p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
701 p->appendf(" requests sent: %d\n", stats.requests);
702 p->appendf(" replies received: %d\n", stats.replies);
703 p->appendf(" requests timedout: %d\n", stats.timedout);
704 p->appendf(" queue length: %d\n", stats.queue_size);
705 p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
706 p->append("\n",1);
707 p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
99b59e51
SM
708 "ID #",
709 "FD",
710 "PID",
711 "# Requests",
712 "# Replies",
713 "# Timed-out",
714 "Flags",
715 "Time",
716 "Offset",
717 "Request");
62e76326 718
bf3e8d5a 719 for (dlink_node *link = servers.head; link; link = link->next) {
e05a9d51 720 const auto srv = static_cast<SessionBase *>(link->data);
bf3e8d5a 721 assert(srv);
e05a9d51 722 const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
ddc77a2e 723 double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
bf3e8d5a 724 p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
99b59e51
SM
725 srv->index.value,
726 srv->readPipe->fd,
727 srv->pid,
728 srv->stats.uses,
729 srv->stats.replies,
730 srv->stats.timedout,
731 srv->stats.pending ? 'B' : ' ',
732 srv->flags.writing ? 'W' : ' ',
733 srv->flags.closing ? 'C' : ' ',
a56fcf0b 734 srv->reserved() ? 'R' : ' ',
99b59e51 735 srv->flags.shutdown ? 'S' : ' ',
ddc77a2e 736 xaction && xaction->request.placeholder ? 'P' : ' ',
99b59e51
SM
737 tt < 0.0 ? 0.0 : tt,
738 (int) srv->roffset,
ddc77a2e 739 xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
94439e4e 740 }
62e76326 741
bf3e8d5a
AJ
742 p->append("\nFlags key:\n"
743 " B\tBUSY\n"
744 " W\tWRITING\n"
745 " C\tCLOSING\n"
746 " R\tRESERVED\n"
747 " S\tSHUTDOWN PENDING\n"
748 " P\tPLACEHOLDER\n", 101);
94439e4e 749}
750
6082a0e2 751bool
e05a9d51 752Helper::Client::willOverload() const {
6082a0e2
EB
753 return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
754}
755
e05a9d51
EB
756Helper::Client::Pointer
757Helper::Client::Make(const char * const name)
3bd118d6 758{
e05a9d51 759 return new Client(name);
3bd118d6
EB
760}
761
762statefulhelper::Pointer
763statefulhelper::Make(const char *name)
764{
765 return new statefulhelper(name);
766}
767
74addf6c 768void
e05a9d51 769helperShutdown(const Helper::Client::Pointer &hlp)
74addf6c 770{
c68e9c6b 771 dlink_node *link = hlp->servers.head;
62e76326 772
c68e9c6b 773 while (link) {
e05a9d51 774 const auto srv = static_cast<Helper::Session *>(link->data);
62e76326 775 link = link->next;
776
8cfc76db 777 if (srv->flags.shutdown) {
e237d339 778 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
62e76326 779 continue;
780 }
781
48d54e4d 782 assert(hlp->childs.n_active > 0);
5e263176 783 -- hlp->childs.n_active;
f53969cc 784 srv->flags.shutdown = true; /* request it to shut itself down */
62e76326 785
d8f10d6a 786 if (srv->flags.closing) {
e237d339 787 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
62e76326 788 continue;
789 }
790
d8f10d6a 791 if (srv->stats.pending) {
e237d339 792 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
62e76326 793 continue;
794 }
795
e237d339 796 debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
62e76326 797 /* the rest of the details is dealt with in the helperServerFree
798 * close handler
799 */
0a958e42 800 srv->closePipesSafely();
74addf6c 801 }
569605b9
EB
802
803 Assure(!hlp->childs.n_active);
804 hlp->dropQueued();
74addf6c 805}
806
94439e4e 807void
3bd118d6 808helperStatefulShutdown(const statefulhelper::Pointer &hlp)
94439e4e 809{
810 dlink_node *link = hlp->servers.head;
811 helper_stateful_server *srv;
62e76326 812
94439e4e 813 while (link) {
62e76326 814 srv = (helper_stateful_server *)link->data;
815 link = link->next;
816
8cfc76db 817 if (srv->flags.shutdown) {
e237d339 818 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
62e76326 819 continue;
820 }
821
48d54e4d 822 assert(hlp->childs.n_active > 0);
5e263176 823 -- hlp->childs.n_active;
f53969cc 824 srv->flags.shutdown = true; /* request it to shut itself down */
62e76326 825
a1267972 826 if (srv->stats.pending) {
e237d339 827 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
62e76326 828 continue;
829 }
830
831 if (srv->flags.closing) {
e237d339 832 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
62e76326 833 continue;
834 }
835
a56fcf0b 836 if (srv->reserved()) {
d7e0f901 837 if (shutting_down) {
e237d339 838 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
e1381638 839 } else {
e237d339 840 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
d7e0f901
AJ
841 continue;
842 }
62e76326 843 }
844
e237d339 845 debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
bf8fe701 846
62e76326 847 /* the rest of the details is dealt with in the helperStatefulServerFree
848 * close handler
849 */
0a958e42 850 srv->closePipesSafely();
94439e4e 851 }
852}
853
e05a9d51 854Helper::Client::~Client()
1f5f60dd 855{
48d54e4d 856 /* note, don't free id_name, it probably points to static memory */
62e76326 857
569605b9
EB
858 // A non-empty queue would leak Helper::Xaction objects, stalling any
859 // pending (and even future collapsed) transactions. To avoid stalling
860 // transactions, we must dropQueued(). We ought to do that when we
861 // discover that no progress is possible rather than here because
862 // reference counting may keep this object alive for a long time.
863 assert(queue.empty());
94439e4e 864}
865
a56fcf0b 866void
569605b9 867Helper::Client::handleKilledServer(SessionBase * const srv)
74addf6c 868{
1f5f60dd 869 if (!srv->flags.shutdown) {
a56fcf0b
CT
870 assert(childs.n_active > 0);
871 --childs.n_active;
872 debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
62e76326 873
654835f0 874 handleFewerServers(srv->stats.replies >= 1);
5ea33fce 875
654835f0 876 if (childs.needNew() > 0) {
a56fcf0b 877 srv->flags.shutdown = true;
569605b9 878 openSessions();
4aca092a
HN
879 }
880 }
569605b9
EB
881
882 if (!childs.n_active)
883 dropQueued();
884}
885
886void
887Helper::Client::dropQueued()
888{
889 if (queue.empty())
890 return;
891
892 Assure(!childs.n_active);
893 Assure(!GetFirstAvailable(this));
894
895 // no helper servers means nobody can advance our queued transactions
896
897 debugs(80, DBG_CRITICAL, "ERROR: Dropping " << queue.size() << ' ' <<
898 id_name << " helper requests due to lack of helper processes");
899 // similar to SessionBase::dropQueued()
900 while (const auto r = nextRequest()) {
901 r->reply.result = Helper::Unknown;
902 callBack(*r);
903 delete r;
904 }
74addf6c 905}
906
654835f0 907void
e05a9d51 908Helper::Client::handleFewerServers(const bool madeProgress)
654835f0
EB
909{
910 const auto needNew = childs.needNew();
911
912 if (!needNew)
913 return; // some server(s) have died, but we still have enough
914
915 debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
916 Debug::Extra << "active processes: " << childs.n_active <<
917 Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
918
919 if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
920 if (madeProgress)
921 debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
922 else
923 fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
924 }
925}
926
bd71920d 927void
0a958e42 928Helper::SessionBase::HelperServerClosed(SessionBase * const srv)
a56fcf0b 929{
0a958e42
EB
930 srv->helper().handleKilledServer(srv);
931 srv->dropQueued();
f7ebc3fd 932 delete srv;
94439e4e 933}
934
ddc77a2e 935Helper::Xaction *
e05a9d51 936Helper::Session::popRequest(const int request_number)
95d2589c 937{
e05a9d51 938 Xaction *r = nullptr;
ddc77a2e 939 if (parent->childs.concurrency) {
2f8abb64 940 // If concurrency supported retrieve request from ID
e05a9d51 941 const auto it = requestsIndex.find(request_number);
ddc77a2e 942 if (it != requestsIndex.end()) {
32fd6d8a 943 r = *(it->second);
ddc77a2e
CT
944 requests.erase(it->second);
945 requestsIndex.erase(it);
32fd6d8a 946 }
ddc77a2e 947 } else if(!requests.empty()) {
32fd6d8a 948 // Else get the first request from queue, if any
ddc77a2e
CT
949 r = requests.front();
950 requests.pop_front();
32fd6d8a
CT
951 }
952
ddc77a2e
CT
953 return r;
954}
955
956/// Calls back with a pointer to the buffer with the helper output
957static void
e05a9d51 958helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
ddc77a2e
CT
959{
960 if (Helper::Xaction *r = srv->replyXaction) {
961 const bool hasSpace = r->reply.accumulate(msg, msgSize);
962 if (!hasSpace) {
963 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
964 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
965 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
0a958e42 966 srv->closePipesSafely();
ddc77a2e
CT
967 return;
968 }
969
970 if (!msgEnd)
971 return; // We are waiting for more data.
972
32fd6d8a 973 bool retry = false;
194ccc9c 974 if (cbdataReferenceValid(r->request.data)) {
ddc77a2e
CT
975 r->reply.finalize();
976 if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
977 debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
32fd6d8a 978 retry = true;
194ccc9c 979 } else {
26b6afaf 980 hlp->callBack(*r);
194ccc9c 981 }
12f7e09b 982 }
95d2589c 983
5e263176 984 -- srv->stats.pending;
1f7ba0b4 985 ++ srv->stats.replies;
95d2589c 986
95dc7ff4 987 ++ hlp->stats.replies;
95d2589c
CT
988
989 srv->answer_time = current_time;
990
ddc77a2e 991 srv->dispatch_time = r->request.dispatch_time;
95d2589c
CT
992
993 hlp->stats.avg_svc_time =
994 Math::intAverage(hlp->stats.avg_svc_time,
ddc77a2e 995 tvSubMsec(r->request.dispatch_time, current_time),
31bc1fa6 996 hlp->stats.replies, REDIRECT_AV_FACTOR);
95d2589c 997
ddc77a2e
CT
998 // release or re-submit parsedRequestXaction object
999 srv->replyXaction = nullptr;
32fd6d8a 1000 if (retry) {
ddc77a2e 1001 ++r->request.retries;
32fd6d8a
CT
1002 hlp->submitRequest(r);
1003 } else
1004 delete r;
95d2589c 1005 }
95d2589c 1006
32fd6d8a
CT
1007 if (hlp->timeout && hlp->childs.concurrency)
1008 srv->checkForTimedOutRequests(hlp->retryTimedOut);
1009
95d2589c
CT
1010 if (!srv->flags.shutdown) {
1011 helperKickQueue(hlp);
1012 } else if (!srv->flags.closing && !srv->stats.pending) {
0a958e42 1013 srv->closeWritePipeSafely();
95d2589c
CT
1014 }
1015}
94439e4e 1016
74addf6c 1017static void
ced8def3 1018helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
74addf6c 1019{
e05a9d51 1020 const auto srv = static_cast<Helper::Session *>(data);
3bd118d6 1021 const auto hlp = srv->parent;
fa80a8ef 1022 assert(cbdataReferenceValid(data));
c4b7a5a9 1023
c8407295 1024 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
62e76326 1025
c8407295 1026 if (flag == Comm::ERR_CLOSING) {
c4b7a5a9 1027 return;
1028 }
1029
e0d28505 1030 assert(conn->fd == srv->readPipe->fd);
420e3e30 1031
e237d339 1032 debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
62e76326 1033
c8407295 1034 if (flag != Comm::OK || len == 0) {
0a958e42 1035 srv->closePipesSafely();
62e76326 1036 return;
74addf6c 1037 }
62e76326 1038
07eca7e0 1039 srv->roffset += len;
1040 srv->rbuf[srv->roffset] = '\0';
8289d5a9 1041 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
62e76326 1042
32fd6d8a 1043 if (!srv->stats.pending && !srv->stats.timedout) {
62e76326 1044 /* someone spoke without being spoken to */
dd8b7fd7 1045 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
e237d339 1046 hlp->id_name << " #" << srv->index << ", " << (int)len <<
bf8fe701 1047 " bytes '" << srv->rbuf << "'");
1048
07eca7e0 1049 srv->roffset = 0;
1050 srv->rbuf[0] = '\0';
0a958e42 1051 srv->closePipesSafely();
dd8b7fd7 1052 return;
07eca7e0 1053 }
1054
ddc77a2e
CT
1055 bool needsMore = false;
1056 char *msg = srv->rbuf;
1057 while (*msg && !needsMore) {
1058 int skip = 0;
1059 char *eom = strchr(msg, hlp->eom);
1060 if (eom) {
1061 skip = 1;
1062 debugs(84, 3, "helperHandleRead: end of reply found");
1063 if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1064 *eom = '\0';
1065 // rewind to the \r octet which is the real terminal now
1066 // and remember that we have to skip forward 2 places now.
1067 skip = 2;
1068 --eom;
1069 }
1070 *eom = '\0';
ec682a26 1071 }
bf8fe701 1072
1176e456 1073 if (!srv->ignoreToEom && !srv->replyXaction) {
ddc77a2e
CT
1074 int i = 0;
1075 if (hlp->childs.concurrency) {
aee3523a 1076 char *e = nullptr;
ddc77a2e
CT
1077 i = strtol(msg, &e, 10);
1078 // Do we need to check for e == msg? Means wrong response from helper.
2f8abb64 1079 // Will be dropped as "unexpected reply on channel 0"
ddc77a2e
CT
1080 needsMore = !(xisspace(*e) || (eom && e == eom));
1081 if (!needsMore) {
1082 msg = e;
1083 while (*msg && xisspace(*msg))
1084 ++msg;
1085 } // else not enough data to compute request number
1086 }
1087 if (!(srv->replyXaction = srv->popRequest(i))) {
1088 if (srv->stats.timedout) {
1089 debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1090 } else {
d816f28d 1091 debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
ddc77a2e
CT
1092 i << " from " << hlp->id_name << " #" << srv->index <<
1093 " '" << srv->rbuf << "'");
1094 }
1176e456 1095 srv->ignoreToEom = true;
ddc77a2e
CT
1096 }
1097 } // else we need to just append reply data to the current Xaction
1098
1099 if (!needsMore) {
1100 size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1101 assert(msgSize <= srv->rbuf_sz);
1102 helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1103 msg += msgSize + skip;
1104 assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1176e456
CT
1105
1106 // The next message should not ignored.
1107 if (eom && srv->ignoreToEom)
1108 srv->ignoreToEom = false;
ddc77a2e 1109 } else
aee3523a 1110 assert(skip == 0 && eom == nullptr);
ddc77a2e 1111 }
985bfe6b 1112
ddc77a2e
CT
1113 if (needsMore) {
1114 size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1115 assert(msgSize <= srv->rbuf_sz);
1116 memmove(srv->rbuf, msg, msgSize);
1117 srv->roffset = msgSize;
7bef981e 1118 srv->rbuf[srv->roffset] = '\0';
ddc77a2e
CT
1119 } else {
1120 // All of the responses parsed and msg points at the end of read data
1121 assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1122 srv->roffset = 0;
74addf6c 1123 }
07eca7e0 1124
b0bb5517 1125 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
a78f2175
AR
1126 int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1127 assert(spaceSize >= 0);
1128
abd8f140
AJ
1129 AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1130 CommIoCbPtrFun(helperHandleRead, srv));
a78f2175 1131 comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
abd8f140 1132 }
74addf6c 1133}
1134
94439e4e 1135static void
ced8def3 1136helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
94439e4e 1137{
aee3523a 1138 char *t = nullptr;
e6ccf245 1139 helper_stateful_server *srv = (helper_stateful_server *)data;
3bd118d6 1140 const auto hlp = srv->parent;
fa80a8ef 1141 assert(cbdataReferenceValid(data));
c4b7a5a9 1142
c8407295 1143 /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
62e76326 1144
c8407295 1145 if (flag == Comm::ERR_CLOSING) {
c4b7a5a9 1146 return;
1147 }
1148
e0d28505 1149 assert(conn->fd == srv->readPipe->fd);
420e3e30 1150
4a7a3d56 1151 debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
e237d339 1152 hlp->id_name << " #" << srv->index);
bf8fe701 1153
c8407295 1154 if (flag != Comm::OK || len == 0) {
0a958e42 1155 srv->closePipesSafely();
62e76326 1156 return;
94439e4e 1157 }
62e76326 1158
07eca7e0 1159 srv->roffset += len;
1160 srv->rbuf[srv->roffset] = '\0';
8289d5a9 1161 debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
62e76326 1162
dd8b7fd7 1163 if (srv->requests.empty()) {
62e76326 1164 /* someone spoke without being spoken to */
dd8b7fd7 1165 debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
e237d339 1166 hlp->id_name << " #" << srv->index << ", " << (int)len <<
bf8fe701 1167 " bytes '" << srv->rbuf << "'");
1168
07eca7e0 1169 srv->roffset = 0;
0a958e42 1170 srv->closePipesSafely();
dd8b7fd7 1171 return;
07eca7e0 1172 }
1173
0af9303a 1174 if ((t = strchr(srv->rbuf, hlp->eom))) {
bf8fe701 1175 debugs(84, 3, "helperStatefulHandleRead: end of reply found");
6bf4f823 1176
36cb7d3e
AJ
1177 if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1178 *t = '\0';
1179 // rewind to the \r octet which is the real terminal now
36cb7d3e
AJ
1180 --t;
1181 }
6bf4f823 1182
62e76326 1183 *t = '\0';
ddc77a2e
CT
1184 }
1185
dd8b7fd7
EB
1186 const auto r = srv->requests.front();
1187
1188 if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
ddc77a2e
CT
1189 debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1190 "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1191 "Squid input buffer: " << hlp->id_name << " #" << srv->index);
0a958e42 1192 srv->closePipesSafely();
ddc77a2e
CT
1193 return;
1194 }
1195 /**
1196 * BUG: the below assumes that only one response per read() was received and discards any octets remaining.
1197 * Doing this prohibits concurrency support with multiple replies per read().
1198 * TODO: check that read() setup on these buffers pays attention to roffest!=0
1199 * TODO: check that replies bigger than the buffer are discarded and do not to affect future replies
1200 */
1201 srv->roffset = 0;
1202
1203 if (t) {
1204 /* end of reply found */
1205 srv->requests.pop_front(); // we already have it in 'r'
1206 int called = 1;
62e76326 1207
dd8b7fd7 1208 if (cbdataReferenceValid(r->request.data)) {
ddc77a2e 1209 r->reply.finalize();
a56fcf0b 1210 r->reply.reservationId = srv->reservationId;
26b6afaf 1211 hlp->callBack(*r);
62e76326 1212 } else {
e0236918 1213 debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
e1381638 1214 called = 0;
62e76326 1215 }
ddc77a2e 1216
002df7ca 1217 delete r;
1f7ba0b4
AJ
1218
1219 -- srv->stats.pending;
1220 ++ srv->stats.replies;
1221
95dc7ff4 1222 ++ hlp->stats.replies;
f9598528 1223 srv->answer_time = current_time;
62e76326 1224 hlp->stats.avg_svc_time =
a98bcbee
AJ
1225 Math::intAverage(hlp->stats.avg_svc_time,
1226 tvSubMsec(srv->dispatch_time, current_time),
1227 hlp->stats.replies, REDIRECT_AV_FACTOR);
62e76326 1228
e1381638
AJ
1229 if (called)
1230 helperStatefulServerDone(srv);
1231 else
a56fcf0b 1232 hlp->cancelReservation(srv->reservationId);
94439e4e 1233 }
07eca7e0 1234
b0bb5517 1235 if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
ddc77a2e 1236 int spaceSize = srv->rbuf_sz - 1;
a78f2175 1237
abd8f140
AJ
1238 AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1239 CommIoCbPtrFun(helperStatefulHandleRead, srv));
ddc77a2e 1240 comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
abd8f140 1241 }
94439e4e 1242}
1243
6825b101 1244/// Handles a request when all running helpers, if any, are busy.
74addf6c 1245static void
e05a9d51 1246Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
74addf6c 1247{
6215dc18 1248 hlp->queue.push(r);
95dc7ff4 1249 ++ hlp->stats.queue_size;
62e76326 1250
48d54e4d
AJ
1251 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1252 if (hlp->childs.needNew() > 0) {
bd71920d 1253 hlp->openSessions();
48d54e4d
AJ
1254 return;
1255 }
1256
6825b101 1257 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
62e76326 1258 return;
1259
74addf6c 1260 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1261 return;
1262
fe73896c 1263 if (shutting_down || reconfiguring)
62e76326 1264 return;
1265
74addf6c 1266 hlp->last_queue_warn = squid_curtime;
62e76326 1267
fa84c01d
FC
1268 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1269 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1270 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
74addf6c 1271}
1272
94439e4e 1273static void
ddc77a2e 1274StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r)
94439e4e 1275{
6215dc18 1276 hlp->queue.push(r);
95dc7ff4 1277 ++ hlp->stats.queue_size;
62e76326 1278
48d54e4d
AJ
1279 /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1280 if (hlp->childs.needNew() > 0) {
bd71920d 1281 hlp->openSessions();
48d54e4d
AJ
1282 return;
1283 }
1284
6825b101 1285 if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
62e76326 1286 return;
1287
94439e4e 1288 if (squid_curtime - hlp->last_queue_warn < 600)
62e76326 1289 return;
1290
94439e4e 1291 if (shutting_down || reconfiguring)
62e76326 1292 return;
1293
94439e4e 1294 hlp->last_queue_warn = squid_curtime;
62e76326 1295
fa84c01d
FC
1296 debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1297 debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1298 debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
94439e4e 1299}
1300
ddc77a2e 1301Helper::Xaction *
e05a9d51 1302Helper::Client::nextRequest()
74addf6c 1303{
6215dc18
AJ
1304 if (queue.empty())
1305 return nullptr;
62e76326 1306
6215dc18
AJ
1307 auto *r = queue.front();
1308 queue.pop();
1309 --stats.queue_size;
94439e4e 1310 return r;
1311}
1312
e05a9d51
EB
1313static Helper::Session *
1314GetFirstAvailable(const Helper::Client::Pointer &hlp)
74addf6c 1315{
1316 dlink_node *n;
e05a9d51 1317 Helper::Session *selected = nullptr;
48e7baac 1318 debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
62e76326 1319
48d54e4d 1320 if (hlp->childs.n_running == 0)
aee3523a 1321 return nullptr;
62e76326 1322
07eca7e0 1323 /* Find "least" loaded helper (approx) */
aee3523a 1324 for (n = hlp->servers.head; n != nullptr; n = n->next) {
e05a9d51 1325 const auto srv = static_cast<Helper::Session *>(n->data);
62e76326 1326
07eca7e0 1327 if (selected && selected->stats.pending <= srv->stats.pending)
62e76326 1328 continue;
1329
d8f10d6a 1330 if (srv->flags.shutdown)
62e76326 1331 continue;
1332
07eca7e0 1333 if (!srv->stats.pending)
1334 return srv;
1335
1336 if (selected) {
1337 selected = srv;
1338 break;
1339 }
1340
1341 selected = srv;
74addf6c 1342 }
62e76326 1343
48e7baac
AJ
1344 if (!selected) {
1345 debugs(84, 5, "GetFirstAvailable: None available.");
aee3523a 1346 return nullptr;
48e7baac 1347 }
07eca7e0 1348
48e7baac 1349 if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
6082a0e2 1350 debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
aee3523a 1351 return nullptr;
48e7baac 1352 }
07eca7e0 1353
48e7baac 1354 debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
07eca7e0 1355 return selected;
74addf6c 1356}
1357
94439e4e 1358static helper_stateful_server *
3bd118d6 1359StatefulGetFirstAvailable(const statefulhelper::Pointer &hlp)
94439e4e 1360{
1361 dlink_node *n;
aee3523a 1362 helper_stateful_server *srv = nullptr;
a56fcf0b 1363 helper_stateful_server *oldestReservedServer = nullptr;
48d54e4d 1364 debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
62e76326 1365
48d54e4d 1366 if (hlp->childs.n_running == 0)
aee3523a 1367 return nullptr;
62e76326 1368
aee3523a 1369 for (n = hlp->servers.head; n != nullptr; n = n->next) {
62e76326 1370 srv = (helper_stateful_server *)n->data;
1371
a1267972 1372 if (srv->stats.pending)
62e76326 1373 continue;
1374
a56fcf0b
CT
1375 if (srv->reserved()) {
1376 if ((squid_curtime - srv->reservationStart) > hlp->childs.reservationTimeout) {
1377 if (!oldestReservedServer)
1378 oldestReservedServer = srv;
1379 else if (oldestReservedServer->reservationStart < srv->reservationStart)
1380 oldestReservedServer = srv;
1381 debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1382 }
62e76326 1383 continue;
a56fcf0b 1384 }
62e76326 1385
d8f10d6a 1386 if (srv->flags.shutdown)
62e76326 1387 continue;
1388
26ac0430 1389 debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
62e76326 1390 return srv;
94439e4e 1391 }
62e76326 1392
a56fcf0b
CT
1393 if (oldestReservedServer) {
1394 debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1395 return oldestReservedServer;
1396 }
1397
bf8fe701 1398 debugs(84, 5, "StatefulGetFirstAvailable: None available.");
a56fcf0b 1399 return nullptr;
94439e4e 1400}
1401
42679bd6 1402static void
ced8def3 1403helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
42679bd6 1404{
e05a9d51 1405 const auto srv = static_cast<Helper::Session *>(data);
07eca7e0 1406
2fe7eff9 1407 srv->writebuf->clean();
032785bf 1408 delete srv->writebuf;
aee3523a 1409 srv->writebuf = nullptr;
be4d35dc 1410 srv->flags.writing = false;
07eca7e0 1411
c8407295 1412 if (flag != Comm::OK) {
07eca7e0 1413 /* Helper server has crashed */
e237d339 1414 debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
07eca7e0 1415 return;
1416 }
1417
2fe7eff9 1418 if (!srv->wqueue->isNull()) {
07eca7e0 1419 srv->writebuf = srv->wqueue;
032785bf 1420 srv->wqueue = new MemBuf;
be4d35dc 1421 srv->flags.writing = true;
ec41b64c
AJ
1422 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1423 CommIoCbPtrFun(helperDispatchWriteDone, srv));
aee3523a 1424 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
07eca7e0 1425 }
42679bd6 1426}
1427
74addf6c 1428static void
e05a9d51 1429helperDispatch(Helper::Session * const srv, Helper::Xaction * const r)
74addf6c 1430{
3bd118d6 1431 const auto hlp = srv->parent;
32fd6d8a 1432 const uint64_t reqId = ++srv->nextRequestId;
62e76326 1433
ddc77a2e 1434 if (!cbdataReferenceValid(r->request.data)) {
d816f28d 1435 debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
002df7ca 1436 delete r;
62e76326 1437 return;
74addf6c 1438 }
62e76326 1439
ddc77a2e 1440 r->request.Id = reqId;
e05a9d51 1441 const auto it = srv->requests.insert(srv->requests.end(), r);
ddc77a2e 1442 r->request.dispatch_time = current_time;
07eca7e0 1443
2fe7eff9 1444 if (srv->wqueue->isNull())
1445 srv->wqueue->init();
07eca7e0 1446
32fd6d8a 1447 if (hlp->childs.concurrency) {
e05a9d51 1448 srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
32fd6d8a 1449 assert(srv->requestsIndex.size() == srv->requests.size());
ddc77a2e 1450 srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
32fd6d8a 1451 } else
ddc77a2e 1452 srv->wqueue->append(r->request.buf, strlen(r->request.buf));
07eca7e0 1453
1454 if (!srv->flags.writing) {
aee3523a 1455 assert(nullptr == srv->writebuf);
07eca7e0 1456 srv->writebuf = srv->wqueue;
032785bf 1457 srv->wqueue = new MemBuf;
be4d35dc 1458 srv->flags.writing = true;
ec41b64c
AJ
1459 AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1460 CommIoCbPtrFun(helperDispatchWriteDone, srv));
aee3523a 1461 Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
07eca7e0 1462 }
1463
ddc77a2e 1464 debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
bf8fe701 1465
95dc7ff4 1466 ++ srv->stats.uses;
1f7ba0b4 1467 ++ srv->stats.pending;
95dc7ff4 1468 ++ hlp->stats.requests;
74addf6c 1469}
1470
42679bd6 1471static void
ced8def3
AJ
1472helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
1473{}
42679bd6 1474
94439e4e 1475static void
ddc77a2e 1476helperStatefulDispatch(helper_stateful_server * srv, Helper::Xaction * r)
94439e4e 1477{
3bd118d6 1478 const auto hlp = srv->parent;
62e76326 1479
ddc77a2e 1480 if (!cbdataReferenceValid(r->request.data)) {
d816f28d 1481 debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
002df7ca 1482 delete r;
a56fcf0b 1483 hlp->cancelReservation(srv->reservationId);
62e76326 1484 return;
94439e4e 1485 }
62e76326 1486
e237d339 1487 debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
62e76326 1488
a56fcf0b
CT
1489 assert(srv->reservationId);
1490 r->reply.reservationId = srv->reservationId;
1491
ddc77a2e 1492 if (r->request.placeholder == 1) {
62e76326 1493 /* a callback is needed before this request can _use_ a helper. */
d20ce97d 1494 /* we don't care about releasing this helper. The request NEVER
62e76326 1495 * gets to the helper. So we throw away the return code */
ddc77a2e 1496 r->reply.result = Helper::Unknown;
26b6afaf 1497 hlp->callBack(*r);
62e76326 1498 /* throw away the placeholder */
002df7ca 1499 delete r;
62e76326 1500 /* and push the queue. Note that the callback may have submitted a new
e166785a 1501 * request to the helper which is why we test for the request */
62e76326 1502
bf3e8d5a 1503 if (!srv->requests.size())
a8c4f8d6 1504 helperStatefulServerDone(srv);
62e76326 1505
1506 return;
94439e4e 1507 }
62e76326 1508
bf3e8d5a 1509 srv->requests.push_back(r);
94439e4e 1510 srv->dispatch_time = current_time;
ec41b64c 1511 AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
3bd118d6 1512 CommIoCbPtrFun(helperStatefulDispatchWriteDone, srv));
aee3523a 1513 Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
bf8fe701 1514 debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
e237d339 1515 hlp->id_name << " #" << srv->index << ", " <<
ddc77a2e 1516 (int) strlen(r->request.buf) << " bytes");
bf8fe701 1517
95dc7ff4 1518 ++ srv->stats.uses;
1f7ba0b4 1519 ++ srv->stats.pending;
95dc7ff4 1520 ++ hlp->stats.requests;
94439e4e 1521}
1522
74addf6c 1523static void
e05a9d51 1524helperKickQueue(const Helper::Client::Pointer &hlp)
74addf6c 1525{
e05a9d51
EB
1526 Helper::Xaction *r = nullptr;
1527 Helper::Session *srv = nullptr;
62e76326 1528
6215dc18 1529 while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
62e76326 1530 helperDispatch(srv, r);
569605b9
EB
1531
1532 if (!hlp->childs.n_active)
1533 hlp->dropQueued();
74addf6c 1534}
1535
94439e4e 1536static void
3bd118d6 1537helperStatefulKickQueue(const statefulhelper::Pointer &hlp)
94439e4e 1538{
ddc77a2e 1539 Helper::Xaction *r;
94439e4e 1540 helper_stateful_server *srv;
a56fcf0b
CT
1541 while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1542 debugs(84, 5, "found srv-" << srv->index);
1543 hlp->reserveServer(srv);
62e76326 1544 helperStatefulDispatch(srv, r);
a56fcf0b 1545 }
569605b9
EB
1546
1547 if (!hlp->childs.n_active)
1548 hlp->dropQueued();
94439e4e 1549}
1550
1551static void
a8c4f8d6 1552helperStatefulServerDone(helper_stateful_server * srv)
94439e4e 1553{
420e3e30
HN
1554 if (!srv->flags.shutdown) {
1555 helperStatefulKickQueue(srv->parent);
a56fcf0b 1556 } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
0a958e42 1557 srv->closeWritePipeSafely();
420e3e30
HN
1558 return;
1559 }
94439e4e 1560}
1561
32fd6d8a 1562void
e05a9d51 1563Helper::Session::checkForTimedOutRequests(bool const retry)
32fd6d8a
CT
1564{
1565 assert(parent->childs.concurrency);
ddc77a2e 1566 while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
e05a9d51 1567 const auto r = requests.front();
32fd6d8a 1568 RequestIndex::iterator it;
ddc77a2e 1569 it = requestsIndex.find(r->request.Id);
32fd6d8a
CT
1570 assert(it != requestsIndex.end());
1571 requestsIndex.erase(it);
1572 requests.pop_front();
ddc77a2e 1573 debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
32fd6d8a 1574 bool retried = false;
ddc77a2e
CT
1575 if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1576 debugs(84, 2, "Retry request " << r->request.Id);
1577 ++r->request.retries;
32fd6d8a
CT
1578 parent->submitRequest(r);
1579 retried = true;
26b6afaf 1580 } else if (cbdataReferenceValid(r->request.data)) {
32fd6d8a 1581 if (!parent->onTimedOutResponse.isEmpty()) {
ddc77a2e
CT
1582 if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1583 r->reply.finalize();
1584 else
1585 r->reply.result = Helper::TimedOut;
26b6afaf 1586 parent->callBack(*r);
ddc77a2e
CT
1587 } else {
1588 r->reply.result = Helper::TimedOut;
26b6afaf 1589 parent->callBack(*r);
ddc77a2e 1590 }
32fd6d8a
CT
1591 }
1592 --stats.pending;
1593 ++stats.timedout;
1594 ++parent->stats.timedout;
1595 if (!retried)
1596 delete r;
1597 }
1598}
1599
1600void
e05a9d51 1601Helper::Session::requestTimeout(const CommTimeoutCbParams &io)
32fd6d8a 1602{
bf95c10a 1603 debugs(26, 3, io.conn);
e05a9d51 1604 const auto srv = static_cast<Session *>(io.data);
32fd6d8a 1605
32fd6d8a
CT
1606 srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1607
e05a9d51
EB
1608 debugs(84, 3, io.conn << " establish a new timeout");
1609 AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
1610 CommTimeoutCbPtrFun(Session::requestTimeout, srv));
32fd6d8a 1611
4650a4fa
FC
1612 const time_t timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1613 const time_t minimumNewTimeout = 1; // second
1614 const auto timeLeft = max(minimumNewTimeout, srv->parent->timeout - timeSpent);
32fd6d8a
CT
1615
1616 commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1617}
f53969cc 1618