From 1332f8d606485b5a2f57277634c2f6f7855bd9a6 Mon Sep 17 00:00:00 2001 From: Alex Rousskov Date: Tue, 8 Feb 2022 03:56:43 -0500 Subject: [PATCH] Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening (#967) * Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening The bug was caused by commit 25b0ce4. Other known symptoms are: assertion failed: store.cc:1793: "isEmpty()" assertion failed: FwdState.cc:501: "serverConnection() == conn" assertion failed: FwdState.cc:1037: "!opening()" This change has several overlapping parts. Unfortunately, merging individual parts is both difficult and likely to cause crashes. ## Part 1: Bug 5055. FwdState used to check serverConn to decide whether to open a connection to forward the request. Since commit 25b0ce4, a nil serverConn pointer no longer implies that a new connection should be opened: FwdState helper jobs may already be working on preparing an existing open connection (e.g., sending a CONNECT request or negotiating encryption). Bad serverConn checks in both FwdState::noteDestination() and FwdState::noteDestinationsEnd() methods led to extra connectStart() calls creating two conflicting concurrent helper jobs. To fix this, we replaced direct serverConn inspection with a usingDestination() call which also checks whether we are waiting for a helper job. Testing that fix exposed another set of bugs: The helper job pointers or in-job connections left stale/set after forwarding failures. The changes described below addressed (most of) those problems. ## Part 2: Connection establishing helper jobs and their callbacks A proper fix for Bug 5055 required answering a difficult question: When should a dying job call its callbacks? We only found one answer which required cooperation from the job creator and led to the following rules: * AsyncJob destructors must not call any callbacks. * AsyncJob::swanSong() is responsible for async-calling any remaining (i.e. set, uncalled, and uncancelled) callbacks. * AsyncJob::swanSong() is called (only) for started jobs. * AsyncJob destructing sequence should validate that no callbacks remain uncalled for started jobs. ... where an AsyncJob x is considered "started" if AsyncJob::Start(x) has returned without throwing. A new JobWait class helps job creators follow these rules while keeping track on in-progress helper jobs and killing no-longer-needed helpers. Also fixed very similar bugs in tunnel.cc code. ## Part 3: ConnOpener fixes 1. Many ConnOpener users are written to keep a ConnectionPointer to the destination given to ConnOpener. This means that their connection magically opens when ConnOpener successfully connects, before ConnOpener has a chance to notify the user about the changes. Having multiple concurrent connection owners is always dangerous, and the user cannot even have a close handler registered for its now-open connection. When something happens to ConnOpener or its answer, the user job may be in trouble. Now, ConnOpener callers no longer pass Connection objects they own, cloning them as needed. That adjustment required adjustment 2: 2. Refactored ConnOpener users to stop assuming that the answer contains a pointer to their connection object. After adjustment 1 above, it does not. HappyConnOpener relied on that assumption quite a bit so we had to refactor to use two custom callback methods instead of one with a complicated if-statement distinguishing prime from spare attempts. This refactoring is an overall improvement because it simplifies the code. Other ConnOpener users just needed to remove a few no longer valid paranoid assertions/Musts. 3. ConnOpener users were forced to remember to close params.conn when processing negative answers. Some, naturally, forgot, triggering warnings about orphaned connections (e.g., Ident and TcpLogger). ConnOpener now closes its open connection before sending a negative answer. 4. ConnOpener would trigger orphan connection warnings if the job ended after opening the connection but without supplying the connection to the requestor (e.g., because the requestor has gone away). Now, ConnOpener explicitly closes its open connection if it has not been sent to the requestor. Also fixed Comm::ConnOpener::cleanFd() debugging that was incorrectly saying that the method closes the temporary descriptor. Also fixed ConnOpener callback's syncWithComm(): The stale CommConnectCbParams override was testing unused (i.e. always negative) CommConnectCbParams::fd and was trying to cancel the callback that most (possibly all) recipients rely on: ConnOpener users expect a negative answer rather than no answer at all. Also, after comparing the needs of two old/existing and a temporary added ("clone everything") Connection cloning method callers, we decided there is no need to maintain three different methods. All existing callers should be fine with a single method because none of them suffers from "extra" copying of members that others need. Right now, the new cloneProfile() method copies everything except FD and a few special-purpose members (with documented reasons for not copying). Also added Comm::Connection::cloneDestinationDetails() debugging to simplify tracking dependencies between half-baked Connection objects carrying destination/flags/other metadata and open Connection objects created by ConnOpener using that metadata (which are later delivered to ConnOpener users and, in some cases, replace those half-baked connections mentioned earlier. Long-term, we need to find a better way to express these and other Connection states/stages than comments and debugging messages. ## Part 4: Comm::Connection closure callbacks We improved many closure callbacks to make sure (to the extent possible) that Connection and other objects are in sync with Comm. There are lots of small bugs, inconsistencies, and other problems in Connection closure handlers. It is not clear whether any of those problems could result in serious runtime errors or leaks. In theory, the rest of the code could neutralize their negative side effects. However, even in that case, it was just a matter of time before the next bug will bite us due to stale Connection::fd and such. These changes themselves carry elevated risk, but they get us closer to reliable code as far as Connection maintenance is concerned; otherwise, we will keep chasing their deadly side effects. Long-term, all these manual efforts to keep things in sync should become unnecessary with the introduction of appropriate Connection ownership APIs that automatically maintain the corresponding environments (TODO). ## Part 5: Other notable improvements in the adjusted code Improved Security::PeerConnector::serverConn and Http::Tunneler::connection management, especially when sending negative answers. When sending a negative answer, we would set answer().conn to an open connection, async-send that answer, and then hurry to close the connection using our pointer to the shared Connection object. If everything went according to the plan, the recipient would get a non-nil but closed Connection object. Now, a negative answer simply means no connection at all. Same for a tunneled answer. Refactored ICAP connection-establishing code to to delay Connection ownership until the ICAP connection is fully ready. This change addresses primary Connection ownership concerns (as they apply to this ICAP code) except orphaning of the temporary Connection object by helper job start exceptions (now an explicit XXX). For example, the transaction no longer shares a Connection object with ConnOpener and IcapPeerConnector jobs. Probably fixed a bug where PeerConnector::negotiate() assumed that a sslFinalized() does not return true after callBack(). It may (e.g., when CertValidationHelper::Submit() throws). Same for PeekingPeerConnector::checkForPeekAndSpliceMatched(). Fixed FwdState::advanceDestination() bug that did not save ERR_GATEWAY_FAILURE details and "lost" the address of that failed destination, making it unavailable to future retries (if any). Polished PeerPoolMgr, Ident, and Gopher code to be able to fix similar job callback and connection management issues. Polished AsyncJob::Start() API. Start() used to return a job pointer, but that was a bad idea: * It implies that a failed Start() will return a nil pointer, and that the caller should check the result. Neither is true. * It encourages callers to dereference the returned pointer to further adjust the job. That technically works (today) but violates the rules of communicating with an async job. The Start() method is the boundary after which the job is deemed asynchronous. Also removed old "and wait for..." post-Start() comments because the code itself became clear enough, and those comments were becoming increasingly stale (because they duplicated the callback names above them). Fix Tunneler and PeerConnector handling of last-resort callback requirements. Events like handleStopRequest() and callException() stop the job but should not be reported as a BUG (e.g., it would be up to the callException() to decide how to report the caught exception). There might (or there will) be other, similar cases where the job is stopped prematurely for some non-BUG reason beyond swanSong() knowledge. The existence of non-bug cases does not mean there could be no bugs worth reporting here, but until they can be identified more reliably than all these benign/irrelevant cases, reporting no BUGs is a (much) lesser evil. TODO: Revise AsyncJob::doneAll(). Many of its overrides are written to check for both positive (i.e. mission accomplished) and negative (i.e. mission cancelled or cannot be accomplished) conditions, but the latter is usually unnecessary, especially after we added handleStopRequest() API to properly support external job cancellation events. Many doneAll() overrides can probably be greatly simplified. ---- Cherry picked SQUID-568-premature-serverconn-use-v5 commit 22b5f78. * fixup: Cherry-picked SQUID-568-premature-serverconn-use PR-time fixes In git log order: * e64a6c1: Undone an out-of-scope change and added a missing 'auto' * aeaf83d: Fixed an 'unused parameter' error * f49d009: fixup: No explicit destructors with implicit copying methods * c30c37f: Removed an unnecessary explicit copy constructor * 012f5ec: Excluded Connection::rfc931 from cloning * 366c78a: ICAP: do not set connect_timeout on the established conn... This branch is now in sync with SQUID-568-premature-serverconn-use (S) commit e64a6c1 (except for official changes merged from master/v6 into S closer to the end of PR 877 work (i.e. S' merge commit 0a7432a). * Fix FATAL ServiceRep::putConnection exception: theBusyConns > 0 FATAL: check failed: theBusyConns > 0 exception location: ServiceRep.cc(163) putConnection Since master/v6 commit 2b6b1bc, a timeout on a ready-to-shovel Squid-service ICAP connection was decrementing theBusyConns level one extra time because Adaptation::Icap::Xaction::noteCommTimedout() started calling both noteConnectionFailed() and closeConnection(). Depending on the actual theBusyConns level, the extra decrement could result in FATAL errors later, when putConnection() was called (for a different ICAP transaction) with zero theBusyConns in an exception-unprotected context. Throughout these changes, Xaction still counts the above timeouts as a service failure. That is done by calling ServiceRep::noteFailure() from Xaction::callException(), including in timeout cases described above. ---- Cherry-picked master/v6 commit a8ac892. --- src/BodyPipe.cc | 1 + src/CommCalls.cc | 26 ++++- src/Downloader.cc | 5 + src/FwdState.cc | 130 +++++++++++++-------- src/FwdState.h | 32 +++--- src/HappyConnOpener.cc | 139 ++++++++++++---------- src/HappyConnOpener.h | 22 ++-- src/PeerPoolMgr.cc | 71 +++++------- src/PeerPoolMgr.h | 14 ++- src/ResolvedPeers.cc | 2 +- src/adaptation/icap/ModXact.cc | 3 +- src/adaptation/icap/ModXact.h | 5 +- src/adaptation/icap/OptXact.cc | 2 +- src/adaptation/icap/OptXact.h | 3 +- src/adaptation/icap/ServiceRep.cc | 7 +- src/adaptation/icap/ServiceRep.h | 3 +- src/adaptation/icap/Xaction.cc | 185 +++++++++++++----------------- src/adaptation/icap/Xaction.h | 33 ++++-- src/base/AsyncJob.cc | 16 ++- src/base/AsyncJob.h | 18 ++- src/base/JobWait.cc | 81 +++++++++++++ src/base/JobWait.h | 91 +++++++++++++++ src/base/Makefile.am | 2 + src/base/forward.h | 1 + src/client_side.cc | 4 + src/clients/FtpClient.cc | 15 ++- src/clients/FtpClient.h | 5 +- src/clients/FtpGateway.cc | 14 +-- src/clients/FtpRelay.cc | 6 +- src/clients/HttpTunneler.cc | 58 ++++++---- src/clients/HttpTunneler.h | 7 +- src/clients/forward.h | 4 +- src/comm.cc | 13 +++ src/comm/ConnOpener.cc | 24 +++- src/comm/ConnOpener.h | 7 +- src/comm/Connection.cc | 55 ++++++--- src/comm/Connection.h | 22 +--- src/comm/TcpAcceptor.cc | 5 +- src/dns_internal.cc | 4 + src/eui/Eui48.h | 2 - src/gopher.cc | 28 ++--- src/ident/Ident.cc | 44 +++++-- src/log/TcpLogger.cc | 10 +- src/log/TcpLogger.h | 5 + src/mgr/Forwarder.cc | 6 +- src/mgr/Inquirer.cc | 9 +- src/mgr/StoreToCommWriter.cc | 6 +- src/security/PeerConnector.cc | 101 +++++++++++----- src/security/PeerConnector.h | 10 ++ src/security/forward.h | 1 + src/servers/FtpServer.cc | 26 +++-- src/servers/FtpServer.h | 8 +- src/snmp/Forwarder.cc | 8 +- src/snmp/Inquirer.cc | 10 +- src/ssl/PeekingPeerConnector.cc | 29 ++--- src/ssl/PeekingPeerConnector.h | 4 +- src/tests/stub_libcomm.cc | 4 +- src/tests/stub_libsecurity.cc | 3 + src/tunnel.cc | 109 +++++++++--------- src/whois.cc | 1 + 60 files changed, 991 insertions(+), 568 deletions(-) create mode 100644 src/base/JobWait.cc create mode 100644 src/base/JobWait.h diff --git a/src/BodyPipe.cc b/src/BodyPipe.cc index 9abe7d48bc..37a3b32cd4 100644 --- a/src/BodyPipe.cc +++ b/src/BodyPipe.cc @@ -335,6 +335,7 @@ BodyPipe::startAutoConsumptionIfNeeded() return; theConsumer = new BodySink(this); + AsyncJob::Start(theConsumer); debugs(91,7, HERE << "starting auto consumption" << status()); scheduleBodyDataNotification(); } diff --git a/src/CommCalls.cc b/src/CommCalls.cc index 4cd503dcb3..cca23f6e30 100644 --- a/src/CommCalls.cc +++ b/src/CommCalls.cc @@ -53,6 +53,9 @@ CommAcceptCbParams::CommAcceptCbParams(void *aData): { } +// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all +// implementations always return true. + void CommAcceptCbParams::print(std::ostream &os) const { @@ -72,13 +75,24 @@ CommConnectCbParams::CommConnectCbParams(void *aData): bool CommConnectCbParams::syncWithComm() { - // drop the call if the call was scheduled before comm_close but - // is being fired after comm_close - if (fd >= 0 && fd_table[fd].closing()) { - debugs(5, 3, HERE << "dropping late connect call: FD " << fd); - return false; + assert(conn); + + // change parameters if this is a successful callback that was scheduled + // after its Comm-registered connection started to close + + if (flag != Comm::OK) { + assert(!conn->isOpen()); + return true; // not a successful callback; cannot go out of sync } - return true; // now we are in sync and can handle the call + + assert(conn->isOpen()); + if (!fd_table[conn->fd].closing()) + return true; // no closing in progress; in sync (for now) + + debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn); + conn->noteClosure(); + flag = Comm::ERR_CLOSING; + return true; // now the callback is in sync with Comm again } /* CommIoCbParams */ diff --git a/src/Downloader.cc b/src/Downloader.cc index 298c3a8366..012387fb8e 100644 --- a/src/Downloader.cc +++ b/src/Downloader.cc @@ -81,6 +81,10 @@ void Downloader::swanSong() { debugs(33, 6, this); + + if (callback_) // job-ending emergencies like handleStopRequest() or callException() + callBack(Http::scInternalServerError); + if (context_) { context_->finished(); context_ = nullptr; @@ -251,6 +255,7 @@ Downloader::downloadFinished() void Downloader::callBack(Http::StatusCode const statusCode) { + assert(callback_); CbDialer *dialer = dynamic_cast(callback_->getDialer()); Must(dialer); dialer->status = statusCode; diff --git a/src/FwdState.cc b/src/FwdState.cc index b721017c3b..b69e60c7cc 100644 --- a/src/FwdState.cc +++ b/src/FwdState.cc @@ -207,26 +207,22 @@ FwdState::stopAndDestroy(const char *reason) { debugs(17, 3, "for " << reason); - if (opening()) - cancelOpening(reason); + cancelStep(reason); PeerSelectionInitiator::subscribed = false; // may already be false self = nullptr; // we hope refcounting destroys us soon; may already be nil /* do not place any code here as this object may be gone by now */ } -/// Notify connOpener that we no longer need connections. We do not have to do -/// this -- connOpener would eventually notice on its own, but notifying reduces -/// waste and speeds up spare connection opening for other transactions (that -/// could otherwise wait for this transaction to use its spare allowance). +/// Notify a pending subtask, if any, that we no longer need its help. We do not +/// have to do this -- the subtask job will eventually end -- but ending it +/// earlier reduces waste and may reduce DoS attack surface. void -FwdState::cancelOpening(const char *reason) +FwdState::cancelStep(const char *reason) { - assert(calls.connector); - calls.connector->cancel(reason); - calls.connector = nullptr; - notifyConnOpener(); - connOpener.clear(); + transportWait.cancel(reason); + encryptionWait.cancel(reason); + peerWait.cancel(reason); } #if STRICT_ORIGINAL_DST @@ -348,8 +344,7 @@ FwdState::~FwdState() entry = NULL; - if (opening()) - cancelOpening("~FwdState"); + cancelStep("~FwdState"); if (Comm::IsConnOpen(serverConn)) closeServerConnection("~FwdState"); @@ -501,8 +496,17 @@ FwdState::fail(ErrorState * errorState) if (!errorState->request) errorState->request = request; - if (err->type != ERR_ZERO_SIZE_OBJECT) - return; + if (err->type == ERR_ZERO_SIZE_OBJECT) + reactToZeroSizeObject(); + + destinationReceipt = nullptr; // may already be nil +} + +/// ERR_ZERO_SIZE_OBJECT requires special adjustments +void +FwdState::reactToZeroSizeObject() +{ + assert(err->type == ERR_ZERO_SIZE_OBJECT); if (pconnRace == racePossible) { debugs(17, 5, HERE << "pconn race happened"); @@ -566,6 +570,8 @@ FwdState::complete() if (Comm::IsConnOpen(serverConn)) unregister(serverConn); + serverConn = nullptr; + destinationReceipt = nullptr; storedWholeReply_ = nullptr; entry->reset(); @@ -584,6 +590,12 @@ FwdState::complete() } } +bool +FwdState::usingDestination() const +{ + return encryptionWait || peerWait || Comm::IsConnOpen(serverConn); +} + void FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure) { @@ -613,19 +625,19 @@ FwdState::noteDestination(Comm::ConnectionPointer path) destinations->addPath(path); - if (Comm::IsConnOpen(serverConn)) { + if (usingDestination()) { // We are already using a previously opened connection, so we cannot be - // waiting for connOpener. We still receive destinations for backup. - Must(!opening()); + // waiting for it. We still receive destinations for backup. + Must(!transportWait); return; } - if (opening()) { + if (transportWait) { notifyConnOpener(); return; // and continue to wait for FwdState::noteConnection() callback } - // This is the first path candidate we have seen. Create connOpener. + // This is the first path candidate we have seen. Use it. useDestinations(); } @@ -653,19 +665,19 @@ FwdState::noteDestinationsEnd(ErrorState *selectionError) // if all of them fail, forwarding as whole will fail Must(!selectionError); // finding at least one path means selection succeeded - if (Comm::IsConnOpen(serverConn)) { + if (usingDestination()) { // We are already using a previously opened connection, so we cannot be - // waiting for connOpener. We were receiving destinations for backup. - Must(!opening()); + // waiting for it. We were receiving destinations for backup. + Must(!transportWait); return; } - Must(opening()); // or we would be stuck with nothing to do or wait for + Must(transportWait); // or we would be stuck with nothing to do or wait for notifyConnOpener(); // and continue to wait for FwdState::noteConnection() callback } -/// makes sure connOpener knows that destinations have changed +/// makes sure connection opener knows that the destinations have changed void FwdState::notifyConnOpener() { @@ -674,7 +686,7 @@ FwdState::notifyConnOpener() } else { debugs(17, 7, "notifying about " << *destinations); destinations->notificationPending = true; - CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange); + CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange); } } @@ -684,7 +696,7 @@ static void fwdServerClosedWrapper(const CommCloseCbParams ¶ms) { FwdState *fwd = (FwdState *)params.data; - fwd->serverClosed(params.fd); + fwd->serverClosed(); } /**** PRIVATE *****************************************************************/ @@ -754,13 +766,23 @@ FwdState::checkRetriable() } void -FwdState::serverClosed(int fd) +FwdState::serverClosed() { - // XXX: fd is often -1 here - debugs(17, 2, "FD " << fd << " " << entry->url() << " after " << - (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests"); - if (fd >= 0 && serverConnection()->fd == fd) - fwdPconnPool->noteUses(fd_table[fd].pconn.uses); + // XXX: This method logic attempts to tolerate Connection::close() called + // for serverConn earlier, by one of our dispatch()ed jobs. If that happens, + // serverConn will already be closed here or, worse, it will already be open + // for the next forwarding attempt. The current code prevents us getting + // stuck, but the long term solution is to stop sharing serverConn. + debugs(17, 2, serverConn); + if (Comm::IsConnOpen(serverConn)) { + const auto uses = fd_table[serverConn->fd].pconn.uses; + debugs(17, 3, "prior uses: " << uses); + fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool + serverConn->noteClosure(); + } + serverConn = nullptr; + closeHandler = nullptr; + destinationReceipt = nullptr; retryOrBail(); } @@ -802,6 +824,8 @@ FwdState::handleUnregisteredServerEnd() { debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url()); assert(!Comm::IsConnOpen(serverConn)); + serverConn = nullptr; + destinationReceipt = nullptr; retryOrBail(); } @@ -819,6 +843,8 @@ FwdState::advanceDestination(const char *stepDescription, const Comm::Connection } catch (...) { debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException); closePendingConnection(conn, "connection preparation exception"); + if (!err) + fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al)); retryOrBail(); } } @@ -830,8 +856,7 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer) { assert(!destinationReceipt); - calls.connector = nullptr; - connOpener.clear(); + transportWait.finish(); Must(n_tries <= answer.n_tries); // n_tries cannot decrease n_tries = answer.n_tries; @@ -843,6 +868,8 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer) Must(!Comm::IsConnOpen(answer.conn)); answer.error.clear(); // preserve error for errorSendComplete() } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { + // The socket could get closed while our callback was queued. Sync + // Connection. XXX: Connection::fd may already be stale/invalid here. // We do not know exactly why the connection got closed, so we play it // safe, allowing retries only for persistent (reused) connections if (answer.reused) { @@ -912,23 +939,24 @@ FwdState::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) if (!conn->getPeer()->options.no_delay) tunneler->setDelayId(entry->mem_obj->mostBytesAllowed()); #endif - AsyncJob::Start(tunneler); - // and wait for the tunnelEstablishmentDone() call + peerWait.start(tunneler, callback); } /// resumes operations after the (possibly failed) HTTP CONNECT exchange void FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) { + peerWait.finish(); + ErrorState *error = nullptr; if (!answer.positive()) { - Must(!Comm::IsConnOpen(answer.conn)); + Must(!answer.conn); error = answer.squidError.get(); Must(error); answer.squidError.clear(); // preserve error for fail() } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { - // The socket could get closed while our callback was queued. - // We close Connection here to sync Connection::fd. + // The socket could get closed while our callback was queued. Sync + // Connection. XXX: Connection::fd may already be stale/invalid here. closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone"); error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al); } else if (!answer.leftovers.isEmpty()) { @@ -998,18 +1026,21 @@ FwdState::secureConnectionToPeer(const Comm::ConnectionPointer &conn) #endif connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout); connector->noteFwdPconnUse = true; - AsyncJob::Start(connector); // will call our callback + encryptionWait.start(connector, callback); } /// called when all negotiations with the TLS-speaking peer have been completed void FwdState::connectedToPeer(Security::EncryptorAnswer &answer) { + encryptionWait.finish(); + ErrorState *error = nullptr; if ((error = answer.error.get())) { - Must(!Comm::IsConnOpen(answer.conn)); + assert(!answer.conn); answer.error.clear(); // preserve error for errorSendComplete() } else if (answer.tunneled) { + assert(!answer.conn); // TODO: When ConnStateData establishes tunnels, its state changes // [in ways that may affect logging?]. Consider informing // ConnStateData about our tunnel or otherwise unifying tunnel @@ -1019,6 +1050,8 @@ FwdState::connectedToPeer(Security::EncryptorAnswer &answer) complete(); // destroys us return; } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { + // The socket could get closed while our callback was queued. Sync + // Connection. XXX: Connection::fd may already be stale/invalid here. closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer"); error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al); } @@ -1091,22 +1124,20 @@ FwdState::connectStart() Must(!request->pinnedConnection()); assert(!destinations->empty()); - assert(!opening()); + assert(!usingDestination()); // Ditch error page if it was created before. // A new one will be created if there's another problem delete err; err = nullptr; request->clearError(); - serverConn = nullptr; - destinationReceipt = nullptr; request->hier.startPeerClock(); - calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer(&FwdState::noteConnection, this)); + AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer(&FwdState::noteConnection, this)); HttpRequest::Pointer cause = request; - const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al); + const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al); cs->setHost(request->url.host()); bool retriable = checkRetriable(); if (!retriable && Config.accessList.serverPconnForNonretriable) { @@ -1118,8 +1149,7 @@ FwdState::connectStart() cs->setRetriable(retriable); cs->allowPersistent(pconnRace != raceHappened); destinations->notificationPending = true; // start() is async - connOpener = cs; - AsyncJob::Start(cs); + transportWait.start(cs, callback); } /// send request on an existing connection dedicated to the requesting client diff --git a/src/FwdState.h b/src/FwdState.h index de75f33cc3..ebf5f82b15 100644 --- a/src/FwdState.h +++ b/src/FwdState.h @@ -9,13 +9,12 @@ #ifndef SQUID_FORWARD_H #define SQUID_FORWARD_H -#include "base/CbcPointer.h" #include "base/forward.h" +#include "base/JobWait.h" #include "base/RefCount.h" #include "clients/forward.h" #include "comm.h" #include "comm/Connection.h" -#include "comm/ConnOpener.h" #include "error/forward.h" #include "fde.h" #include "http/StatusCode.h" @@ -38,7 +37,6 @@ class ResolvedPeers; typedef RefCount ResolvedPeersPointer; class HappyConnOpener; -typedef CbcPointer HappyConnOpenerPointer; class HappyConnOpenerAnswer; /// Sets initial TOS value and Netfilter for the future outgoing connection. @@ -87,7 +85,7 @@ public: void handleUnregisteredServerEnd(); int reforward(); bool reforwardableStatus(const Http::StatusCode s) const; - void serverClosed(int fd); + void serverClosed(); void connectStart(); void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno); bool checkRetry(); @@ -115,6 +113,9 @@ private: /* PeerSelectionInitiator API */ virtual void noteDestination(Comm::ConnectionPointer conn) override; virtual void noteDestinationsEnd(ErrorState *selectionError) override; + /// whether the successfully selected path destination or the established + /// server connection is still in use + bool usingDestination() const; void noteConnection(HappyConnOpenerAnswer &); @@ -157,13 +158,10 @@ private: /// \returns the time left for this connection to become connected or 1 second if it is less than one second left time_t connectingTimeout(const Comm::ConnectionPointer &conn) const; - /// whether we are waiting for HappyConnOpener - /// same as calls.connector but may differ from connOpener.valid() - bool opening() const { return connOpener.set(); } - - void cancelOpening(const char *reason); + void cancelStep(const char *reason); void notifyConnOpener(); + void reactToZeroSizeObject(); void updateAleWithFinalError(); @@ -182,11 +180,6 @@ private: time_t start_t; int n_tries; ///< the number of forwarding attempts so far - // AsyncCalls which we set and may need cancelling. - struct { - AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. - } calls; - struct { bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc bool dont_retry; @@ -194,7 +187,16 @@ private: bool destinationsFound; ///< at least one candidate path found } flags; - HappyConnOpenerPointer connOpener; ///< current connection opening job + /// waits for a transport connection to the peer to be established/opened + JobWait transportWait; + + /// waits for the established transport connection to be secured/encrypted + JobWait encryptionWait; + + /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated + /// over the (encrypted, if needed) transport connection to that cache_peer + JobWait peerWait; + ResolvedPeersPointer destinations; ///< paths for forwarding the request Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server. PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil) diff --git a/src/HappyConnOpener.cc b/src/HappyConnOpener.cc index 6e08725065..6d83ff14f5 100644 --- a/src/HappyConnOpener.cc +++ b/src/HappyConnOpener.cc @@ -18,6 +18,7 @@ #include "neighbors.h" #include "pconn.h" #include "PeerPoolMgr.h" +#include "sbuf/Stream.h" #include "SquidConfig.h" CBDATA_CLASS_INIT(HappyConnOpener); @@ -330,6 +331,8 @@ HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const Asyn fwdStart(aFwdStart), callback_(aCall), destinations(dests), + prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"), + spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"), ale(anAle), cause(request), n_tries(tries) @@ -410,33 +413,43 @@ HappyConnOpener::swanSong() AsyncJob::swanSong(); } +/// HappyConnOpener::Attempt printer for debugging +std::ostream & +operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt) +{ + if (!attempt.path) + os << '-'; + else if (attempt.path->isOpen()) + os << "FD " << attempt.path->fd; + else if (attempt.connWait) + os << attempt.connWait; + else // destination is known; connection closed (and we are not opening any) + os << attempt.path->id; + return os; +} + const char * HappyConnOpener::status() const { - static MemBuf buf; - buf.reset(); + // TODO: In a redesigned status() API, the caller may mimic this approach. + static SBuf buf; + buf.clear(); - buf.append(" [", 2); + SBufStream os(buf); + + os.write(" [", 2); if (stopReason) - buf.appendf("Stopped, reason:%s", stopReason); - if (prime) { - if (prime.path && prime.path->isOpen()) - buf.appendf(" prime FD %d", prime.path->fd); - else if (prime.connector) - buf.appendf(" prime call%ud", prime.connector->id.value); - } - if (spare) { - if (spare.path && spare.path->isOpen()) - buf.appendf(" spare FD %d", spare.path->fd); - else if (spare.connector) - buf.appendf(" spare call%ud", spare.connector->id.value); - } + os << "Stopped:" << stopReason; + if (prime) + os << "prime:" << prime; + if (spare) + os << "spare:" << spare; if (n_tries) - buf.appendf(" tries %d", n_tries); - buf.appendf(" %s%u]", id.prefix(), id.value); - buf.terminate(); + os << " tries:" << n_tries; + os << ' ' << id << ']'; - return buf.content(); + buf = os.buf(); + return buf.c_str(); } /// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending @@ -516,7 +529,7 @@ void HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest) { Must(!attempt.path); - Must(!attempt.connector); + Must(!attempt.connWait); Must(dest); const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer(); @@ -552,51 +565,52 @@ HappyConnOpener::openFreshConnection(Attempt &attempt, PeerConnectionPointer &de entry->mem_obj->checkUrlChecksum(); #endif - GetMarkingsToServer(cause.getRaw(), *dest); + const auto conn = dest->cloneProfile(); + GetMarkingsToServer(cause.getRaw(), *conn); - // ConnOpener modifies its destination argument so we reset the source port - // in case we are reusing the destination already used by our predecessor. - dest->local.port(0); ++n_tries; typedef CommCbMemFunT Dialer; - AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone); + AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName, + Dialer(this, attempt.callbackMethod)); const time_t connTimeout = dest->connectTimeout(fwdStart); - Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout); - if (!dest->getPeer()) + auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout); + if (!conn->getPeer()) cs->setHost(host_); - attempt.path = dest; - attempt.connector = callConnect; - attempt.opener = cs; + attempt.path = dest; // but not the being-opened conn! + attempt.connWait.start(cs, callConnect); +} - AsyncJob::Start(cs); +/// Comm::ConnOpener callback for the prime connection attempt +void +HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams ¶ms) +{ + handleConnOpenerAnswer(prime, params, "new prime connection"); } -/// called by Comm::ConnOpener objects after a prime or spare connection attempt -/// completes (successfully or not) +/// Comm::ConnOpener callback for the spare connection attempt void -HappyConnOpener::connectDone(const CommConnectCbParams ¶ms) +HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams ¶ms) { - Must(params.conn); - const bool itWasPrime = (params.conn == prime.path); - const bool itWasSpare = (params.conn == spare.path); - Must(itWasPrime != itWasSpare); - - PeerConnectionPointer handledPath; - if (itWasPrime) { - handledPath = prime.path; - prime.finish(); - } else { - handledPath = spare.path; - spare.finish(); - if (gotSpareAllowance) { - TheSpareAllowanceGiver.jobUsedAllowance(); - gotSpareAllowance = false; - } + if (gotSpareAllowance) { + TheSpareAllowanceGiver.jobUsedAllowance(); + gotSpareAllowance = false; } + handleConnOpenerAnswer(spare, params, "new spare connection"); +} + +/// prime/spare-agnostic processing of a Comm::ConnOpener result +void +HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams ¶ms, const char *what) +{ + Must(params.conn); + + // finalize the previously selected path before attempt.finish() forgets it + auto handledPath = attempt.path; + handledPath.finalize(params.conn); // closed on errors + attempt.finish(); - const char *what = itWasPrime ? "new prime connection" : "new spare connection"; if (params.flag == Comm::OK) { sendSuccess(handledPath, false, what); return; @@ -605,7 +619,6 @@ HappyConnOpener::connectDone(const CommConnectCbParams ¶ms) debugs(17, 8, what << " failed: " << params.conn); if (const auto peer = params.conn->getPeer()) peerConnectFailed(peer); - params.conn->close(); // TODO: Comm::ConnOpener should do this instead. // remember the last failure (we forward it if we cannot connect anywhere) lastFailedConnection = handledPath; @@ -881,13 +894,23 @@ HappyConnOpener::ranOutOfTimeOrAttempts() const return false; } +HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName): + callbackMethod(method), + callbackMethodName(methodName) +{ +} + +void +HappyConnOpener::Attempt::finish() +{ + connWait.finish(); + path = nullptr; +} + void HappyConnOpener::Attempt::cancel(const char *reason) { - if (connector) { - connector->cancel(reason); - CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort); - } - clear(); + connWait.cancel(reason); + path = nullptr; } diff --git a/src/HappyConnOpener.h b/src/HappyConnOpener.h index 4f3135c604..c57c431ad8 100644 --- a/src/HappyConnOpener.h +++ b/src/HappyConnOpener.h @@ -156,22 +156,28 @@ private: /// a connection opening attempt in progress (or falsy) class Attempt { public: + /// HappyConnOpener method implementing a ConnOpener callback + using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &); + + Attempt(const CallbackMethod method, const char *methodName); + explicit operator bool() const { return static_cast(path); } /// reacts to a natural attempt completion (successful or otherwise) - void finish() { clear(); } + void finish(); /// aborts an in-progress attempt void cancel(const char *reason); PeerConnectionPointer path; ///< the destination we are connecting to - AsyncCall::Pointer connector; ///< our opener callback - Comm::ConnOpener::Pointer opener; ///< connects to path and calls us - private: - /// cleans up after the attempt ends (successfully or otherwise) - void clear() { path = nullptr; connector = nullptr; opener = nullptr; } + /// waits for a connection to the peer to be established/opened + JobWait connWait; + + const CallbackMethod callbackMethod; ///< ConnOpener calls this method + const char * const callbackMethodName; ///< for callbackMethod debugging }; + friend std::ostream &operator <<(std::ostream &, const Attempt &); /* AsyncJob API */ virtual void start() override; @@ -190,7 +196,9 @@ private: void openFreshConnection(Attempt &, PeerConnectionPointer &); bool reuseOldConnection(PeerConnectionPointer &); - void connectDone(const CommConnectCbParams &); + void notePrimeConnectDone(const CommConnectCbParams &); + void noteSpareConnectDone(const CommConnectCbParams &); + void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription); void checkForNewConnection(); diff --git a/src/PeerPoolMgr.cc b/src/PeerPoolMgr.cc index 2caa09f442..7423dd669e 100644 --- a/src/PeerPoolMgr.cc +++ b/src/PeerPoolMgr.cc @@ -43,9 +43,8 @@ public: PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"), peer(cbdataReference(aPeer)), request(), - opener(), - securer(), - closer(), + transportWait(), + encryptionWait(), addrUsed(0) { } @@ -90,7 +89,7 @@ PeerPoolMgr::doneAll() const void PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) { - opener = NULL; + transportWait.finish(); if (!validPeer()) { debugs(48, 3, "peer gone"); @@ -100,9 +99,6 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) } if (params.flag != Comm::OK) { - /* it might have been a timeout with a partially open link */ - if (params.conn != NULL) - params.conn->close(); peerConnectFailed(peer); checkpoint("conn opening failure"); // may retry return; @@ -112,20 +108,16 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) // Handle TLS peers. if (peer->secure.encryptTransport) { - typedef CommCbMemFunT CloserDialer; - closer = JobCallback(48, 3, CloserDialer, this, - PeerPoolMgr::handleSecureClosure); - comm_add_close_handler(params.conn->fd, closer); - - securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer", - MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer)); + // XXX: Exceptions orphan params.conn + AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer", + MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer)); const int peerTimeout = peerConnectTimeout(peer); const int timeUsed = squid_curtime - params.conn->startTime(); // Use positive timeout when less than one second is left for conn. const int timeLeft = positiveTimeout(peerTimeout - timeUsed); - auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft); - AsyncJob::Start(connector); // will call our callback + const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft); + encryptionWait.start(connector, callback); return; } @@ -144,16 +136,7 @@ PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn) void PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer) { - Must(securer != NULL); - securer = NULL; - - if (closer != NULL) { - if (answer.conn != NULL) - comm_remove_close_handler(answer.conn->fd, closer); - else - closer->cancel("securing completed"); - closer = NULL; - } + encryptionWait.finish(); if (!validPeer()) { debugs(48, 3, "peer gone"); @@ -162,35 +145,33 @@ PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer) return; } + assert(!answer.tunneled); if (answer.error.get()) { - if (answer.conn != NULL) - answer.conn->close(); + assert(!answer.conn); // PeerConnector calls peerConnectFailed() for us; checkpoint("conn securing failure"); // may retry return; } - pushNewConnection(answer.conn); -} + assert(answer.conn); -void -PeerPoolMgr::handleSecureClosure(const CommCloseCbParams ¶ms) -{ - Must(closer != NULL); - Must(securer != NULL); - securer->cancel("conn closed by a 3rd party"); - securer = NULL; - closer = NULL; - // allow the closing connection to fully close before we check again - Checkpoint(this, "conn closure while securing"); + // The socket could get closed while our callback was queued. Sync + // Connection. XXX: Connection::fd may already be stale/invalid here. + if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { + answer.conn->noteClosure(); + checkpoint("external connection closure"); // may retry + return; + } + + pushNewConnection(answer.conn); } void PeerPoolMgr::openNewConnection() { // KISS: Do nothing else when we are already doing something. - if (opener != NULL || securer != NULL || shutting_down) { - debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down); + if (transportWait || encryptionWait || shutting_down) { + debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down); return; // there will be another checkpoint when we are done opening/securing } @@ -227,9 +208,9 @@ PeerPoolMgr::openNewConnection() const int ctimeout = peerConnectTimeout(peer); typedef CommCbMemFunT Dialer; - opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); - Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout); - AsyncJob::Start(cs); + AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); + const auto cs = new Comm::ConnOpener(conn, callback, ctimeout); + transportWait.start(cs, callback); } void diff --git a/src/PeerPoolMgr.h b/src/PeerPoolMgr.h index 217994393a..6da61b10bc 100644 --- a/src/PeerPoolMgr.h +++ b/src/PeerPoolMgr.h @@ -10,6 +10,7 @@ #define SQUID_PEERPOOLMGR_H #include "base/AsyncJob.h" +#include "base/JobWait.h" #include "comm/forward.h" #include "security/forward.h" @@ -54,18 +55,19 @@ protected: /// Security::PeerConnector callback void handleSecuredPeer(Security::EncryptorAnswer &answer); - /// called when the connection we are trying to secure is closed by a 3rd party - void handleSecureClosure(const CommCloseCbParams ¶ms); - /// the final step in connection opening (and, optionally, securing) sequence void pushNewConnection(const Comm::ConnectionPointer &conn); private: CachePeer *peer; ///< the owner of the pool we manage RefCount request; ///< fake HTTP request for conn opening code - AsyncCall::Pointer opener; ///< whether we are opening a connection - AsyncCall::Pointer securer; ///< whether we are securing a connection - AsyncCall::Pointer closer; ///< monitors conn while we are securing it + + /// waits for a transport connection to the peer to be established/opened + JobWait transportWait; + + /// waits for the established transport connection to be secured/encrypted + JobWait encryptionWait; + unsigned int addrUsed; ///< counter for cycling through peer addresses }; diff --git a/src/ResolvedPeers.cc b/src/ResolvedPeers.cc index 5b2c6740de..06ab02d23a 100644 --- a/src/ResolvedPeers.cc +++ b/src/ResolvedPeers.cc @@ -151,7 +151,7 @@ ResolvedPeers::extractFound(const char *description, const Paths::iterator &foun while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {} } - const auto cleanPath = path.connection->cloneDestinationDetails(); + const auto cleanPath = path.connection->cloneProfile(); return PeerConnectionPointer(cleanPath, found - paths_.begin()); } diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc index bbeebdbb56..982f38f8d4 100644 --- a/src/adaptation/icap/ModXact.cc +++ b/src/adaptation/icap/ModXact.cc @@ -187,8 +187,7 @@ void Adaptation::Icap::ModXact::startWriting() openConnection(); } -// connection with the ICAP service established -void Adaptation::Icap::ModXact::handleCommConnected() +void Adaptation::Icap::ModXact::startShoveling() { Must(state.writing == State::writingConnect); diff --git a/src/adaptation/icap/ModXact.h b/src/adaptation/icap/ModXact.h index f8988ac294..2fda1318b8 100644 --- a/src/adaptation/icap/ModXact.h +++ b/src/adaptation/icap/ModXact.h @@ -155,10 +155,11 @@ public: virtual void noteBodyProductionEnded(BodyPipe::Pointer); virtual void noteBodyProducerAborted(BodyPipe::Pointer); - // comm handlers - virtual void handleCommConnected(); + /* Xaction API */ + virtual void startShoveling(); virtual void handleCommWrote(size_t size); virtual void handleCommRead(size_t size); + void handleCommWroteHeaders(); void handleCommWroteBody(); diff --git a/src/adaptation/icap/OptXact.cc b/src/adaptation/icap/OptXact.cc index 5ed4fbfbb8..2464ea5331 100644 --- a/src/adaptation/icap/OptXact.cc +++ b/src/adaptation/icap/OptXact.cc @@ -37,7 +37,7 @@ void Adaptation::Icap::OptXact::start() openConnection(); } -void Adaptation::Icap::OptXact::handleCommConnected() +void Adaptation::Icap::OptXact::startShoveling() { scheduleRead(); diff --git a/src/adaptation/icap/OptXact.h b/src/adaptation/icap/OptXact.h index 3811ab48fd..725cd6225e 100644 --- a/src/adaptation/icap/OptXact.h +++ b/src/adaptation/icap/OptXact.h @@ -30,8 +30,9 @@ public: OptXact(ServiceRep::Pointer &aService); protected: + /* Xaction API */ virtual void start(); - virtual void handleCommConnected(); + virtual void startShoveling(); virtual void handleCommWrote(size_t size); virtual void handleCommRead(size_t size); diff --git a/src/adaptation/icap/ServiceRep.cc b/src/adaptation/icap/ServiceRep.cc index 6df4757126..fbd8968881 100644 --- a/src/adaptation/icap/ServiceRep.cc +++ b/src/adaptation/icap/ServiceRep.cc @@ -112,9 +112,10 @@ void Adaptation::Icap::ServiceRep::noteFailure() // should be configurable. } -// returns a persistent or brand new connection; negative int on failures +// TODO: getIdleConnection() and putConnection()/noteConnectionFailed() manage a +// "used connection slot" resource. Automate that resource tracking (RAII/etc.). Comm::ConnectionPointer -Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) +Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact) { Comm::ConnectionPointer connection; @@ -137,7 +138,6 @@ Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) else theIdleConns->closeN(1); - reused = Comm::IsConnOpen(connection); ++theBusyConns; debugs(93,3, HERE << "got connection: " << connection); return connection; @@ -150,7 +150,6 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer & // do not pool an idle connection if we owe connections if (isReusable && excessConnections() == 0) { debugs(93, 3, HERE << "pushing pconn" << comment); - commUnsetConnTimeout(conn); theIdleConns->push(conn); } else { debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " << diff --git a/src/adaptation/icap/ServiceRep.h b/src/adaptation/icap/ServiceRep.h index f2e893244e..cca2df4ab0 100644 --- a/src/adaptation/icap/ServiceRep.h +++ b/src/adaptation/icap/ServiceRep.h @@ -85,7 +85,8 @@ public: bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const; bool allows204() const; bool allows206() const; - Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused); + /// \returns an idle persistent ICAP connection or nil + Comm::ConnectionPointer getIdleConnection(bool isRetriable); void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment); void noteConnectionUse(const Comm::ConnectionPointer &conn); void noteConnectionFailed(const char *comment); diff --git a/src/adaptation/icap/Xaction.cc b/src/adaptation/icap/Xaction.cc index 9aacf2e1bb..962ed17037 100644 --- a/src/adaptation/icap/Xaction.cc +++ b/src/adaptation/icap/Xaction.cc @@ -13,6 +13,7 @@ #include "adaptation/icap/Config.h" #include "adaptation/icap/Launcher.h" #include "adaptation/icap/Xaction.h" +#include "base/JobWait.h" #include "base/TextException.h" #include "comm.h" #include "comm/Connection.h" @@ -79,7 +80,6 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv icapRequest(NULL), icapReply(NULL), attempts(0), - connection(NULL), theService(aService), commEof(false), reuseConnection(true), @@ -87,14 +87,8 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv isRepeatable(true), ignoreLastWrite(false), waitingForDns(false), - stopReason(NULL), - connector(NULL), - reader(NULL), - writer(NULL), - closer(NULL), alep(new AccessLogEntry), - al(*alep), - cs(NULL) + al(*alep) { debugs(93,3, typeName << " constructed, this=" << this << " [icapx" << id << ']'); // we should not call virtual status() here @@ -150,6 +144,8 @@ static void icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data) { Adaptation::Icap::Xaction *xa = static_cast(data); + /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17) + /// or Optional (when it can take non-trivial types) xa->dnsLookupDone(ia); } @@ -164,21 +160,8 @@ Adaptation::Icap::Xaction::openConnection() if (!TheConfig.reuse_connections) disableRetries(); // this will also safely drain pconn pool - bool wasReused = false; - connection = s.getConnection(isRetriable, wasReused); - - if (wasReused && Comm::IsConnOpen(connection)) { - // Set comm Close handler - // fake the connect callback - // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead? - typedef CommCbMemFunT Dialer; - CbcPointer self(this); - Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); - dialer.params.conn = connection; - dialer.params.flag = Comm::OK; - // fake other parameters by copying from the existing connection - connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); - ScheduleCallHere(connector); + if (const auto pconn = s.getIdleConnection(isRetriable)) { + useTransportConnection(pconn); return; } @@ -207,30 +190,22 @@ Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia) #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS dieOnConnectionFailure(); // throws #else // take a step back into protected Async call dialing. - // fake the connect callback - typedef CommCbMemFunT Dialer; - CbcPointer self(this); - Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); - dialer.params.conn = connection; - dialer.params.flag = Comm::COMM_ERROR; - // fake other parameters by copying from the existing connection - connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); - ScheduleCallHere(connector); + CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure); #endif return; } - connection = new Comm::Connection; - connection->remote = ia->current(); - connection->remote.port(s.cfg().port); - getOutgoingAddress(NULL, connection); + const Comm::ConnectionPointer conn = new Comm::Connection(); + conn->remote = ia->current(); + conn->remote.port(s.cfg().port); + getOutgoingAddress(nullptr, conn); // TODO: service bypass status may differ from that of a transaction typedef CommCbMemFunT ConnectDialer; - connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); - cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass)); + AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); + const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass)); cs->setHost(s.cfg().host.termedBuf()); - AsyncJob::Start(cs.get()); + transportWait.start(cs, callback); } /* @@ -256,6 +231,8 @@ void Adaptation::Icap::Xaction::closeConnection() closer = NULL; } + commUnsetConnTimeout(connection); + cancelRead(); // may not work if (reuseConnection && !doneWithIo()) { @@ -275,54 +252,65 @@ void Adaptation::Icap::Xaction::closeConnection() writer = NULL; reader = NULL; - connector = NULL; connection = NULL; } } -// connection with the ICAP service established +/// called when the connection attempt to an ICAP service completes (successfully or not) void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io) { - cs = NULL; + transportWait.finish(); - if (io.flag == Comm::TIMEOUT) { - handleCommTimedout(); + if (io.flag != Comm::OK) { + dieOnConnectionFailure(); // throws return; } - Must(connector != NULL); - connector = NULL; - - if (io.flag != Comm::OK) - dieOnConnectionFailure(); // throws - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", - TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); - commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall); + useTransportConnection(io.conn); +} - typedef CommCbMemFunT CloseDialer; - closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", - CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); - comm_add_close_handler(io.conn->fd, closer); +/// React to the availability of a transport connection to the ICAP service. +/// The given connection may (or may not) be secured already. +void +Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn) +{ + assert(Comm::IsConnOpen(conn)); + assert(!connection); // If it is a reused connection and the TLS object is built // we should not negotiate new TLS session - const auto &ssl = fd_table[io.conn->fd].ssl; + const auto &ssl = fd_table[conn->fd].ssl; if (!ssl && service().cfg().secure.encryptTransport) { + // XXX: Exceptions orphan conn. CbcPointer me(this); - securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", - MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer)); + AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", + MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer)); - auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); - AsyncJob::Start(sslConnector); // will call our callback + const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); + + encryptionWait.start(sslConnector, callback); return; } -// ?? fd_table[io.conn->fd].noteUse(icapPconnPool); + useIcapConnection(conn); +} + +/// react to the availability of a fully-ready ICAP connection +void +Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn) +{ + assert(!connection); + assert(conn); + assert(Comm::IsConnOpen(conn)); + connection = conn; service().noteConnectionUse(connection); - handleCommConnected(); + typedef CommCbMemFunT CloseDialer; + closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", + CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); + comm_add_close_handler(connection->fd, closer); + + startShoveling(); } void Adaptation::Icap::Xaction::dieOnConnectionFailure() @@ -367,40 +355,25 @@ void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io) // communication timeout with the ICAP service void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &) -{ - handleCommTimedout(); -} - -void Adaptation::Icap::Xaction::handleCommTimedout() { debugs(93, 2, HERE << typeName << " failed: timeout with " << theService->cfg().methodStr() << " " << theService->cfg().uri << status()); reuseConnection = false; - const bool whileConnecting = connector != NULL; - if (whileConnecting) { - assert(!haveConnection()); - theService->noteConnectionFailed("timedout"); - } else - closeConnection(); // so that late Comm callbacks do not disturb bypass - throw TexcHere(whileConnecting ? - "timed out while connecting to the ICAP service" : - "timed out while talking to the ICAP service"); + assert(haveConnection()); + closeConnection(); + throw TextException("timed out while talking to the ICAP service", Here()); } // unexpected connection close while talking to the ICAP service void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &) { - if (securer != NULL) { - securer->cancel("Connection closed before SSL negotiation finished"); - securer = NULL; + if (connection) { + connection->noteClosure(); + connection = nullptr; } closer = NULL; - handleCommClosed(); -} -void Adaptation::Icap::Xaction::handleCommClosed() -{ static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE"); detailError(d); mustStop("ICAP service connection externally closed"); @@ -424,7 +397,8 @@ void Adaptation::Icap::Xaction::callEnd() bool Adaptation::Icap::Xaction::doneAll() const { - return !waitingForDns && !connector && !securer && !reader && !writer && + return !waitingForDns && !transportWait && !encryptionWait && + !reader && !writer && Adaptation::Initiate::doneAll(); } @@ -568,7 +542,7 @@ bool Adaptation::Icap::Xaction::doneWriting() const bool Adaptation::Icap::Xaction::doneWithIo() const { return haveConnection() && - !connector && !reader && !writer && // fast checks, some redundant + !transportWait && !reader && !writer && // fast checks, some redundant doneReading() && doneWriting(); } @@ -608,10 +582,7 @@ void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome & void Adaptation::Icap::Xaction::swanSong() { // kids should sing first and then call the parent method. - if (cs.valid()) { - debugs(93,6, HERE << id << " about to notify ConnOpener!"); - CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort); - cs = NULL; + if (transportWait || encryptionWait) { service().noteConnectionFailed("abort"); } @@ -750,20 +721,12 @@ Ssl::IcapPeerConnector::noteNegotiationDone(ErrorState *error) void Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer) { - Must(securer != NULL); - securer = NULL; - - if (closer != NULL) { - if (Comm::IsConnOpen(answer.conn)) - comm_remove_close_handler(answer.conn->fd, closer); - else - closer->cancel("securing completed"); - closer = NULL; - } + encryptionWait.finish(); + assert(!answer.tunneled); if (answer.error.get()) { - if (answer.conn != NULL) - answer.conn->close(); + assert(!answer.conn); + // TODO: Refactor dieOnConnectionFailure() to be usable here as well. debugs(93, 2, typeName << " TLS negotiation to " << service().cfg().uri << " failed"); service().noteConnectionFailed("failure"); @@ -774,8 +737,18 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer) debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete"); - service().noteConnectionUse(answer.conn); + assert(answer.conn); + + // The socket could get closed while our callback was queued. Sync + // Connection. XXX: Connection::fd may already be stale/invalid here. + if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { + answer.conn->noteClosure(); + service().noteConnectionFailed("external TLS connection closure"); + static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE"); + detailError(d); + throw TexcHere("external closure of the TLS ICAP service connection"); + } - handleCommConnected(); + useIcapConnection(answer.conn); } diff --git a/src/adaptation/icap/Xaction.h b/src/adaptation/icap/Xaction.h index b660445944..31a6e22fc9 100644 --- a/src/adaptation/icap/Xaction.h +++ b/src/adaptation/icap/Xaction.h @@ -12,6 +12,7 @@ #include "AccessLogEntry.h" #include "adaptation/icap/ServiceRep.h" #include "adaptation/Initiate.h" +#include "base/JobWait.h" #include "comm/ConnOpener.h" #include "error/forward.h" #include "HttpReply.h" @@ -20,6 +21,10 @@ class MemBuf; +namespace Ssl { +class IcapPeerConnector; +} + namespace Adaptation { namespace Icap @@ -65,12 +70,12 @@ protected: virtual void start(); virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate + /// starts sending/receiving ICAP messages + virtual void startShoveling() = 0; + // comm hanndlers; called by comm handler wrappers - virtual void handleCommConnected() = 0; virtual void handleCommWrote(size_t sz) = 0; virtual void handleCommRead(size_t sz) = 0; - virtual void handleCommTimedout(); - virtual void handleCommClosed(); void handleSecuredPeer(Security::EncryptorAnswer &answer); /// record error detail if possible @@ -78,7 +83,6 @@ protected: void openConnection(); void closeConnection(); - void dieOnConnectionFailure(); bool haveConnection() const; void scheduleRead(); @@ -124,11 +128,13 @@ public: ServiceRep &service(); private: + void useTransportConnection(const Comm::ConnectionPointer &); + void useIcapConnection(const Comm::ConnectionPointer &); + void dieOnConnectionFailure(); void tellQueryAborted(); void maybeLog(); protected: - Comm::ConnectionPointer connection; ///< ICAP server connection Adaptation::Icap::ServiceRep::Pointer theService; SBuf readBuf; @@ -139,13 +145,8 @@ protected: bool ignoreLastWrite; bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback - const char *stopReason; - - // active (pending) comm callbacks for the ICAP server connection - AsyncCall::Pointer connector; AsyncCall::Pointer reader; AsyncCall::Pointer writer; - AsyncCall::Pointer closer; AccessLogEntry::Pointer alep; ///< icap.log entry AccessLogEntry &al; ///< short for *alep @@ -155,8 +156,16 @@ protected: timeval icap_tio_finish; /*time when the last byte of the ICAP responsewas received*/ private: - Comm::ConnOpener::Pointer cs; - AsyncCall::Pointer securer; ///< whether we are securing a connection + /// waits for a transport connection to the ICAP server to be established/opened + JobWait transportWait; + + /// waits for the established transport connection to be secured/encrypted + JobWait encryptionWait; + + /// open and, if necessary, secured connection to the ICAP server (or nil) + Comm::ConnectionPointer connection; + + AsyncCall::Pointer closer; }; } // namespace Icap diff --git a/src/base/AsyncJob.cc b/src/base/AsyncJob.cc index 3b9161e4a1..111667d5d8 100644 --- a/src/base/AsyncJob.cc +++ b/src/base/AsyncJob.cc @@ -20,11 +20,11 @@ InstanceIdDefinitions(AsyncJob, "job"); -AsyncJob::Pointer AsyncJob::Start(AsyncJob *j) +void +AsyncJob::Start(const Pointer &job) { - AsyncJob::Pointer job(j); CallJobHere(93, 5, job, AsyncJob, start); - return job; + job->started_ = true; // it is the attempt that counts } AsyncJob::AsyncJob(const char *aTypeName) : @@ -38,6 +38,7 @@ AsyncJob::~AsyncJob() { debugs(93,5, "AsyncJob destructed, this=" << this << " type=" << typeName << " [" << id << ']'); + assert(!started_ || swanSang_); } void AsyncJob::start() @@ -141,9 +142,16 @@ void AsyncJob::callEnd() AsyncCall::Pointer inCallSaved = inCall; void *thisSaved = this; + // TODO: Swallow swanSong() exceptions to reduce memory leaks. + + // Job callback invariant: swanSong() is (only) called for started jobs. + // Here to detect violations in kids that forgot to call our swanSong(). + assert(started_); + + swanSang_ = true; // it is the attempt that counts swanSong(); - delete this; // this is the only place where the object is deleted + delete this; // this is the only place where a started job is deleted // careful: this object does not exist any more debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved); diff --git a/src/base/AsyncJob.h b/src/base/AsyncJob.h index 4d685dff52..a46e300713 100644 --- a/src/base/AsyncJob.h +++ b/src/base/AsyncJob.h @@ -36,8 +36,13 @@ public: public: AsyncJob(const char *aTypeName); - /// starts a freshly created job (i.e., makes the job asynchronous) - static Pointer Start(AsyncJob *job); + /// Promises to start the configured job (eventually). The job is deemed to + /// be running asynchronously beyond this point, so the caller should only + /// access the job object via AsyncCalls rather than directly. + /// + /// swanSong() is only called for jobs for which this method has returned + /// successfully (i.e. without throwing). + static void Start(const Pointer &job); protected: // XXX: temporary method to replace "delete this" in jobs-in-transition. @@ -62,6 +67,11 @@ public: /// called when the job throws during an async call virtual void callException(const std::exception &e); + /// process external request to terminate now (i.e. during this async call) + void handleStopRequest() { mustStop("externally aborted"); } + + const InstanceId id; ///< job identifier + protected: // external destruction prohibited to ensure swanSong() is called virtual ~AsyncJob(); @@ -69,7 +79,9 @@ protected: const char *stopReason; ///< reason for forcing done() to be true const char *typeName; ///< kid (leaf) class name, for debugging AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any - const InstanceId id; ///< job identifier + + bool started_ = false; ///< Start() has finished successfully + bool swanSang_ = false; ///< swanSong() was called }; #endif /* SQUID_ASYNC_JOB_H */ diff --git a/src/base/JobWait.cc b/src/base/JobWait.cc new file mode 100644 index 0000000000..ba68324157 --- /dev/null +++ b/src/base/JobWait.cc @@ -0,0 +1,81 @@ +/* + * Copyright (C) 1996-2020 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#include "squid.h" +#include "base/AsyncJobCalls.h" +#include "base/JobWait.h" + +#include +#include + +JobWaitBase::JobWaitBase() = default; + +JobWaitBase::~JobWaitBase() +{ + cancel("owner gone"); +} + +void +JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall) +{ + // Invariant: The wait will be over. We cannot guarantee that the job will + // call the callback, of course, but we can check these prerequisites. + assert(aCall); + assert(aJob.valid()); + + // "Double" waiting state leads to conflicting/mismatching callbacks + // detailed in finish(). Detect that bug ASAP. + assert(!waiting()); + + assert(!callback_); + assert(!job_); + callback_ = aCall; + job_ = aJob; + + AsyncJob::Start(job_.get()); +} + +void +JobWaitBase::finish() +{ + // Unexpected callbacks might result in disasters like secrets exposure, + // data corruption, or expensive message routing mistakes when the callback + // info is applied to the wrong message part or acted upon prematurely. + assert(waiting()); + clear(); +} + +void +JobWaitBase::cancel(const char *reason) +{ + if (callback_) { + callback_->cancel(reason); + + // Instead of AsyncJob, the class parameter could be Job. That would + // avoid runtime child-to-parent CbcPointer conversion overheads, but + // complicate support for Jobs with virtual AsyncJob bases (GCC error: + // "pointer to member conversion via virtual base AsyncJob") and also + // cache-log "Job::handleStopRequest()" with a non-existent class name. + CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest); + + clear(); + } +} + +void +JobWaitBase::print(std::ostream &os) const +{ + // use a backarrow to emphasize that this is a callback: call24<-job6 + if (callback_) + os << callback_->id << "<-"; + if (const auto rawJob = job_.get()) + os << rawJob->id; + else + os << job_; // raw pointer of a gone job may still be useful for triage +} + diff --git a/src/base/JobWait.h b/src/base/JobWait.h new file mode 100644 index 0000000000..6b11313319 --- /dev/null +++ b/src/base/JobWait.h @@ -0,0 +1,91 @@ +/* + * Copyright (C) 1996-2021 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef SQUID_BASE_JOBWAIT_H +#define SQUID_BASE_JOBWAIT_H + +#include "base/AsyncJob.h" +#include "base/CbcPointer.h" + +#include + +/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead. +/// This base class does not contain code specific to the actual Job type. +class JobWaitBase +{ +public: + JobWaitBase(); + ~JobWaitBase(); + + /// no copying of any kind: each waiting context needs a dedicated AsyncCall + JobWaitBase(JobWaitBase &&) = delete; + + explicit operator bool() const { return waiting(); } + + /// whether we are currently waiting for the job to call us back + /// the job itself may be gone even if this returns true + bool waiting() const { return bool(callback_); } + + /// ends wait (after receiving the call back) + /// forgets the job which is likely to be gone by now + void finish(); + + /// aborts wait (if any) before receiving the call back + /// does nothing if we are not waiting + void cancel(const char *reason); + + /// summarizes what we are waiting for (for debugging) + void print(std::ostream &) const; + +protected: + /// starts waiting for the given job to call the given callback + void start_(AsyncJob::Pointer, AsyncCall::Pointer); + +private: + /// the common part of finish() and cancel() + void clear() { job_.clear(); callback_ = nullptr; } + + /// the job that we are waiting to call us back (or nil) + AsyncJob::Pointer job_; + + /// the call we are waiting for the job_ to make (or nil) + AsyncCall::Pointer callback_; +}; + +/// Manages waiting for an AsyncJob callback. +/// Completes JobWaitBase by providing Job type-specific members. +template +class JobWait: public JobWaitBase +{ +public: + typedef CbcPointer JobPointer; + + /// starts waiting for the given job to call the given callback + void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) { + start_(aJob, aCallback); + typedJob_ = aJob; + } + + /// \returns a cbdata pointer to the job we are waiting for (or nil) + /// the returned pointer may be falsy, even if we are still waiting() + JobPointer job() const { return waiting() ? typedJob_ : nullptr; } + +private: + /// nearly duplicates JobWaitBase::job_ but exposes the actual job type + JobPointer typedJob_; +}; + +inline +std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait) +{ + wait.print(os); + return os; +} + +#endif /* SQUID_BASE_JOBWAIT_H */ + diff --git a/src/base/Makefile.am b/src/base/Makefile.am index c9564d7555..374ea87980 100644 --- a/src/base/Makefile.am +++ b/src/base/Makefile.am @@ -34,6 +34,8 @@ libbase_la_SOURCES = \ Here.h \ InstanceId.cc \ InstanceId.h \ + JobWait.cc \ + JobWait.h \ Lock.h \ LookupTable.h \ LruMap.h \ diff --git a/src/base/forward.h b/src/base/forward.h index 3803e1ea01..46de97c8d7 100644 --- a/src/base/forward.h +++ b/src/base/forward.h @@ -17,6 +17,7 @@ class ScopedId; template class CbcPointer; template class RefCount; +template class JobWait; typedef CbcPointer AsyncJobPointer; typedef RefCount CodeContextPointer; diff --git a/src/client_side.cc b/src/client_side.cc index b8d786423d..4eb6976966 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -503,6 +503,10 @@ httpRequestFree(void *data) /* This is a handler normally called by comm_close() */ void ConnStateData::connStateClosed(const CommCloseCbParams &) { + if (clientConnection) { + clientConnection->noteClosure(); + // keep closed clientConnection for logging, clientdb cleanup, etc. + } deleteThis("ConnStateData::connStateClosed"); } diff --git a/src/clients/FtpClient.cc b/src/clients/FtpClient.cc index 7874db1219..afe7827dc6 100644 --- a/src/clients/FtpClient.cc +++ b/src/clients/FtpClient.cc @@ -201,10 +201,6 @@ Ftp::Client::Client(FwdState *fwdState): Ftp::Client::~Client() { - if (data.opener != NULL) { - data.opener->cancel("Ftp::Client destructed"); - data.opener = NULL; - } data.close(); safe_free(old_request); @@ -786,10 +782,10 @@ Ftp::Client::connectDataChannel() debugs(9, 3, "connecting to " << conn->remote); typedef CommCbMemFunT Dialer; - data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected); - Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect); + AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected); + const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect); cs->setHost(data.host); - AsyncJob::Start(cs); + dataConnWait.start(cs, callback); } bool @@ -811,10 +807,11 @@ void Ftp::Client::dataClosed(const CommCloseCbParams &) { debugs(9, 4, status()); + if (data.conn) + data.conn->noteClosure(); if (data.listenConn != NULL) { data.listenConn->close(); data.listenConn = NULL; - // NP clear() does the: data.fd = -1; } data.clear(); } @@ -879,6 +876,8 @@ void Ftp::Client::ctrlClosed(const CommCloseCbParams &) { debugs(9, 4, status()); + if (ctrl.conn) + ctrl.conn->noteClosure(); ctrl.clear(); doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too mustStop("Ftp::Client::ctrlClosed"); diff --git a/src/clients/FtpClient.h b/src/clients/FtpClient.h index ac8d22e180..4b2dd61d54 100644 --- a/src/clients/FtpClient.h +++ b/src/clients/FtpClient.h @@ -64,7 +64,6 @@ public: */ Comm::ConnectionPointer listenConn; - AsyncCall::Pointer opener; ///< Comm opener handler callback. private: AsyncCall::Pointer closer; ///< Comm close handler callback }; @@ -205,6 +204,10 @@ protected: virtual void sentRequestBody(const CommIoCbParams &io); virtual void doneSendingRequestBody(); + /// Waits for an FTP data connection to the server to be established/opened. + /// This wait only happens in FTP passive mode (via PASV or EPSV). + JobWait dataConnWait; + private: bool parseControlReply(size_t &bytesUsed); diff --git a/src/clients/FtpGateway.cc b/src/clients/FtpGateway.cc index 6a7d139ca5..aeeaedb483 100644 --- a/src/clients/FtpGateway.cc +++ b/src/clients/FtpGateway.cc @@ -486,11 +486,9 @@ Ftp::Gateway::timeout(const CommTimeoutCbParams &io) flags.pasv_supported = false; debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state"); - // cancel the data connection setup. - if (data.opener != NULL) { - data.opener->cancel("timeout"); - data.opener = NULL; - } + // cancel the data connection setup, if any + dataConnWait.cancel("timeout"); + data.close(); } @@ -1723,7 +1721,7 @@ void Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io) { debugs(9, 3, HERE); - data.opener = NULL; + dataConnWait.finish(); if (io.flag != Comm::OK) { debugs(9, 2, HERE << "Failed to connect. Retrying via another method."); @@ -2727,9 +2725,9 @@ Ftp::Gateway::mayReadVirginReplyBody() const return !doneWithServer(); } -AsyncJob::Pointer +void Ftp::StartGateway(FwdState *const fwdState) { - return AsyncJob::Start(new Ftp::Gateway(fwdState)); + AsyncJob::Start(new Ftp::Gateway(fwdState)); } diff --git a/src/clients/FtpRelay.cc b/src/clients/FtpRelay.cc index 8796a74c84..ce6ba3ba61 100644 --- a/src/clients/FtpRelay.cc +++ b/src/clients/FtpRelay.cc @@ -739,7 +739,7 @@ void Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io) { debugs(9, 3, status()); - data.opener = NULL; + dataConnWait.finish(); if (io.flag != Comm::OK) { debugs(9, 2, "failed to connect FTP server data channel"); @@ -804,9 +804,9 @@ Ftp::Relay::HandleStoreAbort(Relay *ftpClient) ftpClient->dataComplete(); } -AsyncJob::Pointer +void Ftp::StartRelay(FwdState *const fwdState) { - return AsyncJob::Start(new Ftp::Relay(fwdState)); + AsyncJob::Start(new Ftp::Relay(fwdState)); } diff --git a/src/clients/HttpTunneler.cc b/src/clients/HttpTunneler.cc index 023a8586cc..67a52e6628 100644 --- a/src/clients/HttpTunneler.cc +++ b/src/clients/HttpTunneler.cc @@ -101,6 +101,11 @@ void Http::Tunneler::handleConnectionClosure(const CommCloseCbParams ¶ms) { closer = nullptr; + if (connection) { + countFailingConnection(); + connection->noteClosure(); + connection = nullptr; + } bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al)); } @@ -360,50 +365,64 @@ Http::Tunneler::bailWith(ErrorState *error) Must(error); answer().squidError = error; - if (const auto p = connection->getPeer()) - peerConnectFailed(p); + if (const auto failingConnection = connection) { + // TODO: Reuse to-peer connections after a CONNECT error response. + countFailingConnection(); + disconnect(); + failingConnection->close(); + } callBack(); - disconnect(); - - if (noteFwdPconnUse) - fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses); - // TODO: Reuse to-peer connections after a CONNECT error response. - connection->close(); - connection = nullptr; } void Http::Tunneler::sendSuccess() { assert(answer().positive()); - callBack(); + assert(Comm::IsConnOpen(connection)); + answer().conn = connection; disconnect(); + callBack(); +} + +void +Http::Tunneler::countFailingConnection() +{ + assert(connection); + if (const auto p = connection->getPeer()) + peerConnectFailed(p); + if (noteFwdPconnUse && connection->isOpen()) + fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses); } void Http::Tunneler::disconnect() { + const auto stillOpen = Comm::IsConnOpen(connection); + if (closer) { - comm_remove_close_handler(connection->fd, closer); + if (stillOpen) + comm_remove_close_handler(connection->fd, closer); closer = nullptr; } if (reader) { - Comm::ReadCancel(connection->fd, reader); + if (stillOpen) + Comm::ReadCancel(connection->fd, reader); reader = nullptr; } - // remove connection timeout handler - commUnsetConnTimeout(connection); + if (stillOpen) + commUnsetConnTimeout(connection); + + connection = nullptr; // may still be open } void Http::Tunneler::callBack() { - debugs(83, 5, connection << status()); - if (answer().positive()) - answer().conn = connection; + debugs(83, 5, answer().conn << status()); + assert(!connection); // returned inside answer() or gone auto cb = callback; callback = nullptr; ScheduleCallHere(cb); @@ -415,11 +434,10 @@ Http::Tunneler::swanSong() AsyncJob::swanSong(); if (callback) { - if (requestWritten && tunnelEstablished) { + if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) { sendSuccess(); } else { - // we should have bailed when we discovered the job-killing problem - debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status()); + // job-ending emergencies like handleStopRequest() or callException() bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al)); } assert(!callback); diff --git a/src/clients/HttpTunneler.h b/src/clients/HttpTunneler.h index c4c096fe7c..e88d17610f 100644 --- a/src/clients/HttpTunneler.h +++ b/src/clients/HttpTunneler.h @@ -87,6 +87,7 @@ protected: void handleResponse(const bool eof); void bailOnResponseError(const char *error, HttpReply *); +private: /// sends the given error to the initiator void bailWith(ErrorState*); @@ -96,12 +97,14 @@ protected: /// a bailWith(), sendSuccess() helper: sends results to the initiator void callBack(); - /// a bailWith(), sendSuccess() helper: stops monitoring the connection + /// stops monitoring the connection void disconnect(); + /// updates connection usage history before the connection is closed + void countFailingConnection(); + TunnelerAnswer &answer(); -private: AsyncCall::Pointer writer; ///< called when the request has been written AsyncCall::Pointer reader; ///< called when the response should be read AsyncCall::Pointer closer; ///< called when the connection is being closed diff --git a/src/clients/forward.h b/src/clients/forward.h index 0351557dac..82e5eb9bd2 100644 --- a/src/clients/forward.h +++ b/src/clients/forward.h @@ -28,10 +28,10 @@ namespace Ftp { /// A new FTP Gateway job -AsyncJobPointer StartGateway(FwdState *const fwdState); +void StartGateway(FwdState *const fwdState); /// A new FTP Relay job -AsyncJobPointer StartRelay(FwdState *const fwdState); +void StartRelay(FwdState *const fwdState); /** Construct an URI with leading / in PATH portion for use by CWD command * possibly others. FTP encodes absolute paths as beginning with '/' diff --git a/src/comm.cc b/src/comm.cc index 54ba162716..a0913f3245 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -743,6 +743,10 @@ commCallCloseHandlers(int fd) // If call is not canceled schedule it for execution else ignore it if (!call->canceled()) { debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call); + // XXX: Without the following code, callback fd may be -1. + // typedef CommCloseCbParams Params; + // auto ¶ms = GetCommParams(call); + // params.fd = fd; ScheduleCallHere(call); } } @@ -1787,6 +1791,10 @@ DeferredReadManager::CloseHandler(const CommCloseCbParams ¶ms) CbDataList *temp = (CbDataList *)params.data; temp->element.closer = NULL; + if (temp->element.theRead.conn) { + temp->element.theRead.conn->noteClosure(); + temp->element.theRead.conn = nullptr; + } temp->element.markCancelled(); } @@ -1860,6 +1868,11 @@ DeferredReadManager::kickARead(DeferredRead const &aRead) if (aRead.cancelled) return; + // TODO: This check still allows theReader call with a closed theRead.conn. + // If a delayRead() caller has a close connection handler, then such a call + // would be useless and dangerous. If a delayRead() caller does not have it, + // then the caller will get stuck when an external connection closure makes + // aRead.cancelled (checked above) true. if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing()) return; diff --git a/src/comm/ConnOpener.cc b/src/comm/ConnOpener.cc index dc376b7781..19c1237aea 100644 --- a/src/comm/ConnOpener.cc +++ b/src/comm/ConnOpener.cc @@ -41,6 +41,14 @@ Comm::ConnOpener::ConnOpener(const Comm::ConnectionPointer &c, const AsyncCall:: deadline_(squid_curtime + static_cast(ctimeout)) { debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout"); + assert(conn_); // we know where to go + + // Sharing a being-modified Connection object with the caller is dangerous, + // but we cannot ban (or even check for) that using existing APIs. We do not + // want to clone "just in case" because cloning is a bit expensive, and most + // callers already have a non-owned Connection object to give us. Until the + // APIs improve, we can only check that the connection is not open. + assert(!conn_->isOpen()); } Comm::ConnOpener::~ConnOpener() @@ -78,6 +86,10 @@ Comm::ConnOpener::swanSong() if (temporaryFd_ >= 0) closeFd(); + // did we abort while owning an open connection? + if (conn_ && conn_->isOpen()) + conn_->close(); + // did we abort while waiting between retries? if (calls_.sleep_) cancelSleep(); @@ -131,9 +143,18 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why) " [" << callback_->id << ']' ); // TODO save the pconn to the pconnPool ? } else { + assert(conn_); + + // free resources earlier and simplify recipients + if (errFlag != Comm::OK) + conn_->close(); // may not be opened + else + assert(conn_->isOpen()); + typedef CommConnectCbParams Params; Params ¶ms = GetCommParams(callback_); params.conn = conn_; + conn_ = nullptr; // release ownership; prevent closure by us params.flag = errFlag; params.xerrno = xerrno; ScheduleCallHere(callback_); @@ -152,7 +173,7 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why) void Comm::ConnOpener::cleanFd() { - debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_); + debugs(5, 4, conn_ << "; temp FD " << temporaryFd_); Must(temporaryFd_ >= 0); fde &f = fd_table[temporaryFd_]; @@ -258,6 +279,7 @@ bool Comm::ConnOpener::createFd() { Must(temporaryFd_ < 0); + assert(conn_); // our initators signal abort by cancelling their callbacks if (callback_ == NULL || callback_->canceled()) diff --git a/src/comm/ConnOpener.h b/src/comm/ConnOpener.h index 329d8a3c8e..e1e60649dc 100644 --- a/src/comm/ConnOpener.h +++ b/src/comm/ConnOpener.h @@ -19,16 +19,13 @@ namespace Comm { -/** - * Async-opener of a Comm connection. - */ +/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either +/// Comm::OK with an open connection or another Comm::Flag with a closed one. class ConnOpener : public AsyncJob { CBDATA_CLASS(ConnOpener); public: - void noteAbort() { mustStop("externally aborted"); } - typedef CbcPointer Pointer; virtual bool doneAll() const; diff --git a/src/comm/Connection.cc b/src/comm/Connection.cc index 4e9719a01a..9bc08da057 100644 --- a/src/comm/Connection.cc +++ b/src/comm/Connection.cc @@ -7,6 +7,7 @@ */ #include "squid.h" +#include "base/JobWait.h" #include "CachePeer.h" #include "cbdata.h" #include "comm.h" @@ -60,26 +61,44 @@ Comm::Connection::~Connection() } Comm::ConnectionPointer -Comm::Connection::cloneDestinationDetails() const +Comm::Connection::cloneProfile() const { - const ConnectionPointer c = new Comm::Connection; - c->setAddrs(local, remote); - c->peerType = peerType; - c->flags = flags; - c->peer_ = cbdataReference(getPeer()); - assert(!c->isOpen()); - return c; -} + const ConnectionPointer clone = new Comm::Connection; + auto &c = *clone; // optimization + + /* + * Copy or excuse each data member. Excused members do not belong to a + * Connection configuration profile because their values cannot be reused + * across (co-existing) Connection objects and/or are tied to their own + * object lifetime. + */ + + c.setAddrs(local, remote); + c.peerType = peerType; + // fd excused + c.tos = tos; + c.nfmark = nfmark; + c.nfConnmark = nfConnmark; + // COMM_ORPHANED is not a part of connection opening instructions + c.flags = flags & ~COMM_ORPHANED; + // rfc931 is excused + +#if USE_SQUID_EUI + // These are currently only set when accepting connections and never used + // for establishing new ones, so this copying is currently in vain, but, + // technically, they can be a part of connection opening instructions. + c.remoteEui48 = remoteEui48; + c.remoteEui64 = remoteEui64; +#endif -Comm::ConnectionPointer -Comm::Connection::cloneIdentDetails() const -{ - auto c = cloneDestinationDetails(); - c->tos = tos; - c->nfmark = nfmark; - c->nfConnmark = nfConnmark; - c->startTime_ = startTime_; - return c; + // id excused + c.peer_ = cbdataReference(getPeer()); + // startTime_ excused + // tlsHistory excused + + debugs(5, 5, this << " made " << c); + assert(!c.isOpen()); + return clone; } void diff --git a/src/comm/Connection.h b/src/comm/Connection.h index 5b66943ba2..40c22491dc 100644 --- a/src/comm/Connection.h +++ b/src/comm/Connection.h @@ -77,13 +77,12 @@ public: /** Clear the connection properties and close any open socket. */ virtual ~Connection(); - /// Create a new (closed) IDENT Connection object based on our from-Squid - /// connection properties. - ConnectionPointer cloneIdentDetails() const; + /// To prevent accidental copying of Connection objects that we started to + /// open or that are open, use cloneProfile() instead. + Connection(const Connection &&) = delete; - /// Create a new (closed) Connection object pointing to the same destination - /// as this from-Squid connection. - ConnectionPointer cloneDestinationDetails() const; + /// Create a new closed Connection with the same configuration as this one. + ConnectionPointer cloneProfile() const; /// close the still-open connection when its last reference is gone void enterOrphanage() { flags |= COMM_ORPHANED; } @@ -140,17 +139,6 @@ public: virtual ScopedId codeContextGist() const override; virtual std::ostream &detailCodeContext(std::ostream &os) const override; -private: - /** These objects may not be exactly duplicated. Use cloneIdentDetails() or - * cloneDestinationDetails() instead. - */ - Connection(const Connection &c); - - /** These objects may not be exactly duplicated. Use cloneIdentDetails() or - * cloneDestinationDetails() instead. - */ - Connection & operator =(const Connection &c); - public: /** Address/Port for the Squid end of a TCP link. */ Ip::Address local; diff --git a/src/comm/TcpAcceptor.cc b/src/comm/TcpAcceptor.cc index 341622e0f8..510efa94f6 100644 --- a/src/comm/TcpAcceptor.cc +++ b/src/comm/TcpAcceptor.cc @@ -206,7 +206,10 @@ void Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &) { closer_ = NULL; - conn = NULL; + if (conn) { + conn->noteClosure(); + conn = nullptr; + } Must(done()); } diff --git a/src/dns_internal.cc b/src/dns_internal.cc index e1f4d3ee7a..72078c64e1 100644 --- a/src/dns_internal.cc +++ b/src/dns_internal.cc @@ -872,6 +872,10 @@ static void idnsVCClosed(const CommCloseCbParams ¶ms) { nsvc * vc = (nsvc *)params.data; + if (vc->conn) { + vc->conn->noteClosure(); + vc->conn = nullptr; + } delete vc; } diff --git a/src/eui/Eui48.h b/src/eui/Eui48.h index 11f4e51b16..9aab5bbb7b 100644 --- a/src/eui/Eui48.h +++ b/src/eui/Eui48.h @@ -29,10 +29,8 @@ class Eui48 public: Eui48() { clear(); } - Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); } bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; } bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; } - ~Eui48() {} const unsigned char *get(void); diff --git a/src/gopher.cc b/src/gopher.cc index 455220c2e9..576a3f7b14 100644 --- a/src/gopher.cc +++ b/src/gopher.cc @@ -98,11 +98,8 @@ public: entry->lock("gopherState"); *replybuf = 0; } - ~GopherStateData() {if(buf) swanSong();} - /* AsyncJob API emulated */ - void deleteThis(const char *aReason); - void swanSong(); + ~GopherStateData(); public: StoreEntry *entry; @@ -156,30 +153,18 @@ static void gopherStateFree(const CommCloseCbParams ¶ms) { GopherStateData *gopherState = (GopherStateData *)params.data; - - if (gopherState == NULL) - return; - - gopherState->deleteThis("gopherStateFree"); + // Assume that FwdState is monitoring and calls noteClosure(). See XXX about + // Connection sharing with FwdState in gopherStart(). + delete gopherState; } -void -GopherStateData::deleteThis(const char *) -{ - swanSong(); - delete this; -} - -void -GopherStateData::swanSong() +GopherStateData::~GopherStateData() { if (entry) entry->unlock("gopherState"); - if (buf) { + if (buf) memFree(buf, MEM_4K_BUF); - buf = nullptr; - } } /** @@ -986,6 +971,7 @@ gopherStart(FwdState * fwd) return; } + // XXX: Sharing open Connection with FwdState that has its own handlers/etc. gopherState->serverConn = fwd->serverConnection(); gopherSendRequest(fwd->serverConnection()->fd, gopherState); AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout", diff --git a/src/ident/Ident.cc b/src/ident/Ident.cc index 50dc6baebe..3c3ae5e2f3 100644 --- a/src/ident/Ident.cc +++ b/src/ident/Ident.cc @@ -11,6 +11,7 @@ #include "squid.h" #if USE_IDENT +#include "base/JobWait.h" #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" @@ -53,8 +54,15 @@ public: Comm::ConnectionPointer conn; MemBuf queryMsg; ///< the lookup message sent to IDENT server - IdentClient *clients; + IdentClient *clients = nullptr; char buf[IDENT_BUFSIZE]; + + /// waits for a connection to the IDENT server to be established/opened + JobWait connWait; + +private: + // use deleteThis() to destroy + ~IdentStateData(); }; CBDATA_CLASS_INIT(IdentStateData); @@ -73,8 +81,9 @@ static void ClientAdd(IdentStateData * state, IDCB * callback, void *callback_da Ident::IdentConfig Ident::TheConfig; void -Ident::IdentStateData::deleteThis(const char *) +Ident::IdentStateData::deleteThis(const char *reason) { + debugs(30, 3, reason); swanSong(); delete this; } @@ -84,6 +93,10 @@ Ident::IdentStateData::swanSong() { if (clients != NULL) notify(NULL); +} + +Ident::IdentStateData::~IdentStateData() { + assert(!clients); if (Comm::IsConnOpen(conn)) { comm_remove_close_handler(conn->fd, Ident::Close, this); @@ -112,6 +125,10 @@ void Ident::Close(const CommCloseCbParams ¶ms) { IdentStateData *state = (IdentStateData *)params.data; + if (state->conn) { + state->conn->noteClosure(); + state->conn = nullptr; + } state->deleteThis("connection closed"); } @@ -127,6 +144,16 @@ void Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data) { IdentStateData *state = (IdentStateData *)data; + state->connWait.finish(); + + // Start owning the supplied connection (so that it is not orphaned if this + // function bails early). As a (tiny) optimization or perhaps just diff + // minimization, the close handler is added later, when we know we are not + // bailing. This delay is safe because comm_remove_close_handler() forgives + // missing handlers. + assert(conn); // but may be closed + assert(!state->conn); + state->conn = conn; if (status != Comm::OK) { if (status == Comm::TIMEOUT) @@ -149,8 +176,8 @@ Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, return; } - assert(conn != NULL && conn == state->conn); - comm_add_close_handler(conn->fd, Ident::Close, state); + assert(state->conn->isOpen()); + comm_add_close_handler(state->conn->fd, Ident::Close, state); AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback", CommIoCbPtrFun(Ident::WriteFeedback, state)); @@ -259,10 +286,10 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data) state->hash.key = xstrdup(key); // copy the conn details. We do not want the original FD to be re-used by IDENT. - state->conn = conn->cloneIdentDetails(); + const auto identConn = conn->cloneProfile(); // NP: use random port for secure outbound to IDENT_PORT - state->conn->local.port(0); - state->conn->remote.port(IDENT_PORT); + identConn->local.port(0); + identConn->remote.port(IDENT_PORT); // build our query from the original connection details state->queryMsg.init(); @@ -272,7 +299,8 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data) hash_join(ident_hash, &state->hash); AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state)); - AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout)); + const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout); + state->connWait.start(connOpener, call); } void diff --git a/src/log/TcpLogger.cc b/src/log/TcpLogger.cc index 2be2f6aeaf..3be1cc5db3 100644 --- a/src/log/TcpLogger.cc +++ b/src/log/TcpLogger.cc @@ -256,13 +256,16 @@ Log::TcpLogger::doConnect() typedef CommCbMemFunT Dialer; AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone); - AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2)); + const auto cs = new Comm::ConnOpener(futureConn, call, 2); + connWait.start(cs, call); } /// Comm::ConnOpener callback void Log::TcpLogger::connectDone(const CommConnectCbParams ¶ms) { + connWait.finish(); + if (params.flag != Comm::OK) { const double delay = 0.5; // seconds if (connectFailures++ % 100 == 0) { @@ -367,7 +370,10 @@ Log::TcpLogger::handleClosure(const CommCloseCbParams &) { assert(inCall != NULL); closer = NULL; - conn = NULL; + if (conn) { + conn->noteClosure(); + conn = nullptr; + } // in all current use cases, we should not try to reconnect mustStop("Log::TcpLogger::handleClosure"); } diff --git a/src/log/TcpLogger.h b/src/log/TcpLogger.h index 1be2113fef..ec7ca5f7b1 100644 --- a/src/log/TcpLogger.h +++ b/src/log/TcpLogger.h @@ -10,6 +10,8 @@ #define _SQUID_SRC_LOG_TCPLOGGER_H #include "base/AsyncJob.h" +#include "base/JobWait.h" +#include "comm/forward.h" #include "ip/Address.h" #include @@ -103,6 +105,9 @@ private: Ip::Address remote; ///< where the remote logger expects our records AsyncCall::Pointer closer; ///< handles unexpected/external conn closures + /// waits for a connection to the remote logger to be established/opened + JobWait connWait; + uint64_t connectFailures; ///< number of sequential connection failures uint64_t drops; ///< number of records dropped during the current outage }; diff --git a/src/mgr/Forwarder.cc b/src/mgr/Forwarder.cc index 8edba1d783..e4da4ca088 100644 --- a/src/mgr/Forwarder.cc +++ b/src/mgr/Forwarder.cc @@ -100,7 +100,11 @@ void Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &) { debugs(16, 5, HERE); - conn = NULL; // needed? + closer = nullptr; + if (conn) { + conn->noteClosure(); + conn = nullptr; + } mustStop("commClosed"); } diff --git a/src/mgr/Inquirer.cc b/src/mgr/Inquirer.cc index ba41d7aa6e..dea0452de0 100644 --- a/src/mgr/Inquirer.cc +++ b/src/mgr/Inquirer.cc @@ -107,11 +107,14 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params) /// called when the HTTP client or some external force closed our socket void -Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params) +Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &) { debugs(16, 5, HERE); - Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw()); - conn = NULL; + closer = nullptr; + if (conn) { + conn->noteClosure(); + conn = nullptr; + } mustStop("commClosed"); } diff --git a/src/mgr/StoreToCommWriter.cc b/src/mgr/StoreToCommWriter.cc index e2cccec2f5..d3cf738888 100644 --- a/src/mgr/StoreToCommWriter.cc +++ b/src/mgr/StoreToCommWriter.cc @@ -131,7 +131,11 @@ void Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &) { debugs(16, 6, HERE); - Must(!Comm::IsConnOpen(clientConnection)); + if (clientConnection) { + clientConnection->noteClosure(); + clientConnection = nullptr; + } + closer = nullptr; mustStop("commClosed"); } diff --git a/src/security/PeerConnector.cc b/src/security/PeerConnector.cc index 4c1401f6e8..494edabb84 100644 --- a/src/security/PeerConnector.cc +++ b/src/security/PeerConnector.cc @@ -89,9 +89,15 @@ Security::PeerConnector::start() void Security::PeerConnector::commCloseHandler(const CommCloseCbParams ¶ms) { + debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data); + closeHandler = nullptr; + if (serverConn) { + countFailingConnection(); + serverConn->noteClosure(); + serverConn = nullptr; + } - debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data); const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request.getRaw(), al); static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE"); err->detailError(d); @@ -111,6 +117,8 @@ Security::PeerConnector::commTimeoutHandler(const CommTimeoutCbParams &) bool Security::PeerConnector::initialize(Security::SessionPointer &serverSession) { + Must(Comm::IsConnOpen(serverConnection())); + Security::ContextPointer ctx(getTlsContext()); debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get()); @@ -162,6 +170,8 @@ Security::PeerConnector::initialize(Security::SessionPointer &serverSession) void Security::PeerConnector::recordNegotiationDetails() { + Must(Comm::IsConnOpen(serverConnection())); + const int fd = serverConnection()->fd; Security::SessionPointer session(fd_table[fd].ssl); @@ -180,6 +190,8 @@ Security::PeerConnector::recordNegotiationDetails() void Security::PeerConnector::negotiate() { + Must(Comm::IsConnOpen(serverConnection())); + const int fd = serverConnection()->fd; if (fd_table[fd].closing()) return; @@ -224,7 +236,7 @@ Security::PeerConnector::handleNegotiationResult(const Security::IoResult &resul switch (result.category) { case Security::IoResult::ioSuccess: recordNegotiationDetails(); - if (sslFinalized()) + if (sslFinalized() && callback) sendSuccess(); return; // we may be gone by now @@ -252,6 +264,7 @@ Security::PeerConnector::sslFinalized() { #if USE_OPENSSL if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) { + Must(Comm::IsConnOpen(serverConnection())); const int fd = serverConnection()->fd; Security::SessionPointer session(fd_table[fd].ssl); @@ -295,6 +308,7 @@ void Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse) { Must(validationResponse != NULL); + Must(Comm::IsConnOpen(serverConnection())); ErrorDetail::Pointer errDetails; bool validatorFailed = false; @@ -317,7 +331,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe if (!errDetails && !validatorFailed) { noteNegotiationDone(NULL); - sendSuccess(); + if (callback) + sendSuccess(); return; } @@ -343,6 +358,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe Security::CertErrors * Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails) { + Must(Comm::IsConnOpen(serverConnection())); + ACLFilledChecklist *check = NULL; Security::SessionPointer session(fd_table[serverConnection()->fd].ssl); @@ -418,9 +435,11 @@ Security::PeerConnector::negotiateSsl() void Security::PeerConnector::noteWantRead() { - const int fd = serverConnection()->fd; debugs(83, 5, serverConnection()); + Must(Comm::IsConnOpen(serverConnection())); + const int fd = serverConnection()->fd; + // read timeout to avoid getting stuck while reading from a silent server typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(83, 5, @@ -434,8 +453,10 @@ Security::PeerConnector::noteWantRead() void Security::PeerConnector::noteWantWrite() { - const int fd = serverConnection()->fd; debugs(83, 5, serverConnection()); + Must(Comm::IsConnOpen(serverConnection())); + + const int fd = serverConnection()->fd; Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0); return; } @@ -452,57 +473,76 @@ Security::PeerConnector::noteNegotiationError(const Security::ErrorDetailPointer bail(anErr); } +Security::EncryptorAnswer & +Security::PeerConnector::answer() +{ + assert(callback); + const auto dialer = dynamic_cast(callback->getDialer()); + assert(dialer); + return dialer->answer(); +} + void Security::PeerConnector::bail(ErrorState *error) { Must(error); // or the recepient will not know there was a problem - Must(callback != NULL); - CbDialer *dialer = dynamic_cast(callback->getDialer()); - Must(dialer); - dialer->answer().error = error; + answer().error = error; - if (const auto p = serverConnection()->getPeer()) - peerConnectFailed(p); + if (const auto failingConnection = serverConn) { + countFailingConnection(); + disconnect(); + failingConnection->close(); + } callBack(); - disconnect(); - - if (noteFwdPconnUse) - fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses); - serverConn->close(); - serverConn = nullptr; } void Security::PeerConnector::sendSuccess() { - callBack(); + assert(Comm::IsConnOpen(serverConn)); + answer().conn = serverConn; disconnect(); + callBack(); +} + +void +Security::PeerConnector::countFailingConnection() +{ + assert(serverConn); + if (const auto p = serverConn->getPeer()) + peerConnectFailed(p); + // TODO: Calling PconnPool::noteUses() should not be our responsibility. + if (noteFwdPconnUse && serverConn->isOpen()) + fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses); } void Security::PeerConnector::disconnect() { + const auto stillOpen = Comm::IsConnOpen(serverConn); + if (closeHandler) { - comm_remove_close_handler(serverConnection()->fd, closeHandler); + if (stillOpen) + comm_remove_close_handler(serverConn->fd, closeHandler); closeHandler = nullptr; } - commUnsetConnTimeout(serverConnection()); + if (stillOpen) + commUnsetConnTimeout(serverConn); + + serverConn = nullptr; } void Security::PeerConnector::callBack() { - debugs(83, 5, "TLS setup ended for " << serverConnection()); + debugs(83, 5, "TLS setup ended for " << answer().conn); AsyncCall::Pointer cb = callback; // Do this now so that if we throw below, swanSong() assert that we _tried_ // to call back holds. callback = NULL; // this should make done() true - CbDialer *dialer = dynamic_cast(cb->getDialer()); - Must(dialer); - dialer->answer().conn = serverConnection(); ScheduleCallHere(cb); } @@ -511,8 +551,9 @@ Security::PeerConnector::swanSong() { // XXX: unregister fd-closure monitoring and CommSetSelect interest, if any AsyncJob::swanSong(); - if (callback != NULL) { // paranoid: we have left the caller waiting - debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server"); + + if (callback) { + // job-ending emergencies like handleStopRequest() or callException() const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al); bail(anErr); assert(!callback); @@ -533,7 +574,7 @@ Security::PeerConnector::status() const buf.append("Stopped, reason:", 16); buf.appendf("%s",stopReason); } - if (serverConn != NULL) + if (Comm::IsConnOpen(serverConn)) buf.appendf(" FD %d", serverConn->fd); buf.appendf(" %s%u]", id.prefix(), id.value); buf.terminate(); @@ -581,15 +622,18 @@ Security::PeerConnector::startCertDownloading(SBuf &url) PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this)); const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1); - AsyncJob::Start(dl); + certDownloadWait.start(dl, certCallback); } void Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus) { + certDownloadWait.finish(); + ++certsDownloads; debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length()); + Must(Comm::IsConnOpen(serverConnection())); const auto &sconn = *fd_table[serverConnection()->fd].ssl; // Parse Certificate. Assume that it is in DER format. @@ -642,6 +686,7 @@ Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus) void Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult) { + Must(Comm::IsConnOpen(serverConnection())); auto &sconn = *fd_table[serverConnection()->fd].ssl; // We download the missing certificate(s) once. We would prefer to clear diff --git a/src/security/PeerConnector.h b/src/security/PeerConnector.h index 942f8627aa..10700d796b 100644 --- a/src/security/PeerConnector.h +++ b/src/security/PeerConnector.h @@ -12,6 +12,7 @@ #include "acl/Acl.h" #include "base/AsyncCbdataCalls.h" #include "base/AsyncJob.h" +#include "base/JobWait.h" #include "CommCalls.h" #include "http/forward.h" #include "security/EncryptorAnswer.h" @@ -24,6 +25,7 @@ #include class ErrorState; +class Downloader; class AccessLogEntry; typedef RefCount AccessLogEntryPointer; @@ -152,6 +154,9 @@ protected: /// a bail(), sendSuccess() helper: stops monitoring the connection void disconnect(); + /// updates connection usage history before the connection is closed + void countFailingConnection(); + /// If called the certificates validator will not used void bypassCertValidator() {useCertValidator_ = false;} @@ -159,6 +164,9 @@ protected: /// logging void recordNegotiationDetails(); + /// convenience method to get to the answer fields + EncryptorAnswer &answer(); + HttpRequestPointer request; ///< peer connection trigger or cause Comm::ConnectionPointer serverConn; ///< TCP connection to the peer AccessLogEntryPointer al; ///< info for the future access.log entry @@ -203,6 +211,8 @@ private: /// outcome of the last (failed and) suspended negotiation attempt (or nil) Security::IoResultPointer suspendedError_; + + JobWait certDownloadWait; ///< waits for the missing certificate to be downloaded }; } // namespace Security diff --git a/src/security/forward.h b/src/security/forward.h index dce66e3978..26225aae01 100644 --- a/src/security/forward.h +++ b/src/security/forward.h @@ -172,6 +172,7 @@ class ParsedOptions {}; // we never parse/use TLS options in this case typedef long ParsedPortFlags; class PeerConnector; +class BlindPeerConnector; class PeerOptions; #if USE_OPENSSL diff --git a/src/servers/FtpServer.cc b/src/servers/FtpServer.cc index 53f822d2f9..e0a10b6b51 100644 --- a/src/servers/FtpServer.cc +++ b/src/servers/FtpServer.cc @@ -61,7 +61,7 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact): dataConn(), uploadAvailSize(0), listener(), - connector(), + dataConnWait(), reader(), waitingForOrigin(false), originDataDownloadAbortedOnError(false) @@ -1676,11 +1676,11 @@ Ftp::Server::checkDataConnPre() // active transfer: open a data connection from Squid to client typedef CommCbMemFunT Dialer; - connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData); - Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector, - Config.Timeout.connect); - AsyncJob::Start(cs); - return false; // ConnStateData::processFtpRequest waits handleConnectDone + AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData); + const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback, + Config.Timeout.connect); + dataConnWait.start(cs, callback); + return false; } /// Check that client data connection is ready for immediate I/O. @@ -1698,18 +1698,22 @@ Ftp::Server::checkDataConnPost() const void Ftp::Server::connectedForData(const CommConnectCbParams ¶ms) { - connector = NULL; + dataConnWait.finish(); if (params.flag != Comm::OK) { - /* it might have been a timeout with a partially open link */ - if (params.conn != NULL) - params.conn->close(); setReply(425, "Cannot open data connection."); Http::StreamPointer context = pipeline.front(); Must(context->http); Must(context->http->storeEntry() != NULL); + // TODO: call closeDataConnection() to reset data conn processing? } else { - Must(dataConn == params.conn); + // Finalize the details and start owning the supplied connection. + assert(params.conn); + assert(dataConn); + assert(!dataConn->isOpen()); + dataConn = params.conn; + // XXX: Missing comm_add_close_handler() to track external closures. + Must(Comm::IsConnOpen(params.conn)); fd_note(params.conn->fd, "active client ftp data"); } diff --git a/src/servers/FtpServer.h b/src/servers/FtpServer.h index fb9ef9081d..d677998272 100644 --- a/src/servers/FtpServer.h +++ b/src/servers/FtpServer.h @@ -11,8 +11,10 @@ #ifndef SQUID_SERVERS_FTP_SERVER_H #define SQUID_SERVERS_FTP_SERVER_H +#include "base/JobWait.h" #include "base/Lock.h" #include "client_side.h" +#include "comm/forward.h" namespace Ftp { @@ -188,7 +190,11 @@ private: size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes AsyncCall::Pointer listener; ///< set when we are passively listening - AsyncCall::Pointer connector; ///< set when we are actively connecting + + /// Waits for an FTP data connection to the client to be established/opened. + /// This wait only happens in FTP active mode (via PORT or EPRT). + JobWait dataConnWait; + AsyncCall::Pointer reader; ///< set when we are reading FTP data /// whether we wait for the origin data transfer to end diff --git a/src/snmp/Forwarder.cc b/src/snmp/Forwarder.cc index 836eb57721..259bb0441b 100644 --- a/src/snmp/Forwarder.cc +++ b/src/snmp/Forwarder.cc @@ -53,6 +53,7 @@ Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params) { debugs(49, 5, HERE); Must(fd == params.fd); + closer = nullptr; fd = -1; mustStop("commClosed"); } @@ -68,8 +69,7 @@ void Snmp::Forwarder::handleException(const std::exception& e) { debugs(49, 3, HERE << e.what()); - if (fd >= 0) - sendError(SNMP_ERR_GENERR); + sendError(SNMP_ERR_GENERR); Ipc::Forwarder::handleException(e); } @@ -78,6 +78,10 @@ void Snmp::Forwarder::sendError(int error) { debugs(49, 3, HERE); + + if (fd < 0) + return; // client gone + Snmp::Request& req = static_cast(*request); req.pdu.command = SNMP_PDU_RESPONSE; req.pdu.errstat = error; diff --git a/src/snmp/Inquirer.cc b/src/snmp/Inquirer.cc index 9b6e34405e..82f8c44752 100644 --- a/src/snmp/Inquirer.cc +++ b/src/snmp/Inquirer.cc @@ -88,7 +88,11 @@ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params) { debugs(49, 5, HERE); Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd); - conn = NULL; + closer = nullptr; + if (conn) { + conn->noteClosure(); + conn = nullptr; + } mustStop("commClosed"); } @@ -102,6 +106,10 @@ void Snmp::Inquirer::sendResponse() { debugs(49, 5, HERE); + + if (!Comm::IsConnOpen(conn)) + return; // client gone + aggrPdu.fixAggregate(); aggrPdu.command = SNMP_PDU_RESPONSE; u_char buffer[SNMP_REQUEST_SIZE]; diff --git a/src/ssl/PeekingPeerConnector.cc b/src/ssl/PeekingPeerConnector.cc index 9a4055355f..89e45435b7 100644 --- a/src/ssl/PeekingPeerConnector.cc +++ b/src/ssl/PeekingPeerConnector.cc @@ -27,18 +27,18 @@ CBDATA_NAMESPACED_CLASS_INIT(Ssl, PeekingPeerConnector); void switchToTunnel(HttpRequest *request, const Comm::ConnectionPointer &clientConn, const Comm::ConnectionPointer &srvConn, const SBuf &preReadServerData); void -Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data) +Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data) { Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data; // Use job calls to add done() checks and other job logic/protections. - CallJobHere1(83, 7, CbcPointer(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer); + CallJobHere1(83, 7, CbcPointer(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer); } void -Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer) +Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer) { - const Ssl::BumpMode finalAction = answer.allowed() ? - static_cast(answer.kind): + const Ssl::BumpMode finalAction = aclAnswer.allowed() ? + static_cast(aclAnswer.kind): checkForPeekAndSpliceGuess(); checkForPeekAndSpliceMatched(finalAction); } @@ -106,10 +106,8 @@ Ssl::PeekingPeerConnector::checkForPeekAndSpliceMatched(const Ssl::BumpMode acti splice = true; // Ssl Negotiation stops here. Last SSL checks for valid certificates // and if done, switch to tunnel mode - if (sslFinalized()) { - debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection"); + if (sslFinalized() && callback) callBack(); - } } } @@ -272,8 +270,11 @@ Ssl::PeekingPeerConnector::startTunneling() auto b = SSL_get_rbio(session.get()); auto srvBio = static_cast(BIO_get_data(b)); + debugs(83, 5, "will tunnel instead of negotiating TLS"); switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData()); - tunnelInsteadOfNegotiating(); + answer().tunneled = true; + disconnect(); + callBack(); } void @@ -397,13 +398,3 @@ Ssl::PeekingPeerConnector::serverCertificateVerified() } } -void -Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating() -{ - Must(callback != NULL); - CbDialer *dialer = dynamic_cast(callback->getDialer()); - Must(dialer); - dialer->answer().tunneled = true; - debugs(83, 5, "The SSL negotiation with server aborted"); -} - diff --git a/src/ssl/PeekingPeerConnector.h b/src/ssl/PeekingPeerConnector.h index 3c86b887de..0145e77f93 100644 --- a/src/ssl/PeekingPeerConnector.h +++ b/src/ssl/PeekingPeerConnector.h @@ -51,7 +51,7 @@ public: void checkForPeekAndSplice(); /// Callback function for ssl_bump acl check in step3 SSL bump step. - void checkForPeekAndSpliceDone(Acl::Answer answer); + void checkForPeekAndSpliceDone(Acl::Answer); /// Handles the final bumping decision. void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode); @@ -67,7 +67,7 @@ public: void startTunneling(); /// A wrapper function for checkForPeekAndSpliceDone for use with acl - static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data); + static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data); private: diff --git a/src/tests/stub_libcomm.cc b/src/tests/stub_libcomm.cc index 45aa6deb2e..8fdb40bbde 100644 --- a/src/tests/stub_libcomm.cc +++ b/src/tests/stub_libcomm.cc @@ -22,9 +22,9 @@ void Comm::AcceptLimiter::kick() STUB #include "comm/Connection.h" Comm::Connection::Connection() STUB Comm::Connection::~Connection() STUB -Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr) -Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr) +Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr) void Comm::Connection::close() STUB +void Comm::Connection::noteClosure() STUB CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL) void Comm::Connection::setPeer(CachePeer * p) STUB ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach()) diff --git a/src/tests/stub_libsecurity.cc b/src/tests/stub_libsecurity.cc index aa9cf0b364..176cf337e7 100644 --- a/src/tests/stub_libsecurity.cc +++ b/src/tests/stub_libsecurity.cc @@ -9,6 +9,7 @@ #include "squid.h" #include "AccessLogEntry.h" #include "comm/Connection.h" +#include "Downloader.h" #include "HttpRequest.h" #define STUB_API "security/libsecurity.la" @@ -88,7 +89,9 @@ void PeerConnector::bail(ErrorState *) STUB void PeerConnector::sendSuccess() STUB void PeerConnector::callBack() STUB void PeerConnector::disconnect() STUB +void PeerConnector::countFailingConnection() STUB void PeerConnector::recordNegotiationDetails() STUB +EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer) } #include "security/PeerOptions.h" diff --git a/src/tunnel.cc b/src/tunnel.cc index 386a0ecd61..4fc5abdeff 100644 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@ -11,6 +11,7 @@ #include "squid.h" #include "acl/FilledChecklist.h" #include "base/CbcPointer.h" +#include "base/JobWait.h" #include "CachePeer.h" #include "cbdata.h" #include "client_side.h" @@ -180,9 +181,6 @@ public: SBuf preReadClientData; SBuf preReadServerData; time_t startTime; ///< object creation time, before any peer selection/connection attempts - /// Whether we are waiting for the CONNECT request/response exchange with the peer. - bool waitingForConnectExchange; - HappyConnOpenerPointer connOpener; ///< current connection opening job ResolvedPeersPointer destinations; ///< paths for forwarding the request bool destinationsFound; ///< At least one candidate path found /// whether another destination may be still attempted if the TCP connection @@ -191,10 +189,15 @@ public: // TODO: remove after fixing deferred reads in TunnelStateData::copyRead() CodeContext::Pointer codeContext; ///< our creator context - // AsyncCalls which we set and may need cancelling. - struct { - AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. - } calls; + /// waits for a transport connection to the peer to be established/opened + JobWait transportWait; + + /// waits for the established transport connection to be secured/encrypted + JobWait encryptionWait; + + /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated + /// over the (encrypted, if needed) transport connection to that cache_peer + JobWait peerWait; void copyRead(Connection &from, IOCB *completion); @@ -212,12 +215,6 @@ public: /// when all candidate destinations have been tried and all have failed void noteConnection(HappyConnOpenerAnswer &); - /// whether we are waiting for HappyConnOpener - /// same as calls.connector but may differ from connOpener.valid() - bool opening() const { return connOpener.set(); } - - void cancelOpening(const char *reason); - /// Start using an established connection void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused); @@ -266,6 +263,9 @@ private: /// \returns whether the request should be retried (nil) or the description why it should not const char *checkRetry(); + /// whether the successfully selected path destination or the established + /// server connection is still in use + bool usingDestination() const; /// details of the "last tunneling attempt" failure (if it failed) ErrorState *savedError = nullptr; @@ -275,6 +275,8 @@ private: void deleteThis(); + void cancelStep(const char *reason); + public: bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to); void copy(size_t len, Connection &from, Connection &to, IOCB *); @@ -357,7 +359,6 @@ TunnelStateData::deleteThis() TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) : startTime(squid_curtime), - waitingForConnectExchange(false), destinations(new ResolvedPeers()), destinationsFound(false), retriable(true), @@ -390,8 +391,7 @@ TunnelStateData::~TunnelStateData() debugs(26, 3, "TunnelStateData destructed this=" << this); assert(noConnections()); xfree(url); - if (opening()) - cancelOpening("~TunnelStateData"); + cancelStep("~TunnelStateData"); delete savedError; } @@ -904,7 +904,10 @@ TunnelStateData::copyServerBytes() static void tunnelStartShoveling(TunnelStateData *tunnelState) { - assert(!tunnelState->waitingForConnectExchange); + assert(!tunnelState->transportWait); + assert(!tunnelState->encryptionWait); + assert(!tunnelState->peerWait); + assert(tunnelState->server.conn); AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); @@ -962,6 +965,7 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *, size_t len void TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) { + peerWait.finish(); server.len = 0; if (logTag_ptr) @@ -970,13 +974,11 @@ TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) if (answer.peerResponseStatus != Http::scNone) *status_ptr = answer.peerResponseStatus; - waitingForConnectExchange = false; - auto sawProblem = false; if (!answer.positive()) { sawProblem = true; - Must(!Comm::IsConnOpen(answer.conn)); + assert(!answer.conn); } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { sawProblem = true; closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone"); @@ -1042,8 +1044,7 @@ tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_ void TunnelStateData::noteConnection(HappyConnOpener::Answer &answer) { - calls.connector = nullptr; - connOpener.clear(); + transportWait.finish(); ErrorState *error = nullptr; if ((error = answer.error.get())) { @@ -1167,7 +1168,7 @@ TunnelStateData::secureConnectionToPeer(const Comm::ConnectionPointer &conn) AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer", MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this)); const auto connector = new Security::BlindPeerConnector(request, conn, callback, al); - AsyncJob::Start(connector); // will call our callback + encryptionWait.start(connector, callback); } /// starts a preparation step for an established connection; retries on failures @@ -1194,9 +1195,12 @@ TunnelStateData::advanceDestination(const char *stepDescription, const Comm::Con void TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer) { + encryptionWait.finish(); + ErrorState *error = nullptr; + assert(!answer.tunneled); if ((error = answer.error.get())) { - Must(!Comm::IsConnOpen(answer.conn)); + assert(!answer.conn); answer.error.clear(); } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al); @@ -1223,8 +1227,6 @@ TunnelStateData::connectedToPeer(const Comm::ConnectionPointer &conn) void TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) { - assert(!waitingForConnectExchange); - AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::tunnelEstablishmentDone", Http::Tunneler::CbDialer(&TunnelStateData::tunnelEstablishmentDone, this)); @@ -1232,9 +1234,7 @@ TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) #if USE_DELAY_POOLS tunneler->setDelayId(server.delayId); #endif - AsyncJob::Start(tunneler); - waitingForConnectExchange = true; - // and wait for the tunnelEstablishmentDone() call + peerWait.start(tunneler, callback); } void @@ -1252,14 +1252,14 @@ TunnelStateData::noteDestination(Comm::ConnectionPointer path) destinations->addPath(path); - if (Comm::IsConnOpen(server.conn)) { + if (usingDestination()) { // We are already using a previously opened connection but also // receiving destinations in case we need to re-forward. - Must(!opening()); + Must(!transportWait); return; } - if (opening()) { + if (transportWait) { notifyConnOpener(); return; // and continue to wait for tunnelConnectDone() callback } @@ -1289,17 +1289,23 @@ TunnelStateData::noteDestinationsEnd(ErrorState *selectionError) // if all of them fail, tunneling as whole will fail Must(!selectionError); // finding at least one path means selection succeeded - if (Comm::IsConnOpen(server.conn)) { + if (usingDestination()) { // We are already using a previously opened connection but also // receiving destinations in case we need to re-forward. - Must(!opening()); + Must(!transportWait); return; } - Must(opening()); // or we would be stuck with nothing to do or wait for + Must(transportWait); // or we would be stuck with nothing to do or wait for notifyConnOpener(); } +bool +TunnelStateData::usingDestination() const +{ + return encryptionWait || peerWait || Comm::IsConnOpen(server.conn); +} + /// remembers an error to be used if there will be no more connection attempts void TunnelStateData::saveError(ErrorState *error) @@ -1320,8 +1326,7 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason) if (request) request->hier.stopPeerClock(false); - if (opening()) - cancelOpening(reason); + cancelStep(reason); assert(finalError); @@ -1339,18 +1344,15 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason) errorSend(client.conn, finalError); } -/// Notify connOpener that we no longer need connections. We do not have to do -/// this -- connOpener would eventually notice on its own, but notifying reduces -/// waste and speeds up spare connection opening for other transactions (that -/// could otherwise wait for this transaction to use its spare allowance). +/// Notify a pending subtask, if any, that we no longer need its help. We do not +/// have to do this -- the subtask job will eventually end -- but ending it +/// earlier reduces waste and may reduce DoS attack surface. void -TunnelStateData::cancelOpening(const char *reason) +TunnelStateData::cancelStep(const char *reason) { - assert(calls.connector); - calls.connector->cancel(reason); - calls.connector = nullptr; - notifyConnOpener(); - connOpener.clear(); + transportWait.cancel(reason); + encryptionWait.cancel(reason); + peerWait.cancel(reason); } void @@ -1360,15 +1362,14 @@ TunnelStateData::startConnecting() request->hier.startPeerClock(); assert(!destinations->empty()); - assert(!opening()); - calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer(&TunnelStateData::noteConnection, this)); - const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al); + assert(!usingDestination()); + AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer(&TunnelStateData::noteConnection, this)); + const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al); cs->setHost(request->url.host()); cs->setRetriable(false); cs->allowPersistent(false); destinations->notificationPending = true; // start() is async - connOpener = cs; - AsyncJob::Start(cs); + transportWait.start(cs, callback); } /// send request on an existing connection dedicated to the requesting client @@ -1417,7 +1418,7 @@ TunnelStateData::Connection::setDelayId(DelayId const &newDelay) #endif -/// makes sure connOpener knows that destinations have changed +/// makes sure connection opener knows that the destinations have changed void TunnelStateData::notifyConnOpener() { @@ -1425,7 +1426,7 @@ TunnelStateData::notifyConnOpener() debugs(17, 7, "reusing pending notification"); } else { destinations->notificationPending = true; - CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange); + CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange); } } diff --git a/src/whois.cc b/src/whois.cc index 38dfbacdee..306ea5c622 100644 --- a/src/whois.cc +++ b/src/whois.cc @@ -182,6 +182,7 @@ whoisClose(const CommCloseCbParams ¶ms) { WhoisState *p = (WhoisState *)params.data; debugs(75, 3, "whoisClose: FD " << params.fd); + // We do not own a Connection. Assume that FwdState is also monitoring. p->entry->unlock("whoisClose"); delete p; } -- 2.47.2