linux/types.h \
machine/byte_swap.h \
malloc.h \
+ map \
math.h \
memory.h \
mount.h \
src/adaptation/Makefile \
src/adaptation/icap/Makefile \
src/adaptation/ecap/Makefile \
+ src/comm/Makefile \
src/esi/Makefile \
src/eui/Makefile \
src/icmp/Makefile \
LoadableModules.h \
LoadableModules.cc
-SUBDIRS = base eui acl fs repl auth ip icmp ident
+SUBDIRS = base comm eui acl fs repl auth ip icmp ident
if USE_ADAPTATION
SUBDIRS += adaptation
squid_LDADD = \
$(COMMON_LIBS) \
+ comm/libcomm-listener.la \
icmp/libicmp.la icmp/libicmp-core.la \
@XTRA_OBJS@ \
@DISK_LINKOBJS@ \
wordlist.cc
nodist_tests_testCacheManager_SOURCES = \
$(BUILT_SOURCES)
+# comm.cc only requires comm/libcomm-listener.la until fdc_table is dead.
tests_testCacheManager_LDADD = \
$(COMMON_LIBS) \
+ comm/libcomm-listener.la \
icmp/libicmp.la icmp/libicmp-core.la \
@REPL_OBJS@ \
${ADAPTATION_LIBS} \
tests_testEvent_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
+ comm/libcomm-listener.la \
@REPL_OBJS@ \
${ADAPTATION_LIBS} \
$(ESI_LIBS) \
tests_testEventLoop_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
+ comm/libcomm-listener.la \
@REPL_OBJS@ \
${ADAPTATION_LIBS} \
$(ESI_LIBS) \
tests_test_http_range_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
+ comm/libcomm-listener.la \
@REPL_OBJS@ \
${ADAPTATION_LIBS} \
$(ESI_LIBS) \
tests_testHttpRequest_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
+ comm/libcomm-listener.la \
@REPL_OBJS@ \
${ADAPTATION_LIBS} \
$(ESI_LIBS) \
tests/testStoreHashIndex.h \
tests/TestSwapDir.cc \
tests/TestSwapDir.h \
- tests/stub_fd.cc \
tests/stub_HttpReply.cc \
tests/stub_cache_manager.cc \
- $(STORE_TEST_SOURCES)
+ $(STORE_TEST_SOURCES) \
+ tests/stub_fd.cc
nodist_tests_testStore_SOURCES= \
$(TESTSOURCES) \
tests_testURL_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
+ comm/libcomm-listener.la \
@REGEXLIB@ \
@REPL_OBJS@ \
${ADAPTATION_LIBS} \
-
/*
* $Id$
- *
*/
#include "squid.h"
http_port_list::~http_port_list()
{
+ delete listener;
+
safe_free(name);
safe_free(defaultsite);
safe_free(protocol);
//#include "typedefs.h"
#include "cbdata.h"
+#include "comm/ListenStateData.h"
struct http_port_list {
http_port_list(const char *aProtocol);
unsigned int timeout;
} tcp_keepalive;
+ /**
+ * The FD listening socket handler.
+ * If not NULL we are actively listening for client requests.
+ * delete to close the socket.
+ */
+ Comm::ListenStateData *listener;
+
#if USE_SSL
// XXX: temporary hack to ease move of SSL options to http_port
http_port_list &http;
*/
#include "squid.h"
+
+#include "acl/FilledChecklist.h"
+#include "auth/UserRequest.h"
+#include "ChunkedCodingParser.h"
#include "client_side.h"
+#include "client_side_reply.h"
+#include "client_side_request.h"
+#include "ClientRequestContext.h"
#include "clientStream.h"
-#include "ProtoPort.h"
-#include "auth/UserRequest.h"
-#include "Store.h"
#include "comm.h"
+#include "comm/ListenStateData.h"
+#include "ConnectionDetail.h"
+#include "fde.h"
#include "HttpHdrContRange.h"
#include "HttpReply.h"
#include "HttpRequest.h"
#include "ident/Config.h"
#include "ident/Ident.h"
#include "ip/IpIntercept.h"
-#include "MemObject.h"
-#include "fde.h"
-#include "client_side_request.h"
-#include "acl/FilledChecklist.h"
-#include "ConnectionDetail.h"
-#include "client_side_reply.h"
-#include "ClientRequestContext.h"
#include "MemBuf.h"
+#include "MemObject.h"
+#include "ProtoPort.h"
#include "SquidTime.h"
-#include "ChunkedCodingParser.h"
+#include "Store.h"
#if LINGERING_CLOSE
#define comm_close comm_lingering_close
static CSD clientSocketDetach;
static void clientSetKeepaliveFlag(ClientHttpRequest *);
static int clientIsContentLengthValid(HttpRequest * r);
-static bool okToAccept();
static int clientIsRequestBodyValid(int64_t bodyLength);
static int clientIsRequestBodyTooLargeForPolicy(int64_t bodyLength);
#endif
}
-
-
static void
clientLifetimeTimeout(int fd, void *data)
{
comm_close(fd);
}
-static bool
-okToAccept()
-{
- static time_t last_warn = 0;
-
- if (fdNFree() >= RESERVED_FD)
- return true;
-
- if (last_warn + 15 < squid_curtime) {
- debugs(33, 0, HERE << "WARNING! Your cache is running out of filedescriptors");
- last_warn = squid_curtime;
- }
-
- return false;
-}
-
ConnStateData *
connStateCreate(const IpAddress &peer, const IpAddress &me, int fd, http_port_list *port)
{
http_port_list *s = (http_port_list *)data;
ConnStateData *connState = NULL;
- if (flag == COMM_ERR_CLOSING) {
- return;
- }
-
- if (!okToAccept())
- AcceptLimiter::Instance().defer (sock, httpAccept, data);
- else
- /* kick off another one for later */
- comm_accept(sock, httpAccept, data);
-
if (flag != COMM_OK) {
debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
return;
https_port_list *s = (https_port_list *)data;
SSL_CTX *sslContext = s->sslContext;
- if (flag == COMM_ERR_CLOSING) {
- return;
- }
-
- if (!okToAccept())
- AcceptLimiter::Instance().defer (sock, httpsAccept, data);
- else
- /* kick off another one for later */
- comm_accept(sock, httpsAccept, data);
-
if (flag != COMM_OK) {
errno = xerrno;
debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
if (identChecklist.fastCheck())
Ident::Start(details->me, details->peer, clientIdentDone, connState);
}
-
#endif
if (s->http.tcp_keepalive.enabled) {
++bumpCount;
#endif
+ /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */
+
enter_suid();
if (s->spoof_client_ip) {
if (fd < 0)
continue;
- comm_listen(fd);
+ AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
+ CommAcceptCbPtrFun(httpAccept, s));
- comm_accept(fd, httpAccept, s);
+ s->listener = new Comm::ListenStateData(fd, call, true);
debugs(1, 1, "Accepting " <<
(s->intercepted ? " intercepted" : "") <<
if (fd < 0)
continue;
- comm_listen(fd);
+ AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
+ CommAcceptCbPtrFun(httpsAccept, s));
- comm_accept(fd, httpsAccept, s);
+ s->listener = new Comm::ListenStateData(fd, call, true);
debugs(1, 1, "Accepting HTTPS connections at " << s->http.s << ", FD " << fd << ".");
{
clientHttpConnectionsOpen();
#if USE_SSL
-
clientHttpsConnectionsOpen();
#endif
void
clientHttpConnectionsClose(void)
{
- int i;
+ for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
+ if (s->listener) {
+ debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
+ delete s->listener;
+ s->listener = NULL;
+ }
+ }
- for (i = 0; i < NHttpSockets; i++) {
- if (HttpSockets[i] >= 0) {
- debugs(1, 1, "FD " << HttpSockets[i] <<
- " Closing HTTP connection");
- comm_close(HttpSockets[i]);
- HttpSockets[i] = -1;
+#if USE_SSL
+ for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
+ if (s->listener) {
+ debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
+ delete s->listener;
+ s->listener = NULL;
}
}
+#endif
+
+ // TODO see if we can drop HttpSockets array entirely */
+ for (int i = 0; i < NHttpSockets; i++) {
+ HttpSockets[i] = -1;
+ }
NHttpSockets = 0;
}
#include "comm.h"
#include "event.h"
#include "fde.h"
+#include "comm/AcceptLimiter.h"
+#include "comm/comm_internal.h"
+#include "comm/ListenStateData.h"
#include "CommIO.h"
#include "CommRead.h"
#include "ConnectionDetail.h"
static PF commHandleWrite;
static IPH commConnectDnsHandle;
-static PF comm_accept_try;
-
-class AcceptFD
-{
-
-public:
- AcceptFD(int aFd = -1): fd(aFd), theCallback(0), mayAcceptMore(false) {}
-
- void subscribe(AsyncCall::Pointer &call);
- void acceptNext();
- void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &);
-
- int fd;
-
-private:
- bool acceptOne();
-
- AsyncCall::Pointer theCallback;
- bool mayAcceptMore;
-};
-
typedef enum {
COMM_CB_READ = 1,
COMM_CB_DERIVED,
} comm_callback_t;
-struct _fd_debug_t {
- char const *close_file;
- int close_line;
-};
-
-typedef struct _fd_debug_t fd_debug_t;
-
static MemAllocator *conn_close_pool = NULL;
-AcceptFD *fdc_table = NULL; // TODO: rename. And use Vector<>?
fd_debug_t *fdd_table = NULL;
-static bool
+bool
isOpen(const int fd)
{
return fd_table[fd].flags.open != 0;
return status;
}
-/* Wait for an incoming connection on FD. FD should be a socket returned
- * from comm_listen. */
-static int
-comm_old_accept(int fd, ConnectionDetail &details)
-{
- PROF_start(comm_accept);
- statCounter.syscalls.sock.accepts++;
- int sock;
- struct addrinfo *gai = NULL;
- details.me.InitAddrInfo(gai);
-
- if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
-
- details.me.FreeAddrInfo(gai);
-
- PROF_stop(comm_accept);
-
- if (ignoreErrno(errno)) {
- debugs(50, 5, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_NOMESSAGE;
- } else if (ENFILE == errno || EMFILE == errno) {
- debugs(50, 3, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_ERROR;
- } else {
- debugs(50, 1, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_ERROR;
- }
- }
-
- details.peer = *gai;
-
- details.me.InitAddrInfo(gai);
-
- details.me.SetEmpty();
- getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
- details.me = *gai;
-
- commSetCloseOnExec(sock);
-
- /* fdstat update */
- fd_open(sock, FD_SOCKET, "HTTP Request");
- fdd_table[sock].close_file = NULL;
- fdd_table[sock].close_line = 0;
- fde *F = &fd_table[sock];
- details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
- F->remote_port = details.peer.GetPort();
- F->local_addr.SetPort(details.me.GetPort());
-#if USE_IPV6
- F->sock_family = AF_INET;
-#else
- F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6;
-#endif
- details.me.FreeAddrInfo(gai);
-
- commSetNonBlocking(sock);
-
- /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
- F->flags.transparent = fd_table[fd].flags.transparent;
-
- PROF_stop(comm_accept);
- return sock;
-}
-
void
commCallCloseHandlers(int fd)
{
}
-
void
comm_close_complete(int fd, void *data)
{
close(fd);
- fdc_table[fd] = AcceptFD(fd);
-
statCounter.syscalls.sock.closes++;
/* When an fd closes, give accept() a chance, if need be */
-
- if (fdNFree() >= RESERVED_FD)
- AcceptLimiter::Instance().kick();
-
+ Comm::AcceptLimiter::Instance().kick();
}
/*
commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
}
- // notify accept handlers
- fdc_table[fd].notify(-1, COMM_ERR_CLOSING, 0, ConnectionDetail());
+ // Listening FD need to be closed by deleting their ListenStateData object.
+ assert(Comm::CurrentListenerSockets[fd] == NULL);
commCallCloseHandlers(fd);
fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
- fdc_table = new AcceptFD[Squid_MaxFD];
- for (int pos = 0; pos < Squid_MaxFD; ++pos) {
- fdc_table[pos] = AcceptFD(pos);
- }
+ /* make sure we have an empty listening socket map */
+ Comm::CurrentListenerSockets.clear();
+
+ /* make sure the accept() socket FIFO delay queue exists */
+ Comm::AcceptLimiter::Instance();
commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
for (int pos = 0; pos < Squid_MaxFD; pos++) {
safe_free(fd_table);
safe_free(fdd_table);
- if (fdc_table) {
- delete[] fdc_table;
- fdc_table = NULL;
- }
+ Comm::CurrentListenerSockets.clear();
safe_free(commfd_table);
}
}
}
-/*
- * New-style listen and accept routines
- *
- * Listen simply registers our interest in an FD for listening,
- * and accept takes a callback to call when an FD has been
- * accept()ed.
- */
-int
-comm_listen(int sock)
-{
- int x;
-
- if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
- debugs(50, 0, "comm_listen: listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror());
- return x;
- }
-
- if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
-#ifdef SO_ACCEPTFILTER
- struct accept_filter_arg afa;
- bzero(&afa, sizeof(afa));
- debugs(5, DBG_CRITICAL, "Installing accept filter '" << Config.accept_filter << "' on FD " << sock);
- xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
- x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa));
- if (x < 0)
- debugs(5, 0, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
-#elif defined(TCP_DEFER_ACCEPT)
- int seconds = 30;
- if (strncmp(Config.accept_filter, "data=", 5) == 0)
- seconds = atoi(Config.accept_filter + 5);
- x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds));
- if (x < 0)
- debugs(5, 0, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
-#else
- debugs(5, 0, "accept_filter not supported on your OS");
-#endif
- }
-
- return sock;
-}
-
-void
-comm_accept(int fd, IOACB *handler, void *handler_data)
-{
- debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
- assert(isOpen(fd));
-
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler",
- CommAcceptCbPtrFun(handler, handler_data));
- fdc_table[fd].subscribe(call);
-}
-
-void
-comm_accept(int fd, AsyncCall::Pointer &call)
-{
- debugs(5, 5, "comm_accept: FD " << fd << " AsyncCall: " << call);
- assert(isOpen(fd));
-
- fdc_table[fd].subscribe(call);
-}
-
-// Called when somebody wants to be notified when our socket accepts new
-// connection. We do not probe the FD until there is such interest.
-void
-AcceptFD::subscribe(AsyncCall::Pointer &call)
-{
- /* make sure we're not pending! */
- assert(!theCallback);
- theCallback = call;
-
-#if OPTIMISTIC_IO
- mayAcceptMore = true; // even if we failed to accept last time
-#endif
-
- if (mayAcceptMore)
- acceptNext();
- else
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-}
-
-bool
-AcceptFD::acceptOne()
-{
- // If there is no callback and we accept, we will leak the accepted FD.
- // When we are running out of FDs, there is often no callback.
- if (!theCallback) {
- debugs(5, 5, "AcceptFD::acceptOne orphaned: FD " << fd);
- // XXX: can we remove this and similar "just in case" calls and
- // either listen always or listen only when there is a callback?
- if (!AcceptLimiter::Instance().deferring())
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- return false;
- }
-
- /*
- * We don't worry about running low on FDs here. Instead,
- * httpAccept() will use AcceptLimiter if we reach the limit
- * there.
- */
-
- /* Accept a new connection */
- ConnectionDetail connDetails;
- int newfd = comm_old_accept(fd, connDetails);
-
- /* Check for errors */
-
- if (newfd < 0) {
- assert(theCallback != NULL);
-
- if (newfd == COMM_NOMESSAGE) {
- /* register interest again */
- debugs(5, 5, HERE << "try later: FD " << fd <<
- " handler: " << *theCallback);
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- return false;
- }
-
- // A non-recoverable error; notify the caller */
- notify(-1, COMM_ERROR, errno, connDetails);
- return false;
- }
-
- assert(theCallback != NULL);
- debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd <<
- " newfd: " << newfd << " from: " << connDetails.peer <<
- " handler: " << *theCallback);
- notify(newfd, COMM_OK, 0, connDetails);
- return true;
-}
-
-void
-AcceptFD::acceptNext()
-{
- mayAcceptMore = acceptOne();
-}
-
-void
-AcceptFD::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails)
-{
- if (theCallback != NULL) {
- typedef CommAcceptCbParams Params;
- Params ¶ms = GetCommParams<Params>(theCallback);
- params.fd = fd;
- params.nfd = newfd;
- params.details = connDetails;
- params.flag = errcode;
- params.xerrno = xerrno;
- ScheduleCallHere(theCallback);
- theCallback = NULL;
- }
-}
-
-/*
- * This callback is called whenever a filedescriptor is ready
- * to dupe itself and fob off an accept()ed connection
- */
-static void
-comm_accept_try(int fd, void *)
-{
- assert(isOpen(fd));
- fdc_table[fd].acceptNext();
-}
-
void CommIO::Initialise()
{
/* Initialize done pipe signal */
}
}
-AcceptLimiter AcceptLimiter::Instance_;
-
-AcceptLimiter &AcceptLimiter::Instance()
-{
- return Instance_;
-}
-
-bool
-AcceptLimiter::deferring() const
-{
- return deferred.size() > 0;
-}
-
-void
-AcceptLimiter::defer (int fd, Acceptor::AcceptorFunction *aFunc, void *data)
-{
- debugs(5, 5, "AcceptLimiter::defer: FD " << fd << " handler: " << (void*)aFunc);
- Acceptor temp;
- temp.theFunction = aFunc;
- temp.acceptFD = fd;
- temp.theData = data;
- deferred.push_back(temp);
-}
-
-void
-AcceptLimiter::kick()
-{
- if (!deferring())
- return;
-
- /* Yes, this means the first on is the last off....
- * If the list container was a little more friendly, we could sensibly us it.
- */
- Acceptor temp = deferred.pop_back();
-
- comm_accept (temp.acceptFD, temp.theFunction, temp.theData);
-}
-
/// Start waiting for a possibly half-closed connection to close
// by scheduling a read callback to a monitoring handler that
// will close the connection on read errors.
/* comm.c */
extern bool comm_iocallbackpending(void); /* inline candidate */
-extern int comm_listen(int fd);
SQUIDCEXTERN int commSetNonBlocking(int fd);
SQUIDCEXTERN int commUnsetNonBlocking(int fd);
SQUIDCEXTERN void commSetCloseOnExec(int fd);
class ConnectionDetail;
typedef void IOACB(int fd, int nfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data);
-extern void comm_accept(int fd, IOACB *handler, void *handler_data);
-extern void comm_accept(int fd, AsyncCall::Pointer &call);
extern void comm_add_close_handler(int fd, PF *, void *);
extern void comm_add_close_handler(int fd, AsyncCall::Pointer &);
extern void comm_remove_close_handler(int fd, PF *, void *);
inline void commMarkHalfClosed(int fd) { commStartHalfClosedMonitor(fd); }
inline bool commIsHalfClosed(int fd) { return commHasHalfClosedMonitor(fd); }
-/* Not sure where these should live yet */
-
-class Acceptor
-{
-
-public:
- typedef void AcceptorFunction (int, int, ConnectionDetail *, comm_err_t, int, void *);
- AcceptorFunction *theFunction;
- int acceptFD;
- void *theData;
-};
-
-class AcceptLimiter
-{
-
-public:
- static AcceptLimiter &Instance();
- void defer (int, Acceptor::AcceptorFunction *, void *);
- void kick();
-
- bool deferring() const;
-
-private:
- static AcceptLimiter Instance_;
- Vector<Acceptor> deferred;
-};
-
/* A comm engine that calls comm_select */
class CommSelectEngine : public AsyncEngine
--- /dev/null
+#include "config.h"
+#include "comm/AcceptLimiter.h"
+#include "comm/ListenStateData.h"
+#include "fde.h"
+
+Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
+
+Comm::AcceptLimiter &Comm::AcceptLimiter::Instance()
+{
+ return Instance_;
+}
+
+void
+Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+{
+ afd->isLimited++;
+ debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+ deferred.push_back(afd);
+}
+
+void
+Comm::AcceptLimiter::kick()
+{
+ debugs(5, 5, HERE << " size=" << deferred.size());
+ if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
+ debugs(5, 5, HERE << " doing one.");
+ /* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
+ ListenStateData *temp = deferred.shift();
+ temp->isLimited--;
+ temp->acceptNext();
+ }
+}
--- /dev/null
+#ifndef _SQUID_SRC_COMM_ACCEPT_LIMITER_H
+#define _SQUID_SRC_COMM_ACCEPT_LIMITER_H
+
+#include "Array.h"
+
+namespace Comm {
+
+class ListenStateData;
+
+/**
+ * FIFO Queue holding listener socket handlers which have been activated
+ * ready to dupe their FD and accept() a new client connection.
+ * But when doing so there were not enough FD available to handle the
+ * new connection. These handlers are awaiting some FD to become free.
+ *
+ * defer - used only by Comm layer ListenStateData adding themselves when FD are limited.
+ * kick - used by Comm layer when FD are closed.
+ */
+class AcceptLimiter
+{
+
+public:
+ /** retrieve the global instance of the queue. */
+ static AcceptLimiter &Instance();
+
+ /** delay accepting a new client connection. */
+ void defer(Comm::ListenStateData *afd);
+
+ /** try to accept and begin processing any delayed client connections. */
+ void kick();
+
+private:
+ static AcceptLimiter Instance_;
+
+ /** FIFO queue */
+ Vector<Comm::ListenStateData*> deferred;
+};
+
+}; // namepace Comm
+
+#endif /* _SQUID_SRC_COMM_ACCEPT_LIMITER_H */
--- /dev/null
+/*
+ * DEBUG: section 5 Listener Socket Handler
+ * AUTHOR: Harvest Derived
+ *
+ * SQUID Web Proxy Cache http://www.squid-cache.org/
+ * ----------------------------------------------------------
+ *
+ * Squid is the result of efforts by numerous individuals from
+ * the Internet community; see the CONTRIBUTORS file for full
+ * details. Many organizations have provided support for Squid's
+ * development; see the SPONSORS file for full details. Squid is
+ * Copyrighted (C) 2001 by the Regents of the University of
+ * California; see the COPYRIGHT file for full details. Squid
+ * incorporates software developed and/or copyrighted by other
+ * sources; see the CREDITS file for full details.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
+ *
+ *
+ * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
+ */
+
+#include "squid.h"
+#include "CommCalls.h"
+#include "comm/AcceptLimiter.h"
+#include "comm/comm_internal.h"
+#include "comm/ListenStateData.h"
+#include "ConnectionDetail.h"
+#include "fde.h"
+#include "SquidTime.h"
+
+/*
+ * This is not strictly needed at all.
+ * It's only needed by the cachemgr interface to list the currently active sockets.
+ * which could be done for HTTP/HTTPS by listing the http_port_list->listener->fd
+ * BUT, FTP data connections is a bit of a problem.
+ *
+ * AYJ: for now the old way of doing Comm:: actions sequentially in some caller
+ * requires this to anchor each of those Comm:: functions together.
+ */
+std::map<int, Comm::ListenStateData*> Comm::CurrentListenerSockets;
+
+/**
+ * Set of listener sockets which are known to have events pending but we
+ * do not have enough sockets available to do the accept just yet.
+ */
+//std::list<Comm::ListenStateData*> Comm::PendingAccepts;
+
+
+/**
+ * New-style listen and accept routines
+ *
+ * Listen simply registers our interest in an FD for listening,
+ * and accept takes a callback to call when an FD has been
+ * accept()ed.
+ */
+int
+Comm::comm_listen(int sock)
+{
+ int x;
+
+ if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
+ debugs(50, 0, HERE << "listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror());
+ return x;
+ }
+
+ if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
+#ifdef SO_ACCEPTFILTER
+ struct accept_filter_arg afa;
+ bzero(&afa, sizeof(afa));
+ debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << sock);
+ xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
+ x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa));
+ if (x < 0)
+ debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+#elif defined(TCP_DEFER_ACCEPT)
+ int seconds = 30;
+ if (strncmp(Config.accept_filter, "data=", 5) == 0)
+ seconds = atoi(Config.accept_filter + 5);
+ x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds));
+ if (x < 0)
+ debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+#else
+ debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+#endif
+ }
+
+ return sock;
+}
+
+// TODO make this a constructor of ListenStateData...
+// better yet convert the places its used to setup AsyncCalls instead...
+Comm::ListenStateData *
+Comm::comm_accept(int fd, IOACB *handler, void *handler_data)
+{
+ debugs(5, 5, HERE << "FD " << fd << " handler: " << (void*)handler);
+ assert(isOpen(fd));
+
+ AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler",
+ CommAcceptCbPtrFun(handler, handler_data));
+
+ return new Comm::ListenStateData(fd, call, false);
+}
+
+Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
+ fd(aFd),
+ theCallback(call),
+ mayAcceptMore(accept_many)
+{
+ assert(aFd >= 0);
+ debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
+ assert(isOpen(aFd));
+
+ CurrentListenerSockets[fd] = this;
+
+ errcode = comm_listen(fd);
+ commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+Comm::ListenStateData::~ListenStateData()
+{
+ // un-register listener before closing the FD.
+ // TODO: is this the right way to remove from a std::map<> ?
+ if (CurrentListenerSockets[fd])
+ CurrentListenerSockets[fd] = NULL;
+
+ comm_close(fd);
+ fd = -1;
+}
+
+/**
+ * This private callback is called whenever a filedescriptor is ready
+ * to dupe itself and fob off an accept()ed connection
+ *
+ * It will either do that accept operation. Or if there are not enough FD
+ * available to do the clone safely will push the listening FD into a list
+ * of deferred operations. The list gets kicked and the dupe/accept() actually
+ * done later when enough sockets become available.
+ */
+void
+Comm::ListenStateData::doAccept(int fd, void *data)
+{
+ debugs(5, 2, HERE << "New connection on FD " << fd);
+
+ assert(isOpen(fd));
+ ListenStateData *afd = static_cast<ListenStateData*>(data);
+
+ if (!okToAccept()) {
+ AcceptLimiter::Instance().defer(afd);
+ }
+ else {
+ afd->acceptNext();
+ }
+ commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
+}
+
+bool
+Comm::ListenStateData::okToAccept()
+{
+ static time_t last_warn = 0;
+
+ if (fdNFree() >= RESERVED_FD)
+ return true;
+
+ if (last_warn + 15 < squid_curtime) {
+ debugs(5, DBG_CRITICAL, "WARNING! Your cache is running out of filedescriptors");
+ last_warn = squid_curtime;
+ }
+
+ return false;
+}
+
+bool
+Comm::ListenStateData::acceptOne()
+{
+ /*
+ * We don't worry about running low on FDs here. Instead,
+ * doAccept() will use AcceptLimiter if we reach the limit
+ * there.
+ */
+
+ /* Accept a new connection */
+ ConnectionDetail connDetails;
+ int newfd = oldAccept(connDetails);
+
+ /* Check for errors */
+ if (newfd < 0) {
+
+ if (newfd == COMM_NOMESSAGE) {
+ /* register interest again */
+ debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << *theCallback);
+ commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+ return false;
+ }
+
+ // A non-recoverable error; notify the caller */
+ debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << *theCallback);
+ notify(-1, COMM_ERROR, errno, connDetails);
+ return false;
+ }
+
+ debugs(5, 5, HERE << "accepted: FD " << fd <<
+ " newfd: " << newfd << " from: " << connDetails.peer <<
+ " handler: " << *theCallback);
+ notify(newfd, COMM_OK, 0, connDetails);
+ return true;
+}
+
+void
+Comm::ListenStateData::acceptNext()
+{
+ assert(isOpen(fd));
+ debugs(5, 2, HERE << "connection on FD " << fd);
+ mayAcceptMore = acceptOne();
+}
+
+void
+Comm::ListenStateData::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails)
+{
+ // listener socket handlers just abandon the port with COMM_ERR_CLOSING
+ // it should only happen when this object is deleted...
+ if (errcode == COMM_ERR_CLOSING) {
+ return;
+ }
+
+ if (theCallback != NULL) {
+ typedef CommAcceptCbParams Params;
+ Params ¶ms = GetCommParams<Params>(theCallback);
+ params.fd = fd;
+ params.nfd = newfd;
+ params.details = connDetails;
+ params.flag = errcode;
+ params.xerrno = xerrno;
+ ScheduleCallHere(theCallback);
+ if (!mayAcceptMore)
+ theCallback = NULL;
+ }
+}
+
+/**
+ * accept() and process
+ * Wait for an incoming connection on FD. FD should be a socket returned
+ * from comm_listen. */
+int
+Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+{
+ PROF_start(comm_accept);
+ statCounter.syscalls.sock.accepts++;
+ int sock;
+ struct addrinfo *gai = NULL;
+ details.me.InitAddrInfo(gai);
+
+ if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
+
+ details.me.FreeAddrInfo(gai);
+
+ PROF_stop(comm_accept);
+
+ if (ignoreErrno(errno)) {
+ debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
+ return COMM_NOMESSAGE;
+ } else if (ENFILE == errno || EMFILE == errno) {
+ debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
+ return COMM_ERROR;
+ } else {
+ debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
+ return COMM_ERROR;
+ }
+ }
+
+ details.peer = *gai;
+
+ details.me.InitAddrInfo(gai);
+
+ details.me.SetEmpty();
+ getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
+ details.me = *gai;
+
+ commSetCloseOnExec(sock);
+
+ /* fdstat update */
+ fd_open(sock, FD_SOCKET, "HTTP Request");
+
+ fdd_table[sock].close_file = NULL;
+ fdd_table[sock].close_line = 0;
+
+ fde *F = &fd_table[sock];
+ details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
+ F->remote_port = details.peer.GetPort();
+ F->local_addr.SetPort(details.me.GetPort());
+#if USE_IPV6
+ F->sock_family = AF_INET;
+#else
+ F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6;
+#endif
+ details.me.FreeAddrInfo(gai);
+
+ commSetNonBlocking(sock);
+
+ /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
+ F->flags.transparent = fd_table[fd].flags.transparent;
+
+ PROF_stop(comm_accept);
+ return sock;
+}
--- /dev/null
+#ifndef SQUID_LISTENERSTATEDATA_H
+#define SQUID_LISTENERSTATEDATA_H
+
+#include "config.h"
+#include "base/AsyncCall.h"
+#include "comm.h"
+#if HAVE_MAP
+#include <map>
+#endif
+
+class ConnectionDetail;
+
+namespace Comm {
+
+class ListenStateData
+{
+
+public:
+ ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
+ ListenStateData(const ListenStateData &r); // not implemented.
+ ~ListenStateData();
+
+ void subscribe(AsyncCall::Pointer &call);
+ void acceptNext();
+ void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &);
+
+ int fd;
+
+ /// errno code if any happened so far.
+ int errcode;
+
+ /// whether this socket is delayed and on the AcceptLimiter queue.
+ int32_t isLimited;
+
+private:
+ /// Method to test if there are enough file escriptors to open a new client connection
+ /// if not the accept() will be postponed
+ static bool okToAccept();
+
+ /// Method callback for whenever an FD is ready to accept a client connection.
+ static void doAccept(int fd, void *data);
+
+ bool acceptOne();
+ int oldAccept(ConnectionDetail &details);
+
+ AsyncCall::Pointer theCallback;
+ bool mayAcceptMore;
+};
+
+extern std::map<int,ListenStateData*> CurrentListenerSockets;
+
+// remaining legacy functions. TODO replace all uses with the ListenData constructor
+extern int comm_listen(int fd);
+extern ListenStateData *comm_accept(int fd, IOACB *handler, void *handler_data);
+
+}; // namespace Comm
+
+#endif /* SQUID_LISTENERSTATEDATA_H */
--- /dev/null
+include $(top_srcdir)/src/Common.am
+include $(top_srcdir)/src/TestHeaders.am
+
+noinst_LTLIBRARIES = libcomm-listener.la
+
+## Library holding listener comm socket handlers
+libcomm_listener_la_SOURCES= \
+ AcceptLimiter.cc \
+ AcceptLimiter.h \
+ ListenStateData.cc \
+ ListenStateData.h \
+ \
+ comm_internal.h
--- /dev/null
+#ifndef SQUID_COMM_COMM_INTERNAL_H
+#define SQUID_COMM_COMM_INTERNAL_H
+
+/* misc collection of bits shared by Comm code, but not needed by the rest of Squid. */
+
+struct _fd_debug_t {
+ char const *close_file;
+ int close_line;
+};
+
+typedef struct _fd_debug_t fd_debug_t;
+extern fd_debug_t *fdd_table;
+
+extern bool isOpen(const int fd);
+
+#endif
};
+SQUIDCEXTERN int fdNFree(void);
+
#endif /* SQUID_FDE_H */
*/
#include "squid.h"
-#include "Store.h"
-#include "HttpRequest.h"
-#include "HttpReply.h"
+#include "comm.h"
+#include "comm/ListenStateData.h"
+#include "ConnectionDetail.h"
#include "errorpage.h"
#include "fde.h"
-#include "comm.h"
-#include "HttpHeaderRange.h"
+#include "forward.h"
#include "HttpHdrContRange.h"
+#include "HttpHeaderRange.h"
#include "HttpHeader.h"
+#include "HttpRequest.h"
+#include "HttpReply.h"
+#include "MemBuf.h"
+#include "Server.h"
+#include "SquidTime.h"
+#include "Store.h"
+#include "URLScheme.h"
+#include "wordlist.h"
+
#if DELAY_POOLS
#include "DelayPools.h"
#include "MemObject.h"
#endif
-#include "ConnectionDetail.h"
-#include "forward.h"
-#include "Server.h"
-#include "MemBuf.h"
-#include "wordlist.h"
-#include "SquidTime.h"
-#include "URLScheme.h"
/**
\defgroup ServerProtocolFTPInternal Server-Side FTP Internals
/// called after the socket is opened, sets up close handler
void opened(int aFd, const AsyncCall::Pointer &aCloser);
- void close(); /// clears the close handler and calls comm_close
- void clear(); /// just resets fd and close handler
+ /** Handles all operations needed to properly close the active channel FD.
+ * clearing the close handler, clearing the listen socket properly, and calling comm_close
+ */
+ void close();
+
+ void clear(); /// just resets fd and close handler. does not close active connections.
int fd; /// channel descriptor; \todo: remove because the closer has it
+ /** Current listening socket handler. delete on shutdown or abort.
+ * FTP stores a copy of the FD in the field fd above.
+ * Use close() to properly close the channel.
+ */
+ Comm::ListenStateData *listener;
+
private:
AsyncCall::Pointer closer; /// Comm close handler callback
};
void
FtpStateData::dataClosed(const CommCloseCbParams &io)
{
+ if (data.listener) {
+ delete data.listener;
+ data.listener = NULL;
+ data.fd = -1;
+ }
data.clear();
failed(ERR_FTP_FAILURE, 0);
/* failed closes ctrl.fd and frees ftpState */
int on = 1;
int x = 0;
- /// Close old data channel, if any. We may open a new one below.
+ /// Close old data channels, if any. We may open a new one below.
ftpState->data.close();
/*
return -1;
}
- if (comm_listen(fd) < 0) {
+ typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
+ AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
+ acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
+ ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false);
+
+ if (!ftpState->data.listener || ftpState->data.listener->errcode < 0) {
comm_close(fd);
return -1;
}
char ntoapeer[MAX_IPSTRLEN];
debugs(9, 3, "ftpAcceptDataConnection");
- if (io.flag == COMM_ERR_CLOSING)
- return;
+ // one connection accepted. the handler has stopped listening. drop our local pointer to it.
+ data.listener = NULL;
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
abortTransaction("entry aborted when accepting data conn");
io.details.peer << "), expecting " <<
fd_table[ctrl.fd].ipaddr);
+ /* close the bad soures connection down ASAP. */
comm_close(io.nfd);
+
+ /* we are ony accepting once, so need to re-open the listener socket. */
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
- comm_accept(data.fd, acceptCall);
+ data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
return;
}
}
if (io.flag != COMM_OK) {
- debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: comm_accept(" << io.nfd << "): " << xstrerr(io.xerrno));
+ debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno));
/** \todo XXX Need to set error message */
ftpFail(this);
return;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
- comm_accept(data.fd, acceptCall);
+ data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
} else {
debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
ftpFail(this);
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
- comm_accept(ftpState->data.fd, acceptCall);
+ ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
/*
* Cancel the timeout on the Control socket and establish one
* on the data socket
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
- comm_accept(ftpState->data.fd, acceptCall);
+ ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
/*
* Cancel the timeout on the Control socket and establish one
* on the data socket
/**
* Did we close all FTP server connection(s)?
*
- \retval true Both server control and data channels are closed.
+ \retval true Both server control and data channels are closed. And not waitigng for a new data connection to open.
\retval false Either control channel or data is still active.
*/
bool
void
FtpChannel::close()
{
- if (fd >= 0) {
+ // channels with active listeners will be closed when the listener handler dies.
+ if (listener) {
+ delete listener;
+ listener = NULL;
+ comm_remove_close_handler(fd, closer);
+ closer = NULL;
+ fd = -1;
+ }
+ else if (fd >= 0) {
comm_remove_close_handler(fd, closer);
closer = NULL;
comm_close(fd); // we do not expect to be called back
SQUIDCEXTERN void fd_note(int fd, const char *);
SQUIDCEXTERN void fd_bytes(int fd, int len, unsigned int type);
SQUIDCEXTERN void fdDumpOpen(void);
-SQUIDCEXTERN int fdNFree(void);
SQUIDCEXTERN int fdUsageHigh(void);
SQUIDCEXTERN void fdAdjustReserved(void);
#include "squid.h"
#include "event.h"
+#include "fde.h"
#include "Store.h"
#include "CacheManager.h"
#include "StoreClient.h"
*
*/
-#include "squid.h"
+#include "config.h"
+#include "fde.h"
int
fdNFree(void)