]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening (#967)
authorAlex Rousskov <rousskov@measurement-factory.com>
Tue, 8 Feb 2022 08:56:43 +0000 (03:56 -0500)
committerGitHub <noreply@github.com>
Tue, 8 Feb 2022 08:56:43 +0000 (21:56 +1300)
* Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening

The bug was caused by commit 25b0ce4. Other known symptoms are:

    assertion failed: store.cc:1793: "isEmpty()"
    assertion failed: FwdState.cc:501: "serverConnection() == conn"
    assertion failed: FwdState.cc:1037: "!opening()"

This change has several overlapping parts. Unfortunately, merging
individual parts is both difficult and likely to cause crashes.

## Part 1: Bug 5055.

FwdState used to check serverConn to decide whether to open a connection
to forward the request. Since commit 25b0ce4, a nil serverConn pointer
no longer implies that a new connection should be opened: FwdState
helper jobs may already be working on preparing an existing open
connection (e.g., sending a CONNECT request or negotiating encryption).

Bad serverConn checks in both FwdState::noteDestination() and
FwdState::noteDestinationsEnd() methods led to extra connectStart()
calls creating two conflicting concurrent helper jobs.

To fix this, we replaced direct serverConn inspection with a
usingDestination() call which also checks whether we are waiting for a
helper job. Testing that fix exposed another set of bugs: The helper job
pointers or in-job connections left stale/set after forwarding failures.
The changes described below addressed (most of) those problems.

## Part 2: Connection establishing helper jobs and their callbacks

A proper fix for Bug 5055 required answering a difficult question: When
should a dying job call its callbacks? We only found one answer which
required cooperation from the job creator and led to the following
rules:

* AsyncJob destructors must not call any callbacks.

* AsyncJob::swanSong() is responsible for async-calling any remaining
  (i.e. set, uncalled, and uncancelled) callbacks.

* AsyncJob::swanSong() is called (only) for started jobs.

* AsyncJob destructing sequence should validate that no callbacks remain
  uncalled for started jobs.

... where an AsyncJob x is considered "started" if AsyncJob::Start(x)
has returned without throwing.

A new JobWait class helps job creators follow these rules while keeping
track on in-progress helper jobs and killing no-longer-needed helpers.

Also fixed very similar bugs in tunnel.cc code.

## Part 3: ConnOpener fixes

1. Many ConnOpener users are written to keep a ConnectionPointer to the
   destination given to ConnOpener. This means that their connection
   magically opens when ConnOpener successfully connects, before
   ConnOpener has a chance to notify the user about the changes. Having
   multiple concurrent connection owners is always dangerous, and the
   user cannot even have a close handler registered for its now-open
   connection. When something happens to ConnOpener or its answer, the
   user job may be in trouble. Now, ConnOpener callers no longer pass
   Connection objects they own, cloning them as needed. That adjustment
   required adjustment 2:

2. Refactored ConnOpener users to stop assuming that the answer contains
   a pointer to their connection object. After adjustment 1 above, it
   does not. HappyConnOpener relied on that assumption quite a bit so we
   had to refactor to use two custom callback methods instead of one
   with a complicated if-statement distinguishing prime from spare
   attempts. This refactoring is an overall improvement because it
   simplifies the code. Other ConnOpener users just needed to remove a
   few no longer valid paranoid assertions/Musts.

3. ConnOpener users were forced to remember to close params.conn when
   processing negative answers. Some, naturally, forgot, triggering
   warnings about orphaned connections (e.g., Ident and TcpLogger).
   ConnOpener now closes its open connection before sending a negative
   answer.

4. ConnOpener would trigger orphan connection warnings if the job ended
   after opening the connection but without supplying the connection to
   the requestor (e.g., because the requestor has gone away). Now,
   ConnOpener explicitly closes its open connection if it has not been
   sent to the requestor.

Also fixed Comm::ConnOpener::cleanFd() debugging that was incorrectly
saying that the method closes the temporary descriptor.

Also fixed ConnOpener callback's syncWithComm(): The stale
CommConnectCbParams override was testing unused (i.e. always negative)
CommConnectCbParams::fd and was trying to cancel the callback that most
(possibly all) recipients rely on: ConnOpener users expect a negative
answer rather than no answer at all.

Also, after comparing the needs of two old/existing and a temporary
added ("clone everything") Connection cloning method callers, we decided
there is no need to maintain three different methods. All existing
callers should be fine with a single method because none of them suffers
from "extra" copying of members that others need. Right now, the new
cloneProfile() method copies everything except FD and a few
special-purpose members (with documented reasons for not copying).

Also added Comm::Connection::cloneDestinationDetails() debugging to
simplify tracking dependencies between half-baked Connection objects
carrying destination/flags/other metadata and open Connection objects
created by ConnOpener using that metadata (which are later delivered to
ConnOpener users and, in some cases, replace those half-baked
connections mentioned earlier. Long-term, we need to find a better way
to express these and other Connection states/stages than comments and
debugging messages.

## Part 4: Comm::Connection closure callbacks

We improved many closure callbacks to make sure (to the extent possible)
that Connection and other objects are in sync with Comm. There are lots
of small bugs, inconsistencies, and other problems in Connection closure
handlers. It is not clear whether any of those problems could result in
serious runtime errors or leaks. In theory, the rest of the code could
neutralize their negative side effects. However, even in that case, it
was just a matter of time before the next bug will bite us due to stale
Connection::fd and such. These changes themselves carry elevated risk,
but they get us closer to reliable code as far as Connection maintenance
is concerned; otherwise, we will keep chasing their deadly side effects.

Long-term, all these manual efforts to keep things in sync should become
unnecessary with the introduction of appropriate Connection ownership
APIs that automatically maintain the corresponding environments (TODO).

## Part 5: Other notable improvements in the adjusted code

Improved Security::PeerConnector::serverConn and
Http::Tunneler::connection management, especially when sending negative
answers. When sending a negative answer, we would set answer().conn to
an open connection, async-send that answer, and then hurry to close the
connection using our pointer to the shared Connection object. If
everything went according to the plan, the recipient would get a non-nil
but closed Connection object. Now, a negative answer simply means no
connection at all. Same for a tunneled answer.

Refactored ICAP connection-establishing code to to delay Connection
ownership until the ICAP connection is fully ready. This change
addresses primary Connection ownership concerns (as they apply to this
ICAP code) except orphaning of the temporary Connection object by helper
job start exceptions (now an explicit XXX). For example, the transaction
no longer shares a Connection object with ConnOpener and
IcapPeerConnector jobs.

Probably fixed a bug where PeerConnector::negotiate() assumed that a
sslFinalized() does not return true after callBack(). It may (e.g., when
CertValidationHelper::Submit() throws). Same for
PeekingPeerConnector::checkForPeekAndSpliceMatched().

Fixed FwdState::advanceDestination() bug that did not save
ERR_GATEWAY_FAILURE details and "lost" the address of that failed
destination, making it unavailable to future retries (if any).

Polished PeerPoolMgr, Ident, and Gopher code to be able to fix similar
job callback and connection management issues.

Polished AsyncJob::Start() API. Start() used to return a job pointer,
but that was a bad idea:

* It implies that a failed Start() will return a nil pointer, and that
  the caller should check the result. Neither is true.

* It encourages callers to dereference the returned pointer to further
  adjust the job. That technically works (today) but violates the rules
  of communicating with an async job. The Start() method is the boundary
  after which the job is deemed asynchronous.

Also removed old "and wait for..." post-Start() comments because the
code itself became clear enough, and those comments were becoming
increasingly stale (because they duplicated the callback names above
them).

Fix Tunneler and PeerConnector handling of last-resort callback
requirements. Events like handleStopRequest() and callException() stop
the job but should not be reported as a BUG (e.g., it would be up to the
callException() to decide how to report the caught exception). There
might (or there will) be other, similar cases where the job is stopped
prematurely for some non-BUG reason beyond swanSong() knowledge. The
existence of non-bug cases does not mean there could be no bugs worth
reporting here, but until they can be identified more reliably than all
these benign/irrelevant cases, reporting no BUGs is a (much) lesser
evil.

TODO: Revise AsyncJob::doneAll(). Many of its overrides are written to
check for both positive (i.e. mission accomplished) and negative (i.e.
mission cancelled or cannot be accomplished) conditions, but the latter
is usually unnecessary, especially after we added handleStopRequest()
API to properly support external job cancellation events. Many doneAll()
overrides can probably be greatly simplified.

----

Cherry picked SQUID-568-premature-serverconn-use-v5 commit 22b5f78.

* fixup: Cherry-picked SQUID-568-premature-serverconn-use PR-time fixes

In git log order:
e64a6c1: Undone an out-of-scope change and added a missing 'auto'
aeaf83d: Fixed an 'unused parameter' error
f49d009: fixup: No explicit destructors with implicit copying methods
c30c37f: Removed an unnecessary explicit copy constructor
012f5ec: Excluded Connection::rfc931 from cloning
366c78a: ICAP: do not set connect_timeout on the established conn...

This branch is now in sync with SQUID-568-premature-serverconn-use (S)
commit e64a6c1 (except for official changes merged from master/v6 into S
closer to the end of PR 877 work (i.e. S' merge commit 0a7432a).

* Fix FATAL ServiceRep::putConnection exception: theBusyConns > 0

    FATAL: check failed: theBusyConns > 0
        exception location: ServiceRep.cc(163) putConnection

Since master/v6 commit 2b6b1bc, a timeout on a ready-to-shovel
Squid-service ICAP connection was decrementing theBusyConns level one
extra time because Adaptation::Icap::Xaction::noteCommTimedout() started
calling both noteConnectionFailed() and closeConnection(). Depending on
the actual theBusyConns level, the extra decrement could result in FATAL
errors later, when putConnection() was called (for a different ICAP
transaction) with zero theBusyConns in an exception-unprotected context.

Throughout these changes, Xaction still counts the above timeouts as a
service failure. That is done by calling ServiceRep::noteFailure() from
Xaction::callException(), including in timeout cases described above.

----

Cherry-picked master/v6 commit a8ac892.

60 files changed:
src/BodyPipe.cc
src/CommCalls.cc
src/Downloader.cc
src/FwdState.cc
src/FwdState.h
src/HappyConnOpener.cc
src/HappyConnOpener.h
src/PeerPoolMgr.cc
src/PeerPoolMgr.h
src/ResolvedPeers.cc
src/adaptation/icap/ModXact.cc
src/adaptation/icap/ModXact.h
src/adaptation/icap/OptXact.cc
src/adaptation/icap/OptXact.h
src/adaptation/icap/ServiceRep.cc
src/adaptation/icap/ServiceRep.h
src/adaptation/icap/Xaction.cc
src/adaptation/icap/Xaction.h
src/base/AsyncJob.cc
src/base/AsyncJob.h
src/base/JobWait.cc [new file with mode: 0644]
src/base/JobWait.h [new file with mode: 0644]
src/base/Makefile.am
src/base/forward.h
src/client_side.cc
src/clients/FtpClient.cc
src/clients/FtpClient.h
src/clients/FtpGateway.cc
src/clients/FtpRelay.cc
src/clients/HttpTunneler.cc
src/clients/HttpTunneler.h
src/clients/forward.h
src/comm.cc
src/comm/ConnOpener.cc
src/comm/ConnOpener.h
src/comm/Connection.cc
src/comm/Connection.h
src/comm/TcpAcceptor.cc
src/dns_internal.cc
src/eui/Eui48.h
src/gopher.cc
src/ident/Ident.cc
src/log/TcpLogger.cc
src/log/TcpLogger.h
src/mgr/Forwarder.cc
src/mgr/Inquirer.cc
src/mgr/StoreToCommWriter.cc
src/security/PeerConnector.cc
src/security/PeerConnector.h
src/security/forward.h
src/servers/FtpServer.cc
src/servers/FtpServer.h
src/snmp/Forwarder.cc
src/snmp/Inquirer.cc
src/ssl/PeekingPeerConnector.cc
src/ssl/PeekingPeerConnector.h
src/tests/stub_libcomm.cc
src/tests/stub_libsecurity.cc
src/tunnel.cc
src/whois.cc

index 9abe7d48bc4b685d94ff886fa27511bb2f701870..37a3b32cd4689d0b8cbada08e72f00df4f805e77 100644 (file)
@@ -335,6 +335,7 @@ BodyPipe::startAutoConsumptionIfNeeded()
         return;
 
     theConsumer = new BodySink(this);
+    AsyncJob::Start(theConsumer);
     debugs(91,7, HERE << "starting auto consumption" << status());
     scheduleBodyDataNotification();
 }
index 4cd503dcb356bf9939b3e9fdb4937c403a57ff02..cca23f6e30b5350ba280ca661415e14d677dc33a 100644 (file)
@@ -53,6 +53,9 @@ CommAcceptCbParams::CommAcceptCbParams(void *aData):
 {
 }
 
+// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all
+// implementations always return true.
+
 void
 CommAcceptCbParams::print(std::ostream &os) const
 {
@@ -72,13 +75,24 @@ CommConnectCbParams::CommConnectCbParams(void *aData):
 bool
 CommConnectCbParams::syncWithComm()
 {
-    // drop the call if the call was scheduled before comm_close but
-    // is being fired after comm_close
-    if (fd >= 0 && fd_table[fd].closing()) {
-        debugs(5, 3, HERE << "dropping late connect call: FD " << fd);
-        return false;
+    assert(conn);
+
+    // change parameters if this is a successful callback that was scheduled
+    // after its Comm-registered connection started to close
+
+    if (flag != Comm::OK) {
+        assert(!conn->isOpen());
+        return true; // not a successful callback; cannot go out of sync
     }
-    return true; // now we are in sync and can handle the call
+
+    assert(conn->isOpen());
+    if (!fd_table[conn->fd].closing())
+        return true; // no closing in progress; in sync (for now)
+
+    debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn);
+    conn->noteClosure();
+    flag = Comm::ERR_CLOSING;
+    return true; // now the callback is in sync with Comm again
 }
 
 /* CommIoCbParams */
index 298c3a836603cbed6955e3467dced074759a5bec..012387fb8ed5c321fabde1c8c7ff8b74174aed3a 100644 (file)
@@ -81,6 +81,10 @@ void
 Downloader::swanSong()
 {
     debugs(33, 6, this);
+
+    if (callback_) // job-ending emergencies like handleStopRequest() or callException()
+        callBack(Http::scInternalServerError);
+
     if (context_) {
         context_->finished();
         context_ = nullptr;
@@ -251,6 +255,7 @@ Downloader::downloadFinished()
 void
 Downloader::callBack(Http::StatusCode const statusCode)
 {
+    assert(callback_);
     CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer());
     Must(dialer);
     dialer->status = statusCode;
index b721017c3b3e68ccbe78bc69b8e7ad5bab55dd35..b69e60c7ccb10039629b770be2d1d088b64a1122 100644 (file)
@@ -207,26 +207,22 @@ FwdState::stopAndDestroy(const char *reason)
 {
     debugs(17, 3, "for " << reason);
 
-    if (opening())
-        cancelOpening(reason);
+    cancelStep(reason);
 
     PeerSelectionInitiator::subscribed = false; // may already be false
     self = nullptr; // we hope refcounting destroys us soon; may already be nil
     /* do not place any code here as this object may be gone by now */
 }
 
-/// Notify connOpener that we no longer need connections. We do not have to do
-/// this -- connOpener would eventually notice on its own, but notifying reduces
-/// waste and speeds up spare connection opening for other transactions (that
-/// could otherwise wait for this transaction to use its spare allowance).
+/// Notify a pending subtask, if any, that we no longer need its help. We do not
+/// have to do this -- the subtask job will eventually end -- but ending it
+/// earlier reduces waste and may reduce DoS attack surface.
 void
-FwdState::cancelOpening(const char *reason)
+FwdState::cancelStep(const char *reason)
 {
-    assert(calls.connector);
-    calls.connector->cancel(reason);
-    calls.connector = nullptr;
-    notifyConnOpener();
-    connOpener.clear();
+    transportWait.cancel(reason);
+    encryptionWait.cancel(reason);
+    peerWait.cancel(reason);
 }
 
 #if STRICT_ORIGINAL_DST
@@ -348,8 +344,7 @@ FwdState::~FwdState()
 
     entry = NULL;
 
-    if (opening())
-        cancelOpening("~FwdState");
+    cancelStep("~FwdState");
 
     if (Comm::IsConnOpen(serverConn))
         closeServerConnection("~FwdState");
@@ -501,8 +496,17 @@ FwdState::fail(ErrorState * errorState)
     if (!errorState->request)
         errorState->request = request;
 
-    if (err->type != ERR_ZERO_SIZE_OBJECT)
-        return;
+    if (err->type == ERR_ZERO_SIZE_OBJECT)
+        reactToZeroSizeObject();
+
+    destinationReceipt = nullptr; // may already be nil
+}
+
+/// ERR_ZERO_SIZE_OBJECT requires special adjustments
+void
+FwdState::reactToZeroSizeObject()
+{
+    assert(err->type == ERR_ZERO_SIZE_OBJECT);
 
     if (pconnRace == racePossible) {
         debugs(17, 5, HERE << "pconn race happened");
@@ -566,6 +570,8 @@ FwdState::complete()
 
         if (Comm::IsConnOpen(serverConn))
             unregister(serverConn);
+        serverConn = nullptr;
+        destinationReceipt = nullptr;
 
         storedWholeReply_ = nullptr;
         entry->reset();
@@ -584,6 +590,12 @@ FwdState::complete()
     }
 }
 
+bool
+FwdState::usingDestination() const
+{
+    return encryptionWait || peerWait || Comm::IsConnOpen(serverConn);
+}
+
 void
 FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure)
 {
@@ -613,19 +625,19 @@ FwdState::noteDestination(Comm::ConnectionPointer path)
 
     destinations->addPath(path);
 
-    if (Comm::IsConnOpen(serverConn)) {
+    if (usingDestination()) {
         // We are already using a previously opened connection, so we cannot be
-        // waiting for connOpener. We still receive destinations for backup.
-        Must(!opening());
+        // waiting for it. We still receive destinations for backup.
+        Must(!transportWait);
         return;
     }
 
-    if (opening()) {
+    if (transportWait) {
         notifyConnOpener();
         return; // and continue to wait for FwdState::noteConnection() callback
     }
 
-    // This is the first path candidate we have seen. Create connOpener.
+    // This is the first path candidate we have seen. Use it.
     useDestinations();
 }
 
@@ -653,19 +665,19 @@ FwdState::noteDestinationsEnd(ErrorState *selectionError)
     // if all of them fail, forwarding as whole will fail
     Must(!selectionError); // finding at least one path means selection succeeded
 
-    if (Comm::IsConnOpen(serverConn)) {
+    if (usingDestination()) {
         // We are already using a previously opened connection, so we cannot be
-        // waiting for connOpener. We were receiving destinations for backup.
-        Must(!opening());
+        // waiting for it. We were receiving destinations for backup.
+        Must(!transportWait);
         return;
     }
 
-    Must(opening()); // or we would be stuck with nothing to do or wait for
+    Must(transportWait); // or we would be stuck with nothing to do or wait for
     notifyConnOpener();
     // and continue to wait for FwdState::noteConnection() callback
 }
 
-/// makes sure connOpener knows that destinations have changed
+/// makes sure connection opener knows that the destinations have changed
 void
 FwdState::notifyConnOpener()
 {
@@ -674,7 +686,7 @@ FwdState::notifyConnOpener()
     } else {
         debugs(17, 7, "notifying about " << *destinations);
         destinations->notificationPending = true;
-        CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
+        CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
     }
 }
 
@@ -684,7 +696,7 @@ static void
 fwdServerClosedWrapper(const CommCloseCbParams &params)
 {
     FwdState *fwd = (FwdState *)params.data;
-    fwd->serverClosed(params.fd);
+    fwd->serverClosed();
 }
 
 /**** PRIVATE *****************************************************************/
@@ -754,13 +766,23 @@ FwdState::checkRetriable()
 }
 
 void
-FwdState::serverClosed(int fd)
+FwdState::serverClosed()
 {
-    // XXX: fd is often -1 here
-    debugs(17, 2, "FD " << fd << " " << entry->url() << " after " <<
-           (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests");
-    if (fd >= 0 && serverConnection()->fd == fd)
-        fwdPconnPool->noteUses(fd_table[fd].pconn.uses);
+    // XXX: This method logic attempts to tolerate Connection::close() called
+    // for serverConn earlier, by one of our dispatch()ed jobs. If that happens,
+    // serverConn will already be closed here or, worse, it will already be open
+    // for the next forwarding attempt. The current code prevents us getting
+    // stuck, but the long term solution is to stop sharing serverConn.
+    debugs(17, 2, serverConn);
+    if (Comm::IsConnOpen(serverConn)) {
+        const auto uses = fd_table[serverConn->fd].pconn.uses;
+        debugs(17, 3, "prior uses: " << uses);
+        fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool
+        serverConn->noteClosure();
+    }
+    serverConn = nullptr;
+    closeHandler = nullptr;
+    destinationReceipt = nullptr;
     retryOrBail();
 }
 
@@ -802,6 +824,8 @@ FwdState::handleUnregisteredServerEnd()
 {
     debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url());
     assert(!Comm::IsConnOpen(serverConn));
+    serverConn = nullptr;
+    destinationReceipt = nullptr;
     retryOrBail();
 }
 
@@ -819,6 +843,8 @@ FwdState::advanceDestination(const char *stepDescription, const Comm::Connection
     } catch (...) {
         debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException);
         closePendingConnection(conn, "connection preparation exception");
+        if (!err)
+            fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al));
         retryOrBail();
     }
 }
@@ -830,8 +856,7 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer)
 {
     assert(!destinationReceipt);
 
-    calls.connector = nullptr;
-    connOpener.clear();
+    transportWait.finish();
 
     Must(n_tries <= answer.n_tries); // n_tries cannot decrease
     n_tries = answer.n_tries;
@@ -843,6 +868,8 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer)
         Must(!Comm::IsConnOpen(answer.conn));
         answer.error.clear(); // preserve error for errorSendComplete()
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+        // The socket could get closed while our callback was queued. Sync
+        // Connection. XXX: Connection::fd may already be stale/invalid here.
         // We do not know exactly why the connection got closed, so we play it
         // safe, allowing retries only for persistent (reused) connections
         if (answer.reused) {
@@ -912,23 +939,24 @@ FwdState::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
     if (!conn->getPeer()->options.no_delay)
         tunneler->setDelayId(entry->mem_obj->mostBytesAllowed());
 #endif
-    AsyncJob::Start(tunneler);
-    // and wait for the tunnelEstablishmentDone() call
+    peerWait.start(tunneler, callback);
 }
 
 /// resumes operations after the (possibly failed) HTTP CONNECT exchange
 void
 FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
 {
+    peerWait.finish();
+
     ErrorState *error = nullptr;
     if (!answer.positive()) {
-        Must(!Comm::IsConnOpen(answer.conn));
+        Must(!answer.conn);
         error = answer.squidError.get();
         Must(error);
         answer.squidError.clear(); // preserve error for fail()
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
-        // The socket could get closed while our callback was queued.
-        // We close Connection here to sync Connection::fd.
+        // The socket could get closed while our callback was queued. Sync
+        // Connection. XXX: Connection::fd may already be stale/invalid here.
         closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
     } else if (!answer.leftovers.isEmpty()) {
@@ -998,18 +1026,21 @@ FwdState::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
 #endif
         connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout);
     connector->noteFwdPconnUse = true;
-    AsyncJob::Start(connector); // will call our callback
+    encryptionWait.start(connector, callback);
 }
 
 /// called when all negotiations with the TLS-speaking peer have been completed
 void
 FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
 {
+    encryptionWait.finish();
+
     ErrorState *error = nullptr;
     if ((error = answer.error.get())) {
-        Must(!Comm::IsConnOpen(answer.conn));
+        assert(!answer.conn);
         answer.error.clear(); // preserve error for errorSendComplete()
     } else if (answer.tunneled) {
+        assert(!answer.conn);
         // TODO: When ConnStateData establishes tunnels, its state changes
         // [in ways that may affect logging?]. Consider informing
         // ConnStateData about our tunnel or otherwise unifying tunnel
@@ -1019,6 +1050,8 @@ FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
         complete(); // destroys us
         return;
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+        // The socket could get closed while our callback was queued. Sync
+        // Connection. XXX: Connection::fd may already be stale/invalid here.
         closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer");
         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
     }
@@ -1091,22 +1124,20 @@ FwdState::connectStart()
     Must(!request->pinnedConnection());
 
     assert(!destinations->empty());
-    assert(!opening());
+    assert(!usingDestination());
 
     // Ditch error page if it was created before.
     // A new one will be created if there's another problem
     delete err;
     err = nullptr;
     request->clearError();
-    serverConn = nullptr;
-    destinationReceipt = nullptr;
 
     request->hier.startPeerClock();
 
-    calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
+    AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
 
     HttpRequest::Pointer cause = request;
-    const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al);
+    const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al);
     cs->setHost(request->url.host());
     bool retriable = checkRetriable();
     if (!retriable && Config.accessList.serverPconnForNonretriable) {
@@ -1118,8 +1149,7 @@ FwdState::connectStart()
     cs->setRetriable(retriable);
     cs->allowPersistent(pconnRace != raceHappened);
     destinations->notificationPending = true; // start() is async
-    connOpener = cs;
-    AsyncJob::Start(cs);
+    transportWait.start(cs, callback);
 }
 
 /// send request on an existing connection dedicated to the requesting client
index de75f33cc3fdb2ab2cbab2eb5a43e087b186dd72..ebf5f82b15e677537f74f09335b22608f400846c 100644 (file)
@@ -9,13 +9,12 @@
 #ifndef SQUID_FORWARD_H
 #define SQUID_FORWARD_H
 
-#include "base/CbcPointer.h"
 #include "base/forward.h"
+#include "base/JobWait.h"
 #include "base/RefCount.h"
 #include "clients/forward.h"
 #include "comm.h"
 #include "comm/Connection.h"
-#include "comm/ConnOpener.h"
 #include "error/forward.h"
 #include "fde.h"
 #include "http/StatusCode.h"
@@ -38,7 +37,6 @@ class ResolvedPeers;
 typedef RefCount<ResolvedPeers> ResolvedPeersPointer;
 
 class HappyConnOpener;
-typedef CbcPointer<HappyConnOpener> HappyConnOpenerPointer;
 class HappyConnOpenerAnswer;
 
 /// Sets initial TOS value and Netfilter for the future outgoing connection.
@@ -87,7 +85,7 @@ public:
     void handleUnregisteredServerEnd();
     int reforward();
     bool reforwardableStatus(const Http::StatusCode s) const;
-    void serverClosed(int fd);
+    void serverClosed();
     void connectStart();
     void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno);
     bool checkRetry();
@@ -115,6 +113,9 @@ private:
     /* PeerSelectionInitiator API */
     virtual void noteDestination(Comm::ConnectionPointer conn) override;
     virtual void noteDestinationsEnd(ErrorState *selectionError) override;
+    /// whether the successfully selected path destination or the established
+    /// server connection is still in use
+    bool usingDestination() const;
 
     void noteConnection(HappyConnOpenerAnswer &);
 
@@ -157,13 +158,10 @@ private:
     /// \returns the time left for this connection to become connected or 1 second if it is less than one second left
     time_t connectingTimeout(const Comm::ConnectionPointer &conn) const;
 
-    /// whether we are waiting for HappyConnOpener
-    /// same as calls.connector but may differ from connOpener.valid()
-    bool opening() const { return connOpener.set(); }
-
-    void cancelOpening(const char *reason);
+    void cancelStep(const char *reason);
 
     void notifyConnOpener();
+    void reactToZeroSizeObject();
 
     void updateAleWithFinalError();
 
@@ -182,11 +180,6 @@ private:
     time_t start_t;
     int n_tries; ///< the number of forwarding attempts so far
 
-    // AsyncCalls which we set and may need cancelling.
-    struct {
-        AsyncCall::Pointer connector;  ///< a call linking us to the ConnOpener producing serverConn.
-    } calls;
-
     struct {
         bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc
         bool dont_retry;
@@ -194,7 +187,16 @@ private:
         bool destinationsFound; ///< at least one candidate path found
     } flags;
 
-    HappyConnOpenerPointer connOpener; ///< current connection opening job
+    /// waits for a transport connection to the peer to be established/opened
+    JobWait<HappyConnOpener> transportWait;
+
+    /// waits for the established transport connection to be secured/encrypted
+    JobWait<Security::PeerConnector> encryptionWait;
+
+    /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
+    /// over the (encrypted, if needed) transport connection to that cache_peer
+    JobWait<Http::Tunneler> peerWait;
+
     ResolvedPeersPointer destinations; ///< paths for forwarding the request
     Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server.
     PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil)
index 6e08725065ff499608b032d2d01c66b6733e5123..6d83ff14f5d254b7a7e4aec920638909075b1156 100644 (file)
@@ -18,6 +18,7 @@
 #include "neighbors.h"
 #include "pconn.h"
 #include "PeerPoolMgr.h"
+#include "sbuf/Stream.h"
 #include "SquidConfig.h"
 
 CBDATA_CLASS_INIT(HappyConnOpener);
@@ -330,6 +331,8 @@ HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const Asyn
     fwdStart(aFwdStart),
     callback_(aCall),
     destinations(dests),
+    prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"),
+    spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"),
     ale(anAle),
     cause(request),
     n_tries(tries)
@@ -410,33 +413,43 @@ HappyConnOpener::swanSong()
     AsyncJob::swanSong();
 }
 
+/// HappyConnOpener::Attempt printer for debugging
+std::ostream &
+operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt)
+{
+    if (!attempt.path)
+        os << '-';
+    else if (attempt.path->isOpen())
+        os << "FD " << attempt.path->fd;
+    else if (attempt.connWait)
+        os << attempt.connWait;
+    else // destination is known; connection closed (and we are not opening any)
+        os << attempt.path->id;
+    return os;
+}
+
 const char *
 HappyConnOpener::status() const
 {
-    static MemBuf buf;
-    buf.reset();
+    // TODO: In a redesigned status() API, the caller may mimic this approach.
+    static SBuf buf;
+    buf.clear();
 
-    buf.append(" [", 2);
+    SBufStream os(buf);
+
+    os.write(" [", 2);
     if (stopReason)
-        buf.appendf("Stopped, reason:%s", stopReason);
-    if (prime) {
-        if (prime.path && prime.path->isOpen())
-            buf.appendf(" prime FD %d", prime.path->fd);
-        else if (prime.connector)
-            buf.appendf(" prime call%ud", prime.connector->id.value);
-    }
-    if (spare) {
-        if (spare.path && spare.path->isOpen())
-            buf.appendf(" spare FD %d", spare.path->fd);
-        else if (spare.connector)
-            buf.appendf(" spare call%ud", spare.connector->id.value);
-    }
+        os << "Stopped:" << stopReason;
+    if (prime)
+        os << "prime:" << prime;
+    if (spare)
+        os << "spare:" << spare;
     if (n_tries)
-        buf.appendf(" tries %d", n_tries);
-    buf.appendf(" %s%u]", id.prefix(), id.value);
-    buf.terminate();
+        os << " tries:" << n_tries;
+    os << ' ' << id << ']';
 
-    return buf.content();
+    buf = os.buf();
+    return buf.c_str();
 }
 
 /// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending
@@ -516,7 +529,7 @@ void
 HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest)
 {
     Must(!attempt.path);
-    Must(!attempt.connector);
+    Must(!attempt.connWait);
     Must(dest);
 
     const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer();
@@ -552,51 +565,52 @@ HappyConnOpener::openFreshConnection(Attempt &attempt, PeerConnectionPointer &de
     entry->mem_obj->checkUrlChecksum();
 #endif
 
-    GetMarkingsToServer(cause.getRaw(), *dest);
+    const auto conn = dest->cloneProfile();
+    GetMarkingsToServer(cause.getRaw(), *conn);
 
-    // ConnOpener modifies its destination argument so we reset the source port
-    // in case we are reusing the destination already used by our predecessor.
-    dest->local.port(0);
     ++n_tries;
 
     typedef CommCbMemFunT<HappyConnOpener, CommConnectCbParams> Dialer;
-    AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone);
+    AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName,
+                                     Dialer(this, attempt.callbackMethod));
     const time_t connTimeout = dest->connectTimeout(fwdStart);
-    Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout);
-    if (!dest->getPeer())
+    auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout);
+    if (!conn->getPeer())
         cs->setHost(host_);
 
-    attempt.path = dest;
-    attempt.connector = callConnect;
-    attempt.opener = cs;
+    attempt.path = dest; // but not the being-opened conn!
+    attempt.connWait.start(cs, callConnect);
+}
 
-    AsyncJob::Start(cs);
+/// Comm::ConnOpener callback for the prime connection attempt
+void
+HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams &params)
+{
+    handleConnOpenerAnswer(prime, params, "new prime connection");
 }
 
-/// called by Comm::ConnOpener objects after a prime or spare connection attempt
-/// completes (successfully or not)
+/// Comm::ConnOpener callback for the spare connection attempt
 void
-HappyConnOpener::connectDone(const CommConnectCbParams &params)
+HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams &params)
 {
-    Must(params.conn);
-    const bool itWasPrime = (params.conn == prime.path);
-    const bool itWasSpare = (params.conn == spare.path);
-    Must(itWasPrime != itWasSpare);
-
-    PeerConnectionPointer handledPath;
-    if (itWasPrime) {
-        handledPath = prime.path;
-        prime.finish();
-    } else {
-        handledPath = spare.path;
-        spare.finish();
-        if (gotSpareAllowance) {
-            TheSpareAllowanceGiver.jobUsedAllowance();
-            gotSpareAllowance = false;
-        }
+    if (gotSpareAllowance) {
+        TheSpareAllowanceGiver.jobUsedAllowance();
+        gotSpareAllowance = false;
     }
+    handleConnOpenerAnswer(spare, params, "new spare connection");
+}
+
+/// prime/spare-agnostic processing of a Comm::ConnOpener result
+void
+HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams &params, const char *what)
+{
+    Must(params.conn);
+
+    // finalize the previously selected path before attempt.finish() forgets it
+    auto handledPath = attempt.path;
+    handledPath.finalize(params.conn); // closed on errors
+    attempt.finish();
 
-    const char *what = itWasPrime ? "new prime connection" : "new spare connection";
     if (params.flag == Comm::OK) {
         sendSuccess(handledPath, false, what);
         return;
@@ -605,7 +619,6 @@ HappyConnOpener::connectDone(const CommConnectCbParams &params)
     debugs(17, 8, what << " failed: " << params.conn);
     if (const auto peer = params.conn->getPeer())
         peerConnectFailed(peer);
-    params.conn->close(); // TODO: Comm::ConnOpener should do this instead.
 
     // remember the last failure (we forward it if we cannot connect anywhere)
     lastFailedConnection = handledPath;
@@ -881,13 +894,23 @@ HappyConnOpener::ranOutOfTimeOrAttempts() const
     return false;
 }
 
+HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName):
+    callbackMethod(method),
+    callbackMethodName(methodName)
+{
+}
+
+void
+HappyConnOpener::Attempt::finish()
+{
+    connWait.finish();
+    path = nullptr;
+}
+
 void
 HappyConnOpener::Attempt::cancel(const char *reason)
 {
-    if (connector) {
-        connector->cancel(reason);
-        CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort);
-    }
-    clear();
+    connWait.cancel(reason);
+    path = nullptr;
 }
 
index 4f3135c60437740772578733deece2b38b50dbda..c57c431ad8a43360ecf2bb5008036373c4157630 100644 (file)
@@ -156,22 +156,28 @@ private:
     /// a connection opening attempt in progress (or falsy)
     class Attempt {
     public:
+        /// HappyConnOpener method implementing a ConnOpener callback
+        using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &);
+
+        Attempt(const CallbackMethod method, const char *methodName);
+
         explicit operator bool() const { return static_cast<bool>(path); }
 
         /// reacts to a natural attempt completion (successful or otherwise)
-        void finish() { clear(); }
+        void finish();
 
         /// aborts an in-progress attempt
         void cancel(const char *reason);
 
         PeerConnectionPointer path; ///< the destination we are connecting to
-        AsyncCall::Pointer connector; ///< our opener callback
-        Comm::ConnOpener::Pointer opener; ///< connects to path and calls us
 
-    private:
-        /// cleans up after the attempt ends (successfully or otherwise)
-        void clear() { path = nullptr; connector = nullptr; opener = nullptr; }
+        /// waits for a connection to the peer to be established/opened
+        JobWait<Comm::ConnOpener> connWait;
+
+        const CallbackMethod callbackMethod; ///< ConnOpener calls this method
+        const char * const callbackMethodName; ///< for callbackMethod debugging
     };
+    friend std::ostream &operator <<(std::ostream &, const Attempt &);
 
     /* AsyncJob API */
     virtual void start() override;
@@ -190,7 +196,9 @@ private:
     void openFreshConnection(Attempt &, PeerConnectionPointer &);
     bool reuseOldConnection(PeerConnectionPointer &);
 
-    void connectDone(const CommConnectCbParams &);
+    void notePrimeConnectDone(const CommConnectCbParams &);
+    void noteSpareConnectDone(const CommConnectCbParams &);
+    void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription);
 
     void checkForNewConnection();
 
index 2caa09f44272d3684025d6accab5963d8e6d134a..7423dd669ec824db588f1478700dfb13965cb218 100644 (file)
@@ -43,9 +43,8 @@ public:
 PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
     peer(cbdataReference(aPeer)),
     request(),
-    opener(),
-    securer(),
-    closer(),
+    transportWait(),
+    encryptionWait(),
     addrUsed(0)
 {
 }
@@ -90,7 +89,7 @@ PeerPoolMgr::doneAll() const
 void
 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
 {
-    opener = NULL;
+    transportWait.finish();
 
     if (!validPeer()) {
         debugs(48, 3, "peer gone");
@@ -100,9 +99,6 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
     }
 
     if (params.flag != Comm::OK) {
-        /* it might have been a timeout with a partially open link */
-        if (params.conn != NULL)
-            params.conn->close();
         peerConnectFailed(peer);
         checkpoint("conn opening failure"); // may retry
         return;
@@ -112,20 +108,16 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
 
     // Handle TLS peers.
     if (peer->secure.encryptTransport) {
-        typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
-        closer = JobCallback(48, 3, CloserDialer, this,
-                             PeerPoolMgr::handleSecureClosure);
-        comm_add_close_handler(params.conn->fd, closer);
-
-        securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
-                            MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
+        // XXX: Exceptions orphan params.conn
+        AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
+                                                MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
 
         const int peerTimeout = peerConnectTimeout(peer);
         const int timeUsed = squid_curtime - params.conn->startTime();
         // Use positive timeout when less than one second is left for conn.
         const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
-        auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft);
-        AsyncJob::Start(connector); // will call our callback
+        const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
+        encryptionWait.start(connector, callback);
         return;
     }
 
@@ -144,16 +136,7 @@ PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
 void
 PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
 {
-    Must(securer != NULL);
-    securer = NULL;
-
-    if (closer != NULL) {
-        if (answer.conn != NULL)
-            comm_remove_close_handler(answer.conn->fd, closer);
-        else
-            closer->cancel("securing completed");
-        closer = NULL;
-    }
+    encryptionWait.finish();
 
     if (!validPeer()) {
         debugs(48, 3, "peer gone");
@@ -162,35 +145,33 @@ PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
         return;
     }
 
+    assert(!answer.tunneled);
     if (answer.error.get()) {
-        if (answer.conn != NULL)
-            answer.conn->close();
+        assert(!answer.conn);
         // PeerConnector calls peerConnectFailed() for us;
         checkpoint("conn securing failure"); // may retry
         return;
     }
 
-    pushNewConnection(answer.conn);
-}
+    assert(answer.conn);
 
-void
-PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
-{
-    Must(closer != NULL);
-    Must(securer != NULL);
-    securer->cancel("conn closed by a 3rd party");
-    securer = NULL;
-    closer = NULL;
-    // allow the closing connection to fully close before we check again
-    Checkpoint(this, "conn closure while securing");
+    // The socket could get closed while our callback was queued. Sync
+    // Connection. XXX: Connection::fd may already be stale/invalid here.
+    if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
+        answer.conn->noteClosure();
+        checkpoint("external connection closure"); // may retry
+        return;
+    }
+
+    pushNewConnection(answer.conn);
 }
 
 void
 PeerPoolMgr::openNewConnection()
 {
     // KISS: Do nothing else when we are already doing something.
-    if (opener != NULL || securer != NULL || shutting_down) {
-        debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
+    if (transportWait || encryptionWait || shutting_down) {
+        debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
         return; // there will be another checkpoint when we are done opening/securing
     }
 
@@ -227,9 +208,9 @@ PeerPoolMgr::openNewConnection()
 
     const int ctimeout = peerConnectTimeout(peer);
     typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
-    opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
-    Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
-    AsyncJob::Start(cs);
+    AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
+    const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
+    transportWait.start(cs, callback);
 }
 
 void
index 217994393a357b8d2a904ebb77538f1b10a9cb00..6da61b10bc0a4d8e509bab133695b62c87a544a2 100644 (file)
@@ -10,6 +10,7 @@
 #define SQUID_PEERPOOLMGR_H
 
 #include "base/AsyncJob.h"
+#include "base/JobWait.h"
 #include "comm/forward.h"
 #include "security/forward.h"
 
@@ -54,18 +55,19 @@ protected:
     /// Security::PeerConnector callback
     void handleSecuredPeer(Security::EncryptorAnswer &answer);
 
-    /// called when the connection we are trying to secure is closed by a 3rd party
-    void handleSecureClosure(const CommCloseCbParams &params);
-
     /// the final step in connection opening (and, optionally, securing) sequence
     void pushNewConnection(const Comm::ConnectionPointer &conn);
 
 private:
     CachePeer *peer; ///< the owner of the pool we manage
     RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code
-    AsyncCall::Pointer opener; ///< whether we are opening a connection
-    AsyncCall::Pointer securer; ///< whether we are securing a connection
-    AsyncCall::Pointer closer; ///< monitors conn while we are securing it
+
+    /// waits for a transport connection to the peer to be established/opened
+    JobWait<Comm::ConnOpener> transportWait;
+
+    /// waits for the established transport connection to be secured/encrypted
+    JobWait<Security::BlindPeerConnector> encryptionWait;
+
     unsigned int addrUsed; ///< counter for cycling through peer addresses
 };
 
index 5b2c6740deb6cc1df8250e831ba3d09c7cd4c082..06ab02d23ad507395ea2e5a8841a52f4836f2baf 100644 (file)
@@ -151,7 +151,7 @@ ResolvedPeers::extractFound(const char *description, const Paths::iterator &foun
         while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {}
     }
 
-    const auto cleanPath = path.connection->cloneDestinationDetails();
+    const auto cleanPath = path.connection->cloneProfile();
     return PeerConnectionPointer(cleanPath, found - paths_.begin());
 }
 
index bbeebdbb56a08e944db9d5bd47f0a083a3ea8188..982f38f8d46d5f08a2448bd9a0e03a2f92124c01 100644 (file)
@@ -187,8 +187,7 @@ void Adaptation::Icap::ModXact::startWriting()
     openConnection();
 }
 
-// connection with the ICAP service established
-void Adaptation::Icap::ModXact::handleCommConnected()
+void Adaptation::Icap::ModXact::startShoveling()
 {
     Must(state.writing == State::writingConnect);
 
index f8988ac2945f69337d992dbe091ded6407b6dc32..2fda1318b8d6cc4fec46ae41da71035c27263c12 100644 (file)
@@ -155,10 +155,11 @@ public:
     virtual void noteBodyProductionEnded(BodyPipe::Pointer);
     virtual void noteBodyProducerAborted(BodyPipe::Pointer);
 
-    // comm handlers
-    virtual void handleCommConnected();
+    /* Xaction API */
+    virtual void startShoveling();
     virtual void handleCommWrote(size_t size);
     virtual void handleCommRead(size_t size);
+
     void handleCommWroteHeaders();
     void handleCommWroteBody();
 
index 5ed4fbfbb88fec82dec50e50fca4597b837eefb0..2464ea53317afa93d7faa902a3f536e43ba971d7 100644 (file)
@@ -37,7 +37,7 @@ void Adaptation::Icap::OptXact::start()
     openConnection();
 }
 
-void Adaptation::Icap::OptXact::handleCommConnected()
+void Adaptation::Icap::OptXact::startShoveling()
 {
     scheduleRead();
 
index 3811ab48fd25914b2ca95b4d2cd284f25823a7d2..725cd6225e55c8f64277c0211b4f71ef7c9c03fb 100644 (file)
@@ -30,8 +30,9 @@ public:
     OptXact(ServiceRep::Pointer &aService);
 
 protected:
+    /* Xaction API */
     virtual void start();
-    virtual void handleCommConnected();
+    virtual void startShoveling();
     virtual void handleCommWrote(size_t size);
     virtual void handleCommRead(size_t size);
 
index 6df4757126a84889011f594eca91510eed56ef66..fbd8968881527b86ac58d149c3699dbbf4532584 100644 (file)
@@ -112,9 +112,10 @@ void Adaptation::Icap::ServiceRep::noteFailure()
     // should be configurable.
 }
 
-// returns a persistent or brand new connection; negative int on failures
+// TODO: getIdleConnection() and putConnection()/noteConnectionFailed() manage a
+// "used connection slot" resource. Automate that resource tracking (RAII/etc.).
 Comm::ConnectionPointer
-Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
+Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact)
 {
     Comm::ConnectionPointer connection;
 
@@ -137,7 +138,6 @@ Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
     else
         theIdleConns->closeN(1);
 
-    reused = Comm::IsConnOpen(connection);
     ++theBusyConns;
     debugs(93,3, HERE << "got connection: " << connection);
     return connection;
@@ -150,7 +150,6 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer &
     // do not pool an idle connection if we owe connections
     if (isReusable && excessConnections() == 0) {
         debugs(93, 3, HERE << "pushing pconn" << comment);
-        commUnsetConnTimeout(conn);
         theIdleConns->push(conn);
     } else {
         debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " <<
index f2e893244edf442ee897d9bbd7ed6213a50ecb9e..cca2df4ab089c1948be029c9998ee4ae52c0fb1a 100644 (file)
@@ -85,7 +85,8 @@ public:
     bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const;
     bool allows204() const;
     bool allows206() const;
-    Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused);
+    /// \returns an idle persistent ICAP connection or nil
+    Comm::ConnectionPointer getIdleConnection(bool isRetriable);
     void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment);
     void noteConnectionUse(const Comm::ConnectionPointer &conn);
     void noteConnectionFailed(const char *comment);
index 9aacf2e1bb1f0d08498e5699f056950bc82010e6..962ed17037cf68ddf6d004a14c0040d63588e2a6 100644 (file)
@@ -13,6 +13,7 @@
 #include "adaptation/icap/Config.h"
 #include "adaptation/icap/Launcher.h"
 #include "adaptation/icap/Xaction.h"
+#include "base/JobWait.h"
 #include "base/TextException.h"
 #include "comm.h"
 #include "comm/Connection.h"
@@ -79,7 +80,6 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv
     icapRequest(NULL),
     icapReply(NULL),
     attempts(0),
-    connection(NULL),
     theService(aService),
     commEof(false),
     reuseConnection(true),
@@ -87,14 +87,8 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv
     isRepeatable(true),
     ignoreLastWrite(false),
     waitingForDns(false),
-    stopReason(NULL),
-    connector(NULL),
-    reader(NULL),
-    writer(NULL),
-    closer(NULL),
     alep(new AccessLogEntry),
-    al(*alep),
-    cs(NULL)
+    al(*alep)
 {
     debugs(93,3, typeName << " constructed, this=" << this <<
            " [icapx" << id << ']'); // we should not call virtual status() here
@@ -150,6 +144,8 @@ static void
 icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data)
 {
     Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
+    /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17)
+    /// or Optional<Ip::Address> (when it can take non-trivial types)
     xa->dnsLookupDone(ia);
 }
 
@@ -164,21 +160,8 @@ Adaptation::Icap::Xaction::openConnection()
     if (!TheConfig.reuse_connections)
         disableRetries(); // this will also safely drain pconn pool
 
-    bool wasReused = false;
-    connection = s.getConnection(isRetriable, wasReused);
-
-    if (wasReused && Comm::IsConnOpen(connection)) {
-        // Set comm Close handler
-        // fake the connect callback
-        // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
-        typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
-        CbcPointer<Xaction> self(this);
-        Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
-        dialer.params.conn = connection;
-        dialer.params.flag = Comm::OK;
-        // fake other parameters by copying from the existing connection
-        connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
-        ScheduleCallHere(connector);
+    if (const auto pconn = s.getIdleConnection(isRetriable)) {
+        useTransportConnection(pconn);
         return;
     }
 
@@ -207,30 +190,22 @@ Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia)
 #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
         dieOnConnectionFailure(); // throws
 #else // take a step back into protected Async call dialing.
-        // fake the connect callback
-        typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
-        CbcPointer<Xaction> self(this);
-        Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
-        dialer.params.conn = connection;
-        dialer.params.flag = Comm::COMM_ERROR;
-        // fake other parameters by copying from the existing connection
-        connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
-        ScheduleCallHere(connector);
+        CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure);
 #endif
         return;
     }
 
-    connection = new Comm::Connection;
-    connection->remote = ia->current();
-    connection->remote.port(s.cfg().port);
-    getOutgoingAddress(NULL, connection);
+    const Comm::ConnectionPointer conn = new Comm::Connection();
+    conn->remote = ia->current();
+    conn->remote.port(s.cfg().port);
+    getOutgoingAddress(nullptr, conn);
 
     // TODO: service bypass status may differ from that of a transaction
     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
-    connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
-    cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
+    AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+    const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass));
     cs->setHost(s.cfg().host.termedBuf());
-    AsyncJob::Start(cs.get());
+    transportWait.start(cs, callback);
 }
 
 /*
@@ -256,6 +231,8 @@ void Adaptation::Icap::Xaction::closeConnection()
             closer = NULL;
         }
 
+        commUnsetConnTimeout(connection);
+
         cancelRead(); // may not work
 
         if (reuseConnection && !doneWithIo()) {
@@ -275,54 +252,65 @@ void Adaptation::Icap::Xaction::closeConnection()
 
         writer = NULL;
         reader = NULL;
-        connector = NULL;
         connection = NULL;
     }
 }
 
-// connection with the ICAP service established
+/// called when the connection attempt to an ICAP service completes (successfully or not)
 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
 {
-    cs = NULL;
+    transportWait.finish();
 
-    if (io.flag == Comm::TIMEOUT) {
-        handleCommTimedout();
+    if (io.flag != Comm::OK) {
+        dieOnConnectionFailure(); // throws
         return;
     }
 
-    Must(connector != NULL);
-    connector = NULL;
-
-    if (io.flag != Comm::OK)
-        dieOnConnectionFailure(); // throws
-
-    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
-    AsyncCall::Pointer timeoutCall =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
-                                      TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
-    commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
+    useTransportConnection(io.conn);
+}
 
-    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
-    closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
-                        CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
-    comm_add_close_handler(io.conn->fd, closer);
+/// React to the availability of a transport connection to the ICAP service.
+/// The given connection may (or may not) be secured already.
+void
+Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn)
+{
+    assert(Comm::IsConnOpen(conn));
+    assert(!connection);
 
     // If it is a reused connection and the TLS object is built
     // we should not negotiate new TLS session
-    const auto &ssl = fd_table[io.conn->fd].ssl;
+    const auto &ssl = fd_table[conn->fd].ssl;
     if (!ssl && service().cfg().secure.encryptTransport) {
+        // XXX: Exceptions orphan conn.
         CbcPointer<Adaptation::Icap::Xaction> me(this);
-        securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
-                            MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
+        AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
+                                                MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
 
-        auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
-        AsyncJob::Start(sslConnector); // will call our callback
+        const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
+
+        encryptionWait.start(sslConnector, callback);
         return;
     }
 
-// ??    fd_table[io.conn->fd].noteUse(icapPconnPool);
+    useIcapConnection(conn);
+}
+
+/// react to the availability of a fully-ready ICAP connection
+void
+Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn)
+{
+    assert(!connection);
+    assert(conn);
+    assert(Comm::IsConnOpen(conn));
+    connection = conn;
     service().noteConnectionUse(connection);
 
-    handleCommConnected();
+    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+    closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+                        CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+    comm_add_close_handler(connection->fd, closer);
+
+    startShoveling();
 }
 
 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
@@ -367,40 +355,25 @@ void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
 
 // communication timeout with the ICAP service
 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &)
-{
-    handleCommTimedout();
-}
-
-void Adaptation::Icap::Xaction::handleCommTimedout()
 {
     debugs(93, 2, HERE << typeName << " failed: timeout with " <<
            theService->cfg().methodStr() << " " <<
            theService->cfg().uri << status());
     reuseConnection = false;
-    const bool whileConnecting = connector != NULL;
-    if (whileConnecting) {
-        assert(!haveConnection());
-        theService->noteConnectionFailed("timedout");
-    } else
-        closeConnection(); // so that late Comm callbacks do not disturb bypass
-    throw TexcHere(whileConnecting ?
-                   "timed out while connecting to the ICAP service" :
-                   "timed out while talking to the ICAP service");
+    assert(haveConnection());
+    closeConnection();
+    throw TextException("timed out while talking to the ICAP service", Here());
 }
 
 // unexpected connection close while talking to the ICAP service
 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &)
 {
-    if (securer != NULL) {
-        securer->cancel("Connection closed before SSL negotiation finished");
-        securer = NULL;
+    if (connection) {
+        connection->noteClosure();
+        connection = nullptr;
     }
     closer = NULL;
-    handleCommClosed();
-}
 
-void Adaptation::Icap::Xaction::handleCommClosed()
-{
     static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE");
     detailError(d);
     mustStop("ICAP service connection externally closed");
@@ -424,7 +397,8 @@ void Adaptation::Icap::Xaction::callEnd()
 
 bool Adaptation::Icap::Xaction::doneAll() const
 {
-    return !waitingForDns && !connector && !securer && !reader && !writer &&
+    return !waitingForDns && !transportWait && !encryptionWait &&
+           !reader && !writer &&
            Adaptation::Initiate::doneAll();
 }
 
@@ -568,7 +542,7 @@ bool Adaptation::Icap::Xaction::doneWriting() const
 bool Adaptation::Icap::Xaction::doneWithIo() const
 {
     return haveConnection() &&
-           !connector && !reader && !writer && // fast checks, some redundant
+           !transportWait && !reader && !writer && // fast checks, some redundant
            doneReading() && doneWriting();
 }
 
@@ -608,10 +582,7 @@ void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &
 void Adaptation::Icap::Xaction::swanSong()
 {
     // kids should sing first and then call the parent method.
-    if (cs.valid()) {
-        debugs(93,6, HERE << id << " about to notify ConnOpener!");
-        CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort);
-        cs = NULL;
+    if (transportWait || encryptionWait) {
         service().noteConnectionFailed("abort");
     }
 
@@ -750,20 +721,12 @@ Ssl::IcapPeerConnector::noteNegotiationDone(ErrorState *error)
 void
 Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
 {
-    Must(securer != NULL);
-    securer = NULL;
-
-    if (closer != NULL) {
-        if (Comm::IsConnOpen(answer.conn))
-            comm_remove_close_handler(answer.conn->fd, closer);
-        else
-            closer->cancel("securing completed");
-        closer = NULL;
-    }
+    encryptionWait.finish();
 
+    assert(!answer.tunneled);
     if (answer.error.get()) {
-        if (answer.conn != NULL)
-            answer.conn->close();
+        assert(!answer.conn);
+        // TODO: Refactor dieOnConnectionFailure() to be usable here as well.
         debugs(93, 2, typeName <<
                " TLS negotiation to " << service().cfg().uri << " failed");
         service().noteConnectionFailed("failure");
@@ -774,8 +737,18 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
 
     debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete");
 
-    service().noteConnectionUse(answer.conn);
+    assert(answer.conn);
+
+    // The socket could get closed while our callback was queued. Sync
+    // Connection. XXX: Connection::fd may already be stale/invalid here.
+    if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
+        answer.conn->noteClosure();
+        service().noteConnectionFailed("external TLS connection closure");
+        static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE");
+        detailError(d);
+        throw TexcHere("external closure of the TLS ICAP service connection");
+    }
 
-    handleCommConnected();
+    useIcapConnection(answer.conn);
 }
 
index b6604459444d7f78c49093337e6c2ae4ef4c7936..31a6e22fc9ea274a2efc01e4977fa68b36b22643 100644 (file)
@@ -12,6 +12,7 @@
 #include "AccessLogEntry.h"
 #include "adaptation/icap/ServiceRep.h"
 #include "adaptation/Initiate.h"
+#include "base/JobWait.h"
 #include "comm/ConnOpener.h"
 #include "error/forward.h"
 #include "HttpReply.h"
 
 class MemBuf;
 
+namespace Ssl {
+class IcapPeerConnector;
+}
+
 namespace Adaptation
 {
 namespace Icap
@@ -65,12 +70,12 @@ protected:
     virtual void start();
     virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate
 
+    /// starts sending/receiving ICAP messages
+    virtual void startShoveling() = 0;
+
     // comm hanndlers; called by comm handler wrappers
-    virtual void handleCommConnected() = 0;
     virtual void handleCommWrote(size_t sz) = 0;
     virtual void handleCommRead(size_t sz) = 0;
-    virtual void handleCommTimedout();
-    virtual void handleCommClosed();
 
     void handleSecuredPeer(Security::EncryptorAnswer &answer);
     /// record error detail if possible
@@ -78,7 +83,6 @@ protected:
 
     void openConnection();
     void closeConnection();
-    void dieOnConnectionFailure();
     bool haveConnection() const;
 
     void scheduleRead();
@@ -124,11 +128,13 @@ public:
     ServiceRep &service();
 
 private:
+    void useTransportConnection(const Comm::ConnectionPointer &);
+    void useIcapConnection(const Comm::ConnectionPointer &);
+    void dieOnConnectionFailure();
     void tellQueryAborted();
     void maybeLog();
 
 protected:
-    Comm::ConnectionPointer connection;     ///< ICAP server connection
     Adaptation::Icap::ServiceRep::Pointer theService;
 
     SBuf readBuf;
@@ -139,13 +145,8 @@ protected:
     bool ignoreLastWrite;
     bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback
 
-    const char *stopReason;
-
-    // active (pending) comm callbacks for the ICAP server connection
-    AsyncCall::Pointer connector;
     AsyncCall::Pointer reader;
     AsyncCall::Pointer writer;
-    AsyncCall::Pointer closer;
 
     AccessLogEntry::Pointer alep; ///< icap.log entry
     AccessLogEntry &al; ///< short for *alep
@@ -155,8 +156,16 @@ protected:
     timeval icap_tio_finish;   /*time when the last byte of the ICAP responsewas received*/
 
 private:
-    Comm::ConnOpener::Pointer cs;
-    AsyncCall::Pointer securer; ///< whether we are securing a connection
+    /// waits for a transport connection to the ICAP server to be established/opened
+    JobWait<Comm::ConnOpener> transportWait;
+
+    /// waits for the established transport connection to be secured/encrypted
+    JobWait<Ssl::IcapPeerConnector> encryptionWait;
+
+    /// open and, if necessary, secured connection to the ICAP server (or nil)
+    Comm::ConnectionPointer connection;
+
+    AsyncCall::Pointer closer;
 };
 
 } // namespace Icap
index 3b9161e4a109d17ccde807b950d2b76de099ddee..111667d5d8f1a7546079dd0d44f5b952f4688dd6 100644 (file)
 
 InstanceIdDefinitions(AsyncJob, "job");
 
-AsyncJob::Pointer AsyncJob::Start(AsyncJob *j)
+void
+AsyncJob::Start(const Pointer &job)
 {
-    AsyncJob::Pointer job(j);
     CallJobHere(93, 5, job, AsyncJob, start);
-    return job;
+    job->started_ = true; // it is the attempt that counts
 }
 
 AsyncJob::AsyncJob(const char *aTypeName) :
@@ -38,6 +38,7 @@ AsyncJob::~AsyncJob()
 {
     debugs(93,5, "AsyncJob destructed, this=" << this <<
            " type=" << typeName << " [" << id << ']');
+    assert(!started_ || swanSang_);
 }
 
 void AsyncJob::start()
@@ -141,9 +142,16 @@ void AsyncJob::callEnd()
         AsyncCall::Pointer inCallSaved = inCall;
         void *thisSaved = this;
 
+        // TODO: Swallow swanSong() exceptions to reduce memory leaks.
+
+        // Job callback invariant: swanSong() is (only) called for started jobs.
+        // Here to detect violations in kids that forgot to call our swanSong().
+        assert(started_);
+
+        swanSang_ = true; // it is the attempt that counts
         swanSong();
 
-        delete this; // this is the only place where the object is deleted
+        delete this; // this is the only place where a started job is deleted
 
         // careful: this object does not exist any more
         debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved);
index 4d685dff52e87351dd8cffa8b73873592f2f6d62..a46e300713c489c29884f5b2b43528ce67ccdddb 100644 (file)
@@ -36,8 +36,13 @@ public:
 public:
     AsyncJob(const char *aTypeName);
 
-    /// starts a freshly created job (i.e., makes the job asynchronous)
-    static Pointer Start(AsyncJob *job);
+    /// Promises to start the configured job (eventually). The job is deemed to
+    /// be running asynchronously beyond this point, so the caller should only
+    /// access the job object via AsyncCalls rather than directly.
+    ///
+    /// swanSong() is only called for jobs for which this method has returned
+    /// successfully (i.e. without throwing).
+    static void Start(const Pointer &job);
 
 protected:
     // XXX: temporary method to replace "delete this" in jobs-in-transition.
@@ -62,6 +67,11 @@ public:
     /// called when the job throws during an async call
     virtual void callException(const std::exception &e);
 
+    /// process external request to terminate now (i.e. during this async call)
+    void handleStopRequest() { mustStop("externally aborted"); }
+
+    const InstanceId<AsyncJob> id; ///< job identifier
+
 protected:
     // external destruction prohibited to ensure swanSong() is called
     virtual ~AsyncJob();
@@ -69,7 +79,9 @@ protected:
     const char *stopReason; ///< reason for forcing done() to be true
     const char *typeName; ///< kid (leaf) class name, for debugging
     AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any
-    const InstanceId<AsyncJob> id; ///< job identifier
+
+    bool started_ = false; ///< Start() has finished successfully
+    bool swanSang_ = false; ///< swanSong() was called
 };
 
 #endif /* SQUID_ASYNC_JOB_H */
diff --git a/src/base/JobWait.cc b/src/base/JobWait.cc
new file mode 100644 (file)
index 0000000..ba68324
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/AsyncJobCalls.h"
+#include "base/JobWait.h"
+
+#include <cassert>
+#include <iostream>
+
+JobWaitBase::JobWaitBase() = default;
+
+JobWaitBase::~JobWaitBase()
+{
+    cancel("owner gone");
+}
+
+void
+JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall)
+{
+    // Invariant: The wait will be over. We cannot guarantee that the job will
+    // call the callback, of course, but we can check these prerequisites.
+    assert(aCall);
+    assert(aJob.valid());
+
+    // "Double" waiting state leads to conflicting/mismatching callbacks
+    // detailed in finish(). Detect that bug ASAP.
+    assert(!waiting());
+
+    assert(!callback_);
+    assert(!job_);
+    callback_ = aCall;
+    job_ = aJob;
+
+    AsyncJob::Start(job_.get());
+}
+
+void
+JobWaitBase::finish()
+{
+    // Unexpected callbacks might result in disasters like secrets exposure,
+    // data corruption, or expensive message routing mistakes when the callback
+    // info is applied to the wrong message part or acted upon prematurely.
+    assert(waiting());
+    clear();
+}
+
+void
+JobWaitBase::cancel(const char *reason)
+{
+    if (callback_) {
+        callback_->cancel(reason);
+
+        // Instead of AsyncJob, the class parameter could be Job. That would
+        // avoid runtime child-to-parent CbcPointer conversion overheads, but
+        // complicate support for Jobs with virtual AsyncJob bases (GCC error:
+        // "pointer to member conversion via virtual base AsyncJob") and also
+        // cache-log "Job::handleStopRequest()" with a non-existent class name.
+        CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest);
+
+        clear();
+    }
+}
+
+void
+JobWaitBase::print(std::ostream &os) const
+{
+    // use a backarrow to emphasize that this is a callback: call24<-job6
+    if (callback_)
+        os << callback_->id << "<-";
+    if (const auto rawJob = job_.get())
+        os << rawJob->id;
+    else
+        os << job_; // raw pointer of a gone job may still be useful for triage
+}
+
diff --git a/src/base/JobWait.h b/src/base/JobWait.h
new file mode 100644 (file)
index 0000000..6b11313
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_BASE_JOBWAIT_H
+#define SQUID_BASE_JOBWAIT_H
+
+#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
+
+#include <iosfwd>
+
+/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead.
+/// This base class does not contain code specific to the actual Job type.
+class JobWaitBase
+{
+public:
+    JobWaitBase();
+    ~JobWaitBase();
+
+    /// no copying of any kind: each waiting context needs a dedicated AsyncCall
+    JobWaitBase(JobWaitBase &&) = delete;
+
+    explicit operator bool() const { return waiting(); }
+
+    /// whether we are currently waiting for the job to call us back
+    /// the job itself may be gone even if this returns true
+    bool waiting() const { return bool(callback_); }
+
+    /// ends wait (after receiving the call back)
+    /// forgets the job which is likely to be gone by now
+    void finish();
+
+    /// aborts wait (if any) before receiving the call back
+    /// does nothing if we are not waiting
+    void cancel(const char *reason);
+
+    /// summarizes what we are waiting for (for debugging)
+    void print(std::ostream &) const;
+
+protected:
+    /// starts waiting for the given job to call the given callback
+    void start_(AsyncJob::Pointer, AsyncCall::Pointer);
+
+private:
+    /// the common part of finish() and cancel()
+    void clear() { job_.clear(); callback_ = nullptr; }
+
+    /// the job that we are waiting to call us back (or nil)
+    AsyncJob::Pointer job_;
+
+    /// the call we are waiting for the job_ to make (or nil)
+    AsyncCall::Pointer callback_;
+};
+
+/// Manages waiting for an AsyncJob callback.
+/// Completes JobWaitBase by providing Job type-specific members.
+template <class Job>
+class JobWait: public JobWaitBase
+{
+public:
+    typedef CbcPointer<Job> JobPointer;
+
+    /// starts waiting for the given job to call the given callback
+    void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) {
+        start_(aJob, aCallback);
+        typedJob_ = aJob;
+    }
+
+    /// \returns a cbdata pointer to the job we are waiting for (or nil)
+    /// the returned pointer may be falsy, even if we are still waiting()
+    JobPointer job() const { return waiting() ? typedJob_ : nullptr; }
+
+private:
+    /// nearly duplicates JobWaitBase::job_ but exposes the actual job type
+    JobPointer typedJob_;
+};
+
+inline
+std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait)
+{
+    wait.print(os);
+    return os;
+}
+
+#endif /* SQUID_BASE_JOBWAIT_H */
+
index c9564d755563fbc5f7c8a3dce54e92c28d96d3cc..374ea8798096728488534ca216c20c83583cbabb 100644 (file)
@@ -34,6 +34,8 @@ libbase_la_SOURCES = \
        Here.h \
        InstanceId.cc \
        InstanceId.h \
+       JobWait.cc \
+       JobWait.h \
        Lock.h \
        LookupTable.h \
        LruMap.h \
index 3803e1ea019e70a158c99509f8185d5f81b2ee50..46de97c8d7c58206c70cb70dfa5f07421653d52d 100644 (file)
@@ -17,6 +17,7 @@ class ScopedId;
 
 template<class Cbc> class CbcPointer;
 template<class RefCountableKid> class RefCount;
+template<class Job> class JobWait;
 
 typedef CbcPointer<AsyncJob> AsyncJobPointer;
 typedef RefCount<CodeContext> CodeContextPointer;
index b8d786423d500012c4b5560339a31ba7d2ef5dfd..4eb69769669c26b42f5e791d588af06d9c647077 100644 (file)
@@ -503,6 +503,10 @@ httpRequestFree(void *data)
 /* This is a handler normally called by comm_close() */
 void ConnStateData::connStateClosed(const CommCloseCbParams &)
 {
+    if (clientConnection) {
+        clientConnection->noteClosure();
+        // keep closed clientConnection for logging, clientdb cleanup, etc.
+    }
     deleteThis("ConnStateData::connStateClosed");
 }
 
index 7874db12193d6ef1ae4df7468be9cc5a701defe2..afe7827dc6fc1504a0a88aa2248decaa04869746 100644 (file)
@@ -201,10 +201,6 @@ Ftp::Client::Client(FwdState *fwdState):
 
 Ftp::Client::~Client()
 {
-    if (data.opener != NULL) {
-        data.opener->cancel("Ftp::Client destructed");
-        data.opener = NULL;
-    }
     data.close();
 
     safe_free(old_request);
@@ -786,10 +782,10 @@ Ftp::Client::connectDataChannel()
     debugs(9, 3, "connecting to " << conn->remote);
 
     typedef CommCbMemFunT<Client, CommConnectCbParams> Dialer;
-    data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
-    Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect);
+    AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
+    const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect);
     cs->setHost(data.host);
-    AsyncJob::Start(cs);
+    dataConnWait.start(cs, callback);
 }
 
 bool
@@ -811,10 +807,11 @@ void
 Ftp::Client::dataClosed(const CommCloseCbParams &)
 {
     debugs(9, 4, status());
+    if (data.conn)
+        data.conn->noteClosure();
     if (data.listenConn != NULL) {
         data.listenConn->close();
         data.listenConn = NULL;
-        // NP clear() does the: data.fd = -1;
     }
     data.clear();
 }
@@ -879,6 +876,8 @@ void
 Ftp::Client::ctrlClosed(const CommCloseCbParams &)
 {
     debugs(9, 4, status());
+    if (ctrl.conn)
+        ctrl.conn->noteClosure();
     ctrl.clear();
     doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too
     mustStop("Ftp::Client::ctrlClosed");
index ac8d22e18045c05aaf90699f133ef1cfc491d70c..4b2dd61d5472a7a8a29b82d5a09b70eb536ab367 100644 (file)
@@ -64,7 +64,6 @@ public:
      */
     Comm::ConnectionPointer listenConn;
 
-    AsyncCall::Pointer opener; ///< Comm opener handler callback.
 private:
     AsyncCall::Pointer closer; ///< Comm close handler callback
 };
@@ -205,6 +204,10 @@ protected:
     virtual void sentRequestBody(const CommIoCbParams &io);
     virtual void doneSendingRequestBody();
 
+    /// Waits for an FTP data connection to the server to be established/opened.
+    /// This wait only happens in FTP passive mode (via PASV or EPSV).
+    JobWait<Comm::ConnOpener> dataConnWait;
+
 private:
     bool parseControlReply(size_t &bytesUsed);
 
index 6a7d139ca50f533109578d10b3aad23d62652285..aeeaedb4834ca5245a9a425c7f6aede286d59765 100644 (file)
@@ -486,11 +486,9 @@ Ftp::Gateway::timeout(const CommTimeoutCbParams &io)
         flags.pasv_supported = false;
         debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state");
 
-        // cancel the data connection setup.
-        if (data.opener != NULL) {
-            data.opener->cancel("timeout");
-            data.opener = NULL;
-        }
+        // cancel the data connection setup, if any
+        dataConnWait.cancel("timeout");
+
         data.close();
     }
 
@@ -1723,7 +1721,7 @@ void
 Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io)
 {
     debugs(9, 3, HERE);
-    data.opener = NULL;
+    dataConnWait.finish();
 
     if (io.flag != Comm::OK) {
         debugs(9, 2, HERE << "Failed to connect. Retrying via another method.");
@@ -2727,9 +2725,9 @@ Ftp::Gateway::mayReadVirginReplyBody() const
     return !doneWithServer();
 }
 
-AsyncJob::Pointer
+void
 Ftp::StartGateway(FwdState *const fwdState)
 {
-    return AsyncJob::Start(new Ftp::Gateway(fwdState));
+    AsyncJob::Start(new Ftp::Gateway(fwdState));
 }
 
index 8796a74c84fd801b625312ba0b54be2c00f839b0..ce6ba3ba610bf126503fa9655e813391eee82215 100644 (file)
@@ -739,7 +739,7 @@ void
 Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io)
 {
     debugs(9, 3, status());
-    data.opener = NULL;
+    dataConnWait.finish();
 
     if (io.flag != Comm::OK) {
         debugs(9, 2, "failed to connect FTP server data channel");
@@ -804,9 +804,9 @@ Ftp::Relay::HandleStoreAbort(Relay *ftpClient)
         ftpClient->dataComplete();
 }
 
-AsyncJob::Pointer
+void
 Ftp::StartRelay(FwdState *const fwdState)
 {
-    return AsyncJob::Start(new Ftp::Relay(fwdState));
+    AsyncJob::Start(new Ftp::Relay(fwdState));
 }
 
index 023a8586cc12c59a21e0461d5f5a1dad3b1f6660..67a52e6628ab6a583a8a14a3bb75f169fbf1a973 100644 (file)
@@ -101,6 +101,11 @@ void
 Http::Tunneler::handleConnectionClosure(const CommCloseCbParams &params)
 {
     closer = nullptr;
+    if (connection) {
+        countFailingConnection();
+        connection->noteClosure();
+        connection = nullptr;
+    }
     bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al));
 }
 
@@ -360,50 +365,64 @@ Http::Tunneler::bailWith(ErrorState *error)
     Must(error);
     answer().squidError = error;
 
-    if (const auto p = connection->getPeer())
-        peerConnectFailed(p);
+    if (const auto failingConnection = connection) {
+        // TODO: Reuse to-peer connections after a CONNECT error response.
+        countFailingConnection();
+        disconnect();
+        failingConnection->close();
+    }
 
     callBack();
-    disconnect();
-
-    if (noteFwdPconnUse)
-        fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
-    // TODO: Reuse to-peer connections after a CONNECT error response.
-    connection->close();
-    connection = nullptr;
 }
 
 void
 Http::Tunneler::sendSuccess()
 {
     assert(answer().positive());
-    callBack();
+    assert(Comm::IsConnOpen(connection));
+    answer().conn = connection;
     disconnect();
+    callBack();
+}
+
+void
+Http::Tunneler::countFailingConnection()
+{
+    assert(connection);
+    if (const auto p = connection->getPeer())
+        peerConnectFailed(p);
+    if (noteFwdPconnUse && connection->isOpen())
+        fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
 }
 
 void
 Http::Tunneler::disconnect()
 {
+    const auto stillOpen = Comm::IsConnOpen(connection);
+
     if (closer) {
-        comm_remove_close_handler(connection->fd, closer);
+        if (stillOpen)
+            comm_remove_close_handler(connection->fd, closer);
         closer = nullptr;
     }
 
     if (reader) {
-        Comm::ReadCancel(connection->fd, reader);
+        if (stillOpen)
+            Comm::ReadCancel(connection->fd, reader);
         reader = nullptr;
     }
 
-    // remove connection timeout handler
-    commUnsetConnTimeout(connection);
+    if (stillOpen)
+        commUnsetConnTimeout(connection);
+
+    connection = nullptr; // may still be open
 }
 
 void
 Http::Tunneler::callBack()
 {
-    debugs(83, 5, connection << status());
-    if (answer().positive())
-        answer().conn = connection;
+    debugs(83, 5, answer().conn << status());
+    assert(!connection); // returned inside answer() or gone
     auto cb = callback;
     callback = nullptr;
     ScheduleCallHere(cb);
@@ -415,11 +434,10 @@ Http::Tunneler::swanSong()
     AsyncJob::swanSong();
 
     if (callback) {
-        if (requestWritten && tunnelEstablished) {
+        if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) {
             sendSuccess();
         } else {
-            // we should have bailed when we discovered the job-killing problem
-            debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status());
+            // job-ending emergencies like handleStopRequest() or callException()
             bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al));
         }
         assert(!callback);
index c4c096fe7cb5b6872a708cb217dd4d5d0462af03..e88d17610f42cf92ac32d8d9c2dd114b1c586b43 100644 (file)
@@ -87,6 +87,7 @@ protected:
     void handleResponse(const bool eof);
     void bailOnResponseError(const char *error, HttpReply *);
 
+private:
     /// sends the given error to the initiator
     void bailWith(ErrorState*);
 
@@ -96,12 +97,14 @@ protected:
     /// a bailWith(), sendSuccess() helper: sends results to the initiator
     void callBack();
 
-    /// a bailWith(), sendSuccess() helper: stops monitoring the connection
+    /// stops monitoring the connection
     void disconnect();
 
+    /// updates connection usage history before the connection is closed
+    void countFailingConnection();
+
     TunnelerAnswer &answer();
 
-private:
     AsyncCall::Pointer writer; ///< called when the request has been written
     AsyncCall::Pointer reader; ///< called when the response should be read
     AsyncCall::Pointer closer; ///< called when the connection is being closed
index 0351557dac251b64dab72cb1bb66edf675bcde93..82e5eb9bd28150fc0e0aab64cea6c7c04ecec596 100644 (file)
@@ -28,10 +28,10 @@ namespace Ftp
 {
 
 /// A new FTP Gateway job
-AsyncJobPointer StartGateway(FwdState *const fwdState);
+void StartGateway(FwdState *const fwdState);
 
 /// A new FTP Relay job
-AsyncJobPointer StartRelay(FwdState *const fwdState);
+void StartRelay(FwdState *const fwdState);
 
 /** Construct an URI with leading / in PATH portion for use by CWD command
  *  possibly others. FTP encodes absolute paths as beginning with '/'
index 54ba16271698799057105da933b1bb6558357488..a0913f3245a68a1fd89d2845bbb12f1ec158bcd0 100644 (file)
@@ -743,6 +743,10 @@ commCallCloseHandlers(int fd)
         // If call is not canceled schedule it for execution else ignore it
         if (!call->canceled()) {
             debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
+            // XXX: Without the following code, callback fd may be -1.
+            // typedef CommCloseCbParams Params;
+            // auto &params = GetCommParams<Params>(call);
+            // params.fd = fd;
             ScheduleCallHere(call);
         }
     }
@@ -1787,6 +1791,10 @@ DeferredReadManager::CloseHandler(const CommCloseCbParams &params)
     CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data;
 
     temp->element.closer = NULL;
+    if (temp->element.theRead.conn) {
+        temp->element.theRead.conn->noteClosure();
+        temp->element.theRead.conn = nullptr;
+    }
     temp->element.markCancelled();
 }
 
@@ -1860,6 +1868,11 @@ DeferredReadManager::kickARead(DeferredRead const &aRead)
     if (aRead.cancelled)
         return;
 
+    // TODO: This check still allows theReader call with a closed theRead.conn.
+    // If a delayRead() caller has a close connection handler, then such a call
+    // would be useless and dangerous. If a delayRead() caller does not have it,
+    // then the caller will get stuck when an external connection closure makes
+    // aRead.cancelled (checked above) true.
     if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing())
         return;
 
index dc376b7781758151c9e45f4c78c849b4adcdf5e8..19c1237aeaa84207fed29f0dc284d5cde67cc030 100644 (file)
@@ -41,6 +41,14 @@ Comm::ConnOpener::ConnOpener(const Comm::ConnectionPointer &c, const AsyncCall::
     deadline_(squid_curtime + static_cast<time_t>(ctimeout))
 {
     debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout");
+    assert(conn_); // we know where to go
+
+    // Sharing a being-modified Connection object with the caller is dangerous,
+    // but we cannot ban (or even check for) that using existing APIs. We do not
+    // want to clone "just in case" because cloning is a bit expensive, and most
+    // callers already have a non-owned Connection object to give us. Until the
+    // APIs improve, we can only check that the connection is not open.
+    assert(!conn_->isOpen());
 }
 
 Comm::ConnOpener::~ConnOpener()
@@ -78,6 +86,10 @@ Comm::ConnOpener::swanSong()
     if (temporaryFd_ >= 0)
         closeFd();
 
+    // did we abort while owning an open connection?
+    if (conn_ && conn_->isOpen())
+        conn_->close();
+
     // did we abort while waiting between retries?
     if (calls_.sleep_)
         cancelSleep();
@@ -131,9 +143,18 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why)
                    " [" << callback_->id << ']' );
             // TODO save the pconn to the pconnPool ?
         } else {
+            assert(conn_);
+
+            // free resources earlier and simplify recipients
+            if (errFlag != Comm::OK)
+                conn_->close(); // may not be opened
+            else
+                assert(conn_->isOpen());
+
             typedef CommConnectCbParams Params;
             Params &params = GetCommParams<Params>(callback_);
             params.conn = conn_;
+            conn_ = nullptr; // release ownership; prevent closure by us
             params.flag = errFlag;
             params.xerrno = xerrno;
             ScheduleCallHere(callback_);
@@ -152,7 +173,7 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why)
 void
 Comm::ConnOpener::cleanFd()
 {
-    debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
+    debugs(5, 4, conn_ << "; temp FD " << temporaryFd_);
 
     Must(temporaryFd_ >= 0);
     fde &f = fd_table[temporaryFd_];
@@ -258,6 +279,7 @@ bool
 Comm::ConnOpener::createFd()
 {
     Must(temporaryFd_ < 0);
+    assert(conn_);
 
     // our initators signal abort by cancelling their callbacks
     if (callback_ == NULL || callback_->canceled())
index 329d8a3c8e9ed4a08ea8790e263941c67e1d4bc8..e1e60649dc665157e33e5a57c97a4e01bbb0190d 100644 (file)
 namespace Comm
 {
 
-/**
- * Async-opener of a Comm connection.
- */
+/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either
+/// Comm::OK with an open connection or another Comm::Flag with a closed one.
 class ConnOpener : public AsyncJob
 {
     CBDATA_CLASS(ConnOpener);
 
 public:
-    void noteAbort() { mustStop("externally aborted"); }
-
     typedef CbcPointer<ConnOpener> Pointer;
 
     virtual bool doneAll() const;
index 4e9719a01aeab31eb36122a03e09804e101580c1..9bc08da057053e2ab1604c9a0d92973d5c8e8e9a 100644 (file)
@@ -7,6 +7,7 @@
  */
 
 #include "squid.h"
+#include "base/JobWait.h"
 #include "CachePeer.h"
 #include "cbdata.h"
 #include "comm.h"
@@ -60,26 +61,44 @@ Comm::Connection::~Connection()
 }
 
 Comm::ConnectionPointer
-Comm::Connection::cloneDestinationDetails() const
+Comm::Connection::cloneProfile() const
 {
-    const ConnectionPointer c = new Comm::Connection;
-    c->setAddrs(local, remote);
-    c->peerType = peerType;
-    c->flags = flags;
-    c->peer_ = cbdataReference(getPeer());
-    assert(!c->isOpen());
-    return c;
-}
+    const ConnectionPointer clone = new Comm::Connection;
+    auto &c = *clone; // optimization
+
+    /*
+     * Copy or excuse each data member. Excused members do not belong to a
+     * Connection configuration profile because their values cannot be reused
+     * across (co-existing) Connection objects and/or are tied to their own
+     * object lifetime.
+     */
+
+    c.setAddrs(local, remote);
+    c.peerType = peerType;
+    // fd excused
+    c.tos = tos;
+    c.nfmark = nfmark;
+    c.nfConnmark = nfConnmark;
+    // COMM_ORPHANED is not a part of connection opening instructions
+    c.flags = flags & ~COMM_ORPHANED;
+    // rfc931 is excused
+
+#if USE_SQUID_EUI
+    // These are currently only set when accepting connections and never used
+    // for establishing new ones, so this copying is currently in vain, but,
+    // technically, they can be a part of connection opening instructions.
+    c.remoteEui48 = remoteEui48;
+    c.remoteEui64 = remoteEui64;
+#endif
 
-Comm::ConnectionPointer
-Comm::Connection::cloneIdentDetails() const
-{
-    auto c = cloneDestinationDetails();
-    c->tos = tos;
-    c->nfmark = nfmark;
-    c->nfConnmark = nfConnmark;
-    c->startTime_ = startTime_;
-    return c;
+    // id excused
+    c.peer_ = cbdataReference(getPeer());
+    // startTime_ excused
+    // tlsHistory excused
+
+    debugs(5, 5, this << " made " << c);
+    assert(!c.isOpen());
+    return clone;
 }
 
 void
index 5b66943ba2bc1af85e20020af44e266d87b02ad9..40c22491dc61d001cbbc2a4c732dd3b03192887f 100644 (file)
@@ -77,13 +77,12 @@ public:
     /** Clear the connection properties and close any open socket. */
     virtual ~Connection();
 
-    /// Create a new (closed) IDENT Connection object based on our from-Squid
-    /// connection properties.
-    ConnectionPointer cloneIdentDetails() const;
+    /// To prevent accidental copying of Connection objects that we started to
+    /// open or that are open, use cloneProfile() instead.
+    Connection(const Connection &&) = delete;
 
-    /// Create a new (closed) Connection object pointing to the same destination
-    /// as this from-Squid connection.
-    ConnectionPointer cloneDestinationDetails() const;
+    /// Create a new closed Connection with the same configuration as this one.
+    ConnectionPointer cloneProfile() const;
 
     /// close the still-open connection when its last reference is gone
     void enterOrphanage() { flags |= COMM_ORPHANED; }
@@ -140,17 +139,6 @@ public:
     virtual ScopedId codeContextGist() const override;
     virtual std::ostream &detailCodeContext(std::ostream &os) const override;
 
-private:
-    /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
-     * cloneDestinationDetails() instead.
-     */
-    Connection(const Connection &c);
-
-    /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
-     * cloneDestinationDetails() instead.
-     */
-    Connection & operator =(const Connection &c);
-
 public:
     /** Address/Port for the Squid end of a TCP link. */
     Ip::Address local;
index 341622e0f867cc616066b9469258641ff8d40dd9..510efa94f699288bf6a20a79f6a2de308bbf2654 100644 (file)
@@ -206,7 +206,10 @@ void
 Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &)
 {
     closer_ = NULL;
-    conn = NULL;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     Must(done());
 }
 
index e1f4d3ee7a02869885e9896c91dfbf2f2fad0ed2..72078c64e15ac6e308687944736698aa1723a50e 100644 (file)
@@ -872,6 +872,10 @@ static void
 idnsVCClosed(const CommCloseCbParams &params)
 {
     nsvc * vc = (nsvc *)params.data;
+    if (vc->conn) {
+        vc->conn->noteClosure();
+        vc->conn = nullptr;
+    }
     delete vc;
 }
 
index 11f4e51b16261243e9b9a92cf74f02e05302165a..9aab5bbb7b1ac0d7fedca4dd7c8d2c65dd196a2f 100644 (file)
@@ -29,10 +29,8 @@ class Eui48
 
 public:
     Eui48() { clear(); }
-    Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); }
     bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; }
     bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; }
-    ~Eui48() {}
 
     const unsigned char *get(void);
 
index 455220c2e9dc0d85424aed9aa36c4c56f0ef02f0..576a3f7b146f19d0e46ac95f31863613d64ae0b9 100644 (file)
@@ -98,11 +98,8 @@ public:
         entry->lock("gopherState");
         *replybuf = 0;
     }
-    ~GopherStateData() {if(buf) swanSong();}
 
-    /* AsyncJob API emulated */
-    void deleteThis(const char *aReason);
-    void swanSong();
+    ~GopherStateData();
 
 public:
     StoreEntry *entry;
@@ -156,30 +153,18 @@ static void
 gopherStateFree(const CommCloseCbParams &params)
 {
     GopherStateData *gopherState = (GopherStateData *)params.data;
-
-    if (gopherState == NULL)
-        return;
-
-    gopherState->deleteThis("gopherStateFree");
+    // Assume that FwdState is monitoring and calls noteClosure(). See XXX about
+    // Connection sharing with FwdState in gopherStart().
+    delete gopherState;
 }
 
-void
-GopherStateData::deleteThis(const char *)
-{
-    swanSong();
-    delete this;
-}
-
-void
-GopherStateData::swanSong()
+GopherStateData::~GopherStateData()
 {
     if (entry)
         entry->unlock("gopherState");
 
-    if (buf) {
+    if (buf)
         memFree(buf, MEM_4K_BUF);
-        buf = nullptr;
-    }
 }
 
 /**
@@ -986,6 +971,7 @@ gopherStart(FwdState * fwd)
         return;
     }
 
+    // XXX: Sharing open Connection with FwdState that has its own handlers/etc.
     gopherState->serverConn = fwd->serverConnection();
     gopherSendRequest(fwd->serverConnection()->fd, gopherState);
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout",
index 50dc6baebe037b2d879a0c01a844fa3589efb45e..3c3ae5e2f39adc7bd6ae26fc43618707dda7d28e 100644 (file)
@@ -11,6 +11,7 @@
 #include "squid.h"
 
 #if USE_IDENT
+#include "base/JobWait.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/ConnOpener.h"
@@ -53,8 +54,15 @@ public:
 
     Comm::ConnectionPointer conn;
     MemBuf queryMsg;  ///< the lookup message sent to IDENT server
-    IdentClient *clients;
+    IdentClient *clients = nullptr;
     char buf[IDENT_BUFSIZE];
+
+    /// waits for a connection to the IDENT server to be established/opened
+    JobWait<Comm::ConnOpener> connWait;
+
+private:
+    // use deleteThis() to destroy
+    ~IdentStateData();
 };
 
 CBDATA_CLASS_INIT(IdentStateData);
@@ -73,8 +81,9 @@ static void ClientAdd(IdentStateData * state, IDCB * callback, void *callback_da
 Ident::IdentConfig Ident::TheConfig;
 
 void
-Ident::IdentStateData::deleteThis(const char *)
+Ident::IdentStateData::deleteThis(const char *reason)
 {
+    debugs(30, 3, reason);
     swanSong();
     delete this;
 }
@@ -84,6 +93,10 @@ Ident::IdentStateData::swanSong()
 {
     if (clients != NULL)
         notify(NULL);
+}
+
+Ident::IdentStateData::~IdentStateData() {
+    assert(!clients);
 
     if (Comm::IsConnOpen(conn)) {
         comm_remove_close_handler(conn->fd, Ident::Close, this);
@@ -112,6 +125,10 @@ void
 Ident::Close(const CommCloseCbParams &params)
 {
     IdentStateData *state = (IdentStateData *)params.data;
+    if (state->conn) {
+        state->conn->noteClosure();
+        state->conn = nullptr;
+    }
     state->deleteThis("connection closed");
 }
 
@@ -127,6 +144,16 @@ void
 Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data)
 {
     IdentStateData *state = (IdentStateData *)data;
+    state->connWait.finish();
+
+    // Start owning the supplied connection (so that it is not orphaned if this
+    // function bails early). As a (tiny) optimization or perhaps just diff
+    // minimization, the close handler is added later, when we know we are not
+    // bailing. This delay is safe because comm_remove_close_handler() forgives
+    // missing handlers.
+    assert(conn); // but may be closed
+    assert(!state->conn);
+    state->conn = conn;
 
     if (status != Comm::OK) {
         if (status == Comm::TIMEOUT)
@@ -149,8 +176,8 @@ Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int,
         return;
     }
 
-    assert(conn != NULL && conn == state->conn);
-    comm_add_close_handler(conn->fd, Ident::Close, state);
+    assert(state->conn->isOpen());
+    comm_add_close_handler(state->conn->fd, Ident::Close, state);
 
     AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback",
                                    CommIoCbPtrFun(Ident::WriteFeedback, state));
@@ -259,10 +286,10 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data)
     state->hash.key = xstrdup(key);
 
     // copy the conn details. We do not want the original FD to be re-used by IDENT.
-    state->conn = conn->cloneIdentDetails();
+    const auto identConn = conn->cloneProfile();
     // NP: use random port for secure outbound to IDENT_PORT
-    state->conn->local.port(0);
-    state->conn->remote.port(IDENT_PORT);
+    identConn->local.port(0);
+    identConn->remote.port(IDENT_PORT);
 
     // build our query from the original connection details
     state->queryMsg.init();
@@ -272,7 +299,8 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data)
     hash_join(ident_hash, &state->hash);
 
     AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state));
-    AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout));
+    const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout);
+    state->connWait.start(connOpener, call);
 }
 
 void
index 2be2f6aeaffd4f125a93d80b9d52454e898318d8..3be1cc5db35d81aef85c28695f3e6ba7cd43d198 100644 (file)
@@ -256,13 +256,16 @@ Log::TcpLogger::doConnect()
 
     typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
-    AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
+    const auto cs = new Comm::ConnOpener(futureConn, call, 2);
+    connWait.start(cs, call);
 }
 
 /// Comm::ConnOpener callback
 void
 Log::TcpLogger::connectDone(const CommConnectCbParams &params)
 {
+    connWait.finish();
+
     if (params.flag != Comm::OK) {
         const double delay = 0.5; // seconds
         if (connectFailures++ % 100 == 0) {
@@ -367,7 +370,10 @@ Log::TcpLogger::handleClosure(const CommCloseCbParams &)
 {
     assert(inCall != NULL);
     closer = NULL;
-    conn = NULL;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     // in all current use cases, we should not try to reconnect
     mustStop("Log::TcpLogger::handleClosure");
 }
index 1be2113fef7f68337bcc60f7e1984c94d9084190..ec7ca5f7b19908583ed9f31e3308e90a2ada5fcb 100644 (file)
@@ -10,6 +10,8 @@
 #define _SQUID_SRC_LOG_TCPLOGGER_H
 
 #include "base/AsyncJob.h"
+#include "base/JobWait.h"
+#include "comm/forward.h"
 #include "ip/Address.h"
 
 #include <list>
@@ -103,6 +105,9 @@ private:
     Ip::Address remote; ///< where the remote logger expects our records
     AsyncCall::Pointer closer; ///< handles unexpected/external conn closures
 
+    /// waits for a connection to the remote logger to be established/opened
+    JobWait<Comm::ConnOpener> connWait;
+
     uint64_t connectFailures; ///< number of sequential connection failures
     uint64_t drops; ///< number of records dropped during the current outage
 };
index 8edba1d783baaa03673cad5042ffbb7b0a294dae..e4da4ca088d1e9015f6f8aebfaa8d961135298c6 100644 (file)
@@ -100,7 +100,11 @@ void
 Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &)
 {
     debugs(16, 5, HERE);
-    conn = NULL; // needed?
+    closer = nullptr;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     mustStop("commClosed");
 }
 
index ba41d7aa6eefe97dbb17fda2ee43553cf31a28cd..dea0452de0455a7dbcfdbe9ee71a0ce48b6ab1dc 100644 (file)
@@ -107,11 +107,14 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
 
 /// called when the HTTP client or some external force closed our socket
 void
-Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
+Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &)
 {
     debugs(16, 5, HERE);
-    Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
-    conn = NULL;
+    closer = nullptr;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     mustStop("commClosed");
 }
 
index e2cccec2f54e2e8d9704ca44435ab0d8a41210c5..d3cf73888889fcbdd564eb896ac4a48c7c2f9805 100644 (file)
@@ -131,7 +131,11 @@ void
 Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &)
 {
     debugs(16, 6, HERE);
-    Must(!Comm::IsConnOpen(clientConnection));
+    if (clientConnection) {
+        clientConnection->noteClosure();
+        clientConnection = nullptr;
+    }
+    closer = nullptr;
     mustStop("commClosed");
 }
 
index 4c1401f6e85eb7bde6a42eca18246debdfd5c1e6..494edabb84d05780259fa828aa05e8350cdf795a 100644 (file)
@@ -89,9 +89,15 @@ Security::PeerConnector::start()
 void
 Security::PeerConnector::commCloseHandler(const CommCloseCbParams &params)
 {
+    debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
+
     closeHandler = nullptr;
+    if (serverConn) {
+        countFailingConnection();
+        serverConn->noteClosure();
+        serverConn = nullptr;
+    }
 
-    debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
     const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request.getRaw(), al);
     static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE");
     err->detailError(d);
@@ -111,6 +117,8 @@ Security::PeerConnector::commTimeoutHandler(const CommTimeoutCbParams &)
 bool
 Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
 {
+    Must(Comm::IsConnOpen(serverConnection()));
+
     Security::ContextPointer ctx(getTlsContext());
     debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get());
 
@@ -162,6 +170,8 @@ Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
 void
 Security::PeerConnector::recordNegotiationDetails()
 {
+    Must(Comm::IsConnOpen(serverConnection()));
+
     const int fd = serverConnection()->fd;
     Security::SessionPointer session(fd_table[fd].ssl);
 
@@ -180,6 +190,8 @@ Security::PeerConnector::recordNegotiationDetails()
 void
 Security::PeerConnector::negotiate()
 {
+    Must(Comm::IsConnOpen(serverConnection()));
+
     const int fd = serverConnection()->fd;
     if (fd_table[fd].closing())
         return;
@@ -224,7 +236,7 @@ Security::PeerConnector::handleNegotiationResult(const Security::IoResult &resul
     switch (result.category) {
     case Security::IoResult::ioSuccess:
         recordNegotiationDetails();
-        if (sslFinalized())
+        if (sslFinalized() && callback)
             sendSuccess();
         return; // we may be gone by now
 
@@ -252,6 +264,7 @@ Security::PeerConnector::sslFinalized()
 {
 #if USE_OPENSSL
     if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) {
+        Must(Comm::IsConnOpen(serverConnection()));
         const int fd = serverConnection()->fd;
         Security::SessionPointer session(fd_table[fd].ssl);
 
@@ -295,6 +308,7 @@ void
 Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse)
 {
     Must(validationResponse != NULL);
+    Must(Comm::IsConnOpen(serverConnection()));
 
     ErrorDetail::Pointer errDetails;
     bool validatorFailed = false;
@@ -317,7 +331,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe
 
     if (!errDetails && !validatorFailed) {
         noteNegotiationDone(NULL);
-        sendSuccess();
+        if (callback)
+            sendSuccess();
         return;
     }
 
@@ -343,6 +358,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe
 Security::CertErrors *
 Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails)
 {
+    Must(Comm::IsConnOpen(serverConnection()));
+
     ACLFilledChecklist *check = NULL;
     Security::SessionPointer session(fd_table[serverConnection()->fd].ssl);
 
@@ -418,9 +435,11 @@ Security::PeerConnector::negotiateSsl()
 void
 Security::PeerConnector::noteWantRead()
 {
-    const int fd = serverConnection()->fd;
     debugs(83, 5, serverConnection());
 
+    Must(Comm::IsConnOpen(serverConnection()));
+    const int fd = serverConnection()->fd;
+
     // read timeout to avoid getting stuck while reading from a silent server
     typedef CommCbMemFunT<Security::PeerConnector, CommTimeoutCbParams> TimeoutDialer;
     AsyncCall::Pointer timeoutCall = JobCallback(83, 5,
@@ -434,8 +453,10 @@ Security::PeerConnector::noteWantRead()
 void
 Security::PeerConnector::noteWantWrite()
 {
-    const int fd = serverConnection()->fd;
     debugs(83, 5, serverConnection());
+    Must(Comm::IsConnOpen(serverConnection()));
+
+    const int fd = serverConnection()->fd;
     Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0);
     return;
 }
@@ -452,57 +473,76 @@ Security::PeerConnector::noteNegotiationError(const Security::ErrorDetailPointer
     bail(anErr);
 }
 
+Security::EncryptorAnswer &
+Security::PeerConnector::answer()
+{
+    assert(callback);
+    const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer());
+    assert(dialer);
+    return dialer->answer();
+}
+
 void
 Security::PeerConnector::bail(ErrorState *error)
 {
     Must(error); // or the recepient will not know there was a problem
-    Must(callback != NULL);
-    CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
-    Must(dialer);
-    dialer->answer().error = error;
+    answer().error = error;
 
-    if (const auto p = serverConnection()->getPeer())
-        peerConnectFailed(p);
+    if (const auto failingConnection = serverConn) {
+        countFailingConnection();
+        disconnect();
+        failingConnection->close();
+    }
 
     callBack();
-    disconnect();
-
-    if (noteFwdPconnUse)
-        fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
-    serverConn->close();
-    serverConn = nullptr;
 }
 
 void
 Security::PeerConnector::sendSuccess()
 {
-    callBack();
+    assert(Comm::IsConnOpen(serverConn));
+    answer().conn = serverConn;
     disconnect();
+    callBack();
+}
+
+void
+Security::PeerConnector::countFailingConnection()
+{
+    assert(serverConn);
+    if (const auto p = serverConn->getPeer())
+        peerConnectFailed(p);
+    // TODO: Calling PconnPool::noteUses() should not be our responsibility.
+    if (noteFwdPconnUse && serverConn->isOpen())
+        fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
 }
 
 void
 Security::PeerConnector::disconnect()
 {
+    const auto stillOpen = Comm::IsConnOpen(serverConn);
+
     if (closeHandler) {
-        comm_remove_close_handler(serverConnection()->fd, closeHandler);
+        if (stillOpen)
+            comm_remove_close_handler(serverConn->fd, closeHandler);
         closeHandler = nullptr;
     }
 
-    commUnsetConnTimeout(serverConnection());
+    if (stillOpen)
+        commUnsetConnTimeout(serverConn);
+
+    serverConn = nullptr;
 }
 
 void
 Security::PeerConnector::callBack()
 {
-    debugs(83, 5, "TLS setup ended for " << serverConnection());
+    debugs(83, 5, "TLS setup ended for " << answer().conn);
 
     AsyncCall::Pointer cb = callback;
     // Do this now so that if we throw below, swanSong() assert that we _tried_
     // to call back holds.
     callback = NULL; // this should make done() true
-    CbDialer *dialer = dynamic_cast<CbDialer*>(cb->getDialer());
-    Must(dialer);
-    dialer->answer().conn = serverConnection();
     ScheduleCallHere(cb);
 }
 
@@ -511,8 +551,9 @@ Security::PeerConnector::swanSong()
 {
     // XXX: unregister fd-closure monitoring and CommSetSelect interest, if any
     AsyncJob::swanSong();
-    if (callback != NULL) { // paranoid: we have left the caller waiting
-        debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server");
+
+    if (callback) {
+        // job-ending emergencies like handleStopRequest() or callException()
         const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al);
         bail(anErr);
         assert(!callback);
@@ -533,7 +574,7 @@ Security::PeerConnector::status() const
         buf.append("Stopped, reason:", 16);
         buf.appendf("%s",stopReason);
     }
-    if (serverConn != NULL)
+    if (Comm::IsConnOpen(serverConn))
         buf.appendf(" FD %d", serverConn->fd);
     buf.appendf(" %s%u]", id.prefix(), id.value);
     buf.terminate();
@@ -581,15 +622,18 @@ Security::PeerConnector::startCertDownloading(SBuf &url)
                                       PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this));
 
     const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1);
-    AsyncJob::Start(dl);
+    certDownloadWait.start(dl, certCallback);
 }
 
 void
 Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
 {
+    certDownloadWait.finish();
+
     ++certsDownloads;
     debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length());
 
+    Must(Comm::IsConnOpen(serverConnection()));
     const auto &sconn = *fd_table[serverConnection()->fd].ssl;
 
     // Parse Certificate. Assume that it is in DER format.
@@ -642,6 +686,7 @@ Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
 void
 Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult)
 {
+    Must(Comm::IsConnOpen(serverConnection()));
     auto &sconn = *fd_table[serverConnection()->fd].ssl;
 
     // We download the missing certificate(s) once. We would prefer to clear
index 942f8627aab26e0866fb3dcf3f6c9b232a15dc58..10700d796bb7ab3ea106da92e97e0f93d60004b3 100644 (file)
@@ -12,6 +12,7 @@
 #include "acl/Acl.h"
 #include "base/AsyncCbdataCalls.h"
 #include "base/AsyncJob.h"
+#include "base/JobWait.h"
 #include "CommCalls.h"
 #include "http/forward.h"
 #include "security/EncryptorAnswer.h"
@@ -24,6 +25,7 @@
 #include <queue>
 
 class ErrorState;
+class Downloader;
 class AccessLogEntry;
 typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
 
@@ -152,6 +154,9 @@ protected:
     /// a bail(), sendSuccess() helper: stops monitoring the connection
     void disconnect();
 
+    /// updates connection usage history before the connection is closed
+    void countFailingConnection();
+
     /// If called the certificates validator will not used
     void bypassCertValidator() {useCertValidator_ = false;}
 
@@ -159,6 +164,9 @@ protected:
     /// logging
     void recordNegotiationDetails();
 
+    /// convenience method to get to the answer fields
+    EncryptorAnswer &answer();
+
     HttpRequestPointer request; ///< peer connection trigger or cause
     Comm::ConnectionPointer serverConn; ///< TCP connection to the peer
     AccessLogEntryPointer al; ///< info for the future access.log entry
@@ -203,6 +211,8 @@ private:
 
     /// outcome of the last (failed and) suspended negotiation attempt (or nil)
     Security::IoResultPointer suspendedError_;
+
+    JobWait<Downloader> certDownloadWait; ///< waits for the missing certificate to be downloaded
 };
 
 } // namespace Security
index dce66e397820bc705f97a1a15e9948f7d811a12d..26225aae01558ac742707fa6e7f7d76e1ec457f3 100644 (file)
@@ -172,6 +172,7 @@ class ParsedOptions {}; // we never parse/use TLS options in this case
 typedef long ParsedPortFlags;
 
 class PeerConnector;
+class BlindPeerConnector;
 class PeerOptions;
 
 #if USE_OPENSSL
index 53f822d2f9f58924dc5204a8a85f2fa6016de337..e0a10b6b517aa6554a7f3f492b5027a16155b46a 100644 (file)
@@ -61,7 +61,7 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact):
     dataConn(),
     uploadAvailSize(0),
     listener(),
-    connector(),
+    dataConnWait(),
     reader(),
     waitingForOrigin(false),
     originDataDownloadAbortedOnError(false)
@@ -1676,11 +1676,11 @@ Ftp::Server::checkDataConnPre()
 
     // active transfer: open a data connection from Squid to client
     typedef CommCbMemFunT<Server, CommConnectCbParams> Dialer;
-    connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
-    Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector,
-            Config.Timeout.connect);
-    AsyncJob::Start(cs);
-    return false; // ConnStateData::processFtpRequest waits handleConnectDone
+    AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
+    const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback,
+                                         Config.Timeout.connect);
+    dataConnWait.start(cs, callback);
+    return false;
 }
 
 /// Check that client data connection is ready for immediate I/O.
@@ -1698,18 +1698,22 @@ Ftp::Server::checkDataConnPost() const
 void
 Ftp::Server::connectedForData(const CommConnectCbParams &params)
 {
-    connector = NULL;
+    dataConnWait.finish();
 
     if (params.flag != Comm::OK) {
-        /* it might have been a timeout with a partially open link */
-        if (params.conn != NULL)
-            params.conn->close();
         setReply(425, "Cannot open data connection.");
         Http::StreamPointer context = pipeline.front();
         Must(context->http);
         Must(context->http->storeEntry() != NULL);
+        // TODO: call closeDataConnection() to reset data conn processing?
     } else {
-        Must(dataConn == params.conn);
+        // Finalize the details and start owning the supplied connection.
+        assert(params.conn);
+        assert(dataConn);
+        assert(!dataConn->isOpen());
+        dataConn = params.conn;
+        // XXX: Missing comm_add_close_handler() to track external closures.
+
         Must(Comm::IsConnOpen(params.conn));
         fd_note(params.conn->fd, "active client ftp data");
     }
index fb9ef9081d70656e4210379b0cdbbf440fdd0b62..d6779982728027672809c784354f4bd0e199836e 100644 (file)
 #ifndef SQUID_SERVERS_FTP_SERVER_H
 #define SQUID_SERVERS_FTP_SERVER_H
 
+#include "base/JobWait.h"
 #include "base/Lock.h"
 #include "client_side.h"
+#include "comm/forward.h"
 
 namespace Ftp
 {
@@ -188,7 +190,11 @@ private:
     size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes
 
     AsyncCall::Pointer listener; ///< set when we are passively listening
-    AsyncCall::Pointer connector; ///< set when we are actively connecting
+
+    /// Waits for an FTP data connection to the client to be established/opened.
+    /// This wait only happens in FTP active mode (via PORT or EPRT).
+    JobWait<Comm::ConnOpener> dataConnWait;
+
     AsyncCall::Pointer reader; ///< set when we are reading FTP data
 
     /// whether we wait for the origin data transfer to end
index 836eb5772163e1147430bef27d3db4f227320e43..259bb0441baced3e3db9c1139cf94166310f6fb7 100644 (file)
@@ -53,6 +53,7 @@ Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params)
 {
     debugs(49, 5, HERE);
     Must(fd == params.fd);
+    closer = nullptr;
     fd = -1;
     mustStop("commClosed");
 }
@@ -68,8 +69,7 @@ void
 Snmp::Forwarder::handleException(const std::exception& e)
 {
     debugs(49, 3, HERE << e.what());
-    if (fd >= 0)
-        sendError(SNMP_ERR_GENERR);
+    sendError(SNMP_ERR_GENERR);
     Ipc::Forwarder::handleException(e);
 }
 
@@ -78,6 +78,10 @@ void
 Snmp::Forwarder::sendError(int error)
 {
     debugs(49, 3, HERE);
+
+    if (fd < 0)
+        return; // client gone
+
     Snmp::Request& req = static_cast<Snmp::Request&>(*request);
     req.pdu.command = SNMP_PDU_RESPONSE;
     req.pdu.errstat = error;
index 9b6e34405e34325189b9b2a2c7192b9474bfec25..82f8c44752f01da7678c47d8c97095a5cf52fe86 100644 (file)
@@ -88,7 +88,11 @@ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params)
 {
     debugs(49, 5, HERE);
     Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd);
-    conn = NULL;
+    closer = nullptr;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     mustStop("commClosed");
 }
 
@@ -102,6 +106,10 @@ void
 Snmp::Inquirer::sendResponse()
 {
     debugs(49, 5, HERE);
+
+    if (!Comm::IsConnOpen(conn))
+        return; // client gone
+
     aggrPdu.fixAggregate();
     aggrPdu.command = SNMP_PDU_RESPONSE;
     u_char buffer[SNMP_REQUEST_SIZE];
index 9a4055355f4820ebe3039ad1ae740d8bceb394ec..89e45435b7d389fd974540920b0c737d4493dbfe 100644 (file)
@@ -27,18 +27,18 @@ CBDATA_NAMESPACED_CLASS_INIT(Ssl, PeekingPeerConnector);
 void switchToTunnel(HttpRequest *request, const Comm::ConnectionPointer &clientConn, const Comm::ConnectionPointer &srvConn, const SBuf &preReadServerData);
 
 void
-Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data)
+Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data)
 {
     Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data;
     // Use job calls to add done() checks and other job logic/protections.
-    CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer);
+    CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer);
 }
 
 void
-Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer)
+Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer)
 {
-    const Ssl::BumpMode finalAction = answer.allowed() ?
-                                      static_cast<Ssl::BumpMode>(answer.kind):
+    const Ssl::BumpMode finalAction = aclAnswer.allowed() ?
+                                      static_cast<Ssl::BumpMode>(aclAnswer.kind):
                                       checkForPeekAndSpliceGuess();
     checkForPeekAndSpliceMatched(finalAction);
 }
@@ -106,10 +106,8 @@ Ssl::PeekingPeerConnector::checkForPeekAndSpliceMatched(const Ssl::BumpMode acti
         splice = true;
         // Ssl Negotiation stops here. Last SSL checks for valid certificates
         // and if done, switch to tunnel mode
-        if (sslFinalized()) {
-            debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection");
+        if (sslFinalized() && callback)
             callBack();
-        }
     }
 }
 
@@ -272,8 +270,11 @@ Ssl::PeekingPeerConnector::startTunneling()
     auto b = SSL_get_rbio(session.get());
     auto srvBio = static_cast<Ssl::ServerBio*>(BIO_get_data(b));
 
+    debugs(83, 5, "will tunnel instead of negotiating TLS");
     switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData());
-    tunnelInsteadOfNegotiating();
+    answer().tunneled = true;
+    disconnect();
+    callBack();
 }
 
 void
@@ -397,13 +398,3 @@ Ssl::PeekingPeerConnector::serverCertificateVerified()
     }
 }
 
-void
-Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating()
-{
-    Must(callback != NULL);
-    CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
-    Must(dialer);
-    dialer->answer().tunneled = true;
-    debugs(83, 5, "The SSL negotiation with server aborted");
-}
-
index 3c86b887de7c58c4df49b543b5accac99c77b4f5..0145e77f930bcda708e81257d59271a5e919b887 100644 (file)
@@ -51,7 +51,7 @@ public:
     void checkForPeekAndSplice();
 
     /// Callback function for ssl_bump acl check in step3  SSL bump step.
-    void checkForPeekAndSpliceDone(Acl::Answer answer);
+    void checkForPeekAndSpliceDone(Acl::Answer);
 
     /// Handles the final bumping decision.
     void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode);
@@ -67,7 +67,7 @@ public:
     void startTunneling();
 
     /// A wrapper function for checkForPeekAndSpliceDone for use with acl
-    static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data);
+    static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data);
 
 private:
 
index 45aa6deb2e4a24252481305b7862cbd45f3aec32..8fdb40bbde2a139af9c0e9fcce28cc772b91dd1e 100644 (file)
@@ -22,9 +22,9 @@ void Comm::AcceptLimiter::kick() STUB
 #include "comm/Connection.h"
 Comm::Connection::Connection() STUB
 Comm::Connection::~Connection() STUB
-Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr)
-Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr)
+Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr)
 void Comm::Connection::close() STUB
+void Comm::Connection::noteClosure() STUB
 CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL)
 void Comm::Connection::setPeer(CachePeer * p) STUB
 ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach())
index aa9cf0b364aea39d25819ab7d56973ecf7ea81eb..176cf337e7d5467fff36fa57fd3d1d5ab0ff481c 100644 (file)
@@ -9,6 +9,7 @@
 #include "squid.h"
 #include "AccessLogEntry.h"
 #include "comm/Connection.h"
+#include "Downloader.h"
 #include "HttpRequest.h"
 
 #define STUB_API "security/libsecurity.la"
@@ -88,7 +89,9 @@ void PeerConnector::bail(ErrorState *) STUB
 void PeerConnector::sendSuccess() STUB
 void PeerConnector::callBack() STUB
 void PeerConnector::disconnect() STUB
+void PeerConnector::countFailingConnection() STUB
 void PeerConnector::recordNegotiationDetails() STUB
+EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer)
 }
 
 #include "security/PeerOptions.h"
index 386a0ecd6103405148af891a6f64d844b83b896c..4fc5abdeffbad216b903010047880f17e3bb762f 100644 (file)
@@ -11,6 +11,7 @@
 #include "squid.h"
 #include "acl/FilledChecklist.h"
 #include "base/CbcPointer.h"
+#include "base/JobWait.h"
 #include "CachePeer.h"
 #include "cbdata.h"
 #include "client_side.h"
@@ -180,9 +181,6 @@ public:
     SBuf preReadClientData;
     SBuf preReadServerData;
     time_t startTime; ///< object creation time, before any peer selection/connection attempts
-    /// Whether we are waiting for the CONNECT request/response exchange with the peer.
-    bool waitingForConnectExchange;
-    HappyConnOpenerPointer connOpener; ///< current connection opening job
     ResolvedPeersPointer destinations; ///< paths for forwarding the request
     bool destinationsFound; ///< At least one candidate path found
     /// whether another destination may be still attempted if the TCP connection
@@ -191,10 +189,15 @@ public:
     // TODO: remove after fixing deferred reads in TunnelStateData::copyRead()
     CodeContext::Pointer codeContext; ///< our creator context
 
-    // AsyncCalls which we set and may need cancelling.
-    struct {
-        AsyncCall::Pointer connector;  ///< a call linking us to the ConnOpener producing serverConn.
-    } calls;
+    /// waits for a transport connection to the peer to be established/opened
+    JobWait<HappyConnOpener> transportWait;
+
+    /// waits for the established transport connection to be secured/encrypted
+    JobWait<Security::PeerConnector> encryptionWait;
+
+    /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
+    /// over the (encrypted, if needed) transport connection to that cache_peer
+    JobWait<Http::Tunneler> peerWait;
 
     void copyRead(Connection &from, IOCB *completion);
 
@@ -212,12 +215,6 @@ public:
     /// when all candidate destinations have been tried and all have failed
     void noteConnection(HappyConnOpenerAnswer &);
 
-    /// whether we are waiting for HappyConnOpener
-    /// same as calls.connector but may differ from connOpener.valid()
-    bool opening() const { return connOpener.set(); }
-
-    void cancelOpening(const char *reason);
-
     /// Start using an established connection
     void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused);
 
@@ -266,6 +263,9 @@ private:
 
     /// \returns whether the request should be retried (nil) or the description why it should not
     const char *checkRetry();
+    /// whether the successfully selected path destination or the established
+    /// server connection is still in use
+    bool usingDestination() const;
 
     /// details of the "last tunneling attempt" failure (if it failed)
     ErrorState *savedError = nullptr;
@@ -275,6 +275,8 @@ private:
 
     void deleteThis();
 
+    void cancelStep(const char *reason);
+
 public:
     bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
     void copy(size_t len, Connection &from, Connection &to, IOCB *);
@@ -357,7 +359,6 @@ TunnelStateData::deleteThis()
 
 TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) :
     startTime(squid_curtime),
-    waitingForConnectExchange(false),
     destinations(new ResolvedPeers()),
     destinationsFound(false),
     retriable(true),
@@ -390,8 +391,7 @@ TunnelStateData::~TunnelStateData()
     debugs(26, 3, "TunnelStateData destructed this=" << this);
     assert(noConnections());
     xfree(url);
-    if (opening())
-        cancelOpening("~TunnelStateData");
+    cancelStep("~TunnelStateData");
     delete savedError;
 }
 
@@ -904,7 +904,10 @@ TunnelStateData::copyServerBytes()
 static void
 tunnelStartShoveling(TunnelStateData *tunnelState)
 {
-    assert(!tunnelState->waitingForConnectExchange);
+    assert(!tunnelState->transportWait);
+    assert(!tunnelState->encryptionWait);
+    assert(!tunnelState->peerWait);
+
     assert(tunnelState->server.conn);
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
@@ -962,6 +965,7 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *, size_t len
 void
 TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
 {
+    peerWait.finish();
     server.len = 0;
 
     if (logTag_ptr)
@@ -970,13 +974,11 @@ TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
     if (answer.peerResponseStatus != Http::scNone)
         *status_ptr = answer.peerResponseStatus;
 
-    waitingForConnectExchange = false;
-
     auto sawProblem = false;
 
     if (!answer.positive()) {
         sawProblem = true;
-        Must(!Comm::IsConnOpen(answer.conn));
+        assert(!answer.conn);
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
         sawProblem = true;
         closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
@@ -1042,8 +1044,7 @@ tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_
 void
 TunnelStateData::noteConnection(HappyConnOpener::Answer &answer)
 {
-    calls.connector = nullptr;
-    connOpener.clear();
+    transportWait.finish();
 
     ErrorState *error = nullptr;
     if ((error = answer.error.get())) {
@@ -1167,7 +1168,7 @@ TunnelStateData::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
     AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer",
                                             MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this));
     const auto connector = new Security::BlindPeerConnector(request, conn, callback, al);
-    AsyncJob::Start(connector); // will call our callback
+    encryptionWait.start(connector, callback);
 }
 
 /// starts a preparation step for an established connection; retries on failures
@@ -1194,9 +1195,12 @@ TunnelStateData::advanceDestination(const char *stepDescription, const Comm::Con
 void
 TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer)
 {
+    encryptionWait.finish();
+
     ErrorState *error = nullptr;
+    assert(!answer.tunneled);
     if ((error = answer.error.get())) {
-        Must(!Comm::IsConnOpen(answer.conn));
+        assert(!answer.conn);
         answer.error.clear();
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al);
@@ -1223,8 +1227,6 @@ TunnelStateData::connectedToPeer(const Comm::ConnectionPointer &conn)
 void
 TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
 {
-    assert(!waitingForConnectExchange);
-
     AsyncCall::Pointer callback = asyncCall(5,4,
                                             "TunnelStateData::tunnelEstablishmentDone",
                                             Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this));
@@ -1232,9 +1234,7 @@ TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
 #if USE_DELAY_POOLS
     tunneler->setDelayId(server.delayId);
 #endif
-    AsyncJob::Start(tunneler);
-    waitingForConnectExchange = true;
-    // and wait for the tunnelEstablishmentDone() call
+    peerWait.start(tunneler, callback);
 }
 
 void
@@ -1252,14 +1252,14 @@ TunnelStateData::noteDestination(Comm::ConnectionPointer path)
 
     destinations->addPath(path);
 
-    if (Comm::IsConnOpen(server.conn)) {
+    if (usingDestination()) {
         // We are already using a previously opened connection but also
         // receiving destinations in case we need to re-forward.
-        Must(!opening());
+        Must(!transportWait);
         return;
     }
 
-    if (opening()) {
+    if (transportWait) {
         notifyConnOpener();
         return; // and continue to wait for tunnelConnectDone() callback
     }
@@ -1289,17 +1289,23 @@ TunnelStateData::noteDestinationsEnd(ErrorState *selectionError)
     // if all of them fail, tunneling as whole will fail
     Must(!selectionError); // finding at least one path means selection succeeded
 
-    if (Comm::IsConnOpen(server.conn)) {
+    if (usingDestination()) {
         // We are already using a previously opened connection but also
         // receiving destinations in case we need to re-forward.
-        Must(!opening());
+        Must(!transportWait);
         return;
     }
 
-    Must(opening()); // or we would be stuck with nothing to do or wait for
+    Must(transportWait); // or we would be stuck with nothing to do or wait for
     notifyConnOpener();
 }
 
+bool
+TunnelStateData::usingDestination() const
+{
+    return encryptionWait || peerWait || Comm::IsConnOpen(server.conn);
+}
+
 /// remembers an error to be used if there will be no more connection attempts
 void
 TunnelStateData::saveError(ErrorState *error)
@@ -1320,8 +1326,7 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason)
     if (request)
         request->hier.stopPeerClock(false);
 
-    if (opening())
-        cancelOpening(reason);
+    cancelStep(reason);
 
     assert(finalError);
 
@@ -1339,18 +1344,15 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason)
     errorSend(client.conn, finalError);
 }
 
-/// Notify connOpener that we no longer need connections. We do not have to do
-/// this -- connOpener would eventually notice on its own, but notifying reduces
-/// waste and speeds up spare connection opening for other transactions (that
-/// could otherwise wait for this transaction to use its spare allowance).
+/// Notify a pending subtask, if any, that we no longer need its help. We do not
+/// have to do this -- the subtask job will eventually end -- but ending it
+/// earlier reduces waste and may reduce DoS attack surface.
 void
-TunnelStateData::cancelOpening(const char *reason)
+TunnelStateData::cancelStep(const char *reason)
 {
-    assert(calls.connector);
-    calls.connector->cancel(reason);
-    calls.connector = nullptr;
-    notifyConnOpener();
-    connOpener.clear();
+    transportWait.cancel(reason);
+    encryptionWait.cancel(reason);
+    peerWait.cancel(reason);
 }
 
 void
@@ -1360,15 +1362,14 @@ TunnelStateData::startConnecting()
         request->hier.startPeerClock();
 
     assert(!destinations->empty());
-    assert(!opening());
-    calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
-    const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al);
+    assert(!usingDestination());
+    AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
+    const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al);
     cs->setHost(request->url.host());
     cs->setRetriable(false);
     cs->allowPersistent(false);
     destinations->notificationPending = true; // start() is async
-    connOpener = cs;
-    AsyncJob::Start(cs);
+    transportWait.start(cs, callback);
 }
 
 /// send request on an existing connection dedicated to the requesting client
@@ -1417,7 +1418,7 @@ TunnelStateData::Connection::setDelayId(DelayId const &newDelay)
 
 #endif
 
-/// makes sure connOpener knows that destinations have changed
+/// makes sure connection opener knows that the destinations have changed
 void
 TunnelStateData::notifyConnOpener()
 {
@@ -1425,7 +1426,7 @@ TunnelStateData::notifyConnOpener()
         debugs(17, 7, "reusing pending notification");
     } else {
         destinations->notificationPending = true;
-        CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
+        CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
     }
 }
 
index 38dfbacdee474e1397862818907737290dea4aed..306ea5c622f4c58af9e4d46954f7cf9d1db02f7e 100644 (file)
@@ -182,6 +182,7 @@ whoisClose(const CommCloseCbParams &params)
 {
     WhoisState *p = (WhoisState *)params.data;
     debugs(75, 3, "whoisClose: FD " << params.fd);
+    // We do not own a Connection. Assume that FwdState is also monitoring.
     p->entry->unlock("whoisClose");
     delete p;
 }