]> git.ipfire.org Git - thirdparty/squid.git/blame - src/adaptation/icap/Xaction.cc
Enforce connection re-try limit connect_retries
[thirdparty/squid.git] / src / adaptation / icap / Xaction.cc
CommitLineData
774c051c 1/*
507d0a78 2 * DEBUG: section 93 ICAP (RFC 3507) Client
774c051c 3 */
4
5#include "squid.h"
6#include "comm.h"
cfd66529 7#include "comm/ConnectStateData.h"
bd7f2ede 8#include "CommCalls.h"
5f8252d2 9#include "HttpMsg.h"
26cc52cb 10#include "adaptation/icap/Xaction.h"
3ff65596 11#include "adaptation/icap/Launcher.h"
26cc52cb 12#include "adaptation/icap/Config.h"
3d93a84d 13#include "base/TextException.h"
781ce8ff 14#include "pconn.h"
3ff65596
AR
15#include "HttpRequest.h"
16#include "HttpReply.h"
17#include "acl/FilledChecklist.h"
18#include "icap_log.h"
781ce8ff 19#include "fde.h"
3ff65596 20#include "SquidTime.h"
781ce8ff 21
22static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
774c051c 23
5f8252d2 24
26cc52cb 25//CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
5f8252d2 26
26cc52cb 27Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Initiator *anInitiator, Adaptation::Icap::ServiceRep::Pointer &aService):
bd7f2ede 28 AsyncJob(aTypeName),
a22e6cd3 29 Adaptation::Initiate(aTypeName, anInitiator),
3ff65596
AR
30 icapRequest(NULL),
31 icapReply(NULL),
32 attempts(0),
2413d60a 33 connection(NULL),
a22e6cd3 34 theService(aService),
774c051c 35 commBuf(NULL), commBufSize(0),
36 commEof(false),
2dfede9e 37 reuseConnection(true),
c824c43b 38 isRetriable(true),
3ff65596 39 isRepeatable(true),
cfc68405 40 ignoreLastWrite(false),
c824c43b 41 connector(NULL), reader(NULL), writer(NULL), closer(NULL)
774c051c 42{
5f8252d2 43 debugs(93,3, typeName << " constructed, this=" << this <<
9e008dda 44 " [icapx" << id << ']'); // we should not call virtual status() here
3ff65596
AR
45 icapRequest = HTTPMSGLOCK(new HttpRequest);
46 icap_tr_start = current_time;
774c051c 47}
48
26cc52cb 49Adaptation::Icap::Xaction::~Xaction()
774c051c 50{
5f8252d2 51 debugs(93,3, typeName << " destructed, this=" << this <<
9e008dda 52 " [icapx" << id << ']'); // we should not call virtual status() here
3ff65596
AR
53 HTTPMSGUNLOCK(icapRequest);
54 HTTPMSGUNLOCK(icapReply);
5f8252d2 55}
56
26cc52cb
AR
57Adaptation::Icap::ServiceRep &
58Adaptation::Icap::Xaction::service()
0bef8dd7 59{
a22e6cd3
AR
60 Must(theService != NULL);
61 return *theService;
0bef8dd7
AR
62}
63
26cc52cb 64void Adaptation::Icap::Xaction::disableRetries()
9e008dda 65{
3ff65596
AR
66 debugs(93,5, typeName << (isRetriable ? " from now on" : " still") <<
67 " cannot be retried " << status());
c824c43b 68 isRetriable = false;
69}
70
3ff65596
AR
71void Adaptation::Icap::Xaction::disableRepeats(const char *reason)
72{
73 debugs(93,5, typeName << (isRepeatable ? " from now on" : " still") <<
74 " cannot be repeated because " << reason << status());
75 isRepeatable = false;
76}
77
26cc52cb 78void Adaptation::Icap::Xaction::start()
5f8252d2 79{
0bef8dd7 80 Adaptation::Initiate::start();
5f8252d2 81
82 readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF);
83 commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize);
84 // make sure maximum readBuf space does not exceed commBuf size
85 Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
774c051c 86}
87
88// TODO: obey service-specific, OPTIONS-reported connection limit
26cc52cb 89void Adaptation::Icap::Xaction::openConnection()
774c051c 90{
b7ac5457 91 Ip::Address client_addr;
7aa6b7cb 92
2413d60a 93 Must(connection == NULL || !connection->isOpen());
c824c43b 94
0bef8dd7 95 const Adaptation::Service &s = service();
774c051c 96
26cc52cb 97 if (!TheConfig.reuse_connections)
560d7d2d 98 disableRetries(); // this will also safely drain pconn pool
99
2413d60a
AJ
100 connection = new Comm::Connection;
101
102 /* NP: set these here because it applies whether a pconn or a new conn is used */
103
104 // TODO: where do we get the DNS info for the ICAP server host ??
105 // Ip::Address will do a BLOCKING lookup if s.cfg().host is a hostname
106 connection->remote = s.cfg().host.termedBuf();
107 connection->remote.SetPort(s.cfg().port);
108
c8ceec27 109 // TODO: check whether NULL domain is appropriate here
2413d60a
AJ
110 connection->fd = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable);
111 if (connection->isOpen()) {
112 debugs(93,3, HERE << "reused pconn FD " << connection->fd);
bd7f2ede 113
114 // fake the connect callback
26cc52cb
AR
115 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
116 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
117 Dialer dialer(this, &Adaptation::Icap::Xaction::noteCommConnected);
2413d60a 118 dialer.params.fd = connection->fd;
bd7f2ede 119 dialer.params.flag = COMM_OK;
120 // fake other parameters by copying from the existing connection
26cc52cb 121 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
9e008dda 122 ScheduleCallHere(connector);
c8ceec27 123 return;
2dfede9e 124 }
125
c8ceec27 126 disableRetries(); // we only retry pconn failures
127
26cc52cb
AR
128 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
129 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
130 ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected));
cfd66529 131
2413d60a 132 ConnectStateData *cs = new ConnectStateData(connection, connector);
cfd66529
AJ
133 cs->host = xstrdup(s.cfg().host.termedBuf());
134 cs->connect_timeout = TheConfig.connect_timeout(service().cfg().bypass);
135 cs->connect();
774c051c 136}
137
2dfede9e 138/*
139 * This event handler is necessary to work around the no-rentry policy
26cc52cb 140 * of Adaptation::Icap::Xaction::callStart()
2dfede9e 141 */
bd7f2ede 142#if 0
2dfede9e 143void
26cc52cb 144Adaptation::Icap::Xaction::reusedConnection(void *data)
2dfede9e 145{
192378eb 146 debugs(93, 5, HERE << "reused connection");
26cc52cb 147 Adaptation::Icap::Xaction *x = (Adaptation::Icap::Xaction*)data;
2dfede9e 148 x->noteCommConnected(COMM_OK);
149}
bd7f2ede 150#endif
2dfede9e 151
26cc52cb 152void Adaptation::Icap::Xaction::closeConnection()
774c051c 153{
2413d60a 154 if (connection != NULL && connection->isOpen()) {
774c051c 155
bd7f2ede 156 if (closer != NULL) {
2413d60a 157 comm_remove_close_handler(connection->fd, closer);
774c051c 158 closer = NULL;
159 }
160
c99de607 161 cancelRead(); // may not work
162
5f8252d2 163 if (reuseConnection && !doneWithIo()) {
dd6e6148 164 //status() adds leading spaces.
5f8252d2 165 debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status());
c99de607 166 reuseConnection = false;
167 }
774c051c 168
2dfede9e 169 if (reuseConnection) {
b7ac5457 170 Ip::Address client_addr;
dd6e6148 171 //status() adds leading spaces.
5f8252d2 172 debugs(93,3, HERE << "pushing pconn" << status());
9e008dda 173 AsyncCall::Pointer call = NULL;
2413d60a
AJ
174 commSetTimeout(connection->fd, -1, call);
175 icapPconnPool->push(connection->fd, theService->cfg().host.termedBuf(),
9e008dda 176 theService->cfg().port, NULL, client_addr);
c824c43b 177 disableRetries();
2413d60a 178 connection->fd = -1; // prevent premature real closing.
2dfede9e 179 } else {
dd6e6148 180 //status() adds leading spaces.
5f8252d2 181 debugs(93,3, HERE << "closing pconn" << status());
c99de607 182 // comm_close will clear timeout
2413d60a 183 connection->close();
2dfede9e 184 }
774c051c 185
c99de607 186 writer = NULL;
187 reader = NULL;
774c051c 188 connector = NULL;
774c051c 189 }
190}
191
192// connection with the ICAP service established
26cc52cb 193void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
774c051c 194{
cfd66529
AJ
195 if (io.flag == COMM_TIMEOUT) {
196 handleCommTimedout();
197 return;
198 }
199
bd7f2ede 200 Must(connector != NULL);
774c051c 201 connector = NULL;
c99de607 202
bd7f2ede 203 if (io.flag != COMM_OK)
c99de607 204 dieOnConnectionFailure(); // throws
205
cfd66529
AJ
206 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
207 closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
208 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
209 comm_add_close_handler(io.conn->fd, closer);
210
211 fd_table[io.conn->fd].noteUse(icapPconnPool);
774c051c 212
2413d60a 213 connection = io.conn;
774c051c 214 handleCommConnected();
774c051c 215}
216
26cc52cb 217void Adaptation::Icap::Xaction::dieOnConnectionFailure()
9e008dda 218{
4932ad93 219 debugs(93, 2, HERE << typeName <<
9e008dda 220 " failed to connect to " << service().cfg().uri);
c99de607 221 throw TexcHere("cannot connect to the ICAP service");
222}
223
26cc52cb 224void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
774c051c 225{
226 // comm module will free the buffer
26cc52cb
AR
227 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
228 writer = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommWrote",
229 Dialer(this, &Adaptation::Icap::Xaction::noteCommWrote));
bd7f2ede 230
2413d60a 231 comm_write_mbuf(connection->fd, &buf, writer);
c99de607 232 updateTimeout();
774c051c 233}
234
26cc52cb 235void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
774c051c 236{
bd7f2ede 237 Must(writer != NULL);
774c051c 238 writer = NULL;
9e008dda 239
cfc68405 240 if (ignoreLastWrite) {
241 // a hack due to comm inability to cancel a pending write
9e008dda 242 ignoreLastWrite = false;
bd7f2ede 243 debugs(93, 7, HERE << "ignoring last write; status: " << io.flag);
cfc68405 244 } else {
bd7f2ede 245 Must(io.flag == COMM_OK);
3ff65596 246 al.icap.bytesSent += io.size;
cfc68405 247 updateTimeout();
bd7f2ede 248 handleCommWrote(io.size);
cfc68405 249 }
774c051c 250}
251
252// communication timeout with the ICAP service
26cc52cb 253void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &io)
774c051c 254{
774c051c 255 handleCommTimedout();
774c051c 256}
257
26cc52cb 258void Adaptation::Icap::Xaction::handleCommTimedout()
774c051c 259{
4932ad93 260 debugs(93, 2, HERE << typeName << " failed: timeout with " <<
9e008dda 261 theService->cfg().methodStr() << " " <<
a7a42b14 262 theService->cfg().uri << status());
fe3e2600 263 reuseConnection = false;
bd7f2ede 264 throw TexcHere(connector != NULL ?
9e008dda
AJ
265 "timed out while connecting to the ICAP service" :
266 "timed out while talking to the ICAP service");
774c051c 267}
268
269// unexpected connection close while talking to the ICAP service
26cc52cb 270void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &io)
774c051c 271{
272 closer = NULL;
774c051c 273 handleCommClosed();
774c051c 274}
275
26cc52cb 276void Adaptation::Icap::Xaction::handleCommClosed()
774c051c 277{
278 mustStop("ICAP service connection externally closed");
279}
280
3ff65596
AR
281void Adaptation::Icap::Xaction::callException(const std::exception &e)
282{
283 setOutcome(xoError);
8277060a 284 service().noteFailure();
3ff65596
AR
285 Adaptation::Initiate::callException(e);
286}
287
288
26cc52cb 289void Adaptation::Icap::Xaction::callEnd()
774c051c 290{
c824c43b 291 if (doneWithIo()) {
292 debugs(93, 5, HERE << typeName << " done with I/O" << status());
293 closeConnection();
294 }
0bef8dd7 295 Adaptation::Initiate::callEnd(); // may destroy us
774c051c 296}
297
26cc52cb 298bool Adaptation::Icap::Xaction::doneAll() const
774c051c 299{
0bef8dd7 300 return !connector && !reader && !writer && Adaptation::Initiate::doneAll();
774c051c 301}
302
26cc52cb 303void Adaptation::Icap::Xaction::updateTimeout()
9e008dda 304{
bd7f2ede 305 if (reader != NULL || writer != NULL) {
c99de607 306 // restart the timeout before each I/O
307 // XXX: why does Config.Timeout lacks a write timeout?
cfc68405 308 // TODO: service bypass status may differ from that of a transaction
26cc52cb
AR
309 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
310 AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
311 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
bd7f2ede 312
2413d60a 313 commSetTimeout(connection->fd,
26cc52cb 314 TheConfig.io_timeout(service().cfg().bypass), call);
c99de607 315 } else {
316 // clear timeout when there is no I/O
317 // Do we need a lifetime timeout?
9e008dda 318 AsyncCall::Pointer call = NULL;
2413d60a 319 commSetTimeout(connection->fd, -1, call);
c99de607 320 }
321}
322
26cc52cb 323void Adaptation::Icap::Xaction::scheduleRead()
774c051c 324{
2413d60a 325 Must(connection->isOpen());
774c051c 326 Must(!reader);
327 Must(readBuf.hasSpace());
328
774c051c 329 /*
26cc52cb 330 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
bb790702 331 * here instead of reading directly into readBuf.buf.
774c051c 332 */
26cc52cb
AR
333 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
334 reader = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommRead",
335 Dialer(this, &Adaptation::Icap::Xaction::noteCommRead));
774c051c 336
2413d60a 337 comm_read(connection->fd, commBuf, readBuf.spaceSize(), reader);
c99de607 338 updateTimeout();
774c051c 339}
340
341// comm module read a portion of the ICAP response for us
26cc52cb 342void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams &io)
774c051c 343{
bd7f2ede 344 Must(reader != NULL);
774c051c 345 reader = NULL;
346
bd7f2ede 347 Must(io.flag == COMM_OK);
348 Must(io.size >= 0);
774c051c 349
3ff65596
AR
350 al.icap.bytesRead+=io.size;
351
c99de607 352 updateTimeout();
353
bd7f2ede 354 debugs(93, 3, HERE << "read " << io.size << " bytes");
774c051c 355
356 /*
26cc52cb 357 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
bb790702 358 * here instead of reading directly into readBuf.buf.
774c051c 359 */
360
bd7f2ede 361 if (io.size > 0) {
362 readBuf.append(commBuf, io.size);
c824c43b 363 disableRetries(); // because pconn did not fail
364 } else {
365 reuseConnection = false;
774c051c 366 commEof = true;
c824c43b 367 }
774c051c 368
bd7f2ede 369 handleCommRead(io.size);
774c051c 370}
371
26cc52cb 372void Adaptation::Icap::Xaction::cancelRead()
774c051c 373{
bd7f2ede 374 if (reader != NULL) {
2413d60a 375 comm_read_cancel(connection->fd, reader);
bd7f2ede 376 reader = NULL;
774c051c 377 }
378}
379
26cc52cb 380bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg *msg)
774c051c 381{
def17b6a 382 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse");
774c051c 383
384 http_status error = HTTP_STATUS_NONE;
385 const bool parsed = msg->parse(&readBuf, commEof, &error);
386 Must(parsed || !error); // success or need more data
387
388 if (!parsed) { // need more data
389 Must(mayReadMore());
390 msg->reset();
391 return false;
392 }
393
394 readBuf.consume(msg->hdr_sz);
395 return true;
396}
397
26cc52cb 398bool Adaptation::Icap::Xaction::mayReadMore() const
774c051c 399{
400 return !doneReading() && // will read more data
401 readBuf.hasSpace(); // have space for more data
402}
403
26cc52cb 404bool Adaptation::Icap::Xaction::doneReading() const
774c051c 405{
406 return commEof;
407}
408
26cc52cb 409bool Adaptation::Icap::Xaction::doneWriting() const
c99de607 410{
411 return !writer;
412}
413
26cc52cb 414bool Adaptation::Icap::Xaction::doneWithIo() const
c99de607 415{
2413d60a 416 return connection != NULL && connection->isOpen() && // or we could still be waiting to open it
9e008dda
AJ
417 !connector && !reader && !writer && // fast checks, some redundant
418 doneReading() && doneWriting();
c99de607 419}
420
c824c43b 421// initiator aborted
26cc52cb 422void Adaptation::Icap::Xaction::noteInitiatorAborted()
774c051c 423{
c824c43b 424
425 if (theInitiator) {
426 clearInitiator();
427 mustStop("initiator aborted");
c99de607 428 }
c824c43b 429
774c051c 430}
431
3ff65596
AR
432void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &xo)
433{
434 if (al.icap.outcome != xoUnknown) {
435 debugs(93, 3, HERE << "Warning: reseting outcome: from " <<
e1381638 436 al.icap.outcome << " to " << xo);
3ff65596
AR
437 } else {
438 debugs(93, 4, HERE << xo);
439 }
440 al.icap.outcome = xo;
441}
442
5f8252d2 443// This 'last chance' method is called before a 'done' transaction is deleted.
444// It is wrong to call virtual methods from a destructor. Besides, this call
445// indicates that the transaction will terminate as planned.
26cc52cb 446void Adaptation::Icap::Xaction::swanSong()
774c051c 447{
5f8252d2 448 // kids should sing first and then call the parent method.
449
450 closeConnection(); // TODO: rename because we do not always close
451
452 if (!readBuf.isNull())
453 readBuf.clean();
454
455 if (commBuf)
456 memFreeBuf(commBufSize, commBuf);
774c051c 457
c824c43b 458 if (theInitiator)
3ff65596
AR
459 tellQueryAborted();
460
461 maybeLog();
774c051c 462
0bef8dd7 463 Adaptation::Initiate::swanSong();
774c051c 464}
465
e1381638
AJ
466void Adaptation::Icap::Xaction::tellQueryAborted()
467{
3ff65596
AR
468 Adaptation::Icap::Launcher *l = dynamic_cast<Adaptation::Icap::Launcher*>(theInitiator.ptr());
469 Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply, retriable(), repeatable());
e1381638 470 CallJob(91, 5, __FILE__, __LINE__,
3ff65596
AR
471 "Adaptation::Icap::Launcher::noteXactAbort",
472 XactAbortCall(l, &Adaptation::Icap::Launcher::noteXactAbort, abortInfo) );
473 clearInitiator();
474}
475
476
e1381638
AJ
477void Adaptation::Icap::Xaction::maybeLog()
478{
479 if (IcapLogfileStatus == LOG_ENABLE) {
3ff65596
AR
480 ACLChecklist *checklist = new ACLFilledChecklist(::Config.accessList.icap, al.request, dash_str);
481 if (!::Config.accessList.icap || checklist->fastCheck()) {
482 finalizeLogInfo();
483 icapLogLog(&al, checklist);
484 }
485 accessLogFreeMemory(&al);
486 delete checklist;
487 }
488}
489
490void Adaptation::Icap::Xaction::finalizeLogInfo()
491{
492 //prepare log data
493 al.icp.opcode = ICP_INVALID;
e1381638 494
3ff65596
AR
495 const Adaptation::Icap::ServiceRep &s = service();
496 al.icap.hostAddr = s.cfg().host.termedBuf();
497 al.icap.serviceName = s.cfg().key;
498 al.icap.reqUri = s.cfg().uri;
e1381638 499
3ff65596
AR
500 al.icap.ioTime = tvSubMsec(icap_tio_start, icap_tio_finish);
501 al.icap.trTime = tvSubMsec(icap_tr_start, current_time);
502
503 al.icap.request = HTTPMSGLOCK(icapRequest);
504 if (icapReply) {
505 al.icap.reply = HTTPMSGLOCK(icapReply);
506 al.icap.resStatus = icapReply->sline.status;
507 }
508}
509
774c051c 510// returns a temporary string depicting transaction status, for debugging
26cc52cb 511const char *Adaptation::Icap::Xaction::status() const
774c051c 512{
513 static MemBuf buf;
514 buf.reset();
515
5f8252d2 516 buf.append(" [", 2);
774c051c 517
518 fillPendingStatus(buf);
519 buf.append("/", 1);
520 fillDoneStatus(buf);
521
5f8252d2 522 buf.Printf(" icapx%d]", id);
774c051c 523
524 buf.terminate();
525
526 return buf.content();
527}
528
26cc52cb 529void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
774c051c 530{
2413d60a
AJ
531 if (connection->isOpen()) {
532 buf.Printf("FD %d", connection->fd);
774c051c 533
bd7f2ede 534 if (writer != NULL)
774c051c 535 buf.append("w", 1);
536
bd7f2ede 537 if (reader != NULL)
774c051c 538 buf.append("r", 1);
539
5f8252d2 540 buf.append(";", 1);
774c051c 541 }
542}
543
26cc52cb 544void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf &buf) const
774c051c 545{
2413d60a
AJ
546 if (connection->isOpen() && commEof)
547 buf.Printf("Comm(%d)", connection->fd);
774c051c 548
549 if (stopReason != NULL)
550 buf.Printf("Stopped");
551}
3cfc19b3 552
26cc52cb 553bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf &buf) const
3cfc19b3 554{
555 return false;
556}