From: Amos Jeffries Date: Sun, 15 Aug 2010 05:06:11 +0000 (+1200) Subject: Generate calls on demand and prevet early FD closure X-Git-Tag: take08~55^2~124^2~84 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=0ba55a12a8c575fac0d77a6ae4392228b35b7f8b;p=thirdparty%2Fsquid.git Generate calls on demand and prevet early FD closure --- diff --git a/src/CommCalls.h b/src/CommCalls.h index 0a9e147f1d..d68bdfefd6 100644 --- a/src/CommCalls.h +++ b/src/CommCalls.h @@ -183,6 +183,9 @@ public: CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams); void dial(); + // yuck. But we can't store the PtrFun dialers without it. +// virtual bool canDial(AsyncCall &c) { return true; } + virtual void print(std::ostream &os) const; public: diff --git a/src/client_side.cc b/src/client_side.cc index 83a4a7c2a5..48337de8d2 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -3142,6 +3142,9 @@ httpAccept(int sock, int newfd, Comm::ConnectionPointer &details, clientdbEstablished(details->remote, 1); incoming_sockets_accepted++; + + // TODO: remove this when details conn is passed around properly. + details->fd = -1; // ConnStateData has assumed control of the FD now. } #if USE_SSL @@ -3343,6 +3346,9 @@ httpsAccept(int sock, int newfd, Comm::ConnectionPointer& details, clientdbEstablished(details->remote, 1); incoming_sockets_accepted++; + + // TODO: remove this when details conn is passed around properly. + details->fd = -1; // ConnStateData has assumed control of the FD now. } bool @@ -3465,10 +3471,8 @@ clientHttpConnectionOpened(int fd, int, http_port_list *s) Must(s); - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)", - CommAcceptCbPtrFun(httpAccept, s)); - - s->listener = new Comm::ListenStateData(fd, call, true); + s->listener = new Comm::ListenStateData(fd, true); + s->listener->subscribe(5,5, "httpAccept", new CommAcceptCbPtrFun(httpAccept, s)); debugs(1, 1, "Accepting " << (s->intercepted ? " intercepted" : "") << @@ -3519,10 +3523,8 @@ clientHttpsConnectionOpened(int fd, int, http_port_list *s) Must(s); - AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)", - CommAcceptCbPtrFun(httpsAccept, s)); - - s->listener = new Comm::ListenStateData(fd, call, true); + s->listener = new Comm::ListenStateData(fd, true); + s->listener->subscribe(5,5, "httpsAccept", new CommAcceptCbPtrFun(httpsAccept, s)); debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << "."); diff --git a/src/comm/ListenStateData.cc b/src/comm/ListenStateData.cc index 78f11c528f..66b19855a3 100644 --- a/src/comm/ListenStateData.cc +++ b/src/comm/ListenStateData.cc @@ -42,43 +42,7 @@ #include "protos.h" #include "SquidTime.h" -/** - * New-style listen and accept routines - * - * setListen simply registers our interest in an FD for listening. - * The constructor takes a callback to call when an FD has been - * accept()ed some time later. - */ -void -Comm::ListenStateData::setListen() -{ - errcode = 0; // reset local errno copy. - if (listen(fd, Squid_MaxFD >> 2) < 0) { - debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); - errcode = errno; - return; - } - - if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) { -#ifdef SO_ACCEPTFILTER - struct accept_filter_arg afa; - bzero(&afa, sizeof(afa)); - debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd); - xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); - if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) - debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); -#elif defined(TCP_DEFER_ACCEPT) - int seconds = 30; - if (strncmp(Config.accept_filter, "data=", 5) == 0) - seconds = atoi(Config.accept_filter + 5); - if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) - debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); -#else - debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS"); -#endif - } -} - +// TODO remove. Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) : fd(aFd), errcode(0), @@ -93,6 +57,7 @@ Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool a commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); } +// TODO remove. Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note) : errcode(0), isLimited(0), @@ -123,10 +88,120 @@ Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, AsyncCall: commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0); } +Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) : + fd(aFd), + errcode(0), + isLimited(0), + callSection(NULL), + callLevel(NULL), + callName(NULL), + callDialer(NULL), + mayAcceptMore(accept_many) +{ + assert(aFd >= 0); + assert(isOpen(aFd)); +} + +Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note) : + errcode(0), + isLimited(0), + callSection(NULL), + callLevel(NULL), + callName(NULL), + callDialer(NULL), + mayAcceptMore(accept_many) +{ + /* open the conn if its not already open */ + if (!IsConnOpen(conn)) { + conn->fd = comm_open(SOCK_STREAM, + IPPROTO_TCP, + conn->local, + conn->flags, + note); + debugs(9, 3, HERE << "Unconnected data socket created on FD " << conn->fd ); + + if (!conn->isOpen()) { + debugs(5, DBG_CRITICAL, HERE << "comm_open failed"); + errcode = -1; + return; + } + } + + assert(IsConnOpen(conn)); + fd = conn->fd; +} + Comm::ListenStateData::~ListenStateData() { comm_close(fd); fd = -1; + delete callDialer;; +} + +void +Comm::ListenStateData::subscribe(int section, int level, const char *name, CommAcceptCbPtrFun *dialer) +{ + debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << name); + + // if this is the first subscription. start listening on the socket. + if (callDialer == NULL) + setListen(); + + // store the subscribed handler details. + callSection = section; + callLevel = level; + safe_free(callName); + callName = xstrdup(name); + callDialer = dialer; + + // if no error so far start accepting connections. + if (errcode == 0) + commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); +} + +void +Comm::ListenStateData::unsubscribe() +{ + safe_free(callName); + delete callDialer; + callDialer = NULL; +} + +/** + * New-style listen and accept routines + * + * setListen simply registers our interest in an FD for listening. + * The constructor takes a callback to call when an FD has been + * accept()ed some time later. + */ +void +Comm::ListenStateData::setListen() +{ + errcode = 0; // reset local errno copy. + if (listen(fd, Squid_MaxFD >> 2) < 0) { + debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); + errcode = errno; + return; + } + + if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) { +#ifdef SO_ACCEPTFILTER + struct accept_filter_arg afa; + bzero(&afa, sizeof(afa)); + debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd); + xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name)); + if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0) + debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror()); +#elif defined(TCP_DEFER_ACCEPT) + int seconds = 30; + if (strncmp(Config.accept_filter, "data=", 5) == 0) + seconds = atoi(Config.accept_filter + 5); + if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0) + debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror()); +#else + debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS"); +#endif + } } /** @@ -188,13 +263,13 @@ Comm::ListenStateData::acceptOne() if (newfd == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback); + debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << callName); commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); return; } // A non-recoverable error; notify the caller */ - debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback); + debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << callName); notify(-1, COMM_ERROR, connDetails); mayAcceptMore = false; return; @@ -202,7 +277,7 @@ Comm::ListenStateData::acceptOne() debugs(5, 5, HERE << "accepted: FD " << fd << " newfd: " << newfd << " from: " << connDetails->remote << - " handler: " << theCallback); + " handler: " << callName); notify(newfd, COMM_OK, connDetails); } @@ -223,7 +298,20 @@ Comm::ListenStateData::notify(int newfd, comm_err_t flag, const Comm::Connection return; } - if (theCallback != NULL) { + if (callDialer != NULL) { + AsyncCall::Pointer call = commCbCall(callSection, callLevel, callName, *callDialer); + typedef CommAcceptCbParams Params; + Params ¶ms = GetCommParams(call); + params.fd = fd; + params.nfd = newfd; + params.details = connDetails; + params.flag = flag; + params.xerrno = errcode; + ScheduleCallHere(call); + if (!mayAcceptMore) + unsubscribe(); + } + else if (theCallback != NULL) { typedef CommAcceptCbParams Params; Params ¶ms = GetCommParams(theCallback); params.fd = fd; diff --git a/src/comm/ListenStateData.h b/src/comm/ListenStateData.h index bd0ca895bd..1e0a953c59 100644 --- a/src/comm/ListenStateData.h +++ b/src/comm/ListenStateData.h @@ -2,7 +2,7 @@ #define SQUID_LISTENERSTATEDATA_H #include "config.h" -#include "base/AsyncCall.h" +#include "CommCalls.h" #include "comm/comm_err_t.h" #include "comm/forward.h" @@ -17,15 +17,38 @@ class ListenStateData { public: +// old remove ASAP when subscribe is working. ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); // Legacy ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note); + + ListenStateData(int fd, bool accept_many); // Legacy verion that uses new subscribe API. + ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note); ListenStateData(const ListenStateData &r); // not implemented. ~ListenStateData(); + // legacy. removing ASAP when below version works. void subscribe(AsyncCall::Pointer &call) { theCallback = call; }; + + /** Subscribe a handler to receive calls back about new connections. + * Replaces any existing subscribed handler. + */ + void subscribe(int level, int section, const char *name, CommAcceptCbPtrFun *dialer); +// void subscribe(int level, int section, const char *name, CommAcceptMemFun *dialer); + + /** Remove the currently waiting callback subscription. + * Pending calls will remain scheduled. + */ + void unsubscribe(); + + /** Try and accept another connection. + * If any are pending it will be passed asynchronously to the subscribed callback. + */ void acceptNext(); + + /// Call the subscribed callback handler with details about a new connection. void notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &details); + /// socket being listened on for new connections int fd; /// errno code of the last accept() or listen() action if one occurred. @@ -34,6 +57,12 @@ public: /// whether this socket is delayed and on the AcceptLimiter queue. int32_t isLimited; +private: + int callSection; ///< debug section for subscribed callback. + int callLevel; ///< debug level for subscribed callback. + char *callName; ///< Name for the subscribed callback. + CommAcceptCbPtrFun *callDialer; ///< dialer to make the subscribed callback + private: /// Method to test if there are enough file descriptors to open a new client connection /// if not the accept() will be postponed