]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Move listener socket handling to libcomm-listener.la
authorAmos Jeffries <squid3@treenet.co.nz>
Thu, 31 Dec 2009 02:35:01 +0000 (15:35 +1300)
committerAmos Jeffries <squid3@treenet.co.nz>
Thu, 31 Dec 2009 02:35:01 +0000 (15:35 +1300)
* Renamed AcceptFD to ListenStateData
* listener sockets 'owned' by the code which opened them
  - FtpStateData owns the FTP data sockets
  - config HTTP settings own the http_port/https_port listeners.
* Removed fdc_table - replaced with Comm::CurrentListenerSockets.
  - reasonable memory savings (default 32-bit build only 8 KB, large production 64-bit build save 768 KB)
  - initialized in one simple fast step.
  - changes only made by ListenStateData constructor/destructor
* HotConf ready listener sockets (depends only on further config changes to happen)

18 files changed:
configure.in
src/Makefile.am
src/ProtoPort.cc
src/ProtoPort.h
src/client_side.cc
src/comm.cc
src/comm.h
src/comm/AcceptLimiter.cc [new file with mode: 0644]
src/comm/AcceptLimiter.h [new file with mode: 0644]
src/comm/ListenStateData.cc [new file with mode: 0644]
src/comm/ListenStateData.h [new file with mode: 0644]
src/comm/Makefile.am [new file with mode: 0644]
src/comm/comm_internal.h [new file with mode: 0644]
src/fde.h
src/ftp.cc
src/protos.h
src/store.cc
src/tests/stub_fd.cc

index 51cbce6dc17712a98a69827784334d0b14f1aa10..0035eb01169f8a4a9c0a56e75efc295f8dbd3aa2 100644 (file)
@@ -2433,6 +2433,7 @@ AC_CHECK_HEADERS( \
        linux/types.h \
        machine/byte_swap.h \
        malloc.h \
+       map \
        math.h \
        memory.h \
        mount.h \
@@ -4136,6 +4137,7 @@ AC_CONFIG_FILES([\
        src/adaptation/Makefile \
        src/adaptation/icap/Makefile \
        src/adaptation/ecap/Makefile \
+       src/comm/Makefile \
        src/esi/Makefile \
        src/eui/Makefile \
        src/icmp/Makefile \
index 96195df96ddb24fd37b4c625e7c43d4bf06ff52e..a1371239531b99f7fd40277067fd9938bd369d14 100644 (file)
@@ -34,7 +34,7 @@ LOADABLE_MODULES_SOURCES = \
        LoadableModules.h \
        LoadableModules.cc
 
-SUBDIRS        = base eui acl fs repl auth ip icmp ident
+SUBDIRS        = base comm eui acl fs repl auth ip icmp ident
 
 if USE_ADAPTATION
 SUBDIRS += adaptation
@@ -534,6 +534,7 @@ nodist_squid_SOURCES = \
 
 squid_LDADD = \
        $(COMMON_LIBS) \
+       comm/libcomm-listener.la \
        icmp/libicmp.la icmp/libicmp-core.la \
        @XTRA_OBJS@ \
        @DISK_LINKOBJS@ \
@@ -1196,8 +1197,10 @@ tests_testCacheManager_SOURCES = \
        wordlist.cc
 nodist_tests_testCacheManager_SOURCES = \
        $(BUILT_SOURCES)
+# comm.cc only requires comm/libcomm-listener.la until fdc_table is dead.
 tests_testCacheManager_LDADD = \
        $(COMMON_LIBS) \
+       comm/libcomm-listener.la \
        icmp/libicmp.la icmp/libicmp-core.la \
        @REPL_OBJS@ \
        ${ADAPTATION_LIBS} \
@@ -1372,6 +1375,7 @@ nodist_tests_testEvent_SOURCES = \
 tests_testEvent_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
+       comm/libcomm-listener.la \
        @REPL_OBJS@ \
        ${ADAPTATION_LIBS} \
        $(ESI_LIBS) \
@@ -1521,6 +1525,7 @@ nodist_tests_testEventLoop_SOURCES = \
 tests_testEventLoop_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
+       comm/libcomm-listener.la \
        @REPL_OBJS@ \
        ${ADAPTATION_LIBS} \
        $(ESI_LIBS) \
@@ -1665,6 +1670,7 @@ nodist_tests_test_http_range_SOURCES = \
 tests_test_http_range_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
+       comm/libcomm-listener.la \
        @REPL_OBJS@ \
        ${ADAPTATION_LIBS} \
        $(ESI_LIBS) \
@@ -1814,6 +1820,7 @@ nodist_tests_testHttpRequest_SOURCES = \
 tests_testHttpRequest_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
+       comm/libcomm-listener.la \
        @REPL_OBJS@ \
        ${ADAPTATION_LIBS} \
        $(ESI_LIBS) \
@@ -1887,10 +1894,10 @@ tests_testStore_SOURCES= \
        tests/testStoreHashIndex.h \
        tests/TestSwapDir.cc \
        tests/TestSwapDir.h \
-       tests/stub_fd.cc \
        tests/stub_HttpReply.cc \
        tests/stub_cache_manager.cc \
-       $(STORE_TEST_SOURCES)
+       $(STORE_TEST_SOURCES) \
+       tests/stub_fd.cc
 
 nodist_tests_testStore_SOURCES= \
        $(TESTSOURCES) \
@@ -2164,6 +2171,7 @@ nodist_tests_testURL_SOURCES = \
 tests_testURL_LDADD = \
        $(COMMON_LIBS) \
        icmp/libicmp.la icmp/libicmp-core.la \
+       comm/libcomm-listener.la \
        @REGEXLIB@ \
        @REPL_OBJS@ \
        ${ADAPTATION_LIBS} \
index 999b0b17e283df516073348cbe12320a9ca08861..c03f2efa79e2953eb88c0e11180bde498d214c74 100644 (file)
@@ -1,7 +1,5 @@
-
 /*
  * $Id$
- *
  */
 
 #include "squid.h"
@@ -18,6 +16,8 @@ http_port_list::http_port_list(const char *aProtocol)
 
 http_port_list::~http_port_list()
 {
+    delete listener;
+
     safe_free(name);
     safe_free(defaultsite);
     safe_free(protocol);
index 15d6abf6486f49eeca520a51c46525ce8bcb773e..aa8a3321f3c4b716090549cf3f0a6f86511d73f3 100644 (file)
@@ -6,6 +6,7 @@
 
 //#include "typedefs.h"
 #include "cbdata.h"
+#include "comm/ListenStateData.h"
 
 struct http_port_list {
     http_port_list(const char *aProtocol);
@@ -37,6 +38,13 @@ struct http_port_list {
         unsigned int timeout;
     } tcp_keepalive;
 
+    /**
+     * The FD listening socket handler.
+     * If not NULL we are actively listening for client requests.
+     * delete to close the socket.
+     */
+    Comm::ListenStateData *listener;
+
 #if USE_SSL
     // XXX: temporary hack to ease move of SSL options to http_port
     http_port_list &http;
index 5377c8ee06849b8bf36b468c2d38456efd8196b1..98bd6c65b7f143b6f05acbda3446964a412e31e5 100644 (file)
  */
 
 #include "squid.h"
+
+#include "acl/FilledChecklist.h"
+#include "auth/UserRequest.h"
+#include "ChunkedCodingParser.h"
 #include "client_side.h"
+#include "client_side_reply.h"
+#include "client_side_request.h"
+#include "ClientRequestContext.h"
 #include "clientStream.h"
-#include "ProtoPort.h"
-#include "auth/UserRequest.h"
-#include "Store.h"
 #include "comm.h"
+#include "comm/ListenStateData.h"
+#include "ConnectionDetail.h"
+#include "fde.h"
 #include "HttpHdrContRange.h"
 #include "HttpReply.h"
 #include "HttpRequest.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "ip/IpIntercept.h"
-#include "MemObject.h"
-#include "fde.h"
-#include "client_side_request.h"
-#include "acl/FilledChecklist.h"
-#include "ConnectionDetail.h"
-#include "client_side_reply.h"
-#include "ClientRequestContext.h"
 #include "MemBuf.h"
+#include "MemObject.h"
+#include "ProtoPort.h"
 #include "SquidTime.h"
-#include "ChunkedCodingParser.h"
+#include "Store.h"
 
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
@@ -147,7 +149,6 @@ static CSCB clientSocketRecipient;
 static CSD clientSocketDetach;
 static void clientSetKeepaliveFlag(ClientHttpRequest *);
 static int clientIsContentLengthValid(HttpRequest * r);
-static bool okToAccept();
 static int clientIsRequestBodyValid(int64_t bodyLength);
 static int clientIsRequestBodyTooLargeForPolicy(int64_t bodyLength);
 
@@ -2925,8 +2926,6 @@ ConnStateData::requestTimeout(const CommTimeoutCbParams &io)
 #endif
 }
 
-
-
 static void
 clientLifetimeTimeout(int fd, void *data)
 {
@@ -2936,22 +2935,6 @@ clientLifetimeTimeout(int fd, void *data)
     comm_close(fd);
 }
 
-static bool
-okToAccept()
-{
-    static time_t last_warn = 0;
-
-    if (fdNFree() >= RESERVED_FD)
-        return true;
-
-    if (last_warn + 15 < squid_curtime) {
-        debugs(33, 0, HERE << "WARNING! Your cache is running out of filedescriptors");
-        last_warn = squid_curtime;
-    }
-
-    return false;
-}
-
 ConnStateData *
 connStateCreate(const IpAddress &peer, const IpAddress &me, int fd, http_port_list *port)
 {
@@ -3006,16 +2989,6 @@ httpAccept(int sock, int newfd, ConnectionDetail *details,
     http_port_list *s = (http_port_list *)data;
     ConnStateData *connState = NULL;
 
-    if (flag == COMM_ERR_CLOSING) {
-        return;
-    }
-
-    if (!okToAccept())
-        AcceptLimiter::Instance().defer (sock, httpAccept, data);
-    else
-        /* kick off another one for later */
-        comm_accept(sock, httpAccept, data);
-
     if (flag != COMM_OK) {
         debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
         return;
@@ -3212,16 +3185,6 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details,
     https_port_list *s = (https_port_list *)data;
     SSL_CTX *sslContext = s->sslContext;
 
-    if (flag == COMM_ERR_CLOSING) {
-        return;
-    }
-
-    if (!okToAccept())
-        AcceptLimiter::Instance().defer (sock, httpsAccept, data);
-    else
-        /* kick off another one for later */
-        comm_accept(sock, httpsAccept, data);
-
     if (flag != COMM_OK) {
         errno = xerrno;
         debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
@@ -3257,7 +3220,6 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details,
         if (identChecklist.fastCheck())
             Ident::Start(details->me, details->peer, clientIdentDone, connState);
     }
-
 #endif
 
     if (s->http.tcp_keepalive.enabled) {
@@ -3333,6 +3295,8 @@ clientHttpConnectionsOpen(void)
             ++bumpCount;
 #endif
 
+        /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */
+
         enter_suid();
 
         if (s->spoof_client_ip) {
@@ -3346,9 +3310,10 @@ clientHttpConnectionsOpen(void)
         if (fd < 0)
             continue;
 
-        comm_listen(fd);
+        AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
+                                         CommAcceptCbPtrFun(httpAccept, s));
 
-        comm_accept(fd, httpAccept, s);
+        s->listener = new Comm::ListenStateData(fd, call, true);
 
         debugs(1, 1, "Accepting " <<
                (s->intercepted ? " intercepted" : "") <<
@@ -3399,9 +3364,10 @@ clientHttpsConnectionsOpen(void)
         if (fd < 0)
             continue;
 
-        comm_listen(fd);
+        AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
+                                         CommAcceptCbPtrFun(httpsAccept, s));
 
-        comm_accept(fd, httpsAccept, s);
+        s->listener = new Comm::ListenStateData(fd, call, true);
 
         debugs(1, 1, "Accepting HTTPS connections at " << s->http.s << ", FD " << fd << ".");
 
@@ -3416,7 +3382,6 @@ clientOpenListenSockets(void)
 {
     clientHttpConnectionsOpen();
 #if USE_SSL
-
     clientHttpsConnectionsOpen();
 #endif
 
@@ -3427,16 +3392,28 @@ clientOpenListenSockets(void)
 void
 clientHttpConnectionsClose(void)
 {
-    int i;
+    for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
+        if (s->listener) {
+            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
+            delete s->listener;
+            s->listener = NULL;
+        }
+    }
 
-    for (i = 0; i < NHttpSockets; i++) {
-        if (HttpSockets[i] >= 0) {
-            debugs(1, 1, "FD " << HttpSockets[i] <<
-                   " Closing HTTP connection");
-            comm_close(HttpSockets[i]);
-            HttpSockets[i] = -1;
+#if USE_SSL
+    for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
+        if (s->listener) {
+            debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
+            delete s->listener;
+            s->listener = NULL;
         }
     }
+#endif
+
+    // TODO see if we can drop HttpSockets array entirely */
+    for (int i = 0; i < NHttpSockets; i++) {
+        HttpSockets[i] = -1;
+    }
 
     NHttpSockets = 0;
 }
index 1319eccdda7f4a83a17df0d04da81ef8af022716..5a297df815c1766d49ec09d7c8fa0d4b77bbf140 100644 (file)
@@ -37,6 +37,9 @@
 #include "comm.h"
 #include "event.h"
 #include "fde.h"
+#include "comm/AcceptLimiter.h"
+#include "comm/comm_internal.h"
+#include "comm/ListenStateData.h"
 #include "CommIO.h"
 #include "CommRead.h"
 #include "ConnectionDetail.h"
@@ -242,44 +245,15 @@ static PF commConnectFree;
 static PF commHandleWrite;
 static IPH commConnectDnsHandle;
 
-static PF comm_accept_try;
-
-class AcceptFD
-{
-
-public:
-    AcceptFD(int aFd = -1): fd(aFd), theCallback(0), mayAcceptMore(false) {}
-
-    void subscribe(AsyncCall::Pointer &call);
-    void acceptNext();
-    void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &);
-
-    int fd;
-
-private:
-    bool acceptOne();
-
-    AsyncCall::Pointer theCallback;
-    bool mayAcceptMore;
-};
-
 typedef enum {
     COMM_CB_READ = 1,
     COMM_CB_DERIVED,
 } comm_callback_t;
 
-struct _fd_debug_t {
-    char const *close_file;
-    int close_line;
-};
-
-typedef struct _fd_debug_t fd_debug_t;
-
 static MemAllocator *conn_close_pool = NULL;
-AcceptFD *fdc_table = NULL; // TODO: rename. And use Vector<>?
 fd_debug_t *fdd_table = NULL;
 
-static bool
+bool
 isOpen(const int fd)
 {
     return fd_table[fd].flags.open != 0;
@@ -1372,69 +1346,6 @@ comm_connect_addr(int sock, const IpAddress &address)
     return status;
 }
 
-/* Wait for an incoming connection on FD.  FD should be a socket returned
- * from comm_listen. */
-static int
-comm_old_accept(int fd, ConnectionDetail &details)
-{
-    PROF_start(comm_accept);
-    statCounter.syscalls.sock.accepts++;
-    int sock;
-    struct addrinfo *gai = NULL;
-    details.me.InitAddrInfo(gai);
-
-    if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
-
-        details.me.FreeAddrInfo(gai);
-
-        PROF_stop(comm_accept);
-
-        if (ignoreErrno(errno)) {
-            debugs(50, 5, "comm_old_accept: FD " << fd << ": " << xstrerror());
-            return COMM_NOMESSAGE;
-        } else if (ENFILE == errno || EMFILE == errno) {
-            debugs(50, 3, "comm_old_accept: FD " << fd << ": " << xstrerror());
-            return COMM_ERROR;
-        } else {
-            debugs(50, 1, "comm_old_accept: FD " << fd << ": " << xstrerror());
-            return COMM_ERROR;
-        }
-    }
-
-    details.peer = *gai;
-
-    details.me.InitAddrInfo(gai);
-
-    details.me.SetEmpty();
-    getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
-    details.me = *gai;
-
-    commSetCloseOnExec(sock);
-
-    /* fdstat update */
-    fd_open(sock, FD_SOCKET, "HTTP Request");
-    fdd_table[sock].close_file = NULL;
-    fdd_table[sock].close_line = 0;
-    fde *F = &fd_table[sock];
-    details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
-    F->remote_port = details.peer.GetPort();
-    F->local_addr.SetPort(details.me.GetPort());
-#if USE_IPV6
-    F->sock_family = AF_INET;
-#else
-    F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6;
-#endif
-    details.me.FreeAddrInfo(gai);
-
-    commSetNonBlocking(sock);
-
-    /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
-    F->flags.transparent = fd_table[fd].flags.transparent;
-
-    PROF_stop(comm_accept);
-    return sock;
-}
-
 void
 commCallCloseHandlers(int fd)
 {
@@ -1532,7 +1443,6 @@ comm_close_start(int fd, void *data)
 
 }
 
-
 void
 comm_close_complete(int fd, void *data)
 {
@@ -1549,15 +1459,10 @@ comm_close_complete(int fd, void *data)
 
     close(fd);
 
-    fdc_table[fd] = AcceptFD(fd);
-
     statCounter.syscalls.sock.closes++;
 
     /* When an fd closes, give accept() a chance, if need be */
-
-    if (fdNFree() >= RESERVED_FD)
-        AcceptLimiter::Instance().kick();
-
+    Comm::AcceptLimiter::Instance().kick();
 }
 
 /*
@@ -1617,8 +1522,8 @@ _comm_close(int fd, char const *file, int line)
         commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
     }
 
-    // notify accept handlers
-    fdc_table[fd].notify(-1, COMM_ERR_CLOSING, 0, ConnectionDetail());
+    // Listening FD need to be closed by deleting their ListenStateData object.
+    assert(Comm::CurrentListenerSockets[fd] == NULL);
 
     commCallCloseHandlers(fd);
 
@@ -1932,10 +1837,11 @@ comm_init(void)
     fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
     fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
 
-    fdc_table = new AcceptFD[Squid_MaxFD];
-    for (int pos = 0; pos < Squid_MaxFD; ++pos) {
-        fdc_table[pos] = AcceptFD(pos);
-    }
+    /* make sure we have an empty listening socket map */
+    Comm::CurrentListenerSockets.clear();
+
+    /* make sure the accept() socket FIFO delay queue exists */
+    Comm::AcceptLimiter::Instance();
 
     commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
     for (int pos = 0; pos < Squid_MaxFD; pos++) {
@@ -1965,10 +1871,7 @@ comm_exit(void)
 
     safe_free(fd_table);
     safe_free(fdd_table);
-    if (fdc_table) {
-        delete[] fdc_table;
-        fdc_table = NULL;
-    }
+    Comm::CurrentListenerSockets.clear();
     safe_free(commfd_table);
 }
 
@@ -2189,169 +2092,6 @@ checkTimeouts(void)
     }
 }
 
-/*
- * New-style listen and accept routines
- *
- * Listen simply registers our interest in an FD for listening,
- * and accept takes a callback to call when an FD has been
- * accept()ed.
- */
-int
-comm_listen(int sock)
-{
-    int x;
-
-    if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
-        debugs(50, 0, "comm_listen: listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror());
-        return x;
-    }
-
-    if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
-#ifdef SO_ACCEPTFILTER
-        struct accept_filter_arg afa;
-        bzero(&afa, sizeof(afa));
-        debugs(5, DBG_CRITICAL, "Installing accept filter '" << Config.accept_filter << "' on FD " << sock);
-        xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
-        x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa));
-        if (x < 0)
-            debugs(5, 0, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
-#elif defined(TCP_DEFER_ACCEPT)
-        int seconds = 30;
-        if (strncmp(Config.accept_filter, "data=", 5) == 0)
-            seconds = atoi(Config.accept_filter + 5);
-        x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds));
-        if (x < 0)
-            debugs(5, 0, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
-#else
-        debugs(5, 0, "accept_filter not supported on your OS");
-#endif
-    }
-
-    return sock;
-}
-
-void
-comm_accept(int fd, IOACB *handler, void *handler_data)
-{
-    debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
-    assert(isOpen(fd));
-
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler",
-                                         CommAcceptCbPtrFun(handler, handler_data));
-    fdc_table[fd].subscribe(call);
-}
-
-void
-comm_accept(int fd, AsyncCall::Pointer &call)
-{
-    debugs(5, 5, "comm_accept: FD " << fd << " AsyncCall: " << call);
-    assert(isOpen(fd));
-
-    fdc_table[fd].subscribe(call);
-}
-
-// Called when somebody wants to be notified when our socket accepts new
-// connection. We do not probe the FD until there is such interest.
-void
-AcceptFD::subscribe(AsyncCall::Pointer &call)
-{
-    /* make sure we're not pending! */
-    assert(!theCallback);
-    theCallback = call;
-
-#if OPTIMISTIC_IO
-    mayAcceptMore = true; // even if we failed to accept last time
-#endif
-
-    if (mayAcceptMore)
-        acceptNext();
-    else
-        commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-}
-
-bool
-AcceptFD::acceptOne()
-{
-    // If there is no callback and we accept, we will leak the accepted FD.
-    // When we are running out of FDs, there is often no callback.
-    if (!theCallback) {
-        debugs(5, 5, "AcceptFD::acceptOne orphaned: FD " << fd);
-        // XXX: can we remove this and similar "just in case" calls and
-        // either listen always or listen only when there is a callback?
-        if (!AcceptLimiter::Instance().deferring())
-            commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-        return false;
-    }
-
-    /*
-     * We don't worry about running low on FDs here.  Instead,
-     * httpAccept() will use AcceptLimiter if we reach the limit
-     * there.
-     */
-
-    /* Accept a new connection */
-    ConnectionDetail connDetails;
-    int newfd = comm_old_accept(fd, connDetails);
-
-    /* Check for errors */
-
-    if (newfd < 0) {
-        assert(theCallback != NULL);
-
-        if (newfd == COMM_NOMESSAGE) {
-            /* register interest again */
-            debugs(5, 5, HERE << "try later: FD " << fd <<
-                   " handler: " << *theCallback);
-            commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-            return false;
-        }
-
-        // A non-recoverable error; notify the caller */
-        notify(-1, COMM_ERROR, errno, connDetails);
-        return false;
-    }
-
-    assert(theCallback != NULL);
-    debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd <<
-           " newfd: " << newfd << " from: " << connDetails.peer <<
-           " handler: " << *theCallback);
-    notify(newfd, COMM_OK, 0, connDetails);
-    return true;
-}
-
-void
-AcceptFD::acceptNext()
-{
-    mayAcceptMore = acceptOne();
-}
-
-void
-AcceptFD::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails)
-{
-    if (theCallback != NULL) {
-        typedef CommAcceptCbParams Params;
-        Params &params = GetCommParams<Params>(theCallback);
-        params.fd = fd;
-        params.nfd = newfd;
-        params.details = connDetails;
-        params.flag = errcode;
-        params.xerrno = xerrno;
-        ScheduleCallHere(theCallback);
-        theCallback = NULL;
-    }
-}
-
-/*
- * This callback is called whenever a filedescriptor is ready
- * to dupe itself and fob off an accept()ed connection
- */
-static void
-comm_accept_try(int fd, void *)
-{
-    assert(isOpen(fd));
-    fdc_table[fd].acceptNext();
-}
-
 void CommIO::Initialise()
 {
     /* Initialize done pipe signal */
@@ -2406,44 +2146,6 @@ CommIO::ResetNotifications()
     }
 }
 
-AcceptLimiter AcceptLimiter::Instance_;
-
-AcceptLimiter &AcceptLimiter::Instance()
-{
-    return Instance_;
-}
-
-bool
-AcceptLimiter::deferring() const
-{
-    return deferred.size() > 0;
-}
-
-void
-AcceptLimiter::defer (int fd, Acceptor::AcceptorFunction *aFunc, void *data)
-{
-    debugs(5, 5, "AcceptLimiter::defer: FD " << fd << " handler: " << (void*)aFunc);
-    Acceptor temp;
-    temp.theFunction = aFunc;
-    temp.acceptFD = fd;
-    temp.theData = data;
-    deferred.push_back(temp);
-}
-
-void
-AcceptLimiter::kick()
-{
-    if (!deferring())
-        return;
-
-    /* Yes, this means the first on is the last off....
-     * If the list container was a little more friendly, we could sensibly us it.
-     */
-    Acceptor temp = deferred.pop_back();
-
-    comm_accept (temp.acceptFD, temp.theFunction, temp.theData);
-}
-
 /// Start waiting for a possibly half-closed connection to close
 // by scheduling a read callback to a monitoring handler that
 // will close the connection on read errors.
index 1d1786b33c187674403cb3a1fdbe0aeed33022e5..e9c13cc9a5ae638e414b2b8ec8177c21cb4f592d 100644 (file)
@@ -37,7 +37,6 @@ typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void
 /* comm.c */
 extern bool comm_iocallbackpending(void); /* inline candidate */
 
-extern int comm_listen(int fd);
 SQUIDCEXTERN int commSetNonBlocking(int fd);
 SQUIDCEXTERN int commUnsetNonBlocking(int fd);
 SQUIDCEXTERN void commSetCloseOnExec(int fd);
@@ -103,8 +102,6 @@ SQUIDCEXTERN void comm_quick_poll_required(void);
 
 class ConnectionDetail;
 typedef void IOACB(int fd, int nfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data);
-extern void comm_accept(int fd, IOACB *handler, void *handler_data);
-extern void comm_accept(int fd, AsyncCall::Pointer &call);
 extern void comm_add_close_handler(int fd, PF *, void *);
 extern void comm_add_close_handler(int fd, AsyncCall::Pointer &);
 extern void comm_remove_close_handler(int fd, PF *, void *);
@@ -133,33 +130,6 @@ extern bool commHasHalfClosedMonitor(int fd);
 inline void commMarkHalfClosed(int fd) { commStartHalfClosedMonitor(fd); }
 inline bool commIsHalfClosed(int fd) { return commHasHalfClosedMonitor(fd); }
 
-/* Not sure where these should live yet */
-
-class Acceptor
-{
-
-public:
-    typedef void AcceptorFunction (int, int, ConnectionDetail *, comm_err_t, int, void *);
-    AcceptorFunction *theFunction;
-    int acceptFD;
-    void *theData;
-};
-
-class AcceptLimiter
-{
-
-public:
-    static AcceptLimiter &Instance();
-    void defer (int, Acceptor::AcceptorFunction *, void *);
-    void kick();
-
-    bool deferring() const;
-
-private:
-    static AcceptLimiter Instance_;
-    Vector<Acceptor> deferred;
-};
-
 /* A comm engine that calls comm_select */
 
 class CommSelectEngine : public AsyncEngine
diff --git a/src/comm/AcceptLimiter.cc b/src/comm/AcceptLimiter.cc
new file mode 100644 (file)
index 0000000..7881db0
--- /dev/null
@@ -0,0 +1,32 @@
+#include "config.h"
+#include "comm/AcceptLimiter.h"
+#include "comm/ListenStateData.h"
+#include "fde.h"
+
+Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
+
+Comm::AcceptLimiter &Comm::AcceptLimiter::Instance()
+{
+    return Instance_;
+}
+
+void
+Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+{
+    afd->isLimited++;
+    debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+    deferred.push_back(afd);
+}
+
+void
+Comm::AcceptLimiter::kick()
+{
+    debugs(5, 5, HERE << " size=" << deferred.size());
+    if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
+        debugs(5, 5, HERE << " doing one.");
+        /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
+        ListenStateData *temp = deferred.shift();
+        temp->isLimited--;
+        temp->acceptNext();
+    }
+}
diff --git a/src/comm/AcceptLimiter.h b/src/comm/AcceptLimiter.h
new file mode 100644 (file)
index 0000000..4af96bf
--- /dev/null
@@ -0,0 +1,41 @@
+#ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H
+#define _SQUID_SRC_COMM_ACCEPT_LIMITER_H
+
+#include "Array.h"
+
+namespace Comm {
+
+class ListenStateData;
+
+/**
+ * FIFO Queue holding listener socket handlers which have been activated
+ * ready to dupe their FD and accept() a new client connection.
+ * But when doing so there were not enough FD available to handle the
+ * new connection. These handlers are awaiting some FD to become free.
+ *
+ * defer - used only by Comm layer ListenStateData adding themselves when FD are limited.
+ * kick - used by Comm layer when FD are closed.
+ */
+class AcceptLimiter
+{
+
+public:
+    /** retrieve the global instance of the queue. */
+    static AcceptLimiter &Instance();
+
+    /** delay accepting a new client connection. */
+    void defer(Comm::ListenStateData *afd);
+
+    /** try to accept and begin processing any delayed client connections. */
+    void kick();
+
+private:
+    static AcceptLimiter Instance_;
+
+    /** FIFO queue */
+    Vector<Comm::ListenStateData*> deferred;
+};
+
+}; // namepace Comm
+
+#endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */
diff --git a/src/comm/ListenStateData.cc b/src/comm/ListenStateData.cc
new file mode 100644 (file)
index 0000000..9d8dd06
--- /dev/null
@@ -0,0 +1,317 @@
+/*
+ * DEBUG: section 5     Listener Socket Handler
+ * AUTHOR: Harvest Derived
+ *
+ * SQUID Web Proxy Cache          http://www.squid-cache.org/
+ * ----------------------------------------------------------
+ *
+ *  Squid is the result of efforts by numerous individuals from
+ *  the Internet community; see the CONTRIBUTORS file for full
+ *  details.   Many organizations have provided support for Squid's
+ *  development; see the SPONSORS file for full details.  Squid is
+ *  Copyrighted (C) 2001 by the Regents of the University of
+ *  California; see the COPYRIGHT file for full details.  Squid
+ *  incorporates software developed and/or copyrighted by other
+ *  sources; see the CREDITS file for full details.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
+ *
+ *
+ * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
+ */
+
+#include "squid.h"
+#include "CommCalls.h"
+#include "comm/AcceptLimiter.h"
+#include "comm/comm_internal.h"
+#include "comm/ListenStateData.h"
+#include "ConnectionDetail.h"
+#include "fde.h"
+#include "SquidTime.h"
+
+/*
+ * This is not strictly needed at all.
+ * It's only needed by the cachemgr interface to list the currently active sockets.
+ * which could be done for HTTP/HTTPS by listing the http_port_list->listener->fd
+ * BUT, FTP data connections is a bit of a problem.
+ *
+ * AYJ: for now the old way of doing Comm:: actions sequentially in some caller
+ *      requires this to anchor each of those Comm:: functions together.
+ */
+std::map<int, Comm::ListenStateData*> Comm::CurrentListenerSockets;
+
+/**
+ * Set of listener sockets which are known to have events pending but we
+ * do not have enough sockets available to do the accept just yet.
+ */
+//std::list<Comm::ListenStateData*> Comm::PendingAccepts;
+
+
+/**
+ * New-style listen and accept routines
+ *
+ * Listen simply registers our interest in an FD for listening,
+ * and accept takes a callback to call when an FD has been
+ * accept()ed.
+ */
+int
+Comm::comm_listen(int sock)
+{
+    int x;
+
+    if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
+        debugs(50, 0, HERE << "listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror());
+        return x;
+    }
+
+    if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
+#ifdef SO_ACCEPTFILTER
+        struct accept_filter_arg afa;
+        bzero(&afa, sizeof(afa));
+        debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << sock);
+        xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
+        x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa));
+        if (x < 0)
+            debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+#elif defined(TCP_DEFER_ACCEPT)
+        int seconds = 30;
+        if (strncmp(Config.accept_filter, "data=", 5) == 0)
+            seconds = atoi(Config.accept_filter + 5);
+        x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds));
+        if (x < 0)
+            debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+#else
+        debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+#endif
+    }
+
+    return sock;
+}
+
+// TODO make this a constructor of ListenStateData...
+// better yet convert the places its used to setup AsyncCalls instead...
+Comm::ListenStateData *
+Comm::comm_accept(int fd, IOACB *handler, void *handler_data)
+{
+    debugs(5, 5, HERE << "FD " << fd << " handler: " << (void*)handler);
+    assert(isOpen(fd));
+
+    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler",
+                                         CommAcceptCbPtrFun(handler, handler_data));
+
+    return new Comm::ListenStateData(fd, call, false);
+}
+
+Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
+    fd(aFd),
+    theCallback(call),
+    mayAcceptMore(accept_many)
+{
+    assert(aFd >= 0);
+    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
+    assert(isOpen(aFd));
+
+    CurrentListenerSockets[fd] = this;
+
+    errcode = comm_listen(fd);
+    commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+Comm::ListenStateData::~ListenStateData()
+{
+    // un-register listener before closing the FD.
+    // TODO: is this the right way to remove from a std::map<> ?
+    if (CurrentListenerSockets[fd])
+        CurrentListenerSockets[fd] = NULL;
+
+    comm_close(fd);
+    fd = -1;
+}
+
+/**
+ * This private callback is called whenever a filedescriptor is ready
+ * to dupe itself and fob off an accept()ed connection
+ *
+ * It will either do that accept operation. Or if there are not enough FD
+ * available to do the clone safely will push the listening FD into a list
+ * of deferred operations. The list gets kicked and the dupe/accept() actually
+ * done later when enough sockets become available.
+ */
+void
+Comm::ListenStateData::doAccept(int fd, void *data)
+{
+    debugs(5, 2, HERE << "New connection on FD " << fd);
+
+    assert(isOpen(fd));
+    ListenStateData *afd = static_cast<ListenStateData*>(data);
+
+    if (!okToAccept()) {
+        AcceptLimiter::Instance().defer(afd);
+    }
+    else {
+        afd->acceptNext();
+    }
+    commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
+}
+
+bool
+Comm::ListenStateData::okToAccept()
+{
+    static time_t last_warn = 0;
+
+    if (fdNFree() >= RESERVED_FD)
+        return true;
+
+    if (last_warn + 15 < squid_curtime) {
+        debugs(5, DBG_CRITICAL, "WARNING! Your cache is running out of filedescriptors");
+        last_warn = squid_curtime;
+    }
+
+    return false;
+}
+
+bool
+Comm::ListenStateData::acceptOne()
+{
+    /*
+     * We don't worry about running low on FDs here.  Instead,
+     * doAccept() will use AcceptLimiter if we reach the limit
+     * there.
+     */
+
+    /* Accept a new connection */
+    ConnectionDetail connDetails;
+    int newfd = oldAccept(connDetails);
+
+    /* Check for errors */
+    if (newfd < 0) {
+
+        if (newfd == COMM_NOMESSAGE) {
+            /* register interest again */
+            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << *theCallback);
+            commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+            return false;
+        }
+
+        // A non-recoverable error; notify the caller */
+        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << *theCallback);
+        notify(-1, COMM_ERROR, errno, connDetails);
+        return false;
+    }
+
+    debugs(5, 5, HERE << "accepted: FD " << fd <<
+           " newfd: " << newfd << " from: " << connDetails.peer <<
+           " handler: " << *theCallback);
+    notify(newfd, COMM_OK, 0, connDetails);
+    return true;
+}
+
+void
+Comm::ListenStateData::acceptNext()
+{
+    assert(isOpen(fd));
+    debugs(5, 2, HERE << "connection on FD " << fd);
+    mayAcceptMore = acceptOne();
+}
+
+void
+Comm::ListenStateData::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails)
+{
+    // listener socket handlers just abandon the port with COMM_ERR_CLOSING
+    // it should only happen when this object is deleted...
+    if (errcode == COMM_ERR_CLOSING) {
+        return;
+    }
+
+    if (theCallback != NULL) {
+        typedef CommAcceptCbParams Params;
+        Params &params = GetCommParams<Params>(theCallback);
+        params.fd = fd;
+        params.nfd = newfd;
+        params.details = connDetails;
+        params.flag = errcode;
+        params.xerrno = xerrno;
+        ScheduleCallHere(theCallback);
+        if (!mayAcceptMore)
+            theCallback = NULL;
+    }
+}
+
+/**
+ * accept() and process 
+ * Wait for an incoming connection on FD.  FD should be a socket returned
+ * from comm_listen. */
+int
+Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+{
+    PROF_start(comm_accept);
+    statCounter.syscalls.sock.accepts++;
+    int sock;
+    struct addrinfo *gai = NULL;
+    details.me.InitAddrInfo(gai);
+
+    if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
+
+        details.me.FreeAddrInfo(gai);
+
+        PROF_stop(comm_accept);
+
+        if (ignoreErrno(errno)) {
+            debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
+            return COMM_NOMESSAGE;
+        } else if (ENFILE == errno || EMFILE == errno) {
+            debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
+            return COMM_ERROR;
+        } else {
+            debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
+            return COMM_ERROR;
+        }
+    }
+
+    details.peer = *gai;
+
+    details.me.InitAddrInfo(gai);
+
+    details.me.SetEmpty();
+    getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
+    details.me = *gai;
+
+    commSetCloseOnExec(sock);
+
+    /* fdstat update */
+    fd_open(sock, FD_SOCKET, "HTTP Request");
+
+    fdd_table[sock].close_file = NULL;
+    fdd_table[sock].close_line = 0;
+
+    fde *F = &fd_table[sock];
+    details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
+    F->remote_port = details.peer.GetPort();
+    F->local_addr.SetPort(details.me.GetPort());
+#if USE_IPV6
+    F->sock_family = AF_INET;
+#else
+    F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6;
+#endif
+    details.me.FreeAddrInfo(gai);
+
+    commSetNonBlocking(sock);
+
+    /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
+    F->flags.transparent = fd_table[fd].flags.transparent;
+
+    PROF_stop(comm_accept);
+    return sock;
+}
diff --git a/src/comm/ListenStateData.h b/src/comm/ListenStateData.h
new file mode 100644 (file)
index 0000000..f43997e
--- /dev/null
@@ -0,0 +1,58 @@
+#ifndef SQUID_LISTENERSTATEDATA_H
+#define SQUID_LISTENERSTATEDATA_H
+
+#include "config.h"
+#include "base/AsyncCall.h"
+#include "comm.h"
+#if HAVE_MAP
+#include <map>
+#endif
+
+class ConnectionDetail;
+
+namespace Comm {
+
+class ListenStateData
+{
+
+public:
+    ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
+    ListenStateData(const ListenStateData &r); // not implemented.
+    ~ListenStateData();
+
+    void subscribe(AsyncCall::Pointer &call);
+    void acceptNext();
+    void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &);
+
+    int fd;
+
+    /// errno code if any happened so far.
+    int errcode;
+
+    /// whether this socket is delayed and on the AcceptLimiter queue.
+    int32_t isLimited;
+
+private:
+    /// Method to test if there are enough file escriptors to open a new client connection
+    /// if not the accept() will be postponed
+    static bool okToAccept();
+
+    /// Method callback for whenever an FD is ready to accept a client connection.
+    static void doAccept(int fd, void *data);
+
+    bool acceptOne();
+    int oldAccept(ConnectionDetail &details);
+
+    AsyncCall::Pointer theCallback;
+    bool mayAcceptMore;
+};
+
+extern std::map<int,ListenStateData*> CurrentListenerSockets;
+
+// remaining legacy functions. TODO replace all uses with the ListenData constructor
+extern int comm_listen(int fd);
+extern ListenStateData *comm_accept(int fd, IOACB *handler, void *handler_data);
+
+}; // namespace Comm
+
+#endif /* SQUID_LISTENERSTATEDATA_H */
diff --git a/src/comm/Makefile.am b/src/comm/Makefile.am
new file mode 100644 (file)
index 0000000..09cb1c1
--- /dev/null
@@ -0,0 +1,13 @@
+include $(top_srcdir)/src/Common.am
+include $(top_srcdir)/src/TestHeaders.am
+
+noinst_LTLIBRARIES = libcomm-listener.la
+
+## Library holding listener comm socket handlers
+libcomm_listener_la_SOURCES= \
+       AcceptLimiter.cc \
+       AcceptLimiter.h \
+       ListenStateData.cc \
+       ListenStateData.h \
+       \
+       comm_internal.h
diff --git a/src/comm/comm_internal.h b/src/comm/comm_internal.h
new file mode 100644 (file)
index 0000000..a328545
--- /dev/null
@@ -0,0 +1,16 @@
+#ifndef SQUID_COMM_COMM_INTERNAL_H
+#define SQUID_COMM_COMM_INTERNAL_H
+
+/* misc collection of bits shared by Comm code, but not needed by the rest of Squid. */
+
+struct _fd_debug_t {
+    char const *close_file;
+    int close_line;
+};
+
+typedef struct _fd_debug_t fd_debug_t;
+extern fd_debug_t *fdd_table;
+
+extern bool isOpen(const int fd);
+
+#endif
index 3b503d2b51e0e903db4d9173b8c85ae3c590726f..7d8f9a74007f73998425b320379975fbb7747eba 100644 (file)
--- a/src/fde.h
+++ b/src/fde.h
@@ -125,4 +125,6 @@ private:
 
 };
 
+SQUIDCEXTERN int fdNFree(void);
+
 #endif /* SQUID_FDE_H */
index 8541d7ab4cb51b14d73fec09d59d75ec290c7df4..54a8520bae06570380612d4904df72dc2bfbfd51 100644 (file)
  */
 
 #include "squid.h"
-#include "Store.h"
-#include "HttpRequest.h"
-#include "HttpReply.h"
+#include "comm.h"
+#include "comm/ListenStateData.h"
+#include "ConnectionDetail.h"
 #include "errorpage.h"
 #include "fde.h"
-#include "comm.h"
-#include "HttpHeaderRange.h"
+#include "forward.h"
 #include "HttpHdrContRange.h"
+#include "HttpHeaderRange.h"
 #include "HttpHeader.h"
+#include "HttpRequest.h"
+#include "HttpReply.h"
+#include "MemBuf.h"
+#include "Server.h"
+#include "SquidTime.h"
+#include "Store.h"
+#include "URLScheme.h"
+#include "wordlist.h"
+
 #if DELAY_POOLS
 #include "DelayPools.h"
 #include "MemObject.h"
 #endif
-#include "ConnectionDetail.h"
-#include "forward.h"
-#include "Server.h"
-#include "MemBuf.h"
-#include "wordlist.h"
-#include "SquidTime.h"
-#include "URLScheme.h"
 
 /**
  \defgroup ServerProtocolFTPInternal Server-Side FTP Internals
@@ -139,11 +141,21 @@ public:
     /// called after the socket is opened, sets up close handler
     void opened(int aFd, const AsyncCall::Pointer &aCloser);
 
-    void close(); /// clears the close handler and calls comm_close
-    void clear(); /// just resets fd and close handler
+    /** Handles all operations needed to properly close the active channel FD.
+     * clearing the close handler, clearing the listen socket properly, and calling comm_close
+     */
+    void close();
+
+    void clear(); /// just resets fd and close handler. does not close active connections.
 
     int fd; /// channel descriptor; \todo: remove because the closer has it
 
+    /** Current listening socket handler. delete on shutdown or abort.
+     * FTP stores a copy of the FD in the field fd above.
+     * Use close() to properly close the channel.
+     */
+    Comm::ListenStateData *listener;
+
 private:
     AsyncCall::Pointer closer; /// Comm close handler callback
 };
@@ -434,6 +446,11 @@ FtpStateData::ctrlClosed(const CommCloseCbParams &io)
 void
 FtpStateData::dataClosed(const CommCloseCbParams &io)
 {
+    if (data.listener) {
+        delete data.listener;
+        data.listener = NULL;
+        data.fd = -1;
+    }
     data.clear();
     failed(ERR_FTP_FAILURE, 0);
     /* failed closes ctrl.fd and frees ftpState */
@@ -2858,7 +2875,7 @@ ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
     int on = 1;
     int x = 0;
 
-    /// Close old data channel, if any. We may open a new one below.
+    /// Close old data channels, if any. We may open a new one below.
     ftpState->data.close();
 
     /*
@@ -2902,7 +2919,12 @@ ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
         return -1;
     }
 
-    if (comm_listen(fd) < 0) {
+    typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
+    AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
+                                    acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
+    ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false);
+
+    if (!ftpState->data.listener || ftpState->data.listener->errcode < 0) {
         comm_close(fd);
         return -1;
     }
@@ -3057,8 +3079,8 @@ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
     char ntoapeer[MAX_IPSTRLEN];
     debugs(9, 3, "ftpAcceptDataConnection");
 
-    if (io.flag == COMM_ERR_CLOSING)
-        return;
+    // one connection accepted. the handler has stopped listening. drop our local pointer to it.
+    data.listener = NULL;
 
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
         abortTransaction("entry aborted when accepting data conn");
@@ -3079,17 +3101,20 @@ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
                    io.details.peer << "), expecting " <<
                    fd_table[ctrl.fd].ipaddr);
 
+            /* close the bad soures connection down ASAP. */
             comm_close(io.nfd);
+
+            /* we are ony accepting once, so need to re-open the listener socket. */
             typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
             AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                             acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
-            comm_accept(data.fd, acceptCall);
+            data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
             return;
         }
     }
 
     if (io.flag != COMM_OK) {
-        debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: comm_accept(" << io.nfd << "): " << xstrerr(io.xerrno));
+        debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno));
         /** \todo XXX Need to set error message */
         ftpFail(this);
         return;
@@ -3220,7 +3245,7 @@ void FtpStateData::readStor()
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
 
-        comm_accept(data.fd, acceptCall);
+        data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
     } else {
         debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
         ftpFail(this);
@@ -3356,7 +3381,7 @@ ftpReadList(FtpStateData * ftpState)
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
 
-        comm_accept(ftpState->data.fd, acceptCall);
+        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
         /*
          * Cancel the timeout on the Control socket and establish one
          * on the data socket
@@ -3417,7 +3442,7 @@ ftpReadRetr(FtpStateData * ftpState)
         typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
-        comm_accept(ftpState->data.fd, acceptCall);
+        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
         /*
          * Cancel the timeout on the Control socket and establish one
          * on the data socket
@@ -3982,7 +4007,7 @@ FtpStateData::closeServer()
 /**
  * Did we close all FTP server connection(s)?
  *
- \retval true  Both server control and data channels are closed.
+ \retval true  Both server control and data channels are closed. And not waitigng for a new data connection to open.
  \retval false Either control channel or data is still active.
  */
 bool
@@ -4061,7 +4086,15 @@ FtpChannel::opened(int aFd, const AsyncCall::Pointer &aCloser)
 void
 FtpChannel::close()
 {
-    if (fd >= 0) {
+    // channels with active listeners will be closed when the listener handler dies.
+    if (listener) {
+        delete listener;
+        listener = NULL;
+        comm_remove_close_handler(fd, closer);
+        closer = NULL;
+        fd = -1;
+    }
+    else if (fd >= 0) {
         comm_remove_close_handler(fd, closer);
         closer = NULL;
         comm_close(fd); // we do not expect to be called back
index 46f5ba37392cf10230b172a5bf5353f6d6058920..f09e710231844560f64b7e6652f5e6b47b6eb480 100644 (file)
@@ -143,7 +143,6 @@ SQUIDCEXTERN void fd_open(int fd, unsigned int type, const char *);
 SQUIDCEXTERN void fd_note(int fd, const char *);
 SQUIDCEXTERN void fd_bytes(int fd, int len, unsigned int type);
 SQUIDCEXTERN void fdDumpOpen(void);
-SQUIDCEXTERN int fdNFree(void);
 SQUIDCEXTERN int fdUsageHigh(void);
 SQUIDCEXTERN void fdAdjustReserved(void);
 
index e16f869e78ab212523ad103fc0084837346b9d8c..71c3954181bb4f9d9d167e23395581ac708cba52 100644 (file)
@@ -35,6 +35,7 @@
 
 #include "squid.h"
 #include "event.h"
+#include "fde.h"
 #include "Store.h"
 #include "CacheManager.h"
 #include "StoreClient.h"
index 50522e538da90aaed79ab5a00cee1353aaea634a..bace6c07bfce4f5db178784b12b83a2a2ce9ddf8 100644 (file)
@@ -32,7 +32,8 @@
  *
  */
 
-#include "squid.h"
+#include "config.h"
+#include "fde.h"
 
 int
 fdNFree(void)