2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 93 ICAP (RFC 3507) Client */
12 #include "adaptation/icap/Config.h"
13 #include "adaptation/icap/Launcher.h"
14 #include "adaptation/icap/Xaction.h"
15 #include "base/TextException.h"
17 #include "comm/Connection.h"
18 #include "comm/ConnOpener.h"
19 #include "comm/Read.h"
20 #include "comm/Write.h"
21 #include "CommCalls.h"
22 #include "err_detail_type.h"
26 #include "HttpReply.h"
27 #include "HttpRequest.h"
31 #include "SquidConfig.h"
32 #include "SquidTime.h"
34 Adaptation::Icap::Xaction::Xaction(const char *aTypeName
, Adaptation::Icap::ServiceRep::Pointer
&aService
):
36 Adaptation::Initiate(aTypeName
),
45 reuseConnection(true),
48 ignoreLastWrite(false),
54 alep(new AccessLogEntry
),
58 debugs(93,3, typeName
<< " constructed, this=" << this <<
59 " [icapx" << id
<< ']'); // we should not call virtual status() here
60 icapRequest
= new HttpRequest
;
61 HTTPMSGLOCK(icapRequest
);
62 icap_tr_start
= current_time
;
63 memset(&icap_tio_start
, 0, sizeof(icap_tio_start
));
64 memset(&icap_tio_finish
, 0, sizeof(icap_tio_finish
));
67 Adaptation::Icap::Xaction::~Xaction()
69 debugs(93,3, typeName
<< " destructed, this=" << this <<
70 " [icapx" << id
<< ']'); // we should not call virtual status() here
71 HTTPMSGUNLOCK(icapRequest
);
74 Adaptation::Icap::ServiceRep
&
75 Adaptation::Icap::Xaction::service()
77 Must(theService
!= NULL
);
81 void Adaptation::Icap::Xaction::disableRetries()
83 debugs(93,5, typeName
<< (isRetriable
? " from now on" : " still") <<
84 " cannot be retried " << status());
88 void Adaptation::Icap::Xaction::disableRepeats(const char *reason
)
90 debugs(93,5, typeName
<< (isRepeatable
? " from now on" : " still") <<
91 " cannot be repeated because " << reason
<< status());
95 void Adaptation::Icap::Xaction::start()
97 Adaptation::Initiate::start();
99 readBuf
.init(SQUID_TCP_SO_RCVBUF
, SQUID_TCP_SO_RCVBUF
);
100 commBuf
= (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF
, &commBufSize
);
101 // make sure maximum readBuf space does not exceed commBuf size
102 Must(static_cast<size_t>(readBuf
.potentialSpaceSize()) <= commBufSize
);
106 icapLookupDnsResults(const ipcache_addrs
*ia
, const DnsLookupDetails
&, void *data
)
108 Adaptation::Icap::Xaction
*xa
= static_cast<Adaptation::Icap::Xaction
*>(data
);
109 xa
->dnsLookupDone(ia
);
112 // TODO: obey service-specific, OPTIONS-reported connection limit
114 Adaptation::Icap::Xaction::openConnection()
116 Must(!haveConnection());
118 Adaptation::Icap::ServiceRep
&s
= service();
120 if (!TheConfig
.reuse_connections
)
121 disableRetries(); // this will also safely drain pconn pool
123 bool wasReused
= false;
124 connection
= s
.getConnection(isRetriable
, wasReused
);
126 if (wasReused
&& Comm::IsConnOpen(connection
)) {
127 // Set comm Close handler
128 // fake the connect callback
129 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
130 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommConnectCbParams
> Dialer
;
131 CbcPointer
<Xaction
> self(this);
132 Dialer
dialer(self
, &Adaptation::Icap::Xaction::noteCommConnected
);
133 dialer
.params
.conn
= connection
;
134 dialer
.params
.flag
= Comm::OK
;
135 // fake other parameters by copying from the existing connection
136 connector
= asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer
);
137 ScheduleCallHere(connector
);
141 disableRetries(); // we only retry pconn failures
143 // Attempt to open a new connection...
144 debugs(93,3, typeName
<< " opens connection to " << s
.cfg().host
.termedBuf() << ":" << s
.cfg().port
);
146 // Locate the Service IP(s) to open
147 ipcache_nbgethostbyname(s
.cfg().host
.termedBuf(), icapLookupDnsResults
, this);
151 Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs
*ia
)
153 Adaptation::Icap::ServiceRep
&s
= service();
156 debugs(44, DBG_IMPORTANT
, "ICAP: Unknown service host: " << s
.cfg().host
);
158 #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
159 dieOnConnectionFailure(); // throws
160 #else // take a step back into protected Async call dialing.
161 // fake the connect callback
162 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommConnectCbParams
> Dialer
;
163 CbcPointer
<Xaction
> self(this);
164 Dialer
dialer(self
, &Adaptation::Icap::Xaction::noteCommConnected
);
165 dialer
.params
.conn
= connection
;
166 dialer
.params
.flag
= Comm::COMM_ERROR
;
167 // fake other parameters by copying from the existing connection
168 connector
= asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer
);
169 ScheduleCallHere(connector
);
174 assert(ia
->cur
< ia
->count
);
176 connection
= new Comm::Connection
;
177 connection
->remote
= ia
->in_addrs
[ia
->cur
];
178 connection
->remote
.port(s
.cfg().port
);
179 getOutgoingAddress(NULL
, connection
);
181 // TODO: service bypass status may differ from that of a transaction
182 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommConnectCbParams
> ConnectDialer
;
183 connector
= JobCallback(93,3, ConnectDialer
, this, Adaptation::Icap::Xaction::noteCommConnected
);
184 cs
= new Comm::ConnOpener(connection
, connector
, TheConfig
.connect_timeout(service().cfg().bypass
));
185 cs
->setHost(s
.cfg().host
.termedBuf());
190 * This event handler is necessary to work around the no-rentry policy
191 * of Adaptation::Icap::Xaction::callStart()
195 Adaptation::Icap::Xaction::reusedConnection(void *data
)
197 debugs(93, 5, HERE
<< "reused connection");
198 Adaptation::Icap::Xaction
*x
= (Adaptation::Icap::Xaction
*)data
;
199 x
->noteCommConnected(Comm::OK
);
203 void Adaptation::Icap::Xaction::closeConnection()
205 if (haveConnection()) {
207 if (closer
!= NULL
) {
208 comm_remove_close_handler(connection
->fd
, closer
);
212 cancelRead(); // may not work
214 if (reuseConnection
&& !doneWithIo()) {
215 //status() adds leading spaces.
216 debugs(93,5, HERE
<< "not reusing pconn due to pending I/O" << status());
217 reuseConnection
= false;
223 const bool reset
= !reuseConnection
&&
224 (al
.icap
.outcome
== xoGone
|| al
.icap
.outcome
== xoError
);
226 Adaptation::Icap::ServiceRep
&s
= service();
227 s
.putConnection(connection
, reuseConnection
, reset
, status());
236 // connection with the ICAP service established
237 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams
&io
)
241 if (io
.flag
== Comm::TIMEOUT
) {
242 handleCommTimedout();
246 Must(connector
!= NULL
);
249 if (io
.flag
!= Comm::OK
)
250 dieOnConnectionFailure(); // throws
252 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommTimeoutCbParams
> TimeoutDialer
;
253 AsyncCall::Pointer timeoutCall
= asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
254 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout
));
255 commSetConnTimeout(io
.conn
, TheConfig
.connect_timeout(service().cfg().bypass
), timeoutCall
);
257 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommCloseCbParams
> CloseDialer
;
258 closer
= asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
259 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed
));
260 comm_add_close_handler(io
.conn
->fd
, closer
);
262 // ?? fd_table[io.conn->fd].noteUse(icapPconnPool);
263 service().noteConnectionUse(connection
);
265 handleCommConnected();
268 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
270 debugs(93, 2, HERE
<< typeName
<<
271 " failed to connect to " << service().cfg().uri
);
272 service().noteConnectionFailed("failure");
273 detailError(ERR_DETAIL_ICAP_XACT_START
);
274 throw TexcHere("cannot connect to the ICAP service");
277 void Adaptation::Icap::Xaction::scheduleWrite(MemBuf
&buf
)
279 Must(haveConnection());
281 // comm module will free the buffer
282 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommIoCbParams
> Dialer
;
283 writer
= JobCallback(93, 3,
284 Dialer
, this, Adaptation::Icap::Xaction::noteCommWrote
);
286 Comm::Write(connection
, &buf
, writer
);
290 void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams
&io
)
292 Must(writer
!= NULL
);
295 if (ignoreLastWrite
) {
296 // a hack due to comm inability to cancel a pending write
297 ignoreLastWrite
= false;
298 debugs(93, 7, HERE
<< "ignoring last write; status: " << io
.flag
);
300 Must(io
.flag
== Comm::OK
);
301 al
.icap
.bytesSent
+= io
.size
;
303 handleCommWrote(io
.size
);
307 // communication timeout with the ICAP service
308 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams
&)
310 handleCommTimedout();
313 void Adaptation::Icap::Xaction::handleCommTimedout()
315 debugs(93, 2, HERE
<< typeName
<< " failed: timeout with " <<
316 theService
->cfg().methodStr() << " " <<
317 theService
->cfg().uri
<< status());
318 reuseConnection
= false;
319 const bool whileConnecting
= connector
!= NULL
;
320 if (whileConnecting
) {
321 assert(!haveConnection());
322 theService
->noteConnectionFailed("timedout");
324 closeConnection(); // so that late Comm callbacks do not disturb bypass
325 throw TexcHere(whileConnecting
?
326 "timed out while connecting to the ICAP service" :
327 "timed out while talking to the ICAP service");
330 // unexpected connection close while talking to the ICAP service
331 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams
&)
337 void Adaptation::Icap::Xaction::handleCommClosed()
339 detailError(ERR_DETAIL_ICAP_XACT_CLOSE
);
340 mustStop("ICAP service connection externally closed");
343 void Adaptation::Icap::Xaction::callException(const std::exception
&e
)
346 service().noteFailure();
347 Adaptation::Initiate::callException(e
);
350 void Adaptation::Icap::Xaction::callEnd()
353 debugs(93, 5, HERE
<< typeName
<< " done with I/O" << status());
356 Adaptation::Initiate::callEnd(); // may destroy us
359 bool Adaptation::Icap::Xaction::doneAll() const
361 return !connector
&& !reader
&& !writer
&& Adaptation::Initiate::doneAll();
364 void Adaptation::Icap::Xaction::updateTimeout()
366 Must(haveConnection());
368 if (reader
!= NULL
|| writer
!= NULL
) {
369 // restart the timeout before each I/O
370 // XXX: why does Config.Timeout lacks a write timeout?
371 // TODO: service bypass status may differ from that of a transaction
372 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommTimeoutCbParams
> TimeoutDialer
;
373 AsyncCall::Pointer call
= JobCallback(93, 5, TimeoutDialer
, this, Adaptation::Icap::Xaction::noteCommTimedout
);
374 commSetConnTimeout(connection
, TheConfig
.io_timeout(service().cfg().bypass
), call
);
376 // clear timeout when there is no I/O
377 // Do we need a lifetime timeout?
378 commUnsetConnTimeout(connection
);
382 void Adaptation::Icap::Xaction::scheduleRead()
384 Must(haveConnection());
386 Must(readBuf
.hasSpace());
389 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
390 * here instead of reading directly into readBuf.buf.
392 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommIoCbParams
> Dialer
;
393 reader
= JobCallback(93, 3,
394 Dialer
, this, Adaptation::Icap::Xaction::noteCommRead
);
396 comm_read(connection
, commBuf
, readBuf
.spaceSize(), reader
);
400 // comm module read a portion of the ICAP response for us
401 void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams
&io
)
403 Must(reader
!= NULL
);
406 Must(io
.flag
== Comm::OK
);
410 reuseConnection
= false;
412 // detect a pconn race condition: eof on the first pconn read
413 if (!al
.icap
.bytesRead
&& retriable()) {
415 mustStop("pconn race");
420 al
.icap
.bytesRead
+=io
.size
;
424 debugs(93, 3, HERE
<< "read " << io
.size
<< " bytes");
427 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
428 * here instead of reading directly into readBuf.buf.
431 readBuf
.append(commBuf
, io
.size
);
432 disableRetries(); // because pconn did not fail
435 handleCommRead(io
.size
);
438 void Adaptation::Icap::Xaction::cancelRead()
440 if (reader
!= NULL
) {
441 Must(haveConnection());
442 Comm::ReadCancel(connection
->fd
, reader
);
447 bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg
*msg
)
449 debugs(93, 5, HERE
<< "have " << readBuf
.contentSize() << " head bytes to parse");
451 Http::StatusCode error
= Http::scNone
;
452 const bool parsed
= msg
->parse(&readBuf
, commEof
, &error
);
453 Must(parsed
|| !error
); // success or need more data
455 if (!parsed
) { // need more data
461 readBuf
.consume(msg
->hdr_sz
);
465 bool Adaptation::Icap::Xaction::mayReadMore() const
467 return !doneReading() && // will read more data
468 readBuf
.hasSpace(); // have space for more data
471 bool Adaptation::Icap::Xaction::doneReading() const
476 bool Adaptation::Icap::Xaction::doneWriting() const
481 bool Adaptation::Icap::Xaction::doneWithIo() const
483 return haveConnection() &&
484 !connector
&& !reader
&& !writer
&& // fast checks, some redundant
485 doneReading() && doneWriting();
488 bool Adaptation::Icap::Xaction::haveConnection() const
490 return connection
!= NULL
&& connection
->isOpen();
494 void Adaptation::Icap::Xaction::noteInitiatorAborted()
497 if (theInitiator
.set()) {
498 debugs(93,4, HERE
<< "Initiator gone before ICAP transaction ended");
500 detailError(ERR_DETAIL_ICAP_INIT_GONE
);
502 mustStop("initiator aborted");
507 void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome
&xo
)
509 if (al
.icap
.outcome
!= xoUnknown
) {
510 debugs(93, 3, HERE
<< "Warning: reseting outcome: from " <<
511 al
.icap
.outcome
<< " to " << xo
);
513 debugs(93, 4, HERE
<< xo
);
515 al
.icap
.outcome
= xo
;
518 // This 'last chance' method is called before a 'done' transaction is deleted.
519 // It is wrong to call virtual methods from a destructor. Besides, this call
520 // indicates that the transaction will terminate as planned.
521 void Adaptation::Icap::Xaction::swanSong()
523 // kids should sing first and then call the parent method.
525 debugs(93,6, HERE
<< id
<< " about to notify ConnOpener!");
526 CallJobHere(93, 3, cs
, Comm::ConnOpener
, noteAbort
);
528 service().noteConnectionFailed("abort");
531 closeConnection(); // TODO: rename because we do not always close
533 if (!readBuf
.isNull())
537 memFreeBuf(commBufSize
, commBuf
);
543 Adaptation::Initiate::swanSong();
546 void Adaptation::Icap::Xaction::tellQueryAborted()
548 if (theInitiator
.set()) {
549 Adaptation::Icap::XactAbortInfo
abortInfo(icapRequest
, icapReply
.getRaw(),
550 retriable(), repeatable());
551 Launcher
*launcher
= dynamic_cast<Launcher
*>(theInitiator
.get());
552 // launcher may be nil if initiator is invalid
553 CallJobHere1(91,5, CbcPointer
<Launcher
>(launcher
),
554 Launcher
, noteXactAbort
, abortInfo
);
559 void Adaptation::Icap::Xaction::maybeLog()
561 if (IcapLogfileStatus
== LOG_ENABLE
) {
567 void Adaptation::Icap::Xaction::finalizeLogInfo()
570 al
.icp
.opcode
= ICP_INVALID
;
572 const Adaptation::Icap::ServiceRep
&s
= service();
573 al
.icap
.hostAddr
= s
.cfg().host
.termedBuf();
574 al
.icap
.serviceName
= s
.cfg().key
;
575 al
.icap
.reqUri
= s
.cfg().uri
;
577 tvSub(al
.icap
.ioTime
, icap_tio_start
, icap_tio_finish
);
578 tvSub(al
.icap
.trTime
, icap_tr_start
, current_time
);
580 al
.icap
.request
= icapRequest
;
581 HTTPMSGLOCK(al
.icap
.request
);
582 if (icapReply
!= NULL
) {
583 al
.icap
.reply
= icapReply
.getRaw();
584 HTTPMSGLOCK(al
.icap
.reply
);
585 al
.icap
.resStatus
= icapReply
->sline
.status();
589 // returns a temporary string depicting transaction status, for debugging
590 const char *Adaptation::Icap::Xaction::status() const
597 fillPendingStatus(buf
);
601 buf
.Printf(" %s%u]", id
.Prefix
, id
.value
);
605 return buf
.content();
608 void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf
&buf
) const
610 if (haveConnection()) {
611 buf
.Printf("FD %d", connection
->fd
);
623 void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf
&buf
) const
625 if (haveConnection() && commEof
)
626 buf
.Printf("Comm(%d)", connection
->fd
);
628 if (stopReason
!= NULL
)
629 buf
.Printf("Stopped");
632 bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf
&) const