]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Fixed several ConnOpener problems
authorAlex Rousskov <rousskov@measurement-factory.com>
Sat, 9 Feb 2013 06:36:36 +0000 (23:36 -0700)
committerAmos Jeffries <squid3@treenet.co.nz>
Sat, 9 Feb 2013 06:36:36 +0000 (23:36 -0700)
... by relying on AsyncJob protections and comm_close(), while maintaining a
tighter grip on various I/O and sleep states.

Problems addressed:

* Connection descriptor was not closed when attempting to reconnect after
  failures. We now properly close on failures, sleep with descriptor closed,
  and then reopen.

* Timeout handler was not cleaned up properly in some cases, causing memory
  leaks (for the handler Pointer) and possibly timeouts that were fired (for
  then-active handler) after the connection was passed to the initiator.

* Comm close handler was not cleaned up properly.

* statCounter.syscalls.sock.closes counter was not updated on FD closure.

* Waiting pending accepts were not kicked on FD closure.

* Connection timeout was enforced for each connection attempt instead of
  applying to all attempts taken together.

and possibly other problems. The full extent of all side-effects of mishandled
race conditions and state conflicts is probably unknown.

src/comm/ConnOpener.cc
src/comm/ConnOpener.h

index 0b763c981dad99ccf653adbec180327db2d18adc..7f19c50b000b6167917a8c4d55e1019626e562ae 100644 (file)
@@ -23,8 +23,7 @@ Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &han
         callback_(handler),
         totalTries_(0),
         failRetries_(0),
-        connectTimeout_(ctimeout),
-        connectStart_(0)
+        deadline_(squid_curtime + static_cast<time_t>(ctimeout))
 {}
 
 Comm::ConnOpener::~ConnOpener()
@@ -45,36 +44,24 @@ Comm::ConnOpener::doneAll() const
         return AsyncJob::doneAll();
     }
 
+    // otherwise, we must be waiting for something
+    Must(temporaryFd_ >= 0 || calls_.sleep_);
     return false;
 }
 
 void
 Comm::ConnOpener::swanSong()
 {
-    // cancel any event watchers
-    // done here to get the "swanSong" mention in cancel debugging.
-    if (calls_.earlyAbort_ != NULL) {
-        calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong");
-        calls_.earlyAbort_ = NULL;
-    }
-    if (calls_.timeout_ != NULL) {
-        calls_.timeout_->cancel("Comm::ConnOpener::swanSong");
-        calls_.timeout_ = NULL;
-    }
-
     if (callback_ != NULL) {
-        if (callback_->canceled())
-            callback_ = NULL;
-        else
-            // inform the still-waiting caller we are dying
-            doneConnecting(COMM_ERR_CONNECT, 0);
+        // inform the still-waiting caller we are dying
+        sendAnswer(COMM_ERR_CONNECT, 0, "Comm::ConnOpener::swanSong");
     }
 
-    // rollback what we can from the job state
-    if (temporaryFd_ >= 0) {
-        // doneConnecting() handles partial FD connection cleanup
-        doneConnecting(COMM_ERR_CONNECT, 0);
-    }
+    if (temporaryFd_ >= 0)
+        closeFd();
+
+    if (calls_.sleep_)
+        cancelSleep();
 
     AsyncJob::swanSong();
 }
@@ -100,14 +87,13 @@ Comm::ConnOpener::getHost() const
 /**
  * Connection attempt are completed. One way or the other.
  * Pass the results back to the external handler.
- * NP: on errors the earlyAbort call should be cancelled first with a reason.
  */
 void
-Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno)
+Comm::ConnOpener::sendAnswer(comm_err_t errFlag, int xerrno, const char *why)
 {
     // only mark the address good/bad AFTER connect is finished.
     if (host_ != NULL) {
-        if (xerrno == 0)
+        if (xerrno == 0) // XXX: should not we use errFlag instead?
             ipcacheMarkGoodAddr(host_, conn_->remote);
         else {
             ipcacheMarkBadAddr(host_, conn_->remote);
@@ -119,34 +105,105 @@ Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno)
     }
 
     if (callback_ != NULL) {
-        typedef CommConnectCbParams Params;
-        Params &params = GetCommParams<Params>(callback_);
-        params.conn = conn_;
-        params.flag = status;
-        params.xerrno = xerrno;
-        ScheduleCallHere(callback_);
+        // avoid scheduling cancelled callbacks, assuming they are common
+        // enough to make this extra check an optimization
+        if (callback_->canceled()) {
+            debugs(5, 4, conn_ << " not calling canceled " << *callback_ <<
+                   " [" << callback_->id << ']' );
+        } else {
+            typedef CommConnectCbParams Params;
+            Params &params = GetCommParams<Params>(callback_);
+            params.conn = conn_;
+            params.flag = errFlag;
+            params.xerrno = xerrno;
+            ScheduleCallHere(callback_);
+        }
         callback_ = NULL;
     }
 
-    if (temporaryFd_ >= 0) {
-        debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
-        // it never reached fully open, so cleanup the FD handlers
-        // Note that comm_close() sequence does not happen for partially open FD
-        Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0);
+    // The job will stop without this call because nil callback_ makes
+    // doneAll() true, but this explicit call creates nicer debugging.
+    mustStop(why);
+}
+
+/// cleans up this job I/O state without closing temporaryFd
+/// required before closing temporaryFd or keeping it in conn_
+/// leaves FD bare so must only be called via closeFd() or keepFd()
+void
+Comm::ConnOpener::cleanFd()
+{
+    debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
+
+    Must(temporaryFd_ >= 0);
+    fde &f = fd_table[temporaryFd_];
+
+    // Our write_handler was set without using Comm::Write API, so we cannot
+    // use a cancellable Pointer-free job callback and simply cancel it here.
+    if (f.write_handler) {
+
+        /* XXX: We are about to remove write_handler, which was responsible
+         * for deleting write_data, so we have to delete write_data
+         * ourselves. Comm currently calls SetSelect handlers synchronously
+         * so if write_handler is set, we know it has not been called yet.
+         * ConnOpener converts that sync call into an async one, but only
+         * after deleting ptr, so that is not a problem.
+         */
+
+        delete static_cast<Pointer*>(f.write_data);
+        f.write_data = NULL;
+        f.write_handler = NULL;
+    }
+    // Comm::DoSelect does not do this when calling and resetting write_handler
+    // (because it expects more writes to come?). We could mimic that
+    // optimization by resetting Comm "Select" state only when the FD is
+    // actually closed.
+    Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0);
+
+    if (calls_.timeout_ != NULL) {
+        calls_.timeout_->cancel("Comm::ConnOpener::cleanFd");
+        calls_.timeout_ = NULL;
+    }
+    // Comm checkTimeouts() and commCloseAllSockets() do not clear .timeout
+    // when calling timeoutHandler (XXX fix them), so we clear unconditionally.
+    f.timeoutHandler = NULL;
+    f.timeout = 0;
+
+    if (calls_.earlyAbort_ != NULL) {
+        comm_remove_close_handler(temporaryFd_, calls_.earlyAbort_);
         calls_.earlyAbort_ = NULL;
-        if (calls_.timeout_ != NULL) {
-            calls_.timeout_->cancel("Comm::ConnOpener::doneConnecting");
-            calls_.timeout_ = NULL;
-        }
-        fd_table[temporaryFd_].timeoutHandler = NULL;
-        fd_table[temporaryFd_].timeout = 0;
-        close(temporaryFd_);
-        fd_close(temporaryFd_);
-        temporaryFd_ = -1;
     }
+}
+
+/// cleans I/O state and ends I/O for temporaryFd_
+void
+Comm::ConnOpener::closeFd()
+{
+    if (temporaryFd_ < 0)
+        return;
+
+    cleanFd();
+
+    // comm_close() below uses COMMIO_FD_WRITECB(fd)->active() to clear Comm
+    // "Select" state. It will not clear ours. XXX: It should always clear
+    // because a callback may have been active but was called before comm_close
+    // Update: we now do this in cleanFd()
+    // Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, NULL, NULL, 0);
 
-    /* ensure cleared local state, we are done. */
-    conn_ = NULL;
+    comm_close(temporaryFd_);
+    temporaryFd_ = -1;
+}
+
+/// cleans I/O state and moves temporaryFd_ to the conn_ for long-term use
+void
+Comm::ConnOpener::keepFd()
+{
+    Must(conn_ != NULL);
+    Must(temporaryFd_ >= 0);
+
+    cleanFd();
+
+    conn_->fd = temporaryFd_;
+    temporaryFd_ = -1;
 }
 
 void
@@ -154,17 +211,41 @@ Comm::ConnOpener::start()
 {
     Must(conn_ != NULL);
 
-    /* get a socket open ready for connecting with */
+    /* outbound sockets have no need to be protocol agnostic. */
+    if (!(Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING) && conn_->remote.IsIPv4()) {
+        conn_->local.SetIPv4();
+    }
+
+    if (createFd())
+        connect();
+}
+
+/// called at the end of Comm::ConnOpener::DelayedConnectRetry event
+void
+Comm::ConnOpener::restart()
+{
+    debugs(5, 5, conn_ << " restarting after sleep");
+    calls_.sleep_ = false;
+
+    if (createFd())
+        connect();
+}
+
+/// Create a socket for the future connection or return false.
+/// If false is returned, done() is guaranteed to return true and end the job.
+bool
+Comm::ConnOpener::createFd()
+{
+    Must(temporaryFd_ < 0);
+
+    // our initators signal abort by cancelling their callbacks
+    if (callback_ == NULL || callback_->canceled())
+        return false;
+
+    temporaryFd_ = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_);
     if (temporaryFd_ < 0) {
-        /* outbound sockets have no need to be protocol agnostic. */
-        if (!(Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING) && conn_->remote.IsIPv4()) {
-            conn_->local.SetIPv4();
-        }
-        temporaryFd_ = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_);
-        if (temporaryFd_ < 0) {
-            doneConnecting(COMM_ERR_CONNECT, 0);
-            return;
-        }
+        sendAnswer(COMM_ERR_CONNECT, 0, "Comm::ConnOpener::createFd");
+        return false;
     }
 
     typedef CommCbMemFunT<Comm::ConnOpener, CommCloseCbParams> abortDialer;
@@ -173,26 +254,25 @@ Comm::ConnOpener::start()
 
     typedef CommCbMemFunT<Comm::ConnOpener, CommTimeoutCbParams> timeoutDialer;
     calls_.timeout_ = JobCallback(5, 4, timeoutDialer, this, Comm::ConnOpener::timeout);
-    debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_);
+    debugs(5, 3, conn_ << " will timeout in " << (deadline_ - squid_curtime));
 
-    // Update the fd_table directly because conn_ is not yet storing the FD
+    // Update the fd_table directly because commSetConnTimeout() needs open conn_
     assert(temporaryFd_ < Squid_MaxFD);
     assert(fd_table[temporaryFd_].flags.open);
     typedef CommTimeoutCbParams Params;
     Params &params = GetCommParams<Params>(calls_.timeout_);
     params.conn = conn_;
     fd_table[temporaryFd_].timeoutHandler = calls_.timeout_;
-    fd_table[temporaryFd_].timeout = squid_curtime + (time_t) connectTimeout_;
+    fd_table[temporaryFd_].timeout = deadline_;
 
-    connectStart_ = squid_curtime;
-    connect();
+    return true;
 }
 
 void
 Comm::ConnOpener::connected()
 {
-    conn_->fd = temporaryFd_;
-    temporaryFd_ = -1;
+    Must(temporaryFd_ >= 0);
+    keepFd();
 
     /*
      * stats.conn_open is used to account for the number of
@@ -210,64 +290,81 @@ Comm::ConnOpener::connected()
      *       Also, legacy code still depends on comm_local_port() with no access to Comm::Connection
      *       when those are done comm_local_port can become one of our member functions to do the below.
      */
-    fd_table[conn_->fd].flags.open = 1;
+    Must(fd_table[conn_->fd].flags.open);
     fd_table[conn_->fd].local_addr = conn_->local;
+
+    sendAnswer(COMM_OK, 0, "Comm::ConnOpener::connected");
 }
 
-/** Make an FD connection attempt.
- * Handles the case(s) when a partially setup connection gets closed early.
- */
+/// Make an FD connection attempt.
 void
 Comm::ConnOpener::connect()
 {
     Must(conn_ != NULL);
-
-    // our parent Jobs signal abort by cancelling their callbacks.
-    if (callback_ == NULL || callback_->canceled())
-        return;
+    Must(temporaryFd_ >= 0);
 
     ++ totalTries_;
 
     switch (comm_connect_addr(temporaryFd_, conn_->remote) ) {
 
     case COMM_INPROGRESS:
-        // check for timeout FIRST.
-        if (squid_curtime - connectStart_ > connectTimeout_) {
-            debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
-            calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out");
-            doneConnecting(COMM_TIMEOUT, errno);
-            return;
-        } else {
-            debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
-            Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, new Pointer(this), 0);
-        }
+        debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
+        Comm::SetSelect(temporaryFd_, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, new Pointer(this), 0);
         break;
 
     case COMM_OK:
         debugs(5, 5, HERE << conn_ << ": COMM_OK - connected");
         connected();
-        doneConnecting(COMM_OK, 0);
         break;
 
-    default:
+    default: {
+        const int xerrno = errno;
+
         ++failRetries_;
+        debugs(5, 7, conn_ << ": failure #" << failRetries_ << " <= " <<
+               Config.connect_retries << ": " << xstrerr(xerrno));
 
-        // check for timeout FIRST.
-        if (squid_curtime - connectStart_ > connectTimeout_) {
-            debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response.");
-            calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out");
-            doneConnecting(COMM_TIMEOUT, errno);
-        } else if (failRetries_ < Config.connect_retries) {
+        if (failRetries_ < Config.connect_retries) {
             debugs(5, 5, HERE << conn_ << ": * - try again");
-            eventAdd("Comm::ConnOpener::DelayedConnectRetry", Comm::ConnOpener::DelayedConnectRetry, new Pointer(this), 0.05, 0, false);
+            sleep();
             return;
         } else {
             // send ERROR back to the upper layer.
             debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already.");
-            calls_.earlyAbort_->cancel("Comm::ConnOpener::connect failed");
-            doneConnecting(COMM_ERR_CONNECT, errno);
+            sendAnswer(COMM_ERR_CONNECT, xerrno, "Comm::ConnOpener::connect");
         }
     }
+    }
+}
+
+/// Close and wait a little before trying to open and connect again.
+void
+Comm::ConnOpener::sleep()
+{
+    Must(!calls_.sleep_);
+    closeFd();
+    calls_.sleep_ = true;
+    eventAdd("Comm::ConnOpener::DelayedConnectRetry",
+             Comm::ConnOpener::DelayedConnectRetry,
+             new Pointer(this), 0.05, 0, false);
+}
+
+/// cleans up this job sleep state
+void
+Comm::ConnOpener::cancelSleep()
+{
+    if (calls_.sleep_) {
+        // It would be nice to delete the sleep event, but it might be out of
+        // the event queue and in the async queue already, so (a) we do not know
+        // whether we can safely delete the call ptr here and (b) eventDelete()
+        // will assert if the event went async. Thus, we let the event run so
+        // that it deletes the call ptr [after this job is gone]. Note that we
+        // are called only when the job ends so this "hanging event" will do
+        // nothing but deleting the call ptr.  TODO: Revise eventDelete() API.
+        // eventDelete(Comm::ConnOpener::DelayedConnectRetry, calls_.sleep);
+        calls_.sleep_ = false;
+        debugs(5, 9, conn_ << " stops sleeping");
+    }
 }
 
 /**
@@ -298,7 +395,9 @@ void
 Comm::ConnOpener::earlyAbort(const CommCloseCbParams &io)
 {
     debugs(5, 3, HERE << io.conn);
-    doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better?
+    calls_.earlyAbort_ = NULL;
+    // NP: is closing or shutdown better?
+    sendAnswer(COMM_ERR_CLOSING, io.xerrno, "Comm::ConnOpener::earlyAbort");
 }
 
 /**
@@ -308,7 +407,9 @@ Comm::ConnOpener::earlyAbort(const CommCloseCbParams &io)
 void
 Comm::ConnOpener::timeout(const CommTimeoutCbParams &)
 {
-    connect();
+    debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response.");
+    calls_.timeout_ = NULL;
+    sendAnswer(COMM_TIMEOUT, ETIMEDOUT, "Comm::ConnOpener::timeout");
 }
 
 /* Legacy Wrapper for the retry event after COMM_INPROGRESS
@@ -330,7 +431,7 @@ Comm::ConnOpener::InProgressConnectRetry(int fd, void *data)
 }
 
 /* Legacy Wrapper for the retry event with small delay after errors.
- * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::connect call
+ * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::restart call
  */
 void
 Comm::ConnOpener::DelayedConnectRetry(void *data)
@@ -341,7 +442,7 @@ Comm::ConnOpener::DelayedConnectRetry(void *data)
         // Ew. we are now outside the all AsyncJob protections.
         // get back inside by scheduling another call...
         typedef NullaryMemFunT<Comm::ConnOpener> Dialer;
-        AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect);
+        AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::restart);
         ScheduleCallHere(call);
     }
     delete ptr;
index 45e4c41287a388ac5c6b0abcd4af8d27d2c7a32c..8769f615b8be91e6a6069c5dd46bdac767b7c325 100644 (file)
@@ -40,13 +40,23 @@ private:
 
     void earlyAbort(const CommCloseCbParams &);
     void timeout(const CommTimeoutCbParams &);
-    void doneConnecting(comm_err_t status, int xerrno);
+    void sendAnswer(comm_err_t errFlag, int xerrno, const char *why);
     static void InProgressConnectRetry(int fd, void *data);
     static void DelayedConnectRetry(void *data);
     void connect();
     void connected();
     void lookupLocalAddress();
 
+    void sleep();
+    void restart();
+
+    bool createFd();
+    void closeFd();
+    void keepFd();
+    void cleanFd();
+
+    void cancelSleep();
+
 private:
     char *host_;                         ///< domain name we are trying to connect to.
     int temporaryFd_;                    ///< the FD being opened. Do NOT set conn_->fd until it is fully open.
@@ -56,19 +66,16 @@ private:
     int totalTries_;   ///< total number of connection attempts over all destinations so far.
     int failRetries_;  ///< number of retries current destination has been tried.
 
-    /**
-     * time at which to abandon the connection.
-     * the connection-done callback will be passed COMM_TIMEOUT
-     */
-    time_t connectTimeout_;
-
-    /// time at which this series of connection attempts was started.
-    time_t connectStart_;
+    /// if we are not done by then, we will call back with COMM_TIMEOUT
+    time_t deadline_;
 
     /// handles to calls which we may need to cancel.
     struct Calls {
         AsyncCall::Pointer earlyAbort_;
         AsyncCall::Pointer timeout_;
+        /// Whether we are idling before retrying to connect; not yet a call
+        /// [that we can cancel], but it will probably become one eventually.
+        bool sleep_;
     } calls_;
 
     CBDATA_CLASS2(ConnOpener);