]> git.ipfire.org Git - thirdparty/squid.git/blame - src/adaptation/icap/Xaction.cc
SourceLayout: Add Ip namespace for internal libip
[thirdparty/squid.git] / src / adaptation / icap / Xaction.cc
CommitLineData
774c051c 1/*
507d0a78 2 * DEBUG: section 93 ICAP (RFC 3507) Client
774c051c 3 */
4
5#include "squid.h"
6#include "comm.h"
bd7f2ede 7#include "CommCalls.h"
5f8252d2 8#include "HttpMsg.h"
26cc52cb 9#include "adaptation/icap/Xaction.h"
3ff65596 10#include "adaptation/icap/Launcher.h"
26cc52cb 11#include "adaptation/icap/Config.h"
3d93a84d 12#include "base/TextException.h"
781ce8ff 13#include "pconn.h"
3ff65596
AR
14#include "HttpRequest.h"
15#include "HttpReply.h"
16#include "acl/FilledChecklist.h"
17#include "icap_log.h"
781ce8ff 18#include "fde.h"
3ff65596 19#include "SquidTime.h"
781ce8ff 20
21static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
774c051c 22
5f8252d2 23
26cc52cb 24//CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
5f8252d2 25
26cc52cb 26Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Initiator *anInitiator, Adaptation::Icap::ServiceRep::Pointer &aService):
bd7f2ede 27 AsyncJob(aTypeName),
a22e6cd3 28 Adaptation::Initiate(aTypeName, anInitiator),
3ff65596
AR
29 icapRequest(NULL),
30 icapReply(NULL),
31 attempts(0),
774c051c 32 connection(-1),
a22e6cd3 33 theService(aService),
774c051c 34 commBuf(NULL), commBufSize(0),
35 commEof(false),
2dfede9e 36 reuseConnection(true),
c824c43b 37 isRetriable(true),
3ff65596 38 isRepeatable(true),
cfc68405 39 ignoreLastWrite(false),
c824c43b 40 connector(NULL), reader(NULL), writer(NULL), closer(NULL)
774c051c 41{
5f8252d2 42 debugs(93,3, typeName << " constructed, this=" << this <<
9e008dda 43 " [icapx" << id << ']'); // we should not call virtual status() here
3ff65596
AR
44 icapRequest = HTTPMSGLOCK(new HttpRequest);
45 icap_tr_start = current_time;
774c051c 46}
47
26cc52cb 48Adaptation::Icap::Xaction::~Xaction()
774c051c 49{
5f8252d2 50 debugs(93,3, typeName << " destructed, this=" << this <<
9e008dda 51 " [icapx" << id << ']'); // we should not call virtual status() here
3ff65596
AR
52 HTTPMSGUNLOCK(icapRequest);
53 HTTPMSGUNLOCK(icapReply);
5f8252d2 54}
55
26cc52cb
AR
56Adaptation::Icap::ServiceRep &
57Adaptation::Icap::Xaction::service()
0bef8dd7 58{
a22e6cd3
AR
59 Must(theService != NULL);
60 return *theService;
0bef8dd7
AR
61}
62
26cc52cb 63void Adaptation::Icap::Xaction::disableRetries()
9e008dda 64{
3ff65596
AR
65 debugs(93,5, typeName << (isRetriable ? " from now on" : " still") <<
66 " cannot be retried " << status());
c824c43b 67 isRetriable = false;
68}
69
3ff65596
AR
70void Adaptation::Icap::Xaction::disableRepeats(const char *reason)
71{
72 debugs(93,5, typeName << (isRepeatable ? " from now on" : " still") <<
73 " cannot be repeated because " << reason << status());
74 isRepeatable = false;
75}
76
26cc52cb 77void Adaptation::Icap::Xaction::start()
5f8252d2 78{
0bef8dd7 79 Adaptation::Initiate::start();
5f8252d2 80
81 readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF);
82 commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize);
83 // make sure maximum readBuf space does not exceed commBuf size
84 Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
774c051c 85}
86
87// TODO: obey service-specific, OPTIONS-reported connection limit
26cc52cb 88void Adaptation::Icap::Xaction::openConnection()
774c051c 89{
b7ac5457 90 Ip::Address client_addr;
7aa6b7cb 91
c824c43b 92 Must(connection < 0);
93
0bef8dd7 94 const Adaptation::Service &s = service();
774c051c 95
26cc52cb 96 if (!TheConfig.reuse_connections)
560d7d2d 97 disableRetries(); // this will also safely drain pconn pool
98
c8ceec27 99 // TODO: check whether NULL domain is appropriate here
a7a42b14 100 connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable);
c8ceec27 101 if (connection >= 0) {
102 debugs(93,3, HERE << "reused pconn FD " << connection);
bd7f2ede 103
104 // fake the connect callback
26cc52cb
AR
105 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
106 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
107 Dialer dialer(this, &Adaptation::Icap::Xaction::noteCommConnected);
af6a12ee 108 dialer.params.fd = connection;
bd7f2ede 109 dialer.params.flag = COMM_OK;
110 // fake other parameters by copying from the existing connection
26cc52cb 111 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
9e008dda 112 ScheduleCallHere(connector);
c8ceec27 113 return;
2dfede9e 114 }
115
c8ceec27 116 disableRetries(); // we only retry pconn failures
117
b7ac5457 118 Ip::Address outgoing;
9e008dda 119 connection = comm_open(SOCK_STREAM, 0, outgoing,
a7a42b14 120 COMM_NONBLOCKING, s.cfg().uri.termedBuf());
774c051c 121
c824c43b 122 if (connection < 0)
123 dieOnConnectionFailure(); // throws
774c051c 124
a7a42b14 125 debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port);
774c051c 126
cfc68405 127 // TODO: service bypass status may differ from that of a transaction
26cc52cb
AR
128 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
129 AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
130 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
774c051c 131
26cc52cb 132 commSetTimeout(connection, TheConfig.connect_timeout(
9e008dda 133 service().cfg().bypass), timeoutCall);
774c051c 134
26cc52cb
AR
135 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
136 closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
137 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
bd7f2ede 138 comm_add_close_handler(connection, closer);
139
26cc52cb
AR
140 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
141 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
142 ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected));
a7a42b14 143 commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
774c051c 144}
145
/*
 * This event handler is necessary to work around the no-reentry policy
 * of Adaptation::Icap::Xaction::callStart().
 *
 * NOTE: currently compiled out with #if 0.
 */
#if 0
void
Adaptation::Icap::Xaction::reusedConnection(void *data)
{
    debugs(93, 5, HERE << "reused connection");
    Adaptation::Icap::Xaction *xaction = static_cast<Adaptation::Icap::Xaction*>(data);
    xaction->noteCommConnected(COMM_OK);
}
#endif
2dfede9e 159
26cc52cb 160void Adaptation::Icap::Xaction::closeConnection()
774c051c 161{
162 if (connection >= 0) {
774c051c 163
bd7f2ede 164 if (closer != NULL) {
165 comm_remove_close_handler(connection, closer);
774c051c 166 closer = NULL;
167 }
168
c99de607 169 cancelRead(); // may not work
170
5f8252d2 171 if (reuseConnection && !doneWithIo()) {
dd6e6148 172 //status() adds leading spaces.
5f8252d2 173 debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status());
c99de607 174 reuseConnection = false;
175 }
774c051c 176
2dfede9e 177 if (reuseConnection) {
b7ac5457 178 Ip::Address client_addr;
dd6e6148 179 //status() adds leading spaces.
5f8252d2 180 debugs(93,3, HERE << "pushing pconn" << status());
9e008dda
AJ
181 AsyncCall::Pointer call = NULL;
182 commSetTimeout(connection, -1, call);
a7a42b14 183 icapPconnPool->push(connection, theService->cfg().host.termedBuf(),
9e008dda 184 theService->cfg().port, NULL, client_addr);
c824c43b 185 disableRetries();
2dfede9e 186 } else {
dd6e6148 187 //status() adds leading spaces.
5f8252d2 188 debugs(93,3, HERE << "closing pconn" << status());
c99de607 189 // comm_close will clear timeout
2dfede9e 190 comm_close(connection);
191 }
774c051c 192
c99de607 193 writer = NULL;
194 reader = NULL;
774c051c 195 connector = NULL;
196 connection = -1;
197 }
198}
199
200// connection with the ICAP service established
26cc52cb 201void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
774c051c 202{
bd7f2ede 203 Must(connector != NULL);
774c051c 204 connector = NULL;
c99de607 205
bd7f2ede 206 if (io.flag != COMM_OK)
c99de607 207 dieOnConnectionFailure(); // throws
208
209 fd_table[connection].noteUse(icapPconnPool);
774c051c 210
211 handleCommConnected();
774c051c 212}
213
26cc52cb 214void Adaptation::Icap::Xaction::dieOnConnectionFailure()
9e008dda 215{
4932ad93 216 debugs(93, 2, HERE << typeName <<
9e008dda 217 " failed to connect to " << service().cfg().uri);
c99de607 218 throw TexcHere("cannot connect to the ICAP service");
219}
220
26cc52cb 221void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
774c051c 222{
223 // comm module will free the buffer
26cc52cb
AR
224 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
225 writer = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommWrote",
226 Dialer(this, &Adaptation::Icap::Xaction::noteCommWrote));
bd7f2ede 227
228 comm_write_mbuf(connection, &buf, writer);
c99de607 229 updateTimeout();
774c051c 230}
231
26cc52cb 232void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
774c051c 233{
bd7f2ede 234 Must(writer != NULL);
774c051c 235 writer = NULL;
9e008dda 236
cfc68405 237 if (ignoreLastWrite) {
238 // a hack due to comm inability to cancel a pending write
9e008dda 239 ignoreLastWrite = false;
bd7f2ede 240 debugs(93, 7, HERE << "ignoring last write; status: " << io.flag);
cfc68405 241 } else {
bd7f2ede 242 Must(io.flag == COMM_OK);
3ff65596 243 al.icap.bytesSent += io.size;
cfc68405 244 updateTimeout();
bd7f2ede 245 handleCommWrote(io.size);
cfc68405 246 }
774c051c 247}
248
249// communication timeout with the ICAP service
26cc52cb 250void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &io)
774c051c 251{
774c051c 252 handleCommTimedout();
774c051c 253}
254
26cc52cb 255void Adaptation::Icap::Xaction::handleCommTimedout()
774c051c 256{
4932ad93 257 debugs(93, 2, HERE << typeName << " failed: timeout with " <<
9e008dda 258 theService->cfg().methodStr() << " " <<
a7a42b14 259 theService->cfg().uri << status());
fe3e2600 260 reuseConnection = false;
bd7f2ede 261 throw TexcHere(connector != NULL ?
9e008dda
AJ
262 "timed out while connecting to the ICAP service" :
263 "timed out while talking to the ICAP service");
774c051c 264}
265
266// unexpected connection close while talking to the ICAP service
26cc52cb 267void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &io)
774c051c 268{
269 closer = NULL;
774c051c 270 handleCommClosed();
774c051c 271}
272
26cc52cb 273void Adaptation::Icap::Xaction::handleCommClosed()
774c051c 274{
275 mustStop("ICAP service connection externally closed");
276}
277
3ff65596
AR
278void Adaptation::Icap::Xaction::callException(const std::exception &e)
279{
280 setOutcome(xoError);
8277060a 281 service().noteFailure();
3ff65596
AR
282 Adaptation::Initiate::callException(e);
283}
284
285
26cc52cb 286void Adaptation::Icap::Xaction::callEnd()
774c051c 287{
c824c43b 288 if (doneWithIo()) {
289 debugs(93, 5, HERE << typeName << " done with I/O" << status());
290 closeConnection();
291 }
0bef8dd7 292 Adaptation::Initiate::callEnd(); // may destroy us
774c051c 293}
294
26cc52cb 295bool Adaptation::Icap::Xaction::doneAll() const
774c051c 296{
0bef8dd7 297 return !connector && !reader && !writer && Adaptation::Initiate::doneAll();
774c051c 298}
299
26cc52cb 300void Adaptation::Icap::Xaction::updateTimeout()
9e008dda 301{
bd7f2ede 302 if (reader != NULL || writer != NULL) {
c99de607 303 // restart the timeout before each I/O
304 // XXX: why does Config.Timeout lacks a write timeout?
cfc68405 305 // TODO: service bypass status may differ from that of a transaction
26cc52cb
AR
306 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
307 AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
308 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
bd7f2ede 309
9e008dda 310 commSetTimeout(connection,
26cc52cb 311 TheConfig.io_timeout(service().cfg().bypass), call);
c99de607 312 } else {
313 // clear timeout when there is no I/O
314 // Do we need a lifetime timeout?
9e008dda 315 AsyncCall::Pointer call = NULL;
bd7f2ede 316 commSetTimeout(connection, -1, call);
c99de607 317 }
318}
319
26cc52cb 320void Adaptation::Icap::Xaction::scheduleRead()
774c051c 321{
322 Must(connection >= 0);
323 Must(!reader);
324 Must(readBuf.hasSpace());
325
774c051c 326 /*
26cc52cb 327 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
bb790702 328 * here instead of reading directly into readBuf.buf.
774c051c 329 */
26cc52cb
AR
330 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
331 reader = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommRead",
332 Dialer(this, &Adaptation::Icap::Xaction::noteCommRead));
774c051c 333
bd7f2ede 334 comm_read(connection, commBuf, readBuf.spaceSize(), reader);
c99de607 335 updateTimeout();
774c051c 336}
337
338// comm module read a portion of the ICAP response for us
26cc52cb 339void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams &io)
774c051c 340{
bd7f2ede 341 Must(reader != NULL);
774c051c 342 reader = NULL;
343
bd7f2ede 344 Must(io.flag == COMM_OK);
345 Must(io.size >= 0);
774c051c 346
3ff65596
AR
347 al.icap.bytesRead+=io.size;
348
c99de607 349 updateTimeout();
350
bd7f2ede 351 debugs(93, 3, HERE << "read " << io.size << " bytes");
774c051c 352
353 /*
26cc52cb 354 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
bb790702 355 * here instead of reading directly into readBuf.buf.
774c051c 356 */
357
bd7f2ede 358 if (io.size > 0) {
359 readBuf.append(commBuf, io.size);
c824c43b 360 disableRetries(); // because pconn did not fail
361 } else {
362 reuseConnection = false;
774c051c 363 commEof = true;
c824c43b 364 }
774c051c 365
bd7f2ede 366 handleCommRead(io.size);
774c051c 367}
368
26cc52cb 369void Adaptation::Icap::Xaction::cancelRead()
774c051c 370{
bd7f2ede 371 if (reader != NULL) {
372 comm_read_cancel(connection, reader);
373 reader = NULL;
774c051c 374 }
375}
376
26cc52cb 377bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg *msg)
774c051c 378{
def17b6a 379 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse");
774c051c 380
381 http_status error = HTTP_STATUS_NONE;
382 const bool parsed = msg->parse(&readBuf, commEof, &error);
383 Must(parsed || !error); // success or need more data
384
385 if (!parsed) { // need more data
386 Must(mayReadMore());
387 msg->reset();
388 return false;
389 }
390
391 readBuf.consume(msg->hdr_sz);
392 return true;
393}
394
26cc52cb 395bool Adaptation::Icap::Xaction::mayReadMore() const
774c051c 396{
397 return !doneReading() && // will read more data
398 readBuf.hasSpace(); // have space for more data
399}
400
26cc52cb 401bool Adaptation::Icap::Xaction::doneReading() const
774c051c 402{
403 return commEof;
404}
405
26cc52cb 406bool Adaptation::Icap::Xaction::doneWriting() const
c99de607 407{
408 return !writer;
409}
410
26cc52cb 411bool Adaptation::Icap::Xaction::doneWithIo() const
c99de607 412{
413 return connection >= 0 && // or we could still be waiting to open it
9e008dda
AJ
414 !connector && !reader && !writer && // fast checks, some redundant
415 doneReading() && doneWriting();
c99de607 416}
417
c824c43b 418// initiator aborted
26cc52cb 419void Adaptation::Icap::Xaction::noteInitiatorAborted()
774c051c 420{
c824c43b 421
422 if (theInitiator) {
423 clearInitiator();
424 mustStop("initiator aborted");
c99de607 425 }
c824c43b 426
774c051c 427}
428
3ff65596
AR
429void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &xo)
430{
431 if (al.icap.outcome != xoUnknown) {
432 debugs(93, 3, HERE << "Warning: reseting outcome: from " <<
e1381638 433 al.icap.outcome << " to " << xo);
3ff65596
AR
434 } else {
435 debugs(93, 4, HERE << xo);
436 }
437 al.icap.outcome = xo;
438}
439
5f8252d2 440// This 'last chance' method is called before a 'done' transaction is deleted.
441// It is wrong to call virtual methods from a destructor. Besides, this call
442// indicates that the transaction will terminate as planned.
26cc52cb 443void Adaptation::Icap::Xaction::swanSong()
774c051c 444{
5f8252d2 445 // kids should sing first and then call the parent method.
446
447 closeConnection(); // TODO: rename because we do not always close
448
449 if (!readBuf.isNull())
450 readBuf.clean();
451
452 if (commBuf)
453 memFreeBuf(commBufSize, commBuf);
774c051c 454
c824c43b 455 if (theInitiator)
3ff65596
AR
456 tellQueryAborted();
457
458 maybeLog();
774c051c 459
0bef8dd7 460 Adaptation::Initiate::swanSong();
774c051c 461}
462
e1381638
AJ
463void Adaptation::Icap::Xaction::tellQueryAborted()
464{
3ff65596
AR
465 Adaptation::Icap::Launcher *l = dynamic_cast<Adaptation::Icap::Launcher*>(theInitiator.ptr());
466 Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply, retriable(), repeatable());
e1381638 467 CallJob(91, 5, __FILE__, __LINE__,
3ff65596
AR
468 "Adaptation::Icap::Launcher::noteXactAbort",
469 XactAbortCall(l, &Adaptation::Icap::Launcher::noteXactAbort, abortInfo) );
470 clearInitiator();
471}
472
473
e1381638
AJ
474void Adaptation::Icap::Xaction::maybeLog()
475{
476 if (IcapLogfileStatus == LOG_ENABLE) {
3ff65596
AR
477 ACLChecklist *checklist = new ACLFilledChecklist(::Config.accessList.icap, al.request, dash_str);
478 if (!::Config.accessList.icap || checklist->fastCheck()) {
479 finalizeLogInfo();
480 icapLogLog(&al, checklist);
481 }
482 accessLogFreeMemory(&al);
483 delete checklist;
484 }
485}
486
487void Adaptation::Icap::Xaction::finalizeLogInfo()
488{
489 //prepare log data
490 al.icp.opcode = ICP_INVALID;
e1381638 491
3ff65596
AR
492 const Adaptation::Icap::ServiceRep &s = service();
493 al.icap.hostAddr = s.cfg().host.termedBuf();
494 al.icap.serviceName = s.cfg().key;
495 al.icap.reqUri = s.cfg().uri;
e1381638 496
3ff65596
AR
497 al.icap.ioTime = tvSubMsec(icap_tio_start, icap_tio_finish);
498 al.icap.trTime = tvSubMsec(icap_tr_start, current_time);
499
500 al.icap.request = HTTPMSGLOCK(icapRequest);
501 if (icapReply) {
502 al.icap.reply = HTTPMSGLOCK(icapReply);
503 al.icap.resStatus = icapReply->sline.status;
504 }
505}
506
774c051c 507// returns a temporary string depicting transaction status, for debugging
26cc52cb 508const char *Adaptation::Icap::Xaction::status() const
774c051c 509{
510 static MemBuf buf;
511 buf.reset();
512
5f8252d2 513 buf.append(" [", 2);
774c051c 514
515 fillPendingStatus(buf);
516 buf.append("/", 1);
517 fillDoneStatus(buf);
518
5f8252d2 519 buf.Printf(" icapx%d]", id);
774c051c 520
521 buf.terminate();
522
523 return buf.content();
524}
525
26cc52cb 526void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
774c051c 527{
528 if (connection >= 0) {
5f8252d2 529 buf.Printf("FD %d", connection);
774c051c 530
bd7f2ede 531 if (writer != NULL)
774c051c 532 buf.append("w", 1);
533
bd7f2ede 534 if (reader != NULL)
774c051c 535 buf.append("r", 1);
536
5f8252d2 537 buf.append(";", 1);
774c051c 538 }
539}
540
26cc52cb 541void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf &buf) const
774c051c 542{
543 if (connection >= 0 && commEof)
544 buf.Printf("Comm(%d)", connection);
545
546 if (stopReason != NULL)
547 buf.Printf("Stopped");
548}
3cfc19b3 549
26cc52cb 550bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf &buf) const
3cfc19b3 551{
552 return false;
553}