#define SQUID_STORECLIENT_H
#include "acl/ChecklistFiller.h"
+#include "base/AsyncCall.h"
#include "base/forward.h"
#include "dlink.h"
#include "StoreIOBuffer.h"
public:
store_client(StoreEntry *);
~store_client();
- bool memReaderHasLowerOffset(int64_t) const;
+
+ /// An offset into the stored response bytes, including the HTTP response
+ /// headers (if any). Note that this offset does not include Store entry
+ /// metadata, because it is not a part of the stored response.
+ /// \retval 0 means the client wants to read HTTP response headers.
+ /// \retval +N the response byte that the client wants to read next.
+ /// \retval -N should not occur.
+ // TODO: Callers do not expect negative offset. Verify that the return
+ // value cannot be negative and convert to unsigned in this case.
+ int64_t readOffset() const { return copyInto.offset; }
+
int getType() const;
- void fail();
- void callback(ssize_t len, bool error = false);
+
+ /// React to the end of reading the response from disk. There will be no
+ /// more readHeader() and readBody() callbacks for the current storeRead()
+ /// swapin after this notification.
+ void noteSwapInDone(bool error);
+
void doCopy (StoreEntry *e);
void readHeader(const char *buf, ssize_t len);
void readBody(const char *buf, ssize_t len);
+
+ /// Request StoreIOBuffer-described response data via an asynchronous STCB
+ /// callback. At most one outstanding request is allowed per store_client.
void copy(StoreEntry *, StoreIOBuffer, STCB *, void *);
+
void dumpStats(MemBuf * output, int clientNumber) const;
int64_t cmp_offset;
StoreIOState::Pointer swapin_sio;
struct {
+ /// whether we are expecting a response to be swapped in from disk
+ /// (i.e. whether async storeRead() is currently in progress)
+ // TODO: a better name reflecting the 'in' scope of the flag
bool disk_io_pending;
+
+ /// whether the store_client::doCopy()-initiated STCB sequence is
+ /// currently in progress
bool store_copying;
- bool copy_event_pending;
} flags;
#if USE_DELAY_POOLS
DelayId delayId;
+
+ /// The maximum number of bytes the Store client can read/copy next without
+ /// overflowing its buffer and without violating delay pool limits. Store
+ /// I/O is not rate-limited, but we assume that the same number of bytes may
+ /// be read from the Squid-to-server connection that may be rate-limited.
+ int bytesWanted() const;
+
void setDelayId(DelayId delay_id);
#endif
dlink_node node;
- /* Below here is private - do no alter outside storeClient calls */
- StoreIOBuffer copyInto;
private:
bool moreToSend() const;
bool startSwapin();
bool unpackHeader(char const *buf, ssize_t len);
+ void fail();
+ void callback(ssize_t);
+ void noteCopiedBytes(size_t);
+ void noteEof();
+ void noteNews();
+ void finishCallback();
+ static void FinishCallback(store_client *);
+
int type;
bool object_ok;
+ /// Storage and metadata associated with the current copy() request. Ought
+ /// to be ignored when not answering a copy() request.
+ StoreIOBuffer copyInto;
+
+ /// The number of bytes loaded from Store into copyInto while answering the
+ /// current copy() request. Ought to be ignored when not answering.
+ size_t copiedSize;
+
/* Until we finish stuffing code into store_client */
public:
Callback ():callback_handler(NULL), callback_data(NULL) {}
Callback (STCB *, void *);
+
+ /// Whether the copy() answer is needed/expected (by the client) and has
+ /// not been computed (by us). False during (asynchronous) answer
+ /// delivery to the STCB callback_handler.
bool pending() const;
+
STCB *callback_handler;
void *callback_data;
CodeContextPointer codeContext; ///< Store client context
+
+ /// a scheduled asynchronous finishCallback() call (or nil)
+ AsyncCall::Pointer notifier;
} _callback;
};
#include "squid.h"
#include "acl/FilledChecklist.h"
+#include "base/AsyncCbdataCalls.h"
#include "base/CodeContext.h"
#include "event.h"
#include "globals.h"
static StoreIOState::STRCB storeClientReadBody;
static StoreIOState::STRCB storeClientReadHeader;
static void storeClientCopy2(StoreEntry * e, store_client * sc);
-static EVH storeClientCopyEvent;
static bool CheckQuickAbortIsReasonable(StoreEntry * entry);
CBDATA_CLASS_INIT(store_client);
/* store_client */
-bool
-store_client::memReaderHasLowerOffset(int64_t anOffset) const
-{
- return getType() == STORE_MEM_CLIENT && copyInto.offset < anOffset;
-}
-
int
store_client::getType() const
{
return sc;
}
+/// schedules asynchronous STCB call to relay disk or memory read results
+/// \param outcome an error signal (if negative), an EOF signal (if zero), or the number of bytes read
void
-store_client::callback(ssize_t sz, bool error)
+store_client::callback(const ssize_t outcome)
{
- size_t bSz = 0;
+ if (outcome > 0)
+ return noteCopiedBytes(outcome);
- if (sz >= 0 && !error)
- bSz = sz;
+ if (outcome < 0)
+ return fail();
- StoreIOBuffer result(bSz, 0,copyInto.data);
+ noteEof();
+}
- if (sz < 0 || error)
- result.flags.error = 1;
+/// finishCallback() wrapper; TODO: Add NullaryMemFunT for non-jobs.
+void
+store_client::FinishCallback(store_client * const sc)
+{
+ sc->finishCallback();
+}
- result.offset = cmp_offset;
- assert(_callback.pending());
- cmp_offset = copyInto.offset + bSz;
+/// finishes a copy()-STCB sequence by synchronously calling STCB
+void
+store_client::finishCallback()
+{
+ Assure(_callback.callback_handler);
+ Assure(_callback.notifier);
+
+ // callers are not ready to handle a content+error combination
+ Assure(object_ok || !copiedSize);
+
+ StoreIOBuffer result(copiedSize, copyInto.offset, copyInto.data);
+ result.flags.error = object_ok ? 0 : 1;
+ copiedSize = 0;
+
+ cmp_offset = result.offset + result.length;
STCB *temphandler = _callback.callback_handler;
void *cbdata = _callback.callback_data;
_callback = Callback(NULL, NULL);
cbdataReferenceDone(cbdata);
}
-static void
-storeClientCopyEvent(void *data)
+/// schedules asynchronous STCB call to relay a successful disk or memory read
+/// \param bytesCopied the number of response bytes copied into copyInto
+void
+store_client::noteCopiedBytes(const size_t bytesCopied)
{
- store_client *sc = (store_client *)data;
- debugs(90, 3, "storeClientCopyEvent: Running");
- assert (sc->flags.copy_event_pending);
- sc->flags.copy_event_pending = false;
-
- if (!sc->_callback.pending())
- return;
+ debugs(90, 5, bytesCopied);
+ Assure(bytesCopied > 0);
+ Assure(!copiedSize);
+ copiedSize = bytesCopied;
+ noteNews();
+}
- storeClientCopy2(sc->entry, sc);
+void
+store_client::noteEof()
+{
+ debugs(90, 5, copiedSize);
+ Assure(!copiedSize);
+ noteNews();
}
store_client::store_client(StoreEntry *e) :
#endif
entry(e),
type(e->storeClientType()),
- object_ok(true)
+ object_ok(true),
+ copiedSize(0)
{
flags.disk_io_pending = false;
flags.store_copying = false;
- flags.copy_event_pending = false;
++ entry->refcount;
if (getType() == STORE_DISK_CLIENT) {
storeClientCopy2(StoreEntry * e, store_client * sc)
{
/* reentrancy not allowed - note this could lead to
- * dropped events
+ * dropped notifications about response data availability
*/
- if (sc->flags.copy_event_pending) {
- return;
- }
-
if (sc->flags.store_copying) {
- sc->flags.copy_event_pending = true;
- debugs(90, 3, "storeClientCopy2: Queueing storeClientCopyEvent()");
- eventAdd("storeClientCopyEvent", storeClientCopyEvent, sc, 0.0, 0);
+ debugs(90, 3, "prevented recursive copying for " << *e);
return;
}
* if the peer aborts, we want to give the client(s)
* everything we got before the abort condition occurred.
*/
- /* Warning: doCopy may indirectly free itself in callbacks,
- * hence the lock to keep it active for the duration of
- * this function
- * XXX: Locking does not prevent calling sc destructor (it only prevents
- * freeing sc memory) so sc may become invalid from C++ p.o.v.
- */
- CbcPointer<store_client> tmpLock = sc;
- assert (!sc->flags.store_copying);
sc->doCopy(e);
- assert(!sc->flags.store_copying);
}
void
store_client::doCopy(StoreEntry *anEntry)
{
+ Assure(_callback.pending());
+ Assure(!flags.disk_io_pending);
+ Assure(!flags.store_copying);
+
assert (anEntry == entry);
flags.store_copying = true;
MemObject *mem = entry->mem_obj;
if (!moreToSend()) {
/* There is no more to send! */
debugs(33, 3, "There is no more to send!");
- callback(0);
+ noteEof();
flags.store_copying = false;
return;
}
}
}
+void
+store_client::noteSwapInDone(const bool error)
+{
+ Assure(_callback.pending());
+ if (error)
+ fail();
+ else
+ noteEof();
+}
+
void
store_client::scheduleRead()
{
/* What the client wants is in memory */
/* Old style */
debugs(90, 3, "store_client::doCopy: Copying normal from memory");
- size_t sz = entry->mem_obj->data_hdr.copy(copyInto);
+ const auto sz = entry->mem_obj->data_hdr.copy(copyInto); // may be <= 0 per copy() API
callback(sz);
flags.store_copying = false;
}
void
store_client::fail()
{
+ debugs(90, 3, (object_ok ? "once" : "again"));
+ if (!object_ok)
+ return; // we failed earlier; nothing to do now
+
object_ok = false;
+
+ noteNews();
+}
+
+/// if necessary and possible, informs the Store reader about copy() result
+void
+store_client::noteNews()
+{
/* synchronous open failures callback from the store,
* before startSwapin detects the failure.
* TODO: fix this inconsistent behaviour - probably by
* not synchronous
*/
- if (_callback.pending())
- callback(0, true);
+ if (!_callback.callback_handler) {
+ debugs(90, 5, "client lost interest");
+ return;
+ }
+
+ if (_callback.notifier) {
+ debugs(90, 5, "earlier news is being delivered by " << _callback.notifier);
+ return;
+ }
+
+ _callback.notifier = asyncCall(90, 4, "store_client::FinishCallback", cbdataDialer(store_client::FinishCallback, this));
+ ScheduleCallHere(_callback.notifier);
+
+ Assure(!_callback.pending());
}
static void
++statCounter.swap.ins;
}
- if (sc->_callback.pending()) {
- /* callback with ssize = -1 to indicate unexpected termination */
- debugs(90, 3, "store_client for " << *e << " has a callback");
- sc->fail();
+ if (sc->_callback.callback_handler || sc->_callback.notifier) {
+ debugs(90, 3, "forgetting store_client callback for " << *e);
+ // Do not notify: Callers want to stop copying and forget about this
+ // pending copy request. Some would mishandle a notification from here.
+ if (sc->_callback.notifier)
+ sc->_callback.notifier->cancel("storeUnregister");
}
#if STORE_CLIENT_LIST_DEBUG
#endif
+ // XXX: We might be inside sc store_client method somewhere up the call
+ // stack. TODO: Convert store_client to AsyncJob to make destruction async.
delete sc;
assert(e->locked());
if (sc->flags.disk_io_pending)
continue;
+ if (sc->flags.store_copying)
+ continue;
+
+ // XXX: If invokeHandlers() is (indirectly) called from a store_client
+ // method, then the above three conditions may not be sufficient to
+ // prevent us from reentering the same store_client object! This
+ // probably does not happen in the current code, but no observed
+ // invariant prevents this from (accidentally) happening in the future.
+
+ // TODO: Convert store_client into AsyncJob; make this call asynchronous
CodeContext::Reset(sc->_callback.codeContext);
debugs(90, 3, "checking client #" << i);
storeClientCopy2(this, sc);
if (flags.store_copying)
output->append(" store_copying", 14);
- if (flags.copy_event_pending)
- output->append(" copy_event_pending", 19);
+ if (_callback.notifier)
+ output->append(" notifying", 10);
output->append("\n",1);
}
bool
store_client::Callback::pending() const
{
- return callback_handler && callback_data;
+ return callback_handler && !notifier;
}
store_client::Callback::Callback(STCB *function, void *data):
}
#if USE_DELAY_POOLS
+int
+store_client::bytesWanted() const
+{
+ // TODO: To avoid using stale copyInto, return zero if !_callback.pending()?
+ return delayId.bytesWanted(0, copyInto.length);
+}
+
void
store_client::setDelayId(DelayId delay_id)
{