]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening (#877)
authorAlex Rousskov <rousskov@measurement-factory.com>
Sun, 22 Aug 2021 17:05:57 +0000 (17:05 +0000)
committerSquid Anubis <squid-anubis@squid-cache.org>
Sun, 22 Aug 2021 17:06:00 +0000 (17:06 +0000)
The bug was caused by commit 25b0ce4. Other known symptoms are:

    assertion failed: store.cc:1793: "isEmpty()"
    assertion failed: FwdState.cc:501: "serverConnection() == conn"
    assertion failed: FwdState.cc:1037: "!opening()"

This change has several overlapping parts. Unfortunately, merging
individual parts is both difficult and likely to cause crashes.

## Part 1: Bug 5055.

FwdState used to check serverConn to decide whether to open a connection
to forward the request. Since commit 25b0ce4, a nil serverConn pointer
no longer implies that a new connection should be opened: FwdState
helper jobs may already be working on preparing an existing open
connection (e.g., sending a CONNECT request or negotiating encryption).

Bad serverConn checks in both FwdState::noteDestination() and
FwdState::noteDestinationsEnd() methods led to extra connectStart()
calls creating two conflicting concurrent helper jobs.

To fix this, we replaced direct serverConn inspection with a
usingDestination() call which also checks whether we are waiting for a
helper job. Testing that fix exposed another set of bugs: The helper job
pointers or in-job connections were left stale/set after forwarding
failures. The changes described below addressed most of those problems.

## Part 2: Connection establishing helper jobs and their callbacks

A proper fix for Bug 5055 required answering a difficult question: When
should a dying job call its callbacks? We only found one answer which
required cooperation from the job creator and led to the following
rules:

* AsyncJob destructors must not call any callbacks.

* AsyncJob::swanSong() is responsible for async-calling any remaining
  (i.e. set, uncalled, and uncancelled) callbacks.

* AsyncJob::swanSong() is called (only) for started jobs.

* AsyncJob destructing sequence should validate that no callbacks remain
  uncalled for started jobs.

... where an AsyncJob x is considered "started" if AsyncJob::Start(x)
has returned without throwing.

A new JobWait class helps job creators follow these rules while keeping
track on in-progress helper jobs and killing no-longer-needed helpers.

Also fixed very similar bugs in tunnel.cc code.

## Part 3: ConnOpener fixes

1. Many ConnOpener users are written to keep a ConnectionPointer to the
   destination given to ConnOpener. This means that their connection
   magically opens when ConnOpener successfully connects, before
   ConnOpener has a chance to notify the user about the changes. Having
   multiple concurrent connection owners is always dangerous, and the
   user cannot even have a close handler registered for its now-open
   connection. When something happens to ConnOpener or its answer, the
   user job may be in trouble. Now, ConnOpener callers no longer pass
   Connection objects they own, cloning them as needed. That adjustment
   required adjustment 2:

2. Refactored ConnOpener users to stop assuming that the answer contains
   a pointer to their connection object. After adjustment 1 above, it
   does not. HappyConnOpener relied on that assumption quite a bit so we
   had to refactor to use two custom callback methods instead of one
   with a complicated if-statement distinguishing prime from spare
   attempts. This refactoring is an overall improvement because it
   simplifies the code. Other ConnOpener users just needed to remove a
   few no longer valid paranoid assertions/Musts.

3. ConnOpener users were forced to remember to close params.conn when
   processing negative answers. Some, naturally, forgot, triggering
   warnings about orphaned connections (e.g., Ident and TcpLogger).
   ConnOpener now closes its open connection before sending a negative
   answer.

4. ConnOpener would trigger orphan connection warnings if the job ended
   after opening the connection but without supplying the connection to
   the requestor (e.g., because the requestor has gone away). Now,
   ConnOpener explicitly closes its open connection if it has not been
   sent to the requestor.

Also fixed Comm::ConnOpener::cleanFd() debugging that was incorrectly
saying that the method closes the temporary descriptor.

Also fixed ConnOpener callback's syncWithComm(): The stale
CommConnectCbParams override was testing unused (i.e. always negative)
CommConnectCbParams::fd and was trying to cancel the callback that most
(possibly all) recipients rely on: ConnOpener users expect a negative
answer rather than no answer at all.

Also, after comparing the needs of two old/existing and a temporary
added ("clone everything") Connection cloning method callers, we decided
there is no need to maintain three different methods. All existing
callers should be fine with a single method because none of them suffers
from "extra" copying of members that others need. Right now, the new
cloneProfile() method copies everything except FD and a few
special-purpose members (with documented reasons for not copying).

Also added Comm::Connection::cloneDestinationDetails() debugging to
simplify tracking dependencies between half-baked Connection objects
carrying destination/flags/other metadata and open Connection objects
created by ConnOpener using that metadata (which are later delivered to
ConnOpener users and, in some cases, replace those half-baked
connections mentioned earlier. Long-term, we need to find a better way
to express these and other Connection states/stages than comments and
debugging messages.

## Part 4: Comm::Connection closure callbacks

We improved many closure callbacks to make sure (to the extent possible)
that Connection and other objects are in sync with Comm. There are lots
of small bugs, inconsistencies, and other problems in Connection closure
handlers. It is not clear whether any of those problems could result in
serious runtime errors or leaks. In theory, the rest of the code could
neutralize their negative side effects. However, even in that case, it
would only be a matter of time before the next bug bites us due to stale
Connection::fd and such. These changes themselves carry elevated risk,
but they get us closer to reliable code as far as Connection maintenance
is concerned. Without them, we will keep chasing deadly side effects of
poorly implemented closure callbacks.

Long-term, all these manual efforts to keep things in sync should become
unnecessary with the introduction of appropriate Connection ownership
APIs that automatically maintain the corresponding environments (TODO).

## Part 5: Other notable improvements in the adjusted code

Improved Security::PeerConnector::serverConn and
Http::Tunneler::connection management, especially when sending negative
answers. When sending a negative answer, we would set answer().conn to
an open connection, async-send that answer, and then hurry to close the
connection using our pointer to the shared Connection object. If
everything went according to the plan, the recipient would get a non-nil
but closed Connection object. Now, a negative answer simply means no
connection at all. Same for a tunneled answer.

Refactored ICAP connection-establishing code to to delay Connection
ownership until the ICAP connection is fully ready. This change
addresses primary Connection ownership concerns (as they apply to this
ICAP code) except orphaning of the temporary Connection object by helper
job start exceptions (now an explicit XXX). For example, the transaction
no longer shares a Connection object with ConnOpener and
IcapPeerConnector jobs.

Probably fixed a bug where PeerConnector::negotiate() assumed that
sslFinalized() does not return true after callBack(). It may (e.g., when
CertValidationHelper::Submit() throws). Same for
PeekingPeerConnector::checkForPeekAndSpliceMatched().

Fixed FwdState::advanceDestination() bug that did not save
ERR_GATEWAY_FAILURE details and "lost" the address of that failed
destination, making it unavailable to future retries (if any).

Polished PeerPoolMgr, Ident, and Gopher code to be able to fix similar
job callback and connection management issues.

Polished AsyncJob::Start() API. Start() used to return a job pointer,
but that was a bad idea:

* It implies that a failed Start() will return a nil pointer, and that
  the caller should check the result. Neither is true.

* It encourages callers to dereference the returned pointer to further
  adjust the job. That technically works (today) but violates the rules
  of communicating with an async job. The Start() method is the boundary
  after which the job is deemed asynchronous.

Also removed old "and wait for..." post-Start() comments because the
code itself became clear enough, and those comments were becoming
increasingly stale (because they duplicated the callback names above
them).

Fix Tunneler and PeerConnector handling of last-resort callback
requirements. Events like handleStopRequest() and callException() stop
the job but should not be reported as a BUG (e.g., it would be up to the
callException() to decide how to report the caught exception). There
might (or there will) be other, similar cases where the job is stopped
prematurely for some non-BUG reason beyond swanSong() knowledge. The
existence of non-bug cases does not mean there could be no bugs worth
reporting here, but until they can be identified more reliably than all
these benign/irrelevant cases, reporting no BUGs is a (much) lesser
evil.

TODO: Revise AsyncJob::doneAll(). Many of its overrides are written to
check for both positive (i.e. mission accomplished) and negative (i.e.
mission cancelled or cannot be accomplished) conditions, but the latter
is usually unnecessary, especially after we added handleStopRequest()
API to properly support external job cancellation events. Many doneAll()
overrides can probably be greatly simplified.

61 files changed:
src/BodyPipe.cc
src/CommCalls.cc
src/Downloader.cc
src/FwdState.cc
src/FwdState.h
src/HappyConnOpener.cc
src/HappyConnOpener.h
src/PeerPoolMgr.cc
src/PeerPoolMgr.h
src/ResolvedPeers.cc
src/adaptation/icap/ModXact.cc
src/adaptation/icap/ModXact.h
src/adaptation/icap/OptXact.cc
src/adaptation/icap/OptXact.h
src/adaptation/icap/ServiceRep.cc
src/adaptation/icap/ServiceRep.h
src/adaptation/icap/Xaction.cc
src/adaptation/icap/Xaction.h
src/base/AsyncJob.cc
src/base/AsyncJob.h
src/base/JobWait.cc [new file with mode: 0644]
src/base/JobWait.h [new file with mode: 0644]
src/base/Makefile.am
src/base/forward.h
src/client_side.cc
src/clients/FtpClient.cc
src/clients/FtpClient.h
src/clients/FtpGateway.cc
src/clients/FtpRelay.cc
src/clients/HttpTunneler.cc
src/clients/HttpTunneler.h
src/clients/forward.h
src/comm.cc
src/comm/ConnOpener.cc
src/comm/ConnOpener.h
src/comm/Connection.cc
src/comm/Connection.h
src/comm/TcpAcceptor.cc
src/dns_internal.cc
src/eui/Eui48.h
src/fs/rock/RockRebuild.cc
src/gopher.cc
src/ident/Ident.cc
src/log/TcpLogger.cc
src/log/TcpLogger.h
src/mgr/Forwarder.cc
src/mgr/Inquirer.cc
src/mgr/StoreToCommWriter.cc
src/security/PeerConnector.cc
src/security/PeerConnector.h
src/security/forward.h
src/servers/FtpServer.cc
src/servers/FtpServer.h
src/snmp/Forwarder.cc
src/snmp/Inquirer.cc
src/ssl/PeekingPeerConnector.cc
src/ssl/PeekingPeerConnector.h
src/tests/stub_libcomm.cc
src/tests/stub_libsecurity.cc
src/tunnel.cc
src/whois.cc

index c4d8f349b95f6ae6ee731349cd611e0ce48d3d55..e321bfe1ede1cca9f746f4487b7c495be7301a5f 100644 (file)
@@ -335,6 +335,7 @@ BodyPipe::startAutoConsumptionIfNeeded()
         return;
 
     theConsumer = new BodySink(this);
+    AsyncJob::Start(theConsumer);
     debugs(91,7, HERE << "starting auto consumption" << status());
     scheduleBodyDataNotification();
 }
index ad0fcb7c4f3768720c7af3d4ff03dd346e366b53..753fa5cafc98a5290471944b02853573ee7b5aa5 100644 (file)
@@ -53,6 +53,9 @@ CommAcceptCbParams::CommAcceptCbParams(void *aData):
 {
 }
 
+// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all
+// implementations always return true.
+
 void
 CommAcceptCbParams::print(std::ostream &os) const
 {
@@ -72,13 +75,24 @@ CommConnectCbParams::CommConnectCbParams(void *aData):
 bool
 CommConnectCbParams::syncWithComm()
 {
-    // drop the call if the call was scheduled before comm_close but
-    // is being fired after comm_close
-    if (fd >= 0 && fd_table[fd].closing()) {
-        debugs(5, 3, HERE << "dropping late connect call: FD " << fd);
-        return false;
+    assert(conn);
+
+    // change parameters if this is a successful callback that was scheduled
+    // after its Comm-registered connection started to close
+
+    if (flag != Comm::OK) {
+        assert(!conn->isOpen());
+        return true; // not a successful callback; cannot go out of sync
     }
-    return true; // now we are in sync and can handle the call
+
+    assert(conn->isOpen());
+    if (!fd_table[conn->fd].closing())
+        return true; // no closing in progress; in sync (for now)
+
+    debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn);
+    conn->noteClosure();
+    flag = Comm::ERR_CLOSING;
+    return true; // now the callback is in sync with Comm again
 }
 
 /* CommIoCbParams */
index ee5433b6581a5a5647763429807e8559b07900e7..ba703ea1e95c485222e02a1c20926284502ab269 100644 (file)
@@ -81,6 +81,10 @@ void
 Downloader::swanSong()
 {
     debugs(33, 6, this);
+
+    if (callback_) // job-ending emergencies like handleStopRequest() or callException()
+        callBack(Http::scInternalServerError);
+
     if (context_) {
         context_->finished();
         context_ = nullptr;
@@ -251,6 +255,7 @@ Downloader::downloadFinished()
 void
 Downloader::callBack(Http::StatusCode const statusCode)
 {
+    assert(callback_);
     CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer());
     Must(dialer);
     dialer->status = statusCode;
index d10efeb4a3ffd704dcc5dae47079dd04f980de6b..f68538b45bcebd42cf25e13f7e400038bfa85d76 100644 (file)
@@ -206,26 +206,22 @@ FwdState::stopAndDestroy(const char *reason)
 {
     debugs(17, 3, "for " << reason);
 
-    if (opening())
-        cancelOpening(reason);
+    cancelStep(reason);
 
     PeerSelectionInitiator::subscribed = false; // may already be false
     self = nullptr; // we hope refcounting destroys us soon; may already be nil
     /* do not place any code here as this object may be gone by now */
 }
 
-/// Notify connOpener that we no longer need connections. We do not have to do
-/// this -- connOpener would eventually notice on its own, but notifying reduces
-/// waste and speeds up spare connection opening for other transactions (that
-/// could otherwise wait for this transaction to use its spare allowance).
+/// Notify a pending subtask, if any, that we no longer need its help. We do not
+/// have to do this -- the subtask job will eventually end -- but ending it
+/// earlier reduces waste and may reduce DoS attack surface.
 void
-FwdState::cancelOpening(const char *reason)
+FwdState::cancelStep(const char *reason)
 {
-    assert(calls.connector);
-    calls.connector->cancel(reason);
-    calls.connector = nullptr;
-    notifyConnOpener();
-    connOpener.clear();
+    transportWait.cancel(reason);
+    encryptionWait.cancel(reason);
+    peerWait.cancel(reason);
 }
 
 #if STRICT_ORIGINAL_DST
@@ -324,8 +320,7 @@ FwdState::~FwdState()
 
     entry = NULL;
 
-    if (opening())
-        cancelOpening("~FwdState");
+    cancelStep("~FwdState");
 
     if (Comm::IsConnOpen(serverConn))
         closeServerConnection("~FwdState");
@@ -477,8 +472,17 @@ FwdState::fail(ErrorState * errorState)
     if (!errorState->request)
         errorState->request = request;
 
-    if (err->type != ERR_ZERO_SIZE_OBJECT)
-        return;
+    if (err->type == ERR_ZERO_SIZE_OBJECT)
+        reactToZeroSizeObject();
+
+    destinationReceipt = nullptr; // may already be nil
+}
+
+/// ERR_ZERO_SIZE_OBJECT requires special adjustments
+void
+FwdState::reactToZeroSizeObject()
+{
+    assert(err->type == ERR_ZERO_SIZE_OBJECT);
 
     if (pconnRace == racePossible) {
         debugs(17, 5, HERE << "pconn race happened");
@@ -542,6 +546,8 @@ FwdState::complete()
 
         if (Comm::IsConnOpen(serverConn))
             unregister(serverConn);
+        serverConn = nullptr;
+        destinationReceipt = nullptr;
 
         entry->reset();
 
@@ -561,6 +567,12 @@ FwdState::complete()
     }
 }
 
+bool
+FwdState::usingDestination() const
+{
+    return encryptionWait || peerWait || Comm::IsConnOpen(serverConn);
+}
+
 void
 FwdState::noteDestination(Comm::ConnectionPointer path)
 {
@@ -578,19 +590,19 @@ FwdState::noteDestination(Comm::ConnectionPointer path)
 
     destinations->addPath(path);
 
-    if (Comm::IsConnOpen(serverConn)) {
+    if (usingDestination()) {
         // We are already using a previously opened connection, so we cannot be
-        // waiting for connOpener. We still receive destinations for backup.
-        Must(!opening());
+        // waiting for it. We still receive destinations for backup.
+        Must(!transportWait);
         return;
     }
 
-    if (opening()) {
+    if (transportWait) {
         notifyConnOpener();
         return; // and continue to wait for FwdState::noteConnection() callback
     }
 
-    // This is the first path candidate we have seen. Create connOpener.
+    // This is the first path candidate we have seen. Use it.
     useDestinations();
 }
 
@@ -618,19 +630,19 @@ FwdState::noteDestinationsEnd(ErrorState *selectionError)
     // if all of them fail, forwarding as whole will fail
     Must(!selectionError); // finding at least one path means selection succeeded
 
-    if (Comm::IsConnOpen(serverConn)) {
+    if (usingDestination()) {
         // We are already using a previously opened connection, so we cannot be
-        // waiting for connOpener. We were receiving destinations for backup.
-        Must(!opening());
+        // waiting for it. We were receiving destinations for backup.
+        Must(!transportWait);
         return;
     }
 
-    Must(opening()); // or we would be stuck with nothing to do or wait for
+    Must(transportWait); // or we would be stuck with nothing to do or wait for
     notifyConnOpener();
     // and continue to wait for FwdState::noteConnection() callback
 }
 
-/// makes sure connOpener knows that destinations have changed
+/// makes sure connection opener knows that the destinations have changed
 void
 FwdState::notifyConnOpener()
 {
@@ -639,7 +651,7 @@ FwdState::notifyConnOpener()
     } else {
         debugs(17, 7, "notifying about " << *destinations);
         destinations->notificationPending = true;
-        CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
+        CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
     }
 }
 
@@ -649,7 +661,7 @@ static void
 fwdServerClosedWrapper(const CommCloseCbParams &params)
 {
     FwdState *fwd = (FwdState *)params.data;
-    fwd->serverClosed(params.fd);
+    fwd->serverClosed();
 }
 
 /**** PRIVATE *****************************************************************/
@@ -719,13 +731,23 @@ FwdState::checkRetriable()
 }
 
 void
-FwdState::serverClosed(int fd)
+FwdState::serverClosed()
 {
-    // XXX: fd is often -1 here
-    debugs(17, 2, "FD " << fd << " " << entry->url() << " after " <<
-           (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests");
-    if (fd >= 0 && serverConnection()->fd == fd)
-        fwdPconnPool->noteUses(fd_table[fd].pconn.uses);
+    // XXX: This method logic attempts to tolerate Connection::close() called
+    // for serverConn earlier, by one of our dispatch()ed jobs. If that happens,
+    // serverConn will already be closed here or, worse, it will already be open
+    // for the next forwarding attempt. The current code prevents us getting
+    // stuck, but the long term solution is to stop sharing serverConn.
+    debugs(17, 2, serverConn);
+    if (Comm::IsConnOpen(serverConn)) {
+        const auto uses = fd_table[serverConn->fd].pconn.uses;
+        debugs(17, 3, "prior uses: " << uses);
+        fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool
+        serverConn->noteClosure();
+    }
+    serverConn = nullptr;
+    closeHandler = nullptr;
+    destinationReceipt = nullptr;
     retryOrBail();
 }
 
@@ -767,6 +789,8 @@ FwdState::handleUnregisteredServerEnd()
 {
     debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url());
     assert(!Comm::IsConnOpen(serverConn));
+    serverConn = nullptr;
+    destinationReceipt = nullptr;
     retryOrBail();
 }
 
@@ -784,6 +808,8 @@ FwdState::advanceDestination(const char *stepDescription, const Comm::Connection
     } catch (...) {
         debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException);
         closePendingConnection(conn, "connection preparation exception");
+        if (!err)
+            fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al));
         retryOrBail();
     }
 }
@@ -795,8 +821,7 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer)
 {
     assert(!destinationReceipt);
 
-    calls.connector = nullptr;
-    connOpener.clear();
+    transportWait.finish();
 
     Must(n_tries <= answer.n_tries); // n_tries cannot decrease
     n_tries = answer.n_tries;
@@ -808,6 +833,8 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer)
         Must(!Comm::IsConnOpen(answer.conn));
         answer.error.clear(); // preserve error for errorSendComplete()
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+        // The socket could get closed while our callback was queued. Sync
+        // Connection. XXX: Connection::fd may already be stale/invalid here.
         // We do not know exactly why the connection got closed, so we play it
         // safe, allowing retries only for persistent (reused) connections
         if (answer.reused) {
@@ -877,23 +904,24 @@ FwdState::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
     if (!conn->getPeer()->options.no_delay)
         tunneler->setDelayId(entry->mem_obj->mostBytesAllowed());
 #endif
-    AsyncJob::Start(tunneler);
-    // and wait for the tunnelEstablishmentDone() call
+    peerWait.start(tunneler, callback);
 }
 
 /// resumes operations after the (possibly failed) HTTP CONNECT exchange
 void
 FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
 {
+    peerWait.finish();
+
     ErrorState *error = nullptr;
     if (!answer.positive()) {
-        Must(!Comm::IsConnOpen(answer.conn));
+        Must(!answer.conn);
         error = answer.squidError.get();
         Must(error);
         answer.squidError.clear(); // preserve error for fail()
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
-        // The socket could get closed while our callback was queued.
-        // We close Connection here to sync Connection::fd.
+        // The socket could get closed while our callback was queued. Sync
+        // Connection. XXX: Connection::fd may already be stale/invalid here.
         closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
     } else if (!answer.leftovers.isEmpty()) {
@@ -963,18 +991,21 @@ FwdState::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
 #endif
         connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout);
     connector->noteFwdPconnUse = true;
-    AsyncJob::Start(connector); // will call our callback
+    encryptionWait.start(connector, callback);
 }
 
 /// called when all negotiations with the TLS-speaking peer have been completed
 void
 FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
 {
+    encryptionWait.finish();
+
     ErrorState *error = nullptr;
     if ((error = answer.error.get())) {
-        Must(!Comm::IsConnOpen(answer.conn));
+        assert(!answer.conn);
         answer.error.clear(); // preserve error for errorSendComplete()
     } else if (answer.tunneled) {
+        assert(!answer.conn);
         // TODO: When ConnStateData establishes tunnels, its state changes
         // [in ways that may affect logging?]. Consider informing
         // ConnStateData about our tunnel or otherwise unifying tunnel
@@ -982,6 +1013,8 @@ FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
         complete(); // destroys us
         return;
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+        // The socket could get closed while our callback was queued. Sync
+        // Connection. XXX: Connection::fd may already be stale/invalid here.
         closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer");
         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
     }
@@ -1054,22 +1087,20 @@ FwdState::connectStart()
     Must(!request->pinnedConnection());
 
     assert(!destinations->empty());
-    assert(!opening());
+    assert(!usingDestination());
 
     // Ditch error page if it was created before.
     // A new one will be created if there's another problem
     delete err;
     err = nullptr;
     request->clearError();
-    serverConn = nullptr;
-    destinationReceipt = nullptr;
 
     request->hier.startPeerClock();
 
-    calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
+    AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
 
     HttpRequest::Pointer cause = request;
-    const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al);
+    const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al);
     cs->setHost(request->url.host());
     bool retriable = checkRetriable();
     if (!retriable && Config.accessList.serverPconnForNonretriable) {
@@ -1081,8 +1112,7 @@ FwdState::connectStart()
     cs->setRetriable(retriable);
     cs->allowPersistent(pconnRace != raceHappened);
     destinations->notificationPending = true; // start() is async
-    connOpener = cs;
-    AsyncJob::Start(cs);
+    transportWait.start(cs, callback);
 }
 
 /// send request on an existing connection dedicated to the requesting client
index 0361414f62bd2ef5a649d675a3a8d778c4fb89cc..d4d607664fbcace4920d1c679ac995d9ee20cd5f 100644 (file)
@@ -9,13 +9,12 @@
 #ifndef SQUID_FORWARD_H
 #define SQUID_FORWARD_H
 
-#include "base/CbcPointer.h"
 #include "base/forward.h"
+#include "base/JobWait.h"
 #include "base/RefCount.h"
 #include "clients/forward.h"
 #include "comm.h"
 #include "comm/Connection.h"
-#include "comm/ConnOpener.h"
 #include "error/forward.h"
 #include "fde.h"
 #include "http/StatusCode.h"
@@ -39,7 +38,6 @@ class ResolvedPeers;
 typedef RefCount<ResolvedPeers> ResolvedPeersPointer;
 
 class HappyConnOpener;
-typedef CbcPointer<HappyConnOpener> HappyConnOpenerPointer;
 class HappyConnOpenerAnswer;
 
 /// Sets initial TOS value and Netfilter for the future outgoing connection.
@@ -83,7 +81,7 @@ public:
     void handleUnregisteredServerEnd();
     int reforward();
     bool reforwardableStatus(const Http::StatusCode s) const;
-    void serverClosed(int fd);
+    void serverClosed();
     void connectStart();
     void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno);
     bool checkRetry();
@@ -111,6 +109,9 @@ private:
     /* PeerSelectionInitiator API */
     virtual void noteDestination(Comm::ConnectionPointer conn) override;
     virtual void noteDestinationsEnd(ErrorState *selectionError) override;
+    /// whether the successfully selected path destination or the established
+    /// server connection is still in use
+    bool usingDestination() const;
 
     void noteConnection(HappyConnOpenerAnswer &);
 
@@ -153,13 +154,10 @@ private:
     /// \returns the time left for this connection to become connected or 1 second if it is less than one second left
     time_t connectingTimeout(const Comm::ConnectionPointer &conn) const;
 
-    /// whether we are waiting for HappyConnOpener
-    /// same as calls.connector but may differ from connOpener.valid()
-    bool opening() const { return connOpener.set(); }
-
-    void cancelOpening(const char *reason);
+    void cancelStep(const char *reason);
 
     void notifyConnOpener();
+    void reactToZeroSizeObject();
 
 public:
     StoreEntry *entry;
@@ -176,11 +174,6 @@ private:
     time_t start_t;
     int n_tries; ///< the number of forwarding attempts so far
 
-    // AsyncCalls which we set and may need cancelling.
-    struct {
-        AsyncCall::Pointer connector;  ///< a call linking us to the ConnOpener producing serverConn.
-    } calls;
-
     struct {
         bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc
         bool dont_retry;
@@ -188,7 +181,16 @@ private:
         bool destinationsFound; ///< at least one candidate path found
     } flags;
 
-    HappyConnOpenerPointer connOpener; ///< current connection opening job
+    /// waits for a transport connection to the peer to be established/opened
+    JobWait<HappyConnOpener> transportWait;
+
+    /// waits for the established transport connection to be secured/encrypted
+    JobWait<Security::PeerConnector> encryptionWait;
+
+    /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
+    /// over the (encrypted, if needed) transport connection to that cache_peer
+    JobWait<Http::Tunneler> peerWait;
+
     ResolvedPeersPointer destinations; ///< paths for forwarding the request
     Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server.
     PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil)
index 3d689f7a485ed5607ab7d42f2cc5d1d78ea332f0..0436e35b6fea36d916d4d3e0bf18e46e069d56ac 100644 (file)
@@ -18,6 +18,7 @@
 #include "neighbors.h"
 #include "pconn.h"
 #include "PeerPoolMgr.h"
+#include "sbuf/Stream.h"
 #include "SquidConfig.h"
 
 CBDATA_CLASS_INIT(HappyConnOpener);
@@ -330,6 +331,8 @@ HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const Asyn
     fwdStart(aFwdStart),
     callback_(aCall),
     destinations(dests),
+    prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"),
+    spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"),
     ale(anAle),
     cause(request),
     n_tries(tries)
@@ -410,33 +413,43 @@ HappyConnOpener::swanSong()
     AsyncJob::swanSong();
 }
 
+/// HappyConnOpener::Attempt printer for debugging
+std::ostream &
+operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt)
+{
+    if (!attempt.path)
+        os << '-';
+    else if (attempt.path->isOpen())
+        os << "FD " << attempt.path->fd;
+    else if (attempt.connWait)
+        os << attempt.connWait;
+    else // destination is known; connection closed (and we are not opening any)
+        os << attempt.path->id;
+    return os;
+}
+
 const char *
 HappyConnOpener::status() const
 {
-    static MemBuf buf;
-    buf.reset();
+    // TODO: In a redesigned status() API, the caller may mimic this approach.
+    static SBuf buf;
+    buf.clear();
 
-    buf.append(" [", 2);
+    SBufStream os(buf);
+
+    os.write(" [", 2);
     if (stopReason)
-        buf.appendf("Stopped, reason:%s", stopReason);
-    if (prime) {
-        if (prime.path && prime.path->isOpen())
-            buf.appendf(" prime FD %d", prime.path->fd);
-        else if (prime.connector)
-            buf.appendf(" prime call%ud", prime.connector->id.value);
-    }
-    if (spare) {
-        if (spare.path && spare.path->isOpen())
-            buf.appendf(" spare FD %d", spare.path->fd);
-        else if (spare.connector)
-            buf.appendf(" spare call%ud", spare.connector->id.value);
-    }
+        os << "Stopped:" << stopReason;
+    if (prime)
+        os << "prime:" << prime;
+    if (spare)
+        os << "spare:" << spare;
     if (n_tries)
-        buf.appendf(" tries %d", n_tries);
-    buf.appendf(" %s%u]", id.prefix(), id.value);
-    buf.terminate();
+        os << " tries:" << n_tries;
+    os << ' ' << id << ']';
 
-    return buf.content();
+    buf = os.buf();
+    return buf.c_str();
 }
 
 /// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending
@@ -516,7 +529,7 @@ void
 HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest)
 {
     Must(!attempt.path);
-    Must(!attempt.connector);
+    Must(!attempt.connWait);
     Must(dest);
 
     const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer();
@@ -552,51 +565,52 @@ HappyConnOpener::openFreshConnection(Attempt &attempt, PeerConnectionPointer &de
     entry->mem_obj->checkUrlChecksum();
 #endif
 
-    GetMarkingsToServer(cause.getRaw(), *dest);
+    const auto conn = dest->cloneProfile();
+    GetMarkingsToServer(cause.getRaw(), *conn);
 
-    // ConnOpener modifies its destination argument so we reset the source port
-    // in case we are reusing the destination already used by our predecessor.
-    dest->local.port(0);
     ++n_tries;
 
     typedef CommCbMemFunT<HappyConnOpener, CommConnectCbParams> Dialer;
-    AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone);
+    AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName,
+                                     Dialer(this, attempt.callbackMethod));
     const time_t connTimeout = dest->connectTimeout(fwdStart);
-    Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout);
-    if (!dest->getPeer())
+    auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout);
+    if (!conn->getPeer())
         cs->setHost(host_);
 
-    attempt.path = dest;
-    attempt.connector = callConnect;
-    attempt.opener = cs;
+    attempt.path = dest; // but not the being-opened conn!
+    attempt.connWait.start(cs, callConnect);
+}
 
-    AsyncJob::Start(cs);
+/// Comm::ConnOpener callback for the prime connection attempt
+void
+HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams &params)
+{
+    handleConnOpenerAnswer(prime, params, "new prime connection");
 }
 
-/// called by Comm::ConnOpener objects after a prime or spare connection attempt
-/// completes (successfully or not)
+/// Comm::ConnOpener callback for the spare connection attempt
 void
-HappyConnOpener::connectDone(const CommConnectCbParams &params)
+HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams &params)
 {
-    Must(params.conn);
-    const bool itWasPrime = (params.conn == prime.path);
-    const bool itWasSpare = (params.conn == spare.path);
-    Must(itWasPrime != itWasSpare);
-
-    PeerConnectionPointer handledPath;
-    if (itWasPrime) {
-        handledPath = prime.path;
-        prime.finish();
-    } else {
-        handledPath = spare.path;
-        spare.finish();
-        if (gotSpareAllowance) {
-            TheSpareAllowanceGiver.jobUsedAllowance();
-            gotSpareAllowance = false;
-        }
+    if (gotSpareAllowance) {
+        TheSpareAllowanceGiver.jobUsedAllowance();
+        gotSpareAllowance = false;
     }
+    handleConnOpenerAnswer(spare, params, "new spare connection");
+}
+
+/// prime/spare-agnostic processing of a Comm::ConnOpener result
+void
+HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams &params, const char *what)
+{
+    Must(params.conn);
+
+    // finalize the previously selected path before attempt.finish() forgets it
+    auto handledPath = attempt.path;
+    handledPath.finalize(params.conn); // closed on errors
+    attempt.finish();
 
-    const char *what = itWasPrime ? "new prime connection" : "new spare connection";
     if (params.flag == Comm::OK) {
         sendSuccess(handledPath, false, what);
         return;
@@ -605,7 +619,6 @@ HappyConnOpener::connectDone(const CommConnectCbParams &params)
     debugs(17, 8, what << " failed: " << params.conn);
     if (const auto peer = params.conn->getPeer())
         peerConnectFailed(peer);
-    params.conn->close(); // TODO: Comm::ConnOpener should do this instead.
 
     // remember the last failure (we forward it if we cannot connect anywhere)
     lastFailedConnection = handledPath;
@@ -881,13 +894,23 @@ HappyConnOpener::ranOutOfTimeOrAttempts() const
     return false;
 }
 
+HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName):
+    callbackMethod(method),
+    callbackMethodName(methodName)
+{
+}
+
+void
+HappyConnOpener::Attempt::finish()
+{
+    connWait.finish();
+    path = nullptr;
+}
+
 void
 HappyConnOpener::Attempt::cancel(const char *reason)
 {
-    if (connector) {
-        connector->cancel(reason);
-        CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort);
-    }
-    clear();
+    connWait.cancel(reason);
+    path = nullptr;
 }
 
index dbffffd92aa0a953ae482fc4c9f6668414456885..ff20232513f010b6d999b99285a4e9d1406bcc5c 100644 (file)
@@ -156,22 +156,28 @@ private:
     /// a connection opening attempt in progress (or falsy)
     class Attempt {
     public:
+        /// HappyConnOpener method implementing a ConnOpener callback
+        using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &);
+
+        Attempt(const CallbackMethod method, const char *methodName);
+
         explicit operator bool() const { return static_cast<bool>(path); }
 
         /// reacts to a natural attempt completion (successful or otherwise)
-        void finish() { clear(); }
+        void finish();
 
         /// aborts an in-progress attempt
         void cancel(const char *reason);
 
         PeerConnectionPointer path; ///< the destination we are connecting to
-        AsyncCall::Pointer connector; ///< our opener callback
-        Comm::ConnOpener::Pointer opener; ///< connects to path and calls us
 
-    private:
-        /// cleans up after the attempt ends (successfully or otherwise)
-        void clear() { path = nullptr; connector = nullptr; opener = nullptr; }
+        /// waits for a connection to the peer to be established/opened
+        JobWait<Comm::ConnOpener> connWait;
+
+        const CallbackMethod callbackMethod; ///< ConnOpener calls this method
+        const char * const callbackMethodName; ///< for callbackMethod debugging
     };
+    friend std::ostream &operator <<(std::ostream &, const Attempt &);
 
     /* AsyncJob API */
     virtual void start() override;
@@ -190,7 +196,9 @@ private:
     void openFreshConnection(Attempt &, PeerConnectionPointer &);
     bool reuseOldConnection(PeerConnectionPointer &);
 
-    void connectDone(const CommConnectCbParams &);
+    void notePrimeConnectDone(const CommConnectCbParams &);
+    void noteSpareConnectDone(const CommConnectCbParams &);
+    void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription);
 
     void checkForNewConnection();
 
index abfa86d2d5a111b4a3affab129cc16791fec9eba..f34d8584990696cca1a30da87e39adfb871e247b 100644 (file)
@@ -43,9 +43,8 @@ public:
 PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
     peer(cbdataReference(aPeer)),
     request(),
-    opener(),
-    securer(),
-    closer(),
+    transportWait(),
+    encryptionWait(),
     addrUsed(0)
 {
 }
@@ -90,7 +89,7 @@ PeerPoolMgr::doneAll() const
 void
 PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
 {
-    opener = NULL;
+    transportWait.finish();
 
     if (!validPeer()) {
         debugs(48, 3, "peer gone");
@@ -100,9 +99,6 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
     }
 
     if (params.flag != Comm::OK) {
-        /* it might have been a timeout with a partially open link */
-        if (params.conn != NULL)
-            params.conn->close();
         peerConnectFailed(peer);
         checkpoint("conn opening failure"); // may retry
         return;
@@ -112,20 +108,16 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
 
     // Handle TLS peers.
     if (peer->secure.encryptTransport) {
-        typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
-        closer = JobCallback(48, 3, CloserDialer, this,
-                             PeerPoolMgr::handleSecureClosure);
-        comm_add_close_handler(params.conn->fd, closer);
-
-        securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
-                            MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
+        // XXX: Exceptions orphan params.conn
+        AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
+                                                MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
 
         const auto peerTimeout = peer->connectTimeout();
         const int timeUsed = squid_curtime - params.conn->startTime();
         // Use positive timeout when less than one second is left for conn.
         const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
-        auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft);
-        AsyncJob::Start(connector); // will call our callback
+        const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
+        encryptionWait.start(connector, callback);
         return;
     }
 
@@ -144,16 +136,7 @@ PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
 void
 PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
 {
-    Must(securer != NULL);
-    securer = NULL;
-
-    if (closer != NULL) {
-        if (answer.conn != NULL)
-            comm_remove_close_handler(answer.conn->fd, closer);
-        else
-            closer->cancel("securing completed");
-        closer = NULL;
-    }
+    encryptionWait.finish();
 
     if (!validPeer()) {
         debugs(48, 3, "peer gone");
@@ -162,35 +145,33 @@ PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
         return;
     }
 
+    assert(!answer.tunneled);
     if (answer.error.get()) {
-        if (answer.conn != NULL)
-            answer.conn->close();
+        assert(!answer.conn);
         // PeerConnector calls peerConnectFailed() for us;
         checkpoint("conn securing failure"); // may retry
         return;
     }
 
-    pushNewConnection(answer.conn);
-}
+    assert(answer.conn);
 
-void
-PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &)
-{
-    Must(closer != NULL);
-    Must(securer != NULL);
-    securer->cancel("conn closed by a 3rd party");
-    securer = NULL;
-    closer = NULL;
-    // allow the closing connection to fully close before we check again
-    Checkpoint(this, "conn closure while securing");
+    // The socket could get closed while our callback was queued. Sync
+    // Connection. XXX: Connection::fd may already be stale/invalid here.
+    if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
+        answer.conn->noteClosure();
+        checkpoint("external connection closure"); // may retry
+        return;
+    }
+
+    pushNewConnection(answer.conn);
 }
 
 void
 PeerPoolMgr::openNewConnection()
 {
     // KISS: Do nothing else when we are already doing something.
-    if (opener != NULL || securer != NULL || shutting_down) {
-        debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
+    if (transportWait || encryptionWait || shutting_down) {
+        debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
         return; // there will be another checkpoint when we are done opening/securing
     }
 
@@ -227,9 +208,9 @@ PeerPoolMgr::openNewConnection()
 
     const auto ctimeout = peer->connectTimeout();
     typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
-    opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
-    Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
-    AsyncJob::Start(cs);
+    AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
+    const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
+    transportWait.start(cs, callback);
 }
 
 void
index 3ca6e9d57649579c5e3e8c8cfdc56bd192c51bf2..182dc203c7667afe0891d1fa456b2d16f2433bf1 100644 (file)
@@ -10,6 +10,7 @@
 #define SQUID_PEERPOOLMGR_H
 
 #include "base/AsyncJob.h"
+#include "base/JobWait.h"
 #include "comm/forward.h"
 #include "security/forward.h"
 
@@ -54,18 +55,19 @@ protected:
     /// Security::PeerConnector callback
     void handleSecuredPeer(Security::EncryptorAnswer &answer);
 
-    /// called when the connection we are trying to secure is closed by a 3rd party
-    void handleSecureClosure(const CommCloseCbParams &params);
-
     /// the final step in connection opening (and, optionally, securing) sequence
     void pushNewConnection(const Comm::ConnectionPointer &conn);
 
 private:
     CachePeer *peer; ///< the owner of the pool we manage
     RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code
-    AsyncCall::Pointer opener; ///< whether we are opening a connection
-    AsyncCall::Pointer securer; ///< whether we are securing a connection
-    AsyncCall::Pointer closer; ///< monitors conn while we are securing it
+
+    /// waits for a transport connection to the peer to be established/opened
+    JobWait<Comm::ConnOpener> transportWait;
+
+    /// waits for the established transport connection to be secured/encrypted
+    JobWait<Security::BlindPeerConnector> encryptionWait;
+
     unsigned int addrUsed; ///< counter for cycling through peer addresses
 };
 
index a2e47df675001daaf4e93d51d608fc0cb4115f2e..377dcee45120989b70d11e101ff2006a9afb0870 100644 (file)
@@ -151,7 +151,7 @@ ResolvedPeers::extractFound(const char *description, const Paths::iterator &foun
         while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {}
     }
 
-    const auto cleanPath = path.connection->cloneDestinationDetails();
+    const auto cleanPath = path.connection->cloneProfile();
     return PeerConnectionPointer(cleanPath, found - paths_.begin());
 }
 
index 3faaa5ece0bef66bbccadfb90cc5d2fdee68d61d..3fd49c312a6b3dc8e02d450814eb5a0f69be73c4 100644 (file)
@@ -187,8 +187,7 @@ void Adaptation::Icap::ModXact::startWriting()
     openConnection();
 }
 
-// connection with the ICAP service established
-void Adaptation::Icap::ModXact::handleCommConnected()
+void Adaptation::Icap::ModXact::startShoveling()
 {
     Must(state.writing == State::writingConnect);
 
index 509b446468b3009fcbd86e526929a0161dcd5fde..a976fb3bdf8e24fd2ba3d79684f899215f667b1a 100644 (file)
@@ -155,10 +155,11 @@ public:
     virtual void noteBodyProductionEnded(BodyPipe::Pointer);
     virtual void noteBodyProducerAborted(BodyPipe::Pointer);
 
-    // comm handlers
-    virtual void handleCommConnected();
+    /* Xaction API */
+    virtual void startShoveling();
     virtual void handleCommWrote(size_t size);
     virtual void handleCommRead(size_t size);
+
     void handleCommWroteHeaders();
     void handleCommWroteBody();
 
index 7167e7c18d08ad81d06e4572e63b6e805cbdb1d2..f8d0aee752384dc2c34b0d13573125d2f6570f37 100644 (file)
@@ -37,7 +37,7 @@ void Adaptation::Icap::OptXact::start()
     openConnection();
 }
 
-void Adaptation::Icap::OptXact::handleCommConnected()
+void Adaptation::Icap::OptXact::startShoveling()
 {
     scheduleRead();
 
index 4d5518dcbdbc827bf0567e4a77eaaff59fc7c87b..3e033637d9a14fce840bd9d91d66f9bb5445bf79 100644 (file)
@@ -30,8 +30,9 @@ public:
     OptXact(ServiceRep::Pointer &aService);
 
 protected:
+    /* Xaction API */
     virtual void start();
-    virtual void handleCommConnected();
+    virtual void startShoveling();
     virtual void handleCommWrote(size_t size);
     virtual void handleCommRead(size_t size);
 
index 98dd0f21d19db09db4439d702639df1aca07fb19..2dda24d3a6c3bb6db291b49426d770bb36880cf6 100644 (file)
@@ -112,9 +112,8 @@ void Adaptation::Icap::ServiceRep::noteFailure()
     // should be configurable.
 }
 
-// returns a persistent or brand new connection; negative int on failures
 Comm::ConnectionPointer
-Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
+Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact)
 {
     Comm::ConnectionPointer connection;
 
@@ -137,7 +136,6 @@ Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
     else
         theIdleConns->closeN(1);
 
-    reused = Comm::IsConnOpen(connection);
     ++theBusyConns;
     debugs(93,3, HERE << "got connection: " << connection);
     return connection;
@@ -150,7 +148,6 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer &
     // do not pool an idle connection if we owe connections
     if (isReusable && excessConnections() == 0) {
         debugs(93, 3, HERE << "pushing pconn" << comment);
-        commUnsetConnTimeout(conn);
         theIdleConns->push(conn);
     } else {
         debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " <<
index 50fe624f81e4f1a0384f6d7d058995a39efefc38..d7cd604d6963f86b05afb955d8e5a625d8fd499a 100644 (file)
@@ -85,7 +85,8 @@ public:
     bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const;
     bool allows204() const;
     bool allows206() const;
-    Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused);
+    /// \returns an idle persistent ICAP connection or nil
+    Comm::ConnectionPointer getIdleConnection(bool isRetriable);
     void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment);
     void noteConnectionUse(const Comm::ConnectionPointer &conn);
     void noteConnectionFailed(const char *comment);
index e4e539ee4657394b737aa3457351a6a0bb677002..a8b568afe286f9cdcc21005391578a6c0f283615 100644 (file)
@@ -13,6 +13,7 @@
 #include "adaptation/icap/Config.h"
 #include "adaptation/icap/Launcher.h"
 #include "adaptation/icap/Xaction.h"
+#include "base/JobWait.h"
 #include "base/TextException.h"
 #include "comm.h"
 #include "comm/Connection.h"
@@ -82,7 +83,6 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv
     icapRequest(NULL),
     icapReply(NULL),
     attempts(0),
-    connection(NULL),
     theService(aService),
     commEof(false),
     reuseConnection(true),
@@ -90,14 +90,8 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv
     isRepeatable(true),
     ignoreLastWrite(false),
     waitingForDns(false),
-    stopReason(NULL),
-    connector(NULL),
-    reader(NULL),
-    writer(NULL),
-    closer(NULL),
     alep(new AccessLogEntry),
-    al(*alep),
-    cs(NULL)
+    al(*alep)
 {
     debugs(93,3, typeName << " constructed, this=" << this <<
            " [icapx" << id << ']'); // we should not call virtual status() here
@@ -153,6 +147,8 @@ static void
 icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data)
 {
     Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
+    /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17)
+    /// or Optional<Ip::Address> (when it can take non-trivial types)
     xa->dnsLookupDone(ia);
 }
 
@@ -167,21 +163,8 @@ Adaptation::Icap::Xaction::openConnection()
     if (!TheConfig.reuse_connections)
         disableRetries(); // this will also safely drain pconn pool
 
-    bool wasReused = false;
-    connection = s.getConnection(isRetriable, wasReused);
-
-    if (wasReused && Comm::IsConnOpen(connection)) {
-        // Set comm Close handler
-        // fake the connect callback
-        // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
-        typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
-        CbcPointer<Xaction> self(this);
-        Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
-        dialer.params.conn = connection;
-        dialer.params.flag = Comm::OK;
-        // fake other parameters by copying from the existing connection
-        connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
-        ScheduleCallHere(connector);
+    if (const auto pconn = s.getIdleConnection(isRetriable)) {
+        useTransportConnection(pconn);
         return;
     }
 
@@ -210,30 +193,22 @@ Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia)
 #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
         dieOnConnectionFailure(); // throws
 #else // take a step back into protected Async call dialing.
-        // fake the connect callback
-        typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
-        CbcPointer<Xaction> self(this);
-        Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
-        dialer.params.conn = connection;
-        dialer.params.flag = Comm::COMM_ERROR;
-        // fake other parameters by copying from the existing connection
-        connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
-        ScheduleCallHere(connector);
+        CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure);
 #endif
         return;
     }
 
-    connection = new Comm::Connection;
-    connection->remote = ia->current();
-    connection->remote.port(s.cfg().port);
-    getOutgoingAddress(NULL, connection);
+    const Comm::ConnectionPointer conn = new Comm::Connection();
+    conn->remote = ia->current();
+    conn->remote.port(s.cfg().port);
+    getOutgoingAddress(nullptr, conn);
 
     // TODO: service bypass status may differ from that of a transaction
     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
-    connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
-    cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
+    AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+    const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass));
     cs->setHost(s.cfg().host.termedBuf());
-    AsyncJob::Start(cs.get());
+    transportWait.start(cs, callback);
 }
 
 void Adaptation::Icap::Xaction::closeConnection()
@@ -245,6 +220,8 @@ void Adaptation::Icap::Xaction::closeConnection()
             closer = NULL;
         }
 
+        commUnsetConnTimeout(connection);
+
         cancelRead(); // may not work
 
         if (reuseConnection && !doneWithIo()) {
@@ -264,54 +241,65 @@ void Adaptation::Icap::Xaction::closeConnection()
 
         writer = NULL;
         reader = NULL;
-        connector = NULL;
         connection = NULL;
     }
 }
 
-// connection with the ICAP service established
+/// called when the connection attempt to an ICAP service completes (successfully or not)
 void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
 {
-    cs = NULL;
+    transportWait.finish();
 
-    if (io.flag == Comm::TIMEOUT) {
-        handleCommTimedout();
+    if (io.flag != Comm::OK) {
+        dieOnConnectionFailure(); // throws
         return;
     }
 
-    Must(connector != NULL);
-    connector = NULL;
-
-    if (io.flag != Comm::OK)
-        dieOnConnectionFailure(); // throws
-
-    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
-    AsyncCall::Pointer timeoutCall =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
-                                      TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
-    commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
+    useTransportConnection(io.conn);
+}
 
-    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
-    closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
-                        CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
-    comm_add_close_handler(io.conn->fd, closer);
+/// React to the availability of a transport connection to the ICAP service.
+/// The given connection may (or may not) be secured already.
+void
+Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn)
+{
+    assert(Comm::IsConnOpen(conn));
+    assert(!connection);
 
     // If it is a reused connection and the TLS object is built
     // we should not negotiate new TLS session
-    const auto &ssl = fd_table[io.conn->fd].ssl;
+    const auto &ssl = fd_table[conn->fd].ssl;
     if (!ssl && service().cfg().secure.encryptTransport) {
+        // XXX: Exceptions orphan conn.
         CbcPointer<Adaptation::Icap::Xaction> me(this);
-        securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
-                            MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
+        AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
+                                                MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
 
-        auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
-        AsyncJob::Start(sslConnector); // will call our callback
+        const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
+
+        encryptionWait.start(sslConnector, callback);
         return;
     }
 
-// ??    fd_table[io.conn->fd].noteUse(icapPconnPool);
+    useIcapConnection(conn);
+}
+
+/// react to the availability of a fully-ready ICAP connection
+void
+Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn)
+{
+    assert(!connection);
+    assert(conn);
+    assert(Comm::IsConnOpen(conn));
+    connection = conn;
     service().noteConnectionUse(connection);
 
-    handleCommConnected();
+    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+    closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+                        CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+    comm_add_close_handler(connection->fd, closer);
+
+    startShoveling();
 }
 
 void Adaptation::Icap::Xaction::dieOnConnectionFailure()
@@ -356,40 +344,26 @@ void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
 
 // communication timeout with the ICAP service
 void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &)
-{
-    handleCommTimedout();
-}
-
-void Adaptation::Icap::Xaction::handleCommTimedout()
 {
     debugs(93, 2, HERE << typeName << " failed: timeout with " <<
            theService->cfg().methodStr() << " " <<
            theService->cfg().uri << status());
     reuseConnection = false;
-    const bool whileConnecting = connector != NULL;
-    if (whileConnecting) {
-        assert(!haveConnection());
-        theService->noteConnectionFailed("timedout");
-    } else
-        closeConnection(); // so that late Comm callbacks do not disturb bypass
-    throw TexcHere(whileConnecting ?
-                   "timed out while connecting to the ICAP service" :
-                   "timed out while talking to the ICAP service");
+    assert(haveConnection());
+    theService->noteConnectionFailed("timedout");
+    closeConnection();
+    throw TextException("timed out while talking to the ICAP service", Here());
 }
 
 // unexpected connection close while talking to the ICAP service
 void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &)
 {
-    if (securer != NULL) {
-        securer->cancel("Connection closed before SSL negotiation finished");
-        securer = NULL;
+    if (connection) {
+        connection->noteClosure();
+        connection = nullptr;
     }
     closer = NULL;
-    handleCommClosed();
-}
 
-void Adaptation::Icap::Xaction::handleCommClosed()
-{
     static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE");
     detailError(d);
     mustStop("ICAP service connection externally closed");
@@ -413,7 +387,8 @@ void Adaptation::Icap::Xaction::callEnd()
 
 bool Adaptation::Icap::Xaction::doneAll() const
 {
-    return !waitingForDns && !connector && !securer && !reader && !writer &&
+    return !waitingForDns && !transportWait && !encryptionWait &&
+           !reader && !writer &&
            Adaptation::Initiate::doneAll();
 }
 
@@ -557,7 +532,7 @@ bool Adaptation::Icap::Xaction::doneWriting() const
 bool Adaptation::Icap::Xaction::doneWithIo() const
 {
     return haveConnection() &&
-           !connector && !reader && !writer && // fast checks, some redundant
+           !transportWait && !reader && !writer && // fast checks, some redundant
            doneReading() && doneWriting();
 }
 
@@ -597,10 +572,7 @@ void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &
 void Adaptation::Icap::Xaction::swanSong()
 {
     // kids should sing first and then call the parent method.
-    if (cs.valid()) {
-        debugs(93,6, HERE << id << " about to notify ConnOpener!");
-        CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort);
-        cs = NULL;
+    if (transportWait || encryptionWait) {
         service().noteConnectionFailed("abort");
     }
 
@@ -743,20 +715,12 @@ Ssl::IcapPeerConnector::noteNegotiationDone(ErrorState *error)
 void
 Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
 {
-    Must(securer != NULL);
-    securer = NULL;
-
-    if (closer != NULL) {
-        if (Comm::IsConnOpen(answer.conn))
-            comm_remove_close_handler(answer.conn->fd, closer);
-        else
-            closer->cancel("securing completed");
-        closer = NULL;
-    }
+    encryptionWait.finish();
 
+    assert(!answer.tunneled);
     if (answer.error.get()) {
-        if (answer.conn != NULL)
-            answer.conn->close();
+        assert(!answer.conn);
+        // TODO: Refactor dieOnConnectionFailure() to be usable here as well.
         debugs(93, 2, typeName <<
                " TLS negotiation to " << service().cfg().uri << " failed");
         service().noteConnectionFailed("failure");
@@ -767,8 +731,18 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
 
     debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete");
 
-    service().noteConnectionUse(answer.conn);
+    assert(answer.conn);
+
+    // The socket could get closed while our callback was queued. Sync
+    // Connection. XXX: Connection::fd may already be stale/invalid here.
+    if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
+        answer.conn->noteClosure();
+        service().noteConnectionFailed("external TLS connection closure");
+        static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE");
+        detailError(d);
+        throw TexcHere("external closure of the TLS ICAP service connection");
+    }
 
-    handleCommConnected();
+    useIcapConnection(answer.conn);
 }
 
index 8fcceaa0ac05a7f2e3d738635b6bc1274ce78d68..9c2742149a20942d1ae8e87dacd971202d0d3818 100644 (file)
@@ -12,6 +12,7 @@
 #include "AccessLogEntry.h"
 #include "adaptation/icap/ServiceRep.h"
 #include "adaptation/Initiate.h"
+#include "base/JobWait.h"
 #include "comm/ConnOpener.h"
 #include "error/forward.h"
 #include "HttpReply.h"
 
 class MemBuf;
 
+namespace Ssl {
+class IcapPeerConnector;
+}
+
 namespace Adaptation
 {
 namespace Icap
@@ -65,12 +70,12 @@ protected:
     virtual void start();
     virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate
 
+    /// starts sending/receiving ICAP messages
+    virtual void startShoveling() = 0;
+
     // comm hanndlers; called by comm handler wrappers
-    virtual void handleCommConnected() = 0;
     virtual void handleCommWrote(size_t sz) = 0;
     virtual void handleCommRead(size_t sz) = 0;
-    virtual void handleCommTimedout();
-    virtual void handleCommClosed();
 
     void handleSecuredPeer(Security::EncryptorAnswer &answer);
     /// record error detail if possible
@@ -78,7 +83,6 @@ protected:
 
     void openConnection();
     void closeConnection();
-    void dieOnConnectionFailure();
     bool haveConnection() const;
 
     void scheduleRead();
@@ -124,11 +128,13 @@ public:
     ServiceRep &service();
 
 private:
+    void useTransportConnection(const Comm::ConnectionPointer &);
+    void useIcapConnection(const Comm::ConnectionPointer &);
+    void dieOnConnectionFailure();
     void tellQueryAborted();
     void maybeLog();
 
 protected:
-    Comm::ConnectionPointer connection;     ///< ICAP server connection
     Adaptation::Icap::ServiceRep::Pointer theService;
 
     SBuf readBuf;
@@ -139,13 +145,8 @@ protected:
     bool ignoreLastWrite;
     bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback
 
-    const char *stopReason;
-
-    // active (pending) comm callbacks for the ICAP server connection
-    AsyncCall::Pointer connector;
     AsyncCall::Pointer reader;
     AsyncCall::Pointer writer;
-    AsyncCall::Pointer closer;
 
     AccessLogEntry::Pointer alep; ///< icap.log entry
     AccessLogEntry &al; ///< short for *alep
@@ -155,8 +156,16 @@ protected:
     timeval icap_tio_finish;   /*time when the last byte of the ICAP responsewas received*/
 
 private:
-    Comm::ConnOpener::Pointer cs;
-    AsyncCall::Pointer securer; ///< whether we are securing a connection
+    /// waits for a transport connection to the ICAP server to be established/opened
+    JobWait<Comm::ConnOpener> transportWait;
+
+    /// waits for the established transport connection to be secured/encrypted
+    JobWait<Ssl::IcapPeerConnector> encryptionWait;
+
+    /// open and, if necessary, secured connection to the ICAP server (or nil)
+    Comm::ConnectionPointer connection;
+
+    AsyncCall::Pointer closer;
 };
 
 } // namespace Icap
index f025c039421783f7d07ac60e7559490609091de3..795f8d9fddd2e3a98e15feefce3306b240ece6a0 100644 (file)
 
 InstanceIdDefinitions(AsyncJob, "job");
 
-AsyncJob::Pointer AsyncJob::Start(AsyncJob *j)
+void
+AsyncJob::Start(const Pointer &job)
 {
-    AsyncJob::Pointer job(j);
     CallJobHere(93, 5, job, AsyncJob, start);
-    return job;
+    job->started_ = true; // it is the attempt that counts
 }
 
 AsyncJob::AsyncJob(const char *aTypeName) :
@@ -38,6 +38,7 @@ AsyncJob::~AsyncJob()
 {
     debugs(93,5, "AsyncJob destructed, this=" << this <<
            " type=" << typeName << " [" << id << ']');
+    assert(!started_ || swanSang_);
 }
 
 void AsyncJob::start()
@@ -141,9 +142,16 @@ void AsyncJob::callEnd()
         AsyncCall::Pointer inCallSaved = inCall;
         void *thisSaved = this;
 
+        // TODO: Swallow swanSong() exceptions to reduce memory leaks.
+
+        // Job callback invariant: swanSong() is (only) called for started jobs.
+        // Here to detect violations in kids that forgot to call our swanSong().
+        assert(started_);
+
+        swanSang_ = true; // it is the attempt that counts
         swanSong();
 
-        delete this; // this is the only place where the object is deleted
+        delete this; // this is the only place where a started job is deleted
 
         // careful: this object does not exist any more
         debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved);
index e02ab1b5de598b5851f2708813a71087e3bafa56..b0e4abf4dd84798e0d151fdd5792bd9683d28c2e 100644 (file)
@@ -36,8 +36,13 @@ public:
 public:
     AsyncJob(const char *aTypeName);
 
-    /// starts a freshly created job (i.e., makes the job asynchronous)
-    static Pointer Start(AsyncJob *job);
+    /// Promises to start the configured job (eventually). The job is deemed to
+    /// be running asynchronously beyond this point, so the caller should only
+    /// access the job object via AsyncCalls rather than directly.
+    ///
+    /// swanSong() is only called for jobs for which this method has returned
+    /// successfully (i.e. without throwing).
+    static void Start(const Pointer &job);
 
 protected:
     // XXX: temporary method to replace "delete this" in jobs-in-transition.
@@ -62,6 +67,11 @@ public:
     /// called when the job throws during an async call
     virtual void callException(const std::exception &e);
 
+    /// process external request to terminate now (i.e. during this async call)
+    void handleStopRequest() { mustStop("externally aborted"); }
+
+    const InstanceId<AsyncJob> id; ///< job identifier
+
 protected:
     // external destruction prohibited to ensure swanSong() is called
     virtual ~AsyncJob();
@@ -69,7 +79,9 @@ protected:
     const char *stopReason; ///< reason for forcing done() to be true
     const char *typeName; ///< kid (leaf) class name, for debugging
     AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any
-    const InstanceId<AsyncJob> id; ///< job identifier
+
+    bool started_ = false; ///< Start() has finished successfully
+    bool swanSang_ = false; ///< swanSong() was called
 };
 
 #endif /* SQUID_ASYNC_JOB_H */
diff --git a/src/base/JobWait.cc b/src/base/JobWait.cc
new file mode 100644 (file)
index 0000000..ba68324
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/AsyncJobCalls.h"
+#include "base/JobWait.h"
+
+#include <cassert>
+#include <iostream>
+
+JobWaitBase::JobWaitBase() = default;
+
+JobWaitBase::~JobWaitBase()
+{
+    cancel("owner gone");
+}
+
+void
+JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall)
+{
+    // Invariant: The wait will be over. We cannot guarantee that the job will
+    // call the callback, of course, but we can check these prerequisites.
+    assert(aCall);
+    assert(aJob.valid());
+
+    // "Double" waiting state leads to conflicting/mismatching callbacks
+    // detailed in finish(). Detect that bug ASAP.
+    assert(!waiting());
+
+    assert(!callback_);
+    assert(!job_);
+    callback_ = aCall;
+    job_ = aJob;
+
+    AsyncJob::Start(job_.get());
+}
+
+void
+JobWaitBase::finish()
+{
+    // Unexpected callbacks might result in disasters like secrets exposure,
+    // data corruption, or expensive message routing mistakes when the callback
+    // info is applied to the wrong message part or acted upon prematurely.
+    assert(waiting());
+    clear();
+}
+
+void
+JobWaitBase::cancel(const char *reason)
+{
+    if (callback_) {
+        callback_->cancel(reason);
+
+        // Instead of AsyncJob, the class parameter could be Job. That would
+        // avoid runtime child-to-parent CbcPointer conversion overheads, but
+        // complicate support for Jobs with virtual AsyncJob bases (GCC error:
+        // "pointer to member conversion via virtual base AsyncJob") and also
+        // cache-log "Job::handleStopRequest()" with a non-existent class name.
+        CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest);
+
+        clear();
+    }
+}
+
+void
+JobWaitBase::print(std::ostream &os) const
+{
+    // use a backarrow to emphasize that this is a callback: call24<-job6
+    if (callback_)
+        os << callback_->id << "<-";
+    if (const auto rawJob = job_.get())
+        os << rawJob->id;
+    else
+        os << job_; // raw pointer of a gone job may still be useful for triage
+}
+
diff --git a/src/base/JobWait.h b/src/base/JobWait.h
new file mode 100644 (file)
index 0000000..6b11313
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_BASE_JOBWAIT_H
+#define SQUID_BASE_JOBWAIT_H
+
+#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
+
+#include <iosfwd>
+
+/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead.
+/// This base class does not contain code specific to the actual Job type.
+class JobWaitBase
+{
+public:
+    JobWaitBase();
+    ~JobWaitBase();
+
+    /// no copying of any kind: each waiting context needs a dedicated AsyncCall
+    JobWaitBase(JobWaitBase &&) = delete;
+
+    explicit operator bool() const { return waiting(); }
+
+    /// whether we are currently waiting for the job to call us back
+    /// the job itself may be gone even if this returns true
+    bool waiting() const { return bool(callback_); }
+
+    /// ends wait (after receiving the call back)
+    /// forgets the job which is likely to be gone by now
+    void finish();
+
+    /// aborts wait (if any) before receiving the call back
+    /// does nothing if we are not waiting
+    void cancel(const char *reason);
+
+    /// summarizes what we are waiting for (for debugging)
+    void print(std::ostream &) const;
+
+protected:
+    /// starts waiting for the given job to call the given callback
+    void start_(AsyncJob::Pointer, AsyncCall::Pointer);
+
+private:
+    /// the common part of finish() and cancel()
+    void clear() { job_.clear(); callback_ = nullptr; }
+
+    /// the job that we are waiting to call us back (or nil)
+    AsyncJob::Pointer job_;
+
+    /// the call we are waiting for the job_ to make (or nil)
+    AsyncCall::Pointer callback_;
+};
+
+/// Manages waiting for an AsyncJob callback.
+/// Completes JobWaitBase by providing Job type-specific members.
+template <class Job>
+class JobWait: public JobWaitBase
+{
+public:
+    typedef CbcPointer<Job> JobPointer;
+
+    /// starts waiting for the given job to call the given callback
+    void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) {
+        start_(aJob, aCallback);
+        typedJob_ = aJob;
+    }
+
+    /// \returns a cbdata pointer to the job we are waiting for (or nil)
+    /// the returned pointer may be falsy, even if we are still waiting()
+    JobPointer job() const { return waiting() ? typedJob_ : nullptr; }
+
+private:
+    /// nearly duplicates JobWaitBase::job_ but exposes the actual job type
+    JobPointer typedJob_;
+};
+
+inline
+std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait)
+{
+    wait.print(os);
+    return os;
+}
+
+#endif /* SQUID_BASE_JOBWAIT_H */
+
index 4cf9d9a9f3d8e104ade1df63ac77a79a71b87f55..db47706c5457652b57b49751c3b448b3ce81ffe0 100644 (file)
@@ -35,6 +35,8 @@ libbase_la_SOURCES = \
        Here.h \
        InstanceId.cc \
        InstanceId.h \
+       JobWait.cc \
+       JobWait.h \
        Lock.h \
        LookupTable.h \
        Optional.h \
index 0d7157098a7d6cb4deada8bd5992c3f9d332f79b..085c0d9d597d188f36535e1098eeaecaeb2c66a0 100644 (file)
@@ -20,6 +20,7 @@ template <typename Value> class Optional;
 
 template<class Cbc> class CbcPointer;
 template<class RefCountableKid> class RefCount;
+template<class Job> class JobWait;
 
 typedef CbcPointer<AsyncJob> AsyncJobPointer;
 typedef RefCount<CodeContext> CodeContextPointer;
index 2bd01b98c51af153beeb64da0556783fb151c083..a23efb19e44c138b11baa6d3bac02a1d8d0f6be7 100644 (file)
@@ -501,6 +501,10 @@ httpRequestFree(void *data)
 /* This is a handler normally called by comm_close() */
 void ConnStateData::connStateClosed(const CommCloseCbParams &)
 {
+    if (clientConnection) {
+        clientConnection->noteClosure();
+        // keep closed clientConnection for logging, clientdb cleanup, etc.
+    }
     deleteThis("ConnStateData::connStateClosed");
 }
 
index 807e5673487fa108753a572bf98a863b580bbeb8..0f5c9e021a5755500db2fc00fb00cdf20e20b76d 100644 (file)
@@ -202,10 +202,6 @@ Ftp::Client::Client(FwdState *fwdState):
 
 Ftp::Client::~Client()
 {
-    if (data.opener != NULL) {
-        data.opener->cancel("Ftp::Client destructed");
-        data.opener = NULL;
-    }
     data.close();
 
     safe_free(old_request);
@@ -787,10 +783,10 @@ Ftp::Client::connectDataChannel()
     debugs(9, 3, "connecting to " << conn->remote);
 
     typedef CommCbMemFunT<Client, CommConnectCbParams> Dialer;
-    data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
-    Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect);
+    AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
+    const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect);
     cs->setHost(data.host);
-    AsyncJob::Start(cs);
+    dataConnWait.start(cs, callback);
 }
 
 bool
@@ -812,10 +808,11 @@ void
 Ftp::Client::dataClosed(const CommCloseCbParams &)
 {
     debugs(9, 4, status());
+    if (data.conn)
+        data.conn->noteClosure();
     if (data.listenConn != NULL) {
         data.listenConn->close();
         data.listenConn = NULL;
-        // NP clear() does the: data.fd = -1;
     }
     data.clear();
 }
@@ -880,6 +877,8 @@ void
 Ftp::Client::ctrlClosed(const CommCloseCbParams &)
 {
     debugs(9, 4, status());
+    if (ctrl.conn)
+        ctrl.conn->noteClosure();
     ctrl.clear();
     doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too
     mustStop("Ftp::Client::ctrlClosed");
index eb5ea1b120feed179c28e3f4406a3c82929fa0f9..60d8b009d691d53f7b4b85e2fe390ec44b9336a3 100644 (file)
@@ -64,7 +64,6 @@ public:
      */
     Comm::ConnectionPointer listenConn;
 
-    AsyncCall::Pointer opener; ///< Comm opener handler callback.
 private:
     AsyncCall::Pointer closer; ///< Comm close handler callback
 };
@@ -205,6 +204,10 @@ protected:
     virtual void sentRequestBody(const CommIoCbParams &io);
     virtual void doneSendingRequestBody();
 
+    /// Waits for an FTP data connection to the server to be established/opened.
+    /// This wait only happens in FTP passive mode (via PASV or EPSV).
+    JobWait<Comm::ConnOpener> dataConnWait;
+
 private:
     bool parseControlReply(size_t &bytesUsed);
 
index a80c4363cfb167162b33121afda6ce466ddbcb3a..504b1ea3cbf6512dad2a461c76ed921bf9e5310c 100644 (file)
@@ -483,11 +483,9 @@ Ftp::Gateway::timeout(const CommTimeoutCbParams &io)
         flags.pasv_supported = false;
         debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state");
 
-        // cancel the data connection setup.
-        if (data.opener != NULL) {
-            data.opener->cancel("timeout");
-            data.opener = NULL;
-        }
+        // cancel the data connection setup, if any
+        dataConnWait.cancel("timeout");
+
         data.close();
     }
 
@@ -1722,7 +1720,7 @@ void
 Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io)
 {
     debugs(9, 3, HERE);
-    data.opener = NULL;
+    dataConnWait.finish();
 
     if (io.flag != Comm::OK) {
         debugs(9, 2, HERE << "Failed to connect. Retrying via another method.");
@@ -2682,9 +2680,9 @@ Ftp::Gateway::mayReadVirginReplyBody() const
     return !doneWithServer();
 }
 
-AsyncJob::Pointer
+void
 Ftp::StartGateway(FwdState *const fwdState)
 {
-    return AsyncJob::Start(new Ftp::Gateway(fwdState));
+    AsyncJob::Start(new Ftp::Gateway(fwdState));
 }
 
index b18dce21f57f992a274dfc859aff735d550ff28d..da84beea54998f4c7a828a9ac22b68fe66a051f5 100644 (file)
@@ -733,7 +733,7 @@ void
 Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io)
 {
     debugs(9, 3, status());
-    data.opener = NULL;
+    dataConnWait.finish();
 
     if (io.flag != Comm::OK) {
         debugs(9, 2, "failed to connect FTP server data channel");
@@ -798,9 +798,9 @@ Ftp::Relay::HandleStoreAbort(Relay *ftpClient)
         ftpClient->dataComplete();
 }
 
-AsyncJob::Pointer
+void
 Ftp::StartRelay(FwdState *const fwdState)
 {
-    return AsyncJob::Start(new Ftp::Relay(fwdState));
+    AsyncJob::Start(new Ftp::Relay(fwdState));
 }
 
index 0c5e98d8f9b7e1b5f362a13892ff61ca17f32a26..71457866078ac5017b4d4e25159edcbf0ba05945 100644 (file)
@@ -101,6 +101,11 @@ void
 Http::Tunneler::handleConnectionClosure(const CommCloseCbParams &)
 {
     closer = nullptr;
+    if (connection) {
+        countFailingConnection();
+        connection->noteClosure();
+        connection = nullptr;
+    }
     bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al));
 }
 
@@ -360,50 +365,64 @@ Http::Tunneler::bailWith(ErrorState *error)
     Must(error);
     answer().squidError = error;
 
-    if (const auto p = connection->getPeer())
-        peerConnectFailed(p);
+    if (const auto failingConnection = connection) {
+        // TODO: Reuse to-peer connections after a CONNECT error response.
+        countFailingConnection();
+        disconnect();
+        failingConnection->close();
+    }
 
     callBack();
-    disconnect();
-
-    if (noteFwdPconnUse)
-        fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
-    // TODO: Reuse to-peer connections after a CONNECT error response.
-    connection->close();
-    connection = nullptr;
 }
 
 void
 Http::Tunneler::sendSuccess()
 {
     assert(answer().positive());
-    callBack();
+    assert(Comm::IsConnOpen(connection));
+    answer().conn = connection;
     disconnect();
+    callBack();
+}
+
+void
+Http::Tunneler::countFailingConnection()
+{
+    assert(connection);
+    if (const auto p = connection->getPeer())
+        peerConnectFailed(p);
+    if (noteFwdPconnUse && connection->isOpen())
+        fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
 }
 
 void
 Http::Tunneler::disconnect()
 {
+    const auto stillOpen = Comm::IsConnOpen(connection);
+
     if (closer) {
-        comm_remove_close_handler(connection->fd, closer);
+        if (stillOpen)
+            comm_remove_close_handler(connection->fd, closer);
         closer = nullptr;
     }
 
     if (reader) {
-        Comm::ReadCancel(connection->fd, reader);
+        if (stillOpen)
+            Comm::ReadCancel(connection->fd, reader);
         reader = nullptr;
     }
 
-    // remove connection timeout handler
-    commUnsetConnTimeout(connection);
+    if (stillOpen)
+        commUnsetConnTimeout(connection);
+
+    connection = nullptr; // may still be open
 }
 
 void
 Http::Tunneler::callBack()
 {
-    debugs(83, 5, connection << status());
-    if (answer().positive())
-        answer().conn = connection;
+    debugs(83, 5, answer().conn << status());
+    assert(!connection); // returned inside answer() or gone
     auto cb = callback;
     callback = nullptr;
     ScheduleCallHere(cb);
@@ -415,11 +434,10 @@ Http::Tunneler::swanSong()
     AsyncJob::swanSong();
 
     if (callback) {
-        if (requestWritten && tunnelEstablished) {
+        if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) {
             sendSuccess();
         } else {
-            // we should have bailed when we discovered the job-killing problem
-            debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status());
+            // job-ending emergencies like handleStopRequest() or callException()
             bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al));
         }
         assert(!callback);
index 3810b0cebbea702041526a52c29b051c5f4ebff3..dfd844e2ffefe4c89884ea524ba281ee26444e26 100644 (file)
@@ -87,6 +87,7 @@ protected:
     void handleResponse(const bool eof);
     void bailOnResponseError(const char *error, HttpReply *);
 
+private:
     /// sends the given error to the initiator
     void bailWith(ErrorState*);
 
@@ -96,12 +97,14 @@ protected:
     /// a bailWith(), sendSuccess() helper: sends results to the initiator
     void callBack();
 
-    /// a bailWith(), sendSuccess() helper: stops monitoring the connection
+    /// stops monitoring the connection
     void disconnect();
 
+    /// updates connection usage history before the connection is closed
+    void countFailingConnection();
+
     TunnelerAnswer &answer();
 
-private:
     AsyncCall::Pointer writer; ///< called when the request has been written
     AsyncCall::Pointer reader; ///< called when the response should be read
     AsyncCall::Pointer closer; ///< called when the connection is being closed
index 56561ccead437e8e93c275fc02ab0ef49d47f30d..c423ee2f395628384019e2b93da150c9048a24f8 100644 (file)
@@ -28,10 +28,10 @@ namespace Ftp
 {
 
 /// A new FTP Gateway job
-AsyncJobPointer StartGateway(FwdState *const fwdState);
+void StartGateway(FwdState *const fwdState);
 
 /// A new FTP Relay job
-AsyncJobPointer StartRelay(FwdState *const fwdState);
+void StartRelay(FwdState *const fwdState);
 
 /** Construct an URI with leading / in PATH portion for use by CWD command
  *  possibly others. FTP encodes absolute paths as beginning with '/'
index 05122a4e29ec1ac77274fb6daf46284b2e614675..49d1dcd1efdbafca3d9085f91930ff9fd8d33c84 100644 (file)
@@ -743,6 +743,10 @@ commCallCloseHandlers(int fd)
         // If call is not canceled schedule it for execution else ignore it
         if (!call->canceled()) {
             debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
+            // XXX: Without the following code, callback fd may be -1.
+            // typedef CommCloseCbParams Params;
+            // auto &params = GetCommParams<Params>(call);
+            // params.fd = fd;
             ScheduleCallHere(call);
         }
     }
@@ -1700,6 +1704,10 @@ DeferredReadManager::CloseHandler(const CommCloseCbParams &params)
     CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data;
 
     temp->element.closer = NULL;
+    if (temp->element.theRead.conn) {
+        temp->element.theRead.conn->noteClosure();
+        temp->element.theRead.conn = nullptr;
+    }
     temp->element.markCancelled();
 }
 
@@ -1773,6 +1781,11 @@ DeferredReadManager::kickARead(DeferredRead const &aRead)
     if (aRead.cancelled)
         return;
 
+    // TODO: This check still allows theReader call with a closed theRead.conn.
+    // If a delayRead() caller has a close connection handler, then such a call
+    // would be useless and dangerous. If a delayRead() caller does not have it,
+    // then the caller will get stuck when an external connection closure makes
+    // aRead.cancelled (checked above) true.
     if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing())
         return;
 
index 518547703bb376085652fb1a9aa442b5d1189f78..f405f8fd5428a3d637cb8325ae6b5260f08410f8 100644 (file)
@@ -41,6 +41,14 @@ Comm::ConnOpener::ConnOpener(const Comm::ConnectionPointer &c, const AsyncCall::
     deadline_(squid_curtime + static_cast<time_t>(ctimeout))
 {
     debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout");
+    assert(conn_); // we know where to go
+
+    // Sharing a being-modified Connection object with the caller is dangerous,
+    // but we cannot ban (or even check for) that using existing APIs. We do not
+    // want to clone "just in case" because cloning is a bit expensive, and most
+    // callers already have a non-owned Connection object to give us. Until the
+    // APIs improve, we can only check that the connection is not open.
+    assert(!conn_->isOpen());
 }
 
 Comm::ConnOpener::~ConnOpener()
@@ -78,6 +86,10 @@ Comm::ConnOpener::swanSong()
     if (temporaryFd_ >= 0)
         closeFd();
 
+    // did we abort while owning an open connection?
+    if (conn_ && conn_->isOpen())
+        conn_->close();
+
     // did we abort while waiting between retries?
     if (calls_.sleep_)
         cancelSleep();
@@ -131,9 +143,18 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why)
                    " [" << callback_->id << ']' );
             // TODO save the pconn to the pconnPool ?
         } else {
+            assert(conn_);
+
+            // free resources earlier and simplify recipients
+            if (errFlag != Comm::OK)
+                conn_->close(); // may not be opened
+            else
+                assert(conn_->isOpen());
+
             typedef CommConnectCbParams Params;
             Params &params = GetCommParams<Params>(callback_);
             params.conn = conn_;
+            conn_ = nullptr; // release ownership; prevent closure by us
             params.flag = errFlag;
             params.xerrno = xerrno;
             ScheduleCallHere(callback_);
@@ -152,7 +173,7 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why)
 void
 Comm::ConnOpener::cleanFd()
 {
-    debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
+    debugs(5, 4, conn_ << "; temp FD " << temporaryFd_);
 
     Must(temporaryFd_ >= 0);
     fde &f = fd_table[temporaryFd_];
@@ -258,6 +279,7 @@ bool
 Comm::ConnOpener::createFd()
 {
     Must(temporaryFd_ < 0);
+    assert(conn_);
 
     // our initiators signal abort by cancelling their callbacks
     if (callback_ == NULL || callback_->canceled())
index c403693ec159ccfc8af0c319acd648ff81978d55..68ef114ab074e11b8af0c48af8a6b11d0582ae68 100644 (file)
 namespace Comm
 {
 
-/**
- * Async-opener of a Comm connection.
- */
+/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either
+/// Comm::OK with an open connection or another Comm::Flag with a closed one.
 class ConnOpener : public AsyncJob
 {
     CBDATA_CLASS(ConnOpener);
 
 public:
-    void noteAbort() { mustStop("externally aborted"); }
-
     typedef CbcPointer<ConnOpener> Pointer;
 
     virtual bool doneAll() const;
index 2ada05acb9cc02795ab87583a34279f1ec9c93ff..279b1c2f4a0991de54729d2332c99ab50cbe34d7 100644 (file)
@@ -7,6 +7,7 @@
  */
 
 #include "squid.h"
+#include "base/JobWait.h"
 #include "CachePeer.h"
 #include "cbdata.h"
 #include "comm.h"
@@ -60,26 +61,44 @@ Comm::Connection::~Connection()
 }
 
 Comm::ConnectionPointer
-Comm::Connection::cloneDestinationDetails() const
+Comm::Connection::cloneProfile() const
 {
-    const ConnectionPointer c = new Comm::Connection;
-    c->setAddrs(local, remote);
-    c->peerType = peerType;
-    c->flags = flags;
-    c->peer_ = cbdataReference(getPeer());
-    assert(!c->isOpen());
-    return c;
-}
+    const ConnectionPointer clone = new Comm::Connection;
+    auto &c = *clone; // optimization
+
+    /*
+     * Copy or excuse each data member. Excused members do not belong to a
+     * Connection configuration profile because their values cannot be reused
+     * across (co-existing) Connection objects and/or are tied to their own
+     * object lifetime.
+     */
+
+    c.setAddrs(local, remote);
+    c.peerType = peerType;
+    // fd excused
+    c.tos = tos;
+    c.nfmark = nfmark;
+    c.nfConnmark = nfConnmark;
+    // COMM_ORPHANED is not a part of connection opening instructions
+    c.flags = flags & ~COMM_ORPHANED;
+    // rfc931 is excused
+
+#if USE_SQUID_EUI
+    // These are currently only set when accepting connections and never used
+    // for establishing new ones, so this copying is currently in vain, but,
+    // technically, they can be a part of connection opening instructions.
+    c.remoteEui48 = remoteEui48;
+    c.remoteEui64 = remoteEui64;
+#endif
 
-Comm::ConnectionPointer
-Comm::Connection::cloneIdentDetails() const
-{
-    auto c = cloneDestinationDetails();
-    c->tos = tos;
-    c->nfmark = nfmark;
-    c->nfConnmark = nfConnmark;
-    c->startTime_ = startTime_;
-    return c;
+    // id excused
+    c.peer_ = cbdataReference(getPeer());
+    // startTime_ excused
+    // tlsHistory excused
+
+    debugs(5, 5, this << " made " << c);
+    assert(!c.isOpen());
+    return clone;
 }
 
 void
index d63e324d043a45c57ef4ba8173a020abadae6083..73fcc51369fc4d748d16315133cce5b2a2ccd2fe 100644 (file)
@@ -77,13 +77,12 @@ public:
     /** Clear the connection properties and close any open socket. */
     virtual ~Connection();
 
-    /// Create a new (closed) IDENT Connection object based on our from-Squid
-    /// connection properties.
-    ConnectionPointer cloneIdentDetails() const;
+    /// To prevent accidental copying of Connection objects that we started to
+    /// open or that are open, use cloneProfile() instead.
+    Connection(const Connection &&) = delete;
 
-    /// Create a new (closed) Connection object pointing to the same destination
-    /// as this from-Squid connection.
-    ConnectionPointer cloneDestinationDetails() const;
+    /// Create a new closed Connection with the same configuration as this one.
+    ConnectionPointer cloneProfile() const;
 
     /// close the still-open connection when its last reference is gone
     void enterOrphanage() { flags |= COMM_ORPHANED; }
@@ -140,17 +139,6 @@ public:
     virtual ScopedId codeContextGist() const override;
     virtual std::ostream &detailCodeContext(std::ostream &os) const override;
 
-private:
-    /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
-     * cloneDestinationDetails() instead.
-     */
-    Connection(const Connection &c);
-
-    /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
-     * cloneDestinationDetails() instead.
-     */
-    Connection & operator =(const Connection &c);
-
 public:
     /** Address/Port for the Squid end of a TCP link. */
     Ip::Address local;
index 0c3addfa5b2522820aa963343b59bbfbe57b531f..cbc89fd9e30e1bcb06242ac070f3969feefaba63 100644 (file)
@@ -193,7 +193,10 @@ void
 Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &)
 {
     closer_ = NULL;
-    conn = NULL;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     Must(done());
 }
 
index 03431f21bbdbf11916c34d4e196a966270741c9d..ce0d9ded4ebd616e56f9754a1c34c1fb86fa305d 100644 (file)
@@ -874,6 +874,10 @@ static void
 idnsVCClosed(const CommCloseCbParams &params)
 {
     nsvc * vc = (nsvc *)params.data;
+    if (vc->conn) {
+        vc->conn->noteClosure();
+        vc->conn = nullptr;
+    }
     delete vc;
 }
 
index e0be32d34fd2b78c0f6a6471672e52982f3509a5..837186160df849e2499feec46bbe7c8bc2dd2558 100644 (file)
@@ -29,10 +29,8 @@ class Eui48
 
 public:
     Eui48() { clear(); }
-    Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); }
     bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; }
     bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; }
-    ~Eui48() {}
 
     const unsigned char *get(void);
 
index 0a062cc35146f4374e5d2f8d9738262536528eb4..c8e641b190bb814733fad5b42844be637425c0ba 100644 (file)
@@ -290,7 +290,7 @@ Rock::Rebuild::Start(SwapDir &dir)
         return false;
     }
 
-    Must(AsyncJob::Start(new Rebuild(&dir, stats)));
+    AsyncJob::Start(new Rebuild(&dir, stats));
     return true;
 }
 
index a592984a5b4722a82ae1849dafa7d562d6b96c8c..242e8203ca032fd298bd6edc87e3ce40e30f6c86 100644 (file)
@@ -98,11 +98,8 @@ public:
         entry->lock("gopherState");
         *replybuf = 0;
     }
-    ~GopherStateData() {if(buf) swanSong();}
 
-    /* AsyncJob API emulated */
-    void deleteThis(const char *aReason);
-    void swanSong();
+    ~GopherStateData();
 
 public:
     StoreEntry *entry;
@@ -149,30 +146,18 @@ static void
 gopherStateFree(const CommCloseCbParams &params)
 {
     GopherStateData *gopherState = (GopherStateData *)params.data;
-
-    if (gopherState == NULL)
-        return;
-
-    gopherState->deleteThis("gopherStateFree");
+    // Assume that FwdState is monitoring and calls noteClosure(). See XXX about
+    // Connection sharing with FwdState in gopherStart().
+    delete gopherState;
 }
 
-void
-GopherStateData::deleteThis(const char *)
-{
-    swanSong();
-    delete this;
-}
-
-void
-GopherStateData::swanSong()
+GopherStateData::~GopherStateData()
 {
     if (entry)
         entry->unlock("gopherState");
 
-    if (buf) {
+    if (buf)
         memFree(buf, MEM_4K_BUF);
-        buf = nullptr;
-    }
 }
 
 /**
@@ -973,6 +958,7 @@ gopherStart(FwdState * fwd)
         return;
     }
 
+    // XXX: Sharing open Connection with FwdState that has its own handlers/etc.
     gopherState->serverConn = fwd->serverConnection();
     gopherSendRequest(fwd->serverConnection()->fd, gopherState);
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout",
index d8d31560cdf790ed963fc8f5eed31f7b5ef704ba..d887b29469b3f938fecad337dd37ac286f813094 100644 (file)
@@ -11,6 +11,7 @@
 #include "squid.h"
 
 #if USE_IDENT
+#include "base/JobWait.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/ConnOpener.h"
@@ -53,8 +54,15 @@ public:
 
     Comm::ConnectionPointer conn;
     MemBuf queryMsg;  ///< the lookup message sent to IDENT server
-    IdentClient *clients;
+    IdentClient *clients = nullptr;
     char buf[IDENT_BUFSIZE];
+
+    /// waits for a connection to the IDENT server to be established/opened
+    JobWait<Comm::ConnOpener> connWait;
+
+private:
+    // use deleteThis() to destroy
+    ~IdentStateData();
 };
 
 CBDATA_CLASS_INIT(IdentStateData);
@@ -73,8 +81,9 @@ static void ClientAdd(IdentStateData * state, IDCB * callback, void *callback_da
 Ident::IdentConfig Ident::TheConfig;
 
 void
-Ident::IdentStateData::deleteThis(const char *)
+Ident::IdentStateData::deleteThis(const char *reason)
 {
+    debugs(30, 3, reason);
     swanSong();
     delete this;
 }
@@ -84,6 +93,10 @@ Ident::IdentStateData::swanSong()
 {
     if (clients != NULL)
         notify(NULL);
+}
+
+Ident::IdentStateData::~IdentStateData() {
+    assert(!clients);
 
     if (Comm::IsConnOpen(conn)) {
         comm_remove_close_handler(conn->fd, Ident::Close, this);
@@ -112,6 +125,10 @@ void
 Ident::Close(const CommCloseCbParams &params)
 {
     IdentStateData *state = (IdentStateData *)params.data;
+    if (state->conn) {
+        state->conn->noteClosure();
+        state->conn = nullptr;
+    }
     state->deleteThis("connection closed");
 }
 
@@ -127,6 +144,16 @@ void
 Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data)
 {
     IdentStateData *state = (IdentStateData *)data;
+    state->connWait.finish();
+
+    // Start owning the supplied connection (so that it is not orphaned if this
+    // function bails early). As a (tiny) optimization or perhaps just diff
+    // minimization, the close handler is added later, when we know we are not
+    // bailing. This delay is safe because comm_remove_close_handler() forgives
+    // missing handlers.
+    assert(conn); // but may be closed
+    assert(!state->conn);
+    state->conn = conn;
 
     if (status != Comm::OK) {
         if (status == Comm::TIMEOUT)
@@ -149,8 +176,8 @@ Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int,
         return;
     }
 
-    assert(conn != NULL && conn == state->conn);
-    comm_add_close_handler(conn->fd, Ident::Close, state);
+    assert(state->conn->isOpen());
+    comm_add_close_handler(state->conn->fd, Ident::Close, state);
 
     AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback",
                                    CommIoCbPtrFun(Ident::WriteFeedback, state));
@@ -259,10 +286,10 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data)
     state->hash.key = xstrdup(key);
 
     // copy the conn details. We do not want the original FD to be re-used by IDENT.
-    state->conn = conn->cloneIdentDetails();
+    const auto identConn = conn->cloneProfile();
     // NP: use random port for secure outbound to IDENT_PORT
-    state->conn->local.port(0);
-    state->conn->remote.port(IDENT_PORT);
+    identConn->local.port(0);
+    identConn->remote.port(IDENT_PORT);
 
     // build our query from the original connection details
     state->queryMsg.init();
@@ -272,7 +299,8 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data)
     hash_join(ident_hash, &state->hash);
 
     AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state));
-    AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout));
+    const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout);
+    state->connWait.start(connOpener, call);
 }
 
 void
index 16920d9bbd69cc4209f12e5c496f5f8df6fa77cc..c4303e30e8cdeeaf6530fc4b40455aabd69ae51a 100644 (file)
@@ -256,13 +256,16 @@ Log::TcpLogger::doConnect()
 
     typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
-    AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
+    const auto cs = new Comm::ConnOpener(futureConn, call, 2);
+    connWait.start(cs, call);
 }
 
 /// Comm::ConnOpener callback
 void
 Log::TcpLogger::connectDone(const CommConnectCbParams &params)
 {
+    connWait.finish();
+
     if (params.flag != Comm::OK) {
         const double delay = 0.5; // seconds
         if (connectFailures++ % 100 == 0) {
@@ -367,7 +370,10 @@ Log::TcpLogger::handleClosure(const CommCloseCbParams &)
 {
     assert(inCall != NULL);
     closer = NULL;
-    conn = NULL;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     // in all current use cases, we should not try to reconnect
     mustStop("Log::TcpLogger::handleClosure");
 }
index c6e1623264dbacf52e4ef5191efc38f7ce06d40f..c918302cff0dff3f5caf430b84b40dcadce6eace 100644 (file)
@@ -10,6 +10,8 @@
 #define _SQUID_SRC_LOG_TCPLOGGER_H
 
 #include "base/AsyncJob.h"
+#include "base/JobWait.h"
+#include "comm/forward.h"
 #include "ip/Address.h"
 
 #include <list>
@@ -103,6 +105,9 @@ private:
     Ip::Address remote; ///< where the remote logger expects our records
     AsyncCall::Pointer closer; ///< handles unexpected/external conn closures
 
+    /// waits for a connection to the remote logger to be established/opened
+    JobWait<Comm::ConnOpener> connWait;
+
     uint64_t connectFailures; ///< number of sequential connection failures
     uint64_t drops; ///< number of records dropped during the current outage
 };
index c0e6511bbc46112b9b0c25cb549e5517ef13c5fd..f48069c1468c614ecf1343dd0bb673531e28c176 100644 (file)
@@ -100,7 +100,11 @@ void
 Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &)
 {
     debugs(16, 5, HERE);
-    conn = NULL; // needed?
+    closer = nullptr;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     mustStop("commClosed");
 }
 
index fc6d9a81bd4e109d130dff5ab296adcd2b81eebf..8bfa118b3a385a17d388c970c919dcdaa75b9379 100644 (file)
@@ -107,11 +107,14 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
 
 /// called when the HTTP client or some external force closed our socket
 void
-Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
+Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &)
 {
     debugs(16, 5, HERE);
-    Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
-    conn = NULL;
+    closer = nullptr;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     mustStop("commClosed");
 }
 
index cdb64351a4eb7e371dd2319608a97c5cf000caf6..f2cc23dc82bb263f256b0ae473449e0cac6f854d 100644 (file)
@@ -131,7 +131,11 @@ void
 Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &)
 {
     debugs(16, 6, HERE);
-    Must(!Comm::IsConnOpen(clientConnection));
+    if (clientConnection) {
+        clientConnection->noteClosure();
+        clientConnection = nullptr;
+    }
+    closer = nullptr;
     mustStop("commClosed");
 }
 
index 418502ec56ce778594b4e2e52a3887485ab002cb..3637e7776ebd6fbed884fc3c7fb87bb201df1b99 100644 (file)
@@ -107,9 +107,15 @@ Security::PeerConnector::fillChecklist(ACLFilledChecklist &checklist) const
 void
 Security::PeerConnector::commCloseHandler(const CommCloseCbParams &params)
 {
+    debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
+
     closeHandler = nullptr;
+    if (serverConn) {
+        countFailingConnection();
+        serverConn->noteClosure();
+        serverConn = nullptr;
+    }
 
-    debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
     const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request.getRaw(), al);
     static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE");
     err->detailError(d);
@@ -129,6 +135,8 @@ Security::PeerConnector::commTimeoutHandler(const CommTimeoutCbParams &)
 bool
 Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
 {
+    Must(Comm::IsConnOpen(serverConnection()));
+
     Security::ContextPointer ctx(getTlsContext());
     debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get());
 
@@ -178,6 +186,8 @@ Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
 void
 Security::PeerConnector::recordNegotiationDetails()
 {
+    Must(Comm::IsConnOpen(serverConnection()));
+
     const int fd = serverConnection()->fd;
     Security::SessionPointer session(fd_table[fd].ssl);
 
@@ -196,6 +206,8 @@ Security::PeerConnector::recordNegotiationDetails()
 void
 Security::PeerConnector::negotiate()
 {
+    Must(Comm::IsConnOpen(serverConnection()));
+
     const int fd = serverConnection()->fd;
     if (fd_table[fd].closing())
         return;
@@ -243,7 +255,7 @@ Security::PeerConnector::handleNegotiationResult(const Security::IoResult &resul
     switch (result.category) {
     case Security::IoResult::ioSuccess:
         recordNegotiationDetails();
-        if (sslFinalized())
+        if (sslFinalized() && callback)
             sendSuccess();
         return; // we may be gone by now
 
@@ -271,6 +283,7 @@ Security::PeerConnector::sslFinalized()
 {
 #if USE_OPENSSL
     if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) {
+        Must(Comm::IsConnOpen(serverConnection()));
         const int fd = serverConnection()->fd;
         Security::SessionPointer session(fd_table[fd].ssl);
 
@@ -314,6 +327,7 @@ void
 Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse)
 {
     Must(validationResponse != NULL);
+    Must(Comm::IsConnOpen(serverConnection()));
 
     ErrorDetail::Pointer errDetails;
     bool validatorFailed = false;
@@ -336,7 +350,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe
 
     if (!errDetails && !validatorFailed) {
         noteNegotiationDone(NULL);
-        sendSuccess();
+        if (callback)
+            sendSuccess();
         return;
     }
 
@@ -362,6 +377,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe
 Security::CertErrors *
 Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails)
 {
+    Must(Comm::IsConnOpen(serverConnection()));
+
     ACLFilledChecklist *check = NULL;
     Security::SessionPointer session(fd_table[serverConnection()->fd].ssl);
 
@@ -435,9 +452,11 @@ Security::PeerConnector::negotiateSsl()
 void
 Security::PeerConnector::noteWantRead()
 {
-    const int fd = serverConnection()->fd;
     debugs(83, 5, serverConnection());
 
+    Must(Comm::IsConnOpen(serverConnection()));
+    const int fd = serverConnection()->fd;
+
     // read timeout to avoid getting stuck while reading from a silent server
     typedef CommCbMemFunT<Security::PeerConnector, CommTimeoutCbParams> TimeoutDialer;
     AsyncCall::Pointer timeoutCall = JobCallback(83, 5,
@@ -451,8 +470,10 @@ Security::PeerConnector::noteWantRead()
 void
 Security::PeerConnector::noteWantWrite()
 {
-    const int fd = serverConnection()->fd;
     debugs(83, 5, serverConnection());
+    Must(Comm::IsConnOpen(serverConnection()));
+
+    const int fd = serverConnection()->fd;
     Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0);
     return;
 }
@@ -469,57 +490,76 @@ Security::PeerConnector::noteNegotiationError(const Security::ErrorDetailPointer
     bail(anErr);
 }
 
+Security::EncryptorAnswer &
+Security::PeerConnector::answer()
+{
+    assert(callback);
+    const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer());
+    assert(dialer);
+    return dialer->answer();
+}
+
 void
 Security::PeerConnector::bail(ErrorState *error)
 {
     Must(error); // or the recipient will not know there was a problem
-    Must(callback != NULL);
-    CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
-    Must(dialer);
-    dialer->answer().error = error;
+    answer().error = error;
 
-    if (const auto p = serverConnection()->getPeer())
-        peerConnectFailed(p);
+    if (const auto failingConnection = serverConn) {
+        countFailingConnection();
+        disconnect();
+        failingConnection->close();
+    }
 
     callBack();
-    disconnect();
-
-    if (noteFwdPconnUse)
-        fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
-    serverConn->close();
-    serverConn = nullptr;
 }
 
 void
 Security::PeerConnector::sendSuccess()
 {
-    callBack();
+    assert(Comm::IsConnOpen(serverConn));
+    answer().conn = serverConn;
     disconnect();
+    callBack();
+}
+
+void
+Security::PeerConnector::countFailingConnection()
+{
+    assert(serverConn);
+    if (const auto p = serverConn->getPeer())
+        peerConnectFailed(p);
+    // TODO: Calling PconnPool::noteUses() should not be our responsibility.
+    if (noteFwdPconnUse && serverConn->isOpen())
+        fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
 }
 
 void
 Security::PeerConnector::disconnect()
 {
+    const auto stillOpen = Comm::IsConnOpen(serverConn);
+
     if (closeHandler) {
-        comm_remove_close_handler(serverConnection()->fd, closeHandler);
+        if (stillOpen)
+            comm_remove_close_handler(serverConn->fd, closeHandler);
         closeHandler = nullptr;
     }
 
-    commUnsetConnTimeout(serverConnection());
+    if (stillOpen)
+        commUnsetConnTimeout(serverConn);
+
+    serverConn = nullptr;
 }
 
 void
 Security::PeerConnector::callBack()
 {
-    debugs(83, 5, "TLS setup ended for " << serverConnection());
+    debugs(83, 5, "TLS setup ended for " << answer().conn);
 
     AsyncCall::Pointer cb = callback;
     // Do this now so that if we throw below, swanSong() assert that we _tried_
     // to call back holds.
     callback = NULL; // this should make done() true
-    CbDialer *dialer = dynamic_cast<CbDialer*>(cb->getDialer());
-    Must(dialer);
-    dialer->answer().conn = serverConnection();
     ScheduleCallHere(cb);
 }
 
@@ -528,8 +568,9 @@ Security::PeerConnector::swanSong()
 {
     // XXX: unregister fd-closure monitoring and CommSetSelect interest, if any
     AsyncJob::swanSong();
-    if (callback != NULL) { // paranoid: we have left the caller waiting
-        debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server");
+
+    if (callback) {
+        // job-ending emergencies like handleStopRequest() or callException()
         const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al);
         bail(anErr);
         assert(!callback);
@@ -550,7 +591,7 @@ Security::PeerConnector::status() const
         buf.append("Stopped, reason:", 16);
         buf.appendf("%s",stopReason);
     }
-    if (serverConn != NULL)
+    if (Comm::IsConnOpen(serverConn))
         buf.appendf(" FD %d", serverConn->fd);
     buf.appendf(" %s%u]", id.prefix(), id.value);
     buf.terminate();
@@ -598,15 +639,18 @@ Security::PeerConnector::startCertDownloading(SBuf &url)
                                       PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this));
 
     const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1);
-    AsyncJob::Start(dl);
+    certDownloadWait.start(dl, certCallback);
 }
 
 void
 Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
 {
+    certDownloadWait.finish();
+
     ++certsDownloads;
     debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length());
 
+    Must(Comm::IsConnOpen(serverConnection()));
     const auto &sconn = *fd_table[serverConnection()->fd].ssl;
 
     // Parse Certificate. Assume that it is in DER format.
@@ -659,6 +703,7 @@ Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
 void
 Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult)
 {
+    Must(Comm::IsConnOpen(serverConnection()));
     auto &sconn = *fd_table[serverConnection()->fd].ssl;
 
     // We download the missing certificate(s) once. We would prefer to clear
index fb8e00b1ff2095b2dc6d82d64450ee063aeb91a0..e9e61db1290dcb293b77b1e085d264f3d87247d5 100644 (file)
@@ -13,6 +13,7 @@
 #include "acl/ChecklistFiller.h"
 #include "base/AsyncCbdataCalls.h"
 #include "base/AsyncJob.h"
+#include "base/JobWait.h"
 #include "CommCalls.h"
 #include "http/forward.h"
 #include "security/EncryptorAnswer.h"
@@ -26,6 +27,7 @@
 #include <queue>
 
 class ErrorState;
+class Downloader;
 class AccessLogEntry;
 typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
 
@@ -157,6 +159,9 @@ protected:
     /// a bail(), sendSuccess() helper: stops monitoring the connection
     void disconnect();
 
+    /// updates connection usage history before the connection is closed
+    void countFailingConnection();
+
     /// If called the certificates validator will not used
     void bypassCertValidator() {useCertValidator_ = false;}
 
@@ -164,6 +169,9 @@ protected:
     /// logging
     void recordNegotiationDetails();
 
+    /// convenience method to get to the answer fields
+    EncryptorAnswer &answer();
+
     HttpRequestPointer request; ///< peer connection trigger or cause
     Comm::ConnectionPointer serverConn; ///< TCP connection to the peer
     AccessLogEntryPointer al; ///< info for the future access.log entry
@@ -211,6 +219,8 @@ private:
 
     /// outcome of the last (failed and) suspended negotiation attempt (or nil)
     Security::IoResultPointer suspendedError_;
+
+    JobWait<Downloader> certDownloadWait; ///< waits for the missing certificate to be downloaded
 };
 
 } // namespace Security
index 53e1cc4fa329842f39dea9ef1d91a825f7935087..7cf1c5eb5a285e584a98d7351d3330f7f9ca9a10 100644 (file)
@@ -175,6 +175,7 @@ class ParsedOptions {}; // we never parse/use TLS options in this case
 typedef long ParsedPortFlags;
 
 class PeerConnector;
+class BlindPeerConnector;
 class PeerOptions;
 
 #if USE_OPENSSL
index 5203d8354eac40b855bee3ce268dd04790c0fbea..b64967bdc05ca9e6546d5b87b6df18c57ac031cc 100644 (file)
@@ -61,7 +61,7 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact):
     dataConn(),
     uploadAvailSize(0),
     listener(),
-    connector(),
+    dataConnWait(),
     reader(),
     waitingForOrigin(false),
     originDataDownloadAbortedOnError(false)
@@ -1671,11 +1671,11 @@ Ftp::Server::checkDataConnPre()
 
     // active transfer: open a data connection from Squid to client
     typedef CommCbMemFunT<Server, CommConnectCbParams> Dialer;
-    connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
-    Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector,
-            Config.Timeout.connect);
-    AsyncJob::Start(cs);
-    return false; // ConnStateData::processFtpRequest waits handleConnectDone
+    AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
+    const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback,
+                                         Config.Timeout.connect);
+    dataConnWait.start(cs, callback);
+    return false;
 }
 
 /// Check that client data connection is ready for immediate I/O.
@@ -1693,18 +1693,22 @@ Ftp::Server::checkDataConnPost() const
 void
 Ftp::Server::connectedForData(const CommConnectCbParams &params)
 {
-    connector = NULL;
+    dataConnWait.finish();
 
     if (params.flag != Comm::OK) {
-        /* it might have been a timeout with a partially open link */
-        if (params.conn != NULL)
-            params.conn->close();
         setReply(425, "Cannot open data connection.");
         Http::StreamPointer context = pipeline.front();
         Must(context->http);
         Must(context->http->storeEntry() != NULL);
+        // TODO: call closeDataConnection() to reset data conn processing?
     } else {
-        Must(dataConn == params.conn);
+        // Finalize the details and start owning the supplied connection.
+        assert(params.conn);
+        assert(dataConn);
+        assert(!dataConn->isOpen());
+        dataConn = params.conn;
+        // XXX: Missing comm_add_close_handler() to track external closures.
+
         Must(Comm::IsConnOpen(params.conn));
         fd_note(params.conn->fd, "active client ftp data");
     }
index 23812d1f1896ff52a8b15ba09b8991f966421b4e..bcc3b1746a6a3a04c359c17b7438756039801d3f 100644 (file)
 #ifndef SQUID_SERVERS_FTP_SERVER_H
 #define SQUID_SERVERS_FTP_SERVER_H
 
+#include "base/JobWait.h"
 #include "base/Lock.h"
 #include "client_side.h"
+#include "comm/forward.h"
 
 namespace Ftp
 {
@@ -188,7 +190,11 @@ private:
     size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes
 
     AsyncCall::Pointer listener; ///< set when we are passively listening
-    AsyncCall::Pointer connector; ///< set when we are actively connecting
+
+    /// Waits for an FTP data connection to the client to be established/opened.
+    /// This wait only happens in FTP active mode (via PORT or EPRT).
+    JobWait<Comm::ConnOpener> dataConnWait;
+
     AsyncCall::Pointer reader; ///< set when we are reading FTP data
 
     /// whether we wait for the origin data transfer to end
index a7a6a17648e32fa986e900ab5e6b6f05f3c8b147..c120976f965661443d30a2e1b6495b920928e9a6 100644 (file)
@@ -53,6 +53,7 @@ Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params)
 {
     debugs(49, 5, HERE);
     Must(fd == params.fd);
+    closer = nullptr;
     fd = -1;
     mustStop("commClosed");
 }
@@ -68,8 +69,7 @@ void
 Snmp::Forwarder::handleException(const std::exception& e)
 {
     debugs(49, 3, HERE << e.what());
-    if (fd >= 0)
-        sendError(SNMP_ERR_GENERR);
+    sendError(SNMP_ERR_GENERR);
     Ipc::Forwarder::handleException(e);
 }
 
@@ -78,6 +78,10 @@ void
 Snmp::Forwarder::sendError(int error)
 {
     debugs(49, 3, HERE);
+
+    if (fd < 0)
+        return; // client gone
+
     Snmp::Request& req = static_cast<Snmp::Request&>(*request);
     req.pdu.command = SNMP_PDU_RESPONSE;
     req.pdu.errstat = error;
index 43cdf05dab0d536111dc6925903a1232c10abd9b..a9d93bca98f420466712c506f7f9e5b20b8cdd8d 100644 (file)
@@ -88,7 +88,11 @@ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params)
 {
     debugs(49, 5, HERE);
     Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd);
-    conn = NULL;
+    closer = nullptr;
+    if (conn) {
+        conn->noteClosure();
+        conn = nullptr;
+    }
     mustStop("commClosed");
 }
 
@@ -102,6 +106,10 @@ void
 Snmp::Inquirer::sendResponse()
 {
     debugs(49, 5, HERE);
+
+    if (!Comm::IsConnOpen(conn))
+        return; // client gone
+
     aggrPdu.fixAggregate();
     aggrPdu.command = SNMP_PDU_RESPONSE;
     u_char buffer[SNMP_REQUEST_SIZE];
index 39d032810b1ccd70d42dfb76436ab6f971fb7fb4..4df73bac0f8142d8d5a938e370693fb2d6d54207 100644 (file)
 CBDATA_NAMESPACED_CLASS_INIT(Ssl, PeekingPeerConnector);
 
 void
-Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data)
+Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data)
 {
     Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data;
     // Use job calls to add done() checks and other job logic/protections.
-    CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer);
+    CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer);
 }
 
 void
-Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer)
+Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer)
 {
-    const Ssl::BumpMode finalAction = answer.allowed() ?
-                                      static_cast<Ssl::BumpMode>(answer.kind):
+    const Ssl::BumpMode finalAction = aclAnswer.allowed() ?
+                                      static_cast<Ssl::BumpMode>(aclAnswer.kind):
                                       checkForPeekAndSpliceGuess();
     checkForPeekAndSpliceMatched(finalAction);
 }
@@ -105,10 +105,8 @@ Ssl::PeekingPeerConnector::checkForPeekAndSpliceMatched(const Ssl::BumpMode acti
         splice = true;
         // Ssl Negotiation stops here. Last SSL checks for valid certificates
         // and if done, switch to tunnel mode
-        if (sslFinalized()) {
-            debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection");
+        if (sslFinalized() && callback)
             callBack();
-        }
     }
 }
 
@@ -271,8 +269,11 @@ Ssl::PeekingPeerConnector::startTunneling()
     auto b = SSL_get_rbio(session.get());
     auto srvBio = static_cast<Ssl::ServerBio*>(BIO_get_data(b));
 
+    debugs(83, 5, "will tunnel instead of negotiating TLS");
     switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData());
-    tunnelInsteadOfNegotiating();
+    answer().tunneled = true;
+    disconnect();
+    callBack();
 }
 
 void
@@ -396,13 +397,3 @@ Ssl::PeekingPeerConnector::serverCertificateVerified()
     }
 }
 
-void
-Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating()
-{
-    Must(callback != NULL);
-    CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
-    Must(dialer);
-    dialer->answer().tunneled = true;
-    debugs(83, 5, "The SSL negotiation with server aborted");
-}
-
index 706828fe97bf85e3be745ecd961e75d80b255a7d..388c49772ad8e299c9f45eb088f10d60c9337833 100644 (file)
@@ -51,7 +51,7 @@ public:
     void checkForPeekAndSplice();
 
     /// Callback function for ssl_bump acl check in step3  SSL bump step.
-    void checkForPeekAndSpliceDone(Acl::Answer answer);
+    void checkForPeekAndSpliceDone(Acl::Answer);
 
     /// Handles the final bumping decision.
     void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode);
@@ -67,7 +67,7 @@ public:
     void startTunneling();
 
     /// A wrapper function for checkForPeekAndSpliceDone for use with acl
-    static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data);
+    static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data);
 
 private:
 
index b73f0ec59af46a9ee0ab47f2b36fffa4dd4cfe84..555cb77d79afebf9fd760b39e20182565cf259a9 100644 (file)
@@ -22,9 +22,9 @@ void Comm::AcceptLimiter::kick() STUB
 #include "comm/Connection.h"
 Comm::Connection::Connection() STUB
 Comm::Connection::~Connection() STUB
-Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr)
-Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr)
+Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr)
 void Comm::Connection::close() STUB
+void Comm::Connection::noteClosure() STUB
 CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL)
 void Comm::Connection::setPeer(CachePeer *) STUB
 ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach())
index 6e89a4a6fe8e47d2c8636b447455d888232dca63..79bd8c71b20463b2bfffb6997fbc4e4162d275c2 100644 (file)
@@ -9,6 +9,7 @@
 #include "squid.h"
 #include "AccessLogEntry.h"
 #include "comm/Connection.h"
+#include "Downloader.h"
 #include "HttpRequest.h"
 
 #define STUB_API "security/libsecurity.la"
@@ -92,7 +93,9 @@ void PeerConnector::bail(ErrorState *) STUB
 void PeerConnector::sendSuccess() STUB
 void PeerConnector::callBack() STUB
 void PeerConnector::disconnect() STUB
+void PeerConnector::countFailingConnection() STUB
 void PeerConnector::recordNegotiationDetails() STUB
+EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer)
 }
 
 #include "security/PeerOptions.h"
index 325f33894c50965ac21e4b61f46c148223f236d9..d59b9a02fa2017632bedace7a24d90cae74fde70 100644 (file)
@@ -11,6 +11,7 @@
 #include "squid.h"
 #include "acl/FilledChecklist.h"
 #include "base/CbcPointer.h"
+#include "base/JobWait.h"
 #include "CachePeer.h"
 #include "cbdata.h"
 #include "client_side.h"
@@ -181,9 +182,6 @@ public:
     SBuf preReadClientData;
     SBuf preReadServerData;
     time_t startTime; ///< object creation time, before any peer selection/connection attempts
-    /// Whether we are waiting for the CONNECT request/response exchange with the peer.
-    bool waitingForConnectExchange;
-    HappyConnOpenerPointer connOpener; ///< current connection opening job
     ResolvedPeersPointer destinations; ///< paths for forwarding the request
     bool destinationsFound; ///< At least one candidate path found
     /// whether another destination may be still attempted if the TCP connection
@@ -192,10 +190,15 @@ public:
     // TODO: remove after fixing deferred reads in TunnelStateData::copyRead()
     CodeContext::Pointer codeContext; ///< our creator context
 
-    // AsyncCalls which we set and may need cancelling.
-    struct {
-        AsyncCall::Pointer connector;  ///< a call linking us to the ConnOpener producing serverConn.
-    } calls;
+    /// waits for a transport connection to the peer to be established/opened
+    JobWait<HappyConnOpener> transportWait;
+
+    /// waits for the established transport connection to be secured/encrypted
+    JobWait<Security::PeerConnector> encryptionWait;
+
+    /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
+    /// over the (encrypted, if needed) transport connection to that cache_peer
+    JobWait<Http::Tunneler> peerWait;
 
     void copyRead(Connection &from, IOCB *completion);
 
@@ -213,12 +216,6 @@ public:
     /// when all candidate destinations have been tried and all have failed
     void noteConnection(HappyConnOpenerAnswer &);
 
-    /// whether we are waiting for HappyConnOpener
-    /// same as calls.connector but may differ from connOpener.valid()
-    bool opening() const { return connOpener.set(); }
-
-    void cancelOpening(const char *reason);
-
     /// Start using an established connection
     void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused);
 
@@ -267,6 +264,9 @@ private:
 
     /// \returns whether the request should be retried (nil) or the description why it should not
     const char *checkRetry();
+    /// whether the successfully selected path destination or the established
+    /// server connection is still in use
+    bool usingDestination() const;
 
     /// details of the "last tunneling attempt" failure (if it failed)
     ErrorState *savedError = nullptr;
@@ -276,6 +276,8 @@ private:
 
     void deleteThis();
 
+    void cancelStep(const char *reason);
+
 public:
     bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
     void copy(size_t len, Connection &from, Connection &to, IOCB *);
@@ -356,7 +358,6 @@ TunnelStateData::deleteThis()
 
 TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) :
     startTime(squid_curtime),
-    waitingForConnectExchange(false),
     destinations(new ResolvedPeers()),
     destinationsFound(false),
     retriable(true),
@@ -389,8 +390,7 @@ TunnelStateData::~TunnelStateData()
     debugs(26, 3, "TunnelStateData destructed this=" << this);
     assert(noConnections());
     xfree(url);
-    if (opening())
-        cancelOpening("~TunnelStateData");
+    cancelStep("~TunnelStateData");
     delete savedError;
 }
 
@@ -906,7 +906,10 @@ TunnelStateData::copyServerBytes()
 static void
 tunnelStartShoveling(TunnelStateData *tunnelState)
 {
-    assert(!tunnelState->waitingForConnectExchange);
+    assert(!tunnelState->transportWait);
+    assert(!tunnelState->encryptionWait);
+    assert(!tunnelState->peerWait);
+
     assert(tunnelState->server.conn);
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
@@ -964,6 +967,7 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *, size_t len
 void
 TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
 {
+    peerWait.finish();
     server.len = 0;
 
     if (logTag_ptr)
@@ -972,13 +976,11 @@ TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
     if (answer.peerResponseStatus != Http::scNone)
         *status_ptr = answer.peerResponseStatus;
 
-    waitingForConnectExchange = false;
-
     auto sawProblem = false;
 
     if (!answer.positive()) {
         sawProblem = true;
-        Must(!Comm::IsConnOpen(answer.conn));
+        assert(!answer.conn);
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
         sawProblem = true;
         closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
@@ -1044,8 +1046,7 @@ tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_
 void
 TunnelStateData::noteConnection(HappyConnOpener::Answer &answer)
 {
-    calls.connector = nullptr;
-    connOpener.clear();
+    transportWait.finish();
 
     ErrorState *error = nullptr;
     if ((error = answer.error.get())) {
@@ -1169,7 +1170,7 @@ TunnelStateData::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
     AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer",
                                             MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this));
     const auto connector = new Security::BlindPeerConnector(request, conn, callback, al);
-    AsyncJob::Start(connector); // will call our callback
+    encryptionWait.start(connector, callback);
 }
 
 /// starts a preparation step for an established connection; retries on failures
@@ -1196,9 +1197,12 @@ TunnelStateData::advanceDestination(const char *stepDescription, const Comm::Con
 void
 TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer)
 {
+    encryptionWait.finish();
+
     ErrorState *error = nullptr;
+    assert(!answer.tunneled);
     if ((error = answer.error.get())) {
-        Must(!Comm::IsConnOpen(answer.conn));
+        assert(!answer.conn);
         answer.error.clear();
     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al);
@@ -1225,8 +1229,6 @@ TunnelStateData::connectedToPeer(const Comm::ConnectionPointer &conn)
 void
 TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
 {
-    assert(!waitingForConnectExchange);
-
     AsyncCall::Pointer callback = asyncCall(5,4,
                                             "TunnelStateData::tunnelEstablishmentDone",
                                             Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this));
@@ -1234,9 +1236,7 @@ TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
 #if USE_DELAY_POOLS
     tunneler->setDelayId(server.delayId);
 #endif
-    AsyncJob::Start(tunneler);
-    waitingForConnectExchange = true;
-    // and wait for the tunnelEstablishmentDone() call
+    peerWait.start(tunneler, callback);
 }
 
 void
@@ -1254,14 +1254,14 @@ TunnelStateData::noteDestination(Comm::ConnectionPointer path)
 
     destinations->addPath(path);
 
-    if (Comm::IsConnOpen(server.conn)) {
+    if (usingDestination()) {
         // We are already using a previously opened connection but also
         // receiving destinations in case we need to re-forward.
-        Must(!opening());
+        Must(!transportWait);
         return;
     }
 
-    if (opening()) {
+    if (transportWait) {
         notifyConnOpener();
         return; // and continue to wait for tunnelConnectDone() callback
     }
@@ -1291,17 +1291,23 @@ TunnelStateData::noteDestinationsEnd(ErrorState *selectionError)
     // if all of them fail, tunneling as whole will fail
     Must(!selectionError); // finding at least one path means selection succeeded
 
-    if (Comm::IsConnOpen(server.conn)) {
+    if (usingDestination()) {
         // We are already using a previously opened connection but also
         // receiving destinations in case we need to re-forward.
-        Must(!opening());
+        Must(!transportWait);
         return;
     }
 
-    Must(opening()); // or we would be stuck with nothing to do or wait for
+    Must(transportWait); // or we would be stuck with nothing to do or wait for
     notifyConnOpener();
 }
 
+bool
+TunnelStateData::usingDestination() const
+{
+    return encryptionWait || peerWait || Comm::IsConnOpen(server.conn);
+}
+
 /// remembers an error to be used if there will be no more connection attempts
 void
 TunnelStateData::saveError(ErrorState *error)
@@ -1322,8 +1328,7 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason)
     if (request)
         request->hier.stopPeerClock(false);
 
-    if (opening())
-        cancelOpening(reason);
+    cancelStep(reason);
 
     assert(finalError);
 
@@ -1341,18 +1346,15 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason)
     errorSend(client.conn, finalError);
 }
 
-/// Notify connOpener that we no longer need connections. We do not have to do
-/// this -- connOpener would eventually notice on its own, but notifying reduces
-/// waste and speeds up spare connection opening for other transactions (that
-/// could otherwise wait for this transaction to use its spare allowance).
+/// Notify a pending subtask, if any, that we no longer need its help. We do not
+/// have to do this -- the subtask job will eventually end -- but ending it
+/// earlier reduces waste and may reduce DoS attack surface.
 void
-TunnelStateData::cancelOpening(const char *reason)
+TunnelStateData::cancelStep(const char *reason)
 {
-    assert(calls.connector);
-    calls.connector->cancel(reason);
-    calls.connector = nullptr;
-    notifyConnOpener();
-    connOpener.clear();
+    transportWait.cancel(reason);
+    encryptionWait.cancel(reason);
+    peerWait.cancel(reason);
 }
 
 void
@@ -1362,15 +1364,14 @@ TunnelStateData::startConnecting()
         request->hier.startPeerClock();
 
     assert(!destinations->empty());
-    assert(!opening());
-    calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
-    const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al);
+    assert(!usingDestination());
+    AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
+    const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al);
     cs->setHost(request->url.host());
     cs->setRetriable(false);
     cs->allowPersistent(false);
     destinations->notificationPending = true; // start() is async
-    connOpener = cs;
-    AsyncJob::Start(cs);
+    transportWait.start(cs, callback);
 }
 
 /// send request on an existing connection dedicated to the requesting client
@@ -1419,7 +1420,7 @@ TunnelStateData::Connection::setDelayId(DelayId const &newDelay)
 
 #endif
 
-/// makes sure connOpener knows that destinations have changed
+/// makes sure connection opener knows that the destinations have changed
 void
 TunnelStateData::notifyConnOpener()
 {
@@ -1427,7 +1428,7 @@ TunnelStateData::notifyConnOpener()
         debugs(17, 7, "reusing pending notification");
     } else {
         destinations->notificationPending = true;
-        CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
+        CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
     }
 }
 
index 7804a39a6cbf9ad110dc45e6e15f135d76c39ce2..0fd47be5a57e987da3a02196e7db16bd94844509 100644 (file)
@@ -180,6 +180,7 @@ whoisClose(const CommCloseCbParams &params)
 {
     WhoisState *p = (WhoisState *)params.data;
     debugs(75, 3, "whoisClose: FD " << params.fd);
+    // We do not own a Connection. Assume that FwdState is also monitoring.
     p->entry->unlock("whoisClose");
     delete p;
 }