]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/icap/Xaction.cc
2 * DEBUG: section 93 ICAP (RFC 3507) Client
9 #include "adaptation/icap/Xaction.h"
10 #include "adaptation/icap/Config.h"
11 #include "TextException.h"
// File-local pool of ICAP server connections kept for reuse, labelled
// "ICAP Servers". In this file openConnection() pops descriptors from it and
// closeConnection() pushes reusable ones back.
15 static PconnPool
*icapPconnPool
= new PconnPool("ICAP Servers");
18 //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
// Builds an ICAP transaction: forwards aTypeName, anInitiator and the raw
// service pointer to Adaptation::Initiate, and starts with no comm buffer,
// connection reuse enabled, last-write ignoring off, and no pending
// connector/reader/writer/closer calls.
// NOTE(review): the embedded line numbers skip several initializer lines, so
// this listing may not show every member initializer -- confirm upstream.
20 Adaptation::Icap::Xaction::Xaction(const char *aTypeName
, Adaptation::Initiator
*anInitiator
, Adaptation::Icap::ServiceRep::Pointer
&aService
):
22 Adaptation::Initiate(aTypeName
, anInitiator
, aService
.getRaw()),
24 commBuf(NULL
), commBufSize(0),
26 reuseConnection(true),
28 ignoreLastWrite(false),
29 connector(NULL
), reader(NULL
), writer(NULL
), closer(NULL
)
// Log type name, object address and transaction id only.
31 debugs(93,3, typeName
<< " constructed, this=" << this <<
32 " [icapx" << id
<< ']'); // we should not call virtual status() here
// Destructor: the visible body only logs the type name, object address and
// transaction id at debug section 93, level 3.
35 Adaptation::Icap::Xaction::~Xaction()
37 debugs(93,3, typeName
<< " destructed, this=" << this <<
38 " [icapx" << id
<< ']'); // we should not call virtual status() here
// Returns this transaction's service as an ICAP-specific ServiceRep by
// dynamic_cast-ing the generic service obtained from Initiate::service().
// NOTE(review): the statements that validate and return `s` are not visible
// in this listing -- confirm that a failed cast (NULL) is rejected.
41 Adaptation::Icap::ServiceRep
&
42 Adaptation::Icap::Xaction::service()
44 Adaptation::Icap::ServiceRep
*s
= dynamic_cast<Adaptation::Icap::ServiceRep
*>(&Initiate::service());
// Marks the transaction as final (no longer retriable). The visible code logs
// " becomes final" when isRetriable is still set and " remains final"
// otherwise, followed by status().
// NOTE(review): presumably isRetriable is cleared right after the log; that
// statement is not visible here -- verify.
49 void Adaptation::Icap::Xaction::disableRetries()
51 debugs(93,5, typeName
<< (isRetriable
? " becomes" : " remains") <<
52 " final" << status());
// Starts the transaction: runs the parent start(), then sets up the two read
// buffers. readBuf is initialised with SQUID_TCP_SO_RCVBUF for both init()
// arguments, and commBuf is allocated via memAllocBuf(), which reports the
// actual allocation size through commBufSize.
56 void Adaptation::Icap::Xaction::start()
58 Adaptation::Initiate::start();
60 readBuf
.init(SQUID_TCP_SO_RCVBUF
, SQUID_TCP_SO_RCVBUF
);
61 commBuf
= (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF
, &commBufSize
);
62 // make sure maximum readBuf space does not exceed commBuf size
// (scheduleRead() reads up to readBuf.spaceSize() bytes into commBuf)
63 Must(static_cast<size_t>(readBuf
.potentialSpaceSize()) <= commBufSize
);
66 // TODO: obey service-specific, OPTIONS-reported connection limit
// Obtains a connection to the ICAP service.
// Visible steps, in order:
//  1. If TheConfig.reuse_connections is off, disable retries.
//  2. Try to pop an existing descriptor for the service host/port from
//     icapPconnPool; when one is returned (>= 0), schedule a faked
//     noteCommConnected() callback carrying that FD and COMM_OK.
//  3. Otherwise disable retries, open a new socket with comm_open(), install
//     the connect timeout and close handler, and start an asynchronous
//     connect whose completion is reported to noteCommConnected().
// NOTE(review): the statements separating step 2 from step 3 (closing brace,
// early return, comm_open() failure check, declaration of `outgoing`) are not
// visible in this listing -- confirm the exact control flow upstream.
67 void Adaptation::Icap::Xaction::openConnection()
69 IpAddress client_addr
;
73 const Adaptation::Service
&s
= service();
75 if (!TheConfig
.reuse_connections
)
76 disableRetries(); // this will also safely drain pconn pool
78 // TODO: check whether NULL domain is appropriate here
79 connection
= icapPconnPool
->pop(s
.cfg().host
.termedBuf(), s
.cfg().port
, NULL
, client_addr
, isRetriable
);
80 if (connection
>= 0) {
81 debugs(93,3, HERE
<< "reused pconn FD " << connection
);
83 // fake the connect callback
84 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
85 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommConnectCbParams
> Dialer
;
86 Dialer
dialer(this, &Adaptation::Icap::Xaction::noteCommConnected
);
87 dialer
.params
.fd
= connection
;
88 dialer
.params
.flag
= COMM_OK
;
89 // fake other parameters by copying from the existing connection
90 connector
= asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer
);
91 ScheduleCallHere(connector
);
// Fresh-connection path starts here.
95 disableRetries(); // we only retry pconn failures
98 connection
= comm_open(SOCK_STREAM
, 0, outgoing
,
99 COMM_NONBLOCKING
, s
.cfg().uri
.termedBuf());
// NOTE(review): the condition guarding this call is not visible; presumably
// it runs only when comm_open() failed -- verify.
102 dieOnConnectionFailure(); // throws
104 debugs(93,3, typeName
<< " opens connection to " << s
.cfg().host
<< ":" << s
.cfg().port
);
106 // TODO: service bypass status may differ from that of a transaction
107 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommTimeoutCbParams
> TimeoutDialer
;
108 AsyncCall::Pointer timeoutCall
= asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
109 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout
));
// The connect timeout is chosen from the service's bypass setting.
111 commSetTimeout(connection
, TheConfig
.connect_timeout(
112 service().cfg().bypass
), timeoutCall
);
// Remember the close handler in `closer` so closeConnection() can remove it.
114 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommCloseCbParams
> CloseDialer
;
115 closer
= asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
116 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed
));
117 comm_add_close_handler(connection
, closer
);
// A non-NULL `connector` marks a connect in progress; see doneAll() and
// handleCommTimedout().
119 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommConnectCbParams
> ConnectDialer
;
120 connector
= asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
121 ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected
));
122 commConnectStart(connection
, s
.cfg().host
.termedBuf(), s
.cfg().port
, connector
);
126 * This event handler is necessary to work around the no-reentry policy
127 * of Adaptation::Icap::Xaction::callStart()
// Event-style handler: treats `data` as an Xaction pointer and reports a
// successful connect on it.
// NOTE(review): this passes COMM_OK, while noteCommConnected() as defined in
// this file takes a const CommConnectCbParams& -- confirm whether this
// handler is still compiled/used upstream.
131 Adaptation::Icap::Xaction::reusedConnection(void *data
)
133 debugs(93, 5, HERE
<< "reused connection");
134 Adaptation::Icap::Xaction
*x
= (Adaptation::Icap::Xaction
*)data
;
135 x
->noteCommConnected(COMM_OK
);
// Releases the ICAP connection, if any (connection >= 0): removes our close
// handler, cancels a pending read, and then either returns the descriptor to
// icapPconnPool (when reuseConnection is still set) or closes it with
// comm_close(). Reuse is refused when I/O is still pending (!doneWithIo()).
// NOTE(review): the `else` separating the push and close paths and the code
// that resets members afterwards are not visible in this listing.
139 void Adaptation::Icap::Xaction::closeConnection()
141 if (connection
>= 0) {
143 if (closer
!= NULL
) {
144 comm_remove_close_handler(connection
, closer
);
148 cancelRead(); // may not work
150 if (reuseConnection
&& !doneWithIo()) {
151 //status() adds leading spaces.
152 debugs(93,5, HERE
<< "not reusing pconn due to pending I/O" << status());
153 reuseConnection
= false;
156 if (reuseConnection
) {
157 IpAddress client_addr
;
158 //status() adds leading spaces.
159 debugs(93,3, HERE
<< "pushing pconn" << status());
// Clear the comm timeout (-1, NULL call) before pooling the descriptor.
160 AsyncCall::Pointer call
= NULL
;
161 commSetTimeout(connection
, -1, call
);
162 icapPconnPool
->push(connection
, theService
->cfg().host
.termedBuf(),
163 theService
->cfg().port
, NULL
, client_addr
);
166 //status() adds leading spaces.
167 debugs(93,3, HERE
<< "closing pconn" << status());
168 // comm_close will clear timeout
169 comm_close(connection
);
179 // connection with the ICAP service established
// Connect callback. Requires that a connect was pending (connector != NULL).
// On any io.flag other than COMM_OK it calls dieOnConnectionFailure(), which
// throws; otherwise it records the use of the descriptor against
// icapPconnPool and hands control to handleCommConnected().
180 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams
&io
)
182 Must(connector
!= NULL
);
185 if (io
.flag
!= COMM_OK
)
186 dieOnConnectionFailure(); // throws
188 fd_table
[connection
].noteUse(icapPconnPool
);
190 handleCommConnected();
// Common connect-failure path: logs the service URI, reports the failure to
// the service via noteFailure(), and always throws a TexcHere exception with
// the message "cannot connect to the ICAP service".
193 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
195 debugs(93, 2, HERE
<< typeName
<<
196 " failed to connect to " << service().cfg().uri
);
197 theService
->noteFailure();
198 throw TexcHere("cannot connect to the ICAP service");
// Queues an asynchronous write of `buf` on the ICAP connection. The pending
// call is stored in `writer`; completion is delivered to noteCommWrote().
201 void Adaptation::Icap::Xaction::scheduleWrite(MemBuf
&buf
)
203 // comm module will free the buffer
204 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommIoCbParams
> Dialer
;
205 writer
= asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommWrote",
206 Dialer(this, &Adaptation::Icap::Xaction::noteCommWrote
));
208 comm_write_mbuf(connection
, &buf
, writer
);
// Write-completion callback. Requires a pending writer. When ignoreLastWrite
// is set, the flag is cleared and the result is only logged; otherwise the
// write must have succeeded (COMM_OK) and io.size is passed on to
// handleCommWrote().
// NOTE(review): the `else` between the two paths and any reset of `writer`
// are not visible in this listing.
212 void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams
&io
)
214 Must(writer
!= NULL
);
217 if (ignoreLastWrite
) {
218 // a hack due to comm inability to cancel a pending write
219 ignoreLastWrite
= false;
220 debugs(93, 7, HERE
<< "ignoring last write; status: " << io
.flag
);
222 Must(io
.flag
== COMM_OK
);
224 handleCommWrote(io
.size
);
228 // communication timeout with the ICAP service
// Comm timeout callback; the `io` parameters are unused in the visible code
// and the work is delegated to handleCommTimedout().
229 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams
&io
)
231 handleCommTimedout();
// Handles a timeout on the ICAP connection: logs the service method and URI,
// forbids reuse of the connection, reports the failure to the service, and
// always throws. The exception text distinguishes a timeout while connecting
// (connector still pending) from one while talking to the service.
234 void Adaptation::Icap::Xaction::handleCommTimedout()
236 debugs(93, 2, HERE
<< typeName
<< " failed: timeout with " <<
237 theService
->cfg().methodStr() << " " <<
238 theService
->cfg().uri
<< status());
239 reuseConnection
= false;
240 service().noteFailure();
242 throw TexcHere(connector
!= NULL
?
243 "timed out while connecting to the ICAP service" :
244 "timed out while talking to the ICAP service");
247 // unexpected connection close while talking to the ICAP service
// Close-handler callback registered by openConnection() through `closer`.
// NOTE(review): the body is not visible in this listing; presumably it
// forwards to handleCommClosed() -- verify.
248 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams
&io
)
// Stops the transaction with the reason
// "ICAP service connection externally closed".
254 void Adaptation::Icap::Xaction::handleCommClosed()
256 mustStop("ICAP service connection externally closed");
// End-of-async-call hook. The visible code logs "done with I/O" and then
// calls the parent callEnd(), after which `this` must not be touched.
// NOTE(review): the condition guarding the log line (and anything else done
// under it) is not visible in this listing.
259 void Adaptation::Icap::Xaction::callEnd()
262 debugs(93, 5, HERE
<< typeName
<< " done with I/O" << status());
265 Adaptation::Initiate::callEnd(); // may destroy us
// True only when no connect, read or write call is pending and the parent
// Adaptation::Initiate::doneAll() also reports completion.
268 bool Adaptation::Icap::Xaction::doneAll() const
270 return !connector
&& !reader
&& !writer
&& Adaptation::Initiate::doneAll();
// Re-arms or clears the comm timeout on the connection depending on whether
// any read or write is pending. With pending I/O the timeout comes from
// TheConfig.io_timeout() (selected by the service bypass setting) and fires
// noteCommTimedout(); without pending I/O the timeout is cleared (-1).
// NOTE(review): the `} else {` between the two branches is not visible in
// this listing.
273 void Adaptation::Icap::Xaction::updateTimeout()
275 if (reader
!= NULL
|| writer
!= NULL
) {
276 // restart the timeout before each I/O
277 // XXX: why does Config.Timeout lack a write timeout?
278 // TODO: service bypass status may differ from that of a transaction
279 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommTimeoutCbParams
> TimeoutDialer
;
280 AsyncCall::Pointer call
= asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
281 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout
));
283 commSetTimeout(connection
,
284 TheConfig
.io_timeout(service().cfg().bypass
), call
);
286 // clear timeout when there is no I/O
287 // Do we need a lifetime timeout?
288 AsyncCall::Pointer call
= NULL
;
289 commSetTimeout(connection
, -1, call
);
// Queues an asynchronous read from the ICAP connection. Requires an open
// connection and free space in readBuf. Data is read into commBuf, limited
// to readBuf.spaceSize() bytes; completion is delivered to noteCommRead()
// through the call stored in `reader`.
293 void Adaptation::Icap::Xaction::scheduleRead()
295 Must(connection
>= 0);
297 Must(readBuf
.hasSpace());
300 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
301 * here instead of reading directly into readBuf.buf.
303 typedef CommCbMemFunT
<Adaptation::Icap::Xaction
, CommIoCbParams
> Dialer
;
304 reader
= asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommRead",
305 Dialer(this, &Adaptation::Icap::Xaction::noteCommRead
));
307 comm_read(connection
, commBuf
, readBuf
.spaceSize(), reader
);
311 // comm module read a portion of the ICAP response for us
// Read-completion callback. Requires a pending reader and a COMM_OK result,
// logs the byte count, copies the received bytes from commBuf into readBuf,
// and finally notifies handleCommRead() with io.size.
// NOTE(review): several lines are missing from this listing, including the
// condition that separates "append + disableRetries()" from
// "reuseConnection = false" (presumably data vs. EOF) -- confirm upstream.
312 void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams
&io
)
314 Must(reader
!= NULL
);
317 Must(io
.flag
== COMM_OK
);
322 debugs(93, 3, HERE
<< "read " << io
.size
<< " bytes");
325 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
326 * here instead of reading directly into readBuf.buf.
330 readBuf
.append(commBuf
, io
.size
);
331 disableRetries(); // because pconn did not fail
333 reuseConnection
= false;
337 handleCommRead(io
.size
);
// Cancels the pending read, if any, via comm_read_cancel().
// NOTE(review): any follow-up inside the if-block (e.g. resetting `reader`)
// is not visible in this listing.
340 void Adaptation::Icap::Xaction::cancelRead()
342 if (reader
!= NULL
) {
343 comm_read_cancel(connection
, reader
);
// Tries to parse an HTTP message header from the bytes accumulated in
// readBuf, telling the parser whether EOF was seen (commEof). A parse error
// is fatal (Must); "not parsed and no error" means more data is needed. After
// a successful parse the header bytes (msg->hdr_sz) are consumed from readBuf.
// NOTE(review): the return statements and the body of the "need more data"
// branch are not visible in this listing.
348 bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg
*msg
)
350 debugs(93, 5, HERE
<< "have " << readBuf
.contentSize() << " head bytes to parse");
352 http_status error
= HTTP_STATUS_NONE
;
353 const bool parsed
= msg
->parse(&readBuf
, commEof
, &error
);
354 Must(parsed
|| !error
); // success or need more data
356 if (!parsed
) { // need more data
362 readBuf
.consume(msg
->hdr_sz
);
// True when another read can be scheduled: reading is not finished and
// readBuf still has room for incoming data.
366 bool Adaptation::Icap::Xaction::mayReadMore() const
368 return !doneReading() && // will read more data
369 readBuf
.hasSpace(); // have space for more data
// Reading-finished predicate, consulted by mayReadMore() and doneWithIo().
// NOTE(review): the body is not visible in this listing.
372 bool Adaptation::Icap::Xaction::doneReading() const
// Writing-finished predicate, consulted by doneWithIo().
// NOTE(review): the body is not visible in this listing.
377 bool Adaptation::Icap::Xaction::doneWriting() const
// True when the connection is open (connection >= 0) and no connect, read or
// write call is pending, and both doneReading() and doneWriting() hold.
// closeConnection() uses this to decide whether the connection may be pooled.
382 bool Adaptation::Icap::Xaction::doneWithIo() const
384 return connection
>= 0 && // or we could still be waiting to open it
385 !connector
&& !reader
&& !writer
&& // fast checks, some redundant
386 doneReading() && doneWriting();
// Called when the initiator gives up on this transaction; the visible code
// stops the job with the reason "initiator aborted".
// NOTE(review): lines preceding mustStop() are missing from this listing.
390 void Adaptation::Icap::Xaction::noteInitiatorAborted()
395 mustStop("initiator aborted");
400 // This 'last chance' method is called before a 'done' transaction is deleted.
401 // It is wrong to call virtual methods from a destructor. Besides, this call
402 // indicates that the transaction will terminate as planned.
// Final cleanup: releases (closes or pools) the connection, frees commBuf
// with the size recorded in commBufSize, reports an aborted query to the
// initiator (marked final when the transaction is not retriable), and then
// runs the parent swanSong().
// NOTE(review): the statement guarded by `!readBuf.isNull()` and any
// conditions around memFreeBuf()/tellQueryAborted() are not visible here.
403 void Adaptation::Icap::Xaction::swanSong()
405 // kids should sing first and then call the parent method.
407 closeConnection(); // TODO: rename because we do not always close
409 if (!readBuf
.isNull())
413 memFreeBuf(commBufSize
, commBuf
);
416 tellQueryAborted(!isRetriable
);
418 Adaptation::Initiate::swanSong();
421 // returns a temporary string depicting transaction status, for debugging
// Builds the debugging status string: pending-state details from
// fillPendingStatus(), then " icapx<id>]", and returns the buffer content.
// NOTE(review): the declaration/reset of `buf`, the opening bracket, and any
// fillDoneStatus() call are not visible in this listing.
422 const char *Adaptation::Icap::Xaction::status() const
429 fillPendingStatus(buf
);
433 buf
.Printf(" icapx%d]", id
);
437 return buf
.content();
// Appends pending-state details to `buf`; the visible code adds "FD <n>"
// when a connection is open (connection >= 0).
// NOTE(review): the rest of the if-block is not visible in this listing.
440 void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf
&buf
) const
442 if (connection
>= 0) {
443 buf
.Printf("FD %d", connection
);
// Appends completed-state details to `buf`: "Comm(<fd>)" when the connection
// is open and EOF was seen (commEof), and "Stopped" when a stop reason is set.
455 void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf
&buf
) const
457 if (connection
>= 0 && commEof
)
458 buf
.Printf("Comm(%d)", connection
);
460 if (stopReason
!= NULL
)
461 buf
.Printf("Stopped");
464 bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf
&buf
) const