return;
theConsumer = new BodySink(this);
+ AsyncJob::Start(theConsumer);
debugs(91,7, HERE << "starting auto consumption" << status());
scheduleBodyDataNotification();
}
{
}
+// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all
+// implementations always return true.
+
void
CommAcceptCbParams::print(std::ostream &os) const
{
bool
CommConnectCbParams::syncWithComm()
{
- // drop the call if the call was scheduled before comm_close but
- // is being fired after comm_close
- if (fd >= 0 && fd_table[fd].closing()) {
- debugs(5, 3, HERE << "dropping late connect call: FD " << fd);
- return false;
+ assert(conn);
+
+ // change parameters if this is a successful callback that was scheduled
+ // after its Comm-registered connection started to close
+
+ if (flag != Comm::OK) {
+ assert(!conn->isOpen());
+ return true; // not a successful callback; cannot go out of sync
}
- return true; // now we are in sync and can handle the call
+
+ assert(conn->isOpen());
+ if (!fd_table[conn->fd].closing())
+ return true; // no closing in progress; in sync (for now)
+
+ debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn);
+ conn->noteClosure();
+ flag = Comm::ERR_CLOSING;
+ return true; // now the callback is in sync with Comm again
}
/* CommIoCbParams */
Downloader::swanSong()
{
debugs(33, 6, this);
+
+ if (callback_) // job-ending emergencies like handleStopRequest() or callException()
+ callBack(Http::scInternalServerError);
+
if (context_) {
context_->finished();
context_ = nullptr;
void
Downloader::callBack(Http::StatusCode const statusCode)
{
+ assert(callback_);
CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer());
Must(dialer);
dialer->status = statusCode;
{
debugs(17, 3, "for " << reason);
- if (opening())
- cancelOpening(reason);
+ cancelStep(reason);
PeerSelectionInitiator::subscribed = false; // may already be false
self = nullptr; // we hope refcounting destroys us soon; may already be nil
/* do not place any code here as this object may be gone by now */
}
-/// Notify connOpener that we no longer need connections. We do not have to do
-/// this -- connOpener would eventually notice on its own, but notifying reduces
-/// waste and speeds up spare connection opening for other transactions (that
-/// could otherwise wait for this transaction to use its spare allowance).
+/// Notify a pending subtask, if any, that we no longer need its help. We do not
+/// have to do this -- the subtask job will eventually end -- but ending it
+/// earlier reduces waste and may reduce DoS attack surface.
void
-FwdState::cancelOpening(const char *reason)
+FwdState::cancelStep(const char *reason)
{
- assert(calls.connector);
- calls.connector->cancel(reason);
- calls.connector = nullptr;
- notifyConnOpener();
- connOpener.clear();
+ transportWait.cancel(reason);
+ encryptionWait.cancel(reason);
+ peerWait.cancel(reason);
}
#if STRICT_ORIGINAL_DST
entry = NULL;
- if (opening())
- cancelOpening("~FwdState");
+ cancelStep("~FwdState");
if (Comm::IsConnOpen(serverConn))
closeServerConnection("~FwdState");
if (!errorState->request)
errorState->request = request;
- if (err->type != ERR_ZERO_SIZE_OBJECT)
- return;
+ if (err->type == ERR_ZERO_SIZE_OBJECT)
+ reactToZeroSizeObject();
+
+ destinationReceipt = nullptr; // may already be nil
+}
+
+/// ERR_ZERO_SIZE_OBJECT requires special adjustments
+void
+FwdState::reactToZeroSizeObject()
+{
+ assert(err->type == ERR_ZERO_SIZE_OBJECT);
if (pconnRace == racePossible) {
debugs(17, 5, HERE << "pconn race happened");
if (Comm::IsConnOpen(serverConn))
unregister(serverConn);
+ serverConn = nullptr;
+ destinationReceipt = nullptr;
entry->reset();
}
}
+bool
+FwdState::usingDestination() const
+{
+ return encryptionWait || peerWait || Comm::IsConnOpen(serverConn);
+}
+
void
FwdState::noteDestination(Comm::ConnectionPointer path)
{
destinations->addPath(path);
- if (Comm::IsConnOpen(serverConn)) {
+ if (usingDestination()) {
// We are already using a previously opened connection, so we cannot be
- // waiting for connOpener. We still receive destinations for backup.
- Must(!opening());
+ // waiting for it. We still receive destinations for backup.
+ Must(!transportWait);
return;
}
- if (opening()) {
+ if (transportWait) {
notifyConnOpener();
return; // and continue to wait for FwdState::noteConnection() callback
}
- // This is the first path candidate we have seen. Create connOpener.
+ // This is the first path candidate we have seen. Use it.
useDestinations();
}
// if all of them fail, forwarding as whole will fail
Must(!selectionError); // finding at least one path means selection succeeded
- if (Comm::IsConnOpen(serverConn)) {
+ if (usingDestination()) {
// We are already using a previously opened connection, so we cannot be
- // waiting for connOpener. We were receiving destinations for backup.
- Must(!opening());
+ // waiting for it. We were receiving destinations for backup.
+ Must(!transportWait);
return;
}
- Must(opening()); // or we would be stuck with nothing to do or wait for
+ Must(transportWait); // or we would be stuck with nothing to do or wait for
notifyConnOpener();
// and continue to wait for FwdState::noteConnection() callback
}
-/// makes sure connOpener knows that destinations have changed
+/// makes sure connection opener knows that the destinations have changed
void
FwdState::notifyConnOpener()
{
} else {
debugs(17, 7, "notifying about " << *destinations);
destinations->notificationPending = true;
- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
+ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
}
}
fwdServerClosedWrapper(const CommCloseCbParams ¶ms)
{
FwdState *fwd = (FwdState *)params.data;
- fwd->serverClosed(params.fd);
+ fwd->serverClosed();
}
/**** PRIVATE *****************************************************************/
}
void
-FwdState::serverClosed(int fd)
+FwdState::serverClosed()
{
- // XXX: fd is often -1 here
- debugs(17, 2, "FD " << fd << " " << entry->url() << " after " <<
- (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests");
- if (fd >= 0 && serverConnection()->fd == fd)
- fwdPconnPool->noteUses(fd_table[fd].pconn.uses);
+ // XXX: This method logic attempts to tolerate Connection::close() called
+ // for serverConn earlier, by one of our dispatch()ed jobs. If that happens,
+ // serverConn will already be closed here or, worse, it will already be open
+ // for the next forwarding attempt. The current code prevents us getting
+ // stuck, but the long term solution is to stop sharing serverConn.
+ debugs(17, 2, serverConn);
+ if (Comm::IsConnOpen(serverConn)) {
+ const auto uses = fd_table[serverConn->fd].pconn.uses;
+ debugs(17, 3, "prior uses: " << uses);
+ fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool
+ serverConn->noteClosure();
+ }
+ serverConn = nullptr;
+ closeHandler = nullptr;
+ destinationReceipt = nullptr;
retryOrBail();
}
{
debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url());
assert(!Comm::IsConnOpen(serverConn));
+ serverConn = nullptr;
+ destinationReceipt = nullptr;
retryOrBail();
}
} catch (...) {
debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException);
closePendingConnection(conn, "connection preparation exception");
+ if (!err)
+ fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al));
retryOrBail();
}
}
{
assert(!destinationReceipt);
- calls.connector = nullptr;
- connOpener.clear();
+ transportWait.finish();
Must(n_tries <= answer.n_tries); // n_tries cannot decrease
n_tries = answer.n_tries;
Must(!Comm::IsConnOpen(answer.conn));
answer.error.clear(); // preserve error for errorSendComplete()
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
// We do not know exactly why the connection got closed, so we play it
// safe, allowing retries only for persistent (reused) connections
if (answer.reused) {
if (!conn->getPeer()->options.no_delay)
tunneler->setDelayId(entry->mem_obj->mostBytesAllowed());
#endif
- AsyncJob::Start(tunneler);
- // and wait for the tunnelEstablishmentDone() call
+ peerWait.start(tunneler, callback);
}
/// resumes operations after the (possibly failed) HTTP CONNECT exchange
void
FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
{
+ peerWait.finish();
+
ErrorState *error = nullptr;
if (!answer.positive()) {
- Must(!Comm::IsConnOpen(answer.conn));
+ Must(!answer.conn);
error = answer.squidError.get();
Must(error);
answer.squidError.clear(); // preserve error for fail()
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
- // The socket could get closed while our callback was queued.
- // We close Connection here to sync Connection::fd.
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
} else if (!answer.leftovers.isEmpty()) {
#endif
connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout);
connector->noteFwdPconnUse = true;
- AsyncJob::Start(connector); // will call our callback
+ encryptionWait.start(connector, callback);
}
/// called when all negotiations with the TLS-speaking peer have been completed
void
FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
{
+ encryptionWait.finish();
+
ErrorState *error = nullptr;
if ((error = answer.error.get())) {
- Must(!Comm::IsConnOpen(answer.conn));
+ assert(!answer.conn);
answer.error.clear(); // preserve error for errorSendComplete()
} else if (answer.tunneled) {
+ assert(!answer.conn);
// TODO: When ConnStateData establishes tunnels, its state changes
// [in ways that may affect logging?]. Consider informing
// ConnStateData about our tunnel or otherwise unifying tunnel
complete(); // destroys us
return;
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer");
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
}
Must(!request->pinnedConnection());
assert(!destinations->empty());
- assert(!opening());
+ assert(!usingDestination());
// Ditch error page if it was created before.
// A new one will be created if there's another problem
delete err;
err = nullptr;
request->clearError();
- serverConn = nullptr;
- destinationReceipt = nullptr;
request->hier.startPeerClock();
- calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
+ AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
HttpRequest::Pointer cause = request;
- const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al);
+ const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al);
cs->setHost(request->url.host());
bool retriable = checkRetriable();
if (!retriable && Config.accessList.serverPconnForNonretriable) {
cs->setRetriable(retriable);
cs->allowPersistent(pconnRace != raceHappened);
destinations->notificationPending = true; // start() is async
- connOpener = cs;
- AsyncJob::Start(cs);
+ transportWait.start(cs, callback);
}
/// send request on an existing connection dedicated to the requesting client
#ifndef SQUID_FORWARD_H
#define SQUID_FORWARD_H
-#include "base/CbcPointer.h"
#include "base/forward.h"
+#include "base/JobWait.h"
#include "base/RefCount.h"
#include "clients/forward.h"
#include "comm.h"
#include "comm/Connection.h"
-#include "comm/ConnOpener.h"
#include "error/forward.h"
#include "fde.h"
#include "http/StatusCode.h"
typedef RefCount<ResolvedPeers> ResolvedPeersPointer;
class HappyConnOpener;
-typedef CbcPointer<HappyConnOpener> HappyConnOpenerPointer;
class HappyConnOpenerAnswer;
/// Sets initial TOS value and Netfilter for the future outgoing connection.
void handleUnregisteredServerEnd();
int reforward();
bool reforwardableStatus(const Http::StatusCode s) const;
- void serverClosed(int fd);
+ void serverClosed();
void connectStart();
void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno);
bool checkRetry();
/* PeerSelectionInitiator API */
virtual void noteDestination(Comm::ConnectionPointer conn) override;
virtual void noteDestinationsEnd(ErrorState *selectionError) override;
+ /// whether the successfully selected path destination or the established
+ /// server connection is still in use
+ bool usingDestination() const;
void noteConnection(HappyConnOpenerAnswer &);
/// \returns the time left for this connection to become connected or 1 second if it is less than one second left
time_t connectingTimeout(const Comm::ConnectionPointer &conn) const;
- /// whether we are waiting for HappyConnOpener
- /// same as calls.connector but may differ from connOpener.valid()
- bool opening() const { return connOpener.set(); }
-
- void cancelOpening(const char *reason);
+ void cancelStep(const char *reason);
void notifyConnOpener();
+ void reactToZeroSizeObject();
public:
StoreEntry *entry;
time_t start_t;
int n_tries; ///< the number of forwarding attempts so far
- // AsyncCalls which we set and may need cancelling.
- struct {
- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn.
- } calls;
-
struct {
bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc
bool dont_retry;
bool destinationsFound; ///< at least one candidate path found
} flags;
- HappyConnOpenerPointer connOpener; ///< current connection opening job
+ /// waits for a transport connection to the peer to be established/opened
+ JobWait<HappyConnOpener> transportWait;
+
+ /// waits for the established transport connection to be secured/encrypted
+ JobWait<Security::PeerConnector> encryptionWait;
+
+ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
+ /// over the (encrypted, if needed) transport connection to that cache_peer
+ JobWait<Http::Tunneler> peerWait;
+
ResolvedPeersPointer destinations; ///< paths for forwarding the request
Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server.
PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil)
#include "neighbors.h"
#include "pconn.h"
#include "PeerPoolMgr.h"
+#include "sbuf/Stream.h"
#include "SquidConfig.h"
CBDATA_CLASS_INIT(HappyConnOpener);
fwdStart(aFwdStart),
callback_(aCall),
destinations(dests),
+ prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"),
+ spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"),
ale(anAle),
cause(request),
n_tries(tries)
AsyncJob::swanSong();
}
+/// HappyConnOpener::Attempt printer for debugging
+std::ostream &
+operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt)
+{
+ if (!attempt.path)
+ os << '-';
+ else if (attempt.path->isOpen())
+ os << "FD " << attempt.path->fd;
+ else if (attempt.connWait)
+ os << attempt.connWait;
+ else // destination is known; connection closed (and we are not opening any)
+ os << attempt.path->id;
+ return os;
+}
+
const char *
HappyConnOpener::status() const
{
- static MemBuf buf;
- buf.reset();
+ // TODO: In a redesigned status() API, the caller may mimic this approach.
+ static SBuf buf;
+ buf.clear();
- buf.append(" [", 2);
+ SBufStream os(buf);
+
+ os.write(" [", 2);
if (stopReason)
- buf.appendf("Stopped, reason:%s", stopReason);
- if (prime) {
- if (prime.path && prime.path->isOpen())
- buf.appendf(" prime FD %d", prime.path->fd);
- else if (prime.connector)
- buf.appendf(" prime call%ud", prime.connector->id.value);
- }
- if (spare) {
- if (spare.path && spare.path->isOpen())
- buf.appendf(" spare FD %d", spare.path->fd);
- else if (spare.connector)
- buf.appendf(" spare call%ud", spare.connector->id.value);
- }
+ os << "Stopped:" << stopReason;
+ if (prime)
+ os << "prime:" << prime;
+ if (spare)
+ os << "spare:" << spare;
if (n_tries)
- buf.appendf(" tries %d", n_tries);
- buf.appendf(" %s%u]", id.prefix(), id.value);
- buf.terminate();
+ os << " tries:" << n_tries;
+ os << ' ' << id << ']';
- return buf.content();
+ buf = os.buf();
+ return buf.c_str();
}
/// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending
HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest)
{
Must(!attempt.path);
- Must(!attempt.connector);
+ Must(!attempt.connWait);
Must(dest);
const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer();
entry->mem_obj->checkUrlChecksum();
#endif
- GetMarkingsToServer(cause.getRaw(), *dest);
+ const auto conn = dest->cloneProfile();
+ GetMarkingsToServer(cause.getRaw(), *conn);
- // ConnOpener modifies its destination argument so we reset the source port
- // in case we are reusing the destination already used by our predecessor.
- dest->local.port(0);
++n_tries;
typedef CommCbMemFunT<HappyConnOpener, CommConnectCbParams> Dialer;
- AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone);
+ AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName,
+ Dialer(this, attempt.callbackMethod));
const time_t connTimeout = dest->connectTimeout(fwdStart);
- Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout);
- if (!dest->getPeer())
+ auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout);
+ if (!conn->getPeer())
cs->setHost(host_);
- attempt.path = dest;
- attempt.connector = callConnect;
- attempt.opener = cs;
+ attempt.path = dest; // but not the being-opened conn!
+ attempt.connWait.start(cs, callConnect);
+}
- AsyncJob::Start(cs);
+/// Comm::ConnOpener callback for the prime connection attempt
+void
+HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams ¶ms)
+{
+ handleConnOpenerAnswer(prime, params, "new prime connection");
}
-/// called by Comm::ConnOpener objects after a prime or spare connection attempt
-/// completes (successfully or not)
+/// Comm::ConnOpener callback for the spare connection attempt
void
-HappyConnOpener::connectDone(const CommConnectCbParams ¶ms)
+HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams ¶ms)
{
- Must(params.conn);
- const bool itWasPrime = (params.conn == prime.path);
- const bool itWasSpare = (params.conn == spare.path);
- Must(itWasPrime != itWasSpare);
-
- PeerConnectionPointer handledPath;
- if (itWasPrime) {
- handledPath = prime.path;
- prime.finish();
- } else {
- handledPath = spare.path;
- spare.finish();
- if (gotSpareAllowance) {
- TheSpareAllowanceGiver.jobUsedAllowance();
- gotSpareAllowance = false;
- }
+ if (gotSpareAllowance) {
+ TheSpareAllowanceGiver.jobUsedAllowance();
+ gotSpareAllowance = false;
}
+ handleConnOpenerAnswer(spare, params, "new spare connection");
+}
+
+/// prime/spare-agnostic processing of a Comm::ConnOpener result
+void
+HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams ¶ms, const char *what)
+{
+ Must(params.conn);
+
+ // finalize the previously selected path before attempt.finish() forgets it
+ auto handledPath = attempt.path;
+ handledPath.finalize(params.conn); // closed on errors
+ attempt.finish();
- const char *what = itWasPrime ? "new prime connection" : "new spare connection";
if (params.flag == Comm::OK) {
sendSuccess(handledPath, false, what);
return;
debugs(17, 8, what << " failed: " << params.conn);
if (const auto peer = params.conn->getPeer())
peerConnectFailed(peer);
- params.conn->close(); // TODO: Comm::ConnOpener should do this instead.
// remember the last failure (we forward it if we cannot connect anywhere)
lastFailedConnection = handledPath;
return false;
}
+HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName):
+ callbackMethod(method),
+ callbackMethodName(methodName)
+{
+}
+
+void
+HappyConnOpener::Attempt::finish()
+{
+ connWait.finish();
+ path = nullptr;
+}
+
void
HappyConnOpener::Attempt::cancel(const char *reason)
{
- if (connector) {
- connector->cancel(reason);
- CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort);
- }
- clear();
+ connWait.cancel(reason);
+ path = nullptr;
}
/// a connection opening attempt in progress (or falsy)
class Attempt {
public:
+ /// HappyConnOpener method implementing a ConnOpener callback
+ using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &);
+
+ Attempt(const CallbackMethod method, const char *methodName);
+
explicit operator bool() const { return static_cast<bool>(path); }
/// reacts to a natural attempt completion (successful or otherwise)
- void finish() { clear(); }
+ void finish();
/// aborts an in-progress attempt
void cancel(const char *reason);
PeerConnectionPointer path; ///< the destination we are connecting to
- AsyncCall::Pointer connector; ///< our opener callback
- Comm::ConnOpener::Pointer opener; ///< connects to path and calls us
- private:
- /// cleans up after the attempt ends (successfully or otherwise)
- void clear() { path = nullptr; connector = nullptr; opener = nullptr; }
+ /// waits for a connection to the peer to be established/opened
+ JobWait<Comm::ConnOpener> connWait;
+
+ const CallbackMethod callbackMethod; ///< ConnOpener calls this method
+ const char * const callbackMethodName; ///< for callbackMethod debugging
};
+ friend std::ostream &operator <<(std::ostream &, const Attempt &);
/* AsyncJob API */
virtual void start() override;
void openFreshConnection(Attempt &, PeerConnectionPointer &);
bool reuseOldConnection(PeerConnectionPointer &);
- void connectDone(const CommConnectCbParams &);
+ void notePrimeConnectDone(const CommConnectCbParams &);
+ void noteSpareConnectDone(const CommConnectCbParams &);
+ void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription);
void checkForNewConnection();
PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
peer(cbdataReference(aPeer)),
request(),
- opener(),
- securer(),
- closer(),
+ transportWait(),
+ encryptionWait(),
addrUsed(0)
{
}
void
PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms)
{
- opener = NULL;
+ transportWait.finish();
if (!validPeer()) {
debugs(48, 3, "peer gone");
}
if (params.flag != Comm::OK) {
- /* it might have been a timeout with a partially open link */
- if (params.conn != NULL)
- params.conn->close();
peerConnectFailed(peer);
checkpoint("conn opening failure"); // may retry
return;
// Handle TLS peers.
if (peer->secure.encryptTransport) {
- typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
- closer = JobCallback(48, 3, CloserDialer, this,
- PeerPoolMgr::handleSecureClosure);
- comm_add_close_handler(params.conn->fd, closer);
-
- securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
- MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
+ // XXX: Exceptions orphan params.conn
+ AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
+ MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
const auto peerTimeout = peer->connectTimeout();
const int timeUsed = squid_curtime - params.conn->startTime();
// Use positive timeout when less than one second is left for conn.
const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
- auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft);
- AsyncJob::Start(connector); // will call our callback
+ const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
+ encryptionWait.start(connector, callback);
return;
}
void
PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
{
- Must(securer != NULL);
- securer = NULL;
-
- if (closer != NULL) {
- if (answer.conn != NULL)
- comm_remove_close_handler(answer.conn->fd, closer);
- else
- closer->cancel("securing completed");
- closer = NULL;
- }
+ encryptionWait.finish();
if (!validPeer()) {
debugs(48, 3, "peer gone");
return;
}
+ assert(!answer.tunneled);
if (answer.error.get()) {
- if (answer.conn != NULL)
- answer.conn->close();
+ assert(!answer.conn);
// PeerConnector calls peerConnectFailed() for us;
checkpoint("conn securing failure"); // may retry
return;
}
- pushNewConnection(answer.conn);
-}
+ assert(answer.conn);
-void
-PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &)
-{
- Must(closer != NULL);
- Must(securer != NULL);
- securer->cancel("conn closed by a 3rd party");
- securer = NULL;
- closer = NULL;
- // allow the closing connection to fully close before we check again
- Checkpoint(this, "conn closure while securing");
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
+ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
+ answer.conn->noteClosure();
+ checkpoint("external connection closure"); // may retry
+ return;
+ }
+
+ pushNewConnection(answer.conn);
}
void
PeerPoolMgr::openNewConnection()
{
// KISS: Do nothing else when we are already doing something.
- if (opener != NULL || securer != NULL || shutting_down) {
- debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
+ if (transportWait || encryptionWait || shutting_down) {
+ debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
return; // there will be another checkpoint when we are done opening/securing
}
const auto ctimeout = peer->connectTimeout();
typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
- opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
- AsyncJob::Start(cs);
+ AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
+ const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
+ transportWait.start(cs, callback);
}
void
#define SQUID_PEERPOOLMGR_H
#include "base/AsyncJob.h"
+#include "base/JobWait.h"
#include "comm/forward.h"
#include "security/forward.h"
/// Security::PeerConnector callback
void handleSecuredPeer(Security::EncryptorAnswer &answer);
- /// called when the connection we are trying to secure is closed by a 3rd party
- void handleSecureClosure(const CommCloseCbParams ¶ms);
-
/// the final step in connection opening (and, optionally, securing) sequence
void pushNewConnection(const Comm::ConnectionPointer &conn);
private:
CachePeer *peer; ///< the owner of the pool we manage
RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code
- AsyncCall::Pointer opener; ///< whether we are opening a connection
- AsyncCall::Pointer securer; ///< whether we are securing a connection
- AsyncCall::Pointer closer; ///< monitors conn while we are securing it
+
+ /// waits for a transport connection to the peer to be established/opened
+ JobWait<Comm::ConnOpener> transportWait;
+
+ /// waits for the established transport connection to be secured/encrypted
+ JobWait<Security::BlindPeerConnector> encryptionWait;
+
unsigned int addrUsed; ///< counter for cycling through peer addresses
};
while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {}
}
- const auto cleanPath = path.connection->cloneDestinationDetails();
+ const auto cleanPath = path.connection->cloneProfile();
return PeerConnectionPointer(cleanPath, found - paths_.begin());
}
openConnection();
}
-// connection with the ICAP service established
-void Adaptation::Icap::ModXact::handleCommConnected()
+void Adaptation::Icap::ModXact::startShoveling()
{
Must(state.writing == State::writingConnect);
virtual void noteBodyProductionEnded(BodyPipe::Pointer);
virtual void noteBodyProducerAborted(BodyPipe::Pointer);
- // comm handlers
- virtual void handleCommConnected();
+ /* Xaction API */
+ virtual void startShoveling();
virtual void handleCommWrote(size_t size);
virtual void handleCommRead(size_t size);
+
void handleCommWroteHeaders();
void handleCommWroteBody();
openConnection();
}
-void Adaptation::Icap::OptXact::handleCommConnected()
+void Adaptation::Icap::OptXact::startShoveling()
{
scheduleRead();
OptXact(ServiceRep::Pointer &aService);
protected:
+ /* Xaction API */
virtual void start();
- virtual void handleCommConnected();
+ virtual void startShoveling();
virtual void handleCommWrote(size_t size);
virtual void handleCommRead(size_t size);
// should be configurable.
}
-// returns a persistent or brand new connection; negative int on failures
Comm::ConnectionPointer
-Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
+Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact)
{
Comm::ConnectionPointer connection;
else
theIdleConns->closeN(1);
- reused = Comm::IsConnOpen(connection);
++theBusyConns;
debugs(93,3, HERE << "got connection: " << connection);
return connection;
// do not pool an idle connection if we owe connections
if (isReusable && excessConnections() == 0) {
debugs(93, 3, HERE << "pushing pconn" << comment);
- commUnsetConnTimeout(conn);
theIdleConns->push(conn);
} else {
debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " <<
bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const;
bool allows204() const;
bool allows206() const;
- Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused);
+ /// \returns an idle persistent ICAP connection or nil
+ Comm::ConnectionPointer getIdleConnection(bool isRetriable);
void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment);
void noteConnectionUse(const Comm::ConnectionPointer &conn);
void noteConnectionFailed(const char *comment);
#include "adaptation/icap/Config.h"
#include "adaptation/icap/Launcher.h"
#include "adaptation/icap/Xaction.h"
+#include "base/JobWait.h"
#include "base/TextException.h"
#include "comm.h"
#include "comm/Connection.h"
icapRequest(NULL),
icapReply(NULL),
attempts(0),
- connection(NULL),
theService(aService),
commEof(false),
reuseConnection(true),
isRepeatable(true),
ignoreLastWrite(false),
waitingForDns(false),
- stopReason(NULL),
- connector(NULL),
- reader(NULL),
- writer(NULL),
- closer(NULL),
alep(new AccessLogEntry),
- al(*alep),
- cs(NULL)
+ al(*alep)
{
debugs(93,3, typeName << " constructed, this=" << this <<
" [icapx" << id << ']'); // we should not call virtual status() here
icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data)
{
Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
+ /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17)
+ /// or Optional<Ip::Address> (when it can take non-trivial types)
xa->dnsLookupDone(ia);
}
if (!TheConfig.reuse_connections)
disableRetries(); // this will also safely drain pconn pool
- bool wasReused = false;
- connection = s.getConnection(isRetriable, wasReused);
-
- if (wasReused && Comm::IsConnOpen(connection)) {
- // Set comm Close handler
- // fake the connect callback
- // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
- CbcPointer<Xaction> self(this);
- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
- dialer.params.conn = connection;
- dialer.params.flag = Comm::OK;
- // fake other parameters by copying from the existing connection
- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
- ScheduleCallHere(connector);
+ if (const auto pconn = s.getIdleConnection(isRetriable)) {
+ useTransportConnection(pconn);
return;
}
#if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
dieOnConnectionFailure(); // throws
#else // take a step back into protected Async call dialing.
- // fake the connect callback
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
- CbcPointer<Xaction> self(this);
- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
- dialer.params.conn = connection;
- dialer.params.flag = Comm::COMM_ERROR;
- // fake other parameters by copying from the existing connection
- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
- ScheduleCallHere(connector);
+ CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure);
#endif
return;
}
- connection = new Comm::Connection;
- connection->remote = ia->current();
- connection->remote.port(s.cfg().port);
- getOutgoingAddress(NULL, connection);
+ const Comm::ConnectionPointer conn = new Comm::Connection();
+ conn->remote = ia->current();
+ conn->remote.port(s.cfg().port);
+ getOutgoingAddress(nullptr, conn);
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
- connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
- cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
+ AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+ const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass));
cs->setHost(s.cfg().host.termedBuf());
- AsyncJob::Start(cs.get());
+ transportWait.start(cs, callback);
}
void Adaptation::Icap::Xaction::closeConnection()
closer = NULL;
}
+ commUnsetConnTimeout(connection);
+
cancelRead(); // may not work
if (reuseConnection && !doneWithIo()) {
writer = NULL;
reader = NULL;
- connector = NULL;
connection = NULL;
}
}
-// connection with the ICAP service established
+/// called when the connection attempt to an ICAP service completes (successfully or not)
void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
{
- cs = NULL;
+ transportWait.finish();
- if (io.flag == Comm::TIMEOUT) {
- handleCommTimedout();
+ if (io.flag != Comm::OK) {
+ dieOnConnectionFailure(); // throws
return;
}
- Must(connector != NULL);
- connector = NULL;
-
- if (io.flag != Comm::OK)
- dieOnConnectionFailure(); // throws
-
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
- TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
- commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
+ useTransportConnection(io.conn);
+}
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
- comm_add_close_handler(io.conn->fd, closer);
+/// React to the availability of a transport connection to the ICAP service.
+/// The given connection may (or may not) be secured already.
+void
+Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn)
+{
+ assert(Comm::IsConnOpen(conn));
+ assert(!connection);
// If it is a reused connection and the TLS object is built
// we should not negotiate new TLS session
- const auto &ssl = fd_table[io.conn->fd].ssl;
+ const auto &ssl = fd_table[conn->fd].ssl;
if (!ssl && service().cfg().secure.encryptTransport) {
+ // XXX: Exceptions orphan conn.
CbcPointer<Adaptation::Icap::Xaction> me(this);
- securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
- MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
+ AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
+ MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
- auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
- AsyncJob::Start(sslConnector); // will call our callback
+ const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
+
+ encryptionWait.start(sslConnector, callback);
return;
}
-// ?? fd_table[io.conn->fd].noteUse(icapPconnPool);
+ useIcapConnection(conn);
+}
+
+/// react to the availability of a fully-ready ICAP connection
+void
+Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn)
+{
+ assert(!connection);
+ assert(conn);
+ assert(Comm::IsConnOpen(conn));
+ connection = conn;
service().noteConnectionUse(connection);
- handleCommConnected();
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+ comm_add_close_handler(connection->fd, closer);
+
+ startShoveling();
}
void Adaptation::Icap::Xaction::dieOnConnectionFailure()
// communication timeout with the ICAP service
void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &)
-{
- handleCommTimedout();
-}
-
-void Adaptation::Icap::Xaction::handleCommTimedout()
{
debugs(93, 2, HERE << typeName << " failed: timeout with " <<
theService->cfg().methodStr() << " " <<
theService->cfg().uri << status());
reuseConnection = false;
- const bool whileConnecting = connector != NULL;
- if (whileConnecting) {
- assert(!haveConnection());
- theService->noteConnectionFailed("timedout");
- } else
- closeConnection(); // so that late Comm callbacks do not disturb bypass
- throw TexcHere(whileConnecting ?
- "timed out while connecting to the ICAP service" :
- "timed out while talking to the ICAP service");
+ assert(haveConnection());
+ theService->noteConnectionFailed("timedout");
+ closeConnection();
+ throw TextException("timed out while talking to the ICAP service", Here());
}
// unexpected connection close while talking to the ICAP service
void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &)
{
- if (securer != NULL) {
- securer->cancel("Connection closed before SSL negotiation finished");
- securer = NULL;
+ if (connection) {
+ connection->noteClosure();
+ connection = nullptr;
}
closer = NULL;
- handleCommClosed();
-}
-void Adaptation::Icap::Xaction::handleCommClosed()
-{
static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE");
detailError(d);
mustStop("ICAP service connection externally closed");
bool Adaptation::Icap::Xaction::doneAll() const
{
- return !waitingForDns && !connector && !securer && !reader && !writer &&
+ return !waitingForDns && !transportWait && !encryptionWait &&
+ !reader && !writer &&
Adaptation::Initiate::doneAll();
}
bool Adaptation::Icap::Xaction::doneWithIo() const
{
return haveConnection() &&
- !connector && !reader && !writer && // fast checks, some redundant
+ !transportWait && !reader && !writer && // fast checks, some redundant
doneReading() && doneWriting();
}
void Adaptation::Icap::Xaction::swanSong()
{
// kids should sing first and then call the parent method.
- if (cs.valid()) {
- debugs(93,6, HERE << id << " about to notify ConnOpener!");
- CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort);
- cs = NULL;
+ if (transportWait || encryptionWait) {
service().noteConnectionFailed("abort");
}
void
Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
{
- Must(securer != NULL);
- securer = NULL;
-
- if (closer != NULL) {
- if (Comm::IsConnOpen(answer.conn))
- comm_remove_close_handler(answer.conn->fd, closer);
- else
- closer->cancel("securing completed");
- closer = NULL;
- }
+ encryptionWait.finish();
+ assert(!answer.tunneled);
if (answer.error.get()) {
- if (answer.conn != NULL)
- answer.conn->close();
+ assert(!answer.conn);
+ // TODO: Refactor dieOnConnectionFailure() to be usable here as well.
debugs(93, 2, typeName <<
" TLS negotiation to " << service().cfg().uri << " failed");
service().noteConnectionFailed("failure");
debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete");
- service().noteConnectionUse(answer.conn);
+ assert(answer.conn);
+
+ // The socket could get closed while our callback was queued. Sync
+ // Connection. XXX: Connection::fd may already be stale/invalid here.
+ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
+ answer.conn->noteClosure();
+ service().noteConnectionFailed("external TLS connection closure");
+ static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE");
+ detailError(d);
+ throw TexcHere("external closure of the TLS ICAP service connection");
+ }
- handleCommConnected();
+ useIcapConnection(answer.conn);
}
#include "AccessLogEntry.h"
#include "adaptation/icap/ServiceRep.h"
#include "adaptation/Initiate.h"
+#include "base/JobWait.h"
#include "comm/ConnOpener.h"
#include "error/forward.h"
#include "HttpReply.h"
class MemBuf;
+namespace Ssl {
+class IcapPeerConnector;
+}
+
namespace Adaptation
{
namespace Icap
virtual void start();
virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate
+ /// starts sending/receiving ICAP messages
+ virtual void startShoveling() = 0;
+
// comm hanndlers; called by comm handler wrappers
- virtual void handleCommConnected() = 0;
virtual void handleCommWrote(size_t sz) = 0;
virtual void handleCommRead(size_t sz) = 0;
- virtual void handleCommTimedout();
- virtual void handleCommClosed();
void handleSecuredPeer(Security::EncryptorAnswer &answer);
/// record error detail if possible
void openConnection();
void closeConnection();
- void dieOnConnectionFailure();
bool haveConnection() const;
void scheduleRead();
ServiceRep &service();
private:
+ void useTransportConnection(const Comm::ConnectionPointer &);
+ void useIcapConnection(const Comm::ConnectionPointer &);
+ void dieOnConnectionFailure();
void tellQueryAborted();
void maybeLog();
protected:
- Comm::ConnectionPointer connection; ///< ICAP server connection
Adaptation::Icap::ServiceRep::Pointer theService;
SBuf readBuf;
bool ignoreLastWrite;
bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback
- const char *stopReason;
-
- // active (pending) comm callbacks for the ICAP server connection
- AsyncCall::Pointer connector;
AsyncCall::Pointer reader;
AsyncCall::Pointer writer;
- AsyncCall::Pointer closer;
AccessLogEntry::Pointer alep; ///< icap.log entry
AccessLogEntry &al; ///< short for *alep
timeval icap_tio_finish; /*time when the last byte of the ICAP responsewas received*/
private:
- Comm::ConnOpener::Pointer cs;
- AsyncCall::Pointer securer; ///< whether we are securing a connection
+ /// waits for a transport connection to the ICAP server to be established/opened
+ JobWait<Comm::ConnOpener> transportWait;
+
+ /// waits for the established transport connection to be secured/encrypted
+ JobWait<Ssl::IcapPeerConnector> encryptionWait;
+
+ /// open and, if necessary, secured connection to the ICAP server (or nil)
+ Comm::ConnectionPointer connection;
+
+ AsyncCall::Pointer closer;
};
} // namespace Icap
InstanceIdDefinitions(AsyncJob, "job");
-AsyncJob::Pointer AsyncJob::Start(AsyncJob *j)
+void
+AsyncJob::Start(const Pointer &job)
{
- AsyncJob::Pointer job(j);
CallJobHere(93, 5, job, AsyncJob, start);
- return job;
+ job->started_ = true; // it is the attempt that counts
}
AsyncJob::AsyncJob(const char *aTypeName) :
{
debugs(93,5, "AsyncJob destructed, this=" << this <<
" type=" << typeName << " [" << id << ']');
+ assert(!started_ || swanSang_);
}
void AsyncJob::start()
AsyncCall::Pointer inCallSaved = inCall;
void *thisSaved = this;
+ // TODO: Swallow swanSong() exceptions to reduce memory leaks.
+
+ // Job callback invariant: swanSong() is (only) called for started jobs.
+ // Here to detect violations in kids that forgot to call our swanSong().
+ assert(started_);
+
+ swanSang_ = true; // it is the attempt that counts
swanSong();
- delete this; // this is the only place where the object is deleted
+ delete this; // this is the only place where a started job is deleted
// careful: this object does not exist any more
debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved);
public:
AsyncJob(const char *aTypeName);
- /// starts a freshly created job (i.e., makes the job asynchronous)
- static Pointer Start(AsyncJob *job);
+ /// Promises to start the configured job (eventually). The job is deemed to
+ /// be running asynchronously beyond this point, so the caller should only
+ /// access the job object via AsyncCalls rather than directly.
+ ///
+ /// swanSong() is only called for jobs for which this method has returned
+ /// successfully (i.e. without throwing).
+ static void Start(const Pointer &job);
protected:
// XXX: temporary method to replace "delete this" in jobs-in-transition.
/// called when the job throws during an async call
virtual void callException(const std::exception &e);
+ /// process external request to terminate now (i.e. during this async call)
+ void handleStopRequest() { mustStop("externally aborted"); }
+
+ const InstanceId<AsyncJob> id; ///< job identifier
+
protected:
// external destruction prohibited to ensure swanSong() is called
virtual ~AsyncJob();
const char *stopReason; ///< reason for forcing done() to be true
const char *typeName; ///< kid (leaf) class name, for debugging
AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any
- const InstanceId<AsyncJob> id; ///< job identifier
+
+ bool started_ = false; ///< Start() has finished successfully
+ bool swanSang_ = false; ///< swanSong() was called
};
#endif /* SQUID_ASYNC_JOB_H */
--- /dev/null
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/AsyncJobCalls.h"
+#include "base/JobWait.h"
+
+#include <cassert>
+#include <iostream>
+
+JobWaitBase::JobWaitBase() = default;
+
+JobWaitBase::~JobWaitBase()
+{
+ cancel("owner gone");
+}
+
+void
+JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall)
+{
+ // Invariant: The wait will be over. We cannot guarantee that the job will
+ // call the callback, of course, but we can check these prerequisites.
+ assert(aCall);
+ assert(aJob.valid());
+
+ // "Double" waiting state leads to conflicting/mismatching callbacks
+ // detailed in finish(). Detect that bug ASAP.
+ assert(!waiting());
+
+ assert(!callback_);
+ assert(!job_);
+ callback_ = aCall;
+ job_ = aJob;
+
+ AsyncJob::Start(job_.get());
+}
+
+void
+JobWaitBase::finish()
+{
+ // Unexpected callbacks might result in disasters like secrets exposure,
+ // data corruption, or expensive message routing mistakes when the callback
+ // info is applied to the wrong message part or acted upon prematurely.
+ assert(waiting());
+ clear();
+}
+
+void
+JobWaitBase::cancel(const char *reason)
+{
+ if (callback_) {
+ callback_->cancel(reason);
+
+ // Instead of AsyncJob, the class parameter could be Job. That would
+ // avoid runtime child-to-parent CbcPointer conversion overheads, but
+ // complicate support for Jobs with virtual AsyncJob bases (GCC error:
+ // "pointer to member conversion via virtual base AsyncJob") and also
+ // cache-log "Job::handleStopRequest()" with a non-existent class name.
+ CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest);
+
+ clear();
+ }
+}
+
+void
+JobWaitBase::print(std::ostream &os) const
+{
+ // use a backarrow to emphasize that this is a callback: call24<-job6
+ if (callback_)
+ os << callback_->id << "<-";
+ if (const auto rawJob = job_.get())
+ os << rawJob->id;
+ else
+ os << job_; // raw pointer of a gone job may still be useful for triage
+}
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_BASE_JOBWAIT_H
+#define SQUID_BASE_JOBWAIT_H
+
+#include "base/AsyncJob.h"
+#include "base/CbcPointer.h"
+
+#include <iosfwd>
+
+/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead.
+/// This base class does not contain code specific to the actual Job type.
+class JobWaitBase
+{
+public:
+ JobWaitBase();
+ ~JobWaitBase();
+
+ /// no copying of any kind: each waiting context needs a dedicated AsyncCall
+ JobWaitBase(JobWaitBase &&) = delete;
+
+ explicit operator bool() const { return waiting(); }
+
+ /// whether we are currently waiting for the job to call us back
+ /// the job itself may be gone even if this returns true
+ bool waiting() const { return bool(callback_); }
+
+ /// ends wait (after receiving the call back)
+ /// forgets the job which is likely to be gone by now
+ void finish();
+
+ /// aborts wait (if any) before receiving the call back
+ /// does nothing if we are not waiting
+ void cancel(const char *reason);
+
+ /// summarizes what we are waiting for (for debugging)
+ void print(std::ostream &) const;
+
+protected:
+ /// starts waiting for the given job to call the given callback
+ void start_(AsyncJob::Pointer, AsyncCall::Pointer);
+
+private:
+ /// the common part of finish() and cancel()
+ void clear() { job_.clear(); callback_ = nullptr; }
+
+ /// the job that we are waiting to call us back (or nil)
+ AsyncJob::Pointer job_;
+
+ /// the call we are waiting for the job_ to make (or nil)
+ AsyncCall::Pointer callback_;
+};
+
+/// Manages waiting for an AsyncJob callback.
+/// Completes JobWaitBase by providing Job type-specific members.
+template <class Job>
+class JobWait: public JobWaitBase
+{
+public:
+ typedef CbcPointer<Job> JobPointer;
+
+ /// starts waiting for the given job to call the given callback
+ void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) {
+ start_(aJob, aCallback);
+ typedJob_ = aJob;
+ }
+
+ /// \returns a cbdata pointer to the job we are waiting for (or nil)
+ /// the returned pointer may be falsy, even if we are still waiting()
+ JobPointer job() const { return waiting() ? typedJob_ : nullptr; }
+
+private:
+ /// nearly duplicates JobWaitBase::job_ but exposes the actual job type
+ JobPointer typedJob_;
+};
+
+inline
+std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait)
+{
+ wait.print(os);
+ return os;
+}
+
+#endif /* SQUID_BASE_JOBWAIT_H */
+
Here.h \
InstanceId.cc \
InstanceId.h \
+ JobWait.cc \
+ JobWait.h \
Lock.h \
LookupTable.h \
Optional.h \
template<class Cbc> class CbcPointer;
template<class RefCountableKid> class RefCount;
+template<class Job> class JobWait;
typedef CbcPointer<AsyncJob> AsyncJobPointer;
typedef RefCount<CodeContext> CodeContextPointer;
/* This is a handler normally called by comm_close() */
void ConnStateData::connStateClosed(const CommCloseCbParams &)
{
+ if (clientConnection) {
+ clientConnection->noteClosure();
+ // keep closed clientConnection for logging, clientdb cleanup, etc.
+ }
deleteThis("ConnStateData::connStateClosed");
}
Ftp::Client::~Client()
{
- if (data.opener != NULL) {
- data.opener->cancel("Ftp::Client destructed");
- data.opener = NULL;
- }
data.close();
safe_free(old_request);
debugs(9, 3, "connecting to " << conn->remote);
typedef CommCbMemFunT<Client, CommConnectCbParams> Dialer;
- data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect);
+ AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
+ const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect);
cs->setHost(data.host);
- AsyncJob::Start(cs);
+ dataConnWait.start(cs, callback);
}
bool
Ftp::Client::dataClosed(const CommCloseCbParams &)
{
debugs(9, 4, status());
+ if (data.conn)
+ data.conn->noteClosure();
if (data.listenConn != NULL) {
data.listenConn->close();
data.listenConn = NULL;
- // NP clear() does the: data.fd = -1;
}
data.clear();
}
Ftp::Client::ctrlClosed(const CommCloseCbParams &)
{
debugs(9, 4, status());
+ if (ctrl.conn)
+ ctrl.conn->noteClosure();
ctrl.clear();
doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too
mustStop("Ftp::Client::ctrlClosed");
*/
Comm::ConnectionPointer listenConn;
- AsyncCall::Pointer opener; ///< Comm opener handler callback.
private:
AsyncCall::Pointer closer; ///< Comm close handler callback
};
virtual void sentRequestBody(const CommIoCbParams &io);
virtual void doneSendingRequestBody();
+ /// Waits for an FTP data connection to the server to be established/opened.
+ /// This wait only happens in FTP passive mode (via PASV or EPSV).
+ JobWait<Comm::ConnOpener> dataConnWait;
+
private:
bool parseControlReply(size_t &bytesUsed);
flags.pasv_supported = false;
debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state");
- // cancel the data connection setup.
- if (data.opener != NULL) {
- data.opener->cancel("timeout");
- data.opener = NULL;
- }
+ // cancel the data connection setup, if any
+ dataConnWait.cancel("timeout");
+
data.close();
}
Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io)
{
debugs(9, 3, HERE);
- data.opener = NULL;
+ dataConnWait.finish();
if (io.flag != Comm::OK) {
debugs(9, 2, HERE << "Failed to connect. Retrying via another method.");
return !doneWithServer();
}
-AsyncJob::Pointer
+void
Ftp::StartGateway(FwdState *const fwdState)
{
- return AsyncJob::Start(new Ftp::Gateway(fwdState));
+ AsyncJob::Start(new Ftp::Gateway(fwdState));
}
Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io)
{
debugs(9, 3, status());
- data.opener = NULL;
+ dataConnWait.finish();
if (io.flag != Comm::OK) {
debugs(9, 2, "failed to connect FTP server data channel");
ftpClient->dataComplete();
}
-AsyncJob::Pointer
+void
Ftp::StartRelay(FwdState *const fwdState)
{
- return AsyncJob::Start(new Ftp::Relay(fwdState));
+ AsyncJob::Start(new Ftp::Relay(fwdState));
}
Http::Tunneler::handleConnectionClosure(const CommCloseCbParams &)
{
closer = nullptr;
+ if (connection) {
+ countFailingConnection();
+ connection->noteClosure();
+ connection = nullptr;
+ }
bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al));
}
Must(error);
answer().squidError = error;
- if (const auto p = connection->getPeer())
- peerConnectFailed(p);
+ if (const auto failingConnection = connection) {
+ // TODO: Reuse to-peer connections after a CONNECT error response.
+ countFailingConnection();
+ disconnect();
+ failingConnection->close();
+ }
callBack();
- disconnect();
-
- if (noteFwdPconnUse)
- fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
- // TODO: Reuse to-peer connections after a CONNECT error response.
- connection->close();
- connection = nullptr;
}
void
Http::Tunneler::sendSuccess()
{
assert(answer().positive());
- callBack();
+ assert(Comm::IsConnOpen(connection));
+ answer().conn = connection;
disconnect();
+ callBack();
+}
+
+void
+Http::Tunneler::countFailingConnection()
+{
+ assert(connection);
+ if (const auto p = connection->getPeer())
+ peerConnectFailed(p);
+ if (noteFwdPconnUse && connection->isOpen())
+ fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
}
void
Http::Tunneler::disconnect()
{
+ const auto stillOpen = Comm::IsConnOpen(connection);
+
if (closer) {
- comm_remove_close_handler(connection->fd, closer);
+ if (stillOpen)
+ comm_remove_close_handler(connection->fd, closer);
closer = nullptr;
}
if (reader) {
- Comm::ReadCancel(connection->fd, reader);
+ if (stillOpen)
+ Comm::ReadCancel(connection->fd, reader);
reader = nullptr;
}
- // remove connection timeout handler
- commUnsetConnTimeout(connection);
+ if (stillOpen)
+ commUnsetConnTimeout(connection);
+
+ connection = nullptr; // may still be open
}
void
Http::Tunneler::callBack()
{
- debugs(83, 5, connection << status());
- if (answer().positive())
- answer().conn = connection;
+ debugs(83, 5, answer().conn << status());
+ assert(!connection); // returned inside answer() or gone
auto cb = callback;
callback = nullptr;
ScheduleCallHere(cb);
AsyncJob::swanSong();
if (callback) {
- if (requestWritten && tunnelEstablished) {
+ if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) {
sendSuccess();
} else {
- // we should have bailed when we discovered the job-killing problem
- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status());
+ // job-ending emergencies like handleStopRequest() or callException()
bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al));
}
assert(!callback);
void handleResponse(const bool eof);
void bailOnResponseError(const char *error, HttpReply *);
+private:
/// sends the given error to the initiator
void bailWith(ErrorState*);
/// a bailWith(), sendSuccess() helper: sends results to the initiator
void callBack();
- /// a bailWith(), sendSuccess() helper: stops monitoring the connection
+ /// stops monitoring the connection
void disconnect();
+ /// updates connection usage history before the connection is closed
+ void countFailingConnection();
+
TunnelerAnswer &answer();
-private:
AsyncCall::Pointer writer; ///< called when the request has been written
AsyncCall::Pointer reader; ///< called when the response should be read
AsyncCall::Pointer closer; ///< called when the connection is being closed
{
/// A new FTP Gateway job
-AsyncJobPointer StartGateway(FwdState *const fwdState);
+void StartGateway(FwdState *const fwdState);
/// A new FTP Relay job
-AsyncJobPointer StartRelay(FwdState *const fwdState);
+void StartRelay(FwdState *const fwdState);
/** Construct an URI with leading / in PATH portion for use by CWD command
* possibly others. FTP encodes absolute paths as beginning with '/'
// If call is not canceled schedule it for execution else ignore it
if (!call->canceled()) {
debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
+ // XXX: Without the following code, callback fd may be -1.
+ // typedef CommCloseCbParams Params;
+ // auto ¶ms = GetCommParams<Params>(call);
+ // params.fd = fd;
ScheduleCallHere(call);
}
}
CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data;
temp->element.closer = NULL;
+ if (temp->element.theRead.conn) {
+ temp->element.theRead.conn->noteClosure();
+ temp->element.theRead.conn = nullptr;
+ }
temp->element.markCancelled();
}
if (aRead.cancelled)
return;
+ // TODO: This check still allows theReader call with a closed theRead.conn.
+ // If a delayRead() caller has a close connection handler, then such a call
+ // would be useless and dangerous. If a delayRead() caller does not have it,
+ // then the caller will get stuck when an external connection closure makes
+ // aRead.cancelled (checked above) true.
if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing())
return;
deadline_(squid_curtime + static_cast<time_t>(ctimeout))
{
debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout");
+ assert(conn_); // we know where to go
+
+ // Sharing a being-modified Connection object with the caller is dangerous,
+ // but we cannot ban (or even check for) that using existing APIs. We do not
+ // want to clone "just in case" because cloning is a bit expensive, and most
+ // callers already have a non-owned Connection object to give us. Until the
+ // APIs improve, we can only check that the connection is not open.
+ assert(!conn_->isOpen());
}
Comm::ConnOpener::~ConnOpener()
if (temporaryFd_ >= 0)
closeFd();
+ // did we abort while owning an open connection?
+ if (conn_ && conn_->isOpen())
+ conn_->close();
+
// did we abort while waiting between retries?
if (calls_.sleep_)
cancelSleep();
" [" << callback_->id << ']' );
// TODO save the pconn to the pconnPool ?
} else {
+ assert(conn_);
+
+ // free resources earlier and simplify recipients
+ if (errFlag != Comm::OK)
+ conn_->close(); // may not be opened
+ else
+ assert(conn_->isOpen());
+
typedef CommConnectCbParams Params;
Params ¶ms = GetCommParams<Params>(callback_);
params.conn = conn_;
+ conn_ = nullptr; // release ownership; prevent closure by us
params.flag = errFlag;
params.xerrno = xerrno;
ScheduleCallHere(callback_);
void
Comm::ConnOpener::cleanFd()
{
- debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
+ debugs(5, 4, conn_ << "; temp FD " << temporaryFd_);
Must(temporaryFd_ >= 0);
fde &f = fd_table[temporaryFd_];
Comm::ConnOpener::createFd()
{
Must(temporaryFd_ < 0);
+ assert(conn_);
// our initiators signal abort by cancelling their callbacks
if (callback_ == NULL || callback_->canceled())
namespace Comm
{
-/**
- * Async-opener of a Comm connection.
- */
+/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either
+/// Comm::OK with an open connection or another Comm::Flag with a closed one.
class ConnOpener : public AsyncJob
{
CBDATA_CLASS(ConnOpener);
public:
- void noteAbort() { mustStop("externally aborted"); }
-
typedef CbcPointer<ConnOpener> Pointer;
virtual bool doneAll() const;
*/
#include "squid.h"
+#include "base/JobWait.h"
#include "CachePeer.h"
#include "cbdata.h"
#include "comm.h"
}
Comm::ConnectionPointer
-Comm::Connection::cloneDestinationDetails() const
+Comm::Connection::cloneProfile() const
{
- const ConnectionPointer c = new Comm::Connection;
- c->setAddrs(local, remote);
- c->peerType = peerType;
- c->flags = flags;
- c->peer_ = cbdataReference(getPeer());
- assert(!c->isOpen());
- return c;
-}
+ const ConnectionPointer clone = new Comm::Connection;
+ auto &c = *clone; // optimization
+
+ /*
+ * Copy or excuse each data member. Excused members do not belong to a
+ * Connection configuration profile because their values cannot be reused
+ * across (co-existing) Connection objects and/or are tied to their own
+ * object lifetime.
+ */
+
+ c.setAddrs(local, remote);
+ c.peerType = peerType;
+ // fd excused
+ c.tos = tos;
+ c.nfmark = nfmark;
+ c.nfConnmark = nfConnmark;
+ // COMM_ORPHANED is not a part of connection opening instructions
+ c.flags = flags & ~COMM_ORPHANED;
+ // rfc931 is excused
+
+#if USE_SQUID_EUI
+ // These are currently only set when accepting connections and never used
+ // for establishing new ones, so this copying is currently in vain, but,
+ // technically, they can be a part of connection opening instructions.
+ c.remoteEui48 = remoteEui48;
+ c.remoteEui64 = remoteEui64;
+#endif
-Comm::ConnectionPointer
-Comm::Connection::cloneIdentDetails() const
-{
- auto c = cloneDestinationDetails();
- c->tos = tos;
- c->nfmark = nfmark;
- c->nfConnmark = nfConnmark;
- c->startTime_ = startTime_;
- return c;
+ // id excused
+ c.peer_ = cbdataReference(getPeer());
+ // startTime_ excused
+ // tlsHistory excused
+
+ debugs(5, 5, this << " made " << c);
+ assert(!c.isOpen());
+ return clone;
}
void
/** Clear the connection properties and close any open socket. */
virtual ~Connection();
- /// Create a new (closed) IDENT Connection object based on our from-Squid
- /// connection properties.
- ConnectionPointer cloneIdentDetails() const;
+ /// To prevent accidental copying of Connection objects that we started to
+ /// open or that are open, use cloneProfile() instead.
+ Connection(const Connection &&) = delete;
- /// Create a new (closed) Connection object pointing to the same destination
- /// as this from-Squid connection.
- ConnectionPointer cloneDestinationDetails() const;
+ /// Create a new closed Connection with the same configuration as this one.
+ ConnectionPointer cloneProfile() const;
/// close the still-open connection when its last reference is gone
void enterOrphanage() { flags |= COMM_ORPHANED; }
virtual ScopedId codeContextGist() const override;
virtual std::ostream &detailCodeContext(std::ostream &os) const override;
-private:
- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
- * cloneDestinationDetails() instead.
- */
- Connection(const Connection &c);
-
- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
- * cloneDestinationDetails() instead.
- */
- Connection & operator =(const Connection &c);
-
public:
/** Address/Port for the Squid end of a TCP link. */
Ip::Address local;
Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &)
{
closer_ = NULL;
- conn = NULL;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
Must(done());
}
idnsVCClosed(const CommCloseCbParams ¶ms)
{
nsvc * vc = (nsvc *)params.data;
+ if (vc->conn) {
+ vc->conn->noteClosure();
+ vc->conn = nullptr;
+ }
delete vc;
}
public:
Eui48() { clear(); }
- Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); }
bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; }
bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; }
- ~Eui48() {}
const unsigned char *get(void);
return false;
}
- Must(AsyncJob::Start(new Rebuild(&dir, stats)));
+ AsyncJob::Start(new Rebuild(&dir, stats));
return true;
}
entry->lock("gopherState");
*replybuf = 0;
}
- ~GopherStateData() {if(buf) swanSong();}
- /* AsyncJob API emulated */
- void deleteThis(const char *aReason);
- void swanSong();
+ ~GopherStateData();
public:
StoreEntry *entry;
gopherStateFree(const CommCloseCbParams ¶ms)
{
GopherStateData *gopherState = (GopherStateData *)params.data;
-
- if (gopherState == NULL)
- return;
-
- gopherState->deleteThis("gopherStateFree");
+ // Assume that FwdState is monitoring and calls noteClosure(). See XXX about
+ // Connection sharing with FwdState in gopherStart().
+ delete gopherState;
}
-void
-GopherStateData::deleteThis(const char *)
-{
- swanSong();
- delete this;
-}
-
-void
-GopherStateData::swanSong()
+GopherStateData::~GopherStateData()
{
if (entry)
entry->unlock("gopherState");
- if (buf) {
+ if (buf)
memFree(buf, MEM_4K_BUF);
- buf = nullptr;
- }
}
/**
return;
}
+ // XXX: Sharing open Connection with FwdState that has its own handlers/etc.
gopherState->serverConn = fwd->serverConnection();
gopherSendRequest(fwd->serverConnection()->fd, gopherState);
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout",
#include "squid.h"
#if USE_IDENT
+#include "base/JobWait.h"
#include "comm.h"
#include "comm/Connection.h"
#include "comm/ConnOpener.h"
Comm::ConnectionPointer conn;
MemBuf queryMsg; ///< the lookup message sent to IDENT server
- IdentClient *clients;
+ IdentClient *clients = nullptr;
char buf[IDENT_BUFSIZE];
+
+ /// waits for a connection to the IDENT server to be established/opened
+ JobWait<Comm::ConnOpener> connWait;
+
+private:
+ // use deleteThis() to destroy
+ ~IdentStateData();
};
CBDATA_CLASS_INIT(IdentStateData);
Ident::IdentConfig Ident::TheConfig;
void
-Ident::IdentStateData::deleteThis(const char *)
+Ident::IdentStateData::deleteThis(const char *reason)
{
+ debugs(30, 3, reason);
swanSong();
delete this;
}
{
if (clients != NULL)
notify(NULL);
+}
+
+Ident::IdentStateData::~IdentStateData() {
+ assert(!clients);
if (Comm::IsConnOpen(conn)) {
comm_remove_close_handler(conn->fd, Ident::Close, this);
Ident::Close(const CommCloseCbParams ¶ms)
{
IdentStateData *state = (IdentStateData *)params.data;
+ if (state->conn) {
+ state->conn->noteClosure();
+ state->conn = nullptr;
+ }
state->deleteThis("connection closed");
}
Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data)
{
IdentStateData *state = (IdentStateData *)data;
+ state->connWait.finish();
+
+ // Start owning the supplied connection (so that it is not orphaned if this
+ // function bails early). As a (tiny) optimization or perhaps just diff
+ // minimization, the close handler is added later, when we know we are not
+ // bailing. This delay is safe because comm_remove_close_handler() forgives
+ // missing handlers.
+ assert(conn); // but may be closed
+ assert(!state->conn);
+ state->conn = conn;
if (status != Comm::OK) {
if (status == Comm::TIMEOUT)
return;
}
- assert(conn != NULL && conn == state->conn);
- comm_add_close_handler(conn->fd, Ident::Close, state);
+ assert(state->conn->isOpen());
+ comm_add_close_handler(state->conn->fd, Ident::Close, state);
AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback",
CommIoCbPtrFun(Ident::WriteFeedback, state));
state->hash.key = xstrdup(key);
// copy the conn details. We do not want the original FD to be re-used by IDENT.
- state->conn = conn->cloneIdentDetails();
+ const auto identConn = conn->cloneProfile();
// NP: use random port for secure outbound to IDENT_PORT
- state->conn->local.port(0);
- state->conn->remote.port(IDENT_PORT);
+ identConn->local.port(0);
+ identConn->remote.port(IDENT_PORT);
// build our query from the original connection details
state->queryMsg.init();
hash_join(ident_hash, &state->hash);
AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state));
- AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout));
+ const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout);
+ state->connWait.start(connOpener, call);
}
void
typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
- AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
+ const auto cs = new Comm::ConnOpener(futureConn, call, 2);
+ connWait.start(cs, call);
}
/// Comm::ConnOpener callback
void
Log::TcpLogger::connectDone(const CommConnectCbParams ¶ms)
{
+ connWait.finish();
+
if (params.flag != Comm::OK) {
const double delay = 0.5; // seconds
if (connectFailures++ % 100 == 0) {
{
assert(inCall != NULL);
closer = NULL;
- conn = NULL;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
// in all current use cases, we should not try to reconnect
mustStop("Log::TcpLogger::handleClosure");
}
#define _SQUID_SRC_LOG_TCPLOGGER_H
#include "base/AsyncJob.h"
+#include "base/JobWait.h"
+#include "comm/forward.h"
#include "ip/Address.h"
#include <list>
Ip::Address remote; ///< where the remote logger expects our records
AsyncCall::Pointer closer; ///< handles unexpected/external conn closures
+ /// waits for a connection to the remote logger to be established/opened
+ JobWait<Comm::ConnOpener> connWait;
+
uint64_t connectFailures; ///< number of sequential connection failures
uint64_t drops; ///< number of records dropped during the current outage
};
Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &)
{
debugs(16, 5, HERE);
- conn = NULL; // needed?
+ closer = nullptr;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
mustStop("commClosed");
}
/// called when the HTTP client or some external force closed our socket
void
-Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
+Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &)
{
debugs(16, 5, HERE);
- Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
- conn = NULL;
+ closer = nullptr;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
mustStop("commClosed");
}
Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &)
{
debugs(16, 6, HERE);
- Must(!Comm::IsConnOpen(clientConnection));
+ if (clientConnection) {
+ clientConnection->noteClosure();
+ clientConnection = nullptr;
+ }
+ closer = nullptr;
mustStop("commClosed");
}
void
Security::PeerConnector::commCloseHandler(const CommCloseCbParams ¶ms)
{
+ debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
+
closeHandler = nullptr;
+ if (serverConn) {
+ countFailingConnection();
+ serverConn->noteClosure();
+ serverConn = nullptr;
+ }
- debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request.getRaw(), al);
static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE");
err->detailError(d);
bool
Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
{
+ Must(Comm::IsConnOpen(serverConnection()));
+
Security::ContextPointer ctx(getTlsContext());
debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get());
void
Security::PeerConnector::recordNegotiationDetails()
{
+ Must(Comm::IsConnOpen(serverConnection()));
+
const int fd = serverConnection()->fd;
Security::SessionPointer session(fd_table[fd].ssl);
void
Security::PeerConnector::negotiate()
{
+ Must(Comm::IsConnOpen(serverConnection()));
+
const int fd = serverConnection()->fd;
if (fd_table[fd].closing())
return;
switch (result.category) {
case Security::IoResult::ioSuccess:
recordNegotiationDetails();
- if (sslFinalized())
+ if (sslFinalized() && callback)
sendSuccess();
return; // we may be gone by now
{
#if USE_OPENSSL
if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) {
+ Must(Comm::IsConnOpen(serverConnection()));
const int fd = serverConnection()->fd;
Security::SessionPointer session(fd_table[fd].ssl);
Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse)
{
Must(validationResponse != NULL);
+ Must(Comm::IsConnOpen(serverConnection()));
ErrorDetail::Pointer errDetails;
bool validatorFailed = false;
if (!errDetails && !validatorFailed) {
noteNegotiationDone(NULL);
- sendSuccess();
+ if (callback)
+ sendSuccess();
return;
}
Security::CertErrors *
Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails)
{
+ Must(Comm::IsConnOpen(serverConnection()));
+
ACLFilledChecklist *check = NULL;
Security::SessionPointer session(fd_table[serverConnection()->fd].ssl);
void
Security::PeerConnector::noteWantRead()
{
- const int fd = serverConnection()->fd;
debugs(83, 5, serverConnection());
+ Must(Comm::IsConnOpen(serverConnection()));
+ const int fd = serverConnection()->fd;
+
// read timeout to avoid getting stuck while reading from a silent server
typedef CommCbMemFunT<Security::PeerConnector, CommTimeoutCbParams> TimeoutDialer;
AsyncCall::Pointer timeoutCall = JobCallback(83, 5,
void
Security::PeerConnector::noteWantWrite()
{
- const int fd = serverConnection()->fd;
debugs(83, 5, serverConnection());
+ Must(Comm::IsConnOpen(serverConnection()));
+
+ const int fd = serverConnection()->fd;
Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0);
return;
}
bail(anErr);
}
+Security::EncryptorAnswer &
+Security::PeerConnector::answer()
+{
+ assert(callback);
+ const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer());
+ assert(dialer);
+ return dialer->answer();
+}
+
void
Security::PeerConnector::bail(ErrorState *error)
{
Must(error); // or the recipient will not know there was a problem
- Must(callback != NULL);
- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
- Must(dialer);
- dialer->answer().error = error;
+ answer().error = error;
- if (const auto p = serverConnection()->getPeer())
- peerConnectFailed(p);
+ if (const auto failingConnection = serverConn) {
+ countFailingConnection();
+ disconnect();
+ failingConnection->close();
+ }
callBack();
- disconnect();
-
- if (noteFwdPconnUse)
- fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
- serverConn->close();
- serverConn = nullptr;
}
void
Security::PeerConnector::sendSuccess()
{
- callBack();
+ assert(Comm::IsConnOpen(serverConn));
+ answer().conn = serverConn;
disconnect();
+ callBack();
+}
+
+void
+Security::PeerConnector::countFailingConnection()
+{
+ assert(serverConn);
+ if (const auto p = serverConn->getPeer())
+ peerConnectFailed(p);
+ // TODO: Calling PconnPool::noteUses() should not be our responsibility.
+ if (noteFwdPconnUse && serverConn->isOpen())
+ fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
}
void
Security::PeerConnector::disconnect()
{
+ const auto stillOpen = Comm::IsConnOpen(serverConn);
+
if (closeHandler) {
- comm_remove_close_handler(serverConnection()->fd, closeHandler);
+ if (stillOpen)
+ comm_remove_close_handler(serverConn->fd, closeHandler);
closeHandler = nullptr;
}
- commUnsetConnTimeout(serverConnection());
+ if (stillOpen)
+ commUnsetConnTimeout(serverConn);
+
+ serverConn = nullptr;
}
void
Security::PeerConnector::callBack()
{
- debugs(83, 5, "TLS setup ended for " << serverConnection());
+ debugs(83, 5, "TLS setup ended for " << answer().conn);
AsyncCall::Pointer cb = callback;
// Do this now so that if we throw below, swanSong() assert that we _tried_
// to call back holds.
callback = NULL; // this should make done() true
- CbDialer *dialer = dynamic_cast<CbDialer*>(cb->getDialer());
- Must(dialer);
- dialer->answer().conn = serverConnection();
ScheduleCallHere(cb);
}
{
// XXX: unregister fd-closure monitoring and CommSetSelect interest, if any
AsyncJob::swanSong();
- if (callback != NULL) { // paranoid: we have left the caller waiting
- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server");
+
+ if (callback) {
+ // job-ending emergencies like handleStopRequest() or callException()
const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al);
bail(anErr);
assert(!callback);
buf.append("Stopped, reason:", 16);
buf.appendf("%s",stopReason);
}
- if (serverConn != NULL)
+ if (Comm::IsConnOpen(serverConn))
buf.appendf(" FD %d", serverConn->fd);
buf.appendf(" %s%u]", id.prefix(), id.value);
buf.terminate();
PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this));
const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1);
- AsyncJob::Start(dl);
+ certDownloadWait.start(dl, certCallback);
}
void
Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
{
+ certDownloadWait.finish();
+
++certsDownloads;
debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length());
+ Must(Comm::IsConnOpen(serverConnection()));
const auto &sconn = *fd_table[serverConnection()->fd].ssl;
// Parse Certificate. Assume that it is in DER format.
void
Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult)
{
+ Must(Comm::IsConnOpen(serverConnection()));
auto &sconn = *fd_table[serverConnection()->fd].ssl;
// We download the missing certificate(s) once. We would prefer to clear
#include "acl/ChecklistFiller.h"
#include "base/AsyncCbdataCalls.h"
#include "base/AsyncJob.h"
+#include "base/JobWait.h"
#include "CommCalls.h"
#include "http/forward.h"
#include "security/EncryptorAnswer.h"
#include <queue>
class ErrorState;
+class Downloader;
class AccessLogEntry;
typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
/// a bail(), sendSuccess() helper: stops monitoring the connection
void disconnect();
+ /// updates connection usage history before the connection is closed
+ void countFailingConnection();
+
/// If called the certificates validator will not used
void bypassCertValidator() {useCertValidator_ = false;}
/// logging
void recordNegotiationDetails();
+ /// convenience method to get to the answer fields
+ EncryptorAnswer &answer();
+
HttpRequestPointer request; ///< peer connection trigger or cause
Comm::ConnectionPointer serverConn; ///< TCP connection to the peer
AccessLogEntryPointer al; ///< info for the future access.log entry
/// outcome of the last (failed and) suspended negotiation attempt (or nil)
Security::IoResultPointer suspendedError_;
+
+ JobWait<Downloader> certDownloadWait; ///< waits for the missing certificate to be downloaded
};
} // namespace Security
typedef long ParsedPortFlags;
class PeerConnector;
+class BlindPeerConnector;
class PeerOptions;
#if USE_OPENSSL
dataConn(),
uploadAvailSize(0),
listener(),
- connector(),
+ dataConnWait(),
reader(),
waitingForOrigin(false),
originDataDownloadAbortedOnError(false)
// active transfer: open a data connection from Squid to client
typedef CommCbMemFunT<Server, CommConnectCbParams> Dialer;
- connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
- Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector,
- Config.Timeout.connect);
- AsyncJob::Start(cs);
- return false; // ConnStateData::processFtpRequest waits handleConnectDone
+ AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
+ const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback,
+ Config.Timeout.connect);
+ dataConnWait.start(cs, callback);
+ return false;
}
/// Check that client data connection is ready for immediate I/O.
void
Ftp::Server::connectedForData(const CommConnectCbParams ¶ms)
{
- connector = NULL;
+ dataConnWait.finish();
if (params.flag != Comm::OK) {
- /* it might have been a timeout with a partially open link */
- if (params.conn != NULL)
- params.conn->close();
setReply(425, "Cannot open data connection.");
Http::StreamPointer context = pipeline.front();
Must(context->http);
Must(context->http->storeEntry() != NULL);
+ // TODO: call closeDataConnection() to reset data conn processing?
} else {
- Must(dataConn == params.conn);
+ // Finalize the details and start owning the supplied connection.
+ assert(params.conn);
+ assert(dataConn);
+ assert(!dataConn->isOpen());
+ dataConn = params.conn;
+ // XXX: Missing comm_add_close_handler() to track external closures.
+
Must(Comm::IsConnOpen(params.conn));
fd_note(params.conn->fd, "active client ftp data");
}
#ifndef SQUID_SERVERS_FTP_SERVER_H
#define SQUID_SERVERS_FTP_SERVER_H
+#include "base/JobWait.h"
#include "base/Lock.h"
#include "client_side.h"
+#include "comm/forward.h"
namespace Ftp
{
size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes
AsyncCall::Pointer listener; ///< set when we are passively listening
- AsyncCall::Pointer connector; ///< set when we are actively connecting
+
+ /// Waits for an FTP data connection to the client to be established/opened.
+ /// This wait only happens in FTP active mode (via PORT or EPRT).
+ JobWait<Comm::ConnOpener> dataConnWait;
+
AsyncCall::Pointer reader; ///< set when we are reading FTP data
/// whether we wait for the origin data transfer to end
{
debugs(49, 5, HERE);
Must(fd == params.fd);
+ closer = nullptr;
fd = -1;
mustStop("commClosed");
}
Snmp::Forwarder::handleException(const std::exception& e)
{
debugs(49, 3, HERE << e.what());
- if (fd >= 0)
- sendError(SNMP_ERR_GENERR);
+ sendError(SNMP_ERR_GENERR);
Ipc::Forwarder::handleException(e);
}
Snmp::Forwarder::sendError(int error)
{
debugs(49, 3, HERE);
+
+ if (fd < 0)
+ return; // client gone
+
Snmp::Request& req = static_cast<Snmp::Request&>(*request);
req.pdu.command = SNMP_PDU_RESPONSE;
req.pdu.errstat = error;
{
debugs(49, 5, HERE);
Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd);
- conn = NULL;
+ closer = nullptr;
+ if (conn) {
+ conn->noteClosure();
+ conn = nullptr;
+ }
mustStop("commClosed");
}
Snmp::Inquirer::sendResponse()
{
debugs(49, 5, HERE);
+
+ if (!Comm::IsConnOpen(conn))
+ return; // client gone
+
aggrPdu.fixAggregate();
aggrPdu.command = SNMP_PDU_RESPONSE;
u_char buffer[SNMP_REQUEST_SIZE];
CBDATA_NAMESPACED_CLASS_INIT(Ssl, PeekingPeerConnector);
void
-Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data)
+Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data)
{
Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data;
// Use job calls to add done() checks and other job logic/protections.
- CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer);
+ CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer);
}
void
-Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer)
+Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer)
{
- const Ssl::BumpMode finalAction = answer.allowed() ?
- static_cast<Ssl::BumpMode>(answer.kind):
+ const Ssl::BumpMode finalAction = aclAnswer.allowed() ?
+ static_cast<Ssl::BumpMode>(aclAnswer.kind):
checkForPeekAndSpliceGuess();
checkForPeekAndSpliceMatched(finalAction);
}
splice = true;
// Ssl Negotiation stops here. Last SSL checks for valid certificates
// and if done, switch to tunnel mode
- if (sslFinalized()) {
- debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection");
+ if (sslFinalized() && callback)
callBack();
- }
}
}
auto b = SSL_get_rbio(session.get());
auto srvBio = static_cast<Ssl::ServerBio*>(BIO_get_data(b));
+ debugs(83, 5, "will tunnel instead of negotiating TLS");
switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData());
- tunnelInsteadOfNegotiating();
+ answer().tunneled = true;
+ disconnect();
+ callBack();
}
void
}
}
-void
-Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating()
-{
- Must(callback != NULL);
- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
- Must(dialer);
- dialer->answer().tunneled = true;
- debugs(83, 5, "The SSL negotiation with server aborted");
-}
-
void checkForPeekAndSplice();
/// Callback function for ssl_bump acl check in step3 SSL bump step.
- void checkForPeekAndSpliceDone(Acl::Answer answer);
+ void checkForPeekAndSpliceDone(Acl::Answer);
/// Handles the final bumping decision.
void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode);
void startTunneling();
/// A wrapper function for checkForPeekAndSpliceDone for use with acl
- static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data);
+ static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data);
private:
#include "comm/Connection.h"
Comm::Connection::Connection() STUB
Comm::Connection::~Connection() STUB
-Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr)
-Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr)
+Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr)
void Comm::Connection::close() STUB
+void Comm::Connection::noteClosure() STUB
CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL)
void Comm::Connection::setPeer(CachePeer *) STUB
ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach())
#include "squid.h"
#include "AccessLogEntry.h"
#include "comm/Connection.h"
+#include "Downloader.h"
#include "HttpRequest.h"
#define STUB_API "security/libsecurity.la"
void PeerConnector::sendSuccess() STUB
void PeerConnector::callBack() STUB
void PeerConnector::disconnect() STUB
+void PeerConnector::countFailingConnection() STUB
void PeerConnector::recordNegotiationDetails() STUB
+EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer)
}
#include "security/PeerOptions.h"
#include "squid.h"
#include "acl/FilledChecklist.h"
#include "base/CbcPointer.h"
+#include "base/JobWait.h"
#include "CachePeer.h"
#include "cbdata.h"
#include "client_side.h"
SBuf preReadClientData;
SBuf preReadServerData;
time_t startTime; ///< object creation time, before any peer selection/connection attempts
- /// Whether we are waiting for the CONNECT request/response exchange with the peer.
- bool waitingForConnectExchange;
- HappyConnOpenerPointer connOpener; ///< current connection opening job
ResolvedPeersPointer destinations; ///< paths for forwarding the request
bool destinationsFound; ///< At least one candidate path found
/// whether another destination may be still attempted if the TCP connection
// TODO: remove after fixing deferred reads in TunnelStateData::copyRead()
CodeContext::Pointer codeContext; ///< our creator context
- // AsyncCalls which we set and may need cancelling.
- struct {
- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn.
- } calls;
+ /// waits for a transport connection to the peer to be established/opened
+ JobWait<HappyConnOpener> transportWait;
+
+ /// waits for the established transport connection to be secured/encrypted
+ JobWait<Security::PeerConnector> encryptionWait;
+
+ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
+ /// over the (encrypted, if needed) transport connection to that cache_peer
+ JobWait<Http::Tunneler> peerWait;
void copyRead(Connection &from, IOCB *completion);
/// when all candidate destinations have been tried and all have failed
void noteConnection(HappyConnOpenerAnswer &);
- /// whether we are waiting for HappyConnOpener
- /// same as calls.connector but may differ from connOpener.valid()
- bool opening() const { return connOpener.set(); }
-
- void cancelOpening(const char *reason);
-
/// Start using an established connection
void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused);
/// \returns whether the request should be retried (nil) or the description why it should not
const char *checkRetry();
+ /// whether the successfully selected path destination or the established
+ /// server connection is still in use
+ bool usingDestination() const;
/// details of the "last tunneling attempt" failure (if it failed)
ErrorState *savedError = nullptr;
void deleteThis();
+ void cancelStep(const char *reason);
+
public:
bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
void copy(size_t len, Connection &from, Connection &to, IOCB *);
TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) :
startTime(squid_curtime),
- waitingForConnectExchange(false),
destinations(new ResolvedPeers()),
destinationsFound(false),
retriable(true),
debugs(26, 3, "TunnelStateData destructed this=" << this);
assert(noConnections());
xfree(url);
- if (opening())
- cancelOpening("~TunnelStateData");
+ cancelStep("~TunnelStateData");
delete savedError;
}
static void
tunnelStartShoveling(TunnelStateData *tunnelState)
{
- assert(!tunnelState->waitingForConnectExchange);
+ assert(!tunnelState->transportWait);
+ assert(!tunnelState->encryptionWait);
+ assert(!tunnelState->peerWait);
+
assert(tunnelState->server.conn);
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
void
TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
{
+ peerWait.finish();
server.len = 0;
if (logTag_ptr)
if (answer.peerResponseStatus != Http::scNone)
*status_ptr = answer.peerResponseStatus;
- waitingForConnectExchange = false;
-
auto sawProblem = false;
if (!answer.positive()) {
sawProblem = true;
- Must(!Comm::IsConnOpen(answer.conn));
+ assert(!answer.conn);
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
sawProblem = true;
closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
void
TunnelStateData::noteConnection(HappyConnOpener::Answer &answer)
{
- calls.connector = nullptr;
- connOpener.clear();
+ transportWait.finish();
ErrorState *error = nullptr;
if ((error = answer.error.get())) {
AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer",
MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this));
const auto connector = new Security::BlindPeerConnector(request, conn, callback, al);
- AsyncJob::Start(connector); // will call our callback
+ encryptionWait.start(connector, callback);
}
/// starts a preparation step for an established connection; retries on failures
void
TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer)
{
+ encryptionWait.finish();
+
ErrorState *error = nullptr;
+ assert(!answer.tunneled);
if ((error = answer.error.get())) {
- Must(!Comm::IsConnOpen(answer.conn));
+ assert(!answer.conn);
answer.error.clear();
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al);
void
TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
{
- assert(!waitingForConnectExchange);
-
AsyncCall::Pointer callback = asyncCall(5,4,
"TunnelStateData::tunnelEstablishmentDone",
Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this));
#if USE_DELAY_POOLS
tunneler->setDelayId(server.delayId);
#endif
- AsyncJob::Start(tunneler);
- waitingForConnectExchange = true;
- // and wait for the tunnelEstablishmentDone() call
+ peerWait.start(tunneler, callback);
}
void
destinations->addPath(path);
- if (Comm::IsConnOpen(server.conn)) {
+ if (usingDestination()) {
// We are already using a previously opened connection but also
// receiving destinations in case we need to re-forward.
- Must(!opening());
+ Must(!transportWait);
return;
}
- if (opening()) {
+ if (transportWait) {
notifyConnOpener();
return; // and continue to wait for tunnelConnectDone() callback
}
// if all of them fail, tunneling as whole will fail
Must(!selectionError); // finding at least one path means selection succeeded
- if (Comm::IsConnOpen(server.conn)) {
+ if (usingDestination()) {
// We are already using a previously opened connection but also
// receiving destinations in case we need to re-forward.
- Must(!opening());
+ Must(!transportWait);
return;
}
- Must(opening()); // or we would be stuck with nothing to do or wait for
+ Must(transportWait); // or we would be stuck with nothing to do or wait for
notifyConnOpener();
}
+bool
+TunnelStateData::usingDestination() const
+{
+ return encryptionWait || peerWait || Comm::IsConnOpen(server.conn);
+}
+
/// remembers an error to be used if there will be no more connection attempts
void
TunnelStateData::saveError(ErrorState *error)
if (request)
request->hier.stopPeerClock(false);
- if (opening())
- cancelOpening(reason);
+ cancelStep(reason);
assert(finalError);
errorSend(client.conn, finalError);
}
-/// Notify connOpener that we no longer need connections. We do not have to do
-/// this -- connOpener would eventually notice on its own, but notifying reduces
-/// waste and speeds up spare connection opening for other transactions (that
-/// could otherwise wait for this transaction to use its spare allowance).
+/// Notify a pending subtask, if any, that we no longer need its help. We do not
+/// have to do this -- the subtask job will eventually end -- but ending it
+/// earlier reduces waste and may reduce DoS attack surface.
void
-TunnelStateData::cancelOpening(const char *reason)
+TunnelStateData::cancelStep(const char *reason)
{
- assert(calls.connector);
- calls.connector->cancel(reason);
- calls.connector = nullptr;
- notifyConnOpener();
- connOpener.clear();
+ transportWait.cancel(reason);
+ encryptionWait.cancel(reason);
+ peerWait.cancel(reason);
}
void
request->hier.startPeerClock();
assert(!destinations->empty());
- assert(!opening());
- calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
- const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al);
+ assert(!usingDestination());
+ AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
+ const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al);
cs->setHost(request->url.host());
cs->setRetriable(false);
cs->allowPersistent(false);
destinations->notificationPending = true; // start() is async
- connOpener = cs;
- AsyncJob::Start(cs);
+ transportWait.start(cs, callback);
}
/// send request on an existing connection dedicated to the requesting client
#endif
-/// makes sure connOpener knows that destinations have changed
+/// makes sure connection opener knows that the destinations have changed
void
TunnelStateData::notifyConnOpener()
{
debugs(17, 7, "reusing pending notification");
} else {
destinations->notificationPending = true;
- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
+ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
}
}
{
WhoisState *p = (WhoisState *)params.data;
debugs(75, 3, "whoisClose: FD " << params.fd);
+ // We do not own a Connection. Assume that FwdState is also monitoring.
p->entry->unlock("whoisClose");
delete p;
}