From 04f5590583c7d4b09190196ed23a7ea4b34a0bda Mon Sep 17 00:00:00 2001 From: Amos Jeffries Date: Thu, 31 Dec 2009 15:35:01 +1300 Subject: [PATCH] Move listener socket handling to libcomm-listener.la * Renamed AcceptFD to ListenStateData * listener sockets 'owned' by the code which opened them - FtpStateData owns the FTP data sockets - config HTTP settings own the http_port/https_port listeners. * Removed fdc_table - replaced with Comm::CurrentListenerSockets. - reasonable memory savings (default 32-bit build only 8 KB, large production 64-bit build save 768 KB) - initialized in one simple fast step. - changes only made by ListenStateData constructor/destructor * HotConf ready listener sockets (depends only on further config changes to happen) --- configure.in | 2 + src/Makefile.am | 14 +- src/ProtoPort.cc | 4 +- src/ProtoPort.h | 8 + src/client_side.cc | 103 +++++------- src/comm.cc | 324 ++---------------------------------- src/comm.h | 30 ---- src/comm/AcceptLimiter.cc | 32 ++++ src/comm/AcceptLimiter.h | 41 +++++ src/comm/ListenStateData.cc | 317 +++++++++++++++++++++++++++++++++++ src/comm/ListenStateData.h | 58 +++++++ src/comm/Makefile.am | 13 ++ src/comm/comm_internal.h | 16 ++ src/fde.h | 2 + src/ftp.cc | 83 ++++++--- src/protos.h | 1 - src/store.cc | 1 + src/tests/stub_fd.cc | 3 +- 18 files changed, 616 insertions(+), 436 deletions(-) create mode 100644 src/comm/AcceptLimiter.cc create mode 100644 src/comm/AcceptLimiter.h create mode 100644 src/comm/ListenStateData.cc create mode 100644 src/comm/ListenStateData.h create mode 100644 src/comm/Makefile.am create mode 100644 src/comm/comm_internal.h diff --git a/configure.in b/configure.in index 51cbce6dc1..0035eb0116 100644 --- a/configure.in +++ b/configure.in @@ -2433,6 +2433,7 @@ AC_CHECK_HEADERS( \ linux/types.h \ machine/byte_swap.h \ malloc.h \ + map \ math.h \ memory.h \ mount.h \ @@ -4136,6 +4137,7 @@ AC_CONFIG_FILES([\ src/adaptation/Makefile \ src/adaptation/icap/Makefile \ src/adaptation/ecap/Makefile \ + src/comm/Makefile \ src/esi/Makefile \ src/eui/Makefile \ src/icmp/Makefile \ diff --git a/src/Makefile.am b/src/Makefile.am index 96195df96d..a137123953 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -34,7 +34,7 @@ LOADABLE_MODULES_SOURCES = \ LoadableModules.h \ LoadableModules.cc -SUBDIRS = base eui acl fs repl auth ip icmp ident +SUBDIRS = base comm eui acl fs repl auth ip icmp ident if USE_ADAPTATION SUBDIRS += adaptation @@ -534,6 +534,7 @@ nodist_squid_SOURCES = \ squid_LDADD = \ $(COMMON_LIBS) \ + comm/libcomm-listener.la \ icmp/libicmp.la icmp/libicmp-core.la \ @XTRA_OBJS@ \ @DISK_LINKOBJS@ \ @@ -1196,8 +1197,10 @@ tests_testCacheManager_SOURCES = \ wordlist.cc nodist_tests_testCacheManager_SOURCES = \ $(BUILT_SOURCES) +# comm.cc only requires comm/libcomm-listener.la until fdc_table is dead. tests_testCacheManager_LDADD = \ $(COMMON_LIBS) \ + comm/libcomm-listener.la \ icmp/libicmp.la icmp/libicmp-core.la \ @REPL_OBJS@ \ ${ADAPTATION_LIBS} \ @@ -1372,6 +1375,7 @@ nodist_tests_testEvent_SOURCES = \ tests_testEvent_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ + comm/libcomm-listener.la \ @REPL_OBJS@ \ ${ADAPTATION_LIBS} \ $(ESI_LIBS) \ @@ -1521,6 +1525,7 @@ nodist_tests_testEventLoop_SOURCES = \ tests_testEventLoop_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ + comm/libcomm-listener.la \ @REPL_OBJS@ \ ${ADAPTATION_LIBS} \ $(ESI_LIBS) \ @@ -1665,6 +1670,7 @@ nodist_tests_test_http_range_SOURCES = \ tests_test_http_range_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ + comm/libcomm-listener.la \ @REPL_OBJS@ \ ${ADAPTATION_LIBS} \ $(ESI_LIBS) \ @@ -1814,6 +1820,7 @@ nodist_tests_testHttpRequest_SOURCES = \ tests_testHttpRequest_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ + comm/libcomm-listener.la \ @REPL_OBJS@ \ ${ADAPTATION_LIBS} \ $(ESI_LIBS) \ @@ -1887,10 +1894,10 @@ tests_testStore_SOURCES= \ tests/testStoreHashIndex.h \ tests/TestSwapDir.cc \ tests/TestSwapDir.h \ - tests/stub_fd.cc \ tests/stub_HttpReply.cc \ tests/stub_cache_manager.cc \ - $(STORE_TEST_SOURCES) + $(STORE_TEST_SOURCES) \ + tests/stub_fd.cc nodist_tests_testStore_SOURCES= \ $(TESTSOURCES) \ @@ -2164,6 +2171,7 @@ nodist_tests_testURL_SOURCES = \ tests_testURL_LDADD = \ $(COMMON_LIBS) \ icmp/libicmp.la icmp/libicmp-core.la \ + comm/libcomm-listener.la \ @REGEXLIB@ \ @REPL_OBJS@ \ ${ADAPTATION_LIBS} \ diff --git a/src/ProtoPort.cc b/src/ProtoPort.cc index 999b0b17e2..c03f2efa79 100644 --- a/src/ProtoPort.cc +++ b/src/ProtoPort.cc @@ -1,7 +1,5 @@ - /* * $Id$ - * */ #include "squid.h" @@ -18,6 +16,8 @@ http_port_list::http_port_list(const char *aProtocol) http_port_list::~http_port_list() { + delete listener; + safe_free(name); safe_free(defaultsite); safe_free(protocol); diff --git a/src/ProtoPort.h b/src/ProtoPort.h index 15d6abf648..aa8a3321f3 100644 --- a/src/ProtoPort.h +++ b/src/ProtoPort.h @@ -6,6 +6,7 @@ //#include "typedefs.h" #include "cbdata.h" +#include "comm/ListenStateData.h" struct http_port_list { http_port_list(const char *aProtocol); @@ -37,6 +38,13 @@ struct http_port_list { unsigned int timeout; } tcp_keepalive; + /** + * The FD listening socket handler. + * If not NULL we are actively listening for client requests. + * delete to close the socket. + */ + Comm::ListenStateData *listener; + #if USE_SSL // XXX: temporary hack to ease move of SSL options to http_port http_port_list &http; diff --git a/src/client_side.cc b/src/client_side.cc index 5377c8ee06..98bd6c65b7 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -82,28 +82,30 @@ */ #include "squid.h" + +#include "acl/FilledChecklist.h" +#include "auth/UserRequest.h" +#include "ChunkedCodingParser.h" #include "client_side.h" +#include "client_side_reply.h" +#include "client_side_request.h" +#include "ClientRequestContext.h" #include "clientStream.h" -#include "ProtoPort.h" -#include "auth/UserRequest.h" -#include "Store.h" #include "comm.h" +#include "comm/ListenStateData.h" +#include "ConnectionDetail.h" +#include "fde.h" #include "HttpHdrContRange.h" #include "HttpReply.h" #include "HttpRequest.h" #include "ident/Config.h" #include "ident/Ident.h" #include "ip/IpIntercept.h" -#include "MemObject.h" -#include "fde.h" -#include "client_side_request.h" -#include "acl/FilledChecklist.h" -#include "ConnectionDetail.h" -#include "client_side_reply.h" -#include "ClientRequestContext.h" #include "MemBuf.h" +#include "MemObject.h" +#include "ProtoPort.h" #include "SquidTime.h" -#include "ChunkedCodingParser.h" +#include "Store.h" #if LINGERING_CLOSE #define comm_close comm_lingering_close @@ -147,7 +149,6 @@ static CSCB clientSocketRecipient; static CSD clientSocketDetach; static void clientSetKeepaliveFlag(ClientHttpRequest *); static int clientIsContentLengthValid(HttpRequest * r); -static bool okToAccept(); static int clientIsRequestBodyValid(int64_t bodyLength); static int clientIsRequestBodyTooLargeForPolicy(int64_t bodyLength); @@ -2925,8 +2926,6 @@ ConnStateData::requestTimeout(const CommTimeoutCbParams &io) #endif } - - static void clientLifetimeTimeout(int fd, void *data) { @@ -2936,22 +2935,6 @@ clientLifetimeTimeout(int fd, void *data) comm_close(fd); } -static bool -okToAccept() -{ - static time_t last_warn = 0; - - if (fdNFree() >= RESERVED_FD) - return true; - - if (last_warn + 15 < squid_curtime) { - debugs(33, 0, HERE << "WARNING! Your cache is running out of filedescriptors"); - last_warn = squid_curtime; - } - - return false; -} - ConnStateData * connStateCreate(const IpAddress &peer, const IpAddress &me, int fd, http_port_list *port) { @@ -3006,16 +2989,6 @@ httpAccept(int sock, int newfd, ConnectionDetail *details, http_port_list *s = (http_port_list *)data; ConnStateData *connState = NULL; - if (flag == COMM_ERR_CLOSING) { - return; - } - - if (!okToAccept()) - AcceptLimiter::Instance().defer (sock, httpAccept, data); - else - /* kick off another one for later */ - comm_accept(sock, httpAccept, data); - if (flag != COMM_OK) { debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); return; @@ -3212,16 +3185,6 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details, https_port_list *s = (https_port_list *)data; SSL_CTX *sslContext = s->sslContext; - if (flag == COMM_ERR_CLOSING) { - return; - } - - if (!okToAccept()) - AcceptLimiter::Instance().defer (sock, httpsAccept, data); - else - /* kick off another one for later */ - comm_accept(sock, httpsAccept, data); - if (flag != COMM_OK) { errno = xerrno; debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); @@ -3257,7 +3220,6 @@ httpsAccept(int sock, int newfd, ConnectionDetail *details, if (identChecklist.fastCheck()) Ident::Start(details->me, details->peer, clientIdentDone, connState); } - #endif if (s->http.tcp_keepalive.enabled) { @@ -3333,6 +3295,8 @@ clientHttpConnectionsOpen(void) ++bumpCount; #endif + /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */ + enter_suid(); if (s->spoof_client_ip) { @@ -3346,9 +3310,10 @@ clientHttpConnectionsOpen(void) if (fd < 0) continue; - comm_listen(fd); + AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)", + CommAcceptCbPtrFun(httpAccept, s)); - comm_accept(fd, httpAccept, s); + s->listener = new Comm::ListenStateData(fd, call, true); debugs(1, 1, "Accepting " << (s->intercepted ? " intercepted" : "") << @@ -3399,9 +3364,10 @@ clientHttpsConnectionsOpen(void) if (fd < 0) continue; - comm_listen(fd); + AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)", + CommAcceptCbPtrFun(httpsAccept, s)); - comm_accept(fd, httpsAccept, s); + s->listener = new Comm::ListenStateData(fd, call, true); debugs(1, 1, "Accepting HTTPS connections at " << s->http.s << ", FD " << fd << "."); @@ -3416,7 +3382,6 @@ clientOpenListenSockets(void) { clientHttpConnectionsOpen(); #if USE_SSL - clientHttpsConnectionsOpen(); #endif @@ -3427,16 +3392,28 @@ clientOpenListenSockets(void) void clientHttpConnectionsClose(void) { - int i; + for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) { + if (s->listener) { + debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection"); + delete s->listener; + s->listener = NULL; + } + } - for (i = 0; i < NHttpSockets; i++) { - if (HttpSockets[i] >= 0) { - debugs(1, 1, "FD " << HttpSockets[i] << - " Closing HTTP connection"); - comm_close(HttpSockets[i]); - HttpSockets[i] = -1; +#if USE_SSL + for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) { + if (s->listener) { + debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection"); + delete s->listener; + s->listener = NULL; } } +#endif + + // TODO see if we can drop HttpSockets array entirely */ + for (int i = 0; i < NHttpSockets; i++) { + HttpSockets[i] = -1; + } NHttpSockets = 0; } diff --git a/src/comm.cc b/src/comm.cc index 1319eccdda..5a297df815 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -37,6 +37,9 @@ #include "comm.h" #include "event.h" #include "fde.h" +#include "comm/AcceptLimiter.h" +#include "comm/comm_internal.h" +#include "comm/ListenStateData.h" #include "CommIO.h" #include "CommRead.h" #include "ConnectionDetail.h" @@ -242,44 +245,15 @@ static PF commConnectFree; static PF commHandleWrite; static IPH commConnectDnsHandle; -static PF comm_accept_try; - -class AcceptFD -{ - -public: - AcceptFD(int aFd = -1): fd(aFd), theCallback(0), mayAcceptMore(false) {} - - void subscribe(AsyncCall::Pointer &call); - void acceptNext(); - void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &); - - int fd; - -private: - bool acceptOne(); - - AsyncCall::Pointer theCallback; - bool mayAcceptMore; -}; - typedef enum { COMM_CB_READ = 1, COMM_CB_DERIVED, } comm_callback_t; -struct _fd_debug_t { - char const *close_file; - int close_line; -}; - -typedef struct _fd_debug_t fd_debug_t; - static MemAllocator *conn_close_pool = NULL; -AcceptFD *fdc_table = NULL; // TODO: rename. And use Vector<>? fd_debug_t *fdd_table = NULL; -static bool +bool isOpen(const int fd) { return fd_table[fd].flags.open != 0; @@ -1372,69 +1346,6 @@ comm_connect_addr(int sock, const IpAddress &address) return status; } -/* Wait for an incoming connection on FD. FD should be a socket returned - * from comm_listen. */ -static int -comm_old_accept(int fd, ConnectionDetail &details) -{ - PROF_start(comm_accept); - statCounter.syscalls.sock.accepts++; - int sock; - struct addrinfo *gai = NULL; - details.me.InitAddrInfo(gai); - - if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) { - - details.me.FreeAddrInfo(gai); - - PROF_stop(comm_accept); - - if (ignoreErrno(errno)) { - debugs(50, 5, "comm_old_accept: FD " << fd << ": " << xstrerror()); - return COMM_NOMESSAGE; - } else if (ENFILE == errno || EMFILE == errno) { - debugs(50, 3, "comm_old_accept: FD " << fd << ": " << xstrerror()); - return COMM_ERROR; - } else { - debugs(50, 1, "comm_old_accept: FD " << fd << ": " << xstrerror()); - return COMM_ERROR; - } - } - - details.peer = *gai; - - details.me.InitAddrInfo(gai); - - details.me.SetEmpty(); - getsockname(sock, gai->ai_addr, &gai->ai_addrlen); - details.me = *gai; - - commSetCloseOnExec(sock); - - /* fdstat update */ - fd_open(sock, FD_SOCKET, "HTTP Request"); - fdd_table[sock].close_file = NULL; - fdd_table[sock].close_line = 0; - fde *F = &fd_table[sock]; - details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN); - F->remote_port = details.peer.GetPort(); - F->local_addr.SetPort(details.me.GetPort()); -#if USE_IPV6 - F->sock_family = AF_INET; -#else - F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6; -#endif - details.me.FreeAddrInfo(gai); - - commSetNonBlocking(sock); - - /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */ - F->flags.transparent = fd_table[fd].flags.transparent; - - PROF_stop(comm_accept); - return sock; -} - void commCallCloseHandlers(int fd) { @@ -1532,7 +1443,6 @@ comm_close_start(int fd, void *data) } - void comm_close_complete(int fd, void *data) { @@ -1549,15 +1459,10 @@ comm_close_complete(int fd, void *data) close(fd); - fdc_table[fd] = AcceptFD(fd); - statCounter.syscalls.sock.closes++; /* When an fd closes, give accept() a chance, if need be */ - - if (fdNFree() >= RESERVED_FD) - AcceptLimiter::Instance().kick(); - + Comm::AcceptLimiter::Instance().kick(); } /* @@ -1617,8 +1522,8 @@ _comm_close(int fd, char const *file, int line) commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno); } - // notify accept handlers - fdc_table[fd].notify(-1, COMM_ERR_CLOSING, 0, ConnectionDetail()); + // Listening FD need to be closed by deleting their ListenStateData object. + assert(Comm::CurrentListenerSockets[fd] == NULL); commCallCloseHandlers(fd); @@ -1932,10 +1837,11 @@ comm_init(void) fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde)); fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t)); - fdc_table = new AcceptFD[Squid_MaxFD]; - for (int pos = 0; pos < Squid_MaxFD; ++pos) { - fdc_table[pos] = AcceptFD(pos); - } + /* make sure we have an empty listening socket map */ + Comm::CurrentListenerSockets.clear(); + + /* make sure the accept() socket FIFO delay queue exists */ + Comm::AcceptLimiter::Instance(); commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t)); for (int pos = 0; pos < Squid_MaxFD; pos++) { @@ -1965,10 +1871,7 @@ comm_exit(void) safe_free(fd_table); safe_free(fdd_table); - if (fdc_table) { - delete[] fdc_table; - fdc_table = NULL; - } + Comm::CurrentListenerSockets.clear(); safe_free(commfd_table); } @@ -2189,169 +2092,6 @@ checkTimeouts(void) } } -/* - * New-style listen and accept routines - * - * Listen simply registers our interest in an FD for listening, - * and accept takes a callback to call when an FD has been - * accept()ed. - */ -int -comm_listen(int sock) -{ - int x; - - if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) { - debugs(50, 0, "comm_listen: listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror()); - return x; - } - - if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) { -#ifdef SO_ACCEPTFILTER - struct accept_filter_arg afa; - bzero(&afa, sizeof(afa)); - debugs(5, DBG_CRITICAL, "Installing accept filter '" << Config.accept_filter << "' on FD " << sock); - xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); - x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)); - if (x < 0) - debugs(5, 0, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); -#elif defined(TCP_DEFER_ACCEPT) - int seconds = 30; - if (strncmp(Config.accept_filter, "data=", 5) == 0) - seconds = atoi(Config.accept_filter + 5); - x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)); - if (x < 0) - debugs(5, 0, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); -#else - debugs(5, 0, "accept_filter not supported on your OS"); -#endif - } - - return sock; -} - -void -comm_accept(int fd, IOACB *handler, void *handler_data) -{ - debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler); - assert(isOpen(fd)); - - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler", - CommAcceptCbPtrFun(handler, handler_data)); - fdc_table[fd].subscribe(call); -} - -void -comm_accept(int fd, AsyncCall::Pointer &call) -{ - debugs(5, 5, "comm_accept: FD " << fd << " AsyncCall: " << call); - assert(isOpen(fd)); - - fdc_table[fd].subscribe(call); -} - -// Called when somebody wants to be notified when our socket accepts new -// connection. We do not probe the FD until there is such interest. -void -AcceptFD::subscribe(AsyncCall::Pointer &call) -{ - /* make sure we're not pending! */ - assert(!theCallback); - theCallback = call; - -#if OPTIMISTIC_IO - mayAcceptMore = true; // even if we failed to accept last time -#endif - - if (mayAcceptMore) - acceptNext(); - else - commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); -} - -bool -AcceptFD::acceptOne() -{ - // If there is no callback and we accept, we will leak the accepted FD. - // When we are running out of FDs, there is often no callback. - if (!theCallback) { - debugs(5, 5, "AcceptFD::acceptOne orphaned: FD " << fd); - // XXX: can we remove this and similar "just in case" calls and - // either listen always or listen only when there is a callback? - if (!AcceptLimiter::Instance().deferring()) - commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); - return false; - } - - /* - * We don't worry about running low on FDs here. Instead, - * httpAccept() will use AcceptLimiter if we reach the limit - * there. - */ - - /* Accept a new connection */ - ConnectionDetail connDetails; - int newfd = comm_old_accept(fd, connDetails); - - /* Check for errors */ - - if (newfd < 0) { - assert(theCallback != NULL); - - if (newfd == COMM_NOMESSAGE) { - /* register interest again */ - debugs(5, 5, HERE << "try later: FD " << fd << - " handler: " << *theCallback); - commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); - return false; - } - - // A non-recoverable error; notify the caller */ - notify(-1, COMM_ERROR, errno, connDetails); - return false; - } - - assert(theCallback != NULL); - debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd << - " newfd: " << newfd << " from: " << connDetails.peer << - " handler: " << *theCallback); - notify(newfd, COMM_OK, 0, connDetails); - return true; -} - -void -AcceptFD::acceptNext() -{ - mayAcceptMore = acceptOne(); -} - -void -AcceptFD::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails) -{ - if (theCallback != NULL) { - typedef CommAcceptCbParams Params; - Params ¶ms = GetCommParams(theCallback); - params.fd = fd; - params.nfd = newfd; - params.details = connDetails; - params.flag = errcode; - params.xerrno = xerrno; - ScheduleCallHere(theCallback); - theCallback = NULL; - } -} - -/* - * This callback is called whenever a filedescriptor is ready - * to dupe itself and fob off an accept()ed connection - */ -static void -comm_accept_try(int fd, void *) -{ - assert(isOpen(fd)); - fdc_table[fd].acceptNext(); -} - void CommIO::Initialise() { /* Initialize done pipe signal */ @@ -2406,44 +2146,6 @@ CommIO::ResetNotifications() } } -AcceptLimiter AcceptLimiter::Instance_; - -AcceptLimiter &AcceptLimiter::Instance() -{ - return Instance_; -} - -bool -AcceptLimiter::deferring() const -{ - return deferred.size() > 0; -} - -void -AcceptLimiter::defer (int fd, Acceptor::AcceptorFunction *aFunc, void *data) -{ - debugs(5, 5, "AcceptLimiter::defer: FD " << fd << " handler: " << (void*)aFunc); - Acceptor temp; - temp.theFunction = aFunc; - temp.acceptFD = fd; - temp.theData = data; - deferred.push_back(temp); -} - -void -AcceptLimiter::kick() -{ - if (!deferring()) - return; - - /* Yes, this means the first on is the last off.... - * If the list container was a little more friendly, we could sensibly us it. - */ - Acceptor temp = deferred.pop_back(); - - comm_accept (temp.acceptFD, temp.theFunction, temp.theData); -} - /// Start waiting for a possibly half-closed connection to close // by scheduling a read callback to a monitoring handler that // will close the connection on read errors. diff --git a/src/comm.h b/src/comm.h index 1d1786b33c..e9c13cc9a5 100644 --- a/src/comm.h +++ b/src/comm.h @@ -37,7 +37,6 @@ typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void /* comm.c */ extern bool comm_iocallbackpending(void); /* inline candidate */ -extern int comm_listen(int fd); SQUIDCEXTERN int commSetNonBlocking(int fd); SQUIDCEXTERN int commUnsetNonBlocking(int fd); SQUIDCEXTERN void commSetCloseOnExec(int fd); @@ -103,8 +102,6 @@ SQUIDCEXTERN void comm_quick_poll_required(void); class ConnectionDetail; typedef void IOACB(int fd, int nfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data); -extern void comm_accept(int fd, IOACB *handler, void *handler_data); -extern void comm_accept(int fd, AsyncCall::Pointer &call); extern void comm_add_close_handler(int fd, PF *, void *); extern void comm_add_close_handler(int fd, AsyncCall::Pointer &); extern void comm_remove_close_handler(int fd, PF *, void *); @@ -133,33 +130,6 @@ extern bool commHasHalfClosedMonitor(int fd); inline void commMarkHalfClosed(int fd) { commStartHalfClosedMonitor(fd); } inline bool commIsHalfClosed(int fd) { return commHasHalfClosedMonitor(fd); } -/* Not sure where these should live yet */ - -class Acceptor -{ - -public: - typedef void AcceptorFunction (int, int, ConnectionDetail *, comm_err_t, int, void *); - AcceptorFunction *theFunction; - int acceptFD; - void *theData; -}; - -class AcceptLimiter -{ - -public: - static AcceptLimiter &Instance(); - void defer (int, Acceptor::AcceptorFunction *, void *); - void kick(); - - bool deferring() const; - -private: - static AcceptLimiter Instance_; - Vector deferred; -}; - /* A comm engine that calls comm_select */ class CommSelectEngine : public AsyncEngine diff --git a/src/comm/AcceptLimiter.cc b/src/comm/AcceptLimiter.cc new file mode 100644 index 0000000000..7881db0348 --- /dev/null +++ b/src/comm/AcceptLimiter.cc @@ -0,0 +1,32 @@ +#include "config.h" +#include "comm/AcceptLimiter.h" +#include "comm/ListenStateData.h" +#include "fde.h" + +Comm::AcceptLimiter Comm::AcceptLimiter::Instance_; + +Comm::AcceptLimiter &Comm::AcceptLimiter::Instance() +{ + return Instance_; +} + +void +Comm::AcceptLimiter::defer(Comm::ListenStateData *afd) +{ + afd->isLimited++; + debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); + deferred.push_back(afd); +} + +void +Comm::AcceptLimiter::kick() +{ + debugs(5, 5, HERE << " size=" << deferred.size()); + if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) { + debugs(5, 5, HERE << " doing one."); + /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */ + ListenStateData *temp = deferred.shift(); + temp->isLimited--; + temp->acceptNext(); + } +} diff --git a/src/comm/AcceptLimiter.h b/src/comm/AcceptLimiter.h new file mode 100644 index 0000000000..4af96bfd25 --- /dev/null +++ b/src/comm/AcceptLimiter.h @@ -0,0 +1,41 @@ +#ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H +#define _SQUID_SRC_COMM_ACCEPT_LIMITER_H + +#include "Array.h" + +namespace Comm { + +class ListenStateData; + +/** + * FIFO Queue holding listener socket handlers which have been activated + * ready to dupe their FD and accept() a new client connection. + * But when doing so there were not enough FD available to handle the + * new connection. These handlers are awaiting some FD to become free. + * + * defer - used only by Comm layer ListenStateData adding themselves when FD are limited. + * kick - used by Comm layer when FD are closed. + */ +class AcceptLimiter +{ + +public: + /** retrieve the global instance of the queue. */ + static AcceptLimiter &Instance(); + + /** delay accepting a new client connection. */ + void defer(Comm::ListenStateData *afd); + + /** try to accept and begin processing any delayed client connections. */ + void kick(); + +private: + static AcceptLimiter Instance_; + + /** FIFO queue */ + Vector deferred; +}; + +}; // namepace Comm + +#endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */ diff --git a/src/comm/ListenStateData.cc b/src/comm/ListenStateData.cc new file mode 100644 index 0000000000..9d8dd0693f --- /dev/null +++ b/src/comm/ListenStateData.cc @@ -0,0 +1,317 @@ +/* + * DEBUG: section 5 Listener Socket Handler + * AUTHOR: Harvest Derived + * + * SQUID Web Proxy Cache http://www.squid-cache.org/ + * ---------------------------------------------------------- + * + * Squid is the result of efforts by numerous individuals from + * the Internet community; see the CONTRIBUTORS file for full + * details. Many organizations have provided support for Squid's + * development; see the SPONSORS file for full details. Squid is + * Copyrighted (C) 2001 by the Regents of the University of + * California; see the COPYRIGHT file for full details. Squid + * incorporates software developed and/or copyrighted by other + * sources; see the CREDITS file for full details. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. + * + * + * Copyright (c) 2003, Robert Collins + */ + +#include "squid.h" +#include "CommCalls.h" +#include "comm/AcceptLimiter.h" +#include "comm/comm_internal.h" +#include "comm/ListenStateData.h" +#include "ConnectionDetail.h" +#include "fde.h" +#include "SquidTime.h" + +/* + * This is not strictly needed at all. + * It's only needed by the cachemgr interface to list the currently active sockets. + * which could be done for HTTP/HTTPS by listing the http_port_list->listener->fd + * BUT, FTP data connections is a bit of a problem. + * + * AYJ: for now the old way of doing Comm:: actions sequentially in some caller + * requires this to anchor each of those Comm:: functions together. + */ +std::map Comm::CurrentListenerSockets; + +/** + * Set of listener sockets which are known to have events pending but we + * do not have enough sockets available to do the accept just yet. + */ +//std::list Comm::PendingAccepts; + + +/** + * New-style listen and accept routines + * + * Listen simply registers our interest in an FD for listening, + * and accept takes a callback to call when an FD has been + * accept()ed. + */ +int +Comm::comm_listen(int sock) +{ + int x; + + if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) { + debugs(50, 0, HERE << "listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror()); + return x; + } + + if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) { +#ifdef SO_ACCEPTFILTER + struct accept_filter_arg afa; + bzero(&afa, sizeof(afa)); + debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << sock); + xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); + x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)); + if (x < 0) + debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); +#elif defined(TCP_DEFER_ACCEPT) + int seconds = 30; + if (strncmp(Config.accept_filter, "data=", 5) == 0) + seconds = atoi(Config.accept_filter + 5); + x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)); + if (x < 0) + debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); +#else + debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS"); +#endif + } + + return sock; +} + +// TODO make this a constructor of ListenStateData... +// better yet convert the places its used to setup AsyncCalls instead... +Comm::ListenStateData * +Comm::comm_accept(int fd, IOACB *handler, void *handler_data) +{ + debugs(5, 5, HERE << "FD " << fd << " handler: " << (void*)handler); + assert(isOpen(fd)); + + AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler", + CommAcceptCbPtrFun(handler, handler_data)); + + return new Comm::ListenStateData(fd, call, false); +} + +Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) : + fd(aFd), + theCallback(call), + mayAcceptMore(accept_many) +{ + assert(aFd >= 0); + debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call); + assert(isOpen(aFd)); + + CurrentListenerSockets[fd] = this; + + errcode = comm_listen(fd); + commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); +} + +Comm::ListenStateData::~ListenStateData() +{ + // un-register listener before closing the FD. + // TODO: is this the right way to remove from a std::map<> ? + if (CurrentListenerSockets[fd]) + CurrentListenerSockets[fd] = NULL; + + comm_close(fd); + fd = -1; +} + +/** + * This private callback is called whenever a filedescriptor is ready + * to dupe itself and fob off an accept()ed connection + * + * It will either do that accept operation. Or if there are not enough FD + * available to do the clone safely will push the listening FD into a list + * of deferred operations. The list gets kicked and the dupe/accept() actually + * done later when enough sockets become available. + */ +void +Comm::ListenStateData::doAccept(int fd, void *data) +{ + debugs(5, 2, HERE << "New connection on FD " << fd); + + assert(isOpen(fd)); + ListenStateData *afd = static_cast(data); + + if (!okToAccept()) { + AcceptLimiter::Instance().defer(afd); + } + else { + afd->acceptNext(); + } + commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0); +} + +bool +Comm::ListenStateData::okToAccept() +{ + static time_t last_warn = 0; + + if (fdNFree() >= RESERVED_FD) + return true; + + if (last_warn + 15 < squid_curtime) { + debugs(5, DBG_CRITICAL, "WARNING! Your cache is running out of filedescriptors"); + last_warn = squid_curtime; + } + + return false; +} + +bool +Comm::ListenStateData::acceptOne() +{ + /* + * We don't worry about running low on FDs here. Instead, + * doAccept() will use AcceptLimiter if we reach the limit + * there. + */ + + /* Accept a new connection */ + ConnectionDetail connDetails; + int newfd = oldAccept(connDetails); + + /* Check for errors */ + if (newfd < 0) { + + if (newfd == COMM_NOMESSAGE) { + /* register interest again */ + debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << *theCallback); + commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); + return false; + } + + // A non-recoverable error; notify the caller */ + debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << *theCallback); + notify(-1, COMM_ERROR, errno, connDetails); + return false; + } + + debugs(5, 5, HERE << "accepted: FD " << fd << + " newfd: " << newfd << " from: " << connDetails.peer << + " handler: " << *theCallback); + notify(newfd, COMM_OK, 0, connDetails); + return true; +} + +void +Comm::ListenStateData::acceptNext() +{ + assert(isOpen(fd)); + debugs(5, 2, HERE << "connection on FD " << fd); + mayAcceptMore = acceptOne(); +} + +void +Comm::ListenStateData::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails) +{ + // listener socket handlers just abandon the port with COMM_ERR_CLOSING + // it should only happen when this object is deleted... + if (errcode == COMM_ERR_CLOSING) { + return; + } + + if (theCallback != NULL) { + typedef CommAcceptCbParams Params; + Params ¶ms = GetCommParams(theCallback); + params.fd = fd; + params.nfd = newfd; + params.details = connDetails; + params.flag = errcode; + params.xerrno = xerrno; + ScheduleCallHere(theCallback); + if (!mayAcceptMore) + theCallback = NULL; + } +} + +/** + * accept() and process + * Wait for an incoming connection on FD. FD should be a socket returned + * from comm_listen. */ +int +Comm::ListenStateData::oldAccept(ConnectionDetail &details) +{ + PROF_start(comm_accept); + statCounter.syscalls.sock.accepts++; + int sock; + struct addrinfo *gai = NULL; + details.me.InitAddrInfo(gai); + + if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) { + + details.me.FreeAddrInfo(gai); + + PROF_stop(comm_accept); + + if (ignoreErrno(errno)) { + debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror()); + return COMM_NOMESSAGE; + } else if (ENFILE == errno || EMFILE == errno) { + debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror()); + return COMM_ERROR; + } else { + debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror()); + return COMM_ERROR; + } + } + + details.peer = *gai; + + details.me.InitAddrInfo(gai); + + details.me.SetEmpty(); + getsockname(sock, gai->ai_addr, &gai->ai_addrlen); + details.me = *gai; + + commSetCloseOnExec(sock); + + /* fdstat update */ + fd_open(sock, FD_SOCKET, "HTTP Request"); + + fdd_table[sock].close_file = NULL; + fdd_table[sock].close_line = 0; + + fde *F = &fd_table[sock]; + details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN); + F->remote_port = details.peer.GetPort(); + F->local_addr.SetPort(details.me.GetPort()); +#if USE_IPV6 + F->sock_family = AF_INET; +#else + F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6; +#endif + details.me.FreeAddrInfo(gai); + + commSetNonBlocking(sock); + + /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */ + F->flags.transparent = fd_table[fd].flags.transparent; + + PROF_stop(comm_accept); + return sock; +} diff --git a/src/comm/ListenStateData.h b/src/comm/ListenStateData.h new file mode 100644 index 0000000000..f43997eeea --- /dev/null +++ b/src/comm/ListenStateData.h @@ -0,0 +1,58 @@ +#ifndef SQUID_LISTENERSTATEDATA_H +#define SQUID_LISTENERSTATEDATA_H + +#include "config.h" +#include "base/AsyncCall.h" +#include "comm.h" +#if HAVE_MAP +#include +#endif + +class ConnectionDetail; + +namespace Comm { + +class ListenStateData +{ + +public: + ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); + ListenStateData(const ListenStateData &r); // not implemented. + ~ListenStateData(); + + void subscribe(AsyncCall::Pointer &call); + void acceptNext(); + void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &); + + int fd; + + /// errno code if any happened so far. + int errcode; + + /// whether this socket is delayed and on the AcceptLimiter queue. + int32_t isLimited; + +private: + /// Method to test if there are enough file escriptors to open a new client connection + /// if not the accept() will be postponed + static bool okToAccept(); + + /// Method callback for whenever an FD is ready to accept a client connection. + static void doAccept(int fd, void *data); + + bool acceptOne(); + int oldAccept(ConnectionDetail &details); + + AsyncCall::Pointer theCallback; + bool mayAcceptMore; +}; + +extern std::map CurrentListenerSockets; + +// remaining legacy functions. TODO replace all uses with the ListenData constructor +extern int comm_listen(int fd); +extern ListenStateData *comm_accept(int fd, IOACB *handler, void *handler_data); + +}; // namespace Comm + +#endif /* SQUID_LISTENERSTATEDATA_H */ diff --git a/src/comm/Makefile.am b/src/comm/Makefile.am new file mode 100644 index 0000000000..09cb1c1076 --- /dev/null +++ b/src/comm/Makefile.am @@ -0,0 +1,13 @@ +include $(top_srcdir)/src/Common.am +include $(top_srcdir)/src/TestHeaders.am + +noinst_LTLIBRARIES = libcomm-listener.la + +## Library holding listener comm socket handlers +libcomm_listener_la_SOURCES= \ + AcceptLimiter.cc \ + AcceptLimiter.h \ + ListenStateData.cc \ + ListenStateData.h \ + \ + comm_internal.h diff --git a/src/comm/comm_internal.h b/src/comm/comm_internal.h new file mode 100644 index 0000000000..a328545060 --- /dev/null +++ b/src/comm/comm_internal.h @@ -0,0 +1,16 @@ +#ifndef SQUID_COMM_COMM_INTERNAL_H +#define SQUID_COMM_COMM_INTERNAL_H + +/* misc collection of bits shared by Comm code, but not needed by the rest of Squid. */ + +struct _fd_debug_t { + char const *close_file; + int close_line; +}; + +typedef struct _fd_debug_t fd_debug_t; +extern fd_debug_t *fdd_table; + +extern bool isOpen(const int fd); + +#endif diff --git a/src/fde.h b/src/fde.h index 3b503d2b51..7d8f9a7400 100644 --- a/src/fde.h +++ b/src/fde.h @@ -125,4 +125,6 @@ private: }; +SQUIDCEXTERN int fdNFree(void); + #endif /* SQUID_FDE_H */ diff --git a/src/ftp.cc b/src/ftp.cc index 8541d7ab4c..54a8520bae 100644 --- a/src/ftp.cc +++ b/src/ftp.cc @@ -33,26 +33,28 @@ */ #include "squid.h" -#include "Store.h" -#include "HttpRequest.h" -#include "HttpReply.h" +#include "comm.h" +#include "comm/ListenStateData.h" +#include "ConnectionDetail.h" #include "errorpage.h" #include "fde.h" -#include "comm.h" -#include "HttpHeaderRange.h" +#include "forward.h" #include "HttpHdrContRange.h" +#include "HttpHeaderRange.h" #include "HttpHeader.h" +#include "HttpRequest.h" +#include "HttpReply.h" +#include "MemBuf.h" +#include "Server.h" +#include "SquidTime.h" +#include "Store.h" +#include "URLScheme.h" +#include "wordlist.h" + #if DELAY_POOLS #include "DelayPools.h" #include "MemObject.h" #endif -#include "ConnectionDetail.h" -#include "forward.h" -#include "Server.h" -#include "MemBuf.h" -#include "wordlist.h" -#include "SquidTime.h" -#include "URLScheme.h" /** \defgroup ServerProtocolFTPInternal Server-Side FTP Internals @@ -139,11 +141,21 @@ public: /// called after the socket is opened, sets up close handler void opened(int aFd, const AsyncCall::Pointer &aCloser); - void close(); /// clears the close handler and calls comm_close - void clear(); /// just resets fd and close handler + /** Handles all operations needed to properly close the active channel FD. + * clearing the close handler, clearing the listen socket properly, and calling comm_close + */ + void close(); + + void clear(); /// just resets fd and close handler. does not close active connections. int fd; /// channel descriptor; \todo: remove because the closer has it + /** Current listening socket handler. delete on shutdown or abort. + * FTP stores a copy of the FD in the field fd above. + * Use close() to properly close the channel. + */ + Comm::ListenStateData *listener; + private: AsyncCall::Pointer closer; /// Comm close handler callback }; @@ -434,6 +446,11 @@ FtpStateData::ctrlClosed(const CommCloseCbParams &io) void FtpStateData::dataClosed(const CommCloseCbParams &io) { + if (data.listener) { + delete data.listener; + data.listener = NULL; + data.fd = -1; + } data.clear(); failed(ERR_FTP_FAILURE, 0); /* failed closes ctrl.fd and frees ftpState */ @@ -2858,7 +2875,7 @@ ftpOpenListenSocket(FtpStateData * ftpState, int fallback) int on = 1; int x = 0; - /// Close old data channel, if any. We may open a new one below. + /// Close old data channels, if any. We may open a new one below. ftpState->data.close(); /* @@ -2902,7 +2919,12 @@ ftpOpenListenSocket(FtpStateData * ftpState, int fallback) return -1; } - if (comm_listen(fd) < 0) { + typedef CommCbMemFunT acceptDialer; + AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", + acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection)); + ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false); + + if (!ftpState->data.listener || ftpState->data.listener->errcode < 0) { comm_close(fd); return -1; } @@ -3057,8 +3079,8 @@ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) char ntoapeer[MAX_IPSTRLEN]; debugs(9, 3, "ftpAcceptDataConnection"); - if (io.flag == COMM_ERR_CLOSING) - return; + // one connection accepted. the handler has stopped listening. drop our local pointer to it. + data.listener = NULL; if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { abortTransaction("entry aborted when accepting data conn"); @@ -3079,17 +3101,20 @@ void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) io.details.peer << "), expecting " << fd_table[ctrl.fd].ipaddr); + /* close the bad soures connection down ASAP. */ comm_close(io.nfd); + + /* we are ony accepting once, so need to re-open the listener socket. */ typedef CommCbMemFunT acceptDialer; AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", acceptDialer(this, &FtpStateData::ftpAcceptDataConnection)); - comm_accept(data.fd, acceptCall); + data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); return; } } if (io.flag != COMM_OK) { - debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: comm_accept(" << io.nfd << "): " << xstrerr(io.xerrno)); + debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno)); /** \todo XXX Need to set error message */ ftpFail(this); return; @@ -3220,7 +3245,7 @@ void FtpStateData::readStor() AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", acceptDialer(this, &FtpStateData::ftpAcceptDataConnection)); - comm_accept(data.fd, acceptCall); + data.listener = new Comm::ListenStateData(data.fd, acceptCall, false); } else { debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code); ftpFail(this); @@ -3356,7 +3381,7 @@ ftpReadList(FtpStateData * ftpState) AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection)); - comm_accept(ftpState->data.fd, acceptCall); + ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); /* * Cancel the timeout on the Control socket and establish one * on the data socket @@ -3417,7 +3442,7 @@ ftpReadRetr(FtpStateData * ftpState) typedef CommCbMemFunT acceptDialer; AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection", acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection)); - comm_accept(ftpState->data.fd, acceptCall); + ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false); /* * Cancel the timeout on the Control socket and establish one * on the data socket @@ -3982,7 +4007,7 @@ FtpStateData::closeServer() /** * Did we close all FTP server connection(s)? * - \retval true Both server control and data channels are closed. + \retval true Both server control and data channels are closed. And not waitigng for a new data connection to open. \retval false Either control channel or data is still active. */ bool @@ -4061,7 +4086,15 @@ FtpChannel::opened(int aFd, const AsyncCall::Pointer &aCloser) void FtpChannel::close() { - if (fd >= 0) { + // channels with active listeners will be closed when the listener handler dies. + if (listener) { + delete listener; + listener = NULL; + comm_remove_close_handler(fd, closer); + closer = NULL; + fd = -1; + } + else if (fd >= 0) { comm_remove_close_handler(fd, closer); closer = NULL; comm_close(fd); // we do not expect to be called back diff --git a/src/protos.h b/src/protos.h index 46f5ba3739..f09e710231 100644 --- a/src/protos.h +++ b/src/protos.h @@ -143,7 +143,6 @@ SQUIDCEXTERN void fd_open(int fd, unsigned int type, const char *); SQUIDCEXTERN void fd_note(int fd, const char *); SQUIDCEXTERN void fd_bytes(int fd, int len, unsigned int type); SQUIDCEXTERN void fdDumpOpen(void); -SQUIDCEXTERN int fdNFree(void); SQUIDCEXTERN int fdUsageHigh(void); SQUIDCEXTERN void fdAdjustReserved(void); diff --git a/src/store.cc b/src/store.cc index e16f869e78..71c3954181 100644 --- a/src/store.cc +++ b/src/store.cc @@ -35,6 +35,7 @@ #include "squid.h" #include "event.h" +#include "fde.h" #include "Store.h" #include "CacheManager.h" #include "StoreClient.h" diff --git a/src/tests/stub_fd.cc b/src/tests/stub_fd.cc index 50522e538d..bace6c07bf 100644 --- a/src/tests/stub_fd.cc +++ b/src/tests/stub_fd.cc @@ -32,7 +32,8 @@ * */ -#include "squid.h" +#include "config.h" +#include "fde.h" int fdNFree(void) -- 2.47.2