]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Bug 2581, Bug 3081, Bug 2948: various TCP socket connection problems
authorAmos Jeffries <squid3@treenet.co.nz>
Wed, 26 Jan 2011 03:47:13 +0000 (16:47 +1300)
committerAmos Jeffries <squid3@treenet.co.nz>
Wed, 26 Jan 2011 03:47:13 +0000 (16:47 +1300)
Bug 3081:
  During conversion of listening socket handlers to AsyncCalls a violation
of the AsyncCall API was introduced. Resulting in occasional crashes from
invalid re-use of call objects.

This implements a TcpAcceptor async job which receives a listening socket
and a CallSubscription. For every connection attempt on the listener socket
a new AsyncCall is spawned from the subscription template.

Initial users are the HTTP and HTTPS listening sockets and FTP data channel.

In order to implement this job in FTP the logics surrounding data channel
handling had to be extended and reworked. Fixing bug 2948 and 2581 in the
process.

14 files changed:
src/CommCalls.h
src/ProtoPort.cc
src/ProtoPort.h
src/base/AsyncCall.h
src/base/Subscription.h
src/client_side.cc
src/comm.cc
src/comm/AcceptLimiter.cc
src/comm/AcceptLimiter.h
src/comm/ListenStateData.h [deleted file]
src/comm/Makefile.am
src/comm/TcpAcceptor.cc [moved from src/comm/ListenStateData.cc with 54% similarity]
src/comm/TcpAcceptor.h [new file with mode: 0644]
src/ftp.cc

index 84e4a72875a308653fa7c64675a410dd470306ad..7bf0ded84a452e7ef345ff9b4487f458bdac9ad1 100644 (file)
@@ -176,6 +176,7 @@ class CommAcceptCbPtrFun: public CallDialer,
 {
 public:
     typedef CommAcceptCbParams Params;
+    typedef RefCount<CommAcceptCbPtrFun> Pointer;
 
     CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
     void dial();
@@ -259,11 +260,19 @@ template <class Dialer>
 class CommCbFunPtrCallT: public AsyncCall
 {
 public:
+    typedef RefCount<CommCbFunPtrCallT<Dialer> > Pointer;
     typedef typename Dialer::Params Params;
 
     inline CommCbFunPtrCallT(int debugSection, int debugLevel,
                              const char *callName, const Dialer &aDialer);
 
+    inline CommCbFunPtrCallT(const CommCbFunPtrCallT &o) :
+            AsyncCall(o.debugSection, o.debugLevel, o.name),
+            dialer(o.dialer)
+        {}
+
+    ~CommCbFunPtrCallT() {}
+
     virtual CallDialer* getDialer() { return &dialer; }
 
 public:
@@ -272,6 +281,9 @@ public:
 protected:
     inline virtual bool canFire();
     inline virtual void fire();
+
+private:
+    CommCbFunPtrCallT & operator=(const CommCbFunPtrCallT &); // not defined. not permitted.
 };
 
 // Conveninece wrapper: It is often easier to call a templated function than
index 98d4142a62992876fab889434fe8270f3c67ff77..de42e8e204afe832c110319585a246670affd3c1 100644 (file)
@@ -3,15 +3,17 @@
  */
 
 #include "squid.h"
+#include "comm.h"
 #include "ProtoPort.h"
 #if HAVE_LIMITS
 #include <limits>
 #endif
 
-http_port_list::http_port_list(const char *aProtocol)
+http_port_list::http_port_list(const char *aProtocol) :
+        listenFd(-1)
 #if USE_SSL
-        :
-        http(*this), dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
+        , http(*this)
+        , dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
 #endif
 {
     protocol = xstrdup(aProtocol);
@@ -19,7 +21,10 @@ http_port_list::http_port_list(const char *aProtocol)
 
 http_port_list::~http_port_list()
 {
-    delete listener;
+    if (listenFd >= 0) {
+        comm_close(listenFd);
+        listenFd = -1;
+    }
 
     safe_free(name);
     safe_free(defaultsite);
index 6bee84f57862711765bd6f1524e163600461efd3..323a6d2084b3de3ced6ec77b2ce653dc9d10a8fe 100644 (file)
@@ -4,9 +4,7 @@
 #ifndef SQUID_PROTO_PORT_H
 #define SQUID_PROTO_PORT_H
 
-//#include "typedefs.h"
 #include "cbdata.h"
-#include "comm/ListenStateData.h"
 
 #if USE_SSL
 #include "ssl/gadgets.h"
@@ -43,11 +41,11 @@ struct http_port_list {
     } tcp_keepalive;
 
     /**
-     * The FD listening socket handler.
-     * If not NULL we are actively listening for client requests.
-     * delete to close the socket.
+     * The FD listening socket.
+     * If >= 0 we are actively listening for client requests.
+     * use comm_close(listenFd) to stop.
      */
-    Comm::ListenStateData *listener;
+    int listenFd;
 
 #if USE_SSL
     // XXX: temporary hack to ease move of SSL options to http_port
index 12da25a733d92dd4526acb09632dbd08b538a3cd..74f2864baec27256474106b4283c3bc52a46fecc 100644 (file)
@@ -84,6 +84,10 @@ protected:
 
 private:
     const char *isCanceled; // set to the cancelation reason by cancel()
+
+    // not implemented to prevent nil calls from being passed around and unknowingly scheduled, for now.
+    AsyncCall();
+    AsyncCall(const AsyncCall &);
 };
 
 inline
@@ -122,6 +126,12 @@ public:
                const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName),
             dialer(aDialer) {}
 
+    AsyncCallT(const AsyncCallT<Dialer> &o):
+            AsyncCall(o.debugSection, o.debugLevel, o.name),
+            dialer(o.dialer) {}
+
+    ~AsyncCallT() {}
+
     CallDialer *getDialer() { return &dialer; }
 
 protected:
@@ -132,6 +142,9 @@ protected:
     virtual void fire() { dialer.dial(*this); }
 
     Dialer dialer;
+
+private:
+    AsyncCallT & operator=(const AsyncCallT &); // not defined. call assignments not permitted.
 };
 
 template <class Dialer>
index ec9cc517f726d0159e83e59c12d14fa1858cfd75..74e97bba8755f533796c9556b5ae091d882f64ab 100644 (file)
@@ -42,7 +42,7 @@ class CallSubscription: public Subscription
 public:
     /// Must be passed an object. nil pointers are not permitted.
     explicit CallSubscription(const RefCount<Call_> &aCall) : call(aCall) { assert(aCall != NULL); }
-    virtual AsyncCall::Pointer callback() const { return new Call_(call); }
+    virtual AsyncCall::Pointer callback() const { return new Call_(*call); }
 
 private:
     const RefCount<Call_> call; ///< gets copied to create callback calls
index efa2d5bb1e2567d2197934cb96c4ce229953f140..391d58e78187c5b02be86a33c0f667e2bcec570b 100644 (file)
 #include "ClientRequestContext.h"
 #include "clientStream.h"
 #include "comm.h"
-#include "comm/Write.h"
-#include "comm/ListenStateData.h"
+#include "CommCalls.h"
 #include "comm/Loops.h"
+#include "comm/Write.h"
+#include "comm/TcpAcceptor.h"
 #include "ConnectionDetail.h"
 #include "eui/Config.h"
 #include "fde.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "ip/Intercept.h"
+#include "ipc/FdNotes.h"
 #include "ipc/StartListening.h"
 #include "MemBuf.h"
 #include "MemObject.h"
 #define comm_close comm_lingering_close
 #endif
 
-/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call
+/// dials clientListenerConnectionOpened call
 class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
 {
 public:
-    typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg);
-    ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg):
-            handler(aHandler), portCfg(aPortCfg) {}
+    typedef void (*Handler)(int fd, int flags, int errNo, http_port_list *portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub);
+    ListeningStartedDialer(Handler aHandler, int openFlags, http_port_list *aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub):
+            handler(aHandler), portCfg(aPortCfg), portTypeNote(note), commOpenListenerFlags(openFlags), sub(aSub) {}
 
     virtual void print(std::ostream &os) const {
         startPrint(os) <<
@@ -148,20 +150,19 @@ public:
     }
 
     virtual bool canDial(AsyncCall &) const { return true; }
-    virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); }
+    virtual void dial(AsyncCall &) { (handler)(fd, commOpenListenerFlags, errNo, portCfg, portTypeNote, sub); }
 
 public:
     Handler handler;
 
 private:
-    http_port_list *portCfg; ///< from Config.Sockaddr.http
+    http_port_list *portCfg;   ///< from Config.Sockaddr.http
+    Ipc::FdNoteId portTypeNote;    ///< Type of IPC socket being opened
+    int commOpenListenerFlags; ///< flags used by comm_open_listener
+    Subscription::Pointer sub; ///< The handler to be subscribed for this connetion listener
 };
 
-
-static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s);
-#if USE_SSL
-static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s);
-#endif
+static void clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub);
 
 /* our socket-related context */
 
@@ -3115,14 +3116,14 @@ connStateCreate(const Ip::Address &peer, const Ip::Address &me, int fd, http_por
 
 /** Handle a new connection on HTTP socket. */
 void
-httpAccept(int sock, int newfd, ConnectionDetail *details,
-           comm_err_t flag, int xerrno, void *data)
+httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
 {
     http_port_list *s = (http_port_list *)data;
     ConnStateData *connState = NULL;
 
     if (flag != COMM_OK) {
-        debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+        // Its possible the call was still queued when the client disconnected
+        debugs(33, 2, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
         return;
     }
 
@@ -3361,15 +3362,14 @@ clientNegotiateSSL(int fd, void *data)
 
 /** handle a new HTTPS connection */
 static void
-httpsAccept(int sock, int newfd, ConnectionDetail *details,
-            comm_err_t flag, int xerrno, void *data)
+httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
 {
     https_port_list *s = (https_port_list *)data;
     SSL_CTX *sslContext = s->staticSslContext.get();
 
     if (flag != COMM_OK) {
-        errno = xerrno;
-        debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+        // Its possible the call was still queued when the client disconnected
+        debugs(33, 2, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
         return;
     }
 
@@ -3552,7 +3552,7 @@ ConnStateData::switchToHttps(const char *host)
 
 /// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed
 static bool
-OpenedHttpSocket(int fd, const char *msgIfFail)
+OpenedHttpSocket(int fd, const Ipc::FdNoteId portType)
 {
     if (fd < 0) {
         Must(NHttpSockets > 0); // we tried to open some
@@ -3560,7 +3560,7 @@ OpenedHttpSocket(int fd, const char *msgIfFail)
         Must(HttpSockets[NHttpSockets] < 0); // no extra fds received
 
         if (!NHttpSockets) // we could not open any listen sockets at all
-            fatal(msgIfFail);
+            fatalf("Unable to open %s",FdNote(portType));
 
         return false;
     }
@@ -3616,13 +3616,16 @@ clientHttpConnectionsOpen(void)
         const int openFlags = COMM_NONBLOCKING |
                               (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
 
-        AsyncCall::Pointer callback = asyncCall(33,2,
-                                                "clientHttpConnectionOpened",
-                                                ListeningStartedDialer(&clientHttpConnectionOpened, s));
-        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags,
-                            Ipc::fdnHttpSocket, callback);
+        // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+        typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+        RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
+        Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+
+        AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened",
+                                                  ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, s, Ipc::fdnHttpSocket, sub));
+        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall);
 
-        HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened
+        HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened
     }
 
 #if USE_SSL
@@ -3635,27 +3638,27 @@ clientHttpConnectionsOpen(void)
 
 /// process clientHttpConnectionsOpen result
 static void
-clientHttpConnectionOpened(int fd, int, http_port_list *s)
+clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub)
 {
-    if (!OpenedHttpSocket(fd, "Cannot open HTTP Port"))
+    s->listenFd = fd;
+    if (!OpenedHttpSocket(s->listenFd, portTypeNote))
         return;
 
     Must(s);
+    Must(s->listenFd >= 0);
 
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
-                                         CommAcceptCbPtrFun(httpAccept, s));
+    // TCP: setup a job to handle accept() with subscribed handler
+    AsyncJob::Start(new Comm::TcpAcceptor(s->listenFd, s->s, flags, FdNote(portTypeNote), sub));
 
-    s->listener = new Comm::ListenStateData(fd, call, true);
-
-    debugs(1, 1, "Accepting " <<
+    debugs(1, 1, "Accepting" <<
            (s->intercepted ? " intercepted" : "") <<
            (s->spoof_client_ip ? " spoofing" : "") <<
            (s->sslBump ? " bumpy" : "") <<
            (s->accel ? " accelerated" : "")
-           << " HTTP connections at " << s->s
-           << ", FD " << fd << "." );
+           << FdNote(portTypeNote) << " connections at "
+           << " FD " << s->listenFd << " on " << s->s);
 
-    Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+    Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for
 }
 
 #if USE_SSL
@@ -3677,35 +3680,23 @@ clientHttpsConnectionsOpen(void)
             continue;
         }
 
-        AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened",
-                                            ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http));
-
-        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING,
-                            Ipc::fdnHttpsSocket, call);
-
-        HttpSockets[NHttpSockets++] = -1;
-    }
-}
-
-/// process clientHttpsConnectionsOpen result
-static void
-clientHttpsConnectionOpened(int fd, int, http_port_list *s)
-{
-    if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port"))
-        return;
-
-    Must(s);
+        const int openFlags = COMM_NONBLOCKING |
+                              (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
 
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
-                                         CommAcceptCbPtrFun(httpsAccept, s));
+        // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS
+        typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+        RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s));
+        Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
 
-    s->listener = new Comm::ListenStateData(fd, call, true);
+        AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened",
+                                                  ListeningStartedDialer(&clientListenerConnectionOpened, openFlags,
+                                                                         &s->http, Ipc::fdnHttpsSocket, sub));
 
-    debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
+        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall);
 
-    Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+        HttpSockets[NHttpSockets++] = -1;
+    }
 }
-
 #endif
 
 void
@@ -3724,19 +3715,19 @@ void
 clientHttpConnectionsClose(void)
 {
     for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
-        if (s->listener) {
-            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
-            delete s->listener;
-            s->listener = NULL;
+        if (s->listenFd >= 0) {
+            debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection");
+            comm_close(s->listenFd);
+            s->listenFd = -1;
         }
     }
 
 #if USE_SSL
     for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
-        if (s->listener) {
-            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
-            delete s->listener;
-            s->listener = NULL;
+        if (s->listenFd >= 0) {
+            debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection");
+            comm_close(s->listenFd);
+            s->listenFd = -1;
         }
     }
 #endif
index ad28dd1cea94ea50d9ebc03b2d2628c06828fd0d..f88caff2cb65e8a944737f3ddae48d4c49da094e 100644 (file)
@@ -40,9 +40,9 @@
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
 #include "comm/IoCallback.h"
-#include "comm/Write.h"
-#include "comm/ListenStateData.h"
 #include "comm/Loops.h"
+#include "comm/Write.h"
+#include "comm/TcpAcceptor.h"
 #include "CommIO.h"
 #include "CommRead.h"
 #include "ConnectionDetail.h"
@@ -144,7 +144,7 @@ fd_debug_t *fdd_table = NULL;
 bool
 isOpen(const int fd)
 {
-    return fd_table[fd].flags.open != 0;
+    return fd >= 0 && fd_table[fd].flags.open != 0;
 }
 
 /**
index 7881db0348afa300fce9286a761da5ff6282d66d..d0f5763e0f08c4e2bf72ad8f2a9fbbf6327a9577 100644 (file)
@@ -1,6 +1,6 @@
 #include "config.h"
 #include "comm/AcceptLimiter.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
 #include "fde.h"
 
 Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
@@ -11,22 +11,41 @@ Comm::AcceptLimiter &Comm::AcceptLimiter::Instance()
 }
 
 void
-Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd)
 {
     afd->isLimited++;
     debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
     deferred.push_back(afd);
 }
 
+void
+Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd)
+{
+    for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) {
+        if (deferred[i] == afd) {
+            deferred[i]->isLimited--;
+            deferred[i] = NULL; // fast. kick() will skip empty entries later.
+            debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+        }
+    }
+}
+
 void
 Comm::AcceptLimiter::kick()
 {
+    // TODO: this could be optimized further with an iterator to search
+    //       looking for first non-NULL, followed by dumping the first N
+    //       with only one shift()/pop_front operation
+
     debugs(5, 5, HERE << " size=" << deferred.size());
-    if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
-        debugs(5, 5, HERE << " doing one.");
+    while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
         /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
-        ListenStateData *temp = deferred.shift();
-        temp->isLimited--;
-        temp->acceptNext();
+        TcpAcceptor *temp = deferred.shift();
+        if (temp != NULL) {
+            debugs(5, 5, HERE << " doing one.");
+            temp->isLimited--;
+            temp->acceptNext();
+            break;
+        }
     }
 }
index 57313b142c1161af76df115a2495e663bfe803fa..3d5540f7e474cc3ab84556287ad36e0cfd886341 100644 (file)
@@ -6,7 +6,7 @@
 namespace Comm
 {
 
-class ListenStateData;
+class TcpAcceptor;
 
 /**
  * FIFO Queue holding listener socket handlers which have been activated
@@ -25,7 +25,10 @@ public:
     static AcceptLimiter &Instance();
 
     /** delay accepting a new client connection. */
-    void defer(Comm::ListenStateData *afd);
+    void defer(Comm::TcpAcceptor *afd);
+
+    /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */
+    void removeDead(const Comm::TcpAcceptor *afd);
 
     /** try to accept and begin processing any delayed client connections. */
     void kick();
@@ -34,7 +37,7 @@ private:
     static AcceptLimiter Instance_;
 
     /** FIFO queue */
-    Vector<Comm::ListenStateData*> deferred;
+    Vector<Comm::TcpAcceptor*> deferred;
 };
 
 }; // namepace Comm
diff --git a/src/comm/ListenStateData.h b/src/comm/ListenStateData.h
deleted file mode 100644 (file)
index b5b5872..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-#ifndef SQUID_LISTENERSTATEDATA_H
-#define SQUID_LISTENERSTATEDATA_H
-
-#include "base/AsyncCall.h"
-#include "comm.h"
-#if HAVE_MAP
-#include <map>
-#endif
-
-class ConnectionDetail;
-
-namespace Comm
-{
-
-class ListenStateData
-{
-
-public:
-    ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
-    ListenStateData(const ListenStateData &r); // not implemented.
-    ~ListenStateData();
-
-    void subscribe(AsyncCall::Pointer &call);
-    void acceptNext();
-    void notify(int newfd, comm_err_t flag, const ConnectionDetail &details);
-
-    int fd;
-
-    /// errno code of the last accept() or listen() action if one occurred.
-    int errcode;
-
-    /// whether this socket is delayed and on the AcceptLimiter queue.
-    int32_t isLimited;
-
-private:
-    /// Method to test if there are enough file escriptors to open a new client connection
-    /// if not the accept() will be postponed
-    static bool okToAccept();
-
-    /// Method callback for whenever an FD is ready to accept a client connection.
-    static void doAccept(int fd, void *data);
-
-    void acceptOne();
-    int oldAccept(ConnectionDetail &details);
-
-    AsyncCall::Pointer theCallback;
-    bool mayAcceptMore;
-
-    void setListen();
-};
-
-} // namespace Comm
-
-#endif /* SQUID_LISTENERSTATEDATA_H */
index 3ddf7797da262b7b81c470d2e9849ec632490cc6..cdb8f5032a8dcbf072fd4e0c1edaa524de3ea9d3 100644 (file)
@@ -7,8 +7,6 @@ noinst_LTLIBRARIES = libcomm.la
 libcomm_la_SOURCES= \
        AcceptLimiter.cc \
        AcceptLimiter.h \
-       ListenStateData.cc \
-       ListenStateData.h \
        Loops.h \
        ModDevPoll.cc \
        ModEpoll.cc \
@@ -16,6 +14,8 @@ libcomm_la_SOURCES= \
        ModPoll.cc \
        ModSelect.cc \
        ModSelectWin32.cc \
+       TcpAcceptor.cc \
+       TcpAcceptor.h \
        \
        IoCallback.cc \
        IoCallback.h \
similarity index 54%
rename from src/comm/ListenStateData.cc
rename to src/comm/TcpAcceptor.cc
index 62816145f569a35f1c15105431fb398b482fe9f5..6931a87e397e6c2fa7c97317cd5b449ca0ab3548 100644 (file)
  */
 
 #include "squid.h"
+#include "base/TextException.h"
 #include "CommCalls.h"
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
 #include "comm/Loops.h"
+#include "comm/TcpAcceptor.h"
 #include "ConnectionDetail.h"
 #include "fde.h"
 #include "protos.h"
 #include "SquidTime.h"
 
+namespace Comm {
+    CBDATA_CLASS_INIT(TcpAcceptor);
+};
+
+Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+                               const char *note, const Subscription::Pointer &aSub) :
+        AsyncJob("Comm::TcpAcceptor"),
+        errcode(0),
+        fd(listenFd),
+        isLimited(0),
+        theCallSub(aSub),
+        local_addr(laddr)
+{}
+
+void
+Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub)
+{
+    debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << aSub);
+    unsubscribe("subscription change");
+    theCallSub = aSub;
+}
+
+void
+Comm::TcpAcceptor::unsubscribe(const char *reason)
+{
+    debugs(5, 5, HERE << status() << " AsyncCall Subscription " << theCallSub << " removed: " << reason);
+    theCallSub = NULL;
+}
+
+void
+Comm::TcpAcceptor::start()
+{
+    debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << theCallSub);
+
+    Must(isOpen(fd));
+
+    setListen();
+
+    // if no error so far start accepting connections.
+    if (errcode == 0)
+        SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::TcpAcceptor::doneAll() const
+{
+    // stop when FD is closed
+    if (!isOpen(fd)) {
+        return AsyncJob::doneAll();
+    }
+
+    // stop when handlers are gone
+    if (theCallSub == NULL) {
+        return AsyncJob::doneAll();
+    }
+
+    // open FD with handlers...keep accepting.
+    return false;
+}
+
+void
+Comm::TcpAcceptor::swanSong()
+{
+    debugs(5,5, HERE);
+    unsubscribe("swanSong");
+    fd = -1;
+    AcceptLimiter::Instance().removeDead(this);
+    AsyncJob::swanSong();
+}
+
+const char *
+Comm::TcpAcceptor::status() const
+{
+    static char ipbuf[MAX_IPSTRLEN] = {'\0'};
+    if (ipbuf[0] == '\0')
+        local_addr.ToHostname(ipbuf, MAX_IPSTRLEN);
+
+    static MemBuf buf;
+    buf.reset();
+    buf.Printf(" FD %d, %s",fd, ipbuf);
+
+    const char *jobStatus = AsyncJob::status();
+    buf.append(jobStatus, strlen(jobStatus));
+
+    return buf.content();
+}
+
 /**
  * New-style listen and accept routines
  *
  * accept()ed some time later.
  */
 void
-Comm::ListenStateData::setListen()
+Comm::TcpAcceptor::setListen()
 {
     errcode = 0; // reset local errno copy.
     if (listen(fd, Squid_MaxFD >> 2) < 0) {
-        debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+        debugs(50, DBG_CRITICAL, "ERROR: listen(" << status() << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
         errcode = errno;
         return;
     }
@@ -67,37 +155,19 @@ Comm::ListenStateData::setListen()
         debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
         xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
         if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
-            debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+            debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
 #elif defined(TCP_DEFER_ACCEPT)
         int seconds = 30;
         if (strncmp(Config.accept_filter, "data=", 5) == 0)
             seconds = atoi(Config.accept_filter + 5);
         if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
-            debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+            debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
 #else
-        debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+        debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS");
 #endif
     }
 }
 
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
-        fd(aFd),
-        theCallback(call),
-        mayAcceptMore(accept_many)
-{
-    assert(aFd >= 0);
-    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
-    assert(isOpen(aFd));
-    setListen();
-    SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-Comm::ListenStateData::~ListenStateData()
-{
-    comm_close(fd);
-    fd = -1;
-}
-
 /**
  * This private callback is called whenever a filedescriptor is ready
  * to dupe itself and fob off an accept()ed connection
@@ -108,23 +178,30 @@ Comm::ListenStateData::~ListenStateData()
  * done later when enough sockets become available.
  */
 void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::TcpAcceptor::doAccept(int fd, void *data)
 {
-    debugs(5, 2, HERE << "New connection on FD " << fd);
+    try {
+        debugs(5, 2, HERE << "New connection on FD " << fd);
 
-    assert(isOpen(fd));
-    ListenStateData *afd = static_cast<ListenStateData*>(data);
+        Must(isOpen(fd));
+        TcpAcceptor *afd = static_cast<TcpAcceptor*>(data);
+
+        if (!okToAccept()) {
+            AcceptLimiter::Instance().defer(afd);
+        } else {
+            afd->acceptNext();
+        }
+        SetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0);
 
-    if (!okToAccept()) {
-        AcceptLimiter::Instance().defer(afd);
-    } else {
-        afd->acceptNext();
+    } catch(const std::exception &e) {
+        fatalf("FATAL: error while accepting new client connection: %s\n", e.what());
+    } catch(...) {
+        fatal("FATAL: error while accepting new client connection: [unkown]\n");
     }
-    SetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
 }
 
 bool
-Comm::ListenStateData::okToAccept()
+Comm::TcpAcceptor::okToAccept()
 {
     static time_t last_warn = 0;
 
@@ -140,7 +217,7 @@ Comm::ListenStateData::okToAccept()
 }
 
 void
-Comm::ListenStateData::acceptOne()
+Comm::TcpAcceptor::acceptOne()
 {
     /*
      * We don't worry about running low on FDs here.  Instead,
@@ -149,42 +226,45 @@ Comm::ListenStateData::acceptOne()
      */
 
     /* Accept a new connection */
-    ConnectionDetail connDetails;
-    int newfd = oldAccept(connDetails);
+    ConnectionDetail newConnDetails;
+    int newFd = -1;
+    const comm_err_t flag = oldAccept(newConnDetails, &newFd);
 
     /* Check for errors */
-    if (newfd < 0) {
+    if (!isOpen(newFd)) {
 
-        if (newfd == COMM_NOMESSAGE) {
+        if (flag == COMM_NOMESSAGE) {
             /* register interest again */
-            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
+            debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub);
             SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
             return;
         }
 
         // A non-recoverable error; notify the caller */
-        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
-        notify(-1, COMM_ERROR, connDetails);
-        mayAcceptMore = false;
+        debugs(5, 5, HERE << "non-recoverable error:" << status() << " handler Subscription: " << theCallSub);
+        notify(flag, newConnDetails, newFd);
+        mustStop("Listener socket closed");
         return;
     }
 
-    debugs(5, 5, HERE << "accepted: FD " << fd <<
-           " newfd: " << newfd << " from: " << connDetails.peer <<
-           " handler: " << theCallback);
-    notify(newfd, COMM_OK, connDetails);
+    debugs(5, 5, HERE << "Listener: FD " << fd <<
+           " accepted new connection from " << newConnDetails.peer <<
+           " handler Subscription: " << theCallSub);
+    notify(flag, newConnDetails, newFd);
 }
 
 void
-Comm::ListenStateData::acceptNext()
+Comm::TcpAcceptor::acceptNext()
 {
-    assert(isOpen(fd));
+    Must(isOpen(fd));
     debugs(5, 2, HERE << "connection on FD " << fd);
     acceptOne();
 }
 
+// XXX: obsolete comment?
+// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback().
 void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails)
+Comm::TcpAcceptor::notify(const comm_err_t flag, const ConnectionDetail &connDetails, int newFd) const
 {
     // listener socket handlers just abandon the port with COMM_ERR_CLOSING
     // it should only happen when this object is deleted...
@@ -192,26 +272,29 @@ Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail
         return;
     }
 
-    if (theCallback != NULL) {
-        typedef CommAcceptCbParams Params;
-        Params &params = GetCommParams<Params>(theCallback);
+    if (theCallSub != NULL) {
+        AsyncCall::Pointer call = theCallSub->callback();
+        CommAcceptCbParams &params = GetCommParams<CommAcceptCbParams>(call);
         params.fd = fd;
-        params.nfd = newfd;
+        params.nfd = newFd;
         params.details = connDetails;
         params.flag = flag;
         params.xerrno = errcode;
-        ScheduleCallHere(theCallback);
-        if (!mayAcceptMore)
-            theCallback = NULL;
+        ScheduleCallHere(call);
     }
 }
 
 /**
  * accept() and process
- * Wait for an incoming connection on FD.
+ * Wait for an incoming connection on our listener socket.
+ *
+ * \retval COMM_OK         success. details parameter filled.
+ * \retval COMM_NOMESSAGE  attempted accept() but nothing useful came in.
+ * \retval COMM_ERROR      an outright failure occured.
+ *                         Or if this client has too many connections already.
  */
-int
-Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+comm_err_t
+Comm::TcpAcceptor::oldAccept(ConnectionDetail &details, int *newFd)
 {
     PROF_start(comm_accept);
     statCounter.syscalls.sock.accepts++;
@@ -228,17 +311,19 @@ Comm::ListenStateData::oldAccept(ConnectionDetail &details)
         PROF_stop(comm_accept);
 
         if (ignoreErrno(errno)) {
-            debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 5, HERE << status() << ": " << xstrerror());
             return COMM_NOMESSAGE;
         } else if (ENFILE == errno || EMFILE == errno) {
-            debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 3, HERE << status() << ": " << xstrerror());
             return COMM_ERROR;
         } else {
-            debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
+            debugs(50, 1, HERE << status() << ": " << xstrerror());
             return COMM_ERROR;
         }
     }
 
+    Must(sock >= 0);
+    *newFd = sock;
     details.peer = *gai;
 
     if ( Config.client_ip_max_connections >= 0) {
@@ -249,15 +334,16 @@ Comm::ListenStateData::oldAccept(ConnectionDetail &details)
         }
     }
 
+    // lookup the local-end details of this new connection
     details.me.InitAddrInfo(gai);
-
     details.me.SetEmpty();
     getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
     details.me = *gai;
-
-    commSetCloseOnExec(sock);
+    details.me.FreeAddrInfo(gai);
 
     /* fdstat update */
+    // XXX : these are not all HTTP requests. use a note about type and ip:port details->
+    // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port"
     fd_open(sock, FD_SOCKET, "HTTP Request");
 
     fdd_table[sock].close_file = NULL;
@@ -266,15 +352,16 @@ Comm::ListenStateData::oldAccept(ConnectionDetail &details)
     fde *F = &fd_table[sock];
     details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
     F->remote_port = details.peer.GetPort();
-    F->local_addr.SetPort(details.me.GetPort());
+    F->local_addr = details.me;
     F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET;
-    details.me.FreeAddrInfo(gai);
 
+    // set socket flags
+    commSetCloseOnExec(sock);
     commSetNonBlocking(sock);
 
     /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
     F->flags.transparent = fd_table[fd].flags.transparent;
 
     PROF_stop(comm_accept);
-    return sock;
+    return COMM_OK;
 }
diff --git a/src/comm/TcpAcceptor.h b/src/comm/TcpAcceptor.h
new file mode 100644 (file)
index 0000000..1c3a12f
--- /dev/null
@@ -0,0 +1,99 @@
+#ifndef SQUID_COMM_TCPACCEPTOR_H
+#define SQUID_COMM_TCPACCEPTOR_H
+
+#include "base/AsyncCall.h"
+#include "base/Subscription.h"
+#include "CommCalls.h"
+#include "comm_err_t.h"
+#include "comm/TcpAcceptor.h"
+#include "ip/Address.h"
+
+#if HAVE_MAP
+#include <map>
+#endif
+
+namespace Comm
+{
+
+class AcceptLimiter;
+
+/**
+ * Listens on an FD for new incoming connections and
+ * emits an active FD descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
+class TcpAcceptor : public AsyncJob
+{
+private:
+    virtual void start();
+    virtual bool doneAll() const;
+    virtual void swanSong();
+    virtual const char *status() const;
+
+    TcpAcceptor(const TcpAcceptor &); // not implemented.
+
+public:
+    TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+                const char *note, const Subscription::Pointer &aSub);
+
+    /** Subscribe a handler to receive calls back about new connections.
+     * Unsubscribes any existing subscribed handler.
+     */
+    void subscribe(const Subscription::Pointer &aSub);
+
+    /** Remove the currently waiting callback subscription.
+     * Already scheduled callbacks remain scheduled.
+     */
+    void unsubscribe(const char *reason);
+
+    /** Try and accept another connection (synchronous).
+     * If one is pending already the subscribed callback handler will be scheduled
+     * to handle it before this method returns.
+     */
+    void acceptNext();
+
+    /// Call the subscribed callback handler with details about a new connection.
+    void notify(const comm_err_t flags, const ConnectionDetail &newConnDetails, const int newFd) const;
+
+    /// errno code of the last accept() or listen() action if one occurred.
+    int errcode;
+
+    /// conn being listened on for new connections
+    /// Reserved for read-only use.
+    // NP: public only until we can hide it behind connection handles
+    int fd;
+
+protected:
+    friend class AcceptLimiter;
+    int32_t isLimited;                   ///< whether this socket is delayed and on the AcceptLimiter queue.
+
+private:
+    Subscription::Pointer theCallSub;    ///< used to generate AsyncCalls handling our events.
+
+    /// IP Address and port being listened on
+    Ip::Address local_addr;
+
+    /// Method to test if there are enough file descriptors to open a new client connection
+    /// if not the accept() will be postponed
+    static bool okToAccept();
+
+    /// Method callback for whenever an FD is ready to accept a client connection.
+    static void doAccept(int fd, void *data);
+
+    void acceptOne();
+    comm_err_t oldAccept(ConnectionDetail &newConnDetails, int *fd);
+    void setListen();
+
+    CBDATA_CLASS2(TcpAcceptor);
+};
+
+} // namespace Comm
+
+#endif /* SQUID_COMM_TCPACCEPTOR_H */
index 9a1c3d50e1b281770b8641a6247007b47118ee35..752c2ebf60068f3886d7193cb38e9cb2ecc29de0 100644 (file)
@@ -34,8 +34,9 @@
 
 #include "squid.h"
 #include "comm.h"
+#include "CommCalls.h"
+#include "comm/TcpAcceptor.h"
 #include "comm/Write.h"
-#include "comm/ListenStateData.h"
 #include "compat/strtoll.h"
 #include "ConnectionDetail.h"
 #include "errorpage.h"
@@ -153,13 +154,11 @@ public:
 
     void clear(); /// just resets fd and close handler. does not close active connections.
 
-    int fd; /// channel descriptor; \todo: remove because the closer has it
+    int fd; /// channel descriptor
 
-    /** Current listening socket handler. delete on shutdown or abort.
-     * FTP stores a copy of the FD in the field fd above.
-     * Use close() to properly close the channel.
-     */
-    Comm::ListenStateData *listener;
+    Ip::Address local; ///< The local IP address:port this channel is using
+
+    int flags; ///< socket flags used when opening.
 
 private:
     AsyncCall::Pointer closer; /// Comm close handler callback
@@ -245,6 +244,12 @@ public:
     void completedListing(void);
     void dataComplete();
     void dataRead(const CommIoCbParams &io);
+
+    /// ignore timeout on CTRL channel. set read timeout on DATA channel.
+    void switchTimeoutToDataChannel();
+    /// create a data channel acceptor and start listening.
+    void listenForDataChannel(const int fd, const char *note);
+
     int checkAuth(const HttpHeader * req_hdr);
     void checkUrlpath();
     void buildTitleUrl();
@@ -443,6 +448,7 @@ FTPSM *FTP_SM_FUNCS[] = {
 void
 FtpStateData::ctrlClosed(const CommCloseCbParams &io)
 {
+    debugs(9, 4, HERE);
     ctrl.clear();
     deleteThis("FtpStateData::ctrlClosed");
 }
@@ -451,10 +457,10 @@ FtpStateData::ctrlClosed(const CommCloseCbParams &io)
 void
 FtpStateData::dataClosed(const CommCloseCbParams &io)
 {
-    if (data.listener) {
-        delete data.listener;
-        data.listener = NULL;
-        data.fd = -1;
+    debugs(9, 4, HERE);
+    if (data.fd >= 0) {
+        comm_close(data.fd);
+        // NP clear() does the: data.fd = -1;
     }
     data.clear();
     failed(ERR_FTP_FAILURE, 0);
@@ -605,6 +611,46 @@ FtpStateData::loginParser(const char *login, int escaped)
     debugs(9, 9, HERE << ": OUT: login='" << login << "', escaped=" << escaped << ", user=" << user << ", password=" << password);
 }
 
+void
+FtpStateData::switchTimeoutToDataChannel()
+{
+    commSetTimeout(ctrl.fd, -1, NULL, NULL);
+
+    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+    AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout);
+    commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
+}
+
+void
+FtpStateData::listenForDataChannel(const int fd, const char *note)
+{
+    assert(data.fd < 0);
+
+    typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> AcceptDialer;
+    typedef AsyncCallT<AcceptDialer> AcceptCall;
+    RefCount<AcceptCall> call = static_cast<AcceptCall*>(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection));
+    Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
+
+    /* open the conn if its not already open */
+    int newFd = fd;
+    if (newFd < 0) {
+        newFd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, data.local, data.flags, note);
+        if (newFd < 0) {
+            debugs(5, DBG_CRITICAL, HERE << "comm_open_listener failed:" << data.local << " error: " << errno);
+            return;
+        }
+        debugs(9, 3, HERE << "Unconnected data socket created on FD " << newFd << ", " << data.local);
+    }
+
+    assert(newFd >= 0);
+    Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(newFd, data.local, data.flags, note, sub);
+    AsyncJob::Start(tmp);
+
+    // Ensure we have a copy of the FD opened for listening and a close handler on it.
+    data.opened(newFd, dataCloser());
+    switchTimeoutToDataChannel();
+}
+
 void
 FtpStateData::ftpTimeout(const CommTimeoutCbParams &io)
 {
@@ -1066,10 +1112,16 @@ FtpStateData::parseListing()
 
     usable = end - sbuf;
 
-    debugs(9, 3, HERE << "usable = " << usable);
+    debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes.");
 
     if (usable == 0) {
-        debugs(9, 3, HERE << "didn't find end for " << entry->url()  );
+        if (buf[0] == '\0' && len == 1) {
+            debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?");
+            data.readBuf->consume(len);
+        } else {
+            debugs(9, 3, HERE << "didn't find end for " << entry->url());
+            debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << rfc1738_do_escape(buf,0) << "'");
+        }
         xfree(sbuf);
         return;
     }
@@ -1138,7 +1190,14 @@ FtpStateData::dataComplete()
      * status code after the data command.  FtpStateData was being
      * deleted in the middle of dataRead().
      */
-    scheduleReadControlReply(0);
+    /* AYJ: 2011-01-13: Bug 2581.
+     * 226 status is possibly waiting in the ctrl buffer.
+     * The connection will hang if we DONT send buffered_ok.
+     * This happens on all transfers which can be completly sent by the
+     * server before the 150 started status message is read in by Squid.
+     * ie all transfers of about one packet hang.
+     */
+    scheduleReadControlReply(1);
 }
 
 void
@@ -1674,7 +1733,7 @@ FtpStateData::scheduleReadControlReply(int buffered_ok)
          * establish one on the control socket.
          */
 
-        if (data.fd > -1) {
+        if (data.fd >= 0) {
             AsyncCall::Pointer nullCall =  NULL;
             commSetTimeout(data.fd, -1, nullCall);
         }
@@ -2722,27 +2781,24 @@ FtpStateData::ftpPasvCallback(int fd, const DnsLookupDetails &dns, comm_err_t st
 static int
 ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
 {
-    int fd;
-    Ip::Address addr;
     struct addrinfo *AI = NULL;
-    int on = 1;
     int x = 0;
 
     /// Close old data channels, if any. We may open a new one below.
-    ftpState->data.close();
+    if ((ftpState->data.flags & COMM_REUSEADDR))
+        // NP: in fact it points to the control channel. just clear it.
+        ftpState->data.clear();
+    else
+        ftpState->data.close();
 
     /*
      * Set up a listen socket on the same local address as the
      * control connection.
      */
-
-    addr.InitAddrInfo(AI);
-
+    ftpState->data.local.InitAddrInfo(AI);
     x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen);
-
-    addr = *AI;
-
-    addr.FreeAddrInfo(AI);
+    ftpState->data.local = *AI;
+    ftpState->data.local.FreeAddrInfo(AI);
 
     if (x) {
         debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror());
@@ -2754,38 +2810,18 @@ ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
      * used for both control and data.
      */
     if (fallback) {
+        int on = 1;
         setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on));
+        ftpState->ctrl.flags |= COMM_REUSEADDR;
+        ftpState->data.flags |= COMM_REUSEADDR;
     } else {
         /* if not running in fallback mode a new port needs to be retrieved */
-        addr.SetPort(0);
-    }
-
-    fd = comm_open(SOCK_STREAM,
-                   IPPROTO_TCP,
-                   addr,
-                   COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0),
-                   ftpState->entry->url());
-    debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd  );
-
-    if (fd < 0) {
-        debugs(9, DBG_CRITICAL, HERE << "comm_open failed");
-        return -1;
-    }
-
-    typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-    AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                    acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-    ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false);
-
-    if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
-        comm_close(fd);
-        return -1;
+        ftpState->data.local.SetPort(0);
+        ftpState->data.flags = COMM_NONBLOCKING;
     }
 
-    ftpState->data.opened(fd, ftpState->dataCloser());
-    ftpState->data.port = comm_local_port(fd);
-    ftpState->data.host = NULL;
-    return fd;
+    ftpState->listenForDataChannel((fallback?ftpState->ctrl.fd:-1), ftpState->entry->url());
+    return ftpState->data.fd;
 }
 
 /// \ingroup ServerProtocolFTPInternal
@@ -2881,6 +2917,7 @@ ftpSendEPRT(FtpStateData * ftpState)
     debugs(9, 3, HERE);
     ftpState->flags.pasv_supported = 0;
     fd = ftpOpenListenSocket(ftpState, 0);
+    debugs(9, 3, "Listening for FTP data connection with FD " << fd);
 
     Ip::Address::InitAddrInfo(AI);
 
@@ -2933,77 +2970,68 @@ ftpReadEPRT(FtpStateData * ftpState)
  */
 void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
 {
-    char ntoapeer[MAX_IPSTRLEN];
-    debugs(9, 3, "ftpAcceptDataConnection");
-
-    // one connection accepted. the handler has stopped listening. drop our local pointer to it.
-    data.listener = NULL;
+    debugs(9, 3, HERE);
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         abortTransaction("entry aborted when accepting data conn");
         return;
     }
 
+    if (io.flag != COMM_OK) {
+        data.close();
+        debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno));
+        /** \todo Need to send error message on control channel*/
+        ftpFail(this);
+        return;
+    }
+
+    /* data listening conn is no longer even open. abort. */
+    if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) {
+        data.clear(); // ensure that it's cleared and not just closed.
+        return;
+    }
+
     /** \par
      * When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being
      * made by the remote client which is connected to the FTP control socket.
+     * Or the one which we were told to listen for by control channel messages (may differ under NAT).
      * This prevents third-party hacks, but also third-party load balancing handshakes.
      */
     if (Config.Ftp.sanitycheck) {
+        char ntoapeer[MAX_IPSTRLEN];
         io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN);
 
-        if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) {
+        if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 &&
+                strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) {
             debugs(9, DBG_IMPORTANT,
                    "FTP data connection from unexpected server (" <<
                    io.details.peer << "), expecting " <<
-                   fd_table[ctrl.fd].ipaddr);
+                   fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr);
 
-            /* close the bad soures connection down ASAP. */
+            /* close the bad sources connection down ASAP. */
             comm_close(io.nfd);
 
-            /* we are ony accepting once, so need to re-open the listener socket. */
-            typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-            AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                            acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
-            data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+            /* drop the bad connection (io) by ignoring the attempt. */
             return;
         }
     }
 
-    if (io.flag != COMM_OK) {
-        debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno));
-        /** \todo XXX Need to set error message */
-        ftpFail(this);
-        return;
-    }
-
     /**\par
-     * Replace the Listen socket with the accepted data socket */
+     * Replace the Listening socket with the accepted data socket */
     data.close();
     data.opened(io.nfd, dataCloser());
     data.port = io.details.peer.GetPort();
-    io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN);
+    data.host = xstrdup(fd_table[io.nfd].ipaddr);
 
     debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " <<
            "FD " << io.nfd << " to " << io.details.peer << " FD table says: " <<
            "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " <<
            "data-peer= " << fd_table[data.fd].ipaddr);
 
+    assert(haveControlChannel("ftpAcceptDataConnection"));
+    assert(ctrl.message == NULL);
 
-    AsyncCall::Pointer nullCall = NULL;
-    commSetTimeout(ctrl.fd, -1, nullCall);
-
-    typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-    AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                      TimeoutDialer, this, FtpStateData::ftpTimeout);
-    commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
-
-    /*\todo XXX We should have a flag to track connect state...
-     *    host NULL -> not connected, port == local port
-     *    host set  -> connected, port == remote port
-     */
-    /* Restart state (SENT_NLST/LIST/RETR) */
-    FTP_SM_FUNCS[state] (this);
+    // Ctrl channel operations will determine what happens to this data connection
 }
 
 /// \ingroup ServerProtocolFTPInternal
@@ -3075,34 +3103,17 @@ void FtpStateData::readStor()
             return;
         }
 
-        /*\par
-         * When client status is 125, or 150 without a hostname, Begin data transfer. */
+        /* When client status is 125, or 150 without a hostname, Begin data transfer. */
         debugs(9, 3, HERE << "starting data transfer");
+        switchTimeoutToDataChannel();
         sendMoreRequestBody();
-        /** \par
-         * Cancel the timeout on the Control socket and
-         * establish one on the data socket.
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ctrl.fd, -1, nullCall);
-
-        typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                          TimeoutDialer, this, FtpStateData::ftpTimeout);
-
-        commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
-
         state = WRITING_DATA;
         debugs(9, 3, HERE << "writing data channel");
     } else if (code == 150) {
         /*\par
-         * When client code is 150 with a hostname, Accept data channel. */
+         * When client code is 150 without a hostname, Accept data channel. */
         debugs(9, 3, "ftpReadStor: accepting data channel");
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
-
-        data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+        listenForDataChannel(data.fd, data.host);
     } else {
         debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
         ftpFail(this);
@@ -3222,34 +3233,15 @@ ftpReadList(FtpStateData * ftpState)
 
     if (code == 125 || (code == 150 && ftpState->data.host)) {
         /* Begin data transfer */
-        /* XXX what about Config.Timeout.read? */
+        debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+        ftpState->switchTimeoutToDataChannel();
         ftpState->maybeReadVirginBody();
         ftpState->state = READING_DATA;
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
         return;
     } else if (code == 150) {
         /* Accept data channel */
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
-        typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                          TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
-        commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+        debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+        ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
         return;
     } else if (!ftpState->flags.tried_nlst && code > 300) {
         ftpSendNlst(ftpState);
@@ -3285,32 +3277,12 @@ ftpReadRetr(FtpStateData * ftpState)
     if (code == 125 || (code == 150 && ftpState->data.host)) {
         /* Begin data transfer */
         debugs(9, 3, HERE << "reading data channel");
-        /* XXX what about Config.Timeout.read? */
+        ftpState->switchTimeoutToDataChannel();
         ftpState->maybeReadVirginBody();
         ftpState->state = READING_DATA;
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
     } else if (code == 150) {
         /* Accept data channel */
-        typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
-        AsyncCall::Pointer acceptCall = JobCallback(11, 5,
-                                        acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
-        /*
-         * Cancel the timeout on the Control socket and establish one
-         * on the data socket
-         */
-        AsyncCall::Pointer nullCall = NULL;
-        commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
-        typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(9, 5,
-                                          TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
-        commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+        ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
     } else if (code >= 300) {
         if (!ftpState->flags.try_slash_hack) {
             /* Try this as a directory missing trailing slash... */
@@ -3965,6 +3937,13 @@ FtpChannel::opened(int aFd, const AsyncCall::Pointer &aCloser)
     fd = aFd;
     closer = aCloser;
     comm_add_close_handler(fd, closer);
+
+    // grab the local IP address:port details for this connection
+    struct addrinfo *AI = NULL;
+    local.InitAddrInfo(AI);
+    getsockname(aFd, AI->ai_addr, &AI->ai_addrlen);
+    local = *AI;
+    local.FreeAddrInfo(AI);
 }
 
 /// planned close: removes the close handler and calls comm_close
@@ -3972,15 +3951,11 @@ void
 FtpChannel::close()
 {
     // channels with active listeners will be closed when the listener handler dies.
-    if (listener) {
-        delete listener;
-        listener = NULL;
-        comm_remove_close_handler(fd, closer);
-        closer = NULL;
-        fd = -1;
-    } else if (fd >= 0) {
-        comm_remove_close_handler(fd, closer);
-        closer = NULL;
+    if (fd >= 0) {
+        if (closer != NULL) {
+            comm_remove_close_handler(fd, closer);
+            closer = NULL;
+        }
         comm_close(fd); // we do not expect to be called back
         fd = -1;
     }