]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/icap/Xaction.cc
Deprecate log_icap and log_access configuration directives
[thirdparty/squid.git] / src / adaptation / icap / Xaction.cc
1 /*
2 * DEBUG: section 93 ICAP (RFC 3507) Client
3 */
4
5 #include "squid.h"
6 #include "adaptation/icap/Config.h"
7 #include "adaptation/icap/Launcher.h"
8 #include "adaptation/icap/Xaction.h"
9 #include "base/TextException.h"
10 #include "comm.h"
11 #include "comm/Connection.h"
12 #include "comm/ConnOpener.h"
13 #include "comm/Write.h"
14 #include "CommCalls.h"
15 #include "err_detail_type.h"
16 #include "fde.h"
17 #include "FwdState.h"
18 #include "HttpMsg.h"
19 #include "HttpReply.h"
20 #include "HttpRequest.h"
21 #include "icap_log.h"
22 #include "ipcache.h"
23 #include "Mem.h"
24 #include "pconn.h"
25 #include "SquidConfig.h"
26 #include "SquidTime.h"
27
28 //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
29
30 Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::ServiceRep::Pointer &aService):
31 AsyncJob(aTypeName),
32 Adaptation::Initiate(aTypeName),
33 icapRequest(NULL),
34 icapReply(NULL),
35 attempts(0),
36 connection(NULL),
37 theService(aService),
38 commBuf(NULL), commBufSize(0),
39 commEof(false),
40 reuseConnection(true),
41 isRetriable(true),
42 isRepeatable(true),
43 ignoreLastWrite(false),
44 connector(NULL), reader(NULL), writer(NULL), closer(NULL),
45 alep(new AccessLogEntry),
46 al(*alep),
47 cs(NULL)
48 {
49 debugs(93,3, typeName << " constructed, this=" << this <<
50 " [icapx" << id << ']'); // we should not call virtual status() here
51 icapRequest = new HttpRequest;
52 HTTPMSGLOCK(icapRequest);
53 icap_tr_start = current_time;
54 }
55
56 Adaptation::Icap::Xaction::~Xaction()
57 {
58 debugs(93,3, typeName << " destructed, this=" << this <<
59 " [icapx" << id << ']'); // we should not call virtual status() here
60 HTTPMSGUNLOCK(icapRequest);
61 }
62
63 Adaptation::Icap::ServiceRep &
64 Adaptation::Icap::Xaction::service()
65 {
66 Must(theService != NULL);
67 return *theService;
68 }
69
70 void Adaptation::Icap::Xaction::disableRetries()
71 {
72 debugs(93,5, typeName << (isRetriable ? " from now on" : " still") <<
73 " cannot be retried " << status());
74 isRetriable = false;
75 }
76
77 void Adaptation::Icap::Xaction::disableRepeats(const char *reason)
78 {
79 debugs(93,5, typeName << (isRepeatable ? " from now on" : " still") <<
80 " cannot be repeated because " << reason << status());
81 isRepeatable = false;
82 }
83
84 void Adaptation::Icap::Xaction::start()
85 {
86 Adaptation::Initiate::start();
87
88 readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF);
89 commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize);
90 // make sure maximum readBuf space does not exceed commBuf size
91 Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
92 }
93
94 static void
95 icapLookupDnsResults(const ipcache_addrs *ia, const DnsLookupDetails &, void *data)
96 {
97 Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
98 xa->dnsLookupDone(ia);
99 }
100
101 // TODO: obey service-specific, OPTIONS-reported connection limit
102 void
103 Adaptation::Icap::Xaction::openConnection()
104 {
105 Must(!haveConnection());
106
107 Adaptation::Icap::ServiceRep &s = service();
108
109 if (!TheConfig.reuse_connections)
110 disableRetries(); // this will also safely drain pconn pool
111
112 bool wasReused = false;
113 connection = s.getConnection(isRetriable, wasReused);
114
115 if (wasReused && Comm::IsConnOpen(connection)) {
116 // Set comm Close handler
117 // fake the connect callback
118 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
119 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
120 CbcPointer<Xaction> self(this);
121 Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
122 dialer.params.conn = connection;
123 dialer.params.flag = COMM_OK;
124 // fake other parameters by copying from the existing connection
125 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
126 ScheduleCallHere(connector);
127 return;
128 }
129
130 disableRetries(); // we only retry pconn failures
131
132 // Attempt to open a new connection...
133 debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port);
134
135 // Locate the Service IP(s) to open
136 ipcache_nbgethostbyname(s.cfg().host.termedBuf(), icapLookupDnsResults, this);
137 }
138
139 void
140 Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia)
141 {
142 Adaptation::Icap::ServiceRep &s = service();
143
144 if (ia == NULL) {
145 debugs(44, DBG_IMPORTANT, "ICAP: Unknown service host: " << s.cfg().host);
146
147 #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
148 dieOnConnectionFailure(); // throws
149 #else // take a step back into protected Async call dialing.
150 // fake the connect callback
151 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
152 CbcPointer<Xaction> self(this);
153 Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
154 dialer.params.conn = connection;
155 dialer.params.flag = COMM_ERROR;
156 // fake other parameters by copying from the existing connection
157 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
158 ScheduleCallHere(connector);
159 #endif
160 return;
161 }
162
163 assert(ia->cur < ia->count);
164
165 connection = new Comm::Connection;
166 connection->remote = ia->in_addrs[ia->cur];
167 connection->remote.port(s.cfg().port);
168 getOutgoingAddress(NULL, connection);
169
170 // TODO: service bypass status may differ from that of a transaction
171 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
172 connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
173 cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
174 cs->setHost(s.cfg().host.termedBuf());
175 AsyncJob::Start(cs);
176 }
177
/*
 * This event handler is necessary to work around the no-reentry policy
 * of Adaptation::Icap::Xaction::callStart()
 *
 * The handler below is compiled out (#if 0).
 */
#if 0
void
Adaptation::Icap::Xaction::reusedConnection(void *data)
{
    debugs(93, 5, HERE << "reused connection");
    Adaptation::Icap::Xaction *x = static_cast<Adaptation::Icap::Xaction*>(data);
    x->noteCommConnected(COMM_OK);
}
#endif
191
192 void Adaptation::Icap::Xaction::closeConnection()
193 {
194 if (haveConnection()) {
195
196 if (closer != NULL) {
197 comm_remove_close_handler(connection->fd, closer);
198 closer = NULL;
199 }
200
201 cancelRead(); // may not work
202
203 if (reuseConnection && !doneWithIo()) {
204 //status() adds leading spaces.
205 debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status());
206 reuseConnection = false;
207 }
208
209 if (reuseConnection)
210 disableRetries();
211
212 const bool reset = !reuseConnection &&
213 (al.icap.outcome == xoGone || al.icap.outcome == xoError);
214
215 Adaptation::Icap::ServiceRep &s = service();
216 s.putConnection(connection, reuseConnection, reset, status());
217
218 writer = NULL;
219 reader = NULL;
220 connector = NULL;
221 connection = NULL;
222 }
223 }
224
225 // connection with the ICAP service established
226 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
227 {
228 cs = NULL;
229
230 if (io.flag == COMM_TIMEOUT) {
231 handleCommTimedout();
232 return;
233 }
234
235 Must(connector != NULL);
236 connector = NULL;
237
238 if (io.flag != COMM_OK)
239 dieOnConnectionFailure(); // throws
240
241 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
242 AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
243 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
244 commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
245
246 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
247 closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
248 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
249 comm_add_close_handler(io.conn->fd, closer);
250
251 // ?? fd_table[io.conn->fd].noteUse(icapPconnPool);
252 service().noteConnectionUse(connection);
253
254 handleCommConnected();
255 }
256
257 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
258 {
259 debugs(93, 2, HERE << typeName <<
260 " failed to connect to " << service().cfg().uri);
261 service().noteConnectionFailed("failure");
262 detailError(ERR_DETAIL_ICAP_XACT_START);
263 throw TexcHere("cannot connect to the ICAP service");
264 }
265
266 void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
267 {
268 Must(haveConnection());
269
270 // comm module will free the buffer
271 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
272 writer = JobCallback(93, 3,
273 Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
274
275 Comm::Write(connection, &buf, writer);
276 updateTimeout();
277 }
278
279 void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
280 {
281 Must(writer != NULL);
282 writer = NULL;
283
284 if (ignoreLastWrite) {
285 // a hack due to comm inability to cancel a pending write
286 ignoreLastWrite = false;
287 debugs(93, 7, HERE << "ignoring last write; status: " << io.flag);
288 } else {
289 Must(io.flag == COMM_OK);
290 al.icap.bytesSent += io.size;
291 updateTimeout();
292 handleCommWrote(io.size);
293 }
294 }
295
296 // communication timeout with the ICAP service
297 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &io)
298 {
299 handleCommTimedout();
300 }
301
302 void Adaptation::Icap::Xaction::handleCommTimedout()
303 {
304 debugs(93, 2, HERE << typeName << " failed: timeout with " <<
305 theService->cfg().methodStr() << " " <<
306 theService->cfg().uri << status());
307 reuseConnection = false;
308 const bool whileConnecting = connector != NULL;
309 if (whileConnecting) {
310 assert(!haveConnection());
311 theService->noteConnectionFailed("timedout");
312 } else
313 closeConnection(); // so that late Comm callbacks do not disturb bypass
314 throw TexcHere(whileConnecting ?
315 "timed out while connecting to the ICAP service" :
316 "timed out while talking to the ICAP service");
317 }
318
319 // unexpected connection close while talking to the ICAP service
320 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &io)
321 {
322 closer = NULL;
323 handleCommClosed();
324 }
325
326 void Adaptation::Icap::Xaction::handleCommClosed()
327 {
328 detailError(ERR_DETAIL_ICAP_XACT_CLOSE);
329 mustStop("ICAP service connection externally closed");
330 }
331
332 void Adaptation::Icap::Xaction::callException(const std::exception &e)
333 {
334 setOutcome(xoError);
335 service().noteFailure();
336 Adaptation::Initiate::callException(e);
337 }
338
339 void Adaptation::Icap::Xaction::callEnd()
340 {
341 if (doneWithIo()) {
342 debugs(93, 5, HERE << typeName << " done with I/O" << status());
343 closeConnection();
344 }
345 Adaptation::Initiate::callEnd(); // may destroy us
346 }
347
348 bool Adaptation::Icap::Xaction::doneAll() const
349 {
350 return !connector && !reader && !writer && Adaptation::Initiate::doneAll();
351 }
352
353 void Adaptation::Icap::Xaction::updateTimeout()
354 {
355 Must(haveConnection());
356
357 if (reader != NULL || writer != NULL) {
358 // restart the timeout before each I/O
359 // XXX: why does Config.Timeout lacks a write timeout?
360 // TODO: service bypass status may differ from that of a transaction
361 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
362 AsyncCall::Pointer call = JobCallback(93, 5, TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
363 commSetConnTimeout(connection, TheConfig.io_timeout(service().cfg().bypass), call);
364 } else {
365 // clear timeout when there is no I/O
366 // Do we need a lifetime timeout?
367 commUnsetConnTimeout(connection);
368 }
369 }
370
371 void Adaptation::Icap::Xaction::scheduleRead()
372 {
373 Must(haveConnection());
374 Must(!reader);
375 Must(readBuf.hasSpace());
376
377 /*
378 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
379 * here instead of reading directly into readBuf.buf.
380 */
381 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
382 reader = JobCallback(93, 3,
383 Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
384
385 comm_read(connection, commBuf, readBuf.spaceSize(), reader);
386 updateTimeout();
387 }
388
389 // comm module read a portion of the ICAP response for us
390 void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams &io)
391 {
392 Must(reader != NULL);
393 reader = NULL;
394
395 Must(io.flag == COMM_OK);
396
397 if (!io.size) {
398 commEof = true;
399 reuseConnection = false;
400
401 // detect a pconn race condition: eof on the first pconn read
402 if (!al.icap.bytesRead && retriable()) {
403 setOutcome(xoRace);
404 mustStop("pconn race");
405 return;
406 }
407 } else {
408
409 al.icap.bytesRead+=io.size;
410
411 updateTimeout();
412
413 debugs(93, 3, HERE << "read " << io.size << " bytes");
414
415 /*
416 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
417 * here instead of reading directly into readBuf.buf.
418 */
419
420 readBuf.append(commBuf, io.size);
421 disableRetries(); // because pconn did not fail
422 }
423
424 handleCommRead(io.size);
425 }
426
427 void Adaptation::Icap::Xaction::cancelRead()
428 {
429 if (reader != NULL) {
430 Must(haveConnection());
431 comm_read_cancel(connection->fd, reader);
432 reader = NULL;
433 }
434 }
435
436 bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg *msg)
437 {
438 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse");
439
440 Http::StatusCode error = Http::scNone;
441 const bool parsed = msg->parse(&readBuf, commEof, &error);
442 Must(parsed || !error); // success or need more data
443
444 if (!parsed) { // need more data
445 Must(mayReadMore());
446 msg->reset();
447 return false;
448 }
449
450 readBuf.consume(msg->hdr_sz);
451 return true;
452 }
453
454 bool Adaptation::Icap::Xaction::mayReadMore() const
455 {
456 return !doneReading() && // will read more data
457 readBuf.hasSpace(); // have space for more data
458 }
459
460 bool Adaptation::Icap::Xaction::doneReading() const
461 {
462 return commEof;
463 }
464
465 bool Adaptation::Icap::Xaction::doneWriting() const
466 {
467 return !writer;
468 }
469
470 bool Adaptation::Icap::Xaction::doneWithIo() const
471 {
472 return haveConnection() &&
473 !connector && !reader && !writer && // fast checks, some redundant
474 doneReading() && doneWriting();
475 }
476
477 bool Adaptation::Icap::Xaction::haveConnection() const
478 {
479 return connection != NULL && connection->isOpen();
480 }
481
482 // initiator aborted
483 void Adaptation::Icap::Xaction::noteInitiatorAborted()
484 {
485
486 if (theInitiator.set()) {
487 debugs(93,4, HERE << "Initiator gone before ICAP transaction ended");
488 clearInitiator();
489 detailError(ERR_DETAIL_ICAP_INIT_GONE);
490 setOutcome(xoGone);
491 mustStop("initiator aborted");
492 }
493
494 }
495
496 void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &xo)
497 {
498 if (al.icap.outcome != xoUnknown) {
499 debugs(93, 3, HERE << "Warning: reseting outcome: from " <<
500 al.icap.outcome << " to " << xo);
501 } else {
502 debugs(93, 4, HERE << xo);
503 }
504 al.icap.outcome = xo;
505 }
506
507 // This 'last chance' method is called before a 'done' transaction is deleted.
508 // It is wrong to call virtual methods from a destructor. Besides, this call
509 // indicates that the transaction will terminate as planned.
510 void Adaptation::Icap::Xaction::swanSong()
511 {
512 // kids should sing first and then call the parent method.
513 if (cs) {
514 debugs(93,6, HERE << id << " about to notify ConnOpener!");
515 CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort);
516 cs = NULL;
517 service().noteConnectionFailed("abort");
518 }
519
520 closeConnection(); // TODO: rename because we do not always close
521
522 if (!readBuf.isNull())
523 readBuf.clean();
524
525 if (commBuf)
526 memFreeBuf(commBufSize, commBuf);
527
528 tellQueryAborted();
529
530 maybeLog();
531
532 Adaptation::Initiate::swanSong();
533 }
534
535 void Adaptation::Icap::Xaction::tellQueryAborted()
536 {
537 if (theInitiator.set()) {
538 Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply.getRaw(),
539 retriable(), repeatable());
540 Launcher *launcher = dynamic_cast<Launcher*>(theInitiator.get());
541 // launcher may be nil if initiator is invalid
542 CallJobHere1(91,5, CbcPointer<Launcher>(launcher),
543 Launcher, noteXactAbort, abortInfo);
544 clearInitiator();
545 }
546 }
547
548 void Adaptation::Icap::Xaction::maybeLog()
549 {
550 if (IcapLogfileStatus == LOG_ENABLE) {
551 finalizeLogInfo();
552 icapLogLog(alep);
553 }
554 }
555
556 void Adaptation::Icap::Xaction::finalizeLogInfo()
557 {
558 //prepare log data
559 al.icp.opcode = ICP_INVALID;
560
561 const Adaptation::Icap::ServiceRep &s = service();
562 al.icap.hostAddr = s.cfg().host.termedBuf();
563 al.icap.serviceName = s.cfg().key;
564 al.icap.reqUri = s.cfg().uri;
565
566 al.icap.ioTime = tvSubMsec(icap_tio_start, icap_tio_finish);
567 al.icap.trTime = tvSubMsec(icap_tr_start, current_time);
568
569 al.icap.request = icapRequest;
570 HTTPMSGLOCK(al.icap.request);
571 if (icapReply != NULL) {
572 al.icap.reply = icapReply.getRaw();
573 HTTPMSGLOCK(al.icap.reply);
574 al.icap.resStatus = icapReply->sline.status();
575 }
576 }
577
578 // returns a temporary string depicting transaction status, for debugging
579 const char *Adaptation::Icap::Xaction::status() const
580 {
581 static MemBuf buf;
582 buf.reset();
583
584 buf.append(" [", 2);
585
586 fillPendingStatus(buf);
587 buf.append("/", 1);
588 fillDoneStatus(buf);
589
590 buf.Printf(" %s%u]", id.Prefix, id.value);
591
592 buf.terminate();
593
594 return buf.content();
595 }
596
597 void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
598 {
599 if (haveConnection()) {
600 buf.Printf("FD %d", connection->fd);
601
602 if (writer != NULL)
603 buf.append("w", 1);
604
605 if (reader != NULL)
606 buf.append("r", 1);
607
608 buf.append(";", 1);
609 }
610 }
611
612 void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf &buf) const
613 {
614 if (haveConnection() && commEof)
615 buf.Printf("Comm(%d)", connection->fd);
616
617 if (stopReason != NULL)
618 buf.Printf("Stopped");
619 }
620
621 bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf &buf) const
622 {
623 return false;
624 }