]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ICAP/ICAPXaction.cc
SourceFormat Update
[thirdparty/squid.git] / src / ICAP / ICAPXaction.cc
CommitLineData
774c051c 1/*
507d0a78 2 * DEBUG: section 93 ICAP (RFC 3507) Client
774c051c 3 */
4
5#include "squid.h"
6#include "comm.h"
bd7f2ede 7#include "CommCalls.h"
5f8252d2 8#include "HttpMsg.h"
774c051c 9#include "ICAPXaction.h"
560d7d2d 10#include "ICAPConfig.h"
774c051c 11#include "TextException.h"
781ce8ff 12#include "pconn.h"
13#include "fde.h"
14
15static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
774c051c 16
5f8252d2 17
18//CBDATA_CLASS_INIT(ICAPXaction);
19
0bef8dd7 20ICAPXaction::ICAPXaction(const char *aTypeName, Adaptation::Initiator *anInitiator, ICAPServiceRep::Pointer &aService):
bd7f2ede 21 AsyncJob(aTypeName),
0bef8dd7 22 Adaptation::Initiate(aTypeName, anInitiator, aService.getRaw()),
774c051c 23 connection(-1),
24 commBuf(NULL), commBufSize(0),
25 commEof(false),
2dfede9e 26 reuseConnection(true),
c824c43b 27 isRetriable(true),
cfc68405 28 ignoreLastWrite(false),
c824c43b 29 connector(NULL), reader(NULL), writer(NULL), closer(NULL)
774c051c 30{
5f8252d2 31 debugs(93,3, typeName << " constructed, this=" << this <<
9e008dda 32 " [icapx" << id << ']'); // we should not call virtual status() here
774c051c 33}
34
35ICAPXaction::~ICAPXaction()
36{
5f8252d2 37 debugs(93,3, typeName << " destructed, this=" << this <<
9e008dda 38 " [icapx" << id << ']'); // we should not call virtual status() here
5f8252d2 39}
40
0bef8dd7
AR
41ICAPServiceRep &
42ICAPXaction::service()
43{
44 ICAPServiceRep *s = dynamic_cast<ICAPServiceRep*>(&Initiate::service());
45 Must(s);
46 return *s;
47}
48
9e008dda
AJ
49void ICAPXaction::disableRetries()
50{
cfc68405 51 debugs(93,5, typeName << (isRetriable ? " becomes" : " remains") <<
9e008dda 52 " final" << status());
c824c43b 53 isRetriable = false;
54}
55
5f8252d2 56void ICAPXaction::start()
57{
0bef8dd7 58 Adaptation::Initiate::start();
5f8252d2 59
60 readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF);
61 commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize);
62 // make sure maximum readBuf space does not exceed commBuf size
63 Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
774c051c 64}
65
66// TODO: obey service-specific, OPTIONS-reported connection limit
67void ICAPXaction::openConnection()
68{
7aa6b7cb 69 IPAddress client_addr;
70
c824c43b 71 Must(connection < 0);
72
0bef8dd7 73 const Adaptation::Service &s = service();
774c051c 74
560d7d2d 75 if (!TheICAPConfig.reuse_connections)
76 disableRetries(); // this will also safely drain pconn pool
77
c8ceec27 78 // TODO: check whether NULL domain is appropriate here
0bef8dd7 79 connection = icapPconnPool->pop(s.cfg().host.buf(), s.cfg().port, NULL, client_addr, isRetriable);
c8ceec27 80 if (connection >= 0) {
81 debugs(93,3, HERE << "reused pconn FD " << connection);
bd7f2ede 82
83 // fake the connect callback
84 // TODO: can we sync call ICAPXaction::noteCommConnected here instead?
85 typedef CommCbMemFunT<ICAPXaction, CommConnectCbParams> Dialer;
86 Dialer dialer(this, &ICAPXaction::noteCommConnected);
87 dialer.params.flag = COMM_OK;
88 // fake other parameters by copying from the existing connection
89 connector = asyncCall(93,3, "ICAPXaction::noteCommConnected", dialer);
9e008dda 90 ScheduleCallHere(connector);
c8ceec27 91 return;
2dfede9e 92 }
93
c8ceec27 94 disableRetries(); // we only retry pconn failures
95
eb03221c 96 IPAddress outgoing;
9e008dda
AJ
97 connection = comm_open(SOCK_STREAM, 0, outgoing,
98 COMM_NONBLOCKING, s.cfg().uri.buf());
774c051c 99
c824c43b 100 if (connection < 0)
101 dieOnConnectionFailure(); // throws
774c051c 102
0bef8dd7 103 debugs(93,3, typeName << " opens connection to " << s.cfg().host.buf() << ":" << s.cfg().port);
774c051c 104
cfc68405 105 // TODO: service bypass status may differ from that of a transaction
bd7f2ede 106 typedef CommCbMemFunT<ICAPXaction, CommTimeoutCbParams> TimeoutDialer;
107 AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "ICAPXaction::noteCommTimedout",
9e008dda 108 TimeoutDialer(this,&ICAPXaction::noteCommTimedout));
774c051c 109
0bef8dd7 110 commSetTimeout(connection, TheICAPConfig.connect_timeout(
9e008dda 111 service().cfg().bypass), timeoutCall);
774c051c 112
bd7f2ede 113 typedef CommCbMemFunT<ICAPXaction, CommCloseCbParams> CloseDialer;
114 closer = asyncCall(93, 5, "ICAPXaction::noteCommClosed",
9e008dda 115 CloseDialer(this,&ICAPXaction::noteCommClosed));
bd7f2ede 116 comm_add_close_handler(connection, closer);
117
118 typedef CommCbMemFunT<ICAPXaction, CommConnectCbParams> ConnectDialer;
119 connector = asyncCall(93,3, "ICAPXaction::noteCommConnected",
9e008dda 120 ConnectDialer(this, &ICAPXaction::noteCommConnected));
0bef8dd7 121 commConnectStart(connection, s.cfg().host.buf(), s.cfg().port, connector);
774c051c 122}
123
/*
 * This event handler was necessary to work around the no-reentry policy
 * of ICAPXaction::callStart(). It is compiled out: openConnection() now
 * schedules a faked noteCommConnected() call for reused connections.
 */
#if 0
void
ICAPXaction::reusedConnection(void *data)
{
    debugs(93, 5, "ICAPXaction::reusedConnection");
    ICAPXaction *const xaction = static_cast<ICAPXaction*>(data);
    xaction->noteCommConnected(COMM_OK);
}
#endif
2dfede9e 137
774c051c 138void ICAPXaction::closeConnection()
139{
140 if (connection >= 0) {
774c051c 141
bd7f2ede 142 if (closer != NULL) {
143 comm_remove_close_handler(connection, closer);
774c051c 144 closer = NULL;
145 }
146
c99de607 147 cancelRead(); // may not work
148
5f8252d2 149 if (reuseConnection && !doneWithIo()) {
150 debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status());
c99de607 151 reuseConnection = false;
152 }
774c051c 153
2dfede9e 154 if (reuseConnection) {
b70ba605 155 IPAddress client_addr;
5f8252d2 156 debugs(93,3, HERE << "pushing pconn" << status());
9e008dda
AJ
157 AsyncCall::Pointer call = NULL;
158 commSetTimeout(connection, -1, call);
0bef8dd7 159 icapPconnPool->push(connection, theService->cfg().host.buf(),
9e008dda 160 theService->cfg().port, NULL, client_addr);
c824c43b 161 disableRetries();
2dfede9e 162 } else {
5f8252d2 163 debugs(93,3, HERE << "closing pconn" << status());
c99de607 164 // comm_close will clear timeout
2dfede9e 165 comm_close(connection);
166 }
774c051c 167
c99de607 168 writer = NULL;
169 reader = NULL;
774c051c 170 connector = NULL;
171 connection = -1;
172 }
173}
174
175// connection with the ICAP service established
bd7f2ede 176void ICAPXaction::noteCommConnected(const CommConnectCbParams &io)
774c051c 177{
bd7f2ede 178 Must(connector != NULL);
774c051c 179 connector = NULL;
c99de607 180
bd7f2ede 181 if (io.flag != COMM_OK)
c99de607 182 dieOnConnectionFailure(); // throws
183
184 fd_table[connection].noteUse(icapPconnPool);
774c051c 185
186 handleCommConnected();
774c051c 187}
188
9e008dda
AJ
189void ICAPXaction::dieOnConnectionFailure()
190{
4932ad93 191 debugs(93, 2, HERE << typeName <<
9e008dda 192 " failed to connect to " << service().cfg().uri);
c99de607 193 theService->noteFailure();
c99de607 194 throw TexcHere("cannot connect to the ICAP service");
195}
196
774c051c 197void ICAPXaction::scheduleWrite(MemBuf &buf)
198{
199 // comm module will free the buffer
bd7f2ede 200 typedef CommCbMemFunT<ICAPXaction, CommIoCbParams> Dialer;
201 writer = asyncCall(93,3, "ICAPXaction::noteCommWrote",
9e008dda 202 Dialer(this, &ICAPXaction::noteCommWrote));
bd7f2ede 203
204 comm_write_mbuf(connection, &buf, writer);
c99de607 205 updateTimeout();
774c051c 206}
207
bd7f2ede 208void ICAPXaction::noteCommWrote(const CommIoCbParams &io)
774c051c 209{
bd7f2ede 210 Must(writer != NULL);
774c051c 211 writer = NULL;
9e008dda 212
cfc68405 213 if (ignoreLastWrite) {
214 // a hack due to comm inability to cancel a pending write
9e008dda 215 ignoreLastWrite = false;
bd7f2ede 216 debugs(93, 7, HERE << "ignoring last write; status: " << io.flag);
cfc68405 217 } else {
bd7f2ede 218 Must(io.flag == COMM_OK);
cfc68405 219 updateTimeout();
bd7f2ede 220 handleCommWrote(io.size);
cfc68405 221 }
774c051c 222}
223
224// communication timeout with the ICAP service
bd7f2ede 225void ICAPXaction::noteCommTimedout(const CommTimeoutCbParams &io)
774c051c 226{
774c051c 227 handleCommTimedout();
774c051c 228}
229
230void ICAPXaction::handleCommTimedout()
231{
4932ad93 232 debugs(93, 2, HERE << typeName << " failed: timeout with " <<
9e008dda
AJ
233 theService->cfg().methodStr() << " " <<
234 theService->cfg().uri.buf() << status());
fe3e2600 235 reuseConnection = false;
cfc68405 236 service().noteFailure();
3cfc19b3 237
bd7f2ede 238 throw TexcHere(connector != NULL ?
9e008dda
AJ
239 "timed out while connecting to the ICAP service" :
240 "timed out while talking to the ICAP service");
774c051c 241}
242
243// unexpected connection close while talking to the ICAP service
bd7f2ede 244void ICAPXaction::noteCommClosed(const CommCloseCbParams &io)
774c051c 245{
246 closer = NULL;
774c051c 247 handleCommClosed();
774c051c 248}
249
250void ICAPXaction::handleCommClosed()
251{
252 mustStop("ICAP service connection externally closed");
253}
254
c824c43b 255void ICAPXaction::callEnd()
774c051c 256{
c824c43b 257 if (doneWithIo()) {
258 debugs(93, 5, HERE << typeName << " done with I/O" << status());
259 closeConnection();
260 }
0bef8dd7 261 Adaptation::Initiate::callEnd(); // may destroy us
774c051c 262}
263
264bool ICAPXaction::doneAll() const
265{
0bef8dd7 266 return !connector && !reader && !writer && Adaptation::Initiate::doneAll();
774c051c 267}
268
9e008dda
AJ
269void ICAPXaction::updateTimeout()
270{
bd7f2ede 271 if (reader != NULL || writer != NULL) {
c99de607 272 // restart the timeout before each I/O
273 // XXX: why does Config.Timeout lacks a write timeout?
cfc68405 274 // TODO: service bypass status may differ from that of a transaction
9e008dda
AJ
275 typedef CommCbMemFunT<ICAPXaction, CommTimeoutCbParams> TimeoutDialer;
276 AsyncCall::Pointer call = asyncCall(93, 5, "ICAPXaction::noteCommTimedout",
277 TimeoutDialer(this,&ICAPXaction::noteCommTimedout));
bd7f2ede 278
9e008dda
AJ
279 commSetTimeout(connection,
280 TheICAPConfig.io_timeout(service().cfg().bypass), call);
c99de607 281 } else {
282 // clear timeout when there is no I/O
283 // Do we need a lifetime timeout?
9e008dda 284 AsyncCall::Pointer call = NULL;
bd7f2ede 285 commSetTimeout(connection, -1, call);
c99de607 286 }
287}
288
774c051c 289void ICAPXaction::scheduleRead()
290{
291 Must(connection >= 0);
292 Must(!reader);
293 Must(readBuf.hasSpace());
294
774c051c 295 /*
296 * See comments in ICAPXaction.h about why we use commBuf
297 * here instead of reading directly into readBuf.buf.
298 */
bd7f2ede 299 typedef CommCbMemFunT<ICAPXaction, CommIoCbParams> Dialer;
300 reader = asyncCall(93,3, "ICAPXaction::noteCommRead",
9e008dda 301 Dialer(this, &ICAPXaction::noteCommRead));
774c051c 302
bd7f2ede 303 comm_read(connection, commBuf, readBuf.spaceSize(), reader);
c99de607 304 updateTimeout();
774c051c 305}
306
307// comm module read a portion of the ICAP response for us
bd7f2ede 308void ICAPXaction::noteCommRead(const CommIoCbParams &io)
774c051c 309{
bd7f2ede 310 Must(reader != NULL);
774c051c 311 reader = NULL;
312
bd7f2ede 313 Must(io.flag == COMM_OK);
314 Must(io.size >= 0);
774c051c 315
c99de607 316 updateTimeout();
317
bd7f2ede 318 debugs(93, 3, HERE << "read " << io.size << " bytes");
774c051c 319
320 /*
321 * See comments in ICAPXaction.h about why we use commBuf
322 * here instead of reading directly into readBuf.buf.
323 */
324
bd7f2ede 325 if (io.size > 0) {
326 readBuf.append(commBuf, io.size);
c824c43b 327 disableRetries(); // because pconn did not fail
328 } else {
329 reuseConnection = false;
774c051c 330 commEof = true;
c824c43b 331 }
774c051c 332
bd7f2ede 333 handleCommRead(io.size);
774c051c 334}
335
336void ICAPXaction::cancelRead()
337{
bd7f2ede 338 if (reader != NULL) {
339 comm_read_cancel(connection, reader);
340 reader = NULL;
774c051c 341 }
342}
343
344bool ICAPXaction::parseHttpMsg(HttpMsg *msg)
345{
def17b6a 346 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse");
774c051c 347
348 http_status error = HTTP_STATUS_NONE;
349 const bool parsed = msg->parse(&readBuf, commEof, &error);
350 Must(parsed || !error); // success or need more data
351
352 if (!parsed) { // need more data
353 Must(mayReadMore());
354 msg->reset();
355 return false;
356 }
357
358 readBuf.consume(msg->hdr_sz);
359 return true;
360}
361
362bool ICAPXaction::mayReadMore() const
363{
364 return !doneReading() && // will read more data
365 readBuf.hasSpace(); // have space for more data
366}
367
368bool ICAPXaction::doneReading() const
369{
370 return commEof;
371}
372
c99de607 373bool ICAPXaction::doneWriting() const
374{
375 return !writer;
376}
377
378bool ICAPXaction::doneWithIo() const
379{
380 return connection >= 0 && // or we could still be waiting to open it
9e008dda
AJ
381 !connector && !reader && !writer && // fast checks, some redundant
382 doneReading() && doneWriting();
c99de607 383}
384
c824c43b 385// initiator aborted
386void ICAPXaction::noteInitiatorAborted()
774c051c 387{
c824c43b 388
389 if (theInitiator) {
390 clearInitiator();
391 mustStop("initiator aborted");
c99de607 392 }
c824c43b 393
774c051c 394}
395
5f8252d2 396// This 'last chance' method is called before a 'done' transaction is deleted.
397// It is wrong to call virtual methods from a destructor. Besides, this call
398// indicates that the transaction will terminate as planned.
399void ICAPXaction::swanSong()
774c051c 400{
5f8252d2 401 // kids should sing first and then call the parent method.
402
403 closeConnection(); // TODO: rename because we do not always close
404
405 if (!readBuf.isNull())
406 readBuf.clean();
407
408 if (commBuf)
409 memFreeBuf(commBufSize, commBuf);
774c051c 410
c824c43b 411 if (theInitiator)
412 tellQueryAborted(!isRetriable);
774c051c 413
0bef8dd7 414 Adaptation::Initiate::swanSong();
774c051c 415}
416
417// returns a temporary string depicting transaction status, for debugging
418const char *ICAPXaction::status() const
419{
420 static MemBuf buf;
421 buf.reset();
422
5f8252d2 423 buf.append(" [", 2);
774c051c 424
425 fillPendingStatus(buf);
426 buf.append("/", 1);
427 fillDoneStatus(buf);
428
5f8252d2 429 buf.Printf(" icapx%d]", id);
774c051c 430
431 buf.terminate();
432
433 return buf.content();
434}
435
436void ICAPXaction::fillPendingStatus(MemBuf &buf) const
437{
438 if (connection >= 0) {
5f8252d2 439 buf.Printf("FD %d", connection);
774c051c 440
bd7f2ede 441 if (writer != NULL)
774c051c 442 buf.append("w", 1);
443
bd7f2ede 444 if (reader != NULL)
774c051c 445 buf.append("r", 1);
446
5f8252d2 447 buf.append(";", 1);
774c051c 448 }
449}
450
451void ICAPXaction::fillDoneStatus(MemBuf &buf) const
452{
453 if (connection >= 0 && commEof)
454 buf.Printf("Comm(%d)", connection);
455
456 if (stopReason != NULL)
457 buf.Printf("Stopped");
458}
3cfc19b3 459
460bool ICAPXaction::fillVirginHttpHeader(MemBuf &buf) const
461{
462 return false;
463}