From: Eduard Bagdasaryan Date: Fri, 13 May 2022 03:10:32 +0000 (+0000) Subject: Preserve caller context across (and improve) deferred reads (#1025) X-Git-Tag: SQUID_6_0_1~187 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a928fdfda5ada32c76878b8ef3bde78c808a0bed;p=thirdparty%2Fsquid.git Preserve caller context across (and improve) deferred reads (#1025) The transaction context was not saved/restored when dealing with deferred reads initiated by events like the DelayPools::Update() event. To fix this, we refactored MemObject::delayRead() and its descendants to use an AsyncCall, which automatically stores/restores code context. Using explicit async callbacks highlighted the danger of passing Connection object via CommRead that does not maintain a closure callback. There was also a related "stuck transaction" suspicion documented in DeferredReadManager::kickARead(). Fortunately, all these problems could now be solved by removing DeferredRead and CommRead classes! The delayed readers already store the Connection object, maintain closure callbacks, and have to check stored Connection validity before reading anyway. The general/centralized delayed reading logic is not really about reading and Connections (those parts are handled by transaction-specific code) but about triggering reading attempts. Asynchronous calls are perfect (and sufficient) for doing that. Also fixed Delay Pools for Gopher: delayAwareRead() was initiated only once from gopherSendComplete() and the subsequent read calls were delay-unaware (i.e. immediate) reads. Also fixed a Delay Pools problem with active transactions: A transaction started with Delay Pools on becomes stuck if a reconfiguration turns Delay Pools off. Also refactored the existing AsyncCall FIFO intrusive storage, making its reuse possible (and marked one candidate with a TODO). --- diff --git a/src/CommRead.h b/src/CommRead.h deleted file mode 100644 index b0459d78e1..0000000000 --- a/src/CommRead.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (C) 1996-2022 The Squid Software Foundation and contributors - * - * Squid software is distributed under GPLv2+ license and includes - * contributions from numerous individuals and organizations. - * Please see the COPYING and CONTRIBUTORS files for details. - */ - -/* DEBUG: section 05 Comm */ - -#ifndef COMMREAD_H -#define COMMREAD_H - -#include "base/CbDataList.h" -#include "comm.h" -#include "comm/forward.h" -#include "CommCalls.h" - -class CommRead -{ - -public: - CommRead(); - CommRead(const Comm::ConnectionPointer &c, char *buf, int len, AsyncCall::Pointer &callback); - Comm::ConnectionPointer conn; - char *buf; - int len; - AsyncCall::Pointer callback; -}; - -class DeferredRead -{ - -public: - typedef void DeferrableRead(void *context, CommRead const &); - DeferredRead (); - DeferredRead (DeferrableRead *, void *, CommRead const &); - void markCancelled(); - DeferrableRead *theReader; - void *theContext; - CommRead theRead; - bool cancelled; - AsyncCall::Pointer closer; ///< internal close handler used by Comm - -private: -}; - -class DeferredReadManager -{ - -public: - ~DeferredReadManager(); - void delayRead(DeferredRead const &); - void kickReads(int const count); - -private: - static CLCB CloseHandler; - static DeferredRead popHead(CbDataListContainer &deferredReads); - void kickARead(DeferredRead const &); - void flushReads(); - CbDataListContainer deferredReads; -}; - -#endif /* COMMREAD_H */ - diff --git a/src/CompositePoolNode.h b/src/CompositePoolNode.h index add27d2d29..e00df8ff22 100644 --- a/src/CompositePoolNode.h +++ b/src/CompositePoolNode.h @@ -13,7 +13,7 @@ #if USE_DELAY_POOLS #include "auth/UserRequest.h" -#include "CommRead.h" +#include "base/DelayedAsyncCalls.h" #include "DelayIdComposite.h" #include "DelayPools.h" #include "ip/Address.h" @@ -37,7 +37,7 @@ public: class CompositeSelectionDetails; virtual DelayIdComposite::Pointer id(CompositeSelectionDetails &) = 0; - void delayRead(DeferredRead const &); + void delayRead(const AsyncCallPointer &); /// \ingroup DelayPoolsAPI class CompositeSelectionDetails @@ -55,7 +55,7 @@ public: protected: void kickReads(); - DeferredReadManager deferredReads; + DelayedAsyncCalls deferredReads; }; #endif /* USE_DELAY_POOLS */ diff --git a/src/DelayId.cc b/src/DelayId.cc index 961ea89ef0..67da4a6387 100644 --- a/src/DelayId.cc +++ b/src/DelayId.cc @@ -15,8 +15,8 @@ */ #if USE_DELAY_POOLS #include "acl/FilledChecklist.h" +#include "base/DelayedAsyncCalls.h" #include "client_side_request.h" -#include "CommRead.h" #include "DelayId.h" #include "DelayPool.h" #include "DelayPools.h" @@ -164,7 +164,7 @@ DelayId::bytesIn(int qty) } void -DelayId::delayRead(DeferredRead const &aRead) +DelayId::delayRead(const AsyncCall::Pointer &aRead) { assert (compositeId != NULL); compositeId->delayRead(aRead); diff --git a/src/DelayId.h b/src/DelayId.h index adf9e4cbec..c7249db45f 100644 --- a/src/DelayId.h +++ b/src/DelayId.h @@ -11,6 +11,7 @@ #if USE_DELAY_POOLS +#include "base/forward.h" #include "DelayIdComposite.h" class ClientHttpRequest; @@ -34,7 +35,7 @@ public: int bytesWanted(int min, int max) const; void bytesIn (int qty); void setNoDelay(bool const); - void delayRead(DeferredRead const &); + void delayRead(const AsyncCallPointer &); private: unsigned short pool_; diff --git a/src/DelayIdComposite.h b/src/DelayIdComposite.h index 61be1bbd2a..e6c0a53e69 100644 --- a/src/DelayIdComposite.h +++ b/src/DelayIdComposite.h @@ -12,11 +12,10 @@ #define DELAYIDCOMPOSITE_H #if USE_DELAY_POOLS +#include "base/forward.h" #include "base/RefCount.h" #include "fatal.h" -class DeferredRead; - class DelayIdComposite : public RefCountable { @@ -27,7 +26,7 @@ public: virtual int bytesWanted (int min, int max) const =0; virtual void bytesIn(int qty) = 0; /* only aggregate and vector need this today */ - virtual void delayRead(DeferredRead const &) {fatal("Not implemented");} + virtual void delayRead(const AsyncCallPointer &) { fatal("Not implemented"); } }; #endif /* USE_DELAY_POOLS */ diff --git a/src/DelayPool.cc b/src/DelayPool.cc index f7f3214d6f..e684ab08d7 100644 --- a/src/DelayPool.cc +++ b/src/DelayPool.cc @@ -77,9 +77,9 @@ DelayPool::freeData() // TODO: create DelayIdComposite.cc void -CompositePoolNode::delayRead(DeferredRead const &aRead) +CompositePoolNode::delayRead(const AsyncCall::Pointer &aRead) { - deferredReads.delayRead(aRead); + deferredReads.delay(aRead); } #include "comm.h" @@ -87,7 +87,7 @@ CompositePoolNode::delayRead(DeferredRead const &aRead) void CompositePoolNode::kickReads() { - deferredReads.kickReads(-1); + deferredReads.schedule(); } #endif /* USE_DELAY_POOLS */ diff --git a/src/DelayTagged.cc b/src/DelayTagged.cc index c016ec811c..bb55d05473 100644 --- a/src/DelayTagged.cc +++ b/src/DelayTagged.cc @@ -165,7 +165,7 @@ DelayTagged::Id::bytesIn(int qty) } void -DelayTagged::Id::delayRead(DeferredRead const &aRead) +DelayTagged::Id::delayRead(const AsyncCall::Pointer &aRead) { theTagged->delayRead(aRead); } diff --git a/src/DelayTagged.h b/src/DelayTagged.h index 7a1e58fdb8..5061bce6c8 100644 --- a/src/DelayTagged.h +++ b/src/DelayTagged.h @@ -14,6 +14,7 @@ #if USE_DELAY_POOLS #include "auth/Gadgets.h" +#include "base/forward.h" #include "CompositePoolNode.h" #include "DelayBucket.h" #include "DelayIdComposite.h" @@ -64,7 +65,7 @@ private: ~Id(); virtual int bytesWanted (int min, int max) const; virtual void bytesIn(int qty); - virtual void delayRead(DeferredRead const &); + virtual void delayRead(const AsyncCallPointer &); private: RefCount theTagged; diff --git a/src/DelayVector.cc b/src/DelayVector.cc index 342b441fef..e09249eaad 100644 --- a/src/DelayVector.cc +++ b/src/DelayVector.cc @@ -11,8 +11,9 @@ #include "squid.h" #if USE_DELAY_POOLS +#include "base/AsyncCall.h" +#include "base/DelayedAsyncCalls.h" #include "comm/Connection.h" -#include "CommRead.h" #include "DelayVector.h" DelayVector::DelayVector() @@ -126,7 +127,7 @@ DelayVector::Id::bytesIn(int qty) } void -DelayVector::Id::delayRead(DeferredRead const &aRead) +DelayVector::Id::delayRead(const AsyncCallPointer &aRead) { theVector->delayRead(aRead); } diff --git a/src/DelayVector.h b/src/DelayVector.h index 1b2bd54f03..d0243721b5 100644 --- a/src/DelayVector.h +++ b/src/DelayVector.h @@ -11,6 +11,7 @@ #if USE_DELAY_POOLS +#include "base/forward.h" #include "CompositePoolNode.h" /// \ingroup DelayPoolsAPI @@ -42,7 +43,7 @@ private: ~Id(); virtual int bytesWanted (int min, int max) const; virtual void bytesIn(int qty); - virtual void delayRead(DeferredRead const &); + virtual void delayRead(const AsyncCallPointer &); private: RefCount theVector; diff --git a/src/DiskIO/DiskDaemon/DiskdIOStrategy.cc b/src/DiskIO/DiskDaemon/DiskdIOStrategy.cc index 8da369263c..bb450a4c64 100644 --- a/src/DiskIO/DiskDaemon/DiskdIOStrategy.cc +++ b/src/DiskIO/DiskDaemon/DiskdIOStrategy.cc @@ -9,6 +9,7 @@ /* DEBUG: section 79 Squid-side DISKD I/O functions. */ #include "squid.h" +#include "comm.h" #include "comm/Loops.h" #include "ConfigOption.h" #include "diomsg.h" diff --git a/src/DiskIO/DiskThreads/DiskThreadsDiskFile.cc b/src/DiskIO/DiskThreads/DiskThreadsDiskFile.cc index 6f552ab414..275509e780 100644 --- a/src/DiskIO/DiskThreads/DiskThreadsDiskFile.cc +++ b/src/DiskIO/DiskThreads/DiskThreadsDiskFile.cc @@ -9,6 +9,7 @@ /* DEBUG: section 79 Disk IO Routines */ #include "squid.h" +#include "comm.h" #include "DiskIO/IORequestor.h" #include "DiskIO/ReadRequest.h" #include "DiskIO/WriteRequest.h" diff --git a/src/Makefile.am b/src/Makefile.am index faf9a1d0ab..153064aa14 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -227,7 +227,6 @@ squid_SOURCES = \ CollapsedForwarding.cc \ CollapsedForwarding.h \ CollapsingHistory.h \ - CommRead.h \ CommandLine.cc \ CommandLine.h \ ConfigOption.cc \ @@ -2775,7 +2774,8 @@ nodist_tests_testEventLoop_SOURCES = \ EventLoop.cc \ tests/stub_debug.cc \ tests/stub_fatal.cc \ - tests/stub_libtime.cc + tests/stub_libtime.cc \ + tests/stub_SBuf.cc tests_testEventLoop_LDADD = \ base/libbase.la \ $(LIBCPPUNIT_LIBS) \ diff --git a/src/MemObject.cc b/src/MemObject.cc index 6ac49c2fb1..d0d70ec621 100644 --- a/src/MemObject.cc +++ b/src/MemObject.cc @@ -438,7 +438,7 @@ MemObject::setNoDelay(bool const newValue) } void -MemObject::delayRead(DeferredRead const &aRead) +MemObject::delayRead(const AsyncCall::Pointer &aRead) { #if USE_DELAY_POOLS if (readAheadPolicyCanRead()) { @@ -448,13 +448,13 @@ MemObject::delayRead(DeferredRead const &aRead) } } #endif - deferredReads.delayRead(aRead); + deferredReads.delay(aRead); } void MemObject::kickReads() { - deferredReads.kickReads(-1); + deferredReads.schedule(); } #if USE_DELAY_POOLS diff --git a/src/MemObject.h b/src/MemObject.h index 1b9b6f0572..e752b32748 100644 --- a/src/MemObject.h +++ b/src/MemObject.h @@ -9,7 +9,7 @@ #ifndef SQUID_MEMOBJECT_H #define SQUID_MEMOBJECT_H -#include "CommRead.h" +#include "base/DelayedAsyncCalls.h" #include "dlink.h" #include "http/RequestMethod.h" #include "RemovalPolicy.h" @@ -192,7 +192,7 @@ public: PeerSelector *ircb_data = nullptr; /// used for notifying StoreEntry writers about 3rd-party initiated aborts - AsyncCall::Pointer abortCallback; + AsyncCallPointer abortCallback; RemovalPolicyNode repl; int id = 0; int64_t object_sz = -1; @@ -203,7 +203,7 @@ public: SBuf vary_headers; - void delayRead(DeferredRead const &); + void delayRead(const AsyncCallPointer &); void kickReads(); private: @@ -213,7 +213,7 @@ private: mutable String storeId_; ///< StoreId for our entry (usually request URI) mutable String logUri_; ///< URI used for logging (usually request URI) - DeferredReadManager deferredReads; + DelayedAsyncCalls deferredReads; }; /** global current memory removal policy */ diff --git a/src/Store.h b/src/Store.h index 937172448e..f407fb80e3 100644 --- a/src/Store.h +++ b/src/Store.h @@ -9,11 +9,11 @@ #ifndef SQUID_STORE_H #define SQUID_STORE_H +#include "base/DelayedAsyncCalls.h" #include "base/Packable.h" #include "base/Range.h" #include "base/RefCount.h" #include "comm/forward.h" -#include "CommRead.h" #include "hash.h" #include "http/forward.h" #include "http/RequestMethod.h" @@ -42,7 +42,6 @@ class StoreEntry : public hash_link, public Packable { public: - static DeferredRead::DeferrableRead DeferReader; bool checkDeferRead(int fd) const; const char *getMD5Text() const; @@ -171,8 +170,6 @@ public: void destroyMemObject(); int checkTooSmall(); - void delayAwareRead(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer callback); - void setNoDelay (bool const); void lastModified(const time_t when) { lastModified_ = when; } /// \returns entry's 'effective' modification time diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc index ede6822c40..a5d955c284 100644 --- a/src/adaptation/icap/ModXact.cc +++ b/src/adaptation/icap/ModXact.cc @@ -436,7 +436,7 @@ void Adaptation::Icap::ModXact::virginConsume() const bool wantToPostpone = isRepeatable || canStartBypass || protectGroupBypass; // Why > 2? HttpState does not use the last bytes in the buffer - // because delayAwareRead() is arguably broken. See + // because Client::delayRead() is arguably broken. See // HttpStateData::maybeReadVirginBody for more details. if (wantToPostpone && bp.buf().spaceSize() > 2) { // Postponing may increase memory footprint and slow the HTTP side diff --git a/src/base/AsyncCall.h b/src/base/AsyncCall.h index 1b3a32b027..c88f221f59 100644 --- a/src/base/AsyncCall.h +++ b/src/base/AsyncCall.h @@ -35,13 +35,11 @@ */ class CallDialer; -class AsyncCallQueue; class AsyncCall: public RefCountable { public: typedef RefCount Pointer; - friend class AsyncCallQueue; AsyncCall(int aDebugSection, int aDebugLevel, const char *aName); virtual ~AsyncCall(); @@ -83,7 +81,7 @@ protected: virtual void fire() = 0; - AsyncCall::Pointer theNext; // used exclusively by AsyncCallQueue + AsyncCall::Pointer theNext; ///< for AsyncCallList and similar lists private: const char *isCanceled; // set to the cancellation reason by cancel() diff --git a/src/base/AsyncCallList.cc b/src/base/AsyncCallList.cc new file mode 100644 index 0000000000..faa0173111 --- /dev/null +++ b/src/base/AsyncCallList.cc @@ -0,0 +1,48 @@ +/* + * Copyright (C) 1996-2022 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#include "squid.h" +#include "base/Assure.h" +#include "base/AsyncCall.h" +#include "base/AsyncCallList.h" + +void +AsyncCallList::add(const AsyncCall::Pointer &call) +{ + Assure(call); + Assure(!call->Next()); + if (tail) { // append to the existing list + Assure(head); + Assure(!tail->Next()); + tail->setNext(call); + tail = call; + } else { // create a list from scratch + Assure(!head); + head = tail = call; + } + ++length; + Assure(length); // no overflows +} + +AsyncCall::Pointer +AsyncCallList::extract() +{ + if (!head) + return AsyncCallPointer(); + + Assure(tail); + Assure(length); + const auto call = head; + head = call->Next(); + call->setNext(nullptr); + if (tail == call) + tail = nullptr; + --length; + return call; +} + diff --git a/src/base/AsyncCallList.h b/src/base/AsyncCallList.h new file mode 100644 index 0000000000..8a8ad0c10c --- /dev/null +++ b/src/base/AsyncCallList.h @@ -0,0 +1,43 @@ +/* + * Copyright (C) 1996-2022 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef SQUID_BASE_ASYNCCALLLIST_H +#define SQUID_BASE_ASYNCCALLLIST_H + +#include "base/forward.h" +#include "base/RefCount.h" + +/// An efficient (but intrusive) AsyncCall storage preserving FIFO order. +/// A given AsyncCall object may reside in at most one such storage. +class AsyncCallList +{ +public: + AsyncCallList() = default; + // prohibit copying: no AsyncCall should be present in two lists + AsyncCallList(const AsyncCallList &) = delete; + AsyncCallList &operator=(const AsyncCallList &) = delete; + + /// stores the given async call + void add(const AsyncCallPointer &); + + /// removes the earliest add()-ed call that is still stored (if any) + /// \returns the removed call (or nil) + /// \retval nil means the list stored no calls at extract() time + AsyncCallPointer extract(); + + /// the number of currently stored calls + size_t size() const { return length; } + +private: + AsyncCallPointer head; ///< the earliest still-stored call (or nil) + AsyncCallPointer tail; ///< the latest still-stored call (or nil) + size_t length = 0; ///< \copydoc size() +}; + +#endif /* SQUID_BASE_ASYNCCALLLIST_H */ + diff --git a/src/base/AsyncCallQueue.cc b/src/base/AsyncCallQueue.cc index fdfb2d8473..6c4f3e5737 100644 --- a/src/base/AsyncCallQueue.cc +++ b/src/base/AsyncCallQueue.cc @@ -15,52 +15,23 @@ AsyncCallQueue *AsyncCallQueue::TheInstance = 0; -AsyncCallQueue::AsyncCallQueue(): theHead(NULL), theTail(NULL) -{ -} - -void AsyncCallQueue::schedule(AsyncCall::Pointer &call) -{ - assert(call != NULL); - assert(!call->theNext); - if (theHead != NULL) { // append - assert(!theTail->theNext); - theTail->theNext = call; - theTail = call; - } else { // create queue from cratch - theHead = theTail = call; - } -} - // Fire all scheduled calls; returns true if at least one call was fired. // The calls may be added while the current call is in progress. bool AsyncCallQueue::fire() { - const bool made = theHead != NULL; - while (theHead) { - CodeContext::Reset(theHead->codeContext); - fireNext(); + const auto made = scheduled.size() > 0; + while (const auto call = scheduled.extract()) { + CodeContext::Reset(call->codeContext); + debugs(call->debugSection, call->debugLevel, "entering " << *call); + call->make(); + debugs(call->debugSection, call->debugLevel, "leaving " << *call); } if (made) CodeContext::Reset(); return made; } -void -AsyncCallQueue::fireNext() -{ - AsyncCall::Pointer call = theHead; - theHead = call->theNext; - call->theNext = NULL; - if (theTail == call) - theTail = NULL; - - debugs(call->debugSection, call->debugLevel, "entering " << *call); - call->make(); - debugs(call->debugSection, call->debugLevel, "leaving " << *call); -} - AsyncCallQueue & AsyncCallQueue::Instance() { diff --git a/src/base/AsyncCallQueue.h b/src/base/AsyncCallQueue.h index 6e13996da3..0a31be8fcb 100644 --- a/src/base/AsyncCallQueue.h +++ b/src/base/AsyncCallQueue.h @@ -9,9 +9,8 @@ #ifndef SQUID_ASYNCCALLQUEUE_H #define SQUID_ASYNCCALLQUEUE_H -#include "base/AsyncCall.h" - -//class AsyncCall; +#include "base/AsyncCallList.h" +#include "base/forward.h" // The queue of asynchronous calls. All calls are fired during a single main // loop iteration until the queue is exhausted @@ -22,18 +21,15 @@ public: static AsyncCallQueue &Instance(); // make this async call when we get a chance - void schedule(AsyncCall::Pointer &call); + void schedule(const AsyncCallPointer &call) { scheduled.add(call); } // fire all scheduled calls; returns true if at least one was fired bool fire(); private: - AsyncCallQueue(); - - void fireNext(); + AsyncCallQueue() = default; - AsyncCall::Pointer theHead; - AsyncCall::Pointer theTail; + AsyncCallList scheduled; ///< calls waiting to be fire()d, in FIFO order static AsyncCallQueue *TheInstance; }; diff --git a/src/base/DelayedAsyncCalls.cc b/src/base/DelayedAsyncCalls.cc new file mode 100644 index 0000000000..ddd2a9ffdc --- /dev/null +++ b/src/base/DelayedAsyncCalls.cc @@ -0,0 +1,27 @@ +/* + * Copyright (C) 1996-2022 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#include "squid.h" +#include "base/AsyncCall.h" +#include "base/DelayedAsyncCalls.h" +#include "debug/Stream.h" + +void +DelayedAsyncCalls::delay(const AsyncCall::Pointer &call) +{ + debugs(5, 3, call << " after " << deferredReads.size()); + deferredReads.add(call); +} + +void +DelayedAsyncCalls::schedule() +{ + while (auto call = deferredReads.extract()) + ScheduleCallHere(call); +} + diff --git a/src/base/DelayedAsyncCalls.h b/src/base/DelayedAsyncCalls.h new file mode 100644 index 0000000000..affe2ad539 --- /dev/null +++ b/src/base/DelayedAsyncCalls.h @@ -0,0 +1,33 @@ +/* + * Copyright (C) 1996-2022 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef SQUID_BASE_DELAYEDASYNCCALLS_H +#define SQUID_BASE_DELAYEDASYNCCALLS_H + +#include "base/AsyncCallList.h" + +/// a FIFO list of async calls, all to be scheduled in FIFO order (on demand via +/// the schedule() method or automatically at object destruction time) +class DelayedAsyncCalls +{ +public: + ~DelayedAsyncCalls() { schedule(); } + + /// stores the given call to schedule it at schedule() or destruction time + void delay(const AsyncCallPointer &); + + /// schedules and forgets all async calls previously stored by delay() + void schedule(); + +private: + /// delay()-ed calls waiting to be scheduled, in delay() call order + AsyncCallList deferredReads; +}; + +#endif /* SQUID_BASE_DELAYEDASYNCCALLS_H */ + diff --git a/src/base/Makefile.am b/src/base/Makefile.am index 90b70855c5..9f7591d6c2 100644 --- a/src/base/Makefile.am +++ b/src/base/Makefile.am @@ -15,6 +15,8 @@ libbase_la_SOURCES = \ Assure.h \ AsyncCall.cc \ AsyncCall.h \ + AsyncCallList.cc \ + AsyncCallList.h \ AsyncCallQueue.cc \ AsyncCallQueue.h \ AsyncCbdataCalls.h \ @@ -30,6 +32,8 @@ libbase_la_SOURCES = \ ClpMap.h \ CodeContext.cc \ CodeContext.h \ + DelayedAsyncCalls.cc \ + DelayedAsyncCalls.h \ EnumIterator.h \ File.cc \ File.h \ diff --git a/src/base/forward.h b/src/base/forward.h index 4535474049..ba5b57519a 100644 --- a/src/base/forward.h +++ b/src/base/forward.h @@ -9,10 +9,12 @@ #ifndef SQUID_SRC_BASE_FORWARD_H #define SQUID_SRC_BASE_FORWARD_H +class AsyncCall; class AsyncCallQueue; class AsyncJob; class CallDialer; class CodeContext; +class DelayedAsyncCalls; class ScopedId; class BadOptionalAccess; class Raw; @@ -26,6 +28,7 @@ template class JobWait; typedef CbcPointer AsyncJobPointer; typedef RefCount CodeContextPointer; +using AsyncCallPointer = RefCount; #endif /* SQUID_SRC_BASE_FORWARD_H */ diff --git a/src/clients/Client.cc b/src/clients/Client.cc index 530dbbabc4..bf3bfd77de 100644 --- a/src/clients/Client.cc +++ b/src/clients/Client.cc @@ -1025,6 +1025,15 @@ Client::adjustBodyBytesRead(const int64_t delta) Must(bodyBytesRead >= 0); } +void +Client::delayRead() +{ + using DeferredReadDialer = NullaryMemFunT; + AsyncCall::Pointer call = asyncCall(11, 5, "Client::noteDelayAwareReadChance", + DeferredReadDialer(this, &Client::noteDelayAwareReadChance)); + entry->mem().delayRead(call); +} + void Client::addVirginReplyBody(const char *data, ssize_t len) { diff --git a/src/clients/Client.h b/src/clients/Client.h index be79b395e4..8b2598324a 100644 --- a/src/clients/Client.h +++ b/src/clients/Client.h @@ -113,6 +113,10 @@ protected: /// whether we may receive more virgin response body bytes virtual bool mayReadVirginReplyBody() const = 0; + /// Called when a previously delayed dataConnection() read may be possible. + /// \sa delayRead() + virtual void noteDelayAwareReadChance() = 0; + /// Entry-dependent callbacks use this check to quit if the entry went bad bool abortOnBadEntry(const char *abortReason); @@ -160,6 +164,10 @@ protected: void adjustBodyBytesRead(const int64_t delta); + /// Defer reading until it is likely to become possible. + /// Eventually, noteDelayAwareReadChance() will be called. + void delayRead(); + // These should be private int64_t currentOffset = 0; /**< Our current offset in the StoreEntry */ MemBuf *responseBodyBuffer = nullptr; /**< Data temporarily buffered for ICAP */ diff --git a/src/clients/FtpClient.cc b/src/clients/FtpClient.cc index 3442f793b0..afd9cd5149 100644 --- a/src/clients/FtpClient.cc +++ b/src/clients/FtpClient.cc @@ -10,6 +10,8 @@ #include "squid.h" #include "acl/FilledChecklist.h" +#include "base/AsyncJobCalls.h" +#include "base/Range.h" #include "client_side.h" #include "clients/FtpClient.h" #include "comm/ConnOpener.h" @@ -902,6 +904,13 @@ Ftp::Client::dataConnection() const return data.conn; } +void +Ftp::Client::noteDelayAwareReadChance() +{ + data.read_pending = false; + maybeReadVirginBody(); +} + void Ftp::Client::maybeReadVirginBody() { @@ -930,9 +939,16 @@ Ftp::Client::maybeReadVirginBody() debugs(9,5,"queueing read on FD " << data.conn->fd); - typedef CommCbMemFunT Dialer; - entry->delayAwareRead(data.conn, data.readBuf->space(), read_sz, - JobCallback(9, 5, Dialer, this, Ftp::Client::dataRead)); + const auto amountToRead = entry->bytesWanted(Range(0, read_sz)); + + if (amountToRead <= 0) { + delayRead(); + return; + } + + using ReadDialer = CommCbMemFunT; + AsyncCall::Pointer readCallback = JobCallback(9, 5, ReadDialer, this, Client::dataRead); + comm_read(data.conn, data.readBuf->space(), amountToRead, readCallback); } void diff --git a/src/clients/FtpClient.h b/src/clients/FtpClient.h index 4b2dd61d54..10c0e176ff 100644 --- a/src/clients/FtpClient.h +++ b/src/clients/FtpClient.h @@ -186,6 +186,7 @@ protected: virtual bool doneWithServer() const; virtual const Comm::ConnectionPointer & dataConnection() const; virtual void abortAll(const char *reason); + virtual void noteDelayAwareReadChance(); virtual Http::StatusCode failedHttpStatus(err_type &error); void ctrlClosed(const CommCloseCbParams &io); diff --git a/src/comm.cc b/src/comm.cc index d5b30528b9..4bfab57d6c 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -18,7 +18,6 @@ #include "comm/Read.h" #include "comm/TcpAcceptor.h" #include "comm/Write.h" -#include "CommRead.h" #include "compat/cmsg.h" #include "DescriptorSet.h" #include "event.h" @@ -808,8 +807,8 @@ comm_close_complete(const FdeCbParams ¶ms) * + call read handlers with ERR_CLOSING * + call closing handlers * - * NOTE: Comm::ERR_CLOSING will NOT be called for CommReads' sitting in a - * DeferredReadManager. + * A deferred reader has no Comm read handler mentioned above. To stay in sync, + * such a reader must register a Comm closing handler. */ void _comm_close(int fd, char const *file, int line) @@ -939,6 +938,8 @@ comm_add_close_handler(int fd, AsyncCall::Pointer &call) // for (c = fd_table[fd].closeHandler; c; c = c->next) // assert(c->handler != handler || c->data != data); + // TODO: Consider enhancing AsyncCallList to support random-access close + // handlers, perhaps after upgrading the remaining legacy CLCB handlers. call->setNext(fd_table[fd].closeHandler); fd_table[fd].closeHandler = call; @@ -1644,149 +1645,6 @@ commHalfClosedReader(const Comm::ConnectionPointer &conn, char *, size_t size, C commPlanHalfClosedCheck(); // make sure this fd will be checked again } -CommRead::CommRead() : conn(NULL), buf(NULL), len(0), callback(NULL) {} - -CommRead::CommRead(const Comm::ConnectionPointer &c, char *buf_, int len_, AsyncCall::Pointer &callback_) - : conn(c), buf(buf_), len(len_), callback(callback_) {} - -DeferredRead::DeferredRead () : theReader(NULL), theContext(NULL), theRead(), cancelled(false) {} - -DeferredRead::DeferredRead (DeferrableRead *aReader, void *data, CommRead const &aRead) : theReader(aReader), theContext (data), theRead(aRead), cancelled(false) {} - -DeferredReadManager::~DeferredReadManager() -{ - flushReads(); - assert (deferredReads.empty()); -} - -/* explicit instantiation required for some systems */ - -/// \cond AUTODOCS_IGNORE -template cbdata_type CbDataList::CBDATA_CbDataList; -/// \endcond - -void -DeferredReadManager::delayRead(DeferredRead const &aRead) -{ - debugs(5, 3, "Adding deferred read on " << aRead.theRead.conn); - CbDataList *temp = deferredReads.push_back(aRead); - - // We have to use a global function as a closer and point to temp - // instead of "this" because DeferredReadManager is not a job and - // is not even cbdata protected - // XXX: and yet we use cbdata protection functions on it?? - AsyncCall::Pointer closer = commCbCall(5,4, - "DeferredReadManager::CloseHandler", - CommCloseCbPtrFun(&CloseHandler, temp)); - comm_add_close_handler(aRead.theRead.conn->fd, closer); - temp->element.closer = closer; // remember so that we can cancel -} - -void -DeferredReadManager::CloseHandler(const CommCloseCbParams ¶ms) -{ - if (!cbdataReferenceValid(params.data)) - return; - - CbDataList *temp = (CbDataList *)params.data; - - temp->element.closer = NULL; - if (temp->element.theRead.conn) { - temp->element.theRead.conn->noteClosure(); - temp->element.theRead.conn = nullptr; - } - temp->element.markCancelled(); -} - -DeferredRead -DeferredReadManager::popHead(CbDataListContainer &deferredReads) -{ - assert (!deferredReads.empty()); - - DeferredRead &read = deferredReads.head->element; - - // NOTE: at this point the connection has been paused/stalled for an unknown - // amount of time. We must re-validate that it is active and usable. - - // If the connection has been closed already. Cancel this read. - if (!fd_table || !Comm::IsConnOpen(read.theRead.conn)) { - if (read.closer != NULL) { - read.closer->cancel("Connection closed before."); - read.closer = NULL; - } - read.markCancelled(); - } - - if (!read.cancelled) { - comm_remove_close_handler(read.theRead.conn->fd, read.closer); - read.closer = NULL; - } - - DeferredRead result = deferredReads.pop_front(); - - return result; -} - -void -DeferredReadManager::kickReads(int const count) -{ - /* if we had CbDataList::size() we could consolidate this and flushReads */ - - if (count < 1) { - flushReads(); - return; - } - - size_t remaining = count; - - while (!deferredReads.empty() && remaining) { - DeferredRead aRead = popHead(deferredReads); - kickARead(aRead); - - if (!aRead.cancelled) - --remaining; - } -} - -void -DeferredReadManager::flushReads() -{ - CbDataListContainer reads; - reads = deferredReads; - deferredReads = CbDataListContainer(); - - // XXX: For fairness this SHOULD randomize the order - while (!reads.empty()) { - DeferredRead aRead = popHead(reads); - kickARead(aRead); - } -} - -void -DeferredReadManager::kickARead(DeferredRead const &aRead) -{ - if (aRead.cancelled) - return; - - // TODO: This check still allows theReader call with a closed theRead.conn. - // If a delayRead() caller has a close connection handler, then such a call - // would be useless and dangerous. If a delayRead() caller does not have it, - // then the caller will get stuck when an external connection closure makes - // aRead.cancelled (checked above) true. - if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing()) - return; - - debugs(5, 3, "Kicking deferred read on " << aRead.theRead.conn); - - aRead.theReader(aRead.theContext, aRead.theRead); -} - -void -DeferredRead::markCancelled() -{ - cancelled = true; -} - int CommSelectEngine::checkEvents(int timeout) { diff --git a/src/delay_pools.cc b/src/delay_pools.cc index 457847a469..2ef5a75e52 100644 --- a/src/delay_pools.cc +++ b/src/delay_pools.cc @@ -70,7 +70,7 @@ private: AggregateId (RefCount); virtual int bytesWanted (int min, int max) const; virtual void bytesIn(int qty); - virtual void delayRead(DeferredRead const &); + virtual void delayRead(const AsyncCallPointer &); private: RefCount theAggregate; @@ -239,7 +239,7 @@ protected: }; void -Aggregate::AggregateId::delayRead(DeferredRead const &aRead) +Aggregate::AggregateId::delayRead(const AsyncCall::Pointer &aRead) { theAggregate->delayRead(aRead); } @@ -475,7 +475,6 @@ DelayPools::InitDelayData() void DelayPools::FreeDelayData() { - eventDelete(DelayPools::Update, NULL); delete[] DelayPools::delay_data; pools_ = 0; } @@ -483,7 +482,9 @@ DelayPools::FreeDelayData() void DelayPools::Update(void *) { - if (!pools()) + // To prevent stuck transactions, stop updates only after no new transactions can + // register (because the pools were disabled) and the last registered transaction is gone. + if (!pools() && toUpdate.empty()) return; eventAdd("DelayPools::Update", Update, NULL, 1.0, 1); diff --git a/src/gopher.cc b/src/gopher.cc index 2006b4183a..2e16cd96ed 100644 --- a/src/gopher.cc +++ b/src/gopher.cc @@ -9,6 +9,7 @@ /* DEBUG: section 10 Gopher */ #include "squid.h" +#include "base/AsyncCbdataCalls.h" #include "comm.h" #include "comm/Read.h" #include "comm/Write.h" @@ -102,6 +103,9 @@ public: ~GopherStateData(); + /// queues or defers a read call + static void DelayAwareRead(GopherStateData *); + /// URL for icon to display (or nil), given the Gopher item-type code. /// The returned c-string is invalidated by the next call to this function. const char *iconUrl(char); @@ -806,12 +810,33 @@ gopherReadReply(const Comm::ConnectionPointer &conn, char *buf, size_t len, Comm } else { entry->append(buf, len); } - AsyncCall::Pointer call = commCbCall(5,4, "gopherReadReply", - CommIoCbPtrFun(gopherReadReply, gopherState)); - comm_read(conn, buf, read_sz, call); + GopherStateData::DelayAwareRead(gopherState); } } +void +GopherStateData::DelayAwareRead(GopherStateData *gopherState) +{ + const auto &conn = gopherState->serverConn; + + if (!Comm::IsConnOpen(conn) || fd_table[conn->fd].closing()) { + debugs(10, 3, "will not read from " << conn); + return; + } + + const auto amountToRead = gopherState->entry->bytesWanted(Range(0, BUFSIZ)); + + if (amountToRead <= 0) { + AsyncCall::Pointer delayCall = asyncCall(10, 3, "GopherStateData::DelayAwareRead", + cbdataDialer(&GopherStateData::DelayAwareRead, gopherState)); + gopherState->entry->mem().delayRead(delayCall); + return; + } + + AsyncCall::Pointer readCall = commCbCall(5, 5, "gopherReadReply", CommIoCbPtrFun(gopherReadReply, gopherState)); + comm_read(conn, gopherState->replybuf, amountToRead, readCall); +} + /** * This will be called when request write is complete. Schedule read of reply. */ @@ -879,10 +904,7 @@ gopherSendComplete(const Comm::ConnectionPointer &conn, char *, size_t size, Com entry->flush(); } - /* Schedule read reply. */ - AsyncCall::Pointer call = commCbCall(5,5, "gopherReadReply", - CommIoCbPtrFun(gopherReadReply, gopherState)); - entry->delayAwareRead(conn, gopherState->replybuf, BUFSIZ, call); + GopherStateData::DelayAwareRead(gopherState); } /** diff --git a/src/http.cc b/src/http.cc index a7a70ffc50..b587606cdd 100644 --- a/src/http.cc +++ b/src/http.cc @@ -16,6 +16,7 @@ #include "squid.h" #include "acl/FilledChecklist.h" #include "base/AsyncJobCalls.h" +#include "base/DelayedAsyncCalls.h" #include "base/Raw.h" #include "base/TextException.h" #include "base64.h" @@ -24,7 +25,6 @@ #include "comm/Connection.h" #include "comm/Read.h" #include "comm/Write.h" -#include "CommRead.h" #include "error/Detail.h" #include "errorpage.h" #include "fd.h" @@ -1164,12 +1164,11 @@ HttpStateData::persistentConnStatus() const return statusIfComplete(); } -static void -readDelayed(void *context, CommRead const &) +void +HttpStateData::noteDelayAwareReadChance() { - HttpStateData *state = static_cast(context); - state->flags.do_next_read = true; - state->maybeReadVirginBody(); + flags.do_next_read = true; + maybeReadVirginBody(); } void @@ -1207,9 +1206,7 @@ HttpStateData::readReply(const CommIoCbParams &io) rd.size = entry->bytesWanted(Range(0, inBuf.spaceSize())); if (rd.size <= 0) { - assert(entry->mem_obj); - AsyncCall::Pointer nilCall; - entry->mem_obj->delayRead(DeferredRead(readDelayed, this, CommRead(io.conn, NULL, 0, nilCall))); + delayRead(); return; } diff --git a/src/http.h b/src/http.h index c4161211f8..2c369786b1 100644 --- a/src/http.h +++ b/src/http.h @@ -75,6 +75,9 @@ public: void processSurrogateControl(HttpReply *); protected: + /* Client API */ + virtual void noteDelayAwareReadChance(); + void processReply(); void proceedAfter1xx(); void handle1xx(HttpReply *msg); diff --git a/src/mgr/Forwarder.cc b/src/mgr/Forwarder.cc index 5c9f7dcc33..4d884b51c4 100644 --- a/src/mgr/Forwarder.cc +++ b/src/mgr/Forwarder.cc @@ -12,6 +12,7 @@ #include "AccessLogEntry.h" #include "base/AsyncJobCalls.h" #include "base/TextException.h" +#include "comm.h" #include "comm/Connection.h" #include "CommCalls.h" #include "errorpage.h" diff --git a/src/mgr/StoreToCommWriter.cc b/src/mgr/StoreToCommWriter.cc index 74d95ea907..f2b043e2e5 100644 --- a/src/mgr/StoreToCommWriter.cc +++ b/src/mgr/StoreToCommWriter.cc @@ -11,6 +11,7 @@ #include "squid.h" #include "base/AsyncCbdataCalls.h" #include "base/TextException.h" +#include "comm.h" #include "comm/Connection.h" #include "comm/Write.h" #include "CommCalls.h" diff --git a/src/store.cc b/src/store.cc index a9980f6d2f..58ae4dbe73 100644 --- a/src/store.cc +++ b/src/store.cc @@ -206,44 +206,6 @@ StoreEntry::getMD5Text() const return storeKeyText((const cache_key *)key); } -#include "comm.h" - -void -StoreEntry::DeferReader(void *theContext, CommRead const &aRead) -{ - StoreEntry *anEntry = (StoreEntry *)theContext; - anEntry->delayAwareRead(aRead.conn, - aRead.buf, - aRead.len, - aRead.callback); -} - -void -StoreEntry::delayAwareRead(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer callback) -{ - size_t amountToRead = bytesWanted(Range(0, len)); - /* sketch: readdeferer* = getdeferer. - * ->deferRead (fd, buf, len, callback, DelayAwareRead, this) - */ - - if (amountToRead <= 0) { - assert (mem_obj); - mem_obj->delayRead(DeferredRead(DeferReader, this, CommRead(conn, buf, len, callback))); - return; - } - - if (fd_table[conn->fd].closing()) { - // Readers must have closing callbacks if they want to be notified. No - // readers appeared to care around 2009/12/14 as they skipped reading - // for other reasons. Closing may already be true at the delyaAwareRead - // call time or may happen while we wait after delayRead() above. - debugs(20, 3, "will not read from closing " << conn << " for " << callback); - return; // the read callback will never be called - } - - comm_read(conn, buf, amountToRead, callback); -} - size_t StoreEntry::bytesWanted (Range const aRange, bool ignoreDelayPools) const { diff --git a/src/tests/stub_DelayId.cc b/src/tests/stub_DelayId.cc index 23942a09bb..5624a4476e 100644 --- a/src/tests/stub_DelayId.cc +++ b/src/tests/stub_DelayId.cc @@ -20,7 +20,7 @@ DelayId::DelayId(): pool_(0), compositeId(NULL), markedAsNoDelay(false) {} DelayId::~DelayId() {} -void DelayId::delayRead(DeferredRead const&) STUB_NOP +void DelayId::delayRead(const AsyncCallPointer &) STUB_NOP void BandwidthBucket::refillBucket() STUB bool BandwidthBucket::applyQuota(int &, Comm::IoCallback *) STUB_RETVAL(false) BandwidthBucket *BandwidthBucket::SelectBucket(fde *) STUB_RETVAL(nullptr) diff --git a/src/tests/stub_MemObject.cc b/src/tests/stub_MemObject.cc index e9dce01ce1..990b5b51d0 100644 --- a/src/tests/stub_MemObject.cc +++ b/src/tests/stub_MemObject.cc @@ -38,7 +38,7 @@ const char *MemObject::storeId() const STUB_RETVAL(NULL) const char *MemObject::logUri() const STUB_RETVAL(NULL) void MemObject::setUris(char const *, char const *, const HttpRequestMethod &) STUB void MemObject::reset() STUB -void MemObject::delayRead(DeferredRead const &) STUB +void MemObject::delayRead(const AsyncCallPointer &) STUB bool MemObject::readAheadPolicyCanRead() const STUB_RETVAL(false) void MemObject::setNoDelay(bool const) STUB MemObject::~MemObject() STUB diff --git a/src/tests/stub_comm.cc b/src/tests/stub_comm.cc index f86e93356e..25de5d0640 100644 --- a/src/tests/stub_comm.cc +++ b/src/tests/stub_comm.cc @@ -21,14 +21,10 @@ // void comm_read(const Comm::ConnectionPointer &, char *, int, IOCB *, void *) STUB // void comm_read(const Comm::ConnectionPointer &, char*, int, AsyncCall::Pointer &) STUB -/* should be in stub_CommRead */ -#include "CommRead.h" -CommRead::CommRead(const Comm::ConnectionPointer &, char *, int, AsyncCall::Pointer &) STUB -CommRead::CommRead() STUB -DeferredReadManager::~DeferredReadManager() STUB -DeferredRead::DeferredRead(DeferrableRead *, void *, CommRead const &) STUB -void DeferredReadManager::delayRead(DeferredRead const &) STUB -void DeferredReadManager::kickReads(int const) STUB +/* should be in stub_libbase */ +#include "base/DelayedAsyncCalls.h" +void DelayedAsyncCalls::delay(const AsyncCall::Pointer &) STUB +void DelayedAsyncCalls::schedule() STUB #include "comm.h" bool comm_iocallbackpending(void) STUB_RETVAL(false) diff --git a/src/tests/stub_store.cc b/src/tests/stub_store.cc index 2bf80f1ece..72cced6b3a 100644 --- a/src/tests/stub_store.cc +++ b/src/tests/stub_store.cc @@ -64,7 +64,6 @@ bool StoreEntry::timestampsSet() STUB_RETVAL(false) void StoreEntry::unregisterAbortCallback(const char *) STUB void StoreEntry::destroyMemObject() STUB int StoreEntry::checkTooSmall() STUB_RETVAL(0) -void StoreEntry::delayAwareRead(const Comm::ConnectionPointer&, char *, int, AsyncCall::Pointer) STUB void StoreEntry::setNoDelay (bool const) STUB bool StoreEntry::modifiedSince(const time_t, const int) const STUB_RETVAL(false) bool StoreEntry::hasIfMatchEtag(const HttpRequest &) const STUB_RETVAL(false) diff --git a/src/tunnel.cc b/src/tunnel.cc index 7b1accdc2b..b956a075bd 100644 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@ -863,7 +863,7 @@ TunnelStateData::copyRead(Connection &from, IOCB *completion) int bw = from.bytesWanted(1, SQUID_TCP_SO_RCVBUF); // XXX: Delay pools must not delay client-to-Squid traffic (i.e. when // from.readPendingFunc is tunnelDelayedClientRead()). - // XXX: Bug #4913: Use DeferredRead instead. + // XXX: Bug #4913: For delay pools, use delayRead() API instead. if (bw == 1 && ++from.delayedLoops < 10) { from.readPending = this; eventAdd("tunnelDelayedServerRead", from.readPendingFunc, from.readPending, 0.3, true);