]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/icap/Xaction.cc
Author: Alex Rousskov <rousskov@measurement-factory.com>
[thirdparty/squid.git] / src / adaptation / icap / Xaction.cc
1 /*
2 * DEBUG: section 93 ICAP (RFC 3507) Client
3 */
4
5 #include "squid.h"
6 #include "comm.h"
7 #include "CommCalls.h"
8 #include "HttpMsg.h"
9 #include "adaptation/icap/Xaction.h"
10 #include "adaptation/icap/Config.h"
11 #include "TextException.h"
12 #include "pconn.h"
13 #include "fde.h"
14
15 static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
16
17
18 //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
19
20 Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Initiator *anInitiator, Adaptation::Icap::ServiceRep::Pointer &aService):
21 AsyncJob(aTypeName),
22 Adaptation::Initiate(aTypeName, anInitiator, aService.getRaw()),
23 connection(-1),
24 commBuf(NULL), commBufSize(0),
25 commEof(false),
26 reuseConnection(true),
27 isRetriable(true),
28 ignoreLastWrite(false),
29 connector(NULL), reader(NULL), writer(NULL), closer(NULL)
30 {
31 debugs(93,3, typeName << " constructed, this=" << this <<
32 " [icapx" << id << ']'); // we should not call virtual status() here
33 }
34
35 Adaptation::Icap::Xaction::~Xaction()
36 {
37 debugs(93,3, typeName << " destructed, this=" << this <<
38 " [icapx" << id << ']'); // we should not call virtual status() here
39 }
40
41 Adaptation::Icap::ServiceRep &
42 Adaptation::Icap::Xaction::service()
43 {
44 Adaptation::Icap::ServiceRep *s = dynamic_cast<Adaptation::Icap::ServiceRep*>(&Initiate::service());
45 Must(s);
46 return *s;
47 }
48
49 void Adaptation::Icap::Xaction::disableRetries()
50 {
51 debugs(93,5, typeName << (isRetriable ? " becomes" : " remains") <<
52 " final" << status());
53 isRetriable = false;
54 }
55
56 void Adaptation::Icap::Xaction::start()
57 {
58 Adaptation::Initiate::start();
59
60 readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF);
61 commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize);
62 // make sure maximum readBuf space does not exceed commBuf size
63 Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
64 }
65
66 // TODO: obey service-specific, OPTIONS-reported connection limit
67 void Adaptation::Icap::Xaction::openConnection()
68 {
69 IpAddress client_addr;
70
71 Must(connection < 0);
72
73 const Adaptation::Service &s = service();
74
75 if (!TheConfig.reuse_connections)
76 disableRetries(); // this will also safely drain pconn pool
77
78 // TODO: check whether NULL domain is appropriate here
79 connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable);
80 if (connection >= 0) {
81 debugs(93,3, HERE << "reused pconn FD " << connection);
82
83 // fake the connect callback
84 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
85 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
86 Dialer dialer(this, &Adaptation::Icap::Xaction::noteCommConnected);
87 dialer.params.fd = connection;
88 dialer.params.flag = COMM_OK;
89 // fake other parameters by copying from the existing connection
90 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
91 ScheduleCallHere(connector);
92 return;
93 }
94
95 disableRetries(); // we only retry pconn failures
96
97 IpAddress outgoing;
98 connection = comm_open(SOCK_STREAM, 0, outgoing,
99 COMM_NONBLOCKING, s.cfg().uri.termedBuf());
100
101 if (connection < 0)
102 dieOnConnectionFailure(); // throws
103
104 debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port);
105
106 // TODO: service bypass status may differ from that of a transaction
107 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
108 AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
109 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
110
111 commSetTimeout(connection, TheConfig.connect_timeout(
112 service().cfg().bypass), timeoutCall);
113
114 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
115 closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
116 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
117 comm_add_close_handler(connection, closer);
118
119 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
120 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected",
121 ConnectDialer(this, &Adaptation::Icap::Xaction::noteCommConnected));
122 commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
123 }
124
125 /*
126 * This event handler is necessary to work around the no-reentry policy
127 * of Adaptation::Icap::Xaction::callStart()
128 */
129 #if 0
130 void
131 Adaptation::Icap::Xaction::reusedConnection(void *data)
132 {
133 debugs(93, 5, HERE << "reused connection");
134 Adaptation::Icap::Xaction *x = (Adaptation::Icap::Xaction*)data;
135 x->noteCommConnected(COMM_OK);
136 }
137 #endif
138
139 void Adaptation::Icap::Xaction::closeConnection()
140 {
141 if (connection >= 0) {
142
143 if (closer != NULL) {
144 comm_remove_close_handler(connection, closer);
145 closer = NULL;
146 }
147
148 cancelRead(); // may not work
149
150 if (reuseConnection && !doneWithIo()) {
151 //status() adds leading spaces.
152 debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status());
153 reuseConnection = false;
154 }
155
156 if (reuseConnection) {
157 IpAddress client_addr;
158 //status() adds leading spaces.
159 debugs(93,3, HERE << "pushing pconn" << status());
160 AsyncCall::Pointer call = NULL;
161 commSetTimeout(connection, -1, call);
162 icapPconnPool->push(connection, theService->cfg().host.termedBuf(),
163 theService->cfg().port, NULL, client_addr);
164 disableRetries();
165 } else {
166 //status() adds leading spaces.
167 debugs(93,3, HERE << "closing pconn" << status());
168 // comm_close will clear timeout
169 comm_close(connection);
170 }
171
172 writer = NULL;
173 reader = NULL;
174 connector = NULL;
175 connection = -1;
176 }
177 }
178
179 // connection with the ICAP service established
180 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
181 {
182 Must(connector != NULL);
183 connector = NULL;
184
185 if (io.flag != COMM_OK)
186 dieOnConnectionFailure(); // throws
187
188 fd_table[connection].noteUse(icapPconnPool);
189
190 handleCommConnected();
191 }
192
193 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
194 {
195 debugs(93, 2, HERE << typeName <<
196 " failed to connect to " << service().cfg().uri);
197 theService->noteFailure();
198 throw TexcHere("cannot connect to the ICAP service");
199 }
200
201 void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
202 {
203 // comm module will free the buffer
204 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
205 writer = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommWrote",
206 Dialer(this, &Adaptation::Icap::Xaction::noteCommWrote));
207
208 comm_write_mbuf(connection, &buf, writer);
209 updateTimeout();
210 }
211
212 void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
213 {
214 Must(writer != NULL);
215 writer = NULL;
216
217 if (ignoreLastWrite) {
218 // a hack due to comm inability to cancel a pending write
219 ignoreLastWrite = false;
220 debugs(93, 7, HERE << "ignoring last write; status: " << io.flag);
221 } else {
222 Must(io.flag == COMM_OK);
223 updateTimeout();
224 handleCommWrote(io.size);
225 }
226 }
227
228 // communication timeout with the ICAP service
229 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &io)
230 {
231 handleCommTimedout();
232 }
233
234 void Adaptation::Icap::Xaction::handleCommTimedout()
235 {
236 debugs(93, 2, HERE << typeName << " failed: timeout with " <<
237 theService->cfg().methodStr() << " " <<
238 theService->cfg().uri << status());
239 reuseConnection = false;
240 service().noteFailure();
241
242 throw TexcHere(connector != NULL ?
243 "timed out while connecting to the ICAP service" :
244 "timed out while talking to the ICAP service");
245 }
246
247 // unexpected connection close while talking to the ICAP service
248 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &io)
249 {
250 closer = NULL;
251 handleCommClosed();
252 }
253
254 void Adaptation::Icap::Xaction::handleCommClosed()
255 {
256 mustStop("ICAP service connection externally closed");
257 }
258
259 void Adaptation::Icap::Xaction::callEnd()
260 {
261 if (doneWithIo()) {
262 debugs(93, 5, HERE << typeName << " done with I/O" << status());
263 closeConnection();
264 }
265 Adaptation::Initiate::callEnd(); // may destroy us
266 }
267
268 bool Adaptation::Icap::Xaction::doneAll() const
269 {
270 return !connector && !reader && !writer && Adaptation::Initiate::doneAll();
271 }
272
273 void Adaptation::Icap::Xaction::updateTimeout()
274 {
275 if (reader != NULL || writer != NULL) {
276 // restart the timeout before each I/O
277 // XXX: why does Config.Timeout lacks a write timeout?
278 // TODO: service bypass status may differ from that of a transaction
279 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
280 AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
281 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
282
283 commSetTimeout(connection,
284 TheConfig.io_timeout(service().cfg().bypass), call);
285 } else {
286 // clear timeout when there is no I/O
287 // Do we need a lifetime timeout?
288 AsyncCall::Pointer call = NULL;
289 commSetTimeout(connection, -1, call);
290 }
291 }
292
293 void Adaptation::Icap::Xaction::scheduleRead()
294 {
295 Must(connection >= 0);
296 Must(!reader);
297 Must(readBuf.hasSpace());
298
299 /*
300 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
301 * here instead of reading directly into readBuf.buf.
302 */
303 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
304 reader = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommRead",
305 Dialer(this, &Adaptation::Icap::Xaction::noteCommRead));
306
307 comm_read(connection, commBuf, readBuf.spaceSize(), reader);
308 updateTimeout();
309 }
310
311 // comm module read a portion of the ICAP response for us
312 void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams &io)
313 {
314 Must(reader != NULL);
315 reader = NULL;
316
317 Must(io.flag == COMM_OK);
318 Must(io.size >= 0);
319
320 updateTimeout();
321
322 debugs(93, 3, HERE << "read " << io.size << " bytes");
323
324 /*
325 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
326 * here instead of reading directly into readBuf.buf.
327 */
328
329 if (io.size > 0) {
330 readBuf.append(commBuf, io.size);
331 disableRetries(); // because pconn did not fail
332 } else {
333 reuseConnection = false;
334 commEof = true;
335 }
336
337 handleCommRead(io.size);
338 }
339
340 void Adaptation::Icap::Xaction::cancelRead()
341 {
342 if (reader != NULL) {
343 comm_read_cancel(connection, reader);
344 reader = NULL;
345 }
346 }
347
348 bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg *msg)
349 {
350 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse");
351
352 http_status error = HTTP_STATUS_NONE;
353 const bool parsed = msg->parse(&readBuf, commEof, &error);
354 Must(parsed || !error); // success or need more data
355
356 if (!parsed) { // need more data
357 Must(mayReadMore());
358 msg->reset();
359 return false;
360 }
361
362 readBuf.consume(msg->hdr_sz);
363 return true;
364 }
365
366 bool Adaptation::Icap::Xaction::mayReadMore() const
367 {
368 return !doneReading() && // will read more data
369 readBuf.hasSpace(); // have space for more data
370 }
371
372 bool Adaptation::Icap::Xaction::doneReading() const
373 {
374 return commEof;
375 }
376
377 bool Adaptation::Icap::Xaction::doneWriting() const
378 {
379 return !writer;
380 }
381
382 bool Adaptation::Icap::Xaction::doneWithIo() const
383 {
384 return connection >= 0 && // or we could still be waiting to open it
385 !connector && !reader && !writer && // fast checks, some redundant
386 doneReading() && doneWriting();
387 }
388
389 // initiator aborted
390 void Adaptation::Icap::Xaction::noteInitiatorAborted()
391 {
392
393 if (theInitiator) {
394 clearInitiator();
395 mustStop("initiator aborted");
396 }
397
398 }
399
400 // This 'last chance' method is called before a 'done' transaction is deleted.
401 // It is wrong to call virtual methods from a destructor. Besides, this call
402 // indicates that the transaction will terminate as planned.
403 void Adaptation::Icap::Xaction::swanSong()
404 {
405 // kids should sing first and then call the parent method.
406
407 closeConnection(); // TODO: rename because we do not always close
408
409 if (!readBuf.isNull())
410 readBuf.clean();
411
412 if (commBuf)
413 memFreeBuf(commBufSize, commBuf);
414
415 if (theInitiator)
416 tellQueryAborted(!isRetriable);
417
418 Adaptation::Initiate::swanSong();
419 }
420
421 // returns a temporary string depicting transaction status, for debugging
422 const char *Adaptation::Icap::Xaction::status() const
423 {
424 static MemBuf buf;
425 buf.reset();
426
427 buf.append(" [", 2);
428
429 fillPendingStatus(buf);
430 buf.append("/", 1);
431 fillDoneStatus(buf);
432
433 buf.Printf(" icapx%d]", id);
434
435 buf.terminate();
436
437 return buf.content();
438 }
439
440 void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
441 {
442 if (connection >= 0) {
443 buf.Printf("FD %d", connection);
444
445 if (writer != NULL)
446 buf.append("w", 1);
447
448 if (reader != NULL)
449 buf.append("r", 1);
450
451 buf.append(";", 1);
452 }
453 }
454
455 void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf &buf) const
456 {
457 if (connection >= 0 && commEof)
458 buf.Printf("Comm(%d)", connection);
459
460 if (stopReason != NULL)
461 buf.Printf("Stopped");
462 }
463
464 bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf &buf) const
465 {
466 return false;
467 }