]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/icap/Xaction.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / adaptation / icap / Xaction.cc
1 /*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 93 ICAP (RFC 3507) Client */
10
11 #include "squid.h"
12 #include "adaptation/icap/Config.h"
13 #include "adaptation/icap/Launcher.h"
14 #include "adaptation/icap/Xaction.h"
15 #include "base/TextException.h"
16 #include "comm.h"
17 #include "comm/Connection.h"
18 #include "comm/ConnOpener.h"
19 #include "comm/Read.h"
20 #include "comm/Write.h"
21 #include "CommCalls.h"
22 #include "err_detail_type.h"
23 #include "fde.h"
24 #include "FwdState.h"
25 #include "HttpMsg.h"
26 #include "HttpReply.h"
27 #include "HttpRequest.h"
28 #include "icap_log.h"
29 #include "ipcache.h"
30 #include "pconn.h"
31 #include "SquidConfig.h"
32 #include "SquidTime.h"
33
34 Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::ServiceRep::Pointer &aService):
35 AsyncJob(aTypeName),
36 Adaptation::Initiate(aTypeName),
37 icapRequest(NULL),
38 icapReply(NULL),
39 attempts(0),
40 connection(NULL),
41 theService(aService),
42 commBuf(NULL),
43 commBufSize(0),
44 commEof(false),
45 reuseConnection(true),
46 isRetriable(true),
47 isRepeatable(true),
48 ignoreLastWrite(false),
49 stopReason(NULL),
50 connector(NULL),
51 reader(NULL),
52 writer(NULL),
53 closer(NULL),
54 alep(new AccessLogEntry),
55 al(*alep),
56 cs(NULL)
57 {
58 debugs(93,3, typeName << " constructed, this=" << this <<
59 " [icapx" << id << ']'); // we should not call virtual status() here
60 icapRequest = new HttpRequest;
61 HTTPMSGLOCK(icapRequest);
62 icap_tr_start = current_time;
63 memset(&icap_tio_start, 0, sizeof(icap_tio_start));
64 memset(&icap_tio_finish, 0, sizeof(icap_tio_finish));
65 }
66
67 Adaptation::Icap::Xaction::~Xaction()
68 {
69 debugs(93,3, typeName << " destructed, this=" << this <<
70 " [icapx" << id << ']'); // we should not call virtual status() here
71 HTTPMSGUNLOCK(icapRequest);
72 }
73
74 Adaptation::Icap::ServiceRep &
75 Adaptation::Icap::Xaction::service()
76 {
77 Must(theService != NULL);
78 return *theService;
79 }
80
81 void Adaptation::Icap::Xaction::disableRetries()
82 {
83 debugs(93,5, typeName << (isRetriable ? " from now on" : " still") <<
84 " cannot be retried " << status());
85 isRetriable = false;
86 }
87
88 void Adaptation::Icap::Xaction::disableRepeats(const char *reason)
89 {
90 debugs(93,5, typeName << (isRepeatable ? " from now on" : " still") <<
91 " cannot be repeated because " << reason << status());
92 isRepeatable = false;
93 }
94
95 void Adaptation::Icap::Xaction::start()
96 {
97 Adaptation::Initiate::start();
98
99 readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF);
100 commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize);
101 // make sure maximum readBuf space does not exceed commBuf size
102 Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
103 }
104
105 static void
106 icapLookupDnsResults(const ipcache_addrs *ia, const DnsLookupDetails &, void *data)
107 {
108 Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
109 xa->dnsLookupDone(ia);
110 }
111
112 // TODO: obey service-specific, OPTIONS-reported connection limit
113 void
114 Adaptation::Icap::Xaction::openConnection()
115 {
116 Must(!haveConnection());
117
118 Adaptation::Icap::ServiceRep &s = service();
119
120 if (!TheConfig.reuse_connections)
121 disableRetries(); // this will also safely drain pconn pool
122
123 bool wasReused = false;
124 connection = s.getConnection(isRetriable, wasReused);
125
126 if (wasReused && Comm::IsConnOpen(connection)) {
127 // Set comm Close handler
128 // fake the connect callback
129 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
130 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
131 CbcPointer<Xaction> self(this);
132 Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
133 dialer.params.conn = connection;
134 dialer.params.flag = Comm::OK;
135 // fake other parameters by copying from the existing connection
136 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
137 ScheduleCallHere(connector);
138 return;
139 }
140
141 disableRetries(); // we only retry pconn failures
142
143 // Attempt to open a new connection...
144 debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port);
145
146 // Locate the Service IP(s) to open
147 ipcache_nbgethostbyname(s.cfg().host.termedBuf(), icapLookupDnsResults, this);
148 }
149
150 void
151 Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia)
152 {
153 Adaptation::Icap::ServiceRep &s = service();
154
155 if (ia == NULL) {
156 debugs(44, DBG_IMPORTANT, "ICAP: Unknown service host: " << s.cfg().host);
157
158 #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
159 dieOnConnectionFailure(); // throws
160 #else // take a step back into protected Async call dialing.
161 // fake the connect callback
162 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
163 CbcPointer<Xaction> self(this);
164 Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
165 dialer.params.conn = connection;
166 dialer.params.flag = Comm::COMM_ERROR;
167 // fake other parameters by copying from the existing connection
168 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
169 ScheduleCallHere(connector);
170 #endif
171 return;
172 }
173
174 assert(ia->cur < ia->count);
175
176 connection = new Comm::Connection;
177 connection->remote = ia->in_addrs[ia->cur];
178 connection->remote.port(s.cfg().port);
179 getOutgoingAddress(NULL, connection);
180
181 // TODO: service bypass status may differ from that of a transaction
182 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
183 connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
184 cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
185 cs->setHost(s.cfg().host.termedBuf());
186 AsyncJob::Start(cs);
187 }
188
189 /*
190 * This event handler is necessary to work around the no-rentry policy
191 * of Adaptation::Icap::Xaction::callStart()
192 */
193 #if 0
194 void
195 Adaptation::Icap::Xaction::reusedConnection(void *data)
196 {
197 debugs(93, 5, HERE << "reused connection");
198 Adaptation::Icap::Xaction *x = (Adaptation::Icap::Xaction*)data;
199 x->noteCommConnected(Comm::OK);
200 }
201 #endif
202
203 void Adaptation::Icap::Xaction::closeConnection()
204 {
205 if (haveConnection()) {
206
207 if (closer != NULL) {
208 comm_remove_close_handler(connection->fd, closer);
209 closer = NULL;
210 }
211
212 cancelRead(); // may not work
213
214 if (reuseConnection && !doneWithIo()) {
215 //status() adds leading spaces.
216 debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status());
217 reuseConnection = false;
218 }
219
220 if (reuseConnection)
221 disableRetries();
222
223 const bool reset = !reuseConnection &&
224 (al.icap.outcome == xoGone || al.icap.outcome == xoError);
225
226 Adaptation::Icap::ServiceRep &s = service();
227 s.putConnection(connection, reuseConnection, reset, status());
228
229 writer = NULL;
230 reader = NULL;
231 connector = NULL;
232 connection = NULL;
233 }
234 }
235
236 // connection with the ICAP service established
237 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
238 {
239 cs = NULL;
240
241 if (io.flag == Comm::TIMEOUT) {
242 handleCommTimedout();
243 return;
244 }
245
246 Must(connector != NULL);
247 connector = NULL;
248
249 if (io.flag != Comm::OK)
250 dieOnConnectionFailure(); // throws
251
252 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
253 AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
254 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
255 commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
256
257 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
258 closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
259 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
260 comm_add_close_handler(io.conn->fd, closer);
261
262 // ?? fd_table[io.conn->fd].noteUse(icapPconnPool);
263 service().noteConnectionUse(connection);
264
265 handleCommConnected();
266 }
267
268 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
269 {
270 debugs(93, 2, HERE << typeName <<
271 " failed to connect to " << service().cfg().uri);
272 service().noteConnectionFailed("failure");
273 detailError(ERR_DETAIL_ICAP_XACT_START);
274 throw TexcHere("cannot connect to the ICAP service");
275 }
276
277 void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
278 {
279 Must(haveConnection());
280
281 // comm module will free the buffer
282 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
283 writer = JobCallback(93, 3,
284 Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
285
286 Comm::Write(connection, &buf, writer);
287 updateTimeout();
288 }
289
290 void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
291 {
292 Must(writer != NULL);
293 writer = NULL;
294
295 if (ignoreLastWrite) {
296 // a hack due to comm inability to cancel a pending write
297 ignoreLastWrite = false;
298 debugs(93, 7, HERE << "ignoring last write; status: " << io.flag);
299 } else {
300 Must(io.flag == Comm::OK);
301 al.icap.bytesSent += io.size;
302 updateTimeout();
303 handleCommWrote(io.size);
304 }
305 }
306
307 // communication timeout with the ICAP service
308 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &)
309 {
310 handleCommTimedout();
311 }
312
313 void Adaptation::Icap::Xaction::handleCommTimedout()
314 {
315 debugs(93, 2, HERE << typeName << " failed: timeout with " <<
316 theService->cfg().methodStr() << " " <<
317 theService->cfg().uri << status());
318 reuseConnection = false;
319 const bool whileConnecting = connector != NULL;
320 if (whileConnecting) {
321 assert(!haveConnection());
322 theService->noteConnectionFailed("timedout");
323 } else
324 closeConnection(); // so that late Comm callbacks do not disturb bypass
325 throw TexcHere(whileConnecting ?
326 "timed out while connecting to the ICAP service" :
327 "timed out while talking to the ICAP service");
328 }
329
330 // unexpected connection close while talking to the ICAP service
331 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &)
332 {
333 closer = NULL;
334 handleCommClosed();
335 }
336
337 void Adaptation::Icap::Xaction::handleCommClosed()
338 {
339 detailError(ERR_DETAIL_ICAP_XACT_CLOSE);
340 mustStop("ICAP service connection externally closed");
341 }
342
343 void Adaptation::Icap::Xaction::callException(const std::exception &e)
344 {
345 setOutcome(xoError);
346 service().noteFailure();
347 Adaptation::Initiate::callException(e);
348 }
349
350 void Adaptation::Icap::Xaction::callEnd()
351 {
352 if (doneWithIo()) {
353 debugs(93, 5, HERE << typeName << " done with I/O" << status());
354 closeConnection();
355 }
356 Adaptation::Initiate::callEnd(); // may destroy us
357 }
358
359 bool Adaptation::Icap::Xaction::doneAll() const
360 {
361 return !connector && !reader && !writer && Adaptation::Initiate::doneAll();
362 }
363
364 void Adaptation::Icap::Xaction::updateTimeout()
365 {
366 Must(haveConnection());
367
368 if (reader != NULL || writer != NULL) {
369 // restart the timeout before each I/O
370 // XXX: why does Config.Timeout lacks a write timeout?
371 // TODO: service bypass status may differ from that of a transaction
372 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
373 AsyncCall::Pointer call = JobCallback(93, 5, TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
374 commSetConnTimeout(connection, TheConfig.io_timeout(service().cfg().bypass), call);
375 } else {
376 // clear timeout when there is no I/O
377 // Do we need a lifetime timeout?
378 commUnsetConnTimeout(connection);
379 }
380 }
381
382 void Adaptation::Icap::Xaction::scheduleRead()
383 {
384 Must(haveConnection());
385 Must(!reader);
386 Must(readBuf.hasSpace());
387
388 /*
389 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
390 * here instead of reading directly into readBuf.buf.
391 */
392 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
393 reader = JobCallback(93, 3,
394 Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
395
396 comm_read(connection, commBuf, readBuf.spaceSize(), reader);
397 updateTimeout();
398 }
399
400 // comm module read a portion of the ICAP response for us
401 void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams &io)
402 {
403 Must(reader != NULL);
404 reader = NULL;
405
406 Must(io.flag == Comm::OK);
407
408 if (!io.size) {
409 commEof = true;
410 reuseConnection = false;
411
412 // detect a pconn race condition: eof on the first pconn read
413 if (!al.icap.bytesRead && retriable()) {
414 setOutcome(xoRace);
415 mustStop("pconn race");
416 return;
417 }
418 } else {
419
420 al.icap.bytesRead+=io.size;
421
422 updateTimeout();
423
424 debugs(93, 3, HERE << "read " << io.size << " bytes");
425
426 /*
427 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
428 * here instead of reading directly into readBuf.buf.
429 */
430
431 readBuf.append(commBuf, io.size);
432 disableRetries(); // because pconn did not fail
433 }
434
435 handleCommRead(io.size);
436 }
437
438 void Adaptation::Icap::Xaction::cancelRead()
439 {
440 if (reader != NULL) {
441 Must(haveConnection());
442 Comm::ReadCancel(connection->fd, reader);
443 reader = NULL;
444 }
445 }
446
447 bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg *msg)
448 {
449 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse");
450
451 Http::StatusCode error = Http::scNone;
452 const bool parsed = msg->parse(&readBuf, commEof, &error);
453 Must(parsed || !error); // success or need more data
454
455 if (!parsed) { // need more data
456 Must(mayReadMore());
457 msg->reset();
458 return false;
459 }
460
461 readBuf.consume(msg->hdr_sz);
462 return true;
463 }
464
465 bool Adaptation::Icap::Xaction::mayReadMore() const
466 {
467 return !doneReading() && // will read more data
468 readBuf.hasSpace(); // have space for more data
469 }
470
471 bool Adaptation::Icap::Xaction::doneReading() const
472 {
473 return commEof;
474 }
475
476 bool Adaptation::Icap::Xaction::doneWriting() const
477 {
478 return !writer;
479 }
480
481 bool Adaptation::Icap::Xaction::doneWithIo() const
482 {
483 return haveConnection() &&
484 !connector && !reader && !writer && // fast checks, some redundant
485 doneReading() && doneWriting();
486 }
487
488 bool Adaptation::Icap::Xaction::haveConnection() const
489 {
490 return connection != NULL && connection->isOpen();
491 }
492
493 // initiator aborted
494 void Adaptation::Icap::Xaction::noteInitiatorAborted()
495 {
496
497 if (theInitiator.set()) {
498 debugs(93,4, HERE << "Initiator gone before ICAP transaction ended");
499 clearInitiator();
500 detailError(ERR_DETAIL_ICAP_INIT_GONE);
501 setOutcome(xoGone);
502 mustStop("initiator aborted");
503 }
504
505 }
506
507 void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &xo)
508 {
509 if (al.icap.outcome != xoUnknown) {
510 debugs(93, 3, HERE << "Warning: reseting outcome: from " <<
511 al.icap.outcome << " to " << xo);
512 } else {
513 debugs(93, 4, HERE << xo);
514 }
515 al.icap.outcome = xo;
516 }
517
518 // This 'last chance' method is called before a 'done' transaction is deleted.
519 // It is wrong to call virtual methods from a destructor. Besides, this call
520 // indicates that the transaction will terminate as planned.
521 void Adaptation::Icap::Xaction::swanSong()
522 {
523 // kids should sing first and then call the parent method.
524 if (cs) {
525 debugs(93,6, HERE << id << " about to notify ConnOpener!");
526 CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort);
527 cs = NULL;
528 service().noteConnectionFailed("abort");
529 }
530
531 closeConnection(); // TODO: rename because we do not always close
532
533 if (!readBuf.isNull())
534 readBuf.clean();
535
536 if (commBuf)
537 memFreeBuf(commBufSize, commBuf);
538
539 tellQueryAborted();
540
541 maybeLog();
542
543 Adaptation::Initiate::swanSong();
544 }
545
546 void Adaptation::Icap::Xaction::tellQueryAborted()
547 {
548 if (theInitiator.set()) {
549 Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply.getRaw(),
550 retriable(), repeatable());
551 Launcher *launcher = dynamic_cast<Launcher*>(theInitiator.get());
552 // launcher may be nil if initiator is invalid
553 CallJobHere1(91,5, CbcPointer<Launcher>(launcher),
554 Launcher, noteXactAbort, abortInfo);
555 clearInitiator();
556 }
557 }
558
559 void Adaptation::Icap::Xaction::maybeLog()
560 {
561 if (IcapLogfileStatus == LOG_ENABLE) {
562 finalizeLogInfo();
563 icapLogLog(alep);
564 }
565 }
566
567 void Adaptation::Icap::Xaction::finalizeLogInfo()
568 {
569 //prepare log data
570 al.icp.opcode = ICP_INVALID;
571
572 const Adaptation::Icap::ServiceRep &s = service();
573 al.icap.hostAddr = s.cfg().host.termedBuf();
574 al.icap.serviceName = s.cfg().key;
575 al.icap.reqUri = s.cfg().uri;
576
577 tvSub(al.icap.ioTime, icap_tio_start, icap_tio_finish);
578 tvSub(al.icap.trTime, icap_tr_start, current_time);
579
580 al.icap.request = icapRequest;
581 HTTPMSGLOCK(al.icap.request);
582 if (icapReply != NULL) {
583 al.icap.reply = icapReply.getRaw();
584 HTTPMSGLOCK(al.icap.reply);
585 al.icap.resStatus = icapReply->sline.status();
586 }
587 }
588
589 // returns a temporary string depicting transaction status, for debugging
590 const char *Adaptation::Icap::Xaction::status() const
591 {
592 static MemBuf buf;
593 buf.reset();
594
595 buf.append(" [", 2);
596
597 fillPendingStatus(buf);
598 buf.append("/", 1);
599 fillDoneStatus(buf);
600
601 buf.Printf(" %s%u]", id.Prefix, id.value);
602
603 buf.terminate();
604
605 return buf.content();
606 }
607
608 void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
609 {
610 if (haveConnection()) {
611 buf.Printf("FD %d", connection->fd);
612
613 if (writer != NULL)
614 buf.append("w", 1);
615
616 if (reader != NULL)
617 buf.append("r", 1);
618
619 buf.append(";", 1);
620 }
621 }
622
623 void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf &buf) const
624 {
625 if (haveConnection() && commEof)
626 buf.Printf("Comm(%d)", connection->fd);
627
628 if (stopReason != NULL)
629 buf.Printf("Stopped");
630 }
631
632 bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf &) const
633 {
634 return false;
635 }
636