]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/adaptation/icap/Xaction.cc
Merge from trunk
[thirdparty/squid.git] / src / adaptation / icap / Xaction.cc
index 14fedfabaf94180c5c12f96c68fff4f40bcb90cd..574d93d57c41a3e0e409eac6081bd23e632f1770 100644 (file)
@@ -4,6 +4,8 @@
 
 #include "squid.h"
 #include "comm.h"
+#include "comm/Connection.h"
+#include "comm/ConnOpener.h"
 #include "CommCalls.h"
 #include "HttpMsg.h"
 #include "adaptation/icap/Xaction.h"
@@ -13,7 +15,6 @@
 #include "pconn.h"
 #include "HttpRequest.h"
 #include "HttpReply.h"
-#include "ip/tools.h"
 #include "acl/FilledChecklist.h"
 #include "icap_log.h"
 #include "fde.h"
@@ -24,14 +25,13 @@ static PconnPool *icapPconnPool = new PconnPool("ICAP Servers");
 
 //CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
 
-Adaptation::Icap::Xaction::Xaction(const char *aTypeName,
-                                   Adaptation::Icap::ServiceRep::Pointer &aService):
+Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::ServiceRep::Pointer &aService):
         AsyncJob(aTypeName),
         Adaptation::Initiate(aTypeName),
         icapRequest(NULL),
         icapReply(NULL),
         attempts(0),
-        connection(-1),
+        connection(NULL),
         theService(aService),
         commBuf(NULL), commBufSize(0),
         commEof(false),
@@ -86,28 +86,35 @@ void Adaptation::Icap::Xaction::start()
 }
 
 // TODO: obey service-specific, OPTIONS-reported connection limit
-void Adaptation::Icap::Xaction::openConnection()
+void
+Adaptation::Icap::Xaction::openConnection()
 {
-    Ip::Address client_addr;
-
-    Must(connection < 0);
+    Must(!haveConnection());
 
     const Adaptation::Service &s = service();
 
     if (!TheConfig.reuse_connections)
         disableRetries(); // this will also safely drain pconn pool
 
+    connection = new Comm::Connection;
+
+    /* NP: set these here because it applies whether a pconn or a new conn is used */
+
+    // TODO: Avoid blocking lookup if s.cfg().host is a hostname
+    connection->remote = s.cfg().host.termedBuf();
+    connection->remote.SetPort(s.cfg().port);
+
     // TODO: check whether NULL domain is appropriate here
-    connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable);
-    if (connection >= 0) {
-        debugs(93,3, HERE << "reused pconn FD " << connection);
+    icapPconnPool->pop(connection, NULL, isRetriable);
+    if (connection->isOpen()) {
+        debugs(93,3, HERE << "reused pconn " << connection);
 
         // fake the connect callback
         // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
         typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
         CbcPointer<Xaction> self(this);
         Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
-        dialer.params.fd = connection;
+        dialer.params.conn = connection;
         dialer.params.flag = COMM_OK;
         // fake other parameters by copying from the existing connection
         connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
@@ -117,40 +124,11 @@ void Adaptation::Icap::Xaction::openConnection()
 
     disableRetries(); // we only retry pconn failures
 
-    Ip::Address outgoing;
-    if (!Ip::EnableIpv6 && !outgoing.SetIPv4()) {
-        debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoing << " is not an IPv4 address.");
-        dieOnConnectionFailure(); // throws
-    }
-    /* split-stack for now requires default IPv4-only socket */
-    if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoing.IsAnyAddr() && !s.cfg().ipv6) {
-        outgoing.SetIPv4();
-    }
-
-    connection = comm_open(SOCK_STREAM, 0, outgoing,
-                           COMM_NONBLOCKING, s.cfg().uri.termedBuf());
-
-    if (connection < 0)
-        dieOnConnectionFailure(); // throws
-
-    debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port);
-
-    // TODO: service bypass status may differ from that of a transaction
-    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
-    AsyncCall::Pointer timeoutCall = JobCallback(93, 5,
-                                     TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
-    commSetTimeout(connection, TheConfig.connect_timeout(
-                       service().cfg().bypass), timeoutCall);
-
-    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
-    closer = JobCallback(93, 5,
-                         CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed);
-    comm_add_close_handler(connection, closer);
-
     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
-    connector = JobCallback(93,3,
-                            ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
-    commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
+    connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+    Comm::ConnOpener *cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
+    cs->setHost(s.cfg().host.termedBuf());
+    AsyncJob::Start(cs);
 }
 
 /*
@@ -169,10 +147,10 @@ Adaptation::Icap::Xaction::reusedConnection(void *data)
 
 void Adaptation::Icap::Xaction::closeConnection()
 {
-    if (connection >= 0) {
+    if (haveConnection()) {
 
         if (closer != NULL) {
-            comm_remove_close_handler(connection, closer);
+            comm_remove_close_handler(connection->fd, closer);
             closer = NULL;
         }
 
@@ -185,38 +163,45 @@ void Adaptation::Icap::Xaction::closeConnection()
         }
 
         if (reuseConnection) {
-            Ip::Address client_addr;
             //status() adds leading spaces.
             debugs(93,3, HERE << "pushing pconn" << status());
-            AsyncCall::Pointer call = NULL;
-            commSetTimeout(connection, -1, call);
-            icapPconnPool->push(connection, theService->cfg().host.termedBuf(),
-                                theService->cfg().port, NULL, client_addr);
+            AsyncCall::Pointer nul;
+            commSetTimeout(connection->fd, -1, nul);
+            icapPconnPool->push(connection, NULL);
             disableRetries();
         } else {
             //status() adds leading spaces.
             debugs(93,3, HERE << "closing pconn" << status());
-            // comm_close will clear timeout
-            comm_close(connection);
+            connection->close();
         }
 
         writer = NULL;
         reader = NULL;
         connector = NULL;
-        connection = -1;
+        connection = NULL;
     }
 }
 
 // connection with the ICAP service established
 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
 {
+    if (io.flag == COMM_TIMEOUT) {
+        handleCommTimedout();
+        return;
+    }
+
     Must(connector != NULL);
     connector = NULL;
 
     if (io.flag != COMM_OK)
         dieOnConnectionFailure(); // throws
 
-    fd_table[connection].noteUse(icapPconnPool);
+    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+    closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+                        CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+    comm_add_close_handler(io.conn->fd, closer);
+
+    fd_table[io.conn->fd].noteUse(icapPconnPool);
 
     handleCommConnected();
 }
@@ -230,11 +215,11 @@ void Adaptation::Icap::Xaction::dieOnConnectionFailure()
 
 void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
 {
+    Must(haveConnection());
+
     // comm module will free the buffer
     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
-    writer = JobCallback(93,3,
-                         Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
-
+    writer = JobCallback(93, 3, Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
     comm_write_mbuf(connection, &buf, writer);
     updateTimeout();
 }
@@ -311,27 +296,26 @@ bool Adaptation::Icap::Xaction::doneAll() const
 
 void Adaptation::Icap::Xaction::updateTimeout()
 {
+    Must(haveConnection());
+
     if (reader != NULL || writer != NULL) {
         // restart the timeout before each I/O
         // XXX: why does Config.Timeout lacks a write timeout?
         // TODO: service bypass status may differ from that of a transaction
         typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer call = JobCallback(93,5,
-                                              TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
-
-        commSetTimeout(connection,
-                       TheConfig.io_timeout(service().cfg().bypass), call);
+        AsyncCall::Pointer call = JobCallback(93, 5, TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
+        commSetTimeout(connection->fd, TheConfig.io_timeout(service().cfg().bypass), call);
     } else {
         // clear timeout when there is no I/O
         // Do we need a lifetime timeout?
         AsyncCall::Pointer call = NULL;
-        commSetTimeout(connection, -1, call);
+        commSetTimeout(connection->fd, -1, call);
     }
 }
 
 void Adaptation::Icap::Xaction::scheduleRead()
 {
-    Must(connection >= 0);
+    Must(haveConnection());
     Must(!reader);
     Must(readBuf.hasSpace());
 
@@ -340,9 +324,7 @@ void Adaptation::Icap::Xaction::scheduleRead()
      * here instead of reading directly into readBuf.buf.
      */
     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
-    reader = JobCallback(93,3,
-                         Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
-
+    reader = JobCallback(93, 3, Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
     comm_read(connection, commBuf, readBuf.spaceSize(), reader);
     updateTimeout();
 }
@@ -389,7 +371,8 @@ void Adaptation::Icap::Xaction::noteCommRead(const CommIoCbParams &io)
 void Adaptation::Icap::Xaction::cancelRead()
 {
     if (reader != NULL) {
-        comm_read_cancel(connection, reader);
+        Must(haveConnection());
+        comm_read_cancel(connection->fd, reader);
         reader = NULL;
     }
 }
@@ -430,11 +413,16 @@ bool Adaptation::Icap::Xaction::doneWriting() const
 
 bool Adaptation::Icap::Xaction::doneWithIo() const
 {
-    return connection >= 0 && // or we could still be waiting to open it
+    return haveConnection() &&
            !connector && !reader && !writer && // fast checks, some redundant
            doneReading() && doneWriting();
 }
 
+bool Adaptation::Icap::Xaction::haveConnection() const
+{
+    return connection != NULL && connection->isOpen();
+}
+
 // initiator aborted
 void Adaptation::Icap::Xaction::noteInitiatorAborted()
 {
@@ -547,8 +535,8 @@ const char *Adaptation::Icap::Xaction::status() const
 
 void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
 {
-    if (connection >= 0) {
-        buf.Printf("FD %d", connection);
+    if (haveConnection()) {
+        buf.Printf("FD %d", connection->fd);
 
         if (writer != NULL)
             buf.append("w", 1);
@@ -562,8 +550,8 @@ void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
 
 void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf &buf) const
 {
-    if (connection >= 0 && commEof)
-        buf.Printf("Comm(%d)", connection);
+    if (haveConnection() && commEof)
+        buf.Printf("Comm(%d)", connection->fd);
 
     if (stopReason != NULL)
         buf.Printf("Stopped");