]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Convert ListenStateData to AsyncJob ConnAcceptor
authorAmos Jeffries <squid3@treenet.co.nz>
Sun, 15 Aug 2010 12:40:32 +0000 (00:40 +1200)
committerAmos Jeffries <squid3@treenet.co.nz>
Sun, 15 Aug 2010 12:40:32 +0000 (00:40 +1200)
src/ProtoPort.cc
src/ProtoPort.h
src/client_side.cc
src/comm.cc
src/comm/AcceptLimiter.cc
src/comm/AcceptLimiter.h
src/comm/ConnAcceptor.cc [moved from src/comm/ListenStateData.cc with 69% similarity]
src/comm/ConnAcceptor.h [moved from src/comm/ListenStateData.h with 65% similarity]
src/comm/ConnOpener.cc
src/comm/Makefile.am
src/ftp.cc

index c03f2efa79e2953eb88c0e11180bde498d214c74..0050e5aafc83cfbcf3cba8898acce8ea4e47e096 100644 (file)
@@ -1,7 +1,3 @@
-/*
- * $Id$
- */
-
 #include "squid.h"
 #include "ProtoPort.h"
 
@@ -16,7 +12,8 @@ http_port_list::http_port_list(const char *aProtocol)
 
 http_port_list::~http_port_list()
 {
-    delete listener;
+    listener->unsubscribe(); // trigger self-close
+    listener = NULL;
 
     safe_free(name);
     safe_free(defaultsite);
index 2afd5592ba20a59102ca2993f9a4c20c590b6ccc..d3d3769ca2742816acd21c4c2d4123453d633ce9 100644 (file)
@@ -1,12 +1,8 @@
-/*
- * $Id$
- */
 #ifndef SQUID_PROTO_PORT_H
 #define SQUID_PROTO_PORT_H
 
-//#include "typedefs.h"
 #include "cbdata.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
 
 struct http_port_list {
     http_port_list(const char *aProtocol);
@@ -43,7 +39,7 @@ struct http_port_list {
      * If not NULL we are actively listening for client requests.
      * delete to close the socket.
      */
-    Comm::ListenStateData *listener;
+    Comm::ConnAcceptor *listener;
 
 #if USE_SSL
     // XXX: temporary hack to ease move of SSL options to http_port
index 48337de8d27226a4b4b6e2dc252820dc92994cb9..99058b656fb5d8b83ecf6cfb579d0a860ba3bfdd 100644 (file)
@@ -94,7 +94,7 @@
 #include "clientStream.h"
 #include "comm.h"
 #include "comm/Connection.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
 #include "eui/Config.h"
 #include "fde.h"
 #include "HttpHdrContRange.h"
@@ -3440,7 +3440,7 @@ clientHttpConnectionsOpen(void)
             ++bumpCount;
 #endif
 
-        /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */
+        /* AYJ: 2009-12-27: bit bumpy. new ConnAcceptor(...) should be doing all the Comm:: stuff ... */
 
         const int openFlags = COMM_NONBLOCKING |
                               (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
@@ -3471,8 +3471,9 @@ clientHttpConnectionOpened(int fd, int, http_port_list *s)
 
     Must(s);
 
-    s->listener = new Comm::ListenStateData(fd, true);
+    s->listener = new Comm::ConnAcceptor(fd, true);
     s->listener->subscribe(5,5, "httpAccept", new CommAcceptCbPtrFun(httpAccept, s));
+    AsyncJob::AsyncStart(s->listener);
 
     debugs(1, 1, "Accepting " <<
            (s->intercepted ? " intercepted" : "") <<
@@ -3523,8 +3524,9 @@ clientHttpsConnectionOpened(int fd, int, http_port_list *s)
 
     Must(s);
 
-    s->listener = new Comm::ListenStateData(fd, true);
+    s->listener = new Comm::ConnAcceptor(fd, true);
     s->listener->subscribe(5,5, "httpsAccept", new CommAcceptCbPtrFun(httpsAccept, s));
+    AsyncJob::AsyncStart(s->listener);
 
     debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
 
@@ -3550,8 +3552,8 @@ clientHttpConnectionsClose(void)
 {
     for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
         if (s->listener) {
-            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
-            delete s->listener;
+            debugs(1, 1, "Closing HTTP port " << s->s);
+            s->listener->unsubscribe();
             s->listener = NULL;
         }
     }
@@ -3559,8 +3561,8 @@ clientHttpConnectionsClose(void)
 #if USE_SSL
     for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
         if (s->listener) {
-            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
-            delete s->listener;
+            debugs(1, 1, "Closing HTTPS port " << s->s);
+            s->listener->unsubscribe();
             s->listener = NULL;
         }
     }
index 4eaf66a91c1be8d20ea8c05bc277481cec799b6c..e392dc57f0cffb7202efef5b70e5191a6ba58761 100644 (file)
@@ -41,7 +41,6 @@
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
 #include "comm/Connection.h"
-#include "comm/ListenStateData.h"
 #include "CommIO.h"
 #include "CommRead.h"
 #include "MemBuf.h"
index 7881db0348afa300fce9286a761da5ff6282d66d..f101a1fbe8bc35b399c096fe0cbb1075e65d35c9 100644 (file)
@@ -1,6 +1,7 @@
 #include "config.h"
 #include "comm/AcceptLimiter.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
+#include "comm/Connection.h"
 #include "fde.h"
 
 Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
@@ -11,10 +12,10 @@ Comm::AcceptLimiter &Comm::AcceptLimiter::Instance()
 }
 
 void
-Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+Comm::AcceptLimiter::defer(Comm::ConnAcceptor *afd)
 {
     afd->isLimited++;
-    debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+    debugs(5, 5, HERE << "FD " << afd->conn->fd << " x" << afd->isLimited);
     deferred.push_back(afd);
 }
 
@@ -25,7 +26,7 @@ Comm::AcceptLimiter::kick()
     if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
         debugs(5, 5, HERE << " doing one.");
         /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
-        ListenStateData *temp = deferred.shift();
+        ConnAcceptor *temp = deferred.shift();
         temp->isLimited--;
         temp->acceptNext();
     }
index 57313b142c1161af76df115a2495e663bfe803fa..253840206215bc02cb514eb1107196527ac3a676 100644 (file)
@@ -1,12 +1,12 @@
-#ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H
-#define _SQUID_SRC_COMM_ACCEPT_LIMITER_H
+#ifndef _SQUID_SRC_COMM_ACCEPTLIMITER_H
+#define _SQUID_SRC_COMM_ACCEPTLIMITER_H
 
 #include "Array.h"
 
 namespace Comm
 {
 
-class ListenStateData;
+class ConnAcceptor;
 
 /**
  * FIFO Queue holding listener socket handlers which have been activated
@@ -14,7 +14,7 @@ class ListenStateData;
  * But when doing so there were not enough FD available to handle the
  * new connection. These handlers are awaiting some FD to become free.
  *
- * defer - used only by Comm layer ListenStateData adding themselves when FD are limited.
+ * defer - used only by Comm layer ConnAcceptor adding themselves when FD are limited.
  * kick - used by Comm layer when FD are closed.
  */
 class AcceptLimiter
@@ -25,7 +25,7 @@ public:
     static AcceptLimiter &Instance();
 
     /** delay accepting a new client connection. */
-    void defer(Comm::ListenStateData *afd);
+    void defer(Comm::ConnAcceptor *afd);
 
     /** try to accept and begin processing any delayed client connections. */
     void kick();
@@ -34,9 +34,9 @@ private:
     static AcceptLimiter Instance_;
 
     /** FIFO queue */
-    Vector<Comm::ListenStateData*> deferred;
+    Vector<Comm::ConnAcceptor*> deferred;
 };
 
 }; // namepace Comm
 
-#endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */
+#endif /* _SQUID_SRC_COMM_ACCEPTLIMITER_H */
similarity index 69%
rename from src/comm/ListenStateData.cc
rename to src/comm/ConnAcceptor.cc
index 20aec1d374fca213b256836c830c28e5f642cb03..03389d9ba9bbb64025c000aa06af6f9372cef68e 100644 (file)
  */
 
 #include "squid.h"
+#include "base/TextException.h"
 #include "CommCalls.h"
 #include "comm/AcceptLimiter.h"
 #include "comm/Connection.h"
 #include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
 #include "fde.h"
 #include "protos.h"
 #include "SquidTime.h"
 
-Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) :
-        fd(aFd),
+namespace Comm {
+    CBDATA_CLASS_INIT(ConnAcceptor);
+};
+
+Comm::ConnAcceptor::ConnAcceptor(int aFd, bool accept_many) :
+        AsyncJob("Legacy_Comm::ConnAcceptor"),
         errcode(0),
         isLimited(0),
         callSection(NULL),
@@ -55,9 +60,13 @@ Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) :
 {
     assert(aFd >= 0);
     assert(isOpen(aFd));
+    conn = new Connection;
+    conn->fd = aFd;
+    // TODO: figure out what the new FD local address is/was/should be.
 }
 
-Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note) :
+Comm::ConnAcceptor::ConnAcceptor(Comm::ConnectionPointer &newConn, bool accept_many, const char *note) :
+        AsyncJob("Comm::ConnAcceptor"),
         errcode(0),
         isLimited(0),
         callSection(NULL),
@@ -68,36 +77,30 @@ Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, bool accep
         mayAcceptMore(accept_many)
 {
     /* open the conn if its not already open */
-    if (!IsConnOpen(conn)) {
-        conn->fd = comm_open(SOCK_STREAM,
-                             IPPROTO_TCP,
-                             conn->local,
-                             conn->flags,
-                             note);
-        debugs(9, 3, HERE << "Unconnected data socket created on FD " << conn->fd );
-
-        if (!conn->isOpen()) {
+    if (!IsConnOpen(newConn)) {
+        newConn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note);
+        debugs(9, 3, HERE << "Unconnected data socket created on FD " << newConn->fd );
+
+        if (!newConn->isOpen()) {
             debugs(5, DBG_CRITICAL, HERE << "comm_open failed");
             errcode = -1;
             return;
         }
     }
 
-    assert(IsConnOpen(conn));
-    fd = conn->fd;
+    assert(IsConnOpen(newConn));
+    conn = newConn;
 }
 
-Comm::ListenStateData::~ListenStateData()
+Comm::ConnAcceptor::~ConnAcceptor()
 {
-    unsubscribe();
-    comm_close(fd);
-    fd = -1;
+    swanSong();
 }
 
 void
-Comm::ListenStateData::subscribe(int section, int level, const char *name, CommAcceptCbPtrFun *dialer)
+Comm::ConnAcceptor::subscribe(int section, int level, const char *name, CommAcceptCbPtrFun *dialer)
 {
-    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << name);
+    debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << name);
 
     // if this is the first subscription. start listening on the socket.
     if (callDialer == NULL && theCallback == NULL)
@@ -112,34 +115,22 @@ Comm::ListenStateData::subscribe(int section, int level, const char *name, CommA
     safe_free(callName);
     callName = xstrdup(name);
     callDialer = dialer;
-
-    // if no error so far start accepting connections.
-    if (errcode == 0)
-        commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
 }
 
 void
-Comm::ListenStateData::subscribe(const AsyncCall::Pointer &call)
+Comm::ConnAcceptor::subscribe(const AsyncCall::Pointer &call)
 {
-    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
+    debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << call);
 
     // remove old subscription. if any.
     unsubscribe();
 
     // store new callback subscription
     theCallback = call;
-
-    // start listening on the socket.
-    if (theCallback != NULL) {
-        setListen();
-        // if no error so far start accepting connections.
-        if (errcode == 0)
-            commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-    }
 }
 
 void
-Comm::ListenStateData::unsubscribe()
+Comm::ConnAcceptor::unsubscribe()
 {
     safe_free(callName);
     delete callDialer;
@@ -147,6 +138,45 @@ Comm::ListenStateData::unsubscribe()
     theCallback = NULL;
 }
 
+void
+Comm::ConnAcceptor::start()
+{
+    debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << callName);
+
+    Must(IsConnOpen(conn));
+
+    setListen();
+
+    // if no error so far start accepting connections.
+    if (errcode == 0)
+        commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::ConnAcceptor::doneAll() const
+{
+    if (!IsConnOpen(conn)) {
+        debugs(5,5, HERE << "Done? maybe. FD is closed." << (conn==NULL?"conn=NULL":"") << ", FD " << (conn!=NULL?conn->fd:-999));
+        return AsyncJob::doneAll();
+    }
+
+    if (callDialer == NULL && theCallback == NULL) {
+        debugs(5,5, HERE << "Done? maybe: handlers are gone.");
+        return AsyncJob::doneAll();
+    }
+
+    return false;
+}
+
+void
+Comm::ConnAcceptor::swanSong()
+{
+    debugs(5,5, HERE);
+    unsubscribe();
+    conn = NULL;
+    AsyncJob::swanSong();
+}
+
 /**
  * New-style listen and accept routines
  *
@@ -155,11 +185,11 @@ Comm::ListenStateData::unsubscribe()
  * accept()ed some time later.
  */
 void
-Comm::ListenStateData::setListen()
+Comm::ConnAcceptor::setListen()
 {
     errcode = 0; // reset local errno copy.
-    if (listen(fd, Squid_MaxFD >> 2) < 0) {
-        debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+    if (listen(conn->fd, Squid_MaxFD >> 2) < 0) {
+        debugs(50, 0, HERE << "listen(FD " << conn->fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
         errcode = errno;
         return;
     }
@@ -168,15 +198,15 @@ Comm::ListenStateData::setListen()
 #ifdef SO_ACCEPTFILTER
         struct accept_filter_arg afa;
         bzero(&afa, sizeof(afa));
-        debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
+        debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << conn->fd);
         xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
-        if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
+        if (setsockopt(conn->fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
             debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
 #elif defined(TCP_DEFER_ACCEPT)
         int seconds = 30;
         if (strncmp(Config.accept_filter, "data=", 5) == 0)
             seconds = atoi(Config.accept_filter + 5);
-        if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
+        if (setsockopt(conn->fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
             debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
 #else
         debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
@@ -194,23 +224,23 @@ Comm::ListenStateData::setListen()
  * done later when enough sockets become available.
  */
 void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::ConnAcceptor::doAccept(int fd, void *data)
 {
     debugs(5, 2, HERE << "New connection on FD " << fd);
 
-    assert(isOpen(fd));
-    ListenStateData *afd = static_cast<ListenStateData*>(data);
+    Must(isOpen(fd));
+    ConnAcceptor *afd = static_cast<ConnAcceptor*>(data);
 
     if (!okToAccept()) {
         AcceptLimiter::Instance().defer(afd);
     } else {
         afd->acceptNext();
     }
-    commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
+    commSetSelect(fd, COMM_SELECT_READ, Comm::ConnAcceptor::doAccept, afd, 0);
 }
 
 bool
-Comm::ListenStateData::okToAccept()
+Comm::ConnAcceptor::okToAccept()
 {
     static time_t last_warn = 0;
 
@@ -226,7 +256,7 @@ Comm::ListenStateData::okToAccept()
 }
 
 void
-Comm::ListenStateData::acceptOne()
+Comm::ConnAcceptor::acceptOne()
 {
     /*
      * We don't worry about running low on FDs here.  Instead,
@@ -235,42 +265,42 @@ Comm::ListenStateData::acceptOne()
      */
 
     /* Accept a new connection */
-    Connection *connDetails = new Connection();
-    int newfd = oldAccept(*connDetails);
+    Connection *newConnDetails = new Connection();
+    int newfd = oldAccept(*newConnDetails);
 
     /* Check for errors */
     if (newfd < 0) {
 
         if (newfd == COMM_NOMESSAGE) {
             /* register interest again */
-            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << callName);
-            commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+            debugs(5, 5, HERE << "try later: FD " << conn->fd << " handler: " << callName);
+            commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
             return;
         }
 
         // A non-recoverable error; notify the caller */
-        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << callName);
-        notify(-1, COMM_ERROR, connDetails);
+        debugs(5, 5, HERE << "non-recoverable error: FD " << conn->fd << " handler: " << callName);
+        notify(-1, COMM_ERROR, newConnDetails);
         mayAcceptMore = false;
         return;
     }
 
-    debugs(5, 5, HERE << "accepted: FD " << fd <<
-           " newfd: " << newfd << " from: " << connDetails->remote <<
+    debugs(5, 5, HERE << "accepted: FD " << conn->fd <<
+           " newfd: " << newfd << " from: " << newConnDetails->remote <<
            " handler: " << callName);
-    notify(newfd, COMM_OK, connDetails);
+    notify(newfd, COMM_OK, newConnDetails);
 }
 
 void
-Comm::ListenStateData::acceptNext()
+Comm::ConnAcceptor::acceptNext()
 {
-    assert(isOpen(fd));
-    debugs(5, 2, HERE << "connection on FD " << fd);
+    Must(IsConnOpen(conn));
+    debugs(5, 2, HERE << "connection on FD " << conn->fd);
     acceptOne();
 }
 
 void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &connDetails)
+Comm::ConnAcceptor::notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &newConnDetails)
 {
     // listener socket handlers just abandon the port with COMM_ERR_CLOSING
     // it should only happen when this object is deleted...
@@ -282,21 +312,19 @@ Comm::ListenStateData::notify(int newfd, comm_err_t flag, const Comm::Connection
         AsyncCall::Pointer call = commCbCall(callSection, callLevel, callName, *callDialer);
         typedef CommAcceptCbParams Params;
         Params &params = GetCommParams<Params>(call);
-        params.fd = fd;
+        params.fd = conn->fd;
         params.nfd = newfd;
-        params.details = connDetails;
+        params.details = newConnDetails;
         params.flag = flag;
         params.xerrno = errcode;
         ScheduleCallHere(call);
-        if (!mayAcceptMore)
-            unsubscribe();
     }
     else if (theCallback != NULL) {
         typedef CommAcceptCbParams Params;
         Params &params = GetCommParams<Params>(theCallback);
-        params.fd = fd;
+        params.fd = conn->fd;
         params.nfd = newfd;
-        params.details = connDetails;
+        params.details = newConnDetails;
         params.flag = flag;
         params.xerrno = errcode;
         ScheduleCallHere(theCallback);
@@ -311,7 +339,7 @@ Comm::ListenStateData::notify(int newfd, comm_err_t flag, const Comm::Connection
  * Wait for an incoming connection on FD.
  */
 int
-Comm::ListenStateData::oldAccept(Comm::Connection &details)
+Comm::ConnAcceptor::oldAccept(Comm::Connection &details)
 {
     PROF_start(comm_accept);
     statCounter.syscalls.sock.accepts++;
@@ -320,7 +348,7 @@ Comm::ListenStateData::oldAccept(Comm::Connection &details)
     details.local.InitAddrInfo(gai);
 
     errcode = 0; // reset local errno copy.
-    if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
+    if ((sock = accept(conn->fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
         errcode = errno; // store last accept errno locally.
 
         details.local.FreeAddrInfo(gai);
@@ -328,18 +356,18 @@ Comm::ListenStateData::oldAccept(Comm::Connection &details)
         PROF_stop(comm_accept);
 
         if (ignoreErrno(errno)) {
-            debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 5, HERE << "FD " << conn->fd << ": " << xstrerror());
             return COMM_NOMESSAGE;
         } else if (ENFILE == errno || EMFILE == errno) {
-            debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 3, HERE << "FD " << conn->fd << ": " << xstrerror());
             return COMM_ERROR;
         } else {
-            debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 1, HERE << "FD " << conn->fd << ": " << xstrerror());
             return COMM_ERROR;
         }
     }
 
-    assert(sock >= 0);
+    Must(sock >= 0);
     details.fd = sock;
     details.remote = *gai;
 
@@ -377,7 +405,7 @@ Comm::ListenStateData::oldAccept(Comm::Connection &details)
     commSetNonBlocking(sock);
 
     /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
-    F->flags.transparent = fd_table[fd].flags.transparent;
+    F->flags.transparent = fd_table[conn->fd].flags.transparent;
 
     PROF_stop(comm_accept);
     return sock;
similarity index 65%
rename from src/comm/ListenStateData.h
rename to src/comm/ConnAcceptor.h
index bc5ec9a4989f8091b96d7730f1a9e7fb2dd57ca1..8cac4e5c409ed7c955f45ad86c9c37a6089c1e37 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef SQUID_LISTENERSTATEDATA_H
-#define SQUID_LISTENERSTATEDATA_H
+#ifndef SQUID_COMM_CONNACCEPTOR_H
+#define SQUID_COMM_CONNACCEPTOR_H
 
 #include "config.h"
 #include "CommCalls.h"
 namespace Comm
 {
 
-class ListenStateData
+class ConnAcceptor : public AsyncJob
 {
+private:
+    void start();
+    bool doneAll() const;
+    void swanSong();
 
 public:
-    ListenStateData(int fd, bool accept_many); // Legacy verion that uses new subscribe API.
-    ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note);
-    ListenStateData(const ListenStateData &r); // not implemented.
-    ~ListenStateData();
+    ConnAcceptor(int fd, bool accept_many); // Legacy verion that uses new subscribe API.
+    ConnAcceptor(Comm::ConnectionPointer &conn, bool accept_many, const char *note);
+    ConnAcceptor(const ConnAcceptor &r); // not implemented.
+    ~ConnAcceptor();
 
     /** Subscribe a handler to receive calls back about new connections.
      * Replaces any existing subscribed handler.
@@ -47,21 +51,25 @@ public:
     /// Call the subscribed callback handler with details about a new connection.
     void notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &details);
 
-    /// socket being listened on for new connections
-    int fd;
+    /// conn being listened on for new connections
+    /// Reserved for read-only use.
+    ConnectionPointer conn;
 
     /// errno code of the last accept() or listen() action if one occurred.
     int errcode;
 
     /// whether this socket is delayed and on the AcceptLimiter queue.
+    /// Reserved for read-only use outside of AcceptLimiter
     int32_t isLimited;
 
 private:
-    int callSection;        ///< debug section for subscribed callback.
-    int callLevel;          ///< debug level for subscribed callback.
-    char *callName;           ///< Name for the subscribed callback.
+    int callSection;                ///< debug section for subscribed callback.
+    int callLevel;                  ///< debug level for subscribed callback.
+    char *callName;                 ///< Name for the subscribed callback.
     CommAcceptCbPtrFun *callDialer; ///< dialer to make the subscribed callback
 
+    AsyncCall::Pointer theCallback; // TODO remove legacy pointer. Store dialer of members instead.
+
 private:
     /// Method to test if there are enough file descriptors to open a new client connection
     /// if not the accept() will be postponed
@@ -73,12 +81,13 @@ private:
     void acceptOne();
     int oldAccept(Comm::Connection &details);
 
-    AsyncCall::Pointer theCallback;
     bool mayAcceptMore;
 
     void setListen();
+
+    CBDATA_CLASS2(ConnAcceptor);
 };
 
 }; // namespace Comm
 
-#endif /* SQUID_LISTENERSTATEDATA_H */
+#endif /* SQUID_COMM_CONNACCEPTOR_H */
index 71d17022e2c1f3701be317b0b7b1ec98ef51490a..8ddbdcce01b449c978e96a7495dfc78cf93d97c4 100644 (file)
@@ -112,7 +112,8 @@ Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno)
     conn_ = NULL;
 }
 
-void Comm::ConnOpener::start()
+void
+Comm::ConnOpener::start()
 {
     Must(conn_ != NULL);
 
index beb2111218a93616199220cdd4f2747fe99e7e96..2ae8858b462c34a47b5cde29f83008c1ec0f4306 100644 (file)
@@ -9,8 +9,8 @@ noinst_LTLIBRARIES = libcomm.la
 libcomm_la_SOURCES= \
        AcceptLimiter.cc \
        AcceptLimiter.h \
-       ListenStateData.cc \
-       ListenStateData.h \
+       ConnAcceptor.cc \
+       ConnAcceptor.h \
        \
        ConnOpener.cc \
        ConnOpener.h \
index eefd1e0bab9c6d39a97ee2c54fc35e6a5aadcd29..6925dd711de29ff59f794ede755b6700342d69e0 100644 (file)
@@ -35,7 +35,7 @@
 #include "squid.h"
 #include "comm.h"
 #include "comm/ConnOpener.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
 #include "compat/strtoll.h"
 #include "errorpage.h"
 #include "fde.h"
@@ -159,7 +159,7 @@ public:
      * FTP stores a copy of the FD in the channel descriptor.
      * Use close() to properly close the channel.
      */
-    Comm::ListenStateData *listener;
+    Comm::ConnAcceptor *listener;
 
     /** A temporary handle to the connection being listened on.
      *  Data channel listeners die before handing the result conn over. If the conn is
@@ -459,8 +459,7 @@ void
 FtpStateData::dataClosed(const CommCloseCbParams &io)
 {
     if (data.listener) {
-        data.listener->unsubscribe();
-        delete data.listener;
+        data.listener->unsubscribe(); // listener job will self destruct.
         data.listener = NULL;
         data.listen_conn = NULL;
         data.conn = NULL;
@@ -2741,16 +2740,17 @@ ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
     typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
     AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                     acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
-    ftpState->data.listener = new Comm::ListenStateData(conn, false, ftpState->entry->url());
+    ftpState->data.listener = new Comm::ConnAcceptor(conn, false, ftpState->entry->url());
     ftpState->data.listener->subscribe(acceptCall);
 
-    if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
+    if (ftpState->data.listener->errcode != 0) {
         conn->close();
     } else {
 
         if (!fallback)
             conn->local.SetPort(comm_local_port(conn->fd));
         ftpState->data.host = NULL;
+        AsyncJob::AsyncStart(ftpState->data.listener);
     }
 
     ftpState->data.listen_conn = conn;
@@ -2916,8 +2916,9 @@ FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
             typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
             AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                             acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
-            data.listener = new Comm::ListenStateData(data.listen_conn, false, data.host);
+            data.listener = new Comm::ConnAcceptor(data.listen_conn, false, data.host);
             data.listener->subscribe(acceptCall);
+            AsyncJob::AsyncStart(data.listener);
             return;
         }
     }
@@ -3022,8 +3023,9 @@ void FtpStateData::readStor()
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
 
-        data.listener = new Comm::ListenStateData(data.conn, false, data.host);
+        data.listener = new Comm::ConnAcceptor(data.conn, false, data.host);
         data.listener->subscribe(acceptCall);
+        AsyncJob::AsyncStart(data.listener);
     } else {
         debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
         ftpFail(this);
@@ -3157,8 +3159,9 @@ ftpReadList(FtpStateData * ftpState)
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
 
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, false, ftpState->data.host);
+        ftpState->data.listener = new Comm::ConnAcceptor(ftpState->data.conn, false, ftpState->data.host);
         ftpState->data.listener->subscribe(acceptCall);
+        AsyncJob::AsyncStart(ftpState->data.listener);
         return;
     } else if (!ftpState->flags.tried_nlst && code > 300) {
         ftpSendNlst(ftpState);
@@ -3203,8 +3206,9 @@ ftpReadRetr(FtpStateData * ftpState)
         typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, false, ftpState->data.host);
+        ftpState->data.listener = new Comm::ConnAcceptor(ftpState->data.conn, false, ftpState->data.host);
         ftpState->data.listener->subscribe(acceptCall);
+        AsyncJob::AsyncStart(ftpState->data.listener);
     } else if (code >= 300) {
         if (!ftpState->flags.try_slash_hack) {
             /* Try this as a directory missing trailing slash... */
@@ -3866,8 +3870,7 @@ FtpChannel::close()
 {
     // channels with active listeners will be closed when the listener handler dies.
     if (listener) {
-        listener->unsubscribe();
-        delete listener;
+        listener->unsubscribe(); /// listener job will self-destruct.
         listener = NULL;
         comm_remove_close_handler(conn->fd, closer);
         closer = NULL;