2 * DEBUG: section 93 ICAP (RFC 3507) Client
6 #include "acl/FilledChecklist.h"
7 #include "adaptation/icap/Config.h"
8 #include "adaptation/icap/Launcher.h"
9 #include "adaptation/icap/Xaction.h"
10 #include "base/TextException.h"
12 #include "comm/Connection.h"
13 #include "comm/ConnOpener.h"
14 #include "comm/Write.h"
15 #include "CommCalls.h"
16 #include "err_detail_type.h"
20 #include "HttpReply.h"
21 #include "HttpRequest.h"
26 #include "SquidTime.h"
28 //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
// Constructs an ICAP transaction named aTypeName for the service aService.
// The visible initializers start with no comm buffer, connection reuse
// enabled, no "ignore last write" hack pending, no outstanding
// connector/reader/writer/closer calls, and a freshly allocated AccessLogEntry.
// NOTE(review): some member initializers are not visible in this copy of the
// file; confirm the full initializer list against the class declaration.
30 Adaptation::Icap::Xaction::Xaction(const char *aTypeName
, Adaptation::Icap::ServiceRep::Pointer
&aService
):
32 Adaptation::Initiate(aTypeName
),
38 commBuf(NULL
), commBufSize(0),
40 reuseConnection(true),
43 ignoreLastWrite(false),
44 connector(NULL
), reader(NULL
), writer(NULL
), closer(NULL
),
45 alep(new AccessLogEntry
),
48 debugs(93,3, typeName
<< " constructed, this=" << this <<
49 " [icapx" << id
<< ']'); // we should not call virtual status() here
// allocate the ICAP request object; it is released via HTTPMSGUNLOCK in the destructor
50 icapRequest
= HTTPMSGLOCK(new HttpRequest
);
// remember when the transaction started; finalizeLogInfo() uses it to compute trTime
51 icap_tr_start
= current_time
;
// Logs the destruction and releases the ICAP request that the constructor
// obtained with HTTPMSGLOCK.
54 Adaptation::Icap::Xaction::~Xaction()
56 debugs(93,3, typeName
<< " destructed, this=" << this <<
57 " [icapx" << id
<< ']'); // we should not call virtual status() here
58 HTTPMSGUNLOCK(icapRequest
);
// Accessor for the ICAP service this transaction talks to; declared to return
// a ServiceRep reference. Must() fails if theService is not set.
// NOTE(review): the return statement is not visible in this copy; presumably
// it dereferences theService -- confirm.
61 Adaptation::Icap::ServiceRep
&
62 Adaptation::Icap::Xaction::service()
64 Must(theService
!= NULL
);
// Logs that this transaction cannot be retried. The message says
// " from now on" when isRetriable is currently set and " still" otherwise.
// NOTE(review): the statement that clears isRetriable is not visible in this
// copy; confirm it follows the debugs() call.
68 void Adaptation::Icap::Xaction::disableRetries()
70 debugs(93,5, typeName
<< (isRetriable
? " from now on" : " still") <<
71 " cannot be retried " << status());
// Logs that this transaction cannot be repeated, including the caller-supplied
// reason. The message says " from now on" when isRepeatable is currently set
// and " still" otherwise.
// NOTE(review): the statement that clears isRepeatable is not visible in this
// copy; confirm it follows the debugs() call.
75 void Adaptation::Icap::Xaction::disableRepeats(const char *reason
)
77 debugs(93,5, typeName
<< (isRepeatable
? " from now on" : " still") <<
78 " cannot be repeated because " << reason
<< status());
// Job start hook: runs the parent start(), then sets up the two read buffers.
// readBuf is initialized with SQUID_TCP_SO_RCVBUF for both init() arguments and
// commBuf is allocated with memAllocBuf(), which reports the size it actually
// allocated through commBufSize.
82 void Adaptation::Icap::Xaction::start()
84 Adaptation::Initiate::start();
86 readBuf
.init(SQUID_TCP_SO_RCVBUF
, SQUID_TCP_SO_RCVBUF
);
87 commBuf
= (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF
, &commBufSize
);
88 // make sure maximum readBuf space does not exceed commBuf size
// (scheduleRead() reads up to readBuf.spaceSize() bytes into commBuf)
89 Must(static_cast<size_t>(readBuf
.potentialSpaceSize()) <= commBufSize
);
// DNS lookup callback handed to ipcache_nbgethostbyname() by openConnection().
// data is the Xaction pointer that openConnection() passed as callback data;
// the lookup result ia is forwarded to Xaction::dnsLookupDone(). The
// DnsLookupDetails argument is unused.
93 icapLookupDnsResults(const ipcache_addrs
*ia
, const DnsLookupDetails
&, void *data
)
95 Adaptation::Icap::Xaction
*xa
= static_cast<Adaptation::Icap::Xaction
*>(data
);
96 xa
->dnsLookupDone(ia
);
99 // TODO: obey service-specific, OPTIONS-reported connection limit
// Obtains a connection to the ICAP service. Requires that no connection is
// currently held. Asks the service for a connection; when an open, reused
// connection comes back, schedules noteCommConnected() with a COMM_OK flag.
// The remaining statements disable retries and start a non-blocking DNS lookup
// of the service host, whose result is delivered to icapLookupDnsResults().
// NOTE(review): the statement that ends the reused-connection path (presumably
// a return) is not visible in this copy -- confirm the two paths are exclusive.
101 Adaptation::Icap::Xaction::openConnection()
103 Must(!haveConnection());
105 Adaptation::Icap::ServiceRep
&s
= service();
107 if (!TheConfig
.reuse_connections
)
108 disableRetries(); // this will also safely drain pconn pool
// wasReused is an out-parameter filled in by getConnection()
110 bool wasReused
= false;
111 connection
= s
.getConnection(isRetriable
, wasReused
);
113 if (wasReused
&& Comm::IsConnOpen(connection
)) {
114 // Set comm Close handler
115 // fake the connect callback
116 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
117 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommConnectCbParams
> Dialer
;
118 CbcPointer
<Xaction
> self(this);
119 Dialer
dialer(self
, &Adaptation::Icap::Xaction::noteCommConnected
);
120 dialer
.params
.conn
= connection
;
121 dialer
.params
.flag
= COMM_OK
;
122 // fake other parameters by copying from the existing connection
123 connector
= asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer
);
124 ScheduleCallHere(connector
);
128 disableRetries(); // we only retry pconn failures
130 // Attempt to open a new connection...
131 debugs(93,3, typeName
<< " opens connection to " << s
.cfg().host
.termedBuf() << ":" << s
.cfg().port
);
133 // Locate the Service IP(s) to open
// `this` is the callback data that icapLookupDnsResults() casts back to Xaction
134 ipcache_nbgethostbyname(s
.cfg().host
.termedBuf(), icapLookupDnsResults
, this);
// Continues connection setup once the DNS lookup for the service host is done.
// Failure path (visible part): logs "Unknown service host" and, unless
// WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS is set, schedules
// noteCommConnected() with a COMM_ERROR flag instead of throwing directly.
// Success path: builds a Comm::Connection to ia->in_addrs[ia->cur] using the
// configured service port, picks an outgoing address, and creates a
// Comm::ConnOpener that reports to noteCommConnected() with the configured
// connect timeout.
// NOTE(review): the condition guarding the failure path (presumably a nil ia),
// the matching #endif/return, and the statement that starts the ConnOpener are
// not visible in this copy -- confirm.
138 Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs
*ia
)
140 Adaptation::Icap::ServiceRep
&s
= service();
143 debugs(44, DBG_IMPORTANT
, "ICAP: Unknown service host: " << s
.cfg().host
);
145 #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
146 dieOnConnectionFailure(); // throws
147 #else // take a step back into protected Async call dialing.
148 // fake the connect callback
149 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommConnectCbParams
> Dialer
;
150 CbcPointer
<Xaction
> self(this);
151 Dialer
dialer(self
, &Adaptation::Icap::Xaction::noteCommConnected
);
152 dialer
.params
.conn
= connection
;
153 dialer
.params
.flag
= COMM_ERROR
;
154 // fake other parameters by copying from the existing connection
155 connector
= asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer
);
156 ScheduleCallHere(connector
);
// the current address index must be within the returned address list
161 assert(ia
->cur
< ia
->count
);
163 connection
= new Comm::Connection
;
164 connection
->remote
= ia
->in_addrs
[ia
->cur
];
165 connection
->remote
.SetPort(s
.cfg().port
);
166 getOutgoingAddress(NULL
, connection
);
168 // TODO: service bypass status may differ from that of a transaction
169 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommConnectCbParams
> ConnectDialer
;
170 connector
= JobCallback(93,3, ConnectDialer
, this, Adaptation::Icap::Xaction::noteCommConnected
);
171 Comm::ConnOpener
*cs
= new Comm::ConnOpener(connection
, connector
, TheConfig
.connect_timeout(service().cfg().bypass
));
172 cs
->setHost(s
.cfg().host
.termedBuf());
177 * This event handler is necessary to work around the no-reentry policy
178 * of Adaptation::Icap::Xaction::callStart()
// Event-style handler taking an untyped pointer: casts data to an Xaction and
// calls noteCommConnected() on it.
// NOTE(review): this passes COMM_OK, while the noteCommConnected() definition
// in this file takes a CommConnectCbParams reference. Presumably this function
// is compiled out by a preprocessor guard not visible in this copy -- confirm.
182 Adaptation::Icap::Xaction::reusedConnection(void *data
)
184 debugs(93, 5, HERE
<< "reused connection");
185 Adaptation::Icap::Xaction
*x
= (Adaptation::Icap::Xaction
*)data
;
186 x
->noteCommConnected(COMM_OK
);
// Gives the connection back to the service, if one is held. Removes our close
// handler, tries to cancel a pending read, refuses reuse when I/O is still
// pending, and then calls ServiceRep::putConnection() with the reuse decision
// and a reset flag.
// NOTE(review): the cleanup after putConnection() (clearing closer/connection
// and closing braces) is not visible in this copy.
190 void Adaptation::Icap::Xaction::closeConnection()
192 if (haveConnection()) {
194 if (closer
!= NULL
) {
195 comm_remove_close_handler(connection
->fd
, closer
);
199 cancelRead(); // may not work
201 if (reuseConnection
&& !doneWithIo()) {
202 //status() adds leading spaces.
203 debugs(93,5, HERE
<< "not reusing pconn due to pending I/O" << status());
204 reuseConnection
= false;
// reset is requested only for connections that will not be reused and whose
// logged outcome is xoGone or xoError
210 const bool reset
= !reuseConnection
&&
211 (al
.icap
.outcome
== xoGone
|| al
.icap
.outcome
== xoError
);
213 Adaptation::Icap::ServiceRep
&s
= service();
214 s
.putConnection(connection
, reuseConnection
, reset
, status());
223 // connection with the ICAP service established
// Connect callback. A COMM_TIMEOUT flag is routed to handleCommTimedout(); any
// other flag except COMM_OK ends in dieOnConnectionFailure(), which throws.
// On success it installs a timeout callback (noteCommTimedout) using the
// configured connect timeout, registers noteCommClosed as the close handler for
// the connection descriptor, tells the service the connection is in use, and
// calls handleCommConnected().
// NOTE(review): the statement clearing connector after the Must() check is not
// visible in this copy -- confirm.
224 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams
&io
)
226 if (io
.flag
== COMM_TIMEOUT
) {
227 handleCommTimedout();
// this callback is only expected while a connect call is outstanding
231 Must(connector
!= NULL
);
234 if (io
.flag
!= COMM_OK
)
235 dieOnConnectionFailure(); // throws
237 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommTimeoutCbParams
> TimeoutDialer
;
238 AsyncCall::Pointer timeoutCall
= asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
239 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout
));
240 commSetConnTimeout(io
.conn
, TheConfig
.connect_timeout(service().cfg().bypass
), timeoutCall
);
242 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommCloseCbParams
> CloseDialer
;
// keep the close call in closer so closeConnection() can remove the handler later
243 closer
= asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
244 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed
));
245 comm_add_close_handler(io
.conn
->fd
, closer
);
247 // ?? fd_table[io.conn->fd].noteUse(icapPconnPool);
248 service().noteConnectionUse(connection
);
250 handleCommConnected();
// Common failure exit for connection attempts: logs the service URI, reports
// the failure to the service, records ERR_DETAIL_ICAP_XACT_START as the error
// detail, and always throws a TexcHere exception.
253 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
255 debugs(93, 2, HERE
<< typeName
<<
256 " failed to connect to " << service().cfg().uri
);
257 service().noteConnectionFailed("failure");
258 detailError(ERR_DETAIL_ICAP_XACT_START
);
259 throw TexcHere("cannot connect to the ICAP service");
// Starts an asynchronous write of buf on the current connection. Requires an
// open connection. Completion is reported to noteCommWrote() through the call
// stored in writer.
// NOTE(review): any follow-up after Comm::Write() (e.g. a timeout update) is
// not visible in this copy.
262 void Adaptation::Icap::Xaction::scheduleWrite(MemBuf
&buf
)
264 Must(haveConnection());
266 // comm module will free the buffer
267 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommIoCbParams
> Dialer
;
268 writer
= JobCallback(93, 3,
269 Dialer
, this, Adaptation::Icap::Xaction::noteCommWrote
);
271 Comm::Write(connection
, &buf
, writer
);
// Write-completion callback. When ignoreLastWrite is set, the result is only
// logged and the flag is cleared. The other visible statements require
// COMM_OK, add io.size to the logged bytesSent counter, and pass the size to
// handleCommWrote().
// NOTE(review): the statement clearing writer and the else/closing braces that
// separate the two paths are not visible in this copy -- confirm.
275 void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams
&io
)
277 Must(writer
!= NULL
);
280 if (ignoreLastWrite
) {
281 // a hack due to comm inability to cancel a pending write
282 ignoreLastWrite
= false;
283 debugs(93, 7, HERE
<< "ignoring last write; status: " << io
.flag
);
285 Must(io
.flag
== COMM_OK
);
286 al
.icap
.bytesSent
+= io
.size
;
288 handleCommWrote(io
.size
);
292 // communication timeout with the ICAP service
// Timeout callback installed by noteCommConnected() and updateTimeout().
// Ignores the callback parameters and delegates to handleCommTimedout().
293 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams
&io
)
295 handleCommTimedout();
// Handles a timeout with the ICAP service: logs it, forbids connection reuse,
// and always throws. A timeout counts as "while connecting" when a connector
// call is still pending; in that case the failure is reported to the service
// and the exception text says so.
// NOTE(review): the else/closing brace around closeConnection() is not visible
// in this copy; presumably closeConnection() runs only when not connecting --
// confirm.
298 void Adaptation::Icap::Xaction::handleCommTimedout()
300 debugs(93, 2, HERE
<< typeName
<< " failed: timeout with " <<
301 theService
->cfg().methodStr() << " " <<
302 theService
->cfg().uri
<< status());
303 reuseConnection
= false;
304 const bool whileConnecting
= connector
!= NULL
;
305 if (whileConnecting
) {
// while the connect call is still pending we must not hold an open connection
306 assert(!haveConnection());
307 theService
->noteConnectionFailed("timedout");
309 closeConnection(); // so that late Comm callbacks do not disturb bypass
310 throw TexcHere(whileConnecting
?
311 "timed out while connecting to the ICAP service" :
312 "timed out while talking to the ICAP service");
315 // unexpected connection close while talking to the ICAP service
// Close-handler callback: noteCommConnected() registers it on the connection
// descriptor via comm_add_close_handler().
// NOTE(review): the body is not visible in this copy; presumably it forwards
// to handleCommClosed() -- confirm.
316 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams
&io
)
// Reacts to the service connection being closed from outside: records
// ERR_DETAIL_ICAP_XACT_CLOSE as the error detail and asks the job to stop.
322 void Adaptation::Icap::Xaction::handleCommClosed()
324 detailError(ERR_DETAIL_ICAP_XACT_CLOSE
);
325 mustStop("ICAP service connection externally closed");
// Exception hook for this job: reports a failure to the service, then lets the
// parent class handle the exception e.
// NOTE(review): any statement before service().noteFailure() (e.g. outcome
// bookkeeping) is not visible in this copy.
328 void Adaptation::Icap::Xaction::callException(const std::exception
&e
)
331 service().noteFailure();
332 Adaptation::Initiate::callException(e
);
// End-of-call hook: logs that the transaction is done with I/O and then runs
// the parent callEnd(), which may destroy this object.
// NOTE(review): the condition guarding the log message and any statement
// between it and the parent call are not visible in this copy -- confirm.
335 void Adaptation::Icap::Xaction::callEnd()
338 debugs(93, 5, HERE
<< typeName
<< " done with I/O" << status());
341 Adaptation::Initiate::callEnd(); // may destroy us
// True only when no connect, read, or write call is pending and the parent
// class also reports that it is done.
344 bool Adaptation::Icap::Xaction::doneAll() const
346 return !connector
&& !reader
&& !writer
&& Adaptation::Initiate::doneAll();
// Keeps the connection timeout in step with pending I/O. Requires an open
// connection. While a read or write is pending, (re)arms the timeout with the
// configured I/O timeout and noteCommTimedout() as the callback; the final
// statement clears the timeout.
// NOTE(review): the else separating the two branches is not visible in this
// copy; presumably the timeout is cleared only when no I/O is pending --
// confirm.
349 void Adaptation::Icap::Xaction::updateTimeout()
351 Must(haveConnection());
353 if (reader
!= NULL
|| writer
!= NULL
) {
354 // restart the timeout before each I/O
355 // XXX: why does Config.Timeout lack a write timeout?
356 // TODO: service bypass status may differ from that of a transaction
357 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommTimeoutCbParams
> TimeoutDialer
;
358 AsyncCall::Pointer call
= JobCallback(93, 5, TimeoutDialer
, this, Adaptation::Icap::Xaction::noteCommTimedout
);
359 commSetConnTimeout(connection
, TheConfig
.io_timeout(service().cfg().bypass
), call
);
361 // clear timeout when there is no I/O
362 // Do we need a lifetime timeout?
363 commUnsetConnTimeout(connection
);
// Starts an asynchronous read from the service. Requires an open connection
// and free space in readBuf. Data is read into commBuf, limited to the space
// currently free in readBuf, and completion is reported to noteCommRead()
// through the call stored in reader.
// NOTE(review): any follow-up after comm_read() (e.g. a timeout update) is not
// visible in this copy.
367 void Adaptation::Icap::Xaction::scheduleRead()
369 Must(haveConnection());
371 Must(readBuf
.hasSpace());
374 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
375 * here instead of reading directly into readBuf.buf.
377 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommIoCbParams
> Dialer
;
378 reader
= JobCallback(93, 3,
379 Dialer
, this, Adaptation::Icap::Xaction::noteCommRead
);
381 comm_read(connection
, commBuf
, readBuf
.spaceSize(), reader
);
385 // comm module read a portion of the ICAP response for us
// Read-completion callback. Requires a pending reader and a COMM_OK result.
// Visible steps: forbid connection reuse, stop with "pconn race" when nothing
// has been read yet and the transaction is still retriable, add io.size to the
// logged bytesRead counter, copy the bytes from commBuf into readBuf, disable
// retries, and hand the size to handleCommRead().
// NOTE(review): the condition that guards the reuseConnection/pconn-race
// statements (presumably a zero-size read, i.e. EOF), the clearing of reader,
// and any return after mustStop() are not visible in this copy -- confirm.
386 void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams
&io
)
388 Must(reader
!= NULL
);
391 Must(io
.flag
== COMM_OK
);
395 reuseConnection
= false;
397 // detect a pconn race condition: eof on the first pconn read
398 if (!al
.icap
.bytesRead
&& retriable()) {
400 mustStop("pconn race");
405 al
.icap
.bytesRead
+=io
.size
;
409 debugs(93, 3, HERE
<< "read " << io
.size
<< " bytes");
412 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
413 * here instead of reading directly into readBuf.buf.
416 readBuf
.append(commBuf
, io
.size
);
417 disableRetries(); // because pconn did not fail
420 handleCommRead(io
.size
);
// Cancels a pending read, if any, on the connection descriptor. Does nothing
// when no reader call is outstanding.
// NOTE(review): the statement clearing reader after the cancel is not visible
// in this copy -- confirm.
423 void Adaptation::Icap::Xaction::cancelRead()
425 if (reader
!= NULL
) {
426 Must(haveConnection());
427 comm_read_cancel(connection
->fd
, reader
);
// Tries to parse a message header from the bytes accumulated in readBuf into
// msg. A parse that fails with an error status violates the Must() check; a
// parse that fails without one means more data is needed. On success the
// parsed header bytes (msg->hdr_sz) are consumed from readBuf.
// NOTE(review): the return statements are not visible in this copy;
// presumably false means "need more data" and true means "parsed" -- confirm.
432 bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg
*msg
)
434 debugs(93, 5, HERE
<< "have " << readBuf
.contentSize() << " head bytes to parse");
436 http_status error
= HTTP_STATUS_NONE
;
437 const bool parsed
= msg
->parse(&readBuf
, commEof
, &error
);
438 Must(parsed
|| !error
); // success or need more data
440 if (!parsed
) { // need more data
446 readBuf
.consume(msg
->hdr_sz
);
// True when reading is not finished yet and readBuf still has room for data.
450 bool Adaptation::Icap::Xaction::mayReadMore() const
452 return !doneReading() && // will read more data
453 readBuf
.hasSpace(); // have space for more data
// Predicate consulted by mayReadMore() and doneWithIo().
// NOTE(review): the body is not visible in this copy.
456 bool Adaptation::Icap::Xaction::doneReading() const
// Predicate consulted by doneWithIo().
// NOTE(review): the body is not visible in this copy.
461 bool Adaptation::Icap::Xaction::doneWriting() const
// True when a connection is held, no connect/read/write call is pending, and
// both reading and writing are finished. closeConnection() uses this to decide
// whether the connection may be reused.
466 bool Adaptation::Icap::Xaction::doneWithIo() const
468 return haveConnection() &&
469 !connector
&& !reader
&& !writer
&& // fast checks, some redundant
470 doneReading() && doneWriting();
// True when connection is set and reports itself as open.
473 bool Adaptation::Icap::Xaction::haveConnection() const
475 return connection
!= NULL
&& connection
->isOpen();
// Called when the initiator goes away. If an initiator is still recorded, logs
// the event, records ERR_DETAIL_ICAP_INIT_GONE as the error detail, and asks
// the job to stop.
// NOTE(review): statements between the visible lines (e.g. clearing the
// initiator or setting an outcome) are not visible in this copy.
479 void Adaptation::Icap::Xaction::noteInitiatorAborted()
482 if (theInitiator
.set()) {
483 debugs(93,4, HERE
<< "Initiator gone before ICAP transaction ended");
485 detailError(ERR_DETAIL_ICAP_INIT_GONE
);
487 mustStop("initiator aborted");
// Records xo as the logged transaction outcome. Overwriting an outcome other
// than xoUnknown is allowed but logged as a warning at level 3; the level-4
// message just logs the new outcome.
// NOTE(review): the else separating the two debugs() calls is not visible in
// this copy -- confirm.
492 void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome
&xo
)
494 if (al
.icap
.outcome
!= xoUnknown
) {
495 debugs(93, 3, HERE
<< "Warning: reseting outcome: from " <<
496 al
.icap
.outcome
<< " to " << xo
);
498 debugs(93, 4, HERE
<< xo
);
500 al
.icap
.outcome
= xo
;
503 // This 'last chance' method is called before a 'done' transaction is deleted.
504 // It is wrong to call virtual methods from a destructor. Besides, this call
505 // indicates that the transaction will terminate as planned.
// Final cleanup before the job is deleted: gives up the connection, frees the
// comm buffer allocated in start(), and finally runs the parent swanSong().
// NOTE(review): the statement controlled by the readBuf.isNull() check and
// any steps between memFreeBuf() and the parent call (e.g. logging) are not
// visible in this copy -- confirm.
506 void Adaptation::Icap::Xaction::swanSong()
508 // kids should sing first and then call the parent method.
510 closeConnection(); // TODO: rename because we do not always close
512 if (!readBuf
.isNull())
516 memFreeBuf(commBufSize
, commBuf
);
522 Adaptation::Initiate::swanSong();
// Tells the initiator, when one is set, that the transaction was aborted. The
// abort info carries the ICAP request and reply plus the current retriable and
// repeatable status, and is delivered asynchronously to Launcher::noteXactAbort().
// NOTE(review): any statement after the call (e.g. clearing the initiator) is
// not visible in this copy.
525 void Adaptation::Icap::Xaction::tellQueryAborted()
527 if (theInitiator
.set()) {
528 Adaptation::Icap::XactAbortInfo
abortInfo(icapRequest
, icapReply
,
529 retriable(), repeatable());
530 Launcher
*launcher
= dynamic_cast<Launcher
*>(theInitiator
.get());
531 // launcher may be nil if initiator is invalid
532 CallJobHere1(91,5, CbcPointer
<Launcher
>(launcher
),
533 Launcher
, noteXactAbort
, abortInfo
);
// Writes an ICAP log entry when ICAP logging is enabled and either no ICAP
// log ACL is configured or the fast ACL check allows it.
// NOTE(review): checklist is allocated with new here and no matching release
// is visible in this copy; confirm it is freed on every path. The statement
// preceding icapLogLog() (presumably filling in the log entry) is also not
// visible.
538 void Adaptation::Icap::Xaction::maybeLog()
540 if (IcapLogfileStatus
== LOG_ENABLE
) {
541 ACLChecklist
*checklist
= new ACLFilledChecklist(::Config
.accessList
.icap
, al
.request
, dash_str
);
542 if (!::Config
.accessList
.icap
|| checklist
->fastCheck() == ACCESS_ALLOWED
) {
544 icapLogLog(alep
, checklist
);
// Fills the access-log record for this transaction: marks the ICP opcode as
// invalid, copies the service host, key, and URI from the service
// configuration, computes I/O and total transaction times, and attaches the
// ICAP request, reply, and reply status.
// NOTE(review): icapReply is dereferenced for the status line; a nil guard is
// not visible in this copy -- confirm one exists around the reply fields.
550 void Adaptation::Icap::Xaction::finalizeLogInfo()
553 al
.icp
.opcode
= ICP_INVALID
;
555 const Adaptation::Icap::ServiceRep
&s
= service();
556 al
.icap
.hostAddr
= s
.cfg().host
.termedBuf();
557 al
.icap
.serviceName
= s
.cfg().key
;
558 al
.icap
.reqUri
= s
.cfg().uri
;
// ioTime spans icap_tio_start..icap_tio_finish; trTime spans transaction start..now
560 al
.icap
.ioTime
= tvSubMsec(icap_tio_start
, icap_tio_finish
);
561 al
.icap
.trTime
= tvSubMsec(icap_tr_start
, current_time
);
563 al
.icap
.request
= HTTPMSGLOCK(icapRequest
);
565 al
.icap
.reply
= HTTPMSGLOCK(icapReply
);
566 al
.icap
.resStatus
= icapReply
->sline
.status
;
570 // returns a temporary string depicting transaction status, for debugging
// Builds the status string returned to callers: the pending-status part from
// fillPendingStatus(), followed by the transaction id formatted as
// " <prefix><value>]". Returns the content of buf.
// NOTE(review): the declaration and setup of buf, and any other parts appended
// between the visible lines, are not visible in this copy.
571 const char *Adaptation::Icap::Xaction::status() const
578 fillPendingStatus(buf
);
582 buf
.Printf(" %s%u]", id
.Prefix
, id
.value
);
586 return buf
.content();
// Appends pending-state details to buf. The visible part adds "FD <n>" with
// the connection descriptor when a connection is held.
// NOTE(review): further statements inside the if block are not visible in this
// copy.
589 void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf
&buf
) const
591 if (haveConnection()) {
592 buf
.Printf("FD %d", connection
->fd
);
// Appends completed-state details to buf: "Comm(<fd>)" when a connection is
// held and EOF was seen on it, and "Stopped" when a stop reason is recorded.
604 void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf
&buf
) const
606 if (haveConnection() && commEof
)
607 buf
.Printf("Comm(%d)", connection
->fd
);
609 if (stopReason
!= NULL
)
610 buf
.Printf("Stopped");
613 bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf
&buf
) const