* Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening
The bug was caused by commit
25b0ce4. Other known symptoms are:
assertion failed: store.cc:1793: "isEmpty()"
assertion failed: FwdState.cc:501: "serverConnection() == conn"
assertion failed: FwdState.cc:1037: "!opening()"
This change has several overlapping parts. Unfortunately, merging
individual parts is both difficult and likely to cause crashes.
## Part 1: Bug 5055.
FwdState used to check serverConn to decide whether to open a connection
to forward the request. Since commit
25b0ce4, a nil serverConn pointer
no longer implies that a new connection should be opened: FwdState
helper jobs may already be working on preparing an existing open
connection (e.g., sending a CONNECT request or negotiating encryption).
Bad serverConn checks in both FwdState::noteDestination() and
FwdState::noteDestinationsEnd() methods led to extra connectStart()
calls creating two conflicting concurrent helper jobs.
To fix this, we replaced direct serverConn inspection with a
usingDestination() call which also checks whether we are waiting for a
helper job. Testing that fix exposed another set of bugs: The helper job
pointers or in-job connections left stale/set after forwarding failures.
The changes described below addressed (most of) those problems.
## Part 2: Connection establishing helper jobs and their callbacks
A proper fix for Bug 5055 required answering a difficult question: When
should a dying job call its callbacks? We only found one answer which
required cooperation from the job creator and led to the following
rules:
* AsyncJob destructors must not call any callbacks.
* AsyncJob::swanSong() is responsible for async-calling any remaining
(i.e. set, uncalled, and uncancelled) callbacks.
* AsyncJob::swanSong() is called (only) for started jobs.
* AsyncJob destructing sequence should validate that no callbacks remain
uncalled for started jobs.
... where an AsyncJob x is considered "started" if AsyncJob::Start(x)
has returned without throwing.
A new JobWait class helps job creators follow these rules while keeping
track on in-progress helper jobs and killing no-longer-needed helpers.
Also fixed very similar bugs in tunnel.cc code.
## Part 3: ConnOpener fixes
1. Many ConnOpener users are written to keep a ConnectionPointer to the
destination given to ConnOpener. This means that their connection
magically opens when ConnOpener successfully connects, before
ConnOpener has a chance to notify the user about the changes. Having
multiple concurrent connection owners is always dangerous, and the
user cannot even have a close handler registered for its now-open
connection. When something happens to ConnOpener or its answer, the
user job may be in trouble. Now, ConnOpener callers no longer pass
Connection objects they own, cloning them as needed. That adjustment
required adjustment 2:
2. Refactored ConnOpener users to stop assuming that the answer contains
a pointer to their connection object. After adjustment 1 above, it
does not. HappyConnOpener relied on that assumption quite a bit so we
had to refactor to use two custom callback methods instead of one
with a complicated if-statement distinguishing prime from spare
attempts. This refactoring is an overall improvement because it
simplifies the code. Other ConnOpener users just needed to remove a
few no longer valid paranoid assertions/Musts.
3. ConnOpener users were forced to remember to close params.conn when
processing negative answers. Some, naturally, forgot, triggering
warnings about orphaned connections (e.g., Ident and TcpLogger).
ConnOpener now closes its open connection before sending a negative
answer.
4. ConnOpener would trigger orphan connection warnings if the job ended
after opening the connection but without supplying the connection to
the requestor (e.g., because the requestor has gone away). Now,
ConnOpener explicitly closes its open connection if it has not been
sent to the requestor.
Also fixed Comm::ConnOpener::cleanFd() debugging that was incorrectly
saying that the method closes the temporary descriptor.
Also fixed ConnOpener callback's syncWithComm(): The stale
CommConnectCbParams override was testing unused (i.e. always negative)
CommConnectCbParams::fd and was trying to cancel the callback that most
(possibly all) recipients rely on: ConnOpener users expect a negative
answer rather than no answer at all.
Also, after comparing the needs of two old/existing and a temporary
added ("clone everything") Connection cloning method callers, we decided
there is no need to maintain three different methods. All existing
callers should be fine with a single method because none of them suffers
from "extra" copying of members that others need. Right now, the new
cloneProfile() method copies everything except FD and a few
special-purpose members (with documented reasons for not copying).
Also added Comm::Connection::cloneDestinationDetails() debugging to
simplify tracking dependencies between half-baked Connection objects
carrying destination/flags/other metadata and open Connection objects
created by ConnOpener using that metadata (which are later delivered to
ConnOpener users and, in some cases, replace those half-baked
connections mentioned earlier. Long-term, we need to find a better way
to express these and other Connection states/stages than comments and
debugging messages.
## Part 4: Comm::Connection closure callbacks
We improved many closure callbacks to make sure (to the extent possible)
that Connection and other objects are in sync with Comm. There are lots
of small bugs, inconsistencies, and other problems in Connection closure
handlers. It is not clear whether any of those problems could result in
serious runtime errors or leaks. In theory, the rest of the code could
neutralize their negative side effects. However, even in that case, it
was just a matter of time before the next bug will bite us due to stale
Connection::fd and such. These changes themselves carry elevated risk,
but they get us closer to reliable code as far as Connection maintenance
is concerned; otherwise, we will keep chasing their deadly side effects.
Long-term, all these manual efforts to keep things in sync should become
unnecessary with the introduction of appropriate Connection ownership
APIs that automatically maintain the corresponding environments (TODO).
## Part 5: Other notable improvements in the adjusted code
Improved Security::PeerConnector::serverConn and
Http::Tunneler::connection management, especially when sending negative
answers. When sending a negative answer, we would set answer().conn to
an open connection, async-send that answer, and then hurry to close the
connection using our pointer to the shared Connection object. If
everything went according to the plan, the recipient would get a non-nil
but closed Connection object. Now, a negative answer simply means no
connection at all. Same for a tunneled answer.
Refactored ICAP connection-establishing code to to delay Connection
ownership until the ICAP connection is fully ready. This change
addresses primary Connection ownership concerns (as they apply to this
ICAP code) except orphaning of the temporary Connection object by helper
job start exceptions (now an explicit XXX). For example, the transaction
no longer shares a Connection object with ConnOpener and
IcapPeerConnector jobs.
Probably fixed a bug where PeerConnector::negotiate() assumed that a
sslFinalized() does not return true after callBack(). It may (e.g., when
CertValidationHelper::Submit() throws). Same for
PeekingPeerConnector::checkForPeekAndSpliceMatched().
Fixed FwdState::advanceDestination() bug that did not save
ERR_GATEWAY_FAILURE details and "lost" the address of that failed
destination, making it unavailable to future retries (if any).
Polished PeerPoolMgr, Ident, and Gopher code to be able to fix similar
job callback and connection management issues.
Polished AsyncJob::Start() API. Start() used to return a job pointer,
but that was a bad idea:
* It implies that a failed Start() will return a nil pointer, and that
the caller should check the result. Neither is true.
* It encourages callers to dereference the returned pointer to further
adjust the job. That technically works (today) but violates the rules
of communicating with an async job. The Start() method is the boundary
after which the job is deemed asynchronous.
Also removed old "and wait for..." post-Start() comments because the
code itself became clear enough, and those comments were becoming
increasingly stale (because they duplicated the callback names above
them).
Fix Tunneler and PeerConnector handling of last-resort callback
requirements. Events like handleStopRequest() and callException() stop
the job but should not be reported as a BUG (e.g., it would be up to the
callException() to decide how to report the caught exception). There
might (or there will) be other, similar cases where the job is stopped
prematurely for some non-BUG reason beyond swanSong() knowledge. The
existence of non-bug cases does not mean there could be no bugs worth
reporting here, but until they can be identified more reliably than all
these benign/irrelevant cases, reporting no BUGs is a (much) lesser
evil.
TODO: Revise AsyncJob::doneAll(). Many of its overrides are written to
check for both positive (i.e. mission accomplished) and negative (i.e.
mission cancelled or cannot be accomplished) conditions, but the latter
is usually unnecessary, especially after we added handleStopRequest()
API to properly support external job cancellation events. Many doneAll()
overrides can probably be greatly simplified.
----
Cherry picked SQUID-568-premature-serverconn-use-v5 commit
22b5f78.
* fixup: Cherry-picked SQUID-568-premature-serverconn-use PR-time fixes
In git log order:
*
e64a6c1: Undone an out-of-scope change and added a missing 'auto'
*
aeaf83d: Fixed an 'unused parameter' error
*
f49d009: fixup: No explicit destructors with implicit copying methods
*
c30c37f: Removed an unnecessary explicit copy constructor
*
012f5ec: Excluded Connection::rfc931 from cloning
*
366c78a: ICAP: do not set connect_timeout on the established conn...
This branch is now in sync with SQUID-568-premature-serverconn-use (S)
commit
e64a6c1 (except for official changes merged from master/v6 into S
closer to the end of PR 877 work (i.e. S' merge commit
0a7432a).
* Fix FATAL ServiceRep::putConnection exception: theBusyConns > 0
FATAL: check failed: theBusyConns > 0
exception location: ServiceRep.cc(163) putConnection
Since master/v6 commit
2b6b1bc, a timeout on a ready-to-shovel
Squid-service ICAP connection was decrementing theBusyConns level one
extra time because Adaptation::Icap::Xaction::noteCommTimedout() started
calling both noteConnectionFailed() and closeConnection(). Depending on
the actual theBusyConns level, the extra decrement could result in FATAL
errors later, when putConnection() was called (for a different ICAP
transaction) with zero theBusyConns in an exception-unprotected context.
Throughout these changes, Xaction still counts the above timeouts as a
service failure. That is done by calling ServiceRep::noteFailure() from
Xaction::callException(), including in timeout cases described above.
----
Cherry-picked master/v6 commit
a8ac892.
return;
theConsumer = new BodySink(this);
+ AsyncJob::Start(theConsumer);
debugs(91,7, HERE << "starting auto consumption" << status());
scheduleBodyDataNotification();
}
{
}
+// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all
+// implementations always return true.
+
void
CommAcceptCbParams::print(std::ostream &os) const
{
bool
CommConnectCbParams::syncWithComm()
{
- // drop the call if the call was scheduled before comm_close but
- // is being fired after comm_close
- if (fd >= 0 && fd_table[fd].closing()) {
- debugs(5, 3, HERE << "dropping late connect call: FD " << fd);
- return false;
+ assert(conn);
+
+ // change parameters if this is a successful callback that was scheduled
+ // after its Comm-registered connection started to close
+
+ if (flag != Comm::OK) {
+ assert(!conn->isOpen());
+ return true; // not a successful callback; cannot go out of sync
}
- return true; // now we are in sync and can handle the call
+
+ assert(conn->isOpen());
+ if (!fd_table[conn->fd].closing())
+ return true; // no closing in progress; in sync (for now)
+
+ debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn);
+ conn->noteClosure();
+ flag = Comm::ERR_CLOSING;
+ return true; // now the callback is in sync with Comm again
}
/* CommIoCbParams */
Downloader::swanSong()
{
debugs(33, 6, this);
+
+ if (callback_) // job-ending emergencies like handleStopRequest() or callException()
+ callBack(Http::scInternalServerError);
+
if (context_) {
context_->finished();
context_ = nullptr;
void
Downloader::callBack(Http::StatusCode const statusCode)
{
+ assert(callback_);
CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer());
Must(dialer);
dialer->status = statusCode;
{
debugs(17, 3, "for " << reason);
- if (opening())
- cancelOpening(reason);
+ cancelStep(reason);
PeerSelectionInitiator::subscribed = false; // may already be false
self = nullptr; // we hope refcounting destroys us soon; may already be nil
/* do not place any code here as this object may be gone by now */
}
-/// Notify connOpener that we no longer need connections. We do not have to do
-/// this -- connOpener would eventually notice on its own, but notifying reduces
-/// waste and speeds up spare connection opening for other transactions (that
-/// could otherwise wait for this transaction to use its spare allowance).
+/// Notify a pending subtask, if any, that we no longer need its help. We do not
+/// have to do this -- the subtask job will eventually end -- but ending it
+/// earlier reduces waste and may reduce DoS attack surface.
void
-FwdState::cancelOpening(const char *reason)
+FwdState::cancelStep(const char *reason)
{
- assert(calls.connector);
- calls.connector->cancel(reason);
- calls.connector = nullptr;
- notifyConnOpener();
- connOpener.clear();
+ transportWait.cancel(reason);
+ encryptionWait.cancel(reason);
+ peerWait.cancel(reason);
}
#if STRICT_ORIGINAL_DST
entry = NULL;
- if (opening())
- cancelOpening("~FwdState");
+ cancelStep("~FwdState");
if (Comm::IsConnOpen(serverConn))
closeServerConnection("~FwdState");
if (!errorState->request)
errorState->request = request;
- if (err->type != ERR_ZERO_SIZE_OBJECT)
- return;
+ if (err->type == ERR_ZERO_SIZE_OBJECT)
+ reactToZeroSizeObject();
+
+ destinationReceipt = nullptr; // may already be nil
+}
+
+/// ERR_ZERO_SIZE_OBJECT requires special adjustments
+void
+FwdState::reactToZeroSizeObject()
+{
+ assert(err->type == ERR_ZERO_SIZE_OBJECT);
if (pconnRace == racePossible) {
debugs(17, 5, HERE << "pconn race happened");
if (Comm::IsConnOpen(serverConn))
unregister(serverConn);
+ serverConn = nullptr;
+ destinationReceipt = nullptr;
storedWholeReply_ = nullptr;
entry->reset();
}
}
+bool
+FwdState::usingDestination() const
+{
+ return encryptionWait || peerWait || Comm::IsConnOpen(serverConn);
+}
+
void
FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure)
{
destinations->addPath(path);
- if (Comm::IsConnOpen(serverConn)) {
+ if (usingDestination()) {
// We are already using a previously opened connection, so we cannot be
- // waiting for connOpener. We still receive destinations for backup.
- Must(!opening());
+ // waiting for it. We still receive destinations for backup.
+ Must(!transportWait);
return;
}
- if (opening()) {
+ if (transportWait) {
notifyConnOpener();
return; // and continue to wait for FwdState::noteConnection() callback
}
- // This is the first path candidate we have seen. Create connOpener.
+ // This is the first path candidate we have seen. Use it.
useDestinations();
}
// if all of them fail, forwarding as whole will fail
Must(!selectionError); // finding at least one path means selection succeeded
- if (Comm::IsConnOpen(serverConn)) {
+ if (usingDestination()) {
// We are already using a previously opened connection, so we cannot be
- // waiting for connOpener. We were receiving destinations for backup.
- Must(!opening());
+ // waiting for it. We were receiving destinations for backup.
+ Must(!transportWait);
return;
}
- Must(opening()); // or we would be stuck with nothing to do or wait for
+ Must(transportWait); // or we would be stuck with nothing to do or wait for
notifyConnOpener();
// and continue to wait for FwdState::noteConnection() callback
}
-/// makes sure connOpener knows that destinations have changed
+/// makes sure connection opener knows that the destinations have changed
void
FwdState::notifyConnOpener()
{
} else {
debugs(17, 7, "notifying about " << *destinations);
destinations->notificationPending = true;
- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
+ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
}
}
fwdServerClosedWrapper(const CommCloseCbParams ¶ms)
{
FwdState *fwd = (FwdState *)params.data;
- fwd->serverClosed(params.fd);
+ fwd->serverClosed();
}
/**** PRIVATE *****************************************************************/
}
void
-FwdState::serverClosed(int fd)
+FwdState::serverClosed()
{
- // XXX: fd is often -1 here
- debugs(17, 2, "FD " << fd << " " << entry->url() << " after " <<
- (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests");
- if (fd >= 0 && serverConnection()->fd == fd)
- fwdPconnPool->noteUses(fd_table[fd].pconn.uses);
+ // XXX: This method logic attempts to tolerate Connection::close() called
+ // for serverConn earlier, by one of our dispatch()ed jobs. If that happens,
+ // serverConn will already be closed here or, worse, it will already be open
+ // for the next forwarding attempt. The current code prevents us getting
+ // stuck, but the long term solution is to stop sharing serverConn.
+ debugs(17, 2, serverConn);
+ if (Comm::IsConnOpen(serverConn)) {
+ const auto uses = fd_table[serverConn->fd].pconn.uses;
+ debugs(17, 3, "prior uses: " << uses);
+ fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool
+ serverConn->noteClosure();
+ }
+ serverConn = nullptr;
+ closeHandler = nullptr;
+ destinationReceipt = nullptr;
retryOrBail();
}
{
debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url());
assert(!Comm::IsConnOpen(serverConn));
+ serverConn = nullptr;
+ destinationReceipt = nullptr;
retryOrBail();
}
} catch (...) {
debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException);
closePendingConnection(conn, "connection preparation exception");
+ if (!err)
+ fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al));
retryOrBail();
}
}
{
assert(!destinationReceipt);
- calls.connector = nullptr;
- connOpener.clear();
+ transportWait.finish();
Must(n_tries <= answer.n_tries); // n_tries cannot decrease
n_tries = answer.n_tries;
Must(!Comm::IsConnOpen(answer.conn));
answer.error.clear(); // preserve error for errorSendComplete()
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
// We do not know exactly why the connection got closed, so we play it
// safe, allowing retries only for persistent (reused) connections
if (answer.reused) {
if (!conn->getPeer()->options.no_delay)
tunneler->setDelayId(entry->mem_obj->mostBytesAllowed());
#endif
- AsyncJob::Start(tunneler);
- // and wait for the tunnelEstablishmentDone() call
+ peerWait.start(tunneler, callback);
}
/// resumes operations after the (possibly failed) HTTP CONNECT exchange
void
FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
{
+ peerWait.finish();
+
ErrorState *error = nullptr;
if (!answer.positive()) {
- Must(!Comm::IsConnOpen(answer.conn));
+ Must(!answer.conn);
error = answer.squidError.get();
Must(error);
answer.squidError.clear(); // preserve error for fail()
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
- // The socket could get closed while our callback was queued.
- // We close Connection here to sync Connection::fd.
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
} else if (!answer.leftovers.isEmpty()) {
#endif
connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout);
connector->noteFwdPconnUse = true;
- AsyncJob::Start(connector); // will call our callback
+ encryptionWait.start(connector, callback);
}
/// called when all negotiations with the TLS-speaking peer have been completed
void
FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
{
+ encryptionWait.finish();
+
ErrorState *error = nullptr;
if ((error = answer.error.get())) {
- Must(!Comm::IsConnOpen(answer.conn));
+ assert(!answer.conn);
answer.error.clear(); // preserve error for errorSendComplete()
} else if (answer.tunneled) {
+ assert(!answer.conn);
// TODO: When ConnStateData establishes tunnels, its state changes
// [in ways that may affect logging?]. Consider informing
// ConnStateData about our tunnel or otherwise unifying tunnel
complete(); // destroys us
return;
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer");
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
}
Must(!request->pinnedConnection());
assert(!destinations->empty());
- assert(!opening());
+ assert(!usingDestination());
// Ditch error page if it was created before.
// A new one will be created if there's another problem
delete err;
err = nullptr;
request->clearError();
- serverConn = nullptr;
- destinationReceipt = nullptr;
request->hier.startPeerClock();
- calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
+ AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
HttpRequest::Pointer cause = request;
- const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al);
+ const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al);
cs->setHost(request->url.host());
bool retriable = checkRetriable();
if (!retriable && Config.accessList.serverPconnForNonretriable) {
cs->setRetriable(retriable);
cs->allowPersistent(pconnRace != raceHappened);
destinations->notificationPending = true; // start() is async
- connOpener = cs;
- AsyncJob::Start(cs);
+ transportWait.start(cs, callback);
}
/// send request on an existing connection dedicated to the requesting client
#ifndef SQUID_FORWARD_H
#define SQUID_FORWARD_H
-#include "base/CbcPointer.h"
#include "base/forward.h"
+#include "base/JobWait.h"
#include "base/RefCount.h"
#include "clients/forward.h"
#include "comm.h"
#include "comm/Connection.h"
-#include "comm/ConnOpener.h"
#include "error/forward.h"
#include "fde.h"
#include "http/StatusCode.h"
typedef RefCount<ResolvedPeers> ResolvedPeersPointer;
class HappyConnOpener;
-typedef CbcPointer<HappyConnOpener> HappyConnOpenerPointer;
class HappyConnOpenerAnswer;
/// Sets initial TOS value and Netfilter for the future outgoing connection.
void handleUnregisteredServerEnd();
int reforward();
bool reforwardableStatus(const Http::StatusCode s) const;
- void serverClosed(int fd);
+ void serverClosed();
void connectStart();
void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno);
bool checkRetry();
/* PeerSelectionInitiator API */
virtual void noteDestination(Comm::ConnectionPointer conn) override;
virtual void noteDestinationsEnd(ErrorState *selectionError) override;
+ /// whether the successfully selected path destination or the established
+ /// server connection is still in use
+ bool usingDestination() const;
void noteConnection(HappyConnOpenerAnswer &);
/// \returns the time left for this connection to become connected or 1 second if it is less than one second left
time_t connectingTimeout(const Comm::ConnectionPointer &conn) const;
- /// whether we are waiting for HappyConnOpener
- /// same as calls.connector but may differ from connOpener.valid()
- bool opening() const { return connOpener.set(); }
-
- void cancelOpening(const char *reason);
+ void cancelStep(const char *reason);
void notifyConnOpener();
+ void reactToZeroSizeObject();
void updateAleWithFinalError();
time_t start_t;
int n_tries; ///< the number of forwarding attempts so far
- // AsyncCalls which we set and may need cancelling.
- struct {
- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn.
- } calls;
-
struct {
bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc
bool dont_retry;
bool destinationsFound; ///< at least one candidate path found
} flags;
- HappyConnOpenerPointer connOpener; ///< current connection opening job
+ /// waits for a transport connection to the peer to be established/opened
+ JobWait<HappyConnOpener> transportWait;
+
+ /// waits for the established transport connection to be secured/encrypted
+ JobWait<Security::PeerConnector> encryptionWait;
+
+ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
+ /// over the (encrypted, if needed) transport connection to that cache_peer
+ JobWait<Http::Tunneler> peerWait;
+
ResolvedPeersPointer destinations; ///< paths for forwarding the request
Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server.
PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil)
#include "neighbors.h"
#include "pconn.h"
#include "PeerPoolMgr.h"
+#include "sbuf/Stream.h"
#include "SquidConfig.h"
CBDATA_CLASS_INIT(HappyConnOpener);
fwdStart(aFwdStart),
callback_(aCall),
destinations(dests),
+ prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"),
+ spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"),
ale(anAle),
cause(request),
n_tries(tries)
AsyncJob::swanSong();
}
+/// HappyConnOpener::Attempt printer for debugging
+std::ostream &
+operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt)
+{
+ if (!attempt.path)
+ os << '-';
+ else if (attempt.path->isOpen())
+ os << "FD " << attempt.path->fd;
+ else if (attempt.connWait)
+ os << attempt.connWait;
+ else // destination is known; connection closed (and we are not opening any)
+ os << attempt.path->id;
+ return os;
+}
+
const char *
HappyConnOpener::status() const
{
- static MemBuf buf;
- buf.reset();
+ // TODO: In a redesigned status() API, the caller may mimic this approach.
+ static SBuf buf;
+ buf.clear();
- buf.append(" [", 2);
+ SBufStream os(buf);
+
+ os.write(" [", 2);
if (stopReason)
- buf.appendf("Stopped, reason:%s", stopReason);
- if (prime) {
- if (prime.path && prime.path->isOpen())
- buf.appendf(" prime FD %d", prime.path->fd);
- else if (prime.connector)
- buf.appendf(" prime call%ud", prime.connector->id.value);
- }
- if (spare) {
- if (spare.path && spare.path->isOpen())
- buf.appendf(" spare FD %d", spare.path->fd);
- else if (spare.connector)
- buf.appendf(" spare call%ud", spare.connector->id.value);
- }
+ os << "Stopped:" << stopReason;
+ if (prime)
+ os << "prime:" << prime;
+ if (spare)
+ os << "spare:" << spare;
if (n_tries)
- buf.appendf(" tries %d", n_tries);
- buf.appendf(" %s%u]", id.prefix(), id.value);
- buf.terminate();
+ os << " tries:" << n_tries;
+ os << ' ' << id << ']';
- return buf.content();
+ buf = os.buf();
+ return buf.c_str();
}
/// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending
HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest)
{
Must(!attempt.path);
- Must(!attempt.connector);
+ Must(!attempt.connWait);
Must(dest);
const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer();
entry->mem_obj->checkUrlChecksum();
#endif
- GetMarkingsToServer(cause.getRaw(), *dest);
+ const auto conn = dest->cloneProfile();
+ GetMarkingsToServer(cause.getRaw(), *conn);
- // ConnOpener modifies its destination argument so we reset the source port
- // in case we are reusing the destination already used by our predecessor.
- dest->local.port(0);
++n_tries;
typedef CommCbMemFunT<HappyConnOpener, CommConnectCbParams> Dialer;
- AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone);
+ AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName,
+ Dialer(this, attempt.callbackMethod));
const time_t connTimeout = dest->connectTimeout(fwdStart);
- Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout);
- if (!dest->getPeer())
+ auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout);
+ if (!conn->getPeer())
cs->setHost(host_);
- attempt.path = dest;
- attempt.connector = callConnect;
- attempt.opener = cs;
+ attempt.path = dest; // but not the being-opened conn!
+ attempt.connWait.start(cs, callConnect);
+}
- AsyncJob::Start(cs);
+/// Comm::ConnOpener callback for the prime connection attempt
+void
+HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams ¶ms)
+{
+ handleConnOpenerAnswer(prime, params, "new prime connection");
}
-/// called by Comm::ConnOpener objects after a prime or spare connection attempt
-/// completes (successfully or not)
+/// Comm::ConnOpener callback for the spare connection attempt
void
-HappyConnOpener::connectDone(const CommConnectCbParams ¶ms)
+HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams ¶ms)
{
- Must(params.conn);
- const bool itWasPrime = (params.conn == prime.path);
- const bool itWasSpare = (params.conn == spare.path);
- Must(itWasPrime != itWasSpare);
-
- PeerConnectionPointer handledPath;
- if (itWasPrime) {
- handledPath = prime.path;
- prime.finish();
- } else {
- handledPath = spare.path;
- spare.finish();
- if (gotSpareAllowance) {
- TheSpareAllowanceGiver.jobUsedAllowance();
- gotSpareAllowance = false;
- }
+ if (gotSpareAllowance) {
+ TheSpareAllowanceGiver.jobUsedAllowance();
+ gotSpareAllowance = false;
}
+ handleConnOpenerAnswer(spare, params, "new spare connection");
+}
+
+/// prime/spare-agnostic processing of a Comm::ConnOpener result
+void
+HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams ¶ms, const char *what)
+{
+ Must(params.conn);
+
+ // finalize the previously selected path before attempt.finish() forgets it
+ auto handledPath = attempt.path;
+ handledPath.finalize(params.conn); // closed on errors
+ attempt.finish();
- const char *what = itWasPrime ? "new prime connection" : "new spare connection";
if (params.flag == Comm::OK) {
sendSuccess(handledPath, false, what);
return;
debugs(17, 8, what << " failed: " << params.conn);
if (const auto peer = params.conn->getPeer())
peerConnectFailed(peer);
- params.conn->close(); // TODO: Comm::ConnOpener should do this instead.
// remember the last failure (we forward it if we cannot connect anywhere)
lastFailedConnection = handledPath;
return false;
}
+HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName):
+ callbackMethod(method),
+ callbackMethodName(methodName)
+{
+}
+
+void
+HappyConnOpener::Attempt::finish()
+{
+ connWait.finish();
+ path = nullptr;
+}
+
void
HappyConnOpener::Attempt::cancel(const char *reason)
{
- if (connector) {
- connector->cancel(reason);
- CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort);
- }
- clear();
+ connWait.cancel(reason);
+ path = nullptr;
}
/// a connection opening attempt in progress (or falsy)
class Attempt {
public:
+ /// HappyConnOpener method implementing a ConnOpener callback
+ using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &);
+
+ Attempt(const CallbackMethod method, const char *methodName);
+
explicit operator bool() const { return static_cast<bool>(path); }
/// reacts to a natural attempt completion (successful or otherwise)
- void finish() { clear(); }
+ void finish();
/// aborts an in-progress attempt
void cancel(const char *reason);
PeerConnectionPointer path; ///< the destination we are connecting to
- AsyncCall::Pointer connector; ///< our opener callback
- Comm::ConnOpener::Pointer opener; ///< connects to path and calls us
- private:
- /// cleans up after the attempt ends (successfully or otherwise)
- void clear() { path = nullptr; connector = nullptr; opener = nullptr; }
+ /// waits for a connection to the peer to be established/opened
+ JobWait<Comm::ConnOpener> connWait;
+
+ const CallbackMethod callbackMethod; ///< ConnOpener calls this method
+ const char * const callbackMethodName; ///< for callbackMethod debugging
};
+ friend std::ostream &operator <<(std::ostream &, const Attempt &);
/* AsyncJob API */
virtual void start() override;
void openFreshConnection(Attempt &, PeerConnectionPointer &);
bool reuseOldConnection(PeerConnectionPointer &);
- void connectDone(const CommConnectCbParams &);
+ void notePrimeConnectDone(const CommConnectCbParams &);
+ void noteSpareConnectDone(const CommConnectCbParams &);
+ void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription);
void checkForNewConnection();
PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
peer(cbdataReference(aPeer)),
request(),
- opener(),
- securer(),
- closer(),
+ transportWait(),
+ encryptionWait(),
addrUsed(0)
{
}
void
PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms)
{
- opener = NULL;
+ transportWait.finish();
if (!validPeer()) {
debugs(48, 3, "peer gone");
}
if (params.flag != Comm::OK) {
- /* it might have been a timeout with a partially open link */
- if (params.conn != NULL)
- params.conn->close();
peerConnectFailed(peer);
checkpoint("conn opening failure"); // may retry
return;
// Handle TLS peers.
if (peer->secure.encryptTransport) {
- typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
- closer = JobCallback(48, 3, CloserDialer, this,
- PeerPoolMgr::handleSecureClosure);
- comm_add_close_handler(params.conn->fd, closer);
-
- securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
- MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
+ // XXX: Exceptions orphan params.conn
+ AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
+ MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
const int peerTimeout = peerConnectTimeout(peer);
const int timeUsed = squid_curtime - params.conn->startTime();
// Use positive timeout when less than one second is left for conn.
const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
- auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft);
- AsyncJob::Start(connector); // will call our callback
+ const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
+ encryptionWait.start(connector, callback);
return;
}
void
PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
{
- Must(securer != NULL);
- securer = NULL;
-
- if (closer != NULL) {
- if (answer.conn != NULL)
- comm_remove_close_handler(answer.conn->fd, closer);
- else
- closer->cancel("securing completed");
- closer = NULL;
- }
+ encryptionWait.finish();
if (!validPeer()) {
debugs(48, 3, "peer gone");
return;
}
+ assert(!answer.tunneled);
if (answer.error.get()) {
- if (answer.conn != NULL)
- answer.conn->close();
+ assert(!answer.conn);
// PeerConnector calls peerConnectFailed() for us;
checkpoint("conn securing failure"); // may retry
return;
}
- pushNewConnection(answer.conn);
-}
+ assert(answer.conn);
-void
-PeerPoolMgr::handleSecureClosure(const CommCloseCbParams ¶ms)
-{
- Must(closer != NULL);
- Must(securer != NULL);
- securer->cancel("conn closed by a 3rd party");
- securer = NULL;
- closer = NULL;
- // allow the closing connection to fully close before we check again
- Checkpoint(this, "conn closure while securing");
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
+ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
+ answer.conn->noteClosure();
+ checkpoint("external connection closure"); // may retry
+ return;
+ }
+
+ pushNewConnection(answer.conn);
}
void
PeerPoolMgr::openNewConnection()
{
// KISS: Do nothing else when we are already doing something.
- if (opener != NULL || securer != NULL || shutting_down) {
- debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
+ if (transportWait || encryptionWait || shutting_down) {
+ debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
return; // there will be another checkpoint when we are done opening/securing
}
const int ctimeout = peerConnectTimeout(peer);
typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
- opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
- AsyncJob::Start(cs);
+ AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
+ const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
+ transportWait.start(cs, callback);
}
void
#define SQUID_PEERPOOLMGR_H
#include "base/AsyncJob.h"
+#include "base/JobWait.h"
#include "comm/forward.h"
#include "security/forward.h"
/// Security::PeerConnector callback
void handleSecuredPeer(Security::EncryptorAnswer &answer);
- /// called when the connection we are trying to secure is closed by a 3rd party
- void handleSecureClosure(const CommCloseCbParams ¶ms);
-
/// the final step in connection opening (and, optionally, securing) sequence
void pushNewConnection(const Comm::ConnectionPointer &conn);
private:
CachePeer *peer; ///< the owner of the pool we manage
RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code
- AsyncCall::Pointer opener; ///< whether we are opening a connection
- AsyncCall::Pointer securer; ///< whether we are securing a connection
- AsyncCall::Pointer closer; ///< monitors conn while we are securing it
+
+ /// waits for a transport connection to the peer to be established/opened
+ JobWait<Comm::ConnOpener> transportWait;
+
+ /// waits for the established transport connection to be secured/encrypted
+ JobWait<Security::BlindPeerConnector> encryptionWait;
+
unsigned int addrUsed; ///< counter for cycling through peer addresses
};
while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {}
}
- const auto cleanPath = path.connection->cloneDestinationDetails();
+ const auto cleanPath = path.connection->cloneProfile();
return PeerConnectionPointer(cleanPath, found - paths_.begin());
}
openConnection();
}
-// connection with the ICAP service established
-void Adaptation::Icap::ModXact::handleCommConnected()
+void Adaptation::Icap::ModXact::startShoveling()
{
Must(state.writing == State::writingConnect);
virtual void noteBodyProductionEnded(BodyPipe::Pointer);
virtual void noteBodyProducerAborted(BodyPipe::Pointer);
- // comm handlers
- virtual void handleCommConnected();
+ /* Xaction API */
+ virtual void startShoveling();
virtual void handleCommWrote(size_t size);
virtual void handleCommRead(size_t size);
+
void handleCommWroteHeaders();
void handleCommWroteBody();
openConnection();
}
-void Adaptation::Icap::OptXact::handleCommConnected()
+void Adaptation::Icap::OptXact::startShoveling()
{
scheduleRead();
OptXact(ServiceRep::Pointer &aService);
protected:
+ /* Xaction API */
virtual void start();
- virtual void handleCommConnected();
+ virtual void startShoveling();
virtual void handleCommWrote(size_t size);
virtual void handleCommRead(size_t size);
// should be configurable.
}
-// returns a persistent or brand new connection; negative int on failures
+// TODO: getIdleConnection() and putConnection()/noteConnectionFailed() manage a
+// "used connection slot" resource. Automate that resource tracking (RAII/etc.).
Comm::ConnectionPointer
-Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
+Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact)
{
Comm::ConnectionPointer connection;
else
theIdleConns->closeN(1);
- reused = Comm::IsConnOpen(connection);
++theBusyConns;
debugs(93,3, HERE << "got connection: " << connection);
return connection;
// do not pool an idle connection if we owe connections
if (isReusable && excessConnections() == 0) {
debugs(93, 3, HERE << "pushing pconn" << comment);
- commUnsetConnTimeout(conn);
theIdleConns->push(conn);
} else {
debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " <<
bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const;
bool allows204() const;
bool allows206() const;
- Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused);
+ /// \returns an idle persistent ICAP connection or nil
+ Comm::ConnectionPointer getIdleConnection(bool isRetriable);
void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment);
void noteConnectionUse(const Comm::ConnectionPointer &conn);
void noteConnectionFailed(const char *comment);
#include "adaptation/icap/Config.h"
#include "adaptation/icap/Launcher.h"
#include "adaptation/icap/Xaction.h"
+#include "base/JobWait.h"
#include "base/TextException.h"
#include "comm.h"
#include "comm/Connection.h"
icapRequest(NULL),
icapReply(NULL),
attempts(0),
- connection(NULL),
theService(aService),
commEof(false),
reuseConnection(true),
isRepeatable(true),
ignoreLastWrite(false),
waitingForDns(false),
- stopReason(NULL),
- connector(NULL),
- reader(NULL),
- writer(NULL),
- closer(NULL),
alep(new AccessLogEntry),
- al(*alep),
- cs(NULL)
+ al(*alep)
{
debugs(93,3, typeName << " constructed, this=" << this <<
" [icapx" << id << ']'); // we should not call virtual status() here
icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data)
{
Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
+ /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17)
+ /// or Optional<Ip::Address> (when it can take non-trivial types)
xa->dnsLookupDone(ia);
}
if (!TheConfig.reuse_connections)
disableRetries(); // this will also safely drain pconn pool
- bool wasReused = false;
- connection = s.getConnection(isRetriable, wasReused);
-
- if (wasReused && Comm::IsConnOpen(connection)) {
- // Set comm Close handler
- // fake the connect callback
- // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
- CbcPointer<Xaction> self(this);
- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
- dialer.params.conn = connection;
- dialer.params.flag = Comm::OK;
- // fake other parameters by copying from the existing connection
- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
- ScheduleCallHere(connector);
+ if (const auto pconn = s.getIdleConnection(isRetriable)) {
+ useTransportConnection(pconn);
return;
}
#if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
dieOnConnectionFailure(); // throws
#else // take a step back into protected Async call dialing.
- // fake the connect callback
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
- CbcPointer<Xaction> self(this);
- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
- dialer.params.conn = connection;
- dialer.params.flag = Comm::COMM_ERROR;
- // fake other parameters by copying from the existing connection
- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
- ScheduleCallHere(connector);
+ CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure);
#endif
return;
}
- connection = new Comm::Connection;
- connection->remote = ia->current();
- connection->remote.port(s.cfg().port);
- getOutgoingAddress(NULL, connection);
+ const Comm::ConnectionPointer conn = new Comm::Connection();
+ conn->remote = ia->current();
+ conn->remote.port(s.cfg().port);
+ getOutgoingAddress(nullptr, conn);
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
- connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
- cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
+ AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+ const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass));
cs->setHost(s.cfg().host.termedBuf());
- AsyncJob::Start(cs.get());
+ transportWait.start(cs, callback);
}
/*
closer = NULL;
}
+ commUnsetConnTimeout(connection);
+
cancelRead(); // may not work
if (reuseConnection && !doneWithIo()) {
writer = NULL;
reader = NULL;
- connector = NULL;
connection = NULL;
}
}
-// connection with the ICAP service established
+/// called when the connection attempt to an ICAP service completes (successfully or not)
void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
{
- cs = NULL;
+ transportWait.finish();
- if (io.flag == Comm::TIMEOUT) {
- handleCommTimedout();
+ if (io.flag != Comm::OK) {
+ dieOnConnectionFailure(); // throws
return;
}
- Must(connector != NULL);
- connector = NULL;
-
- if (io.flag != Comm::OK)
- dieOnConnectionFailure(); // throws
-
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
- TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
- commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
+ useTransportConnection(io.conn);
+}
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
- comm_add_close_handler(io.conn->fd, closer);
+/// React to the availability of a transport connection to the ICAP service.
+/// The given connection may (or may not) be secured already.
+void
+Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn)
+{
+ assert(Comm::IsConnOpen(conn));
+ assert(!connection);
// If it is a reused connection and the TLS object is built
// we should not negotiate new TLS session
- const auto &ssl = fd_table[io.conn->fd].ssl;
+ const auto &ssl = fd_table[conn->fd].ssl;
if (!ssl && service().cfg().secure.encryptTransport) {
+ // XXX: Exceptions orphan conn.
CbcPointer<Adaptation::Icap::Xaction> me(this);
- securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
- MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
+ AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
+ MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
- auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
- AsyncJob::Start(sslConnector); // will call our callback
+ const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
+
+ encryptionWait.start(sslConnector, callback);
return;
}
-// ?? fd_table[io.conn->fd].noteUse(icapPconnPool);
+ useIcapConnection(conn);
+}
+
+/// react to the availability of a fully-ready ICAP connection
+void
+Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn)
+{
+ assert(!connection);
+ assert(conn);
+ assert(Comm::IsConnOpen(conn));
+ connection = conn;
service().noteConnectionUse(connection);
- handleCommConnected();
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+ comm_add_close_handler(connection->fd, closer);
+
+ startShoveling();
}
void Adaptation::Icap::Xaction::dieOnConnectionFailure()
// communication timeout with the ICAP service
void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &)
-{
- handleCommTimedout();
-}
-
-void Adaptation::Icap::Xaction::handleCommTimedout()
{
debugs(93, 2, HERE << typeName << " failed: timeout with " <<
theService->cfg().methodStr() << " " <<
theService->cfg().uri << status());
reuseConnection = false;
- const bool whileConnecting = connector != NULL;
- if (whileConnecting) {
- assert(!haveConnection());
- theService->noteConnectionFailed("timedout");
- } else
- closeConnection(); // so that late Comm callbacks do not disturb bypass
- throw TexcHere(whileConnecting ?
- "timed out while connecting to the ICAP service" :
- "timed out while talking to the ICAP service");
+ assert(haveConnection());
+ closeConnection();
+ throw TextException("timed out while talking to the ICAP service", Here());
}
// unexpected connection close while talking to the ICAP service
void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &)
{
- if (securer != NULL) {
- securer->cancel("Connection closed before SSL negotiation finished");
- securer = NULL;
+ if (connection) {
+ connection->noteClosure();
+ connection = nullptr;
}
closer = NULL;
- handleCommClosed();
-}
-void Adaptation::Icap::Xaction::handleCommClosed()
-{
static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE");
detailError(d);
mustStop("ICAP service connection externally closed");
bool Adaptation::Icap::Xaction::doneAll() const
{
- return !waitingForDns && !connector && !securer && !reader && !writer &&
+ return !waitingForDns && !transportWait && !encryptionWait &&
+ !reader && !writer &&
Adaptation::Initiate::doneAll();
}
bool Adaptation::Icap::Xaction::doneWithIo() const
{
return haveConnection() &&
- !connector && !reader && !writer && // fast checks, some redundant
+ !transportWait && !reader && !writer && // fast checks, some redundant
doneReading() && doneWriting();
}
void Adaptation::Icap::Xaction::swanSong()
{
// kids should sing first and then call the parent method.
- if (cs.valid()) {
- debugs(93,6, HERE << id << " about to notify ConnOpener!");
- CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort);
- cs = NULL;
+ if (transportWait || encryptionWait) {
service().noteConnectionFailed("abort");
}
void
Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
{
- Must(securer != NULL);
- securer = NULL;
-
- if (closer != NULL) {
- if (Comm::IsConnOpen(answer.conn))
- comm_remove_close_handler(answer.conn->fd, closer);
- else
- closer->cancel("securing completed");
- closer = NULL;
- }
+ encryptionWait.finish();
+ assert(!answer.tunneled);
if (answer.error.get()) {
- if (answer.conn != NULL)
- answer.conn->close();
+ assert(!answer.conn);
+ // TODO: Refactor dieOnConnectionFailure() to be usable here as well.
debugs(93, 2, typeName <<
" TLS negotiation to " << service().cfg().uri << " failed");
service().noteConnectionFailed("failure");
debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete");
- service().noteConnectionUse(answer.conn);
+ assert(answer.conn);
+
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
+ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
+ answer.conn->noteClosure();
+ service().noteConnectionFailed("external TLS connection closure");
+ static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE");
+ detailError(d);
+ throw TexcHere("external closure of the TLS ICAP service connection");
+ }
- handleCommConnected();
+ useIcapConnection(answer.conn);
}
#include "AccessLogEntry.h"
#include "adaptation/icap/ServiceRep.h"
#include "adaptation/Initiate.h"
+#include "base/JobWait.h"
#include "comm/ConnOpener.h"
#include "error/forward.h"
#include "HttpReply.h"
class MemBuf;
+namespace Ssl {
+class IcapPeerConnector;
+}
+
namespace Adaptation
{
namespace Icap
virtual void start();
virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate
+ /// starts sending/receiving ICAP messages
+ virtual void startShoveling() = 0;
+
// comm hanndlers; called by comm handler wrappers
- virtual void handleCommConnected() = 0;
virtual void handleCommWrote(size_t sz) = 0;
virtual void handleCommRead(size_t sz) = 0;
- virtual void handleCommTimedout();
- virtual void handleCommClosed();
void handleSecuredPeer(Security::EncryptorAnswer &answer);
/// record error detail if possible
void openConnection();
void closeConnection();
- void dieOnConnectionFailure();
bool haveConnection() const;
void scheduleRead();
ServiceRep &service();
private:
+ void useTransportConnection(const Comm::ConnectionPointer &);
+ void useIcapConnection(const Comm::ConnectionPointer &);
+ void dieOnConnectionFailure();
void tellQueryAborted();
void maybeLog();
protected:
- Comm::ConnectionPointer connection; ///< ICAP server connection
Adaptation::Icap::ServiceRep::Pointer theService;
SBuf readBuf;
bool ignoreLastWrite;
bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback
- const char *stopReason;
-
- // active (pending) comm callbacks for the ICAP server connection
- AsyncCall::Pointer connector;
AsyncCall::Pointer reader;
AsyncCall::Pointer writer;
- AsyncCall::Pointer closer;
AccessLogEntry::Pointer alep; ///< icap.log entry
AccessLogEntry &al; ///< short for *alep
timeval icap_tio_finish; /*time when the last byte of the ICAP responsewas received*/
private:
- Comm::ConnOpener::Pointer cs;
- AsyncCall::Pointer securer; ///< whether we are securing a connection
+ /// waits for a transport connection to the ICAP server to be established/opened
+ JobWait<Comm::ConnOpener> transportWait;
+
+ /// waits for the established transport connection to be secured/encrypted
+ JobWait<Ssl::IcapPeerConnector> encryptionWait;
+
+ /// open and, if necessary, secured connection to the ICAP server (or nil)
+ Comm::ConnectionPointer connection;
+
+ AsyncCall::Pointer closer;
};
} // namespace Icap
InstanceIdDefinitions(AsyncJob, "job");
-AsyncJob::Pointer AsyncJob::Start(AsyncJob *j)
+void
+AsyncJob::Start(const Pointer &job)
{
- AsyncJob::Pointer job(j);
CallJobHere(93, 5, job, AsyncJob, start);
- return job;
+ job->started_ = true; // it is the attempt that counts
}
AsyncJob::AsyncJob(const char *aTypeName) :
{
debugs(93,5, "AsyncJob destructed, this=" << this <<
" type=" << typeName << " [" << id << ']');
+ assert(!started_ || swanSang_);
}
void AsyncJob::start()
AsyncCall::Pointer inCallSaved = inCall;
void *thisSaved = this;
+ // TODO: Swallow swanSong() exceptions to reduce memory leaks.
+
+ // Job callback invariant: swanSong() is (only) called for started jobs.
+ // Here to detect violations in kids that forgot to call our swanSong().
+ assert(started_);
+
+ swanSang_ = true; // it is the attempt that counts
swanSong();
- delete this; // this is the only place where the object is deleted
+ delete this; // this is the only place where a started job is deleted
// careful: this object does not exist any more
debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved);
public:
AsyncJob(const char *aTypeName);
- /// starts a freshly created job (i.e., makes the job asynchronous)
- static Pointer Start(AsyncJob *job);
+ /// Promises to start the configured job (eventually). The job is deemed to
+ /// be running asynchronously beyond this point, so the caller should only
+ /// access the job object via AsyncCalls rather than directly.
+ ///
+ /// swanSong() is only called for jobs for which this method has returned
+ /// successfully (i.e. without throwing).
+ static void Start(const Pointer &job);
protected:
// XXX: temporary method to replace "delete this" in jobs-in-transition.
/// called when the job throws during an async call
virtual void callException(const std::exception &e);
+ /// process external request to terminate now (i.e. during this async call)
+ void handleStopRequest() { mustStop("externally aborted"); }
+
+ const InstanceId<AsyncJob> id; ///< job identifier
+
protected:
// external destruction prohibited to ensure swanSong() is called
virtual ~AsyncJob();
const char *stopReason; ///< reason for forcing done() to be true
const char *typeName; ///< kid (leaf) class name, for debugging
AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any
- const InstanceId<AsyncJob> id; ///< job identifier
+
+ bool started_ = false; ///< Start() has finished successfully
+ bool swanSang_ = false; ///< swanSong() was called
};
#endif /* SQUID_ASYNC_JOB_H */
--- /dev/null
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/AsyncJobCalls.h"
+#include "base/JobWait.h"
+
+#include <cassert>
+#include <iostream>
+
+JobWaitBase::JobWaitBase() = default;
+
+JobWaitBase::~JobWaitBase()
+{
+ cancel("owner gone");
+}
+
+void
+JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall)
+{
+ // Invariant: The wait will be over. We cannot guarantee that the job will
+ // call the callback, of course, but we can check these prerequisites.
+ assert(aCall);
+ assert(aJob.valid());
+
+ // "Double" waiting state leads to conflicting/mismatching callbacks
+ // detailed in finish(). Detect that bug ASAP.
+ assert(!waiting());
+
+ assert(!callback_);
+ assert(!job_);
+ callback_ = aCall;
+ job_ = aJob;
+
+ AsyncJob::Start(job_.get());
+}
+
+void
+JobWaitBase::finish()
+{
+ // Unexpected callbacks might result in disasters like secrets exposure,
+ // data corruption, or expensive message routing mistakes when the callback
+ // info is applied to the wrong message part or acted upon prematurely.
+ assert(waiting());
+ clear();
+}
+
+void
+JobWaitBase::cancel(const char *reason)
+{
+ if (callback_) {
+ callback_->cancel(reason);
+
+ // Instead of AsyncJob, the class parameter could be Job. That would
+ // avoid runtime child-to-parent CbcPointer conversion overheads, but
+ // complicate support for Jobs with virtual AsyncJob bases (GCC error:
+ // "pointer to member conversion via virtual base AsyncJob") and also
+ // cache-log "Job::handleStopRequest()" with a non-existent class name.
+ CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest);
+
+ clear();
+ }
+}
+
+void
+JobWaitBase::print(std::ostream &os) const
+{
+ // use a backarrow to emphasize that this is a callback: call24<-job6
+ if (callback_)
+ os << callback_->id << "<-";
+ if (const auto rawJob = job_.get())
+ os << rawJob->id;
+ else
+ os << job_; // raw pointer of a gone job may still be useful for triage
+}
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_BASE_JOBWAIT_H
+#define SQUID_BASE_JOBWAIT_H
+
+#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
+
+#include <iosfwd>
+
+/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead.
+/// This base class does not contain code specific to the actual Job type.
+class JobWaitBase
+{
+public:
+ JobWaitBase();
+ ~JobWaitBase();
+
+ /// no copying of any kind: each waiting context needs a dedicated AsyncCall
+ JobWaitBase(JobWaitBase &&) = delete;
+
+ explicit operator bool() const { return waiting(); }
+
+ /// whether we are currently waiting for the job to call us back
+ /// the job itself may be gone even if this returns true
+ bool waiting() const { return bool(callback_); }
+
+ /// ends wait (after receiving the call back)
+ /// forgets the job which is likely to be gone by now
+ void finish();
+
+ /// aborts wait (if any) before receiving the call back
+ /// does nothing if we are not waiting
+ void cancel(const char *reason);
+
+ /// summarizes what we are waiting for (for debugging)
+ void print(std::ostream &) const;
+
+protected:
+ /// starts waiting for the given job to call the given callback
+ void start_(AsyncJob::Pointer, AsyncCall::Pointer);
+
+private:
+ /// the common part of finish() and cancel()
+ void clear() { job_.clear(); callback_ = nullptr; }
+
+ /// the job that we are waiting to call us back (or nil)
+ AsyncJob::Pointer job_;
+
+ /// the call we are waiting for the job_ to make (or nil)
+ AsyncCall::Pointer callback_;
+};
+
+/// Manages waiting for an AsyncJob callback.
+/// Completes JobWaitBase by providing Job type-specific members.
+template <class Job>
+class JobWait: public JobWaitBase
+{
+public:
+ typedef CbcPointer<Job> JobPointer;
+
+ /// starts waiting for the given job to call the given callback
+ void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) {
+ start_(aJob, aCallback);
+ typedJob_ = aJob;
+ }
+
+ /// \returns a cbdata pointer to the job we are waiting for (or nil)
+ /// the returned pointer may be falsy, even if we are still waiting()
+ JobPointer job() const { return waiting() ? typedJob_ : nullptr; }
+
+private:
+ /// nearly duplicates JobWaitBase::job_ but exposes the actual job type
+ JobPointer typedJob_;
+};
+
+inline
+std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait)
+{
+ wait.print(os);
+ return os;
+}
+
+#endif /* SQUID_BASE_JOBWAIT_H */
+
Here.h \
InstanceId.cc \
InstanceId.h \
+ JobWait.cc \
+ JobWait.h \
Lock.h \
LookupTable.h \
LruMap.h \
template<class Cbc> class CbcPointer;
template<class RefCountableKid> class RefCount;
+template<class Job> class JobWait;
typedef CbcPointer<AsyncJob> AsyncJobPointer;
typedef RefCount<CodeContext> CodeContextPointer;
/* This is a handler normally called by comm_close() */
void ConnStateData::connStateClosed(const CommCloseCbParams &)
{
+ if (clientConnection) {
+ clientConnection->noteClosure();
+ // keep closed clientConnection for logging, clientdb cleanup, etc.
+ }
deleteThis("ConnStateData::connStateClosed");
}
Ftp::Client::~Client()
{
- if (data.opener != NULL) {
- data.opener->cancel("Ftp::Client destructed");
- data.opener = NULL;
- }
data.close();
safe_free(old_request);
debugs(9, 3, "connecting to " << conn->remote);
typedef CommCbMemFunT<Client, CommConnectCbParams> Dialer;
- data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect);
+ AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
+ const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect);
cs->setHost(data.host);
- AsyncJob::Start(cs);
+ dataConnWait.start(cs, callback);
}
bool
Ftp::Client::dataClosed(const CommCloseCbParams &)
{
debugs(9, 4, status());
+ if (data.conn)
+ data.conn->noteClosure();
if (data.listenConn != NULL) {
data.listenConn->close();
data.listenConn = NULL;
- // NP clear() does the: data.fd = -1;
}
data.clear();
}
Ftp::Client::ctrlClosed(const CommCloseCbParams &)
{
debugs(9, 4, status());
+ if (ctrl.conn)
+ ctrl.conn->noteClosure();
ctrl.clear();
doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too
mustStop("Ftp::Client::ctrlClosed");
*/
Comm::ConnectionPointer listenConn;
- AsyncCall::Pointer opener; ///< Comm opener handler callback.
private:
AsyncCall::Pointer closer; ///< Comm close handler callback
};
virtual void sentRequestBody(const CommIoCbParams &io);
virtual void doneSendingRequestBody();
+ /// Waits for an FTP data connection to the server to be established/opened.
+ /// This wait only happens in FTP passive mode (via PASV or EPSV).
+ JobWait<Comm::ConnOpener> dataConnWait;
+
private:
bool parseControlReply(size_t &bytesUsed);
flags.pasv_supported = false;
debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state");
- // cancel the data connection setup.
- if (data.opener != NULL) {
- data.opener->cancel("timeout");
- data.opener = NULL;
- }
+ // cancel the data connection setup, if any
+ dataConnWait.cancel("timeout");
+
data.close();
}
Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io)
{
debugs(9, 3, HERE);
- data.opener = NULL;
+ dataConnWait.finish();
if (io.flag != Comm::OK) {
debugs(9, 2, HERE << "Failed to connect. Retrying via another method.");
return !doneWithServer();
}
-AsyncJob::Pointer
+void
Ftp::StartGateway(FwdState *const fwdState)
{
- return AsyncJob::Start(new Ftp::Gateway(fwdState));
+ AsyncJob::Start(new Ftp::Gateway(fwdState));
}
Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io)
{
debugs(9, 3, status());
- data.opener = NULL;
+ dataConnWait.finish();
if (io.flag != Comm::OK) {
debugs(9, 2, "failed to connect FTP server data channel");
ftpClient->dataComplete();
}
-AsyncJob::Pointer
+void
Ftp::StartRelay(FwdState *const fwdState)
{
- return AsyncJob::Start(new Ftp::Relay(fwdState));
+ AsyncJob::Start(new Ftp::Relay(fwdState));
}
Http::Tunneler::handleConnectionClosure(const CommCloseCbParams ¶ms)
{
closer = nullptr;
+ if (connection) {
+ countFailingConnection();
+ connection->noteClosure();
+ connection = nullptr;
+ }
bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al));
}
Must(error);
answer().squidError = error;
- if (const auto p = connection->getPeer())
- peerConnectFailed(p);
+ if (const auto failingConnection = connection) {
+ // TODO: Reuse to-peer connections after a CONNECT error response.
+ countFailingConnection();
+ disconnect();
+ failingConnection->close();
+ }
callBack();
- disconnect();
-
- if (noteFwdPconnUse)
- fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
- // TODO: Reuse to-peer connections after a CONNECT error response.
- connection->close();
- connection = nullptr;
}
void
Http::Tunneler::sendSuccess()
{
assert(answer().positive());
- callBack();
+ assert(Comm::IsConnOpen(connection));
+ answer().conn = connection;
disconnect();
+ callBack();
+}
+
+void
+Http::Tunneler::countFailingConnection()
+{
+ assert(connection);
+ if (const auto p = connection->getPeer())
+ peerConnectFailed(p);
+ if (noteFwdPconnUse && connection->isOpen())
+ fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
}
void
Http::Tunneler::disconnect()
{
+ const auto stillOpen = Comm::IsConnOpen(connection);
+
if (closer) {
- comm_remove_close_handler(connection->fd, closer);
+ if (stillOpen)
+ comm_remove_close_handler(connection->fd, closer);
closer = nullptr;
}
if (reader) {
- Comm::ReadCancel(connection->fd, reader);
+ if (stillOpen)
+ Comm::ReadCancel(connection->fd, reader);
reader = nullptr;
}
- // remove connection timeout handler
- commUnsetConnTimeout(connection);
+ if (stillOpen)
+ commUnsetConnTimeout(connection);
+
+ connection = nullptr; // may still be open
}
void
Http::Tunneler::callBack()
{
- debugs(83, 5, connection << status());
- if (answer().positive())
- answer().conn = connection;
+ debugs(83, 5, answer().conn << status());
+ assert(!connection); // returned inside answer() or gone
auto cb = callback;
callback = nullptr;
ScheduleCallHere(cb);
AsyncJob::swanSong();
if (callback) {
- if (requestWritten && tunnelEstablished) {
+ if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) {
sendSuccess();
} else {
- // we should have bailed when we discovered the job-killing problem
- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status());
+ // job-ending emergencies like handleStopRequest() or callException()
bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al));
}
assert(!callback);
void handleResponse(const bool eof);
void bailOnResponseError(const char *error, HttpReply *);
+private:
/// sends the given error to the initiator
void bailWith(ErrorState*);
/// a bailWith(), sendSuccess() helper: sends results to the initiator
void callBack();
- /// a bailWith(), sendSuccess() helper: stops monitoring the connection
+ /// stops monitoring the connection
void disconnect();
+ /// updates connection usage history before the connection is closed
+ void countFailingConnection();
+
TunnelerAnswer &answer();
-private:
AsyncCall::Pointer writer; ///< called when the request has been written
AsyncCall::Pointer reader; ///< called when the response should be read
AsyncCall::Pointer closer; ///< called when the connection is being closed
{
/// A new FTP Gateway job
-AsyncJobPointer StartGateway(FwdState *const fwdState);
+void StartGateway(FwdState *const fwdState);
/// A new FTP Relay job
-AsyncJobPointer StartRelay(FwdState *const fwdState);
+void StartRelay(FwdState *const fwdState);
/** Construct an URI with leading / in PATH portion for use by CWD command
* possibly others. FTP encodes absolute paths as beginning with '/'
// If call is not canceled schedule it for execution else ignore it
if (!call->canceled()) {
debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
+ // XXX: Without the following code, callback fd may be -1.
+ // typedef CommCloseCbParams Params;
+ // auto ¶ms = GetCommParams<Params>(call);
+ // params.fd = fd;
ScheduleCallHere(call);
}
}
CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data;
temp->element.closer = NULL;
+ if (temp->element.theRead.conn) {
+ temp->element.theRead.conn->noteClosure();
+ temp->element.theRead.conn = nullptr;
+ }
temp->element.markCancelled();
}
if (aRead.cancelled)
return;
+ // TODO: This check still allows theReader call with a closed theRead.conn.
+ // If a delayRead() caller has a close connection handler, then such a call
+ // would be useless and dangerous. If a delayRead() caller does not have it,
+ // then the caller will get stuck when an external connection closure makes
+ // aRead.cancelled (checked above) true.
if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing())
return;
deadline_(squid_curtime + static_cast<time_t>(ctimeout))
{
debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout");
+ assert(conn_); // we know where to go
+
+ // Sharing a being-modified Connection object with the caller is dangerous,
+ // but we cannot ban (or even check for) that using existing APIs. We do not
+ // want to clone "just in case" because cloning is a bit expensive, and most
+ // callers already have a non-owned Connection object to give us. Until the
+ // APIs improve, we can only check that the connection is not open.
+ assert(!conn_->isOpen());
}
Comm::ConnOpener::~ConnOpener()
if (temporaryFd_ >= 0)
closeFd();
+ // did we abort while owning an open connection?
+ if (conn_ && conn_->isOpen())
+ conn_->close();
+
// did we abort while waiting between retries?
if (calls_.sleep_)
cancelSleep();
" [" << callback_->id << ']' );
// TODO save the pconn to the pconnPool ?
} else {
+ assert(conn_);
+
+ // free resources earlier and simplify recipients
+ if (errFlag != Comm::OK)
+ conn_->close(); // may not be opened
+ else
+ assert(conn_->isOpen());
+
typedef CommConnectCbParams Params;
Params ¶ms = GetCommParams<Params>(callback_);
params.conn = conn_;
+ conn_ = nullptr; // release ownership; prevent closure by us
params.flag = errFlag;
params.xerrno = xerrno;
ScheduleCallHere(callback_);
void
Comm::ConnOpener::cleanFd()
{
- debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
+ debugs(5, 4, conn_ << "; temp FD " << temporaryFd_);
Must(temporaryFd_ >= 0);
fde &f = fd_table[temporaryFd_];
Comm::ConnOpener::createFd()
{
Must(temporaryFd_ < 0);
+ assert(conn_);
// our initators signal abort by cancelling their callbacks
if (callback_ == NULL || callback_->canceled())
namespace Comm
{
-/**
- * Async-opener of a Comm connection.
- */
+/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either
+/// Comm::OK with an open connection or another Comm::Flag with a closed one.
class ConnOpener : public AsyncJob
{
CBDATA_CLASS(ConnOpener);
public:
- void noteAbort() { mustStop("externally aborted"); }
-
typedef CbcPointer<ConnOpener> Pointer;
virtual bool doneAll() const;
*/
#include "squid.h"
+#include "base/JobWait.h"
#include "CachePeer.h"
#include "cbdata.h"
#include "comm.h"
}
Comm::ConnectionPointer
-Comm::Connection::cloneDestinationDetails() const
+Comm::Connection::cloneProfile() const
{
- const ConnectionPointer c = new Comm::Connection;
- c->setAddrs(local, remote);
- c->peerType = peerType;
- c->flags = flags;
- c->peer_ = cbdataReference(getPeer());
- assert(!c->isOpen());
- return c;
-}
+ const ConnectionPointer clone = new Comm::Connection;
+ auto &c = *clone; // optimization
+
+ /*
+ * Copy or excuse each data member. Excused members do not belong to a
+ * Connection configuration profile because their values cannot be reused
+ * across (co-existing) Connection objects and/or are tied to their own
+ * object lifetime.
+ */
+
+ c.setAddrs(local, remote);
+ c.peerType = peerType;
+ // fd excused
+ c.tos = tos;
+ c.nfmark = nfmark;
+ c.nfConnmark = nfConnmark;
+ // COMM_ORPHANED is not a part of connection opening instructions
+ c.flags = flags & ~COMM_ORPHANED;
+ // rfc931 is excused
+
+#if USE_SQUID_EUI
+ // These are currently only set when accepting connections and never used
+ // for establishing new ones, so this copying is currently in vain, but,
+ // technically, they can be a part of connection opening instructions.
+ c.remoteEui48 = remoteEui48;
+ c.remoteEui64 = remoteEui64;
+#endif
-Comm::ConnectionPointer
-Comm::Connection::cloneIdentDetails() const
-{
- auto c = cloneDestinationDetails();
- c->tos = tos;
- c->nfmark = nfmark;
- c->nfConnmark = nfConnmark;
- c->startTime_ = startTime_;
- return c;
+ // id excused
+ c.peer_ = cbdataReference(getPeer());
+ // startTime_ excused
+ // tlsHistory excused
+
+ debugs(5, 5, this << " made " << c);
+ assert(!c.isOpen());
+ return clone;
}
void
/** Clear the connection properties and close any open socket. */
virtual ~Connection();
- /// Create a new (closed) IDENT Connection object based on our from-Squid
- /// connection properties.
- ConnectionPointer cloneIdentDetails() const;
+ /// To prevent accidental copying of Connection objects that we started to
+ /// open or that are open, use cloneProfile() instead.
+ Connection(const Connection &&) = delete;
- /// Create a new (closed) Connection object pointing to the same destination
- /// as this from-Squid connection.
- ConnectionPointer cloneDestinationDetails() const;
+ /// Create a new closed Connection with the same configuration as this one.
+ ConnectionPointer cloneProfile() const;
/// close the still-open connection when its last reference is gone
void enterOrphanage() { flags |= COMM_ORPHANED; }
virtual ScopedId codeContextGist() const override;
virtual std::ostream &detailCodeContext(std::ostream &os) const override;
-private:
- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
- * cloneDestinationDetails() instead.
- */
- Connection(const Connection &c);
-
- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
- * cloneDestinationDetails() instead.
- */
- Connection & operator =(const Connection &c);
-
public:
/** Address/Port for the Squid end of a TCP link. */
Ip::Address local;
Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &)
{
closer_ = NULL;
- conn = NULL;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
Must(done());
}
idnsVCClosed(const CommCloseCbParams ¶ms)
{
nsvc * vc = (nsvc *)params.data;
+ if (vc->conn) {
+ vc->conn->noteClosure();
+ vc->conn = nullptr;
+ }
delete vc;
}
public:
Eui48() { clear(); }
- Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); }
bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; }
bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; }
- ~Eui48() {}
const unsigned char *get(void);
entry->lock("gopherState");
*replybuf = 0;
}
- ~GopherStateData() {if(buf) swanSong();}
- /* AsyncJob API emulated */
- void deleteThis(const char *aReason);
- void swanSong();
+ ~GopherStateData();
public:
StoreEntry *entry;
gopherStateFree(const CommCloseCbParams ¶ms)
{
GopherStateData *gopherState = (GopherStateData *)params.data;
-
- if (gopherState == NULL)
- return;
-
- gopherState->deleteThis("gopherStateFree");
+ // Assume that FwdState is monitoring and calls noteClosure(). See XXX about
+ // Connection sharing with FwdState in gopherStart().
+ delete gopherState;
}
-void
-GopherStateData::deleteThis(const char *)
-{
- swanSong();
- delete this;
-}
-
-void
-GopherStateData::swanSong()
+GopherStateData::~GopherStateData()
{
if (entry)
entry->unlock("gopherState");
- if (buf) {
+ if (buf)
memFree(buf, MEM_4K_BUF);
- buf = nullptr;
- }
}
/**
return;
}
+ // XXX: Sharing open Connection with FwdState that has its own handlers/etc.
gopherState->serverConn = fwd->serverConnection();
gopherSendRequest(fwd->serverConnection()->fd, gopherState);
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout",
#include "squid.h"
#if USE_IDENT
+#include "base/JobWait.h"
#include "comm.h"
#include "comm/Connection.h"
#include "comm/ConnOpener.h"
Comm::ConnectionPointer conn;
MemBuf queryMsg; ///< the lookup message sent to IDENT server
- IdentClient *clients;
+ IdentClient *clients = nullptr;
char buf[IDENT_BUFSIZE];
+
+ /// waits for a connection to the IDENT server to be established/opened
+ JobWait<Comm::ConnOpener> connWait;
+
+private:
+ // use deleteThis() to destroy
+ ~IdentStateData();
};
CBDATA_CLASS_INIT(IdentStateData);
Ident::IdentConfig Ident::TheConfig;
void
-Ident::IdentStateData::deleteThis(const char *)
+Ident::IdentStateData::deleteThis(const char *reason)
{
+ debugs(30, 3, reason);
swanSong();
delete this;
}
{
if (clients != NULL)
notify(NULL);
+}
+
+Ident::IdentStateData::~IdentStateData() {
+ assert(!clients);
if (Comm::IsConnOpen(conn)) {
comm_remove_close_handler(conn->fd, Ident::Close, this);
Ident::Close(const CommCloseCbParams ¶ms)
{
IdentStateData *state = (IdentStateData *)params.data;
+ if (state->conn) {
+ state->conn->noteClosure();
+ state->conn = nullptr;
+ }
state->deleteThis("connection closed");
}
Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data)
{
IdentStateData *state = (IdentStateData *)data;
+ state->connWait.finish();
+
+ // Start owning the supplied connection (so that it is not orphaned if this
+ // function bails early). As a (tiny) optimization or perhaps just diff
+ // minimization, the close handler is added later, when we know we are not
+ // bailing. This delay is safe because comm_remove_close_handler() forgives
+ // missing handlers.
+ assert(conn); // but may be closed
+ assert(!state->conn);
+ state->conn = conn;
if (status != Comm::OK) {
if (status == Comm::TIMEOUT)
return;
}
- assert(conn != NULL && conn == state->conn);
- comm_add_close_handler(conn->fd, Ident::Close, state);
+ assert(state->conn->isOpen());
+ comm_add_close_handler(state->conn->fd, Ident::Close, state);
AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback",
CommIoCbPtrFun(Ident::WriteFeedback, state));
state->hash.key = xstrdup(key);
// copy the conn details. We do not want the original FD to be re-used by IDENT.
- state->conn = conn->cloneIdentDetails();
+ const auto identConn = conn->cloneProfile();
// NP: use random port for secure outbound to IDENT_PORT
- state->conn->local.port(0);
- state->conn->remote.port(IDENT_PORT);
+ identConn->local.port(0);
+ identConn->remote.port(IDENT_PORT);
// build our query from the original connection details
state->queryMsg.init();
hash_join(ident_hash, &state->hash);
AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state));
- AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout));
+ const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout);
+ state->connWait.start(connOpener, call);
}
void
typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
- AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
+ const auto cs = new Comm::ConnOpener(futureConn, call, 2);
+ connWait.start(cs, call);
}
/// Comm::ConnOpener callback
void
Log::TcpLogger::connectDone(const CommConnectCbParams ¶ms)
{
+ connWait.finish();
+
if (params.flag != Comm::OK) {
const double delay = 0.5; // seconds
if (connectFailures++ % 100 == 0) {
{
assert(inCall != NULL);
closer = NULL;
- conn = NULL;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
// in all current use cases, we should not try to reconnect
mustStop("Log::TcpLogger::handleClosure");
}
#define _SQUID_SRC_LOG_TCPLOGGER_H
#include "base/AsyncJob.h"
+#include "base/JobWait.h"
+#include "comm/forward.h"
#include "ip/Address.h"
#include <list>
Ip::Address remote; ///< where the remote logger expects our records
AsyncCall::Pointer closer; ///< handles unexpected/external conn closures
+ /// waits for a connection to the remote logger to be established/opened
+ JobWait<Comm::ConnOpener> connWait;
+
uint64_t connectFailures; ///< number of sequential connection failures
uint64_t drops; ///< number of records dropped during the current outage
};
Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &)
{
debugs(16, 5, HERE);
- conn = NULL; // needed?
+ closer = nullptr;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
mustStop("commClosed");
}
/// called when the HTTP client or some external force closed our socket
void
-Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
+Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &)
{
debugs(16, 5, HERE);
- Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
- conn = NULL;
+ closer = nullptr;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
mustStop("commClosed");
}
Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &)
{
debugs(16, 6, HERE);
- Must(!Comm::IsConnOpen(clientConnection));
+ if (clientConnection) {
+ clientConnection->noteClosure();
+ clientConnection = nullptr;
+ }
+ closer = nullptr;
mustStop("commClosed");
}
void
Security::PeerConnector::commCloseHandler(const CommCloseCbParams ¶ms)
{
+ debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
+
closeHandler = nullptr;
+ if (serverConn) {
+ countFailingConnection();
+ serverConn->noteClosure();
+ serverConn = nullptr;
+ }
- debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request.getRaw(), al);
static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE");
err->detailError(d);
bool
Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
{
+ Must(Comm::IsConnOpen(serverConnection()));
+
Security::ContextPointer ctx(getTlsContext());
debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get());
void
Security::PeerConnector::recordNegotiationDetails()
{
+ Must(Comm::IsConnOpen(serverConnection()));
+
const int fd = serverConnection()->fd;
Security::SessionPointer session(fd_table[fd].ssl);
void
Security::PeerConnector::negotiate()
{
+ Must(Comm::IsConnOpen(serverConnection()));
+
const int fd = serverConnection()->fd;
if (fd_table[fd].closing())
return;
switch (result.category) {
case Security::IoResult::ioSuccess:
recordNegotiationDetails();
- if (sslFinalized())
+ if (sslFinalized() && callback)
sendSuccess();
return; // we may be gone by now
{
#if USE_OPENSSL
if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) {
+ Must(Comm::IsConnOpen(serverConnection()));
const int fd = serverConnection()->fd;
Security::SessionPointer session(fd_table[fd].ssl);
Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse)
{
Must(validationResponse != NULL);
+ Must(Comm::IsConnOpen(serverConnection()));
ErrorDetail::Pointer errDetails;
bool validatorFailed = false;
if (!errDetails && !validatorFailed) {
noteNegotiationDone(NULL);
- sendSuccess();
+ if (callback)
+ sendSuccess();
return;
}
Security::CertErrors *
Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails)
{
+ Must(Comm::IsConnOpen(serverConnection()));
+
ACLFilledChecklist *check = NULL;
Security::SessionPointer session(fd_table[serverConnection()->fd].ssl);
void
Security::PeerConnector::noteWantRead()
{
- const int fd = serverConnection()->fd;
debugs(83, 5, serverConnection());
+ Must(Comm::IsConnOpen(serverConnection()));
+ const int fd = serverConnection()->fd;
+
// read timeout to avoid getting stuck while reading from a silent server
typedef CommCbMemFunT<Security::PeerConnector, CommTimeoutCbParams> TimeoutDialer;
AsyncCall::Pointer timeoutCall = JobCallback(83, 5,
void
Security::PeerConnector::noteWantWrite()
{
- const int fd = serverConnection()->fd;
debugs(83, 5, serverConnection());
+ Must(Comm::IsConnOpen(serverConnection()));
+
+ const int fd = serverConnection()->fd;
Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0);
return;
}
bail(anErr);
}
+Security::EncryptorAnswer &
+Security::PeerConnector::answer()
+{
+ assert(callback);
+ const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer());
+ assert(dialer);
+ return dialer->answer();
+}
+
void
Security::PeerConnector::bail(ErrorState *error)
{
Must(error); // or the recepient will not know there was a problem
- Must(callback != NULL);
- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
- Must(dialer);
- dialer->answer().error = error;
+ answer().error = error;
- if (const auto p = serverConnection()->getPeer())
- peerConnectFailed(p);
+ if (const auto failingConnection = serverConn) {
+ countFailingConnection();
+ disconnect();
+ failingConnection->close();
+ }
callBack();
- disconnect();
-
- if (noteFwdPconnUse)
- fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
- serverConn->close();
- serverConn = nullptr;
}
void
Security::PeerConnector::sendSuccess()
{
- callBack();
+ assert(Comm::IsConnOpen(serverConn));
+ answer().conn = serverConn;
disconnect();
+ callBack();
+}
+
+void
+Security::PeerConnector::countFailingConnection()
+{
+ assert(serverConn);
+ if (const auto p = serverConn->getPeer())
+ peerConnectFailed(p);
+ // TODO: Calling PconnPool::noteUses() should not be our responsibility.
+ if (noteFwdPconnUse && serverConn->isOpen())
+ fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
}
void
Security::PeerConnector::disconnect()
{
+ const auto stillOpen = Comm::IsConnOpen(serverConn);
+
if (closeHandler) {
- comm_remove_close_handler(serverConnection()->fd, closeHandler);
+ if (stillOpen)
+ comm_remove_close_handler(serverConn->fd, closeHandler);
closeHandler = nullptr;
}
- commUnsetConnTimeout(serverConnection());
+ if (stillOpen)
+ commUnsetConnTimeout(serverConn);
+
+ serverConn = nullptr;
}
void
Security::PeerConnector::callBack()
{
- debugs(83, 5, "TLS setup ended for " << serverConnection());
+ debugs(83, 5, "TLS setup ended for " << answer().conn);
AsyncCall::Pointer cb = callback;
// Do this now so that if we throw below, swanSong() assert that we _tried_
// to call back holds.
callback = NULL; // this should make done() true
- CbDialer *dialer = dynamic_cast<CbDialer*>(cb->getDialer());
- Must(dialer);
- dialer->answer().conn = serverConnection();
ScheduleCallHere(cb);
}
{
// XXX: unregister fd-closure monitoring and CommSetSelect interest, if any
AsyncJob::swanSong();
- if (callback != NULL) { // paranoid: we have left the caller waiting
- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server");
+
+ if (callback) {
+ // job-ending emergencies like handleStopRequest() or callException()
const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al);
bail(anErr);
assert(!callback);
buf.append("Stopped, reason:", 16);
buf.appendf("%s",stopReason);
}
- if (serverConn != NULL)
+ if (Comm::IsConnOpen(serverConn))
buf.appendf(" FD %d", serverConn->fd);
buf.appendf(" %s%u]", id.prefix(), id.value);
buf.terminate();
PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this));
const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1);
- AsyncJob::Start(dl);
+ certDownloadWait.start(dl, certCallback);
}
void
Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
{
+ certDownloadWait.finish();
+
++certsDownloads;
debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length());
+ Must(Comm::IsConnOpen(serverConnection()));
const auto &sconn = *fd_table[serverConnection()->fd].ssl;
// Parse Certificate. Assume that it is in DER format.
void
Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult)
{
+ Must(Comm::IsConnOpen(serverConnection()));
auto &sconn = *fd_table[serverConnection()->fd].ssl;
// We download the missing certificate(s) once. We would prefer to clear
#include "acl/Acl.h"
#include "base/AsyncCbdataCalls.h"
#include "base/AsyncJob.h"
+#include "base/JobWait.h"
#include "CommCalls.h"
#include "http/forward.h"
#include "security/EncryptorAnswer.h"
#include <queue>
class ErrorState;
+class Downloader;
class AccessLogEntry;
typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
/// a bail(), sendSuccess() helper: stops monitoring the connection
void disconnect();
+ /// updates connection usage history before the connection is closed
+ void countFailingConnection();
+
/// If called the certificates validator will not used
void bypassCertValidator() {useCertValidator_ = false;}
/// logging
void recordNegotiationDetails();
+ /// convenience method to get to the answer fields
+ EncryptorAnswer &answer();
+
HttpRequestPointer request; ///< peer connection trigger or cause
Comm::ConnectionPointer serverConn; ///< TCP connection to the peer
AccessLogEntryPointer al; ///< info for the future access.log entry
/// outcome of the last (failed and) suspended negotiation attempt (or nil)
Security::IoResultPointer suspendedError_;
+
+ JobWait<Downloader> certDownloadWait; ///< waits for the missing certificate to be downloaded
};
} // namespace Security
typedef long ParsedPortFlags;
class PeerConnector;
+class BlindPeerConnector;
class PeerOptions;
#if USE_OPENSSL
dataConn(),
uploadAvailSize(0),
listener(),
- connector(),
+ dataConnWait(),
reader(),
waitingForOrigin(false),
originDataDownloadAbortedOnError(false)
// active transfer: open a data connection from Squid to client
typedef CommCbMemFunT<Server, CommConnectCbParams> Dialer;
- connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
- Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector,
- Config.Timeout.connect);
- AsyncJob::Start(cs);
- return false; // ConnStateData::processFtpRequest waits handleConnectDone
+ AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
+ const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback,
+ Config.Timeout.connect);
+ dataConnWait.start(cs, callback);
+ return false;
}
/// Check that client data connection is ready for immediate I/O.
void
Ftp::Server::connectedForData(const CommConnectCbParams ¶ms)
{
- connector = NULL;
+ dataConnWait.finish();
if (params.flag != Comm::OK) {
- /* it might have been a timeout with a partially open link */
- if (params.conn != NULL)
- params.conn->close();
setReply(425, "Cannot open data connection.");
Http::StreamPointer context = pipeline.front();
Must(context->http);
Must(context->http->storeEntry() != NULL);
+ // TODO: call closeDataConnection() to reset data conn processing?
} else {
- Must(dataConn == params.conn);
+ // Finalize the details and start owning the supplied connection.
+ assert(params.conn);
+ assert(dataConn);
+ assert(!dataConn->isOpen());
+ dataConn = params.conn;
+ // XXX: Missing comm_add_close_handler() to track external closures.
+
Must(Comm::IsConnOpen(params.conn));
fd_note(params.conn->fd, "active client ftp data");
}
#ifndef SQUID_SERVERS_FTP_SERVER_H
#define SQUID_SERVERS_FTP_SERVER_H
+#include "base/JobWait.h"
#include "base/Lock.h"
#include "client_side.h"
+#include "comm/forward.h"
namespace Ftp
{
size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes
AsyncCall::Pointer listener; ///< set when we are passively listening
- AsyncCall::Pointer connector; ///< set when we are actively connecting
+
+ /// Waits for an FTP data connection to the client to be established/opened.
+ /// This wait only happens in FTP active mode (via PORT or EPRT).
+ JobWait<Comm::ConnOpener> dataConnWait;
+
AsyncCall::Pointer reader; ///< set when we are reading FTP data
/// whether we wait for the origin data transfer to end
{
debugs(49, 5, HERE);
Must(fd == params.fd);
+ closer = nullptr;
fd = -1;
mustStop("commClosed");
}
Snmp::Forwarder::handleException(const std::exception& e)
{
debugs(49, 3, HERE << e.what());
- if (fd >= 0)
- sendError(SNMP_ERR_GENERR);
+ sendError(SNMP_ERR_GENERR);
Ipc::Forwarder::handleException(e);
}
Snmp::Forwarder::sendError(int error)
{
debugs(49, 3, HERE);
+
+ if (fd < 0)
+ return; // client gone
+
Snmp::Request& req = static_cast<Snmp::Request&>(*request);
req.pdu.command = SNMP_PDU_RESPONSE;
req.pdu.errstat = error;
{
debugs(49, 5, HERE);
Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd);
- conn = NULL;
+ closer = nullptr;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
mustStop("commClosed");
}
Snmp::Inquirer::sendResponse()
{
debugs(49, 5, HERE);
+
+ if (!Comm::IsConnOpen(conn))
+ return; // client gone
+
aggrPdu.fixAggregate();
aggrPdu.command = SNMP_PDU_RESPONSE;
u_char buffer[SNMP_REQUEST_SIZE];
void switchToTunnel(HttpRequest *request, const Comm::ConnectionPointer &clientConn, const Comm::ConnectionPointer &srvConn, const SBuf &preReadServerData);
void
-Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data)
+Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data)
{
Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data;
// Use job calls to add done() checks and other job logic/protections.
- CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer);
+ CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer);
}
void
-Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer)
+Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer)
{
- const Ssl::BumpMode finalAction = answer.allowed() ?
- static_cast<Ssl::BumpMode>(answer.kind):
+ const Ssl::BumpMode finalAction = aclAnswer.allowed() ?
+ static_cast<Ssl::BumpMode>(aclAnswer.kind):
checkForPeekAndSpliceGuess();
checkForPeekAndSpliceMatched(finalAction);
}
splice = true;
// Ssl Negotiation stops here. Last SSL checks for valid certificates
// and if done, switch to tunnel mode
- if (sslFinalized()) {
- debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection");
+ if (sslFinalized() && callback)
callBack();
- }
}
}
auto b = SSL_get_rbio(session.get());
auto srvBio = static_cast<Ssl::ServerBio*>(BIO_get_data(b));
+ debugs(83, 5, "will tunnel instead of negotiating TLS");
switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData());
- tunnelInsteadOfNegotiating();
+ answer().tunneled = true;
+ disconnect();
+ callBack();
}
void
}
}
-void
-Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating()
-{
- Must(callback != NULL);
- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
- Must(dialer);
- dialer->answer().tunneled = true;
- debugs(83, 5, "The SSL negotiation with server aborted");
-}
-
void checkForPeekAndSplice();
/// Callback function for ssl_bump acl check in step3 SSL bump step.
- void checkForPeekAndSpliceDone(Acl::Answer answer);
+ void checkForPeekAndSpliceDone(Acl::Answer);
/// Handles the final bumping decision.
void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode);
void startTunneling();
/// A wrapper function for checkForPeekAndSpliceDone for use with acl
- static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data);
+ static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data);
private:
#include "comm/Connection.h"
Comm::Connection::Connection() STUB
Comm::Connection::~Connection() STUB
-Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr)
-Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr)
+Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr)
void Comm::Connection::close() STUB
+void Comm::Connection::noteClosure() STUB
CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL)
void Comm::Connection::setPeer(CachePeer * p) STUB
ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach())
#include "squid.h"
#include "AccessLogEntry.h"
#include "comm/Connection.h"
+#include "Downloader.h"
#include "HttpRequest.h"
#define STUB_API "security/libsecurity.la"
void PeerConnector::sendSuccess() STUB
void PeerConnector::callBack() STUB
void PeerConnector::disconnect() STUB
+void PeerConnector::countFailingConnection() STUB
void PeerConnector::recordNegotiationDetails() STUB
+EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer)
}
#include "security/PeerOptions.h"
#include "squid.h"
#include "acl/FilledChecklist.h"
#include "base/CbcPointer.h"
+#include "base/JobWait.h"
#include "CachePeer.h"
#include "cbdata.h"
#include "client_side.h"
SBuf preReadClientData;
SBuf preReadServerData;
time_t startTime; ///< object creation time, before any peer selection/connection attempts
- /// Whether we are waiting for the CONNECT request/response exchange with the peer.
- bool waitingForConnectExchange;
- HappyConnOpenerPointer connOpener; ///< current connection opening job
ResolvedPeersPointer destinations; ///< paths for forwarding the request
bool destinationsFound; ///< At least one candidate path found
/// whether another destination may be still attempted if the TCP connection
// TODO: remove after fixing deferred reads in TunnelStateData::copyRead()
CodeContext::Pointer codeContext; ///< our creator context
- // AsyncCalls which we set and may need cancelling.
- struct {
- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn.
- } calls;
+ /// waits for a transport connection to the peer to be established/opened
+ JobWait<HappyConnOpener> transportWait;
+
+ /// waits for the established transport connection to be secured/encrypted
+ JobWait<Security::PeerConnector> encryptionWait;
+
+ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
+ /// over the (encrypted, if needed) transport connection to that cache_peer
+ JobWait<Http::Tunneler> peerWait;
void copyRead(Connection &from, IOCB *completion);
/// when all candidate destinations have been tried and all have failed
void noteConnection(HappyConnOpenerAnswer &);
- /// whether we are waiting for HappyConnOpener
- /// same as calls.connector but may differ from connOpener.valid()
- bool opening() const { return connOpener.set(); }
-
- void cancelOpening(const char *reason);
-
/// Start using an established connection
void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused);
/// \returns whether the request should be retried (nil) or the description why it should not
const char *checkRetry();
+ /// whether the successfully selected path destination or the established
+ /// server connection is still in use
+ bool usingDestination() const;
/// details of the "last tunneling attempt" failure (if it failed)
ErrorState *savedError = nullptr;
void deleteThis();
+ void cancelStep(const char *reason);
+
public:
bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
void copy(size_t len, Connection &from, Connection &to, IOCB *);
TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) :
startTime(squid_curtime),
- waitingForConnectExchange(false),
destinations(new ResolvedPeers()),
destinationsFound(false),
retriable(true),
debugs(26, 3, "TunnelStateData destructed this=" << this);
assert(noConnections());
xfree(url);
- if (opening())
- cancelOpening("~TunnelStateData");
+ cancelStep("~TunnelStateData");
delete savedError;
}
static void
tunnelStartShoveling(TunnelStateData *tunnelState)
{
- assert(!tunnelState->waitingForConnectExchange);
+ assert(!tunnelState->transportWait);
+ assert(!tunnelState->encryptionWait);
+ assert(!tunnelState->peerWait);
+
assert(tunnelState->server.conn);
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
void
TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
{
+ peerWait.finish();
server.len = 0;
if (logTag_ptr)
if (answer.peerResponseStatus != Http::scNone)
*status_ptr = answer.peerResponseStatus;
- waitingForConnectExchange = false;
-
auto sawProblem = false;
if (!answer.positive()) {
sawProblem = true;
- Must(!Comm::IsConnOpen(answer.conn));
+ assert(!answer.conn);
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
sawProblem = true;
closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
void
TunnelStateData::noteConnection(HappyConnOpener::Answer &answer)
{
- calls.connector = nullptr;
- connOpener.clear();
+ transportWait.finish();
ErrorState *error = nullptr;
if ((error = answer.error.get())) {
AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer",
MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this));
const auto connector = new Security::BlindPeerConnector(request, conn, callback, al);
- AsyncJob::Start(connector); // will call our callback
+ encryptionWait.start(connector, callback);
}
/// starts a preparation step for an established connection; retries on failures
void
TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer)
{
+ encryptionWait.finish();
+
ErrorState *error = nullptr;
+ assert(!answer.tunneled);
if ((error = answer.error.get())) {
- Must(!Comm::IsConnOpen(answer.conn));
+ assert(!answer.conn);
answer.error.clear();
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al);
void
TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
{
- assert(!waitingForConnectExchange);
-
AsyncCall::Pointer callback = asyncCall(5,4,
"TunnelStateData::tunnelEstablishmentDone",
Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this));
#if USE_DELAY_POOLS
tunneler->setDelayId(server.delayId);
#endif
- AsyncJob::Start(tunneler);
- waitingForConnectExchange = true;
- // and wait for the tunnelEstablishmentDone() call
+ peerWait.start(tunneler, callback);
}
void
destinations->addPath(path);
- if (Comm::IsConnOpen(server.conn)) {
+ if (usingDestination()) {
// We are already using a previously opened connection but also
// receiving destinations in case we need to re-forward.
- Must(!opening());
+ Must(!transportWait);
return;
}
- if (opening()) {
+ if (transportWait) {
notifyConnOpener();
return; // and continue to wait for tunnelConnectDone() callback
}
// if all of them fail, tunneling as whole will fail
Must(!selectionError); // finding at least one path means selection succeeded
- if (Comm::IsConnOpen(server.conn)) {
+ if (usingDestination()) {
// We are already using a previously opened connection but also
// receiving destinations in case we need to re-forward.
- Must(!opening());
+ Must(!transportWait);
return;
}
- Must(opening()); // or we would be stuck with nothing to do or wait for
+ Must(transportWait); // or we would be stuck with nothing to do or wait for
notifyConnOpener();
}
+bool
+TunnelStateData::usingDestination() const
+{
+ return encryptionWait || peerWait || Comm::IsConnOpen(server.conn);
+}
+
/// remembers an error to be used if there will be no more connection attempts
void
TunnelStateData::saveError(ErrorState *error)
if (request)
request->hier.stopPeerClock(false);
- if (opening())
- cancelOpening(reason);
+ cancelStep(reason);
assert(finalError);
errorSend(client.conn, finalError);
}
-/// Notify connOpener that we no longer need connections. We do not have to do
-/// this -- connOpener would eventually notice on its own, but notifying reduces
-/// waste and speeds up spare connection opening for other transactions (that
-/// could otherwise wait for this transaction to use its spare allowance).
+/// Notify a pending subtask, if any, that we no longer need its help. We do not
+/// have to do this -- the subtask job will eventually end -- but ending it
+/// earlier reduces waste and may reduce DoS attack surface.
void
-TunnelStateData::cancelOpening(const char *reason)
+TunnelStateData::cancelStep(const char *reason)
{
- assert(calls.connector);
- calls.connector->cancel(reason);
- calls.connector = nullptr;
- notifyConnOpener();
- connOpener.clear();
+ transportWait.cancel(reason);
+ encryptionWait.cancel(reason);
+ peerWait.cancel(reason);
}
void
request->hier.startPeerClock();
assert(!destinations->empty());
- assert(!opening());
- calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
- const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al);
+ assert(!usingDestination());
+ AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
+ const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al);
cs->setHost(request->url.host());
cs->setRetriable(false);
cs->allowPersistent(false);
destinations->notificationPending = true; // start() is async
- connOpener = cs;
- AsyncJob::Start(cs);
+ transportWait.start(cs, callback);
}
/// send request on an existing connection dedicated to the requesting client
#endif
-/// makes sure connOpener knows that destinations have changed
+/// makes sure connection opener knows that the destinations have changed
void
TunnelStateData::notifyConnOpener()
{
debugs(17, 7, "reusing pending notification");
} else {
destinations->notificationPending = true;
- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
+ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
}
}
{
WhoisState *p = (WhoisState *)params.data;
debugs(75, 3, "whoisClose: FD " << params.fd);
+ // We do not own a Connection. Assume that FwdState is also monitoring.
p->entry->unlock("whoisClose");
delete p;
}