* Added MemObject::expectedReplySize() and used it instead of object_sz.
When deciding whether an object with a known content length can be swapped
out, do not wait until the object is completely received and its size
(mem_obj->object_sz) becomes known (while asking the store to recheck in vain
with every incoming chunk). Instead, use the known content length, if any, to
make the decision.
This optimizes the common case where the complete object is eventually
received and swapped out: it avoids accumulating potentially large objects in
RAM while waiting for the end of the response. This change should not affect
objects with unknown content length.
Side-effect1: This probably fixes several cases of unknowingly using a
negative (unknown) mem_obj->object_sz in calculations. I added a few assertions
to double-check some of the remaining object_sz/objectLen() uses.
Side-effect2: When expectedReplySize() is stored on disk as StoreEntry
metadata, it may help to detect truncated entries when the writer process dies
before completing the swapout.
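
The size prediction described above can be sketched in isolation. This is a
minimal illustration, not the Squid code: ReplySizeInfo and knownTooBig() are
hypothetical names, and the fields only stand in for object_sz, the reply
content length, and hdr_sz.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the size facts kept by MemObject and its reply.
struct ReplySizeInfo {
    int64_t objectSz = -1;      // exact size; negative until the object is complete
    int64_t contentLength = -1; // body size from reply headers; negative if unknown
    int64_t headerSz = 0;       // parsed header size; 0 until headers are parsed
};

// Follows the order of checks in MemObject::expectedReplySize().
int64_t expectedReplySize(const ReplySizeInfo &info)
{
    if (info.objectSz >= 0)
        return info.objectSz; // the exact answer is already known
    if (info.contentLength >= 0 && info.headerSz > 0)
        return info.contentLength + info.headerSz;
    return -1; // not enough information to predict
}

// Hypothetical helper: true if the reply is already known to exceed the limit.
bool knownTooBig(const ReplySizeInfo &info, int64_t maxObjSize)
{
    assert(maxObjSize >= 0); // the caller checks that a limit is configured
    const int64_t expected = expectedReplySize(info);
    return expected >= 0 && expected > maxObjSize;
}
```

With a 200-byte header and a 1000-byte Content-Length, the sketch predicts
1200 bytes as soon as the headers are parsed, so the size decision does not
have to wait for the last body byte.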
* Removed mem->swapout.memnode in favor of mem->swapout.queue_offset.
The code used the swapout.memnode pointer to keep track of the last page that
was swapped out. The code was semi-buggy because it could reset the pointer to
NULL if no new data came in before the call to doPages(). Perhaps the code
relied on the assumption that the caller never calls doPages() when there is
no new data, but I am not sure that assumption was correct in all cases (it
could be that I broke the calling code, of course).
Moreover, the page pointer was kept without any protection from the page
disappearing during asynchronous swapout. There were "Evil hack time" comments
discussing how the page might disappear.
Fortunately, we already have mem->swapout.queue_offset that can be fed to
getBlockContainingLocation to find the page that needs to be swapped out.
There is no need to keep the page pointer around. The queue_offset-based math
is the same, so using that offset adds no overhead (in fact, we are removing
some minor computations).
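
A rough sketch of the offset-based lookup follows. Page, pageContaining(), and
queuePages() are hypothetical simplifications of mem_node,
getBlockContainingLocation(), and doPages(); real pages live in a tree rather
than a vector, and a real store would issue a write where the comment says so.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified page covering bytes [start, start + length).
struct Page {
    int64_t start;
    int64_t length;
    int64_t end() const { return start + length; }
};

// Returns the page holding the byte at `offset`, or nullptr if that byte
// has not arrived yet (or its page is no longer in memory).
const Page *pageContaining(const std::vector<Page> &pages, int64_t offset)
{
    for (const Page &p : pages) {
        if (p.start <= offset && offset < p.end())
            return &p;
    }
    return nullptr;
}

// Queues whole pages starting at queueOffset; returns the new queue offset.
int64_t queuePages(const std::vector<Page> &pages, int64_t queueOffset)
{
    while (const Page *page = pageContaining(pages, queueOffset)) {
        assert(page->start == queueOffset); // always write from the page start
        queueOffset += page->length;        // a real store would write here
    }
    return queueOffset;
}
```

Because the page is looked up again on every iteration, no pointer survives
between calls, and a call with no new data simply returns the same offset.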
* Added "close how?" parameter to storeClose() and friends.
The old code would follow the same path when closing swapout activity for an
aborted entry and when completing a perfectly healthy swapout. In the
non-shared case, that could have been OK because the abort code would then
release the entry, removing any half-written entry from the index and the disk
(but I am not sure that release happened fast enough in 100% of cases).
When the index and disk storage are shared among workers, such "temporary"
inconsistencies result in truncated responses being delivered by other workers
to the user because once the swapout activity is closed, other workers can
start using the entry.
By adding the "close how?" parameter to closing methods we allow the core and
SwapDir-specific code to handle aborted swapouts appropriately.
Since swapin code is "read only", we do not currently distinguish between
aborted and fully satisfied readers: The readerDone enum value applies to both
cases. If needed, the SwapDir reading code can make that distinction by
analyzing how much was actually swapped in.
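
To illustrate why the closure type matters for shared storage, here is a small
sketch of a close handler that honors it. The enum values match
StoreIOState::CloseHow in this change; SharedSlot and closeSlot() are
hypothetical and do not correspond to any existing SwapDir code.

```cpp
#include <cassert>
#include <cstdint>

enum CloseHow { wroteAll, writerGone, readerDone }; // values as in StoreIOState

// Hypothetical shared-index slot that other workers may read once published.
struct SharedSlot {
    bool published = false;   // visible to other workers
    bool freed = false;       // returned to the free pool
    int64_t bytesWritten = 0; // bytes stored so far
};

// Sketch of a SwapDir-specific close handler.
void closeSlot(SharedSlot &slot, CloseHow how)
{
    switch (how) {
    case wroteAll:
        slot.published = true; // complete entry: safe to share
        break;
    case writerGone:
        assert(!slot.published); // a truncated entry must never be exposed
        slot.freed = true;
        slot.bytesWritten = 0;
        break;
    case readerDone:
        break; // reading changes nothing on disk
    }
}
```

An aborted writer thus discards its slot instead of publishing it, which is
the distinction the old single-path close could not make.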
* Moved "can you store this entry?" code to virtual SwapDir::canStore().
The old code had some of the tests in SwapDir-specific canStore() methods and
some in storeDirSelect*() methods. This resulted in inconsistencies, code
duplication, and extra calculation overhead. Making this call virtual allows
individual cache_dir types to perform custom access controls.
The same method also reports the cache_dir load (through its load parameter)
when it returns true.
Load management needs more work, but the current code is no worse than the old
one in this aspect, and further improvements are outside the scope of this
change.
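
The resulting call pattern can be sketched as follows. SketchDir and
QueuedDir are hypothetical classes; the size test is a simplification of
objectSizeIsAcceptable(), and the load formula is only an assumption about
how a child might scale its queue depth to the 0..1000 range.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, pared-down cache_dir base class with the common checks.
class SketchDir {
public:
    virtual ~SketchDir() {}

    // Returns false if the entry cannot be stored; otherwise sets load.
    virtual bool canStore(int64_t diskSpaceNeeded, int &load) const {
        if (readOnly)
            return false; // cannot write at all
        if (curSize > maxSize)
            return false; // already overflowing
        // simplified size limit: unknown (negative) sizes fail a limited dir
        if (maxObjSize >= 0 && (diskSpaceNeeded < 0 || diskSpaceNeeded > maxObjSize))
            return false;
        load = 999; // constant placeholder load, as in SwapDir::canStore()
        return true;
    }

    bool readOnly = false;
    int64_t curSize = 0;
    int64_t maxSize = 1000;
    int64_t maxObjSize = -1; // negative means no per-object limit
};

// Hypothetical child: adds its own test and reports its true load.
class QueuedDir : public SketchDir {
public:
    bool canStore(int64_t diskSpaceNeeded, int &load) const override {
        if (!SketchDir::canStore(diskSpaceNeeded, load))
            return false;
        assert(maxOps > 0);
        if (pendingOps >= maxOps)
            return false; // shed load
        load = pendingOps * 1000 / maxOps;
        return true;
    }

    int pendingOps = 0;
    int maxOps = 100;
};
```

Each child calls the base method first, so the common checks live in one
place and the selection code only needs the boolean result plus the load.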
* Minimized from-disk StoreEntry loading/unpacking code duplication.
Moved common (and often rather complex!) code from store modules into
storeRebuildLoadEntry, storeRebuildParseEntry, and storeRebuildKeepEntry.
* Do not set object_sz when the entry is aborted because the true object size
(HTTP reply headers + body) is not known in this case. Setting object_sz may
fool client-side code into believing that the object is complete.
This addresses an old complaint from RBC.
* When swapout initiation fails, release the StoreEntry. The failure resets
swap_status to SWAPOUT_NONE, so without the release the caller code would try
to swap out again and again.
TODO: Consider adding SWAPOUT_ERROR, STORE_ERROR, and similar states. It may
solve several problems where the code sees _NONE or _OK and thinks everything
is peachy when in fact there was an error.
* Always call StoreEntry::abort() instead of setting ENTRY_ABORTED manually.
* Rely on entry->abort() side-effects if ENTRY_ABORTED was set.
* Added or updated comments to better document current code.
* Added operator << for dumping StoreEntry summary into the debugging log.
Needs more work to report more info (and not report yet-unknown info).
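
One possible shape for such a summary operator, skipping values that are not
known yet, is sketched below. EntrySummary, its fields, and the output format
are hypothetical; the actual operator in this change prints StoreEntry state
and may look quite different.

```cpp
#include <cassert>
#include <cstdint>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical subset of the state a debugging summary might report.
struct EntrySummary {
    int swapDirn = -1;     // negative until a cache_dir is selected
    int64_t objectSz = -1; // negative until the object is complete
    bool aborted = false;
};

// Prints only the fields whose values are already known.
std::ostream &operator<<(std::ostream &os, const EntrySummary &e)
{
    os << "e:";
    if (e.swapDirn >= 0)
        os << " dir=" << e.swapDirn;
    if (e.objectSz >= 0)
        os << " sz=" << e.objectSz;
    if (e.aborted)
        os << " aborted";
    return os;
}

// Convenience wrapper for callers that need a string instead of a stream.
std::string describe(const EntrySummary &e)
{
    std::ostringstream out;
    out << e;
    return out.str();
}
```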
return object_sz;
}
+int64_t
+MemObject::expectedReplySize() const {
+ debugs(20, 7, HERE << "object_sz: " << object_sz);
+ if (object_sz >= 0) // complete() has been called; we know the exact answer
+ return object_sz;
+
+ if (_reply) {
+ const int64_t clen = _reply->bodySize(method);
+ debugs(20, 7, HERE << "clen: " << clen);
+ if (clen >= 0 && _reply->hdr_sz > 0) // yuck: HttpMsg sets hdr_sz to 0
+ return clen + _reply->hdr_sz;
+ }
+
+ return -1; // not enough information to predict
+}
+
void
MemObject::reset()
{
* there will be a chunk of the data which is not in memory
* and is not yet on disk.
* The -1 makes sure the page isn't freed until storeSwapOut has
- * walked to the next page. (mem->swapout.memnode)
+ * walked to the next page.
*/
int64_t on_disk;
void replaceHttpReply(HttpReply *newrep);
void stat (MemBuf * mb) const;
int64_t endOffset () const;
+ /// negative if unknown; otherwise, expected object_sz, expected endOffset
+ /// maximum, and stored reply headers+body size (all three are the same)
+ int64_t expectedReplySize() const;
int64_t size() const;
void reset();
int64_t lowestMemReaderOffset() const;
{
public:
- int64_t queue_offset; /* relative to in-mem data */
- mem_node *memnode; /* which node we're currently paging out */
+ int64_t queue_offset; ///< number of bytes sent to SwapDir for writing
StoreIOState::Pointer sio;
};
void purgeMem();
void swapOut();
bool swapOutAble() const;
- void swapOutFileClose();
+ void swapOutFileClose(int how);
const char *url() const;
int checkCachable();
int checkNegativeHit() const;
virtual void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data) = 0;
virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func) = 0;
- virtual void close() = 0;
+
+ typedef enum {
+ wroteAll, ///< success: caller supplied all data it wanted to swap out
+ writerGone, ///< failure: caller left before swapping out everything
+ readerDone ///< success or failure: either way, stop swapping in
+ } CloseHow;
+ virtual void close(int how) = 0; ///< finish or abort swapping per CloseHow
sdirno swap_dirn;
sfileno swap_filen;
StoreEntry *e; /* Need this so the FS layers can play god */
mode_t mode;
- off_t offset_; /* current on-disk offset pointer */
+ off_t offset_; ///< number of bytes written or read for this entry so far
STFNCB *file_callback; /* called on delayed sfileno assignments */
STIOCB *callback;
void *callback_data;
StoreIOState::Pointer storeCreate(StoreEntry *, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
StoreIOState::Pointer storeOpen(StoreEntry *, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
-SQUIDCEXTERN void storeClose(StoreIOState::Pointer);
+SQUIDCEXTERN void storeClose(StoreIOState::Pointer, int how);
SQUIDCEXTERN void storeRead(StoreIOState::Pointer, char *, size_t, off_t, StoreIOState::STRCB *, void *);
SQUIDCEXTERN void storeIOWrite(StoreIOState::Pointer, char const *, size_t, off_t, FREE *);
/// \ingroup SwapStoreAPI
SQUIDCEXTERN char *storeSwapMetaPack(tlv * tlv_list, int *length);
/// \ingroup SwapStoreAPI
-SQUIDCEXTERN size_t storeSwapMetaSize(const StoreEntry * e);
-/// \ingroup SwapStoreAPI
SQUIDCEXTERN tlv *storeSwapMetaBuild(StoreEntry * e);
/// \ingroup SwapStoreAPI
SQUIDCEXTERN void storeSwapTLVFree(tlv * n);
return 0;
}
+bool
+SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
+{
+ debugs(47,8, HERE << "cache_dir[" << index << "]: needs " <<
+ diskSpaceNeeded << " <? " << max_objsize);
+
+ if (EBIT_TEST(e.flags, ENTRY_SPECIAL))
+ return false; // we do not store Squid-generated entries
+
+ if (!objectSizeIsAcceptable(diskSpaceNeeded))
+ return false; // does not satisfy size limits
+
+ if (flags.read_only)
+ return false; // cannot write at all
+
+ if (cur_size > max_size)
+ return false; // already overflowing
+
+ /* Return 999 (99.9%) constant load; TODO: add a named constant for this */
+ load = 999;
+ return true; // kids may provide more tests and should report true load
+}
+
+
void
SwapDir::sync() {}
virtual bool doubleCheck(StoreEntry &); /* Double check the obj integrity */
virtual void statfs(StoreEntry &) const; /* Dump fs statistics */
virtual void maintain(); /* Replacement maintainence */
- /* <0 == error. > 1000 == error */
- virtual int canStore(StoreEntry const &)const = 0; /* Check if the fs will store an object */
+ /// check whether we can store the entry; if we can, report current load
+ virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const = 0;
/* These two are notifications */
virtual void reference(StoreEntry &); /* Reference this object */
virtual void dereference(StoreEntry &); /* Unreference this object */
int
clientReplyContext::storeOKTransferDone() const
{
+ assert(http->storeEntry()->objectLen() >= 0);
+ assert(http->storeEntry()->objectLen() >= headers_sz);
if (http->out.offset >= http->storeEntry()->objectLen() - headers_sz) {
debugs(88,3,HERE << "storeOKTransferDone " <<
" out.offset=" << http->out.offset <<
virtual StoreSearch *search(String const url, HttpRequest *);
virtual void unlink (StoreEntry &);
virtual void statfs (StoreEntry &)const;
- virtual int canStore(StoreEntry const &)const;
+ virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const;
virtual int callback();
virtual void sync();
virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
safe_free(stripe_path);
}
-/*
- * storeCossDirCheckObj
- *
- * This routine is called by storeDirSelectSwapDir to see if the given
- * object is able to be stored on this filesystem. COSS filesystems will
- * not store everything. We don't check for maxobjsize here since its
- * done by the upper layers.
- */
-int
-CossSwapDir::canStore(StoreEntry const &e)const
+bool
+CossSwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
+ if (!SwapDir::canStore(e, diskSpaceNeeded, load))
+ return false;
- /* Check if the object is a special object, we can't cache these */
-
- if (EBIT_TEST(e.flags, ENTRY_SPECIAL))
- return -1;
-
- /* Otherwise, we're ok */
- /* Return load, cs->aq.aq_numpending out of MAX_ASYNCOP */
- return io->load();
+ load = io->load();
+ return true;
}
/*
* object is able to be stored on this filesystem. UFS filesystems will
* happily store anything as long as the LRU time isn't too small.
*/
-int
-UFSSwapDir::canStore(StoreEntry const &e)const
+bool
+UFSSwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
+ if (!SwapDir::canStore(e, diskSpaceNeeded, load))
+ return false;
+
if (IO->shedLoad())
- return -1;
+ return false;
- return IO->load();
+ load = IO->load();
+ return true;
}
* when it is safe to actually signal the lower layer for closing.
*/
void
-UFSStoreState::close()
+UFSStoreState::close(int)
{
debugs(79, 3, "UFSStoreState::close: dirno " << swap_dirn << ", fileno "<<
std::setfill('0') << std::hex << std::uppercase << std::setw(8) << swap_filen);
- tryClosing();
+ tryClosing(); // UFS does not distinguish different closure types
}
void
continue;
}
+ MemBuf buf;
+ buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
+ if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
+ return;
+
StoreEntry tmpe;
- const bool loaded = storeRebuildLoadEntry(fd, tmpe, key, counts,
+ const bool loaded = storeRebuildParseEntry(buf, tmpe, key, counts,
(int64_t)sb.st_size);
file_close(fd);
virtual void unlink(StoreEntry &);
virtual void statfs(StoreEntry &)const;
virtual void maintain();
- virtual int canStore(StoreEntry const &)const;
+ virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const;
virtual void reference(StoreEntry &);
virtual void dereference(StoreEntry &);
virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
void operator delete (void *);
UFSStoreState(SwapDir * SD, StoreEntry * anEntry, STIOCB * callback_, void *callback_data_);
~UFSStoreState();
- virtual void close();
+ virtual void close(int how);
virtual void closeCompleted();
// protected:
virtual void ioCompletedNotification();
SQUIDCEXTERN void storeRebuildComplete(struct _store_rebuild_data *);
SQUIDCEXTERN void storeRebuildProgress(int sd_index, int total, int sofar);
-/// tries to load and validate entry metadata; returns tmp entry on success
-SQUIDCEXTERN bool storeRebuildLoadEntry(int fd, StoreEntry &e, cache_key *key, struct _store_rebuild_data &counts, uint64_t expectedSize);
+/// loads entry from disk; fills supplied memory buffer on success
+bool storeRebuildLoadEntry(int fd, int diskIndex, MemBuf &buf, struct _store_rebuild_data &counts);
+/// parses entry buffer and validates entry metadata; fills e on success
+bool storeRebuildParseEntry(MemBuf &buf, StoreEntry &e, cache_key *key, struct _store_rebuild_data &counts, uint64_t expectedSize);
/// checks whether the loaded entry should be kept; updates counters
-SQUIDCEXTERN bool storeRebuildKeepEntry(const StoreEntry &e, const cache_key *key, struct _store_rebuild_data &counts);
+bool storeRebuildKeepEntry(const StoreEntry &e, const cache_key *key, struct _store_rebuild_data &counts);
/*
#include "mem_node.h"
#include "StoreMeta.h"
#include "SwapDir.h"
+#include "StoreIOState.h"
#if USE_DELAY_POOLS
#include "DelayPools.h"
#endif
store_status = STORE_OK;
- /*
- * We assign an object length here. The only other place we assign
- * the object length is in storeComplete()
- */
- /* RBC: What do we need an object length for? we've just aborted the
- * request, the request is private and negatively cached. Surely
- * the object length is inappropriate to set.
- */
- mem_obj->object_sz = mem_obj->endOffset();
-
/* Notify the server side */
/*
/* Notify the client side */
invokeHandlers();
- /* Close any swapout file */
- swapOutFileClose();
+ // abort swap out, invalidating what was created so far (release follows)
+ swapOutFileClose(StoreIOState::writerGone);
unlock(); /* unlock */
}
char const *
StoreEntry::getSerialisedMetaData()
{
- const size_t swap_hdr_sz0 = storeSwapMetaSize(this);
- assert (swap_hdr_sz0 >= 0);
- mem_obj->swap_hdr_sz = (size_t) swap_hdr_sz0;
- // now we can use swap_hdr_sz to calculate swap_file_sz
- // so that storeSwapMetaBuild/Pack can pack corrent swap_file_sz
- swap_file_sz = objectLen() + mem_obj->swap_hdr_sz;
-
StoreMeta *tlv_list = storeSwapMetaBuild(this);
int swap_hdr_sz;
char *result = storeSwapMetaPack(tlv_list, &swap_hdr_sz);
- assert(static_cast<int>(swap_hdr_sz0) == swap_hdr_sz);
storeSwapTLVFree(tlv_list);
+ assert (swap_hdr_sz >= 0);
+ mem_obj->swap_hdr_sz = (size_t) swap_hdr_sz;
return result;
}
-/*
- * Calculate TLV list size for a StoreEntry
- * XXX: Must match the actual storeSwapMetaBuild result size
- */
-size_t
-storeSwapMetaSize(const StoreEntry * e)
-{
- size_t size = 0;
- ++size; // STORE_META_OK
- size += sizeof(int); // size of header to follow
-
- const size_t pfx = sizeof(char) + sizeof(int); // in the start of list entries
-
- size += pfx + SQUID_MD5_DIGEST_LENGTH;
- size += pfx + STORE_HDR_METASIZE;
- size += pfx + strlen(e->url()) + 1;
-
- // STORE_META_OBJSIZE
- if (e->objectLen() >= 0)
- size += pfx + sizeof(int64_t);
-
- if (const char *vary = e->mem_obj->vary_headers)
- size += pfx + strlen(vary) + 1;
-
- debugs(20, 3, "storeSwapMetaSize(" << e->url() << "): " << size);
- return size;
-}
-
bool
StoreEntry::swapoutPossible()
{
if (EBIT_TEST(flags, ENTRY_ABORTED)) {
assert(EBIT_TEST(flags, RELEASE_REQUEST));
- swapOutFileClose();
+ // StoreEntry::abort() already closed the swap out file, if any
return false;
}
+ // if we decided that swapout is possible, do not repeat same checks
+ // TODO: do not repeat any checks if we decided that swapout is impossible
+ if (swap_status != SWAPOUT_NONE) {
+ debugs(20, 3, "storeSwapOut: already started");
+ return true;
+ }
+
if (EBIT_TEST(flags, ENTRY_SPECIAL)) {
debugs(20, 3, "storeSwapOut: " << url() << " SPECIAL");
return false;
}
+ // check cache_dir max-size limit if all cache_dirs have it
+ if (store_maxobjsize >= 0) {
+ assert(mem_obj);
+
+ // TODO: add estimated store metadata size to be conservative
+
+ // use guaranteed maximum if it is known
+ const int64_t expectedEnd = mem_obj->expectedReplySize();
+ debugs(20, 7, "storeSwapOut: expectedEnd = " << expectedEnd);
+ if (expectedEnd > store_maxobjsize) {
+ debugs(20, 3, "storeSwapOut: will not fit: " << expectedEnd <<
+ " > " << store_maxobjsize);
+ return false; // known to outgrow the limit eventually
+ }
+ if (expectedEnd < 0) {
+ debugs(20, 3, "storeSwapOut: wait for more info: " <<
+ store_maxobjsize);
+ return false; // may fit later, but will be rejected now
+ }
+
+ // use current minimum (always known)
+ const int64_t currentEnd = mem_obj->endOffset();
+ if (currentEnd > store_maxobjsize) {
+ debugs(20, 3, "storeSwapOut: does not fit: " << currentEnd <<
+ " > " << store_maxobjsize);
+ return false; // already does not fit and may only get bigger
+ }
+ }
+
return true;
}
storeSwapTLVFree(tlv_list);
+ assert(swap_hdr_sz >= 0);
+ assert(entry->swap_file_sz > 0);
+ assert(entry->swap_file_sz >= static_cast<uint64_t>(swap_hdr_sz));
entry->mem_obj->swap_hdr_sz = swap_hdr_sz;
entry->mem_obj->object_sz = entry->swap_file_sz - swap_hdr_sz;
-
+ debugs(90, 5, "store_client::unpackHeader: swap_file_sz=" <<
+ entry->swap_file_sz << "( " << swap_hdr_sz << " + " <<
+ entry->mem_obj->object_sz << ")");
}
void
e->swapOut();
if (sc->swapin_sio != NULL) {
- storeClose(sc->swapin_sio);
+ storeClose(sc->swapin_sio, StoreIOState::readerDone);
sc->swapin_sio = NULL;
statCounter.swap.ins++;
}
int load;
RefCount<SwapDir> sd;
- ssize_t objsize = e->objectLen();
+ // e->objectLen() is negative at this point when we are still STORE_PENDING
+ ssize_t objsize = e->mem_obj->expectedReplySize();
if (objsize != -1)
objsize += e->mem_obj->swap_hdr_sz;
sd = dynamic_cast<SwapDir *>(INDEXSD(dirn));
- if (sd->flags.read_only)
+ if (!sd->canStore(*e, objsize, load))
continue;
- if (sd->cur_size > sd->max_size)
- continue;
-
- if (!sd->objectSizeIsAcceptable(objsize))
- continue;
-
- /* check for error or overload condition */
- load = sd->canStore(*e);
-
if (load < 0 || load > 1000) {
continue;
}
static int
storeDirSelectSwapDirLeastLoad(const StoreEntry * e)
{
- ssize_t objsize;
ssize_t most_free = 0, cur_free;
ssize_t least_objsize = -1;
int least_load = INT_MAX;
int i;
RefCount<SwapDir> SD;
- /* Calculate the object size */
- objsize = e->objectLen();
+ // e->objectLen() is negative at this point when we are still STORE_PENDING
+ ssize_t objsize = e->mem_obj->expectedReplySize();
if (objsize != -1)
objsize += e->mem_obj->swap_hdr_sz;
for (i = 0; i < Config.cacheSwap.n_configured; i++) {
SD = dynamic_cast<SwapDir *>(INDEXSD(i));
SD->flags.selected = 0;
- load = SD->canStore(*e);
-
- if (load < 0 || load > 1000) {
- continue;
- }
-
- if (!SD->objectSizeIsAcceptable(objsize))
- continue;
- if (SD->flags.read_only)
+ if (!SD->canStore(*e, objsize, load))
continue;
- if (SD->cur_size > SD->max_size)
+ if (load < 0 || load > 1000)
continue;
if (load > least_load)
storeCreate(StoreEntry * e, StoreIOState::STFNCB * file_callback, StoreIOState::STIOCB * close_callback, void *callback_data)
{
assert (e);
- ssize_t objsize;
- sdirno dirn;
- RefCount<SwapDir> SD;
store_io_stats.create.calls++;
- /* This is just done for logging purposes */
- objsize = e->objectLen();
-
- if (objsize != -1)
- objsize += e->mem_obj->swap_hdr_sz;
/*
* Pick the swapdir
* We assume that the header has been packed by now ..
*/
- dirn = storeDirSelectSwapDir(e);
+ const sdirno dirn = storeDirSelectSwapDir(e);
if (dirn == -1) {
- debugs(20, 2, "storeCreate: no valid swapdirs for this object");
+ debugs(20, 2, "storeCreate: no swapdirs for " << *e);
store_io_stats.create.select_fail++;
return NULL;
}
- debugs(20, 2, "storeCreate: Selected dir '" << dirn << "' for obj size '" << objsize << "'");
- SD = dynamic_cast<SwapDir *>(INDEXSD(dirn));
+ debugs(20, 2, "storeCreate: Selected dir " << dirn << " for " << *e);
+ SwapDir *SD = dynamic_cast<SwapDir *>(INDEXSD(dirn));
/* Now that we have a fs to use, call its storeCreate function */
StoreIOState::Pointer sio = SD->createStoreIO(*e, file_callback, close_callback, callback_data);
}
void
-storeClose(StoreIOState::Pointer sio)
+storeClose(StoreIOState::Pointer sio, int how)
{
if (sio->flags.closing) {
debugs(20,3,HERE << "storeClose: flags.closing already set, bailing");
sio->flags.closing = 1;
- debugs(20,3,HERE << "storeClose: calling sio->close()");
- sio->close();
+ debugs(20,3,HERE << "storeClose: calling sio->close(" << how << ")");
+ sio->close(how);
}
void
};
bool
-storeRebuildLoadEntry(int fd, StoreEntry &tmpe, cache_key *key,
- struct _store_rebuild_data &counts, uint64_t expectedSize)
+storeRebuildLoadEntry(int fd, int diskIndex, MemBuf &buf,
+ struct _store_rebuild_data &counts)
{
if (fd < 0)
return false;
- char hdr_buf[SM_PAGE_SIZE];
+ assert(buf.hasSpace()); // caller must allocate
- ++counts.scancount;
+ const int len = FD_READ_METHOD(fd, buf.space(), buf.spaceSize());
statCounter.syscalls.disk.reads++;
- int len;
- if ((len = FD_READ_METHOD(fd, hdr_buf, SM_PAGE_SIZE)) < 0) {
- debugs(47, 1, HERE << "failed to read swap entry meta data: " << xstrerror());
+ if (len < 0) {
+ const int xerrno = errno;
+ debugs(47, 1, "cache_dir[" << diskIndex << "]: " <<
+ "failed to read swap entry meta data: " << xstrerr(xerrno));
return false;
}
+ buf.appended(len);
+ return true;
+}
+
+bool
+storeRebuildParseEntry(MemBuf &buf, StoreEntry &tmpe, cache_key *key,
+ struct _store_rebuild_data &counts,
+ uint64_t expectedSize)
+{
int swap_hdr_len = 0;
- StoreMetaUnpacker aBuilder(hdr_buf, len, &swap_hdr_len);
+ StoreMetaUnpacker aBuilder(buf.content(), buf.contentSize(), &swap_hdr_len);
if (aBuilder.isBufferZero()) {
debugs(47,5, HERE << "skipping empty record.");
return false;
return false;
}
+ // TODO: consume parsed metadata?
+
debugs(47,7, HERE << "successful swap meta unpacking");
memset(key, '\0', SQUID_MD5_DIGEST_LENGTH);
tmpe.swap_file_sz << "!=" << expectedSize);
return false;
}
+ } else
+ if (tmpe.swap_file_sz <= 0) {
+ debugs(47, 1, HERE << "missing swap entry size: " << tmpe);
+ return false;
}
if (EBIT_TEST(tmpe.flags, KEY_PRIVATE)) {
tlv **T = &TLV;
const char *url;
const char *vary;
- const int64_t objsize = e->objectLen();
assert(e->mem_obj != NULL);
+ const int64_t objsize = e->mem_obj->expectedReplySize();
assert(e->swap_status == SWAPOUT_WRITING);
url = e->url();
debugs(20, 3, "storeSwapMetaBuild: " << url );
if (sio == NULL) {
e->swap_status = SWAPOUT_NONE;
+ // our caller thinks SWAPOUT_NONE means swapping out has not started
+ // yet so we better release here to avoid being called again and again
+ e->releaseRequest();
delete c;
xfree((char*)buf);
storeLog(STORE_LOG_SWAPOUTFAIL, e);
MemObject *mem = anEntry->mem_obj;
do {
- /*
- * Evil hack time.
- * We are paging out to disk in page size chunks. however, later on when
- * we update the queue position, we might not have a page (I *think*),
- * so we do the actual page update here.
- */
+ // find the page containing the first byte we have not swapped out yet
+ mem_node *page =
+ mem->data_hdr.getBlockContainingLocation(mem->swapout.queue_offset);
- if (mem->swapout.memnode == NULL) {
- /* We need to swap out the first page */
- mem->swapout.memnode = const_cast<mem_node *>(mem->data_hdr.start());
- } else {
- /* We need to swap out the next page */
- /* 20030636 RBC - we don't have ->next anymore.
- * But we do have the next location */
- mem->swapout.memnode = mem->data_hdr.getBlockContainingLocation (mem->swapout.memnode->end());
- }
+ if (!page)
+ return; // wait for more data to become available
+
+ // memNodeWriteComplete() and absence of buffer offset math below
+ // imply that we always write from the very beginning of the page
+ assert(page->start() == mem->swapout.queue_offset);
/*
* Get the length of this buffer. We are assuming(!) that the buffer
* but we can look at this at a later date or whenever the code results
* in bad swapouts, whichever happens first. :-)
*/
- ssize_t swap_buf_len = mem->swapout.memnode->nodeBuffer.length;
+ ssize_t swap_buf_len = page->nodeBuffer.length;
debugs(20, 3, "storeSwapOut: swap_buf_len = " << swap_buf_len);
mem->swapout.queue_offset += swap_buf_len;
storeIOWrite(mem->swapout.sio,
- mem->data_hdr.NodeGet(mem->swapout.memnode),
+ mem->data_hdr.NodeGet(page),
swap_buf_len,
-1,
memNodeWriteComplete);
if (!swapoutPossible())
return;
+ // Aborted entries have STORE_OK, but swapoutPossible rejects them. Thus,
+ // store_status == STORE_OK below means we got everything we wanted.
+
debugs(20, 7, HERE << "storeSwapOut: mem->inmem_lo = " << mem_obj->inmem_lo);
debugs(20, 7, HERE << "storeSwapOut: mem->endOffset() = " << mem_obj->endOffset());
debugs(20, 7, HERE << "storeSwapOut: swapout.queue_offset = " << mem_obj->swapout.queue_offset);
if (mem_obj->swapout.sio != NULL)
debugs(20, 7, "storeSwapOut: storeOffset() = " << mem_obj->swapout.sio->offset() );
+ // buffered bytes we have not swapped out yet
int64_t swapout_maxsize = mem_obj->endOffset() - mem_obj->swapout.queue_offset;
assert(swapout_maxsize >= 0);
debugs(20, 7, HERE << "storeSwapOut: lowest_offset = " << lowest_offset);
- /*
- * Grab the swapout_size and check to see whether we're going to defer
- * the swapout based upon size
- */
- if ((store_status != STORE_OK) && (swapout_maxsize < store_maxobjsize)) {
- /*
- * NOTE: the store_maxobjsize here is the max of optional
- * max-size values from 'cache_dir' lines. It is not the
- * same as 'maximum_object_size'. By default, store_maxobjsize
- * will be set to -1. However, I am worried that this
- * deferance may consume a lot of memory in some cases.
- * It would be good to make this decision based on reply
- * content-length, rather than wait to accumulate huge
- * amounts of object data in memory.
- */
- debugs(20, 5, "storeSwapOut: Deferring starting swapping out");
- return;
+ // Check to see whether we're going to defer the swapout based upon size
+ if (store_status != STORE_OK) {
+ const int64_t expectedSize = mem_obj->expectedReplySize();
+ const int64_t maxKnownSize = expectedSize < 0 ?
+ swapout_maxsize : expectedSize;
+ debugs(20, 7, HERE << "storeSwapOut: maxKnownSize= " << maxKnownSize);
+
+ if (maxKnownSize < store_maxobjsize) {
+ /*
+ * NOTE: the store_maxobjsize here is the max of optional
+ * max-size values from 'cache_dir' lines. It is not the
+ * same as 'maximum_object_size'. By default, store_maxobjsize
+ * will be set to -1. However, I am worried that this
+ * deferral may consume a lot of memory in some cases.
+ * Should we add an option to limit this memory consumption?
+ */
+ debugs(20, 5, "storeSwapOut: Deferring swapout start for " <<
+ (store_maxobjsize - maxKnownSize) << " bytes");
+ return;
+ }
}
+// TODO: it is better to trim as soon as we swap something out, not before
trimMemory();
#if SIZEOF_OFF_T <= 4
debugs(20, 7, "storeSwapOut: swapout_size = " << swapout_maxsize);
- if (swapout_maxsize == 0) {
- if (store_status == STORE_OK)
- swapOutFileClose();
-
- return; /* Nevermore! */
+ if (swapout_maxsize == 0) { // swapped everything we got
+ if (store_status == STORE_OK) { // got everything we wanted
+ assert(mem_obj->object_sz >= 0);
+ swapOutFileClose(StoreIOState::wroteAll);
+ }
+ // else need more data to swap out
+ return;
}
if (store_status == STORE_PENDING) {
* to the filesystem at this point because storeSwapOut() is
* not going to be called again for this entry.
*/
+ assert(mem_obj->object_sz >= 0);
assert(mem_obj->endOffset() == mem_obj->swapout.queue_offset);
- swapOutFileClose();
+ swapOutFileClose(StoreIOState::wroteAll);
}
}
void
-StoreEntry::swapOutFileClose()
+StoreEntry::swapOutFileClose(int how)
{
assert(mem_obj != NULL);
- debugs(20, 3, "storeSwapOutFileClose: " << getMD5Text());
+ debugs(20, 3, "storeSwapOutFileClose: " << getMD5Text() << " how=" << how);
debugs(20, 3, "storeSwapOutFileClose: sio = " << mem_obj->swapout.sio.getRaw());
if (mem_obj->swapout.sio == NULL)
return;
- storeClose(mem_obj->swapout.sio);
+ storeClose(mem_obj->swapout.sio, how);
}
static void
assert(e->swap_status == SWAPOUT_WRITING);
cbdataFree(c);
- if (errflag) {
+ // if object_sz is still unknown, the entry was probably aborted
+ if (errflag || e->objectLen() < 0) {
debugs(20, 2, "storeSwapOutFileClosed: dirno " << e->swap_dirn << ", swapfile " <<
std::hex << std::setw(8) << std::setfill('0') << std::uppercase <<
e->swap_filen << ", errflag=" << errflag);
debugs(20, 3, "storeSwapOutFileClosed: SwapOut complete: '" << e->url() << "' to " <<
e->swap_dirn << ", " << std::hex << std::setw(8) << std::setfill('0') <<
std::uppercase << e->swap_filen);
- debugs(20, 3, "storeSwapOutFileClosed: Should be:" <<
- e->swap_file_sz << " = " << e->objectLen() << " + " <<
- mem->swap_hdr_sz);
+ debugs(20, 5, HERE << "swap_file_sz = " <<
+ e->objectLen() << " + " << mem->swap_hdr_sz);
+ assert(e->objectLen() >= 0); // we checked that above
e->swap_file_sz = e->objectLen() + mem->swap_hdr_sz;
e->swap_status = SWAPOUT_DONE;
e->store()->updateSize(e->swap_file_sz, 1);
TestSwapDir::init()
{}
-int
-TestSwapDir::canStore(const StoreEntry&) const
+bool
+TestSwapDir::canStore(const StoreEntry &, int64_t, int &load) const
{
+ load = 0;
return true;
}
virtual void reconfigure(int, char*);
virtual void init();
- virtual int canStore(const StoreEntry&) const;
+ virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const;
virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
virtual void parse(int, char*);
{}
bool
-storeRebuildLoadEntry(int fd, StoreEntry &tmpe, cache_key *key,
+storeRebuildLoadEntry(MemBuf &buf, StoreEntry &e, cache_key *key,
struct _store_rebuild_data &counts, uint64_t expectedSize)
{
return false;
StoreIoStats store_io_stats;
void
-StoreEntry::swapOutFileClose()
+StoreEntry::swapOutFileClose(int)
{
fatal ("Not implemented");
}