]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Happy Eyeballs: Use each fully resolved forwarding destination ASAP.
authorAlex Rousskov <rousskov@measurement-factory.com>
Fri, 9 Jun 2017 04:38:40 +0000 (22:38 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Fri, 9 Jun 2017 04:38:40 +0000 (22:38 -0600)
Do not wait for all request forwarding destinations to be resolved. Use
each resolved destination as soon as it is needed. This change does not
improve or affect IPv4/IPv6 handling (Squid still waits for both DNS A
and AAAA answers when resolving a single destination name), but the peer
selection code can now deliver each IP address to the FwdState/tunneling
code without waiting for all other destination names to be DNS-resolved.
TODO: Deliver A and AAAA answers to the peer selection code ASAP.

This change speeds up forwarding in peering environments where peers may
need frequent DNS resolutions but that was not the motivation here.

This change is a step towards a more complete Happy Eyeballs support in
Squid. The general path can be roughly summarized as follows:

1. Squid has already supported: Use parallel A and AAAA queries.
2. This change: ASAP delivery of IPs from peer selection to FwdState.
3. The next step: ASAP delivery of IPs from DNS to peer selection.
4. A separate project may add: Use parallel TCP connections.

Also fixed missing cbdataReferenceDone(psstate->callback_data) calls
in three error handling cases. These cases probably leaked memory.

src/FwdState.cc
src/FwdState.h
src/Makefile.am
src/PeerSelectState.h
src/comm/Connection.cc
src/comm/Connection.h
src/neighbors.cc
src/peer_select.cc
src/tests/stub_comm.cc
src/tunnel.cc

index 113765f8eb3ba08b9ad60bf546d279f7721afec2..0e9d6cdee95d3a2edff384d6e99fa7e78b9a76fe 100644 (file)
@@ -44,7 +44,6 @@
 #include "neighbors.h"
 #include "pconn.h"
 #include "PeerPoolMgr.h"
-#include "PeerSelectState.h"
 #include "security/BlindPeerConnector.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
@@ -66,7 +65,6 @@
 
 #include <cerrno>
 
-static PSC fwdPeerSelectionCompleteWrapper;
 static CLCB fwdServerClosedWrapper;
 static CNCB fwdConnectDoneWrapper;
 
@@ -114,7 +112,7 @@ FwdState::abort(void* d)
         debugs(17, 7, HERE << "store entry aborted; no connection to close");
     }
     fwd->serverDestinations.clear();
-    fwd->self = NULL;
+    fwd->stopAndDestroy("store entry aborted");
 }
 
 void
@@ -183,7 +181,17 @@ void FwdState::start(Pointer aSelf)
 #endif
 
     // do full route options selection
-    peerSelect(&serverDestinations, request, al, entry, fwdPeerSelectionCompleteWrapper, this);
+    startSelectingDestinations(request, al, entry);
+}
+
+/// ends forwarding; relies on refcounting so the effect may not be immediate
+void
+FwdState::stopAndDestroy(const char *reason)
+{
+    debugs(17, 3, "for " << reason);
+    PeerSelectionInitiator::subscribed = false; // may already be false
+    self = nullptr; // we hope refcounting destroys us soon; may already be nil
+    /* do not place any code here as this object may be gone by now */
 }
 
 #if STRICT_ORIGINAL_DST
@@ -431,12 +439,18 @@ FwdState::startConnectionOrFail()
 
         connectStart();
     } else {
+        if (PeerSelectionInitiator::subscribed) {
+            debugs(17, 4, "wait for more destinations to try");
+            return; // expect a noteDestination*() call
+        }
+
         debugs(17, 3, HERE << "Connection failed: " << entry->url());
         if (!err) {
             ErrorState *anErr = new ErrorState(ERR_CANNOT_FORWARD, Http::scInternalServerError, request);
             fail(anErr);
         } // else use actual error from last connection attempt
-        self = NULL;       // refcounted
+
+        stopAndDestroy("tried all destinations");
     }
 }
 
@@ -515,7 +529,8 @@ FwdState::complete()
         entry->reset();
 
         // drop the last path off the selection list. try the next one.
-        serverDestinations.erase(serverDestinations.begin());
+        if (!serverDestinations.empty()) // paranoid
+            serverDestinations.erase(serverDestinations.begin());
         startConnectionOrFail();
 
     } else {
@@ -529,21 +544,47 @@ FwdState::complete()
         if (!Comm::IsConnOpen(serverConn))
             completed();
 
-        self = NULL; // refcounted
+        stopAndDestroy("forwarding completed");
     }
 }
 
-/**** CALLBACK WRAPPERS ************************************************************/
+void
+FwdState::noteDestination(Comm::ConnectionPointer path)
+{
+    const bool wasBlocked = serverDestinations.empty();
+    serverDestinations.push_back(path);
+    if (wasBlocked)
+        startConnectionOrFail();
+    // else continue to use one of the previously noted destinations;
+    // if all of them fail, we may try this path
+}
 
-static void
-fwdPeerSelectionCompleteWrapper(Comm::ConnectionList *, ErrorState *err, void *data)
+void
+FwdState::noteDestinationsEnd(ErrorState *selectionError)
 {
-    FwdState *fwd = (FwdState *) data;
-    if (err)
-        fwd->fail(err);
-    fwd->startConnectionOrFail();
+    PeerSelectionInitiator::subscribed = false;
+    if (const bool wasBlocked = serverDestinations.empty()) {
+
+        if (selectionError) {
+            debugs(17, 3, "Will abort forwarding because path selection has failed.");
+            Must(!err); // if we tried to connect, then path selection succeeded
+            fail(selectionError);
+        }
+        else if (err)
+            debugs(17, 3, "Will abort forwarding because all found paths have failed.");
+        else
+            debugs(17, 3, "Will abort forwarding because path selection found no paths.");
+
+        startConnectionOrFail(); // will choose the OrFail code path
+        return;
+    }
+    // else continue to use one of the previously noted destinations;
+    // if all of them fail, forwarding as whole will fail
+    Must(!selectionError); // finding at least one path means selection succeeded
 }
 
+/**** CALLBACK WRAPPERS ************************************************************/
+
 static void
 fwdServerClosedWrapper(const CommCloseCbParams &params)
 {
@@ -656,7 +697,7 @@ FwdState::retryOrBail()
         errorAppendEntry(entry, anErr);
     }
 
-    self = NULL;    // refcounted
+    stopAndDestroy("cannot retry");
 }
 
 // If the Server quits before nibbling at the request body, the body sender
@@ -834,7 +875,7 @@ FwdState::connectStart()
         debugs(50, 4, "fwdConnectStart: Ssl bumped connections through parent proxy are not allowed");
         ErrorState *anErr = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request);
         fail(anErr);
-        self = NULL; // refcounted
+        stopAndDestroy("SslBump misconfiguration");
         return;
     }
 
@@ -868,7 +909,7 @@ FwdState::connectStart()
         debugs(17,2,HERE << "Pinned connection failed: " << pinned_connection);
         ErrorState *anErr = new ErrorState(ERR_ZERO_SIZE_OBJECT, Http::scServiceUnavailable, request);
         fail(anErr);
-        self = NULL; // refcounted
+        stopAndDestroy("pinned connection failure");
         return;
     }
 
@@ -1076,7 +1117,7 @@ FwdState::reforward()
     if (request->bodyNibbled())
         return 0;
 
-    if (serverDestinations.size() <= 1) {
+    if (serverDestinations.size() <= 1 && !PeerSelectionInitiator::subscribed) {
         // NP: <= 1 since total count includes the recently failed one.
         debugs(17, 3, HERE << "No alternative forwarding paths left");
         return 0;
index 4e87bb5b90ae6274cac208b3e22c7a7b6ce28808..443fd9ff006406e32906c02debeeb62b3bd78e6d 100644 (file)
@@ -16,6 +16,7 @@
 #include "fde.h"
 #include "http/StatusCode.h"
 #include "ip/Address.h"
+#include "PeerSelectState.h"
 #include "security/forward.h"
 #if USE_OPENSSL
 #include "ssl/support.h"
@@ -55,13 +56,13 @@ void GetMarkingsToServer(HttpRequest * request, Comm::Connection &conn);
 
 class HelperReply;
 
-class FwdState : public RefCountable
+class FwdState: public RefCountable, public PeerSelectionInitiator
 {
-    CBDATA_CLASS(FwdState);
+    CBDATA_CHILD(FwdState);
 
 public:
     typedef RefCount<FwdState> Pointer;
-    ~FwdState();
+    virtual ~FwdState();
     static void initModule();
 
     /// Initiates request forwarding to a peer or origin server.
@@ -109,6 +110,11 @@ private:
     // hidden for safer management of self; use static fwdStart
     FwdState(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *, const AccessLogEntryPointer &alp);
     void start(Pointer aSelf);
+    void stopAndDestroy(const char *reason);
+
+    /* PeerSelectionInitiator API */
+    virtual void noteDestination(Comm::ConnectionPointer conn) override;
+    virtual void noteDestinationsEnd(ErrorState *selectionError) override;
 
 #if STRICT_ORIGINAL_DST
     void selectPeerForIntercepted();
index f619f9e057ad4a3484a4c2af3e797b2733fb8d46..323653c9ad44db5b92fc46f7f191acfbe7139ef4 100644 (file)
@@ -999,7 +999,8 @@ tests_testHttpReply_SOURCES=\
        wordlist.h \
        wordlist.cc
 nodist_tests_testHttpReply_SOURCES=\
-       $(TESTSOURCES)
+       $(TESTSOURCES) \
+       hier_code.cc
 tests_testHttpReply_LDFLAGS = $(LIBADD_DL)
 tests_testHttpReply_LDADD=\
        CommCalls.o \
@@ -1557,6 +1558,7 @@ tests_testDiskIO_SOURCES = \
        tests/stub_tools.cc
 nodist_tests_testDiskIO_SOURCES= \
        $(TESTSOURCES) \
+       hier_code.cc \
        SquidMath.cc \
        SquidMath.h \
        swap_log_op.cc
@@ -3021,6 +3023,7 @@ tests_testUfs_SOURCES = \
 
 nodist_tests_testUfs_SOURCES = \
        $(TESTSOURCES) \
+       hier_code.cc \
        SquidMath.cc \
        SquidMath.h \
        swap_log_op.cc
@@ -3200,10 +3203,11 @@ tests_testRock_SOURCES = \
        $(DELAY_POOL_SOURCE) \
        $(UNLINKDSOURCE)
 nodist_tests_testRock_SOURCES = \
-       swap_log_op.cc \
+       $(TESTSOURCES) \
+       hier_code.cc \
        SquidMath.cc \
        SquidMath.h \
-       $(TESTSOURCES)
+       swap_log_op.cc
 tests_testRock_LDADD = \
        http/libhttp.la \
        parser/libparser.la \
index 86d9107291ee548e996acd7cf638c0ae38f20236..b1744c468324917c2949865b15f2500ed98d646a 100644 (file)
@@ -22,59 +22,65 @@ class HttpRequest;
 class StoreEntry;
 class ErrorState;
 
-typedef void PSC(Comm::ConnectionList *, ErrorState *, void *);
-
-void peerSelect(Comm::ConnectionList *, HttpRequest *, AccessLogEntry::Pointer const&, StoreEntry *, PSC *, void *data);
 void peerSelectInit(void);
 
-/**
- * A CachePeer which has been selected as a possible destination.
- * Listed as pointers here so as to prevent duplicates being added but will
- * be converted to a set of IP address path options before handing back out
- * to the caller.
- *
- * Certain connection flags and outgoing settings will also be looked up and
- * set based on the received request and CachePeer settings before handing back.
- */
-class FwdServer
+/// Interface for those who need a list of peers to forward a request to.
+class PeerSelectionInitiator: public CbdataParent
 {
-    MEMPROXY_CLASS(FwdServer);
-
 public:
-    FwdServer(CachePeer *p, hier_code c) :
-        _peer(p),
-        code(c),
-        next(nullptr)
-    {}
-
-    CbcPointer<CachePeer> _peer;                /* NULL --> origin server */
-    hier_code code;
-    FwdServer *next;
+    virtual ~PeerSelectionInitiator() = default;
+
+    /// called when a new unique destination has been found
+    virtual void noteDestination(Comm::ConnectionPointer path) = 0;
+
+    /// called when there will be no more noteDestination() calls
+    /// \param error is a possible reason why no destinations were found; it is
+    /// guaranteed to be nil if there was at least one noteDestination() call
+    virtual void noteDestinationsEnd(ErrorState *error) = 0;
+
+    /// whether noteDestination() and noteDestinationsEnd() calls are allowed
+    bool subscribed = false;
+
+/* protected: */
+    /// Initiates asynchronous peer selection that eventually
+    /// results in zero or more noteDestination() calls and
+    /// exactly one noteDestinationsEnd() call.
+    void startSelectingDestinations(HttpRequest *request, const AccessLogEntry::Pointer &ale, StoreEntry *entry);
 };
 
+class FwdServer;
+
 class ps_state
 {
     CBDATA_CLASS(ps_state);
 
 public:
-    ps_state();
+    explicit ps_state(PeerSelectionInitiator *initiator);
     ~ps_state();
 
     // Produce a URL for display identifying the transaction we are
     // trying to locate a peer for.
     const SBuf url() const;
 
+    /// \returns valid/interested peer initiator or nil
+    PeerSelectionInitiator *interestedInitiator();
+
+    /// \returns whether the initiator may use more destinations
+    bool wantsMoreDestinations() const;
+
+    /// processes a newly discovered/finalized path
+    void handlePath(Comm::ConnectionPointer &path, FwdServer &fs);
+
     HttpRequest *request;
     AccessLogEntry::Pointer al; ///< info for the future access.log entry
     StoreEntry *entry;
     allow_t always_direct;
     allow_t never_direct;
     int direct;   // TODO: fold always_direct/never_direct/prefer_direct into this now that ACL can do a multi-state result.
-    PSC *callback;
-    void *callback_data;
+    size_t foundPaths = 0; ///< number of unique destinations identified so far
+    void *peerCountMcastPeerXXX = nullptr; ///< a hack to help peerCountMcastPeersStart()
     ErrorState *lastError;
 
-    Comm::ConnectionList *paths;    ///< the callers paths array. to be filled with our final results.
     FwdServer *servers;    ///< temporary linked list of peers we will pass back.
 
     /*
@@ -96,6 +102,13 @@ public:
     peer_t hit_type;
     ping_data ping;
     ACLChecklist *acl_checklist;
+
+    const InstanceId<ps_state> id; ///< unique identification in worker log
+
+private:
+
+    typedef CbcPointer<PeerSelectionInitiator> Initiator;
+    Initiator initiator_; ///< recipient of the destinations we select; use interestedInitiator() to access
 };
 
 #endif /* SQUID_PEERSELECTSTATE_H */
index 246f8bf279e1e7e6136ca6d4a88356a4f0f12290..73da9cb8782fb3db89d5d9cf585d0f0ca10e33bb 100644 (file)
@@ -17,6 +17,7 @@
 #include "security/NegotiationHistory.h"
 #include "SquidConfig.h"
 #include "SquidTime.h"
+#include <ostream>
 
 class CachePeer;
 bool
@@ -152,3 +153,19 @@ Comm::Connection::connectTimeout(const time_t fwdStart) const
     return min(ctimeout, ftimeout);
 }
 
+std::ostream &
+operator << (std::ostream &os, const Comm::Connection &conn)
+{
+    os << "local=" << conn.local << " remote=" << conn.remote;
+    if (conn.peerType)
+        os << ' ' << hier_code_str[conn.peerType];
+    if (conn.fd >= 0)
+        os << " FD " << conn.fd;
+    if (conn.flags != COMM_UNSET)
+        os << " flags=" << conn.flags;
+#if USE_IDENT
+    if (*conn.rfc931)
+        os << " IDENT::" << conn.rfc931;
+#endif
+    return os;
+}
index 67f0e063b14f929259edb077a0eeaad4fbb37a3c..6e20b037b5a6bd852e1da9fad5ce7c29c2f13049 100644 (file)
@@ -172,24 +172,7 @@ private:
 
 }; // namespace Comm
 
-// NP: Order and namespace here is very important.
-//     * The second define inlines the first.
-//     * Stream inheritance overloading is searched in the global scope first.
-
-inline std::ostream &
-operator << (std::ostream &os, const Comm::Connection &conn)
-{
-    os << "local=" << conn.local << " remote=" << conn.remote;
-    if (conn.fd >= 0)
-        os << " FD " << conn.fd;
-    if (conn.flags != COMM_UNSET)
-        os << " flags=" << conn.flags;
-#if USE_IDENT
-    if (*conn.rfc931)
-        os << " IDENT::" << conn.rfc931;
-#endif
-    return os;
-}
+std::ostream &operator << (std::ostream &os, const Comm::Connection &conn);
 
 inline std::ostream &
 operator << (std::ostream &os, const Comm::ConnectionPointer &conn)
index cb95fea621896e363a491aafd4d77154956db94d..b16994d639325892dacf3032f6c6dd6cbdd96f09 100644 (file)
@@ -1369,6 +1369,8 @@ peerCountMcastPeersSchedule(CachePeer * p, time_t when)
 static void
 peerCountMcastPeersStart(void *data)
 {
+    // XXX: Do not create lots of complex fake objects (while abusing their
+    // APIs) to pass around a few basic data points like start_ping and ping!
     CachePeer *p = (CachePeer *)data;
     ps_state *psstate;
     StoreEntry *fake;
@@ -1383,12 +1385,11 @@ peerCountMcastPeersStart(void *data)
     strcat(url, "/");
     fake = storeCreateEntry(url, url, RequestFlags(), Http::METHOD_GET);
     HttpRequest *req = HttpRequest::CreateFromUrl(url);
-    psstate = new ps_state;
+    psstate = new ps_state(nullptr);
     psstate->request = req;
     HTTPMSGLOCK(psstate->request);
     psstate->entry = fake;
-    psstate->callback = NULL;
-    psstate->callback_data = cbdataReference(p);
+    psstate->peerCountMcastPeerXXX = cbdataReference(p);
     psstate->ping.start = current_time;
     mem = fake->mem_obj;
     mem->request = psstate->request;
@@ -1415,8 +1416,8 @@ peerCountMcastPeersDone(void *data)
     ps_state *psstate = (ps_state *)data;
     StoreEntry *fake = psstate->entry;
 
-    if (cbdataReferenceValid(psstate->callback_data)) {
-        CachePeer *p = (CachePeer *)psstate->callback_data;
+    if (cbdataReferenceValid(psstate->peerCountMcastPeerXXX)) {
+        CachePeer *p = (CachePeer *)psstate->peerCountMcastPeerXXX;
         p->mcast.flags.counting = false;
         p->mcast.avg_n_members = Math::doubleAverage(p->mcast.avg_n_members, (double) psstate->ping.n_recv, ++p->mcast.n_times_counted, 10);
         debugs(15, DBG_IMPORTANT, "Group " << p->host  << ": " << psstate->ping.n_recv  <<
@@ -1425,7 +1426,7 @@ peerCountMcastPeersDone(void *data)
         p->mcast.n_replies_expected = (int) p->mcast.avg_n_members;
     }
 
-    cbdataReferenceDone(psstate->callback_data);
+    cbdataReferenceDone(psstate->peerCountMcastPeerXXX);
 
     fake->abort(); // sets ENTRY_ABORTED and initiates releated cleanup
     fake->mem_obj->request = nullptr;
index 807d43ab16c398183ed4da841737f39a53a97c86..baae98814bb463e9fb5587502de1ecb64e180f82 100644 (file)
@@ -10,6 +10,7 @@
 
 #include "squid.h"
 #include "acl/FilledChecklist.h"
+#include "base/InstanceId.h"
 #include "CachePeer.h"
 #include "carp.h"
 #include "client_side.h"
 #include "Store.h"
 #include "URL.h"
 
+/**
+ * A CachePeer which has been selected as a possible destination.
+ * Listed as pointers here so as to prevent duplicates being added but will
+ * be converted to a set of IP address path options before handing back out
+ * to the caller.
+ *
+ * Certain connection flags and outgoing settings will also be looked up and
+ * set based on the received request and CachePeer settings before handing back.
+ */
+class FwdServer
+{
+    MEMPROXY_CLASS(FwdServer);
+
+public:
+    FwdServer(CachePeer *p, hier_code c) :
+        _peer(p),
+        code(c),
+        next(nullptr)
+    {}
+
+    CbcPointer<CachePeer> _peer;                /* NULL --> origin server */
+    hier_code code;
+    FwdServer *next;
+};
+
 static struct {
     int timeouts;
 } PeerStats;
@@ -122,33 +148,24 @@ peerSelectIcpPing(HttpRequest * request, int direct, StoreEntry * entry)
     return n;
 }
 
-void
-peerSelect(Comm::ConnectionList * paths,
+static void
+peerSelect(PeerSelectionInitiator *initiator,
            HttpRequest * request,
            AccessLogEntry::Pointer const &al,
-           StoreEntry * entry,
-           PSC * callback,
-           void *callback_data)
+           StoreEntry * entry)
 {
-    ps_state *psstate;
-
     if (entry)
         debugs(44, 3, *entry << ' ' << entry->url());
     else
         debugs(44, 3, request->method);
 
-    psstate = new ps_state;
+    const auto psstate = new ps_state(initiator);
 
     psstate->request = request;
     HTTPMSGLOCK(psstate->request);
     psstate->al = al;
 
     psstate->entry = entry;
-    psstate->paths = paths;
-
-    psstate->callback = callback;
-
-    psstate->callback_data = cbdataReference(callback_data);
 
 #if USE_CACHE_DIGESTS
 
@@ -162,6 +179,14 @@ peerSelect(Comm::ConnectionList * paths,
     peerSelectFoo(psstate);
 }
 
+void
+PeerSelectionInitiator::startSelectingDestinations(HttpRequest *request, const AccessLogEntry::Pointer &ale, StoreEntry *entry)
+{
+    subscribed = true;
+    peerSelect(this, request, ale, entry);
+    // and wait for noteDestination() and/or noteDestinationsEnd() calls
+}
+
 static void
 peerCheckNeverDirectDone(allow_t answer, void *data)
 {
@@ -208,16 +233,26 @@ peerCheckAlwaysDirectDone(allow_t answer, void *data)
     peerSelectFoo(psstate);
 }
 
+/// \returns true (after destroying psstate) if the peer initiator is gone
+/// \returns false (without side effects) otherwise
+static bool
+peerSelectionAborted(ps_state *psstate)
+{
+    if (psstate->interestedInitiator())
+        return false;
+
+    debugs(44, 3, "Aborting peer selection: Initiator gone or lost interest.");
+    delete psstate;
+    return true;
+}
+
 void
 peerSelectDnsPaths(ps_state *psstate)
 {
-    FwdServer *fs = psstate->servers;
-
-    if (!cbdataReferenceValid(psstate->callback_data)) {
-        debugs(44, 3, "Aborting peer selection. Parent Job went away.");
-        delete psstate;
+    if (peerSelectionAborted(psstate))
         return;
-    }
+
+    FwdServer *fs = psstate->servers;
 
     // Bug 3243: CVE 2009-0801
     // Bypass of browser same-origin access control in intercepted communication
@@ -234,12 +269,8 @@ peerSelectDnsPaths(ps_state *psstate)
             // construct a "result" adding the ORIGINAL_DST to the set instead of DIRECT
             Comm::ConnectionPointer p = new Comm::Connection();
             p->remote = req->clientConnectionManager->clientConnection->local;
-            p->peerType = ORIGINAL_DST; // fs->code is DIRECT. This fixes the display.
-            p->setPeer(fs->_peer.get());
-
-            // check for a configured outgoing address for this destination...
-            getOutgoingAddress(psstate->request, p);
-            psstate->paths->push_back(p);
+            fs->code = ORIGINAL_DST; // fs->code is DIRECT. This fixes the display.
+            psstate->handlePath(p, *fs);
         }
 
         // clear the used fs and continue
@@ -250,7 +281,7 @@ peerSelectDnsPaths(ps_state *psstate)
     }
 
     // convert the list of FwdServer destinations into destinations IP addresses
-    if (fs && psstate->paths->size() < (unsigned int)Config.forward_max_tries) {
+    if (fs && psstate->wantsMoreDestinations()) {
         // send the next one off for DNS lookup.
         const char *host = fs->_peer.valid() ? fs->_peer->host : psstate->request->url.host();
         debugs(44, 2, "Find IP destination for: " << psstate->url() << "' via " << host);
@@ -261,7 +292,7 @@ peerSelectDnsPaths(ps_state *psstate)
     // Bug 3605: clear any extra listed FwdServer destinations, when the options exceeds max_foward_tries.
     // due to the allocation method of fs, we must deallocate each manually.
     // TODO: use a std::list so we can get the size and abort adding whenever the selection loops reach Config.forward_max_tries
-    if (fs && psstate->paths->size() >= (unsigned int)Config.forward_max_tries) {
+    if (fs) {
         assert(fs == psstate->servers);
         while (fs) {
             psstate->servers = fs->next;
@@ -271,35 +302,25 @@ peerSelectDnsPaths(ps_state *psstate)
     }
 
     // done with DNS lookups. pass back to caller
-    PSC *callback = psstate->callback;
-    psstate->callback = NULL;
 
-    debugs(44, 2, (psstate->paths->size()<1?"Failed to select source":"Found sources") << " for '" << psstate->url() << "'");
+    debugs(44, 2, psstate->id << " found all " << psstate->foundPaths << " destinations for " << psstate->url());
     debugs(44, 2, "  always_direct = " << psstate->always_direct);
     debugs(44, 2, "   never_direct = " << psstate->never_direct);
-    if (psstate->paths) {
-        for (size_t i = 0; i < psstate->paths->size(); ++i) {
-            if ((*psstate->paths)[i]->peerType == HIER_DIRECT)
-                debugs(44, 2, "         DIRECT = " << (*psstate->paths)[i]);
-            else if ((*psstate->paths)[i]->peerType == ORIGINAL_DST)
-                debugs(44, 2, "   ORIGINAL_DST = " << (*psstate->paths)[i]);
-            else if ((*psstate->paths)[i]->peerType == PINNED)
-                debugs(44, 2, "         PINNED = " << (*psstate->paths)[i]);
-            else
-                debugs(44, 2, "     cache_peer = " << (*psstate->paths)[i]);
-        }
-    }
     debugs(44, 2, "       timedout = " << psstate->ping.timedout);
 
     psstate->ping.stop = current_time;
-    psstate->request->hier.ping = psstate->ping;
+    psstate->request->hier.ping = psstate->ping; // final result
 
-    void *cbdata;
-    if (cbdataReferenceValidDone(psstate->callback_data, &cbdata)) {
-        callback(psstate->paths, psstate->lastError, cbdata);
-        psstate->lastError = NULL; // FwdState has taken control over the ErrorState object.
+    if (psstate->lastError && psstate->foundPaths) {
+        // nobody cares about errors if we found destinations despite them
+        debugs(44, 3, "forgetting the last error");
+        delete psstate->lastError;
+        psstate->lastError = nullptr;
     }
 
+    if (const auto initiator = psstate->interestedInitiator())
+        initiator->noteDestinationsEnd(psstate->lastError);
+    psstate->lastError = nullptr; // initiator owns the ErrorState object now
     delete psstate;
 }
 
@@ -307,12 +328,8 @@ static void
 peerSelectDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &details, void *data)
 {
     ps_state *psstate = (ps_state *)data;
-
-    if (!cbdataReferenceValid(psstate->callback_data)) {
-        debugs(44, 3, "Aborting peer selection. Parent Job went away.");
-        delete psstate;
+    if (peerSelectionAborted(psstate))
         return;
-    }
 
     psstate->request->recordLookup(details);
 
@@ -328,8 +345,7 @@ peerSelectDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &details,
 
             if (ip >= ia->count) ip = 0; // looped back to zero.
 
-            // Enforce forward_max_tries configuration.
-            if (psstate->paths->size() >= (unsigned int)Config.forward_max_tries)
+            if (!psstate->wantsMoreDestinations())
                 break;
 
             // for TPROXY spoofing we must skip unusable addresses.
@@ -351,12 +367,8 @@ peerSelectDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &details,
             }
 
             p->remote.port(fs->_peer.valid() ? fs->_peer->http_port : psstate->request->url.port());
-            p->peerType = fs->code;
-            p->setPeer(fs->_peer.get());
 
-            // check for a configured outgoing address for this destination...
-            getOutgoingAddress(psstate->request, p);
-            psstate->paths->push_back(p);
+            psstate->handlePath(p, *fs);
         }
     } else {
         debugs(44, 3, "Unknown host: " << (fs->_peer.valid() ? fs->_peer->host : psstate->request->url.host()));
@@ -422,11 +434,8 @@ peerCheckNetdbDirect(ps_state * psstate)
 static void
 peerSelectFoo(ps_state * ps)
 {
-    if (!cbdataReferenceValid(ps->callback_data)) {
-        debugs(44, 3, "Aborting peer selection. Parent Job went away.");
-        delete ps;
+    if (peerSelectionAborted(ps))
         return;
-    }
 
     StoreEntry *entry = ps->entry;
     HttpRequest *request = ps->request;
@@ -742,19 +751,13 @@ static void
 peerPingTimeout(void *data)
 {
     ps_state *psstate = (ps_state *)data;
-    StoreEntry *entry = psstate->entry;
+    debugs(44, 3, psstate->url());
 
-    if (entry)
-        debugs(44, 3, psstate->url());
-
-    if (!cbdataReferenceValid(psstate->callback_data)) {
-        /* request aborted */
-        if (entry)
-            entry->ping_status = PING_DONE;
-        cbdataReferenceDone(psstate->callback_data);
-        delete psstate;
+    if (StoreEntry *entry = psstate->entry)
+        entry->ping_status = PING_DONE;
+
+    if (peerSelectionAborted(psstate))
         return;
-    }
 
     ++PeerStats.timeouts;
     psstate->ping.timedout = 1;
@@ -937,21 +940,20 @@ peerAddFwdServer(FwdServer ** FSVR, CachePeer * p, hier_code code)
     *FSVR = fs;
 }
 
-ps_state::ps_state() : request (NULL),
+ps_state::ps_state(PeerSelectionInitiator *initiator):
+    request(nullptr),
     entry (NULL),
     always_direct(Config.accessList.AlwaysDirect?ACCESS_DUNNO:ACCESS_DENIED),
     never_direct(Config.accessList.NeverDirect?ACCESS_DUNNO:ACCESS_DENIED),
     direct(DIRECT_UNKNOWN),
-    callback (NULL),
-    callback_data (NULL),
     lastError(NULL),
-    paths(NULL),
     servers (NULL),
     first_parent_miss(),
     closest_parent_miss(),
     hit(NULL),
     hit_type(PEER_NONE),
-    acl_checklist (NULL)
+    acl_checklist (NULL),
+    initiator_(initiator)
 {
     ; // no local defaults.
 }
@@ -969,6 +971,56 @@ ps_state::url() const
     return noUrl;
 }
 
+/// \returns valid/interested peer initiator or nil
+PeerSelectionInitiator *
+ps_state::interestedInitiator()
+{
+    const auto initiator = initiator_.valid();
+
+    if (!initiator) {
+        debugs(44, 3, id << " initiator gone");
+        return nullptr;
+    }
+
+    if (!initiator->subscribed) {
+        debugs(44, 3, id << " initiator lost interest");
+        return nullptr;
+    }
+
+    return initiator;
+}
+
+bool
+ps_state::wantsMoreDestinations() const {
+    const auto maxCount = Config.forward_max_tries;
+    return maxCount >= 0 && foundPaths <
+        static_cast<std::make_unsigned<decltype(maxCount)>::type>(maxCount);
+}
+
+void
+ps_state::handlePath(Comm::ConnectionPointer &path, FwdServer &fs)
+{
+    ++foundPaths;
+
+    path->peerType = fs.code;
+    path->setPeer(fs._peer.get());
+
+    // check for a configured outgoing address for this destination...
+    getOutgoingAddress(request, path);
+
+    request->hier.ping = ping; // may be updated later
+
+    debugs(44, 2, id << " found " << path << ", destination #" << foundPaths << " for " << url());
+    debugs(44, 2, "  always_direct = " << always_direct);
+    debugs(44, 2, "   never_direct = " << never_direct);
+    debugs(44, 2, "       timedout = " << ping.timedout);
+
+    if (const auto initiator = interestedInitiator())
+        initiator->noteDestination(path);
+}
+
+InstanceIdDefinitions(ps_state, "PeerSelector");
+
 ping_data::ping_data() :
     n_sent(0),
     n_recv(0),
index 55dda4dff8f44d35a5c177bdfe8cde1c54d97269..1f7f215656c2a9277b9ea129488b15cebea35178 100644 (file)
@@ -14,6 +14,8 @@
 #define STUB_API "comm.cc"
 #include "tests/STUB.h"
 
+#include <ostream>
+
 void comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, IOCB *handler, void *handler_data) STUB
 void comm_read(const Comm::ConnectionPointer &conn, char*, int, AsyncCall::Pointer &callback) STUB
 
@@ -69,3 +71,4 @@ void commStartHalfClosedMonitor(int fd) STUB
 bool commHasHalfClosedMonitor(int fd) STUB_RETVAL(false)
 int CommSelectEngine::checkEvents(int timeout) STUB_RETVAL(0)
 
+std::ostream &operator << (std::ostream &os, const Comm::Connection &conn) STUB_RETVAL(os << "[Connection object]")
index b92b4d07d8ebcb72e863748086c1b12e04057ccd..c50bf70a9c25cf407a97a98438daf4e6454c1d9b 100644 (file)
  *
  * TODO 2: then convert this into a AsyncJob, possibly a child of 'Server'
  */
-class TunnelStateData
+class TunnelStateData: public PeerSelectionInitiator
 {
-    CBDATA_CLASS(TunnelStateData);
+    CBDATA_CHILD(TunnelStateData);
 
 public:
     TunnelStateData(ClientHttpRequest *);
-    ~TunnelStateData();
+    virtual ~TunnelStateData();
     TunnelStateData(const TunnelStateData &); // do not implement
     TunnelStateData &operator =(const TunnelStateData &); // do not implement
 
@@ -126,6 +126,8 @@ public:
     /// recovering from the previous connect failure
     void startConnecting();
 
+    void noteConnectFailure(const Comm::ConnectionPointer &conn);
+
     class Connection
     {
 
@@ -181,6 +183,13 @@ public:
     /// continue to set up connection to a peer, going async for SSL peers
     void connectToPeer();
 
+    /* PeerSelectionInitiator API */
+    virtual void noteDestination(Comm::ConnectionPointer conn) override;
+    virtual void noteDestinationsEnd(ErrorState *selectionError) override;
+
+    void saveError(ErrorState *finalError);
+    void sendError(ErrorState *finalError, const char *reason);
+
 private:
     /// Gives Security::PeerConnector access to Answer in the TunnelStateData callback dialer.
     class MyAnswerDialer: public CallDialer, public Security::PeerConnector::CbDialer
@@ -210,6 +219,9 @@ private:
     /// callback handler after connection setup (including any encryption)
     void connectedToPeer(Security::EncryptorAnswer &answer);
 
+    /// details of the "last tunneling attempt" failure (if it failed)
+    ErrorState *savedError = nullptr;
+
 public:
     bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
     void copy(size_t len, Connection &from, Connection &to, IOCB *);
@@ -232,7 +244,6 @@ static ERCB tunnelErrorComplete;
 static CLCB tunnelServerClosed;
 static CLCB tunnelClientClosed;
 static CTCB tunnelTimeout;
-static PSC tunnelPeerSelectComplete;
 static EVH tunnelDelayedClientRead;
 static EVH tunnelDelayedServerRead;
 static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
@@ -327,6 +338,7 @@ TunnelStateData::~TunnelStateData()
     xfree(url);
     serverDestinations.clear();
     delete connectRespBuf;
+    delete savedError;
 }
 
 TunnelStateData::Connection::~Connection()
@@ -447,14 +459,9 @@ TunnelStateData::informUserOfPeerError(const char *errMsg, const size_t sz)
     }
 
     // if we have no reply suitable to relay, use 502 Bad Gateway
-    if (!sz || sz > static_cast<size_t>(connectRespBuf->contentSize())) {
-        ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw());
-        *status_ptr = Http::scBadGateway;
-        err->callback = tunnelErrorComplete;
-        err->callback_data = this;
-        errorSend(http->getConn()->clientConnection, err);
-        return;
-    }
+    if (!sz || sz > static_cast<size_t>(connectRespBuf->contentSize()))
+        return sendError(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw()),
+                         "peer error without reply");
 
     // if we need to send back the server response. write its headers to the client
     server.len = sz;
@@ -986,41 +993,47 @@ tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_
         tunnelState->server.conn->close();
 }
 
+/// reacts to a failure to establish the given TCP connection
+void
+TunnelStateData::noteConnectFailure(const Comm::ConnectionPointer &conn)
+{
+    debugs(26, 4, "removing the failed one from " << serverDestinations.size() <<
+           " destinations: " << conn);
+
+    if (CachePeer *peer = conn->getPeer())
+        peerConnectFailed(peer);
+
+    assert(!serverDestinations.empty());
+    serverDestinations.erase(serverDestinations.begin());
+
+    // Since no TCP payload has been passed to client or server, we may
+    // TCP-connect to other destinations (including alternate IPs).
+
+    if (!FwdState::EnoughTimeToReForward(startTime))
+        return sendError(savedError, "forwarding timeout");
+
+    if (!serverDestinations.empty())
+        return startConnecting();
+
+    if (!PeerSelectionInitiator::subscribed)
+        return sendError(savedError, "tried all destinations");
+
+    debugs(26, 4, "wait for more destinations to try");
+    // expect a noteDestination*() call
+}
+
 static void
 tunnelConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
 
     if (status != Comm::OK) {
-        debugs(26, 4, HERE << conn << ", comm failure recovery.");
-        {
-            assert(!tunnelState->serverDestinations.empty());
-            const Comm::Connection &failedDest = *tunnelState->serverDestinations.front();
-            if (CachePeer *peer = failedDest.getPeer())
-                peerConnectFailed(peer);
-            debugs(26, 4, "removing the failed one from " << tunnelState->serverDestinations.size() <<
-                   " destinations: " << failedDest);
-        }
-        /* At this point only the TCP handshake has failed. no data has been passed.
-         * we are allowed to re-try the TCP-level connection to alternate IPs for CONNECT.
-         */
-        tunnelState->serverDestinations.erase(tunnelState->serverDestinations.begin());
-        if (!tunnelState->serverDestinations.empty() && FwdState::EnoughTimeToReForward(tunnelState->startTime)) {
-            debugs(26, 4, "re-forwarding");
-            tunnelState->startConnecting();
-        } else {
-            debugs(26, 4, HERE << "terminate with error.");
-            ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw());
-            *tunnelState->status_ptr = Http::scServiceUnavailable;
-            err->xerrno = xerrno;
-            // on timeout is this still:    err->xerrno = ETIMEDOUT;
-            err->port = conn->remote.port();
-            err->callback = tunnelErrorComplete;
-            err->callback_data = tunnelState;
-            errorSend(tunnelState->client.conn, err);
-            if (tunnelState->request != NULL)
-                tunnelState->request->hier.stopPeerClock(false);
-        }
+        ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw());
+        err->xerrno = xerrno;
+        // on timeout is this still:    err->xerrno = ETIMEDOUT;
+        err->port = conn->remote.port();
+        tunnelState->saveError(err);
+        tunnelState->noteConnectFailure(conn);
         return;
     }
 
@@ -1101,11 +1114,7 @@ tunnelStart(ClientHttpRequest * http)
 #if USE_DELAY_POOLS
     //server.setDelayId called from tunnelConnectDone after server side connection established
 #endif
-
-    peerSelect(&(tunnelState->serverDestinations), request, http->al,
-               NULL,
-               tunnelPeerSelectComplete,
-               tunnelState);
+    tunnelState->startSelectingDestinations(request, http->al, nullptr);
 }
 
 void
@@ -1130,11 +1139,8 @@ void
 TunnelStateData::connectedToPeer(Security::EncryptorAnswer &answer)
 {
     if (ErrorState *error = answer.error.get()) {
-        *status_ptr = error->httpStatus;
-        error->callback = tunnelErrorComplete;
-        error->callback_data = this;
-        errorSend(client.conn, error);
-        answer.error.clear(); // preserve error for errorSendComplete()
+        answer.error.clear(); // sendError() will own the error
+        sendError(error, "TLS peer connection error");
         return;
     }
 
@@ -1199,52 +1205,96 @@ borrowPinnedConnection(HttpRequest *request, Comm::ConnectionPointer &serverDest
     return nullptr;
 }
 
-static void
-tunnelPeerSelectComplete(Comm::ConnectionList *peer_paths, ErrorState *err, void *data)
+void
+TunnelStateData::noteDestination(Comm::ConnectionPointer path)
 {
-    TunnelStateData *tunnelState = (TunnelStateData *)data;
+    const bool wasBlocked = serverDestinations.empty();
+    serverDestinations.push_back(path);
+    if (wasBlocked)
+        startConnecting();
+    // else continue to use one of the previously noted destinations;
+    // if all of them fail, we may try this path
+}
 
-    bool bail = false;
-    if (!peer_paths || peer_paths->empty()) {
-        debugs(26, 3, HERE << "No paths found. Aborting CONNECT");
-        bail = true;
-    }
+void
+TunnelStateData::noteDestinationsEnd(ErrorState *selectionError)
+{
+    PeerSelectionInitiator::subscribed = false;
+    if (const bool wasBlocked = serverDestinations.empty()) {
 
-    if (!bail && tunnelState->serverDestinations[0]->peerType == PINNED) {
-        Comm::ConnectionPointer serverConn = borrowPinnedConnection(tunnelState->request.getRaw(), tunnelState->serverDestinations[0]);
-        debugs(26,7, "pinned peer connection: " << serverConn);
-        if (Comm::IsConnOpen(serverConn)) {
-            tunnelConnectDone(serverConn, Comm::OK, 0, (void *)tunnelState);
-            return;
-        }
-        bail = true;
-    }
+        if (selectionError)
+            return sendError(selectionError, "path selection has failed");
 
-    if (bail) {
-        if (!err) {
-            err = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, tunnelState->request.getRaw());
-        }
-        *tunnelState->status_ptr = err->httpStatus;
-        err->callback = tunnelErrorComplete;
-        err->callback_data = tunnelState;
-        errorSend(tunnelState->client.conn, err);
-        return;
+        if (savedError)
+            return sendError(savedError, "all found paths have failed");
+
+        return sendError(new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw()),
+                         "path selection found no paths");
     }
-    delete err;
+    // else continue to use one of the previously noted destinations;
+    // if all of them fail, tunneling as whole will fail
+    Must(!selectionError); // finding at least one path means selection succeeded
+}
 
-    if (tunnelState->request != NULL)
-        tunnelState->request->hier.startPeerClock();
+/// remembers an error to be used if there will be no more connection attempts
+void
+TunnelStateData::saveError(ErrorState *error)
+{
+    debugs(26, 4, savedError << " ? " << error);
+    assert(error);
+    delete savedError; // may be nil
+    savedError = error;
+}
+
+/// Starts sending the given error message to the client, leading to the
+/// eventual transaction termination. Call with savedError to send savedError.
+void
+TunnelStateData::sendError(ErrorState *finalError, const char *reason)
+{
+    debugs(26, 3, "aborting transaction for " << reason);
+
+    if (request)
+        request->hier.stopPeerClock(false);
+
+    assert(finalError);
 
-    debugs(26, 3, "paths=" << peer_paths->size() << ", p[0]={" << (*peer_paths)[0] << "}, serverDest[0]={" <<
-           tunnelState->serverDestinations[0] << "}");
+    // get rid of any cached error unless that is what the caller is sending
+    if (savedError != finalError)
+        delete savedError; // may be nil
+    savedError = nullptr;
 
-    tunnelState->startConnecting();
+    // we cannot try other destinations after responding with an error
+    PeerSelectionInitiator::subscribed = false; // may already be false
+
+    *status_ptr = finalError->httpStatus;
+    finalError->callback = tunnelErrorComplete;
+    finalError->callback_data = this;
+    errorSend(client.conn, finalError);
 }
 
 void
 TunnelStateData::startConnecting()
 {
+    if (request)
+        request->hier.startPeerClock();
+
+    assert(!serverDestinations.empty());
     Comm::ConnectionPointer &dest = serverDestinations.front();
+    debugs(26, 3, "to " << dest);
+
+    if (dest->peerType == PINNED) {
+        Comm::ConnectionPointer serverConn = borrowPinnedConnection(request.getRaw(), dest);
+        debugs(26,7, "pinned peer connection: " << serverConn);
+        if (Comm::IsConnOpen(serverConn)) {
+            tunnelConnectDone(serverConn, Comm::OK, 0, (void *)this);
+            return;
+        }
+        // a PINNED path failure is fatal; do not wait for more paths
+        sendError(new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw()),
+                  "pinned path failure");
+        return;
+    }
+
     GetMarkingsToServer(request.getRaw(), *dest);
 
     const time_t connectTimeout = dest->connectTimeout(startTime);