From 1fa761af8f958150470a6c9c7d22192c82a036c9 Mon Sep 17 00:00:00 2001 From: Eduard Bagdasaryan Date: Wed, 8 Jun 2022 21:03:52 +0000 Subject: [PATCH] Break long store_client call chains with async calls (#1056) The store_client class design created very long call chains spanning Squid-client and Squid-server processing and multiple transactions. These call chains also create ideal conditions for dangerous recursive relationships between communicating classes (a.k.a. "reentrancy" among Squid developers). For example, storeClientCopy() enters store_client and triggers disk I/O that triggers invokeHandlers() that re-enters the same store_client object and starts competing with the original storeClientCopy() processing state. The official code prevented the worst recursion cases with three(!) boolean flags and time-based events abused to break some of the call chains, but that approach did not solve all of the problems while also losing transaction context information across time-based events. This change effectively makes STCB storeClientCopy() callbacks asynchronous, eliminating the need for time-based events and one of the flags. It shortens many call chains and preserves transaction context. The remaining problems can and should be eliminated by converting store_client into AsyncJob, but those changes deserve a dedicated PR. store_client orchestrates cooperation of multiple asynchronous players: * Sink: A Store client requests a STCB callback via a storeClientCopy()/copy() call. A set _callback.callback_handler implies that the client is waiting for this callback. * Source1: A Store disk reading subsystem activated by the storeRead() call "spontaneously" delivers response bytes via storeClientRead*() callbacks. The disk_io_pending flag implies waiting for them. * Source2: Store memory subsystem activated by storeClientListAdd() "spontaneously" delivers response bytes via invokeHandlers(). * Source3: Store disk subsystem activated by storeSwapInStart() "spontaneously" notifies of EOF/error by calling noteSwapInDone(). * Source4: A store_client object owner may delete the object by "spontaneously" calling storeUnregister(). The official code was converting this event into an error-notifying callback. We continue to answer each storeClientCopy() request with the first available information even though several SourceN calls are possible while we are waiting to complete the STCB callback. The StoreIOBuffer API and STCB recipients do not support data+error/eof combinations, and future code will move this wait to the main event loop anyway. This first-available approach means that the creation of the notifier call effectively ends answer processing -- store_client just waits for that call to fire so that it can relay the answer to still-synchronous STCB. When STCB itself becomes asynchronous, this logic will continue to work. Also stopped calling STCB from storeUnregister(). Conceptually, the storeUnregister() and storeClientCopy() callers ought to represent the same get-content-from-Store task; there should be no need to notify that task about what it is doing. Technically, analysis of STCB callbacks showed that many such notifications would be dangerous (if they are or become reachable). At the time of the storeUnregister() call, the STCB callbacks are usually unset (e.g., when storeUnregister() is called from the destructor, after that object has finished copying -- a very common case) or do not do anything (useful). Also removed callback_data from the Callback::pending() condition. It is conceptually wrong to require non-nil callback parameter, and it is never cleared separately from the callback_handler data member anyway. Also hid copyInto into the private store_client section to make sure it is not modified while we are waiting to complete the STCB callback. This move required adding a couple of read-only wrapper methods like bytesWanted() and noteSwapInDone(). Also simplified error/EOF/bytes handling on copy()-STCB path using dedicated methods (e.g., store_client::callback() API is no longer mixing EOF and error signals). --- src/MemObject.cc | 6 +- src/StoreClient.h | 65 ++++++++++-- src/store_client.cc | 178 ++++++++++++++++++++++----------- src/store_swapin.cc | 2 +- src/tests/stub_store_client.cc | 5 +- 5 files changed, 188 insertions(+), 68 deletions(-) diff --git a/src/MemObject.cc b/src/MemObject.cc index d0d70ec621..251562ea79 100644 --- a/src/MemObject.cc +++ b/src/MemObject.cc @@ -168,8 +168,8 @@ struct LowestMemReader : public unary_function { LowestMemReader(int64_t seed):current(seed) {} void operator() (store_client const &x) { - if (x.memReaderHasLowerOffset(current)) - current = x.copyInto.offset; + if (x.getType() == STORE_MEM_CLIENT) + current = std::min(current, x.readOffset()); } int64_t current; @@ -468,7 +468,7 @@ MemObject::mostBytesAllowed() const for (dlink_node *node = clients.head; node; node = node->next) { store_client *sc = (store_client *) node->data; - j = sc->delayId.bytesWanted(0, sc->copyInto.length); + j = sc->bytesWanted(); if (j > jmax) { jmax = j; diff --git a/src/StoreClient.h b/src/StoreClient.h index 4d34f23129..09f4bb02f3 100644 --- a/src/StoreClient.h +++ b/src/StoreClient.h @@ -10,6 +10,7 @@ #define SQUID_STORECLIENT_H #include "acl/ChecklistFiller.h" +#include "base/AsyncCall.h" #include "base/forward.h" #include "dlink.h" #include "StoreIOBuffer.h" @@ -59,14 +60,32 @@ class store_client public: store_client(StoreEntry *); ~store_client(); - bool memReaderHasLowerOffset(int64_t) const; + + /// An offset into the stored response bytes, including the HTTP response + /// headers (if any). Note that this offset does not include Store entry + /// metadata, because it is not a part of the stored response. + /// \retval 0 means the client wants to read HTTP response headers. + /// \retval +N the response byte that the client wants to read next. + /// \retval -N should not occur. + // TODO: Callers do not expect negative offset. Verify that the return + // value cannot be negative and convert to unsigned in this case. + int64_t readOffset() const { return copyInto.offset; } + int getType() const; - void fail(); - void callback(ssize_t len, bool error = false); + + /// React to the end of reading the response from disk. There will be no + /// more readHeader() and readBody() callbacks for the current storeRead() + /// swapin after this notification. + void noteSwapInDone(bool error); + void doCopy (StoreEntry *e); void readHeader(const char *buf, ssize_t len); void readBody(const char *buf, ssize_t len); + + /// Request StoreIOBuffer-described response data via an asynchronous STCB + /// callback. At most one outstanding request is allowed per store_client. void copy(StoreEntry *, StoreIOBuffer, STCB *, void *); + void dumpStats(MemBuf * output, int clientNumber) const; int64_t cmp_offset; @@ -79,19 +98,29 @@ public: StoreIOState::Pointer swapin_sio; struct { + /// whether we are expecting a response to be swapped in from disk + /// (i.e. whether async storeRead() is currently in progress) + // TODO: a better name reflecting the 'in' scope of the flag bool disk_io_pending; + + /// whether the store_client::doCopy()-initiated STCB sequence is + /// currently in progress bool store_copying; - bool copy_event_pending; } flags; #if USE_DELAY_POOLS DelayId delayId; + + /// The maximum number of bytes the Store client can read/copy next without + /// overflowing its buffer and without violating delay pool limits. Store + /// I/O is not rate-limited, but we assume that the same number of bytes may + /// be read from the Squid-to-server connection that may be rate-limited. + int bytesWanted() const; + void setDelayId(DelayId delay_id); #endif dlink_node node; - /* Below here is private - do no alter outside storeClient calls */ - StoreIOBuffer copyInto; private: bool moreToSend() const; @@ -103,9 +132,25 @@ private: bool startSwapin(); bool unpackHeader(char const *buf, ssize_t len); + void fail(); + void callback(ssize_t); + void noteCopiedBytes(size_t); + void noteEof(); + void noteNews(); + void finishCallback(); + static void FinishCallback(store_client *); + int type; bool object_ok; + /// Storage and metadata associated with the current copy() request. Ought + /// to be ignored when not answering a copy() request. + StoreIOBuffer copyInto; + + /// The number of bytes loaded from Store into copyInto while answering the + /// current copy() request. Ought to be ignored when not answering. + size_t copiedSize; + /* Until we finish stuffing code into store_client */ public: @@ -114,10 +159,18 @@ public: Callback ():callback_handler(NULL), callback_data(NULL) {} Callback (STCB *, void *); + + /// Whether the copy() answer is needed/expected (by the client) and has + /// not been computed (by us). False during (asynchronous) answer + /// delivery to the STCB callback_handler. bool pending() const; + STCB *callback_handler; void *callback_data; CodeContextPointer codeContext; ///< Store client context + + /// a scheduled asynchronous finishCallback() call (or nil) + AsyncCall::Pointer notifier; } _callback; }; diff --git a/src/store_client.cc b/src/store_client.cc index 3b5856eac9..7874e77ced 100644 --- a/src/store_client.cc +++ b/src/store_client.cc @@ -10,6 +10,7 @@ #include "squid.h" #include "acl/FilledChecklist.h" +#include "base/AsyncCbdataCalls.h" #include "base/CodeContext.h" #include "event.h" #include "globals.h" @@ -40,7 +41,6 @@ static StoreIOState::STRCB storeClientReadBody; static StoreIOState::STRCB storeClientReadHeader; static void storeClientCopy2(StoreEntry * e, store_client * sc); -static EVH storeClientCopyEvent; static bool CheckQuickAbortIsReasonable(StoreEntry * entry); CBDATA_CLASS_INIT(store_client); @@ -85,12 +85,6 @@ StoreClient::startCollapsingOn(const StoreEntry &e, const bool doingRevalidation /* store_client */ -bool -store_client::memReaderHasLowerOffset(int64_t anOffset) const -{ - return getType() == STORE_MEM_CLIENT && copyInto.offset < anOffset; -} - int store_client::getType() const { @@ -146,22 +140,42 @@ storeClientListAdd(StoreEntry * e, void *data) return sc; } +/// schedules asynchronous STCB call to relay disk or memory read results +/// \param outcome an error signal (if negative), an EOF signal (if zero), or the number of bytes read void -store_client::callback(ssize_t sz, bool error) +store_client::callback(const ssize_t outcome) { - size_t bSz = 0; + if (outcome > 0) + return noteCopiedBytes(outcome); - if (sz >= 0 && !error) - bSz = sz; + if (outcome < 0) + return fail(); - StoreIOBuffer result(bSz, 0,copyInto.data); + noteEof(); +} - if (sz < 0 || error) - result.flags.error = 1; +/// finishCallback() wrapper; TODO: Add NullaryMemFunT for non-jobs. +void +store_client::FinishCallback(store_client * const sc) +{ + sc->finishCallback(); +} - result.offset = cmp_offset; - assert(_callback.pending()); - cmp_offset = copyInto.offset + bSz; +/// finishes a copy()-STCB sequence by synchronously calling STCB +void +store_client::finishCallback() +{ + Assure(_callback.callback_handler); + Assure(_callback.notifier); + + // callers are not ready to handle a content+error combination + Assure(object_ok || !copiedSize); + + StoreIOBuffer result(copiedSize, copyInto.offset, copyInto.data); + result.flags.error = object_ok ? 0 : 1; + copiedSize = 0; + + cmp_offset = result.offset + result.length; STCB *temphandler = _callback.callback_handler; void *cbdata = _callback.callback_data; _callback = Callback(NULL, NULL); @@ -173,18 +187,24 @@ store_client::callback(ssize_t sz, bool error) cbdataReferenceDone(cbdata); } -static void -storeClientCopyEvent(void *data) +/// schedules asynchronous STCB call to relay a successful disk or memory read +/// \param bytesCopied the number of response bytes copied into copyInto +void +store_client::noteCopiedBytes(const size_t bytesCopied) { - store_client *sc = (store_client *)data; - debugs(90, 3, "storeClientCopyEvent: Running"); - assert (sc->flags.copy_event_pending); - sc->flags.copy_event_pending = false; - - if (!sc->_callback.pending()) - return; + debugs(90, 5, bytesCopied); + Assure(bytesCopied > 0); + Assure(!copiedSize); + copiedSize = bytesCopied; + noteNews(); +} - storeClientCopy2(sc->entry, sc); +void +store_client::noteEof() +{ + debugs(90, 5, copiedSize); + Assure(!copiedSize); + noteNews(); } store_client::store_client(StoreEntry *e) : @@ -194,11 +214,11 @@ store_client::store_client(StoreEntry *e) : #endif entry(e), type(e->storeClientType()), - object_ok(true) + object_ok(true), + copiedSize(0) { flags.disk_io_pending = false; flags.store_copying = false; - flags.copy_event_pending = false; ++ entry->refcount; if (getType() == STORE_DISK_CLIENT) { @@ -312,17 +332,11 @@ static void storeClientCopy2(StoreEntry * e, store_client * sc) { /* reentrancy not allowed - note this could lead to - * dropped events + * dropped notifications about response data availability */ - if (sc->flags.copy_event_pending) { - return; - } - if (sc->flags.store_copying) { - sc->flags.copy_event_pending = true; - debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()"); - eventAdd("storeClientCopyEvent", storeClientCopyEvent, sc, 0.0, 0); + debugs(90, 3, "prevented recursive copying for " << *e); return; } @@ -335,21 +349,16 @@ storeClientCopy2(StoreEntry * e, store_client * sc) * if the peer aborts, we want to give the client(s) * everything we got before the abort condition occurred. */ - /* Warning: doCopy may indirectly free itself in callbacks, - * hence the lock to keep it active for the duration of - * this function - * XXX: Locking does not prevent calling sc destructor (it only prevents - * freeing sc memory) so sc may become invalid from C++ p.o.v. - */ - CbcPointer tmpLock = sc; - assert (!sc->flags.store_copying); sc->doCopy(e); - assert(!sc->flags.store_copying); } void store_client::doCopy(StoreEntry *anEntry) { + Assure(_callback.pending()); + Assure(!flags.disk_io_pending); + Assure(!flags.store_copying); + assert (anEntry == entry); flags.store_copying = true; MemObject *mem = entry->mem_obj; @@ -361,7 +370,7 @@ store_client::doCopy(StoreEntry *anEntry) if (!moreToSend()) { /* There is no more to send! */ debugs(33, 3, "There is no more to send!"); - callback(0); + noteEof(); flags.store_copying = false; return; } @@ -422,6 +431,16 @@ store_client::startSwapin() } } +void +store_client::noteSwapInDone(const bool error) +{ + Assure(_callback.pending()); + if (error) + fail(); + else + noteEof(); +} + void store_client::scheduleRead() { @@ -461,7 +480,7 @@ store_client::scheduleMemRead() /* What the client wants is in memory */ /* Old style */ debugs(90, 3, "store_client::doCopy: Copying normal from memory"); - size_t sz = entry->mem_obj->data_hdr.copy(copyInto); + const auto sz = entry->mem_obj->data_hdr.copy(copyInto); // may be <= 0 per copy() API callback(sz); flags.store_copying = false; } @@ -531,7 +550,19 @@ store_client::readBody(const char *, ssize_t len) void store_client::fail() { + debugs(90, 3, (object_ok ? "once" : "again")); + if (!object_ok) + return; // we failed earlier; nothing to do now + object_ok = false; + + noteNews(); +} + +/// if necessary and possible, informs the Store reader about copy() result +void +store_client::noteNews() +{ /* synchronous open failures callback from the store, * before startSwapin detects the failure. * TODO: fix this inconsistent behaviour - probably by @@ -539,8 +570,20 @@ store_client::fail() * not synchronous */ - if (_callback.pending()) - callback(0, true); + if (!_callback.callback_handler) { + debugs(90, 5, "client lost interest"); + return; + } + + if (_callback.notifier) { + debugs(90, 5, "earlier news is being delivered by " << _callback.notifier); + return; + } + + _callback.notifier = asyncCall(90, 4, "store_client::FinishCallback", cbdataDialer(store_client::FinishCallback, this)); + ScheduleCallHere(_callback.notifier); + + Assure(!_callback.pending()); } static void @@ -705,10 +748,12 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data) ++statCounter.swap.ins; } - if (sc->_callback.pending()) { - /* callback with ssize = -1 to indicate unexpected termination */ - debugs(90, 3, "store_client for " << *e << " has a callback"); - sc->fail(); + if (sc->_callback.callback_handler || sc->_callback.notifier) { + debugs(90, 3, "forgetting store_client callback for " << *e); + // Do not notify: Callers want to stop copying and forget about this + // pending copy request. Some would mishandle a notification from here. + if (sc->_callback.notifier) + sc->_callback.notifier->cancel("storeUnregister"); } #if STORE_CLIENT_LIST_DEBUG @@ -716,6 +761,8 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data) #endif + // XXX: We might be inside sc store_client method somewhere up the call + // stack. TODO: Convert store_client to AsyncJob to make destruction async. delete sc; assert(e->locked()); @@ -771,6 +818,16 @@ StoreEntry::invokeHandlers() if (sc->flags.disk_io_pending) continue; + if (sc->flags.store_copying) + continue; + + // XXX: If invokeHandlers() is (indirectly) called from a store_client + // method, then the above three conditions may not be sufficient to + // prevent us from reentering the same store_client object! This + // probably does not happen in the current code, but no observed + // invariant prevents this from (accidentally) happening in the future. + + // TODO: Convert store_client into AsyncJob; make this call asynchronous CodeContext::Reset(sc->_callback.codeContext); debugs(90, 3, "checking client #" << i); storeClientCopy2(this, sc); @@ -905,8 +962,8 @@ store_client::dumpStats(MemBuf * output, int clientNumber) const if (flags.store_copying) output->append(" store_copying", 14); - if (flags.copy_event_pending) - output->append(" copy_event_pending", 19); + if (_callback.notifier) + output->append(" notifying", 10); output->append("\n",1); } @@ -914,7 +971,7 @@ store_client::dumpStats(MemBuf * output, int clientNumber) const bool store_client::Callback::pending() const { - return callback_handler && callback_data; + return callback_handler && !notifier; } store_client::Callback::Callback(STCB *function, void *data): @@ -925,6 +982,13 @@ store_client::Callback::Callback(STCB *function, void *data): } #if USE_DELAY_POOLS +int +store_client::bytesWanted() const +{ + // TODO: To avoid using stale copyInto, return zero if !_callback.pending()? + return delayId.bytesWanted(0, copyInto.length); +} + void store_client::setDelayId(DelayId delay_id) { diff --git a/src/store_swapin.cc b/src/store_swapin.cc index 251bc2fda1..a65afe1889 100644 --- a/src/store_swapin.cc +++ b/src/store_swapin.cc @@ -56,7 +56,7 @@ storeSwapInFileClosed(void *data, int errflag, StoreIOState::Pointer) if (sc->_callback.pending()) { assert (errflag <= 0); - sc->callback(0, errflag ? true : false); + sc->noteSwapInDone(errflag); } ++statCounter.swap.ins; diff --git a/src/tests/stub_store_client.cc b/src/tests/stub_store_client.cc index 9d0e2b8001..cf8446655c 100644 --- a/src/tests/stub_store_client.cc +++ b/src/tests/stub_store_client.cc @@ -24,7 +24,10 @@ void storeLogOpen(void) STUB void storeDigestInit(void) STUB void storeRebuildStart(void) STUB void storeReplSetup(void) STUB -bool store_client::memReaderHasLowerOffset(int64_t) const STUB_RETVAL(false) +void store_client::noteSwapInDone(bool) STUB +#if USE_DELAY_POOLS +int store_client::bytesWanted() const STUB_RETVAL(0) +#endif void store_client::dumpStats(MemBuf *, int) const STUB int store_client::getType() const STUB_RETVAL(0) -- 2.39.5