]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ICAP/ICAPXaction.cc
Merging async-call branch changes to HEAD:
[thirdparty/squid.git] / src / ICAP / ICAPXaction.cc
CommitLineData
774c051c 1/*
507d0a78 2 * DEBUG: section 93 ICAP (RFC 3507) Client
774c051c 3 */
4
5#include "squid.h"
6#include "comm.h"
5f8252d2 7#include "HttpMsg.h"
774c051c 8#include "ICAPXaction.h"
560d7d2d 9#include "ICAPConfig.h"
774c051c 10#include "TextException.h"
781ce8ff 11#include "pconn.h"
12#include "fde.h"
13
14static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
774c051c 15
5f8252d2 16int ICAPXaction::TheLastId = 0;
17
18//CBDATA_CLASS_INIT(ICAPXaction);
19
/* comm module handlers (wrappers around corresponding ICAPXaction methods) */
21
22// TODO: Teach comm module to call object methods directly
23
774c051c 24static
25ICAPXaction &ICAPXaction_fromData(void *data)
26{
27 ICAPXaction *x = static_cast<ICAPXaction*>(data);
28 assert(x);
29 return *x;
30}
31
32static
33void ICAPXaction_noteCommTimedout(int, void *data)
34{
35 ICAPXaction_fromData(data).noteCommTimedout();
36}
37
38static
39void ICAPXaction_noteCommClosed(int, void *data)
40{
41 ICAPXaction_fromData(data).noteCommClosed();
42}
43
44static
45void ICAPXaction_noteCommConnected(int, comm_err_t status, int xerrno, void *data)
46{
47 ICAPXaction_fromData(data).noteCommConnected(status);
48}
49
50static
c99de607 51void ICAPXaction_noteCommWrote(int, char *, size_t size, comm_err_t status, int xerrno, void *data)
774c051c 52{
53 ICAPXaction_fromData(data).noteCommWrote(status, size);
54}
55
56static
57void ICAPXaction_noteCommRead(int, char *, size_t size, comm_err_t status, int xerrno, void *data)
58{
3b299123 59 debugs(93,3,HERE << data << " read returned " << size);
774c051c 60 ICAPXaction_fromData(data).noteCommRead(status, size);
61}
62
c824c43b 63ICAPXaction::ICAPXaction(const char *aTypeName, ICAPInitiator *anInitiator, ICAPServiceRep::Pointer &aService):
64 ICAPInitiate(aTypeName, anInitiator, aService),
5f8252d2 65 id(++TheLastId),
774c051c 66 connection(-1),
67 commBuf(NULL), commBufSize(0),
68 commEof(false),
2dfede9e 69 reuseConnection(true),
c824c43b 70 isRetriable(true),
cfc68405 71 ignoreLastWrite(false),
c824c43b 72 connector(NULL), reader(NULL), writer(NULL), closer(NULL)
774c051c 73{
5f8252d2 74 debugs(93,3, typeName << " constructed, this=" << this <<
75 " [icapx" << id << ']'); // we should not call virtual status() here
774c051c 76}
77
78ICAPXaction::~ICAPXaction()
79{
5f8252d2 80 debugs(93,3, typeName << " destructed, this=" << this <<
81 " [icapx" << id << ']'); // we should not call virtual status() here
82}
83
c824c43b 84void ICAPXaction::disableRetries() {
cfc68405 85 debugs(93,5, typeName << (isRetriable ? " becomes" : " remains") <<
c824c43b 86 " final" << status());
87 isRetriable = false;
88}
89
5f8252d2 90void ICAPXaction::start()
91{
c824c43b 92 ICAPInitiate::start();
5f8252d2 93
94 readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF);
95 commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize);
96 // make sure maximum readBuf space does not exceed commBuf size
97 Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
774c051c 98}
99
100// TODO: obey service-specific, OPTIONS-reported connection limit
101void ICAPXaction::openConnection()
102{
7aa6b7cb 103 IPAddress client_addr;
104
c824c43b 105 Must(connection < 0);
106
774c051c 107 const ICAPServiceRep &s = service();
774c051c 108
560d7d2d 109 if (!TheICAPConfig.reuse_connections)
110 disableRetries(); // this will also safely drain pconn pool
111
c8ceec27 112 // TODO: check whether NULL domain is appropriate here
b70ba605 113 connection = icapPconnPool->pop(s.host.buf(), s.port, NULL, client_addr, isRetriable);
c8ceec27 114 if (connection >= 0) {
115 debugs(93,3, HERE << "reused pconn FD " << connection);
116 connector = &ICAPXaction_noteCommConnected; // make doneAll() false
117 eventAdd("ICAPXaction::reusedConnection",
118 reusedConnection,
119 this,
120 0.0,
121 0,
122 true);
123 return;
2dfede9e 124 }
125
c8ceec27 126 disableRetries(); // we only retry pconn failures
127
eb03221c 128 IPAddress outgoing;
b70ba605 129 connection = comm_open(SOCK_STREAM, 0, outgoing,
30abd221 130 COMM_NONBLOCKING, s.uri.buf());
774c051c 131
c824c43b 132 if (connection < 0)
133 dieOnConnectionFailure(); // throws
774c051c 134
30abd221 135 debugs(93,3, typeName << " opens connection to " << s.host.buf() << ":" << s.port);
774c051c 136
cfc68405 137 // TODO: service bypass status may differ from that of a transaction
138 commSetTimeout(connection, TheICAPConfig.connect_timeout(service().bypass),
774c051c 139 &ICAPXaction_noteCommTimedout, this);
140
141 closer = &ICAPXaction_noteCommClosed;
142 comm_add_close_handler(connection, closer, this);
143
144 connector = &ICAPXaction_noteCommConnected;
30abd221 145 commConnectStart(connection, s.host.buf(), s.port, connector, this);
774c051c 146}
147
2dfede9e 148/*
149 * This event handler is necessary to work around the no-rentry policy
150 * of ICAPXaction::callStart()
151 */
152void
153ICAPXaction::reusedConnection(void *data)
154{
bf8fe701 155 debugs(93, 5, "ICAPXaction::reusedConnection");
2dfede9e 156 ICAPXaction *x = (ICAPXaction*)data;
2dfede9e 157 x->noteCommConnected(COMM_OK);
158}
159
774c051c 160void ICAPXaction::closeConnection()
161{
162 if (connection >= 0) {
774c051c 163
164 if (closer) {
165 comm_remove_close_handler(connection, closer, this);
166 closer = NULL;
167 }
168
c99de607 169 cancelRead(); // may not work
170
5f8252d2 171 if (reuseConnection && !doneWithIo()) {
172 debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status());
c99de607 173 reuseConnection = false;
174 }
774c051c 175
2dfede9e 176 if (reuseConnection) {
b70ba605 177 IPAddress client_addr;
5f8252d2 178 debugs(93,3, HERE << "pushing pconn" << status());
c99de607 179 commSetTimeout(connection, -1, NULL, NULL);
b70ba605 180 icapPconnPool->push(connection, theService->host.buf(), theService->port, NULL, client_addr);
c824c43b 181 disableRetries();
2dfede9e 182 } else {
5f8252d2 183 debugs(93,3, HERE << "closing pconn" << status());
c99de607 184 // comm_close will clear timeout
2dfede9e 185 comm_close(connection);
186 }
774c051c 187
c99de607 188 writer = NULL;
189 reader = NULL;
774c051c 190 connector = NULL;
191 connection = -1;
192 }
193}
194
195// connection with the ICAP service established
196void ICAPXaction::noteCommConnected(comm_err_t commStatus)
197{
198 ICAPXaction_Enter(noteCommConnected);
199
200 Must(connector);
201 connector = NULL;
c99de607 202
203 if (commStatus != COMM_OK)
204 dieOnConnectionFailure(); // throws
205
206 fd_table[connection].noteUse(icapPconnPool);
774c051c 207
208 handleCommConnected();
209
210 ICAPXaction_Exit();
211}
212
c99de607 213void ICAPXaction::dieOnConnectionFailure() {
4932ad93 214 debugs(93, 2, HERE << typeName <<
215 " failed to connect to " << service().uri);
c99de607 216 theService->noteFailure();
c99de607 217 throw TexcHere("cannot connect to the ICAP service");
218}
219
774c051c 220void ICAPXaction::scheduleWrite(MemBuf &buf)
221{
222 // comm module will free the buffer
c99de607 223 writer = &ICAPXaction_noteCommWrote;
3a8c65c3 224 comm_write_mbuf(connection, &buf, writer, this);
c99de607 225 updateTimeout();
774c051c 226}
227
228void ICAPXaction::noteCommWrote(comm_err_t commStatus, size_t size)
229{
230 ICAPXaction_Enter(noteCommWrote);
231
232 Must(writer);
233 writer = NULL;
cfc68405 234
235 if (ignoreLastWrite) {
236 // a hack due to comm inability to cancel a pending write
237 ignoreLastWrite = false;
238 debugs(93, 7, HERE << "ignoring last write; status: " << commStatus);
239 } else {
240 Must(commStatus == COMM_OK);
241 updateTimeout();
242 handleCommWrote(size);
243 }
774c051c 244
245 ICAPXaction_Exit();
246}
247
248// communication timeout with the ICAP service
249void ICAPXaction::noteCommTimedout()
250{
251 ICAPXaction_Enter(noteCommTimedout);
252
253 handleCommTimedout();
254
255 ICAPXaction_Exit();
256}
257
258void ICAPXaction::handleCommTimedout()
259{
4932ad93 260 debugs(93, 2, HERE << typeName << " failed: timeout with " <<
cfc68405 261 theService->methodStr() << " " << theService->uri.buf() << status());
fe3e2600 262 reuseConnection = false;
cfc68405 263 service().noteFailure();
3cfc19b3 264
cfc68405 265 throw TexcHere(connector ?
266 "timed out while connecting to the ICAP service" :
267 "timed out while talking to the ICAP service");
774c051c 268}
269
270// unexpected connection close while talking to the ICAP service
271void ICAPXaction::noteCommClosed()
272{
273 closer = NULL;
274 ICAPXaction_Enter(noteCommClosed);
275
276 handleCommClosed();
277
278 ICAPXaction_Exit();
279}
280
281void ICAPXaction::handleCommClosed()
282{
283 mustStop("ICAP service connection externally closed");
284}
285
c824c43b 286void ICAPXaction::callEnd()
774c051c 287{
c824c43b 288 if (doneWithIo()) {
289 debugs(93, 5, HERE << typeName << " done with I/O" << status());
290 closeConnection();
291 }
292 ICAPInitiate::callEnd(); // may destroy us
774c051c 293}
294
295bool ICAPXaction::doneAll() const
296{
c824c43b 297 return !connector && !reader && !writer && ICAPInitiate::doneAll();
774c051c 298}
299
c99de607 300void ICAPXaction::updateTimeout() {
301 if (reader || writer) {
302 // restart the timeout before each I/O
303 // XXX: why does Config.Timeout lacks a write timeout?
cfc68405 304 // TODO: service bypass status may differ from that of a transaction
305 commSetTimeout(connection, TheICAPConfig.io_timeout(service().bypass),
c99de607 306 &ICAPXaction_noteCommTimedout, this);
307 } else {
308 // clear timeout when there is no I/O
309 // Do we need a lifetime timeout?
310 commSetTimeout(connection, -1, NULL, NULL);
311 }
312}
313
774c051c 314void ICAPXaction::scheduleRead()
315{
316 Must(connection >= 0);
317 Must(!reader);
318 Must(readBuf.hasSpace());
319
320 reader = &ICAPXaction_noteCommRead;
321 /*
322 * See comments in ICAPXaction.h about why we use commBuf
323 * here instead of reading directly into readBuf.buf.
324 */
325
326 comm_read(connection, commBuf, readBuf.spaceSize(), reader, this);
c99de607 327 updateTimeout();
774c051c 328}
329
330// comm module read a portion of the ICAP response for us
331void ICAPXaction::noteCommRead(comm_err_t commStatus, size_t sz)
332{
333 ICAPXaction_Enter(noteCommRead);
334
335 Must(reader);
336 reader = NULL;
337
338 Must(commStatus == COMM_OK);
339 Must(sz >= 0);
340
c99de607 341 updateTimeout();
342
3b299123 343 debugs(93, 3, HERE << "read " << sz << " bytes");
774c051c 344
345 /*
346 * See comments in ICAPXaction.h about why we use commBuf
347 * here instead of reading directly into readBuf.buf.
348 */
349
c824c43b 350 if (sz > 0) {
774c051c 351 readBuf.append(commBuf, sz);
c824c43b 352 disableRetries(); // because pconn did not fail
353 } else {
354 reuseConnection = false;
774c051c 355 commEof = true;
c824c43b 356 }
774c051c 357
358 handleCommRead(sz);
359
360 ICAPXaction_Exit();
361}
362
363void ICAPXaction::cancelRead()
364{
365 if (reader) {
366 // check callback presence because comm module removes
367 // fdc_table[].read.callback after the actual I/O but
368 // before we get the callback via a queued event.
369 // These checks try to mimic the comm_read_cancel() assertions.
370
371 if (comm_has_pending_read(connection) &&
c99de607 372 !comm_has_pending_read_callback(connection)) {
774c051c 373 comm_read_cancel(connection, reader, this);
c99de607 374 reader = NULL;
375 }
774c051c 376 }
377}
378
379bool ICAPXaction::parseHttpMsg(HttpMsg *msg)
380{
def17b6a 381 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse");
774c051c 382
383 http_status error = HTTP_STATUS_NONE;
384 const bool parsed = msg->parse(&readBuf, commEof, &error);
385 Must(parsed || !error); // success or need more data
386
387 if (!parsed) { // need more data
388 Must(mayReadMore());
389 msg->reset();
390 return false;
391 }
392
393 readBuf.consume(msg->hdr_sz);
394 return true;
395}
396
397bool ICAPXaction::mayReadMore() const
398{
399 return !doneReading() && // will read more data
400 readBuf.hasSpace(); // have space for more data
401}
402
403bool ICAPXaction::doneReading() const
404{
405 return commEof;
406}
407
c99de607 408bool ICAPXaction::doneWriting() const
409{
410 return !writer;
411}
412
413bool ICAPXaction::doneWithIo() const
414{
415 return connection >= 0 && // or we could still be waiting to open it
416 !connector && !reader && !writer && // fast checks, some redundant
417 doneReading() && doneWriting();
418}
419
c824c43b 420// initiator aborted
421void ICAPXaction::noteInitiatorAborted()
774c051c 422{
c824c43b 423 ICAPXaction_Enter(noteInitiatorAborted);
424
425 if (theInitiator) {
426 clearInitiator();
427 mustStop("initiator aborted");
c99de607 428 }
c824c43b 429
430 ICAPXaction_Exit();
774c051c 431}
432
5f8252d2 433// This 'last chance' method is called before a 'done' transaction is deleted.
434// It is wrong to call virtual methods from a destructor. Besides, this call
435// indicates that the transaction will terminate as planned.
436void ICAPXaction::swanSong()
774c051c 437{
5f8252d2 438 // kids should sing first and then call the parent method.
439
440 closeConnection(); // TODO: rename because we do not always close
441
442 if (!readBuf.isNull())
443 readBuf.clean();
444
445 if (commBuf)
446 memFreeBuf(commBufSize, commBuf);
774c051c 447
c824c43b 448 if (theInitiator)
449 tellQueryAborted(!isRetriable);
774c051c 450
c824c43b 451 ICAPInitiate::swanSong();
774c051c 452}
453
454// returns a temporary string depicting transaction status, for debugging
455const char *ICAPXaction::status() const
456{
457 static MemBuf buf;
458 buf.reset();
459
5f8252d2 460 buf.append(" [", 2);
774c051c 461
462 fillPendingStatus(buf);
463 buf.append("/", 1);
464 fillDoneStatus(buf);
465
5f8252d2 466 buf.Printf(" icapx%d]", id);
774c051c 467
468 buf.terminate();
469
470 return buf.content();
471}
472
473void ICAPXaction::fillPendingStatus(MemBuf &buf) const
474{
475 if (connection >= 0) {
5f8252d2 476 buf.Printf("FD %d", connection);
774c051c 477
478 if (writer)
479 buf.append("w", 1);
480
481 if (reader)
482 buf.append("r", 1);
483
5f8252d2 484 buf.append(";", 1);
774c051c 485 }
486}
487
488void ICAPXaction::fillDoneStatus(MemBuf &buf) const
489{
490 if (connection >= 0 && commEof)
491 buf.Printf("Comm(%d)", connection);
492
493 if (stopReason != NULL)
494 buf.Printf("Stopped");
495}
3cfc19b3 496
497bool ICAPXaction::fillVirginHttpHeader(MemBuf &buf) const
498{
499 return false;
500}