]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Remove crud from Listener. handle pre-made calls nice
authorAmos Jeffries <squid3@treenet.co.nz>
Sun, 15 Aug 2010 06:45:04 +0000 (18:45 +1200)
committerAmos Jeffries <squid3@treenet.co.nz>
Sun, 15 Aug 2010 06:45:04 +0000 (18:45 +1200)
src/CommCalls.h
src/comm/ListenStateData.cc
src/comm/ListenStateData.h
src/ftp.cc

index d68bdfefd62009a426d652667826c99b59fb94a8..0a9e147f1d1c59612a31a9817c44a54f4821681d 100644 (file)
@@ -183,9 +183,6 @@ public:
     CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
     void dial();
 
-    // yuck. But we can't store the PtrFun dialers without it.
-//    virtual bool canDial(AsyncCall &c) { return true; }
-
     virtual void print(std::ostream &os) const;
 
 public:
index 66b19855a39523f83de68ea1e2e42536c5dbb957..20aec1d374fca213b256836c830c28e5f642cb03 100644 (file)
 #include "protos.h"
 #include "SquidTime.h"
 
-// TODO remove.
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
-        fd(aFd),
-        errcode(0),
-        isLimited(0),
-        theCallback(call),
-        mayAcceptMore(accept_many)
-{
-    assert(aFd >= 0);
-    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
-    assert(isOpen(aFd));
-    setListen();
-    commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-// TODO remove.
-Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note) :
-        errcode(0),
-        isLimited(0),
-        theCallback(call),
-        mayAcceptMore(accept_many)
-{
-    /* open the conn if its not already open */
-    if (!IsConnOpen(conn)) {
-        conn->fd = comm_open(SOCK_STREAM,
-                             IPPROTO_TCP,
-                             conn->local,
-                             conn->flags,
-                             note);
-        debugs(9, 3, HERE << "Unconnected data socket created on FD " << conn->fd );
-
-        if (!conn->isOpen()) {
-            debugs(5, DBG_CRITICAL, HERE << "comm_open failed");
-            errcode = -1;
-            return;
-        }
-    }
-
-    assert(IsConnOpen(conn));
-    fd = conn->fd;
-    debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << call);
-    setListen();
-    if (errcode == 0)
-        commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
 Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) :
         fd(aFd),
         errcode(0),
@@ -96,6 +50,7 @@ Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) :
         callLevel(NULL),
         callName(NULL),
         callDialer(NULL),
+        theCallback(NULL),
         mayAcceptMore(accept_many)
 {
     assert(aFd >= 0);
@@ -109,6 +64,7 @@ Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, bool accep
         callLevel(NULL),
         callName(NULL),
         callDialer(NULL),
+        theCallback(NULL),
         mayAcceptMore(accept_many)
 {
     /* open the conn if its not already open */
@@ -133,9 +89,9 @@ Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, bool accep
 
 Comm::ListenStateData::~ListenStateData()
 {
+    unsubscribe();
     comm_close(fd);
     fd = -1;
-    delete callDialer;;
 }
 
 void
@@ -144,9 +100,12 @@ Comm::ListenStateData::subscribe(int section, int level, const char *name, CommA
     debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << name);
 
     // if this is the first subscription. start listening on the socket.
-    if (callDialer == NULL)
+    if (callDialer == NULL && theCallback == NULL)
         setListen();
 
+    // remove old subscription. if any.
+    unsubscribe();
+
     // store the subscribed handler details.
     callSection = section;
     callLevel = level;
@@ -159,12 +118,33 @@ Comm::ListenStateData::subscribe(int section, int level, const char *name, CommA
         commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
 }
 
+void
+Comm::ListenStateData::subscribe(const AsyncCall::Pointer &call)
+{
+    debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
+
+    // remove old subscription. if any.
+    unsubscribe();
+
+    // store new callback subscription
+    theCallback = call;
+
+    // start listening on the socket.
+    if (theCallback != NULL) {
+        setListen();
+        // if no error so far start accepting connections.
+        if (errcode == 0)
+            commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+    }
+}
+
 void
 Comm::ListenStateData::unsubscribe()
 {
     safe_free(callName);
     delete callDialer;
     callDialer = NULL;
+    theCallback = NULL;
 }
 
 /**
@@ -320,8 +300,9 @@ Comm::ListenStateData::notify(int newfd, comm_err_t flag, const Comm::Connection
         params.flag = flag;
         params.xerrno = errcode;
         ScheduleCallHere(theCallback);
-        if (!mayAcceptMore)
-            theCallback = NULL;
+        // only permit the call to be scheduled once.
+        mayAcceptMore = false;
+        theCallback = NULL;
     }
 }
 
index 1e0a953c59f77dfceb6ca3aeeaf81f58c86384bd..bc5ec9a4989f8091b96d7730f1a9e7fb2dd57ca1 100644 (file)
@@ -17,31 +17,30 @@ class ListenStateData
 {
 
 public:
-// old remove ASAP when subscribe is working.
-    ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); // Legacy
-    ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note);
-
     ListenStateData(int fd, bool accept_many); // Legacy verion that uses new subscribe API.
     ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note);
     ListenStateData(const ListenStateData &r); // not implemented.
     ~ListenStateData();
 
-    // legacy. removing ASAP when below version works.
-    void subscribe(AsyncCall::Pointer &call) { theCallback = call; };
-
     /** Subscribe a handler to receive calls back about new connections.
      * Replaces any existing subscribed handler.
      */
     void subscribe(int level, int section, const char *name, CommAcceptCbPtrFun *dialer);
-//    void subscribe(int level, int section, const char *name, CommAcceptMemFun *dialer);
+
+    /** Subscribe a handler to receive calls back about new connections.
+     * Replaces any existing subscribed handler.
+     * Due to not being able to re-use calls, only permits one to be received.
+     */
+    void subscribe(const AsyncCall::Pointer &call);
 
     /** Remove the currently waiting callback subscription.
      * Pending calls will remain scheduled.
      */
     void unsubscribe();
 
-    /** Try and accept another connection.
-     * If any are pending it will be passed asynchronously to the subscribed callback.
+    /** Try and accept another connection (synchronous).
+     * If one is pending already the subscribed callback handler will be scheduled
+     * to handle it before this method returns.
      */
     void acceptNext();
 
index ef997408b554d3eb1ff347e11c3dbe341d52dca0..eefd1e0bab9c6d39a97ee2c54fc35e6a5aadcd29 100644 (file)
@@ -459,8 +459,10 @@ void
 FtpStateData::dataClosed(const CommCloseCbParams &io)
 {
     if (data.listener) {
+        data.listener->unsubscribe();
         delete data.listener;
         data.listener = NULL;
+        data.listen_conn = NULL;
         data.conn = NULL;
     }
     data.clear();
@@ -2739,7 +2741,8 @@ ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
     typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
     AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                     acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
-    ftpState->data.listener = new Comm::ListenStateData(conn, acceptCall, false, ftpState->entry->url());
+    ftpState->data.listener = new Comm::ListenStateData(conn, false, ftpState->entry->url());
+    ftpState->data.listener->subscribe(acceptCall);
 
     if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
         conn->close();
@@ -2913,8 +2916,8 @@ FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
             typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
             AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                             acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
+            data.listener = new Comm::ListenStateData(data.listen_conn, false, data.host);
             data.listener->subscribe(acceptCall);
-            data.listener->acceptNext();
             return;
         }
     }
@@ -3019,7 +3022,8 @@ void FtpStateData::readStor()
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
 
-        data.listener = new Comm::ListenStateData(data.conn, acceptCall, false, data.host);
+        data.listener = new Comm::ListenStateData(data.conn, false, data.host);
+        data.listener->subscribe(acceptCall);
     } else {
         debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
         ftpFail(this);
@@ -3153,7 +3157,8 @@ ftpReadList(FtpStateData * ftpState)
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
 
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, acceptCall, false, ftpState->data.host);
+        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, false, ftpState->data.host);
+        ftpState->data.listener->subscribe(acceptCall);
         return;
     } else if (!ftpState->flags.tried_nlst && code > 300) {
         ftpSendNlst(ftpState);
@@ -3198,7 +3203,8 @@ ftpReadRetr(FtpStateData * ftpState)
         typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
         AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
                                         acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
-        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, acceptCall, false, ftpState->data.host);
+        ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, false, ftpState->data.host);
+        ftpState->data.listener->subscribe(acceptCall);
     } else if (code >= 300) {
         if (!ftpState->flags.try_slash_hack) {
             /* Try this as a directory missing trailing slash... */
@@ -3860,6 +3866,7 @@ FtpChannel::close()
 {
     // channels with active listeners will be closed when the listener handler dies.
     if (listener) {
+        listener->unsubscribe();
         delete listener;
         listener = NULL;
         comm_remove_close_handler(conn->fd, closer);