]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Generate calls on demand and prevet early FD closure
authorAmos Jeffries <squid3@treenet.co.nz>
Sun, 15 Aug 2010 05:06:11 +0000 (17:06 +1200)
committerAmos Jeffries <squid3@treenet.co.nz>
Sun, 15 Aug 2010 05:06:11 +0000 (17:06 +1200)
src/CommCalls.h
src/client_side.cc
src/comm/ListenStateData.cc
src/comm/ListenStateData.h

index 0a9e147f1d1c59612a31a9817c44a54f4821681d..d68bdfefd62009a426d652667826c99b59fb94a8 100644 (file)
@@ -183,6 +183,9 @@ public:
     CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
     void dial();
 
+    // yuck. But we can't store the PtrFun dialers without it.
+//    virtual bool canDial(AsyncCall &c) { return true; }
+
     virtual void print(std::ostream &os) const;
 
 public:
index 83a4a7c2a534847ff81e128d6066eeacc13f4714..48337de8d27226a4b4b6e2dc252820dc92994cb9 100644 (file)
@@ -3142,6 +3142,9 @@ httpAccept(int sock, int newfd, Comm::ConnectionPointer &details,
     clientdbEstablished(details->remote, 1);
 
     incoming_sockets_accepted++;
+
+    // TODO: remove this when details conn is passed around properly.
+    details->fd = -1; // ConnStateData has assumed control of the FD now.
 }
 
 #if USE_SSL
@@ -3343,6 +3346,9 @@ httpsAccept(int sock, int newfd, Comm::ConnectionPointer& details,
     clientdbEstablished(details->remote, 1);
 
     incoming_sockets_accepted++;
+
+    // TODO: remove this when details conn is passed around properly.
+    details->fd = -1; // ConnStateData has assumed control of the FD now.
 }
 
 bool
@@ -3465,10 +3471,8 @@ clientHttpConnectionOpened(int fd, int, http_port_list *s)
 
     Must(s);
 
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
-                                         CommAcceptCbPtrFun(httpAccept, s));
-
-    s->listener = new Comm::ListenStateData(fd, call, true);
+    s->listener = new Comm::ListenStateData(fd, true);
+    s->listener->subscribe(5,5, "httpAccept", new CommAcceptCbPtrFun(httpAccept, s));
 
     debugs(1, 1, "Accepting " <<
            (s->intercepted ? " intercepted" : "") <<
@@ -3519,10 +3523,8 @@ clientHttpsConnectionOpened(int fd, int, http_port_list *s)
 
     Must(s);
 
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
-                                         CommAcceptCbPtrFun(httpsAccept, s));
-
-    s->listener = new Comm::ListenStateData(fd, call, true);
+    s->listener = new Comm::ListenStateData(fd, true);
+    s->listener->subscribe(5,5, "httpsAccept", new CommAcceptCbPtrFun(httpsAccept, s));
 
     debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
 
index 78f11c528f3b6eb1a15b61fb2f4500c8ea4d7961..66b19855a39523f83de68ea1e2e42536c5dbb957 100644 (file)
 #include "protos.h"
 #include "SquidTime.h"
 
-/**
- * New-style listen and accept routines
- *
- * setListen simply registers our interest in an FD for listening.
- * The constructor takes a callback to call when an FD has been
- * accept()ed some time later.
- */
-void
-Comm::ListenStateData::setListen()
-{
-    errcode = 0; // reset local errno copy.
-    if (listen(fd, Squid_MaxFD >> 2) < 0) {
-        debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
-        errcode = errno;
-        return;
-    }
-
-    if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
-#ifdef SO_ACCEPTFILTER
-        struct accept_filter_arg afa;
-        bzero(&afa, sizeof(afa));
-        debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
-        xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
-        if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
-            debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
-#elif defined(TCP_DEFER_ACCEPT)
-        int seconds = 30;
-        if (strncmp(Config.accept_filter, "data=", 5) == 0)
-            seconds = atoi(Config.accept_filter + 5);
-        if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
-            debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
-#else
-        debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
-#endif
-    }
-}
-
+// TODO remove.
 Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
         fd(aFd),
         errcode(0),
@@ -93,6 +57,7 @@ Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool a
     commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
 }
 
+// TODO remove.
 Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note) :
         errcode(0),
         isLimited(0),
@@ -123,10 +88,120 @@ Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, AsyncCall:
         commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
 }
 
+Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) :
+        fd(aFd),
+        errcode(0),
+        isLimited(0),
+        callSection(NULL),
+        callLevel(NULL),
+        callName(NULL),
+        callDialer(NULL),
+        mayAcceptMore(accept_many)
+{
+    assert(aFd >= 0);
+    assert(isOpen(aFd));
+}
+
+Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note) :
+        errcode(0),
+        isLimited(0),
+        callSection(NULL),
+        callLevel(NULL),
+        callName(NULL),
+        callDialer(NULL),
+        mayAcceptMore(accept_many)
+{
+    /* open the conn if its not already open */
+    if (!IsConnOpen(conn)) {
+        conn->fd = comm_open(SOCK_STREAM,
+                             IPPROTO_TCP,
+                             conn->local,
+                             conn->flags,
+                             note);
+        debugs(9, 3, HERE << "Unconnected data socket created on FD " << conn->fd );
+
+        if (!conn->isOpen()) {
+            debugs(5, DBG_CRITICAL, HERE << "comm_open failed");
+            errcode = -1;
+            return;
+        }
+    }
+
+    assert(IsConnOpen(conn));
+    fd = conn->fd;
+}
+
 Comm::ListenStateData::~ListenStateData()
 {
     comm_close(fd);
     fd = -1;
+    delete callDialer;;
+}
+
+void
+Comm::ListenStateData::subscribe(int section, int level, const char *name, CommAcceptCbPtrFun *dialer)
+{
+    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << name);
+
+    // if this is the first subscription. start listening on the socket.
+    if (callDialer == NULL)
+        setListen();
+
+    // store the subscribed handler details.
+    callSection = section;
+    callLevel = level;
+    safe_free(callName);
+    callName = xstrdup(name);
+    callDialer = dialer;
+
+    // if no error so far start accepting connections.
+    if (errcode == 0)
+        commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+void
+Comm::ListenStateData::unsubscribe()
+{
+    safe_free(callName);
+    delete callDialer;
+    callDialer = NULL;
+}
+
+/**
+ * New-style listen and accept routines
+ *
+ * setListen simply registers our interest in an FD for listening.
+ * The constructor takes a callback to call when an FD has been
+ * accept()ed some time later.
+ */
+void
+Comm::ListenStateData::setListen()
+{
+    errcode = 0; // reset local errno copy.
+    if (listen(fd, Squid_MaxFD >> 2) < 0) {
+        debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+        errcode = errno;
+        return;
+    }
+
+    if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
+#ifdef SO_ACCEPTFILTER
+        struct accept_filter_arg afa;
+        bzero(&afa, sizeof(afa));
+        debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
+        xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
+        if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
+            debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+#elif defined(TCP_DEFER_ACCEPT)
+        int seconds = 30;
+        if (strncmp(Config.accept_filter, "data=", 5) == 0)
+            seconds = atoi(Config.accept_filter + 5);
+        if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
+            debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+#else
+        debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+#endif
+    }
 }
 
 /**
@@ -188,13 +263,13 @@ Comm::ListenStateData::acceptOne()
 
         if (newfd == COMM_NOMESSAGE) {
             /* register interest again */
-            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
+            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << callName);
             commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
             return;
         }
 
         // A non-recoverable error; notify the caller */
-        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
+        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << callName);
         notify(-1, COMM_ERROR, connDetails);
         mayAcceptMore = false;
         return;
@@ -202,7 +277,7 @@ Comm::ListenStateData::acceptOne()
 
     debugs(5, 5, HERE << "accepted: FD " << fd <<
            " newfd: " << newfd << " from: " << connDetails->remote <<
-           " handler: " << theCallback);
+           " handler: " << callName);
     notify(newfd, COMM_OK, connDetails);
 }
 
@@ -223,7 +298,20 @@ Comm::ListenStateData::notify(int newfd, comm_err_t flag, const Comm::Connection
         return;
     }
 
-    if (theCallback != NULL) {
+    if (callDialer != NULL) {
+        AsyncCall::Pointer call = commCbCall(callSection, callLevel, callName, *callDialer);
+        typedef CommAcceptCbParams Params;
+        Params &params = GetCommParams<Params>(call);
+        params.fd = fd;
+        params.nfd = newfd;
+        params.details = connDetails;
+        params.flag = flag;
+        params.xerrno = errcode;
+        ScheduleCallHere(call);
+        if (!mayAcceptMore)
+            unsubscribe();
+    }
+    else if (theCallback != NULL) {
         typedef CommAcceptCbParams Params;
         Params &params = GetCommParams<Params>(theCallback);
         params.fd = fd;
index bd0ca895bdfdb37858d558c8ef2b983f7e873221..1e0a953c59f77dfceb6ca3aeeaf81f58c86384bd 100644 (file)
@@ -2,7 +2,7 @@
 #define SQUID_LISTENERSTATEDATA_H
 
 #include "config.h"
-#include "base/AsyncCall.h"
+#include "CommCalls.h"
 #include "comm/comm_err_t.h"
 #include "comm/forward.h"
 
@@ -17,15 +17,38 @@ class ListenStateData
 {
 
 public:
+// old remove ASAP when subscribe is working.
     ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); // Legacy
     ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note);
+
+    ListenStateData(int fd, bool accept_many); // Legacy verion that uses new subscribe API.
+    ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note);
     ListenStateData(const ListenStateData &r); // not implemented.
     ~ListenStateData();
 
+    // legacy. removing ASAP when below version works.
     void subscribe(AsyncCall::Pointer &call) { theCallback = call; };
+
+    /** Subscribe a handler to receive calls back about new connections.
+     * Replaces any existing subscribed handler.
+     */
+    void subscribe(int level, int section, const char *name, CommAcceptCbPtrFun *dialer);
+//    void subscribe(int level, int section, const char *name, CommAcceptMemFun *dialer);
+
+    /** Remove the currently waiting callback subscription.
+     * Pending calls will remain scheduled.
+     */
+    void unsubscribe();
+
+    /** Try and accept another connection.
+     * If any are pending it will be passed asynchronously to the subscribed callback.
+     */
     void acceptNext();
+
+    /// Call the subscribed callback handler with details about a new connection.
     void notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &details);
 
+    /// socket being listened on for new connections
     int fd;
 
     /// errno code of the last accept() or listen() action if one occurred.
@@ -34,6 +57,12 @@ public:
     /// whether this socket is delayed and on the AcceptLimiter queue.
     int32_t isLimited;
 
+private:
+    int callSection;        ///< debug section for subscribed callback.
+    int callLevel;          ///< debug level for subscribed callback.
+    char *callName;           ///< Name for the subscribed callback.
+    CommAcceptCbPtrFun *callDialer; ///< dialer to make the subscribed callback
+
 private:
     /// Method to test if there are enough file descriptors to open a new client connection
     /// if not the accept() will be postponed