#include "neighbors.h"
#include "pconn.h"
#include "PeerPoolMgr.h"
-#include "PeerSelectState.h"
#include "security/BlindPeerConnector.h"
#include "SquidConfig.h"
#include "SquidTime.h"
#include <cerrno>
-static PSC fwdPeerSelectionCompleteWrapper;
static CLCB fwdServerClosedWrapper;
static CNCB fwdConnectDoneWrapper;
debugs(17, 7, HERE << "store entry aborted; no connection to close");
}
fwd->serverDestinations.clear();
- fwd->self = NULL;
+ fwd->stopAndDestroy("store entry aborted");
}
void
#endif
// do full route options selection
- peerSelect(&serverDestinations, request, al, entry, fwdPeerSelectionCompleteWrapper, this);
+ startSelectingDestinations(request, al, entry);
+}
+
+/// ends forwarding; relies on refcounting so the effect may not be immediate
+void
+FwdState::stopAndDestroy(const char *reason)
+{
+ debugs(17, 3, "for " << reason);
+ PeerSelectionInitiator::subscribed = false; // may already be false
+ self = nullptr; // we hope refcounting destroys us soon; may already be nil
+ /* do not place any code here as this object may be gone by now */
}
#if STRICT_ORIGINAL_DST
connectStart();
} else {
+ if (PeerSelectionInitiator::subscribed) {
+ debugs(17, 4, "wait for more destinations to try");
+ return; // expect a noteDestination*() call
+ }
+
debugs(17, 3, HERE << "Connection failed: " << entry->url());
if (!err) {
ErrorState *anErr = new ErrorState(ERR_CANNOT_FORWARD, Http::scInternalServerError, request);
fail(anErr);
} // else use actual error from last connection attempt
- self = NULL; // refcounted
+
+ stopAndDestroy("tried all destinations");
}
}
entry->reset();
// drop the last path off the selection list. try the next one.
- serverDestinations.erase(serverDestinations.begin());
+ if (!serverDestinations.empty()) // paranoid
+ serverDestinations.erase(serverDestinations.begin());
startConnectionOrFail();
} else {
if (!Comm::IsConnOpen(serverConn))
completed();
- self = NULL; // refcounted
+ stopAndDestroy("forwarding completed");
}
}
-/**** CALLBACK WRAPPERS ************************************************************/
+void
+FwdState::noteDestination(Comm::ConnectionPointer path)
+{
+ const bool wasBlocked = serverDestinations.empty();
+ serverDestinations.push_back(path);
+ if (wasBlocked)
+ startConnectionOrFail();
+ // else continue to use one of the previously noted destinations;
+ // if all of them fail, we may try this path
+}
-static void
-fwdPeerSelectionCompleteWrapper(Comm::ConnectionList *, ErrorState *err, void *data)
+void
+FwdState::noteDestinationsEnd(ErrorState *selectionError)
{
- FwdState *fwd = (FwdState *) data;
- if (err)
- fwd->fail(err);
- fwd->startConnectionOrFail();
+ PeerSelectionInitiator::subscribed = false;
+ if (const bool wasBlocked = serverDestinations.empty()) {
+
+ if (selectionError) {
+ debugs(17, 3, "Will abort forwarding because path selection has failed.");
+ Must(!err); // if we tried to connect, then path selection succeeded
+ fail(selectionError);
+ }
+ else if (err)
+ debugs(17, 3, "Will abort forwarding because all found paths have failed.");
+ else
+ debugs(17, 3, "Will abort forwarding because path selection found no paths.");
+
+ startConnectionOrFail(); // will choose the OrFail code path
+ return;
+ }
+ // else continue to use one of the previously noted destinations;
+ // if all of them fail, forwarding as whole will fail
+ Must(!selectionError); // finding at least one path means selection succeeded
}
+/**** CALLBACK WRAPPERS ************************************************************/
+
static void
fwdServerClosedWrapper(const CommCloseCbParams ¶ms)
{
errorAppendEntry(entry, anErr);
}
- self = NULL; // refcounted
+ stopAndDestroy("cannot retry");
}
// If the Server quits before nibbling at the request body, the body sender
debugs(50, 4, "fwdConnectStart: Ssl bumped connections through parent proxy are not allowed");
ErrorState *anErr = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request);
fail(anErr);
- self = NULL; // refcounted
+ stopAndDestroy("SslBump misconfiguration");
return;
}
debugs(17,2,HERE << "Pinned connection failed: " << pinned_connection);
ErrorState *anErr = new ErrorState(ERR_ZERO_SIZE_OBJECT, Http::scServiceUnavailable, request);
fail(anErr);
- self = NULL; // refcounted
+ stopAndDestroy("pinned connection failure");
return;
}
if (request->bodyNibbled())
return 0;
- if (serverDestinations.size() <= 1) {
+ if (serverDestinations.size() <= 1 && !PeerSelectionInitiator::subscribed) {
// NP: <= 1 since total count includes the recently failed one.
debugs(17, 3, HERE << "No alternative forwarding paths left");
return 0;
#include "fde.h"
#include "http/StatusCode.h"
#include "ip/Address.h"
+#include "PeerSelectState.h"
#include "security/forward.h"
#if USE_OPENSSL
#include "ssl/support.h"
class HelperReply;
-class FwdState : public RefCountable
+class FwdState: public RefCountable, public PeerSelectionInitiator
{
- CBDATA_CLASS(FwdState);
+ CBDATA_CHILD(FwdState);
public:
typedef RefCount<FwdState> Pointer;
- ~FwdState();
+ virtual ~FwdState();
static void initModule();
/// Initiates request forwarding to a peer or origin server.
// hidden for safer management of self; use static fwdStart
FwdState(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *, const AccessLogEntryPointer &alp);
void start(Pointer aSelf);
+ void stopAndDestroy(const char *reason);
+
+ /* PeerSelectionInitiator API */
+ virtual void noteDestination(Comm::ConnectionPointer conn) override;
+ virtual void noteDestinationsEnd(ErrorState *selectionError) override;
#if STRICT_ORIGINAL_DST
void selectPeerForIntercepted();
wordlist.h \
wordlist.cc
nodist_tests_testHttpReply_SOURCES=\
- $(TESTSOURCES)
+ $(TESTSOURCES) \
+ hier_code.cc
tests_testHttpReply_LDFLAGS = $(LIBADD_DL)
tests_testHttpReply_LDADD=\
CommCalls.o \
tests/stub_tools.cc
nodist_tests_testDiskIO_SOURCES= \
$(TESTSOURCES) \
+ hier_code.cc \
SquidMath.cc \
SquidMath.h \
swap_log_op.cc
nodist_tests_testUfs_SOURCES = \
$(TESTSOURCES) \
+ hier_code.cc \
SquidMath.cc \
SquidMath.h \
swap_log_op.cc
$(DELAY_POOL_SOURCE) \
$(UNLINKDSOURCE)
nodist_tests_testRock_SOURCES = \
- swap_log_op.cc \
+ $(TESTSOURCES) \
+ hier_code.cc \
SquidMath.cc \
SquidMath.h \
- $(TESTSOURCES)
+ swap_log_op.cc
tests_testRock_LDADD = \
http/libhttp.la \
parser/libparser.la \
class StoreEntry;
class ErrorState;
-typedef void PSC(Comm::ConnectionList *, ErrorState *, void *);
-
-void peerSelect(Comm::ConnectionList *, HttpRequest *, AccessLogEntry::Pointer const&, StoreEntry *, PSC *, void *data);
void peerSelectInit(void);
-/**
- * A CachePeer which has been selected as a possible destination.
- * Listed as pointers here so as to prevent duplicates being added but will
- * be converted to a set of IP address path options before handing back out
- * to the caller.
- *
- * Certain connection flags and outgoing settings will also be looked up and
- * set based on the received request and CachePeer settings before handing back.
- */
-class FwdServer
+/// Interface for those who need a list of peers to forward a request to.
+class PeerSelectionInitiator: public CbdataParent
{
- MEMPROXY_CLASS(FwdServer);
-
public:
- FwdServer(CachePeer *p, hier_code c) :
- _peer(p),
- code(c),
- next(nullptr)
- {}
-
- CbcPointer<CachePeer> _peer; /* NULL --> origin server */
- hier_code code;
- FwdServer *next;
+ virtual ~PeerSelectionInitiator() = default;
+
+ /// called when a new unique destination has been found
+ virtual void noteDestination(Comm::ConnectionPointer path) = 0;
+
+ /// called when there will be no more noteDestination() calls
+ /// \param error is a possible reason why no destinations were found; it is
+ /// guaranteed to be nil if there was at least one noteDestination() call
+ virtual void noteDestinationsEnd(ErrorState *error) = 0;
+
+ /// whether noteDestination() and noteDestinationsEnd() calls are allowed
+ bool subscribed = false;
+
+/* protected: */
+ /// Initiates asynchronous peer selection that eventually
+ /// results in zero or more noteDestination() calls and
+ /// exactly one noteDestinationsEnd() call.
+ void startSelectingDestinations(HttpRequest *request, const AccessLogEntry::Pointer &ale, StoreEntry *entry);
};
+class FwdServer;
+
class ps_state
{
CBDATA_CLASS(ps_state);
public:
- ps_state();
+ explicit ps_state(PeerSelectionInitiator *initiator);
~ps_state();
// Produce a URL for display identifying the transaction we are
// trying to locate a peer for.
const SBuf url() const;
+ /// \returns valid/interested peer initiator or nil
+ PeerSelectionInitiator *interestedInitiator();
+
+ /// \returns whether the initiator may use more destinations
+ bool wantsMoreDestinations() const;
+
+ /// processes a newly discovered/finalized path
+ void handlePath(Comm::ConnectionPointer &path, FwdServer &fs);
+
HttpRequest *request;
AccessLogEntry::Pointer al; ///< info for the future access.log entry
StoreEntry *entry;
allow_t always_direct;
allow_t never_direct;
int direct; // TODO: fold always_direct/never_direct/prefer_direct into this now that ACL can do a multi-state result.
- PSC *callback;
- void *callback_data;
+ size_t foundPaths = 0; ///< number of unique destinations identified so far
+ void *peerCountMcastPeerXXX = nullptr; ///< a hack to help peerCountMcastPeersStart()
ErrorState *lastError;
- Comm::ConnectionList *paths; ///< the callers paths array. to be filled with our final results.
FwdServer *servers; ///< temporary linked list of peers we will pass back.
/*
peer_t hit_type;
ping_data ping;
ACLChecklist *acl_checklist;
+
+ const InstanceId<ps_state> id; ///< unique identification in worker log
+
+private:
+
+ typedef CbcPointer<PeerSelectionInitiator> Initiator;
+ Initiator initiator_; ///< recipient of the destinations we select; use interestedInitiator() to access
};
#endif /* SQUID_PEERSELECTSTATE_H */
#include "security/NegotiationHistory.h"
#include "SquidConfig.h"
#include "SquidTime.h"
+#include <ostream>
class CachePeer;
bool
return min(ctimeout, ftimeout);
}
+std::ostream &
+operator << (std::ostream &os, const Comm::Connection &conn)
+{
+ os << "local=" << conn.local << " remote=" << conn.remote;
+ if (conn.peerType)
+ os << ' ' << hier_code_str[conn.peerType];
+ if (conn.fd >= 0)
+ os << " FD " << conn.fd;
+ if (conn.flags != COMM_UNSET)
+ os << " flags=" << conn.flags;
+#if USE_IDENT
+ if (*conn.rfc931)
+ os << " IDENT::" << conn.rfc931;
+#endif
+ return os;
+}
}; // namespace Comm
-// NP: Order and namespace here is very important.
-// * The second define inlines the first.
-// * Stream inheritance overloading is searched in the global scope first.
-
-inline std::ostream &
-operator << (std::ostream &os, const Comm::Connection &conn)
-{
- os << "local=" << conn.local << " remote=" << conn.remote;
- if (conn.fd >= 0)
- os << " FD " << conn.fd;
- if (conn.flags != COMM_UNSET)
- os << " flags=" << conn.flags;
-#if USE_IDENT
- if (*conn.rfc931)
- os << " IDENT::" << conn.rfc931;
-#endif
- return os;
-}
+std::ostream &operator << (std::ostream &os, const Comm::Connection &conn);
inline std::ostream &
operator << (std::ostream &os, const Comm::ConnectionPointer &conn)
static void
peerCountMcastPeersStart(void *data)
{
+ // XXX: Do not create lots of complex fake objects (while abusing their
+ // APIs) to pass around a few basic data points like start_ping and ping!
CachePeer *p = (CachePeer *)data;
ps_state *psstate;
StoreEntry *fake;
strcat(url, "/");
fake = storeCreateEntry(url, url, RequestFlags(), Http::METHOD_GET);
HttpRequest *req = HttpRequest::CreateFromUrl(url);
- psstate = new ps_state;
+ psstate = new ps_state(nullptr);
psstate->request = req;
HTTPMSGLOCK(psstate->request);
psstate->entry = fake;
- psstate->callback = NULL;
- psstate->callback_data = cbdataReference(p);
+ psstate->peerCountMcastPeerXXX = cbdataReference(p);
psstate->ping.start = current_time;
mem = fake->mem_obj;
mem->request = psstate->request;
ps_state *psstate = (ps_state *)data;
StoreEntry *fake = psstate->entry;
- if (cbdataReferenceValid(psstate->callback_data)) {
- CachePeer *p = (CachePeer *)psstate->callback_data;
+ if (cbdataReferenceValid(psstate->peerCountMcastPeerXXX)) {
+ CachePeer *p = (CachePeer *)psstate->peerCountMcastPeerXXX;
p->mcast.flags.counting = false;
p->mcast.avg_n_members = Math::doubleAverage(p->mcast.avg_n_members, (double) psstate->ping.n_recv, ++p->mcast.n_times_counted, 10);
debugs(15, DBG_IMPORTANT, "Group " << p->host << ": " << psstate->ping.n_recv <<
p->mcast.n_replies_expected = (int) p->mcast.avg_n_members;
}
- cbdataReferenceDone(psstate->callback_data);
+ cbdataReferenceDone(psstate->peerCountMcastPeerXXX);
fake->abort(); // sets ENTRY_ABORTED and initiates releated cleanup
fake->mem_obj->request = nullptr;
#include "squid.h"
#include "acl/FilledChecklist.h"
+#include "base/InstanceId.h"
#include "CachePeer.h"
#include "carp.h"
#include "client_side.h"
#include "Store.h"
#include "URL.h"
+/**
+ * A CachePeer which has been selected as a possible destination.
+ * Listed as pointers here so as to prevent duplicates being added but will
+ * be converted to a set of IP address path options before handing back out
+ * to the caller.
+ *
+ * Certain connection flags and outgoing settings will also be looked up and
+ * set based on the received request and CachePeer settings before handing back.
+ */
+class FwdServer
+{
+ MEMPROXY_CLASS(FwdServer);
+
+public:
+ FwdServer(CachePeer *p, hier_code c) :
+ _peer(p),
+ code(c),
+ next(nullptr)
+ {}
+
+ CbcPointer<CachePeer> _peer; /* NULL --> origin server */
+ hier_code code;
+ FwdServer *next;
+};
+
static struct {
int timeouts;
} PeerStats;
return n;
}
-void
-peerSelect(Comm::ConnectionList * paths,
+static void
+peerSelect(PeerSelectionInitiator *initiator,
HttpRequest * request,
AccessLogEntry::Pointer const &al,
- StoreEntry * entry,
- PSC * callback,
- void *callback_data)
+ StoreEntry * entry)
{
- ps_state *psstate;
-
if (entry)
debugs(44, 3, *entry << ' ' << entry->url());
else
debugs(44, 3, request->method);
- psstate = new ps_state;
+ const auto psstate = new ps_state(initiator);
psstate->request = request;
HTTPMSGLOCK(psstate->request);
psstate->al = al;
psstate->entry = entry;
- psstate->paths = paths;
-
- psstate->callback = callback;
-
- psstate->callback_data = cbdataReference(callback_data);
#if USE_CACHE_DIGESTS
peerSelectFoo(psstate);
}
+void
+PeerSelectionInitiator::startSelectingDestinations(HttpRequest *request, const AccessLogEntry::Pointer &ale, StoreEntry *entry)
+{
+ subscribed = true;
+ peerSelect(this, request, ale, entry);
+ // and wait for noteDestination() and/or noteDestinationsEnd() calls
+}
+
static void
peerCheckNeverDirectDone(allow_t answer, void *data)
{
peerSelectFoo(psstate);
}
+/// \returns true (after destroying psstate) if the peer initiator is gone
+/// \returns false (without side effects) otherwise
+static bool
+peerSelectionAborted(ps_state *psstate)
+{
+ if (psstate->interestedInitiator())
+ return false;
+
+ debugs(44, 3, "Aborting peer selection: Initiator gone or lost interest.");
+ delete psstate;
+ return true;
+}
+
void
peerSelectDnsPaths(ps_state *psstate)
{
- FwdServer *fs = psstate->servers;
-
- if (!cbdataReferenceValid(psstate->callback_data)) {
- debugs(44, 3, "Aborting peer selection. Parent Job went away.");
- delete psstate;
+ if (peerSelectionAborted(psstate))
return;
- }
+
+ FwdServer *fs = psstate->servers;
// Bug 3243: CVE 2009-0801
// Bypass of browser same-origin access control in intercepted communication
// construct a "result" adding the ORIGINAL_DST to the set instead of DIRECT
Comm::ConnectionPointer p = new Comm::Connection();
p->remote = req->clientConnectionManager->clientConnection->local;
- p->peerType = ORIGINAL_DST; // fs->code is DIRECT. This fixes the display.
- p->setPeer(fs->_peer.get());
-
- // check for a configured outgoing address for this destination...
- getOutgoingAddress(psstate->request, p);
- psstate->paths->push_back(p);
+ fs->code = ORIGINAL_DST; // fs->code is DIRECT. This fixes the display.
+ psstate->handlePath(p, *fs);
}
// clear the used fs and continue
}
// convert the list of FwdServer destinations into destinations IP addresses
- if (fs && psstate->paths->size() < (unsigned int)Config.forward_max_tries) {
+ if (fs && psstate->wantsMoreDestinations()) {
// send the next one off for DNS lookup.
const char *host = fs->_peer.valid() ? fs->_peer->host : psstate->request->url.host();
debugs(44, 2, "Find IP destination for: " << psstate->url() << "' via " << host);
// Bug 3605: clear any extra listed FwdServer destinations, when the options exceeds max_foward_tries.
// due to the allocation method of fs, we must deallocate each manually.
// TODO: use a std::list so we can get the size and abort adding whenever the selection loops reach Config.forward_max_tries
- if (fs && psstate->paths->size() >= (unsigned int)Config.forward_max_tries) {
+ if (fs) {
assert(fs == psstate->servers);
while (fs) {
psstate->servers = fs->next;
}
// done with DNS lookups. pass back to caller
- PSC *callback = psstate->callback;
- psstate->callback = NULL;
- debugs(44, 2, (psstate->paths->size()<1?"Failed to select source":"Found sources") << " for '" << psstate->url() << "'");
+ debugs(44, 2, psstate->id << " found all " << psstate->foundPaths << " destinations for " << psstate->url());
debugs(44, 2, " always_direct = " << psstate->always_direct);
debugs(44, 2, " never_direct = " << psstate->never_direct);
- if (psstate->paths) {
- for (size_t i = 0; i < psstate->paths->size(); ++i) {
- if ((*psstate->paths)[i]->peerType == HIER_DIRECT)
- debugs(44, 2, " DIRECT = " << (*psstate->paths)[i]);
- else if ((*psstate->paths)[i]->peerType == ORIGINAL_DST)
- debugs(44, 2, " ORIGINAL_DST = " << (*psstate->paths)[i]);
- else if ((*psstate->paths)[i]->peerType == PINNED)
- debugs(44, 2, " PINNED = " << (*psstate->paths)[i]);
- else
- debugs(44, 2, " cache_peer = " << (*psstate->paths)[i]);
- }
- }
debugs(44, 2, " timedout = " << psstate->ping.timedout);
psstate->ping.stop = current_time;
- psstate->request->hier.ping = psstate->ping;
+ psstate->request->hier.ping = psstate->ping; // final result
- void *cbdata;
- if (cbdataReferenceValidDone(psstate->callback_data, &cbdata)) {
- callback(psstate->paths, psstate->lastError, cbdata);
- psstate->lastError = NULL; // FwdState has taken control over the ErrorState object.
+ if (psstate->lastError && psstate->foundPaths) {
+ // nobody cares about errors if we found destinations despite them
+ debugs(44, 3, "forgetting the last error");
+ delete psstate->lastError;
+ psstate->lastError = nullptr;
}
+ if (const auto initiator = psstate->interestedInitiator())
+ initiator->noteDestinationsEnd(psstate->lastError);
+ psstate->lastError = nullptr; // initiator owns the ErrorState object now
delete psstate;
}
peerSelectDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &details, void *data)
{
ps_state *psstate = (ps_state *)data;
-
- if (!cbdataReferenceValid(psstate->callback_data)) {
- debugs(44, 3, "Aborting peer selection. Parent Job went away.");
- delete psstate;
+ if (peerSelectionAborted(psstate))
return;
- }
psstate->request->recordLookup(details);
if (ip >= ia->count) ip = 0; // looped back to zero.
- // Enforce forward_max_tries configuration.
- if (psstate->paths->size() >= (unsigned int)Config.forward_max_tries)
+ if (!psstate->wantsMoreDestinations())
break;
// for TPROXY spoofing we must skip unusable addresses.
}
p->remote.port(fs->_peer.valid() ? fs->_peer->http_port : psstate->request->url.port());
- p->peerType = fs->code;
- p->setPeer(fs->_peer.get());
- // check for a configured outgoing address for this destination...
- getOutgoingAddress(psstate->request, p);
- psstate->paths->push_back(p);
+ psstate->handlePath(p, *fs);
}
} else {
debugs(44, 3, "Unknown host: " << (fs->_peer.valid() ? fs->_peer->host : psstate->request->url.host()));
static void
peerSelectFoo(ps_state * ps)
{
- if (!cbdataReferenceValid(ps->callback_data)) {
- debugs(44, 3, "Aborting peer selection. Parent Job went away.");
- delete ps;
+ if (peerSelectionAborted(ps))
return;
- }
StoreEntry *entry = ps->entry;
HttpRequest *request = ps->request;
peerPingTimeout(void *data)
{
ps_state *psstate = (ps_state *)data;
- StoreEntry *entry = psstate->entry;
+ debugs(44, 3, psstate->url());
- if (entry)
- debugs(44, 3, psstate->url());
-
- if (!cbdataReferenceValid(psstate->callback_data)) {
- /* request aborted */
- if (entry)
- entry->ping_status = PING_DONE;
- cbdataReferenceDone(psstate->callback_data);
- delete psstate;
+ if (StoreEntry *entry = psstate->entry)
+ entry->ping_status = PING_DONE;
+
+ if (peerSelectionAborted(psstate))
return;
- }
++PeerStats.timeouts;
psstate->ping.timedout = 1;
*FSVR = fs;
}
-ps_state::ps_state() : request (NULL),
+ps_state::ps_state(PeerSelectionInitiator *initiator):
+ request(nullptr),
entry (NULL),
always_direct(Config.accessList.AlwaysDirect?ACCESS_DUNNO:ACCESS_DENIED),
never_direct(Config.accessList.NeverDirect?ACCESS_DUNNO:ACCESS_DENIED),
direct(DIRECT_UNKNOWN),
- callback (NULL),
- callback_data (NULL),
lastError(NULL),
- paths(NULL),
servers (NULL),
first_parent_miss(),
closest_parent_miss(),
hit(NULL),
hit_type(PEER_NONE),
- acl_checklist (NULL)
+ acl_checklist (NULL),
+ initiator_(initiator)
{
; // no local defaults.
}
return noUrl;
}
+/// \returns valid/interested peer initiator or nil
+PeerSelectionInitiator *
+ps_state::interestedInitiator()
+{
+ const auto initiator = initiator_.valid();
+
+ if (!initiator) {
+ debugs(44, 3, id << " initiator gone");
+ return nullptr;
+ }
+
+ if (!initiator->subscribed) {
+ debugs(44, 3, id << " initiator lost interest");
+ return nullptr;
+ }
+
+ return initiator;
+}
+
+bool
+ps_state::wantsMoreDestinations() const {
+ const auto maxCount = Config.forward_max_tries;
+ return maxCount >= 0 && foundPaths <
+ static_cast<std::make_unsigned<decltype(maxCount)>::type>(maxCount);
+}
+
+void
+ps_state::handlePath(Comm::ConnectionPointer &path, FwdServer &fs)
+{
+ ++foundPaths;
+
+ path->peerType = fs.code;
+ path->setPeer(fs._peer.get());
+
+ // check for a configured outgoing address for this destination...
+ getOutgoingAddress(request, path);
+
+ request->hier.ping = ping; // may be updated later
+
+ debugs(44, 2, id << " found " << path << ", destination #" << foundPaths << " for " << url());
+ debugs(44, 2, " always_direct = " << always_direct);
+ debugs(44, 2, " never_direct = " << never_direct);
+ debugs(44, 2, " timedout = " << ping.timedout);
+
+ if (const auto initiator = interestedInitiator())
+ initiator->noteDestination(path);
+}
+
+InstanceIdDefinitions(ps_state, "PeerSelector");
+
ping_data::ping_data() :
n_sent(0),
n_recv(0),
#define STUB_API "comm.cc"
#include "tests/STUB.h"
+#include <ostream>
+
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, IOCB *handler, void *handler_data) STUB
void comm_read(const Comm::ConnectionPointer &conn, char*, int, AsyncCall::Pointer &callback) STUB
bool commHasHalfClosedMonitor(int fd) STUB_RETVAL(false)
int CommSelectEngine::checkEvents(int timeout) STUB_RETVAL(0)
+std::ostream &operator << (std::ostream &os, const Comm::Connection &conn) STUB_RETVAL(os << "[Connection object]")
*
* TODO 2: then convert this into a AsyncJob, possibly a child of 'Server'
*/
-class TunnelStateData
+class TunnelStateData: public PeerSelectionInitiator
{
- CBDATA_CLASS(TunnelStateData);
+ CBDATA_CHILD(TunnelStateData);
public:
TunnelStateData(ClientHttpRequest *);
- ~TunnelStateData();
+ virtual ~TunnelStateData();
TunnelStateData(const TunnelStateData &); // do not implement
TunnelStateData &operator =(const TunnelStateData &); // do not implement
/// recovering from the previous connect failure
void startConnecting();
+ void noteConnectFailure(const Comm::ConnectionPointer &conn);
+
class Connection
{
/// continue to set up connection to a peer, going async for SSL peers
void connectToPeer();
+ /* PeerSelectionInitiator API */
+ virtual void noteDestination(Comm::ConnectionPointer conn) override;
+ virtual void noteDestinationsEnd(ErrorState *selectionError) override;
+
+ void saveError(ErrorState *finalError);
+ void sendError(ErrorState *finalError, const char *reason);
+
private:
/// Gives Security::PeerConnector access to Answer in the TunnelStateData callback dialer.
class MyAnswerDialer: public CallDialer, public Security::PeerConnector::CbDialer
/// callback handler after connection setup (including any encryption)
void connectedToPeer(Security::EncryptorAnswer &answer);
+ /// details of the "last tunneling attempt" failure (if it failed)
+ ErrorState *savedError = nullptr;
+
public:
bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
void copy(size_t len, Connection &from, Connection &to, IOCB *);
static CLCB tunnelServerClosed;
static CLCB tunnelClientClosed;
static CTCB tunnelTimeout;
-static PSC tunnelPeerSelectComplete;
static EVH tunnelDelayedClientRead;
static EVH tunnelDelayedServerRead;
static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
xfree(url);
serverDestinations.clear();
delete connectRespBuf;
+ delete savedError;
}
TunnelStateData::Connection::~Connection()
}
// if we have no reply suitable to relay, use 502 Bad Gateway
- if (!sz || sz > static_cast<size_t>(connectRespBuf->contentSize())) {
- ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw());
- *status_ptr = Http::scBadGateway;
- err->callback = tunnelErrorComplete;
- err->callback_data = this;
- errorSend(http->getConn()->clientConnection, err);
- return;
- }
+ if (!sz || sz > static_cast<size_t>(connectRespBuf->contentSize()))
+ return sendError(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw()),
+ "peer error without reply");
// if we need to send back the server response. write its headers to the client
server.len = sz;
tunnelState->server.conn->close();
}
+/// reacts to a failure to establish the given TCP connection
+void
+TunnelStateData::noteConnectFailure(const Comm::ConnectionPointer &conn)
+{
+ debugs(26, 4, "removing the failed one from " << serverDestinations.size() <<
+ " destinations: " << conn);
+
+ if (CachePeer *peer = conn->getPeer())
+ peerConnectFailed(peer);
+
+ assert(!serverDestinations.empty());
+ serverDestinations.erase(serverDestinations.begin());
+
+ // Since no TCP payload has been passed to client or server, we may
+ // TCP-connect to other destinations (including alternate IPs).
+
+ if (!FwdState::EnoughTimeToReForward(startTime))
+ return sendError(savedError, "forwarding timeout");
+
+ if (!serverDestinations.empty())
+ return startConnecting();
+
+ if (!PeerSelectionInitiator::subscribed)
+ return sendError(savedError, "tried all destinations");
+
+ debugs(26, 4, "wait for more destinations to try");
+ // expect a noteDestination*() call
+}
+
static void
tunnelConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
if (status != Comm::OK) {
- debugs(26, 4, HERE << conn << ", comm failure recovery.");
- {
- assert(!tunnelState->serverDestinations.empty());
- const Comm::Connection &failedDest = *tunnelState->serverDestinations.front();
- if (CachePeer *peer = failedDest.getPeer())
- peerConnectFailed(peer);
- debugs(26, 4, "removing the failed one from " << tunnelState->serverDestinations.size() <<
- " destinations: " << failedDest);
- }
- /* At this point only the TCP handshake has failed. no data has been passed.
- * we are allowed to re-try the TCP-level connection to alternate IPs for CONNECT.
- */
- tunnelState->serverDestinations.erase(tunnelState->serverDestinations.begin());
- if (!tunnelState->serverDestinations.empty() && FwdState::EnoughTimeToReForward(tunnelState->startTime)) {
- debugs(26, 4, "re-forwarding");
- tunnelState->startConnecting();
- } else {
- debugs(26, 4, HERE << "terminate with error.");
- ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw());
- *tunnelState->status_ptr = Http::scServiceUnavailable;
- err->xerrno = xerrno;
- // on timeout is this still: err->xerrno = ETIMEDOUT;
- err->port = conn->remote.port();
- err->callback = tunnelErrorComplete;
- err->callback_data = tunnelState;
- errorSend(tunnelState->client.conn, err);
- if (tunnelState->request != NULL)
- tunnelState->request->hier.stopPeerClock(false);
- }
+ ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw());
+ err->xerrno = xerrno;
+ // on timeout is this still: err->xerrno = ETIMEDOUT;
+ err->port = conn->remote.port();
+ tunnelState->saveError(err);
+ tunnelState->noteConnectFailure(conn);
return;
}
#if USE_DELAY_POOLS
//server.setDelayId called from tunnelConnectDone after server side connection established
#endif
-
- peerSelect(&(tunnelState->serverDestinations), request, http->al,
- NULL,
- tunnelPeerSelectComplete,
- tunnelState);
+ tunnelState->startSelectingDestinations(request, http->al, nullptr);
}
void
TunnelStateData::connectedToPeer(Security::EncryptorAnswer &answer)
{
if (ErrorState *error = answer.error.get()) {
- *status_ptr = error->httpStatus;
- error->callback = tunnelErrorComplete;
- error->callback_data = this;
- errorSend(client.conn, error);
- answer.error.clear(); // preserve error for errorSendComplete()
+ answer.error.clear(); // sendError() will own the error
+ sendError(error, "TLS peer connection error");
return;
}
return nullptr;
}
-static void
-tunnelPeerSelectComplete(Comm::ConnectionList *peer_paths, ErrorState *err, void *data)
+void
+TunnelStateData::noteDestination(Comm::ConnectionPointer path)
{
- TunnelStateData *tunnelState = (TunnelStateData *)data;
+ const bool wasBlocked = serverDestinations.empty();
+ serverDestinations.push_back(path);
+ if (wasBlocked)
+ startConnecting();
+ // else continue to use one of the previously noted destinations;
+ // if all of them fail, we may try this path
+}
- bool bail = false;
- if (!peer_paths || peer_paths->empty()) {
- debugs(26, 3, HERE << "No paths found. Aborting CONNECT");
- bail = true;
- }
+void
+TunnelStateData::noteDestinationsEnd(ErrorState *selectionError)
+{
+ PeerSelectionInitiator::subscribed = false;
+ if (const bool wasBlocked = serverDestinations.empty()) {
- if (!bail && tunnelState->serverDestinations[0]->peerType == PINNED) {
- Comm::ConnectionPointer serverConn = borrowPinnedConnection(tunnelState->request.getRaw(), tunnelState->serverDestinations[0]);
- debugs(26,7, "pinned peer connection: " << serverConn);
- if (Comm::IsConnOpen(serverConn)) {
- tunnelConnectDone(serverConn, Comm::OK, 0, (void *)tunnelState);
- return;
- }
- bail = true;
- }
+ if (selectionError)
+ return sendError(selectionError, "path selection has failed");
- if (bail) {
- if (!err) {
- err = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, tunnelState->request.getRaw());
- }
- *tunnelState->status_ptr = err->httpStatus;
- err->callback = tunnelErrorComplete;
- err->callback_data = tunnelState;
- errorSend(tunnelState->client.conn, err);
- return;
+ if (savedError)
+ return sendError(savedError, "all found paths have failed");
+
+ return sendError(new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw()),
+ "path selection found no paths");
}
- delete err;
+ // else continue to use one of the previously noted destinations;
+ // if all of them fail, tunneling as whole will fail
+ Must(!selectionError); // finding at least one path means selection succeeded
+}
- if (tunnelState->request != NULL)
- tunnelState->request->hier.startPeerClock();
+/// remembers an error to be used if there will be no more connection attempts
+void
+TunnelStateData::saveError(ErrorState *error)
+{
+ debugs(26, 4, savedError << " ? " << error);
+ assert(error);
+ delete savedError; // may be nil
+ savedError = error;
+}
+
+/// Starts sending the given error message to the client, leading to the
+/// eventual transaction termination. Call with savedError to send savedError.
+void
+TunnelStateData::sendError(ErrorState *finalError, const char *reason)
+{
+ debugs(26, 3, "aborting transaction for " << reason);
+
+ if (request)
+ request->hier.stopPeerClock(false);
+
+ assert(finalError);
- debugs(26, 3, "paths=" << peer_paths->size() << ", p[0]={" << (*peer_paths)[0] << "}, serverDest[0]={" <<
- tunnelState->serverDestinations[0] << "}");
+ // get rid of any cached error unless that is what the caller is sending
+ if (savedError != finalError)
+ delete savedError; // may be nil
+ savedError = nullptr;
- tunnelState->startConnecting();
+ // we cannot try other destinations after responding with an error
+ PeerSelectionInitiator::subscribed = false; // may already be false
+
+ *status_ptr = finalError->httpStatus;
+ finalError->callback = tunnelErrorComplete;
+ finalError->callback_data = this;
+ errorSend(client.conn, finalError);
}
void
TunnelStateData::startConnecting()
{
+ if (request)
+ request->hier.startPeerClock();
+
+ assert(!serverDestinations.empty());
Comm::ConnectionPointer &dest = serverDestinations.front();
+ debugs(26, 3, "to " << dest);
+
+ if (dest->peerType == PINNED) {
+ Comm::ConnectionPointer serverConn = borrowPinnedConnection(request.getRaw(), dest);
+ debugs(26,7, "pinned peer connection: " << serverConn);
+ if (Comm::IsConnOpen(serverConn)) {
+ tunnelConnectDone(serverConn, Comm::OK, 0, (void *)this);
+ return;
+ }
+ // a PINNED path failure is fatal; do not wait for more paths
+ sendError(new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw()),
+ "pinned path failure");
+ return;
+ }
+
GetMarkingsToServer(request.getRaw(), *dest);
const time_t connectTimeout = dest->connectTimeout(startTime);