]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/icap/Xaction.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / adaptation / icap / Xaction.cc
1 /*
2 * DEBUG: section 93 ICAP (RFC 3507) Client
3 */
4
5 #include "squid.h"
6 #include "acl/FilledChecklist.h"
7 #include "adaptation/icap/Config.h"
8 #include "adaptation/icap/Launcher.h"
9 #include "adaptation/icap/Xaction.h"
10 #include "base/TextException.h"
11 #include "comm.h"
12 #include "comm/Connection.h"
13 #include "comm/ConnOpener.h"
14 #include "comm/Write.h"
15 #include "CommCalls.h"
16 #include "err_detail_type.h"
17 #include "fde.h"
18 #include "globals.h"
19 #include "HttpMsg.h"
20 #include "HttpReply.h"
21 #include "HttpRequest.h"
22 #include "icap_log.h"
23 #include "ipcache.h"
24 #include "pconn.h"
25 #include "protos.h"
26 #include "SquidTime.h"
27
28 //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
29
30 Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::ServiceRep::Pointer &aService):
31 AsyncJob(aTypeName),
32 Adaptation::Initiate(aTypeName),
33 icapRequest(NULL),
34 icapReply(NULL),
35 attempts(0),
36 connection(NULL),
37 theService(aService),
38 commBuf(NULL), commBufSize(0),
39 commEof(false),
40 reuseConnection(true),
41 isRetriable(true),
42 isRepeatable(true),
43 ignoreLastWrite(false),
44 connector(NULL), reader(NULL), writer(NULL), closer(NULL),
45 alep(new AccessLogEntry),
46 al(*alep)
47 {
48 debugs(93,3, typeName << " constructed, this=" << this <<
49 " [icapx" << id << ']'); // we should not call virtual status() here
50 icapRequest = HTTPMSGLOCK(new HttpRequest);
51 icap_tr_start = current_time;
52 }
53
54 Adaptation::Icap::Xaction::~Xaction()
55 {
56 debugs(93,3, typeName << " destructed, this=" << this <<
57 " [icapx" << id << ']'); // we should not call virtual status() here
58 HTTPMSGUNLOCK(icapRequest);
59 }
60
61 Adaptation::Icap::ServiceRep &
62 Adaptation::Icap::Xaction::service()
63 {
64 Must(theService != NULL);
65 return *theService;
66 }
67
68 void Adaptation::Icap::Xaction::disableRetries()
69 {
70 debugs(93,5, typeName << (isRetriable ? " from now on" : " still") <<
71 " cannot be retried " << status());
72 isRetriable = false;
73 }
74
75 void Adaptation::Icap::Xaction::disableRepeats(const char *reason)
76 {
77 debugs(93,5, typeName << (isRepeatable ? " from now on" : " still") <<
78 " cannot be repeated because " << reason << status());
79 isRepeatable = false;
80 }
81
82 void Adaptation::Icap::Xaction::start()
83 {
84 Adaptation::Initiate::start();
85
86 readBuf.init(SQUID_TCP_SO_RCVBUF, SQUID_TCP_SO_RCVBUF);
87 commBuf = (char*)memAllocBuf(SQUID_TCP_SO_RCVBUF, &commBufSize);
88 // make sure maximum readBuf space does not exceed commBuf size
89 Must(static_cast<size_t>(readBuf.potentialSpaceSize()) <= commBufSize);
90 }
91
92 static void
93 icapLookupDnsResults(const ipcache_addrs *ia, const DnsLookupDetails &, void *data)
94 {
95 Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
96 xa->dnsLookupDone(ia);
97 }
98
99 // TODO: obey service-specific, OPTIONS-reported connection limit
100 void
101 Adaptation::Icap::Xaction::openConnection()
102 {
103 Must(!haveConnection());
104
105 Adaptation::Icap::ServiceRep &s = service();
106
107 if (!TheConfig.reuse_connections)
108 disableRetries(); // this will also safely drain pconn pool
109
110 bool wasReused = false;
111 connection = s.getConnection(isRetriable, wasReused);
112
113 if (wasReused && Comm::IsConnOpen(connection)) {
114 // Set comm Close handler
115 // fake the connect callback
116 // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
117 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
118 CbcPointer<Xaction> self(this);
119 Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
120 dialer.params.conn = connection;
121 dialer.params.flag = COMM_OK;
122 // fake other parameters by copying from the existing connection
123 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
124 ScheduleCallHere(connector);
125 return;
126 }
127
128 disableRetries(); // we only retry pconn failures
129
130 // Attempt to open a new connection...
131 debugs(93,3, typeName << " opens connection to " << s.cfg().host.termedBuf() << ":" << s.cfg().port);
132
133 // Locate the Service IP(s) to open
134 ipcache_nbgethostbyname(s.cfg().host.termedBuf(), icapLookupDnsResults, this);
135 }
136
137 void
138 Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia)
139 {
140 Adaptation::Icap::ServiceRep &s = service();
141
142 if (ia == NULL) {
143 debugs(44, DBG_IMPORTANT, "ICAP: Unknown service host: " << s.cfg().host);
144
145 #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
146 dieOnConnectionFailure(); // throws
147 #else // take a step back into protected Async call dialing.
148 // fake the connect callback
149 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
150 CbcPointer<Xaction> self(this);
151 Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
152 dialer.params.conn = connection;
153 dialer.params.flag = COMM_ERROR;
154 // fake other parameters by copying from the existing connection
155 connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
156 ScheduleCallHere(connector);
157 #endif
158 return;
159 }
160
161 assert(ia->cur < ia->count);
162
163 connection = new Comm::Connection;
164 connection->remote = ia->in_addrs[ia->cur];
165 connection->remote.SetPort(s.cfg().port);
166 getOutgoingAddress(NULL, connection);
167
168 // TODO: service bypass status may differ from that of a transaction
169 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
170 connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
171 Comm::ConnOpener *cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
172 cs->setHost(s.cfg().host.termedBuf());
173 AsyncJob::Start(cs);
174 }
175
176 /*
177 * This event handler is necessary to work around the no-rentry policy
178 * of Adaptation::Icap::Xaction::callStart()
179 */
180 #if 0
181 void
182 Adaptation::Icap::Xaction::reusedConnection(void *data)
183 {
184 debugs(93, 5, HERE << "reused connection");
185 Adaptation::Icap::Xaction *x = (Adaptation::Icap::Xaction*)data;
186 x->noteCommConnected(COMM_OK);
187 }
188 #endif
189
190 void Adaptation::Icap::Xaction::closeConnection()
191 {
192 if (haveConnection()) {
193
194 if (closer != NULL) {
195 comm_remove_close_handler(connection->fd, closer);
196 closer = NULL;
197 }
198
199 cancelRead(); // may not work
200
201 if (reuseConnection && !doneWithIo()) {
202 //status() adds leading spaces.
203 debugs(93,5, HERE << "not reusing pconn due to pending I/O" << status());
204 reuseConnection = false;
205 }
206
207 if (reuseConnection)
208 disableRetries();
209
210 const bool reset = !reuseConnection &&
211 (al.icap.outcome == xoGone || al.icap.outcome == xoError);
212
213 Adaptation::Icap::ServiceRep &s = service();
214 s.putConnection(connection, reuseConnection, reset, status());
215
216 writer = NULL;
217 reader = NULL;
218 connector = NULL;
219 connection = NULL;
220 }
221 }
222
223 // connection with the ICAP service established
224 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
225 {
226 if (io.flag == COMM_TIMEOUT) {
227 handleCommTimedout();
228 return;
229 }
230
231 Must(connector != NULL);
232 connector = NULL;
233
234 if (io.flag != COMM_OK)
235 dieOnConnectionFailure(); // throws
236
237 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
238 AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
239 TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
240 commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
241
242 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
243 closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
244 CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
245 comm_add_close_handler(io.conn->fd, closer);
246
247 // ?? fd_table[io.conn->fd].noteUse(icapPconnPool);
248 service().noteConnectionUse(connection);
249
250 handleCommConnected();
251 }
252
253 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
254 {
255 debugs(93, 2, HERE << typeName <<
256 " failed to connect to " << service().cfg().uri);
257 service().noteConnectionFailed("failure");
258 detailError(ERR_DETAIL_ICAP_XACT_START);
259 throw TexcHere("cannot connect to the ICAP service");
260 }
261
262 void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
263 {
264 Must(haveConnection());
265
266 // comm module will free the buffer
267 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
268 writer = JobCallback(93, 3,
269 Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
270
271 Comm::Write(connection, &buf, writer);
272 updateTimeout();
273 }
274
275 void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
276 {
277 Must(writer != NULL);
278 writer = NULL;
279
280 if (ignoreLastWrite) {
281 // a hack due to comm inability to cancel a pending write
282 ignoreLastWrite = false;
283 debugs(93, 7, HERE << "ignoring last write; status: " << io.flag);
284 } else {
285 Must(io.flag == COMM_OK);
286 al.icap.bytesSent += io.size;
287 updateTimeout();
288 handleCommWrote(io.size);
289 }
290 }
291
292 // communication timeout with the ICAP service
293 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &io)
294 {
295 handleCommTimedout();
296 }
297
298 void Adaptation::Icap::Xaction::handleCommTimedout()
299 {
300 debugs(93, 2, HERE << typeName << " failed: timeout with " <<
301 theService->cfg().methodStr() << " " <<
302 theService->cfg().uri << status());
303 reuseConnection = false;
304 const bool whileConnecting = connector != NULL;
305 if (whileConnecting) {
306 assert(!haveConnection());
307 theService->noteConnectionFailed("timedout");
308 } else
309 closeConnection(); // so that late Comm callbacks do not disturb bypass
310 throw TexcHere(whileConnecting ?
311 "timed out while connecting to the ICAP service" :
312 "timed out while talking to the ICAP service");
313 }
314
315 // unexpected connection close while talking to the ICAP service
316 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &io)
317 {
318 closer = NULL;
319 handleCommClosed();
320 }
321
322 void Adaptation::Icap::Xaction::handleCommClosed()
323 {
324 detailError(ERR_DETAIL_ICAP_XACT_CLOSE);
325 mustStop("ICAP service connection externally closed");
326 }
327
328 void Adaptation::Icap::Xaction::callException(const std::exception &e)
329 {
330 setOutcome(xoError);
331 service().noteFailure();
332 Adaptation::Initiate::callException(e);
333 }
334
335 void Adaptation::Icap::Xaction::callEnd()
336 {
337 if (doneWithIo()) {
338 debugs(93, 5, HERE << typeName << " done with I/O" << status());
339 closeConnection();
340 }
341 Adaptation::Initiate::callEnd(); // may destroy us
342 }
343
344 bool Adaptation::Icap::Xaction::doneAll() const
345 {
346 return !connector && !reader && !writer && Adaptation::Initiate::doneAll();
347 }
348
349 void Adaptation::Icap::Xaction::updateTimeout()
350 {
351 Must(haveConnection());
352
353 if (reader != NULL || writer != NULL) {
354 // restart the timeout before each I/O
355 // XXX: why does Config.Timeout lacks a write timeout?
356 // TODO: service bypass status may differ from that of a transaction
357 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
358 AsyncCall::Pointer call = JobCallback(93, 5, TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
359 commSetConnTimeout(connection, TheConfig.io_timeout(service().cfg().bypass), call);
360 } else {
361 // clear timeout when there is no I/O
362 // Do we need a lifetime timeout?
363 commUnsetConnTimeout(connection);
364 }
365 }
366
367 void Adaptation::Icap::Xaction::scheduleRead()
368 {
369 Must(haveConnection());
370 Must(!reader);
371 Must(readBuf.hasSpace());
372
373 /*
374 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
375 * here instead of reading directly into readBuf.buf.
376 */
377 typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
378 reader = JobCallback(93, 3,
379 Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
380
381 comm_read(connection, commBuf, readBuf.spaceSize(), reader);
382 updateTimeout();
383 }
384
385 // comm module read a portion of the ICAP response for us
386 void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams &io)
387 {
388 Must(reader != NULL);
389 reader = NULL;
390
391 Must(io.flag == COMM_OK);
392
393 if (!io.size) {
394 commEof = true;
395 reuseConnection = false;
396
397 // detect a pconn race condition: eof on the first pconn read
398 if (!al.icap.bytesRead && retriable()) {
399 setOutcome(xoRace);
400 mustStop("pconn race");
401 return;
402 }
403 } else {
404
405 al.icap.bytesRead+=io.size;
406
407 updateTimeout();
408
409 debugs(93, 3, HERE << "read " << io.size << " bytes");
410
411 /*
412 * See comments in Adaptation::Icap::Xaction.h about why we use commBuf
413 * here instead of reading directly into readBuf.buf.
414 */
415
416 readBuf.append(commBuf, io.size);
417 disableRetries(); // because pconn did not fail
418 }
419
420 handleCommRead(io.size);
421 }
422
423 void Adaptation::Icap::Xaction::cancelRead()
424 {
425 if (reader != NULL) {
426 Must(haveConnection());
427 comm_read_cancel(connection->fd, reader);
428 reader = NULL;
429 }
430 }
431
432 bool Adaptation::Icap::Xaction::parseHttpMsg(HttpMsg *msg)
433 {
434 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse");
435
436 http_status error = HTTP_STATUS_NONE;
437 const bool parsed = msg->parse(&readBuf, commEof, &error);
438 Must(parsed || !error); // success or need more data
439
440 if (!parsed) { // need more data
441 Must(mayReadMore());
442 msg->reset();
443 return false;
444 }
445
446 readBuf.consume(msg->hdr_sz);
447 return true;
448 }
449
450 bool Adaptation::Icap::Xaction::mayReadMore() const
451 {
452 return !doneReading() && // will read more data
453 readBuf.hasSpace(); // have space for more data
454 }
455
456 bool Adaptation::Icap::Xaction::doneReading() const
457 {
458 return commEof;
459 }
460
461 bool Adaptation::Icap::Xaction::doneWriting() const
462 {
463 return !writer;
464 }
465
466 bool Adaptation::Icap::Xaction::doneWithIo() const
467 {
468 return haveConnection() &&
469 !connector && !reader && !writer && // fast checks, some redundant
470 doneReading() && doneWriting();
471 }
472
473 bool Adaptation::Icap::Xaction::haveConnection() const
474 {
475 return connection != NULL && connection->isOpen();
476 }
477
478 // initiator aborted
479 void Adaptation::Icap::Xaction::noteInitiatorAborted()
480 {
481
482 if (theInitiator.set()) {
483 debugs(93,4, HERE << "Initiator gone before ICAP transaction ended");
484 clearInitiator();
485 detailError(ERR_DETAIL_ICAP_INIT_GONE);
486 setOutcome(xoGone);
487 mustStop("initiator aborted");
488 }
489
490 }
491
492 void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &xo)
493 {
494 if (al.icap.outcome != xoUnknown) {
495 debugs(93, 3, HERE << "Warning: reseting outcome: from " <<
496 al.icap.outcome << " to " << xo);
497 } else {
498 debugs(93, 4, HERE << xo);
499 }
500 al.icap.outcome = xo;
501 }
502
503 // This 'last chance' method is called before a 'done' transaction is deleted.
504 // It is wrong to call virtual methods from a destructor. Besides, this call
505 // indicates that the transaction will terminate as planned.
506 void Adaptation::Icap::Xaction::swanSong()
507 {
508 // kids should sing first and then call the parent method.
509
510 closeConnection(); // TODO: rename because we do not always close
511
512 if (!readBuf.isNull())
513 readBuf.clean();
514
515 if (commBuf)
516 memFreeBuf(commBufSize, commBuf);
517
518 tellQueryAborted();
519
520 maybeLog();
521
522 Adaptation::Initiate::swanSong();
523 }
524
525 void Adaptation::Icap::Xaction::tellQueryAborted()
526 {
527 if (theInitiator.set()) {
528 Adaptation::Icap::XactAbortInfo abortInfo(icapRequest, icapReply,
529 retriable(), repeatable());
530 Launcher *launcher = dynamic_cast<Launcher*>(theInitiator.get());
531 // launcher may be nil if initiator is invalid
532 CallJobHere1(91,5, CbcPointer<Launcher>(launcher),
533 Launcher, noteXactAbort, abortInfo);
534 clearInitiator();
535 }
536 }
537
538 void Adaptation::Icap::Xaction::maybeLog()
539 {
540 if (IcapLogfileStatus == LOG_ENABLE) {
541 ACLChecklist *checklist = new ACLFilledChecklist(::Config.accessList.icap, al.request, dash_str);
542 if (!::Config.accessList.icap || checklist->fastCheck() == ACCESS_ALLOWED) {
543 finalizeLogInfo();
544 icapLogLog(alep, checklist);
545 }
546 delete checklist;
547 }
548 }
549
550 void Adaptation::Icap::Xaction::finalizeLogInfo()
551 {
552 //prepare log data
553 al.icp.opcode = ICP_INVALID;
554
555 const Adaptation::Icap::ServiceRep &s = service();
556 al.icap.hostAddr = s.cfg().host.termedBuf();
557 al.icap.serviceName = s.cfg().key;
558 al.icap.reqUri = s.cfg().uri;
559
560 al.icap.ioTime = tvSubMsec(icap_tio_start, icap_tio_finish);
561 al.icap.trTime = tvSubMsec(icap_tr_start, current_time);
562
563 al.icap.request = HTTPMSGLOCK(icapRequest);
564 if (icapReply) {
565 al.icap.reply = HTTPMSGLOCK(icapReply);
566 al.icap.resStatus = icapReply->sline.status;
567 }
568 }
569
570 // returns a temporary string depicting transaction status, for debugging
571 const char *Adaptation::Icap::Xaction::status() const
572 {
573 static MemBuf buf;
574 buf.reset();
575
576 buf.append(" [", 2);
577
578 fillPendingStatus(buf);
579 buf.append("/", 1);
580 fillDoneStatus(buf);
581
582 buf.Printf(" %s%u]", id.Prefix, id.value);
583
584 buf.terminate();
585
586 return buf.content();
587 }
588
589 void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
590 {
591 if (haveConnection()) {
592 buf.Printf("FD %d", connection->fd);
593
594 if (writer != NULL)
595 buf.append("w", 1);
596
597 if (reader != NULL)
598 buf.append("r", 1);
599
600 buf.append(";", 1);
601 }
602 }
603
604 void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf &buf) const
605 {
606 if (haveConnection() && commEof)
607 buf.Printf("Comm(%d)", connection->fd);
608
609 if (stopReason != NULL)
610 buf.Printf("Stopped");
611 }
612
613 bool Adaptation::Icap::Xaction::fillVirginHttpHeader(MemBuf &buf) const
614 {
615 return false;
616 }