]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
* Removed old Rock Store hack to update swap_file_sz before it is packed into
authorAlex Rousskov <rousskov@measurement-factory.com>
Tue, 15 Feb 2011 04:09:58 +0000 (21:09 -0700)
committerAlex Rousskov <rousskov@measurement-factory.com>
Tue, 15 Feb 2011 04:09:58 +0000 (21:09 -0700)
store entry disk "header". Prepend our own disk to each db cell. Eventually,
the same header may be used to store "next page" or other info.

Modern Squid code should not use packed swap_file_sz or object_sz because
neither can be unknown at the packing time.  The storage code should either
record its own size information (Rock Store does that now) or rely on
3rd-party metadata (UFS uses fstat(2) system call).

* Do not promise to read a being-written entry. Use peekAtReader() to check
that we can read the presumably locked (by us) entry.

It is probably possible to queue a reader somehow and wait for the entry to
become readable or even to read initial pages as we write (when we start doing
incremental writing) but it would be rather complex and expensive to
synchronize such shared read/write access across workers.

* Fixed ::StoreIOState::offset_ Rock maintenance. The old code probably
misinterpret offset_ meaning and kept the disk offset there. It should have
been just the distance from the beginning of the db cell (not db file)
instead. It probably worked because we write everything at once and the
offset_ values were usually big enough to pass "wrote everything" checks.

* Extracted storeRebuildParseEntry from storeRebuildLoadEntry.

This allows Rock Store to analyze and skip disk cell header before giving
the packed store entry info to the common core code.

* Report more disk stats. Needs more work, especially in SMP reporting case.
Map stats are only reported for tiny maps because scanning the entire map may
be too expensive for bigger maps (is it worth the overhead to maintain these
stats all the time instead of computing them from scratch on-demand?).

* Moved StoreEntryBasics into Rock namespace.

* Be more consistent in setting Rock::IoState's swap_filen, swap_dirn,
diskOffset, and payloadEnd.

* Renamed vague (and confusing after the header was added to the db cell)
entrySize to more specific payloadEnd.

* Synced with r11228 changes (making Store work with SMP-shared max-size
cache_dirs).

src/fs/rock/RockDirMap.cc
src/fs/rock/RockDirMap.h
src/fs/rock/RockFile.h
src/fs/rock/RockIoState.cc
src/fs/rock/RockIoState.h
src/fs/rock/RockRebuild.cc
src/fs/rock/RockSwapDir.cc
src/fs/rock/RockSwapDir.h

index 6424265eee845917a9d6a100be284c949a37eeb7..b850522d3a1a2d2ef5a1fe1a5210647492b6019b 100644 (file)
@@ -33,7 +33,7 @@ Rock::DirMap::DirMap(const char *const aPath):
            entryLimit());
 }
 
-StoreEntryBasics *
+Rock::StoreEntryBasics *
 Rock::DirMap::openForWriting(const cache_key *const key, sfileno &fileno)
 {
     debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
@@ -97,8 +97,26 @@ Rock::DirMap::abortIo(const sfileno fileno)
         closeForReading(fileno);
 }
 
+const Rock::StoreEntryBasics *
+Rock::DirMap::peekAtReader(const sfileno fileno) const
+{
+    assert(valid(fileno));
+    const Slot &s = shared->slots[fileno];
+    switch (s.state) {
+    case Slot::Readable:
+        return &s.seBasics; // immediate access by lock holder so no locking
+    case Slot::Writeable:
+        return NULL; // cannot read the slot when it is being written
+    case Slot::Empty:
+        assert(false); // must be locked for reading or writing
+    }
+    assert(false); // not reachable
+    return NULL;
+}
+
 bool
-Rock::DirMap::putAt(const StoreEntry &e, const sfileno fileno)
+Rock::DirMap::putAt(const DbCellHeader &header, const StoreEntry &e,
+                    const sfileno fileno)
 {
     const cache_key *key = static_cast<const cache_key*>(e.key);
     debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
@@ -122,7 +140,7 @@ Rock::DirMap::putAt(const StoreEntry &e, const sfileno fileno)
         if (s.state == Slot::Empty) // we may also overwrite a Readable slot
             ++shared->count;
         s.setKey(static_cast<const cache_key*>(e.key));
-        s.seBasics.set(e);
+        s.seBasics.set(header, e);
         s.state = Slot::Readable;
         s.releaseExclusiveLock();
         debugs(79, 5, HERE << " put slot at " << fileno << " for key " <<
@@ -147,7 +165,7 @@ Rock::DirMap::free(const sfileno fileno)
     freeIfNeeded(s);
 }
 
-const StoreEntryBasics *
+const Rock::StoreEntryBasics *
 Rock::DirMap::openForReading(const cache_key *const key, sfileno &fileno)
 {
     debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
@@ -170,7 +188,7 @@ Rock::DirMap::openForReading(const cache_key *const key, sfileno &fileno)
     return 0;
 }
 
-const StoreEntryBasics *
+const Rock::StoreEntryBasics *
 Rock::DirMap::openForReadingAt(const sfileno fileno)
 {
     debugs(79, 5, HERE << " trying to open slot at " << fileno << " for "
@@ -218,6 +236,14 @@ Rock::DirMap::full() const
     return entryCount() >= entryLimit();
 }
 
+void
+Rock::DirMap::updateStats(MapStats &stats) const
+{
+    stats.capacity += shared->limit;
+    for (int i = 0; i < shared->limit; ++i)
+        shared->slots[i].updateStats(stats);
+}
+
 bool
 Rock::DirMap::valid(const int pos) const
 {
@@ -351,14 +377,38 @@ Rock::Slot::switchExclusiveToSharedLock()
     releaseExclusiveLock();
 }
 
+void
+Rock::Slot::updateStats(MapStats &stats) const
+{
+    switch (state) {
+    case Readable:
+        ++stats.readable;
+        stats.readers += readers;
+        break;
+    case Writeable:
+        ++stats.writeable;
+        stats.writers += writers;
+        break;
+    case Empty:
+        ++stats.empty;
+        break;
+    }
+
+    if (waitingToBeFreed)
+        ++stats.marked;
+}
+
+
 Rock::DirMap::Shared::Shared(const int aLimit): limit(aLimit), count(0)
 {
 }
 
 void
-StoreEntryBasics::set(const StoreEntry &from)
+Rock::StoreEntryBasics::set(const DbCellHeader &aHeader, const StoreEntry &from)
 {
+    assert(from.swap_file_sz > 0);
     memset(this, 0, sizeof(*this));
+    header = aHeader;
     timestamp = from.timestamp;
     lastref = from.lastref;
     expires = from.expires;
@@ -367,3 +417,40 @@ StoreEntryBasics::set(const StoreEntry &from)
     refcount = from.refcount;
     flags = from.flags;
 }
+
+
+/* MapStats */
+
+Rock::MapStats::MapStats()
+{
+    memset(this, 0, sizeof(*this));
+}
+void
+Rock::MapStats::dump(StoreEntry &e) const
+{
+    storeAppendPrintf(&e, "Available slots: %9d\n", capacity);
+
+    if (!capacity)
+        return;
+
+    storeAppendPrintf(&e, "Readable slots:  %9d %6.2f%%\n",
+        readable, (100.0 * readable / capacity));
+    storeAppendPrintf(&e, "Filling slots:   %9d %6.2f%%\n",
+        writeable, (100.0 * writeable / capacity));
+    storeAppendPrintf(&e, "Empty slots:     %9d %6.2f%%\n",
+        empty, (100.0 * empty / capacity));
+
+    if (readers || writers) {
+        const int locks = readers + writers;
+        storeAppendPrintf(&e, "Readers:         %9d %6.2f%%\n",
+            readers, (100.0 * readers / locks));
+        storeAppendPrintf(&e, "Writers:         %9d %6.2f%%\n",
+            writers, (100.0 * writers / locks));
+    }
+
+    if (readable + writeable) {
+        storeAppendPrintf(&e, "Marked slots:    %9d %6.2f%%\n",
+            marked, (100.0 * marked / (readable + writeable)));
+    }
+}
index fcd48ab4de4cc944ebbe1b26b37088978102dd8c..9e71f3dd314256ef0c3312f7c92536de4fe6f06f 100644 (file)
@@ -1,12 +1,17 @@
 #ifndef SQUID_FS_ROCK_DIR_MAP_H
 #define SQUID_FS_ROCK_DIR_MAP_H
 
+#include "fs/rock/RockFile.h"
 #include "ipc/AtomicWord.h"
 #include "ipc/SharedMemory.h"
 
+namespace Rock {
+
 class StoreEntryBasics {
 public:
-    void set(const StoreEntry &from);
+    void set(const DbCellHeader &aHeader, const StoreEntry &anEntry);
+
+    DbCellHeader header; ///< rock-specific entry metadata
 
     /* START OF ON-DISK STORE_META_STD TLV field */
     time_t timestamp;
@@ -19,7 +24,21 @@ public:
     /* END OF ON-DISK STORE_META_STD */
 };
 
-namespace Rock {
+/// aggregates basic map performance measurements; all numbers are approximate
+class MapStats {
+public:
+    MapStats();
+
+    void dump(StoreEntry &e) const;
+
+    int capacity; ///< the total number of slots in the map
+    int readable; ///< number of slots in Readable state
+    int writeable; ///< number of slots in Writeable state
+    int empty; ///< number of slots in Empty state
+    int readers; ///< sum of slot.readers
+    int writers; ///< sum of slot.writers
+    int marked; ///< number of slots marked for freeing
+};
 
 /// DirMap entry
 class Slot {
@@ -40,6 +59,9 @@ public:
     void releaseExclusiveLock(); ///< undo successful exclusiveLock()
     void switchExclusiveToSharedLock(); ///< trade exclusive for shared access
 
+    /// adds approximate current stats to the supplied ones
+    void updateStats(MapStats &stats) const;
+
 public:
     // we want two uint64_t, but older GCCs lack __sync_fetch_and_add_8
     AtomicWordT<uint32_t> key_[4]; ///< MD5 entry key
@@ -66,7 +88,10 @@ public:
     void closeForWriting(const sfileno fileno);
 
     /// stores entry info at the requested slot or returns false
-    bool putAt(const StoreEntry &e, const sfileno fileno);
+    bool putAt(const DbCellHeader &header, const StoreEntry &e, const sfileno fileno);
+
+    /// only works on locked entries; returns nil unless the slot is readable
+    const StoreEntryBasics *peekAtReader(const sfileno fileno) const;
 
     /// mark the slot as waiting to be freed and, if possible, free it
     void free(const sfileno fileno);
@@ -86,6 +111,9 @@ public:
     int entryCount() const; ///< number of used slots
     int entryLimit() const; ///< maximum number of slots that can be used
 
+    /// adds approximate current stats to the supplied ones
+    void updateStats(MapStats &stats) const;
+
     static int AbsoluteEntryLimit(); ///< maximum entryLimit() possible
 
 private:
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..097dbcbe0b40d45002458159021fdf5e18f6824a 100644 (file)
@@ -0,0 +1,24 @@
+#ifndef SQUID_FS_ROCK_DB_CELL_H
+#define SQUID_FS_ROCK_DB_CELL_H
+
+// XXX: rename to fs/rock/RockDbCell.{cc,h}
+
+namespace Rock {
+
+/// \ingroup Rock
+/// meta-information at the beginning of every db cell
+class DbCellHeader
+{
+public:
+    DbCellHeader(): payloadSize(0), reserved(0) {}
+
+    /// whether the freshly loaded header fields make sense
+    bool sane() const { return payloadSize >= 0 && reserved == 0; }
+
+    int64_t payloadSize; ///< cell contents size excluding this header
+    int64_t reserved; ///< reserved for future use (next cell pointer?)
+};
+
+} // namespace Rock
+
+#endif /* SQUID_FS_ROCK_DB_CELL_H */
index 3f89ea63b8cae4bc592c031d4f15d9f5a951f1d3..57fb7d25bcce3388132d70e601b03a0709fd9abb 100644 (file)
@@ -4,6 +4,8 @@
  * DEBUG: section 79    Disk IO Routines
  */
 
+#include "config.h"
+#include "MemObject.h"
 #include "Parsing.h"
 #include "DiskIO/DiskIOModule.h"
 #include "DiskIO/DiskIOStrategy.h"
@@ -18,11 +20,11 @@ Rock::IoState::IoState(SwapDir *dir,
     StoreIOState::STIOCB *cbIo,
     void *data):
     slotSize(0),
-    entrySize(0)
+    diskOffset(-1),
+    payloadEnd(-1)
 {
     e = anEntry;
-    swap_filen = e->swap_filen;
-    swap_dirn = dir->index;
+    // swap_filen, swap_dirn, diskOffset, and payloadEnd are set by the caller
     slotSize = dir->max_objsize;
     file_callback = cbFile;
     callback = cbIo;
@@ -48,45 +50,55 @@ Rock::IoState::file(const RefCount<DiskFile> &aFile)
 }
 
 void
-Rock::IoState::read_(char *buf, size_t len, off_t off, STRCB *cb, void *data)
+Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
 {
     assert(theFile != NULL);
     assert(theFile->canRead());
+    assert(coreOff >= 0);
+    offset_ = coreOff;
+
+    // we skip our cell header; it is only read when building the map
+    const int64_t cellOffset = sizeof(DbCellHeader) +
+        static_cast<int64_t>(coreOff);
+    assert(cellOffset <= payloadEnd);
 
     // Core specifies buffer length, but we must not exceed stored entry size
-    assert(off >= 0);
-    assert(entrySize >= 0);
-    const int64_t offset = static_cast<int64_t>(off);
-    assert(offset <= entrySize);
-    if (offset + (int64_t)len > entrySize)
-        len = entrySize - offset;
+    if (cellOffset + (int64_t)len > payloadEnd)
+        len = payloadEnd - cellOffset;
 
     assert(read.callback == NULL);
     assert(read.callback_data == NULL);
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
-    theFile->read(new ReadRequest(::ReadRequest(buf, offset_ + offset, len), this));
+    theFile->read(new ReadRequest(
+                  ::ReadRequest(buf, diskOffset + cellOffset, len), this));
 }
 
 // We only buffer data here; we actually write when close() is called.
 // We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
 // of the slot when the write does not end at the page or sector boundary.
 void
-Rock::IoState::write(char const *buf, size_t size, off_t offset, FREE *dtor)
+Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
 {
     // TODO: move to create?
-    if (!offset) {
+    if (!coreOff) {
         assert(theBuf.isNull());
-        assert(entrySize >= 0);
-        theBuf.init(min(entrySize, slotSize), slotSize);
+        assert(payloadEnd <= slotSize);
+        theBuf.init(min(payloadEnd, slotSize), slotSize);
+        // start with our header; TODO: consider making it a trailer
+        DbCellHeader header;
+        assert(static_cast<int64_t>(sizeof(header)) <= payloadEnd);
+        header.payloadSize = payloadEnd - sizeof(header);
+        theBuf.append(reinterpret_cast<const char*>(&header), sizeof(header));
     } else {
         // Core uses -1 offset as "append". Sigh.
-        assert(offset == -1);
+        assert(coreOff == -1);
         assert(!theBuf.isNull());
     }
 
     theBuf.append(buf, size);
+    offset_ += size; // so that Core thinks we wrote it
 
     if (dtor)
         (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
@@ -103,30 +115,32 @@ Rock::IoState::startWriting()
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
-    debugs(79, 5, HERE << swap_filen << " at " << offset_ << '+' <<
+    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
         theBuf.contentSize());
 
     assert(theBuf.contentSize() <= slotSize);
     // theFile->write may call writeCompleted immediatelly
-    theFile->write(new WriteRequest(::WriteRequest(theBuf.content(), offset_,
-        theBuf.contentSize(), theBuf.freeFunc()), this));
+    theFile->write(new WriteRequest(::WriteRequest(theBuf.content(),
+                   diskOffset, theBuf.contentSize(), theBuf.freeFunc()), this));
 }
 
 // 
 void
 Rock::IoState::finishedWriting(const int errFlag)
 {
+    // we incremented offset_ while accumulating data in write()
     callBack(errFlag);
 }
 
 void
-Rock::IoState::close()
+Rock::IoState::close(int how)
 {
-    debugs(79, 3, HERE << swap_filen << " at " << offset_);
-    if (!theBuf.isNull())
+    debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
+        " how=" << how);
+    if (how == wroteAll && !theBuf.isNull())
         startWriting();
     else
-        callBack(0);
+        callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
 }
 
 /// close callback (STIOCB) dialer: breaks dependencies and 
index 5cb3e6b3d0adda3a245d0c4781c322d43a4d6ee3..33adf631aefc2f5a534584169c390d92ace261f8 100644 (file)
@@ -24,13 +24,17 @@ public:
     // ::StoreIOState API
     virtual void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data);
     virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func);
-    virtual void close();
+    virtual void close(int how);
 
     /// called by SwapDir when writing is done
     void finishedWriting(int errFlag);
 
     int64_t slotSize; ///< db cell size
-    int64_t entrySize; ///< planned or actual stored size for the entry
+    int64_t diskOffset; ///< the start of this cell inside the db file
+
+    /// when reading: number of bytes previously written to the db cell;
+    /// when writing: maximum payload offset in a db cell
+    int64_t payloadEnd;
 
     MEMPROXY_CLASS(IoState);
 
index b28bee415b3de5912ab570655400adfa780fd3a7..24a1f6fd19c233f1967c46f3b24036c862c45b6f 100644 (file)
@@ -4,8 +4,10 @@
  * DEBUG: section 79    Disk IO Routines
  */
 
+#include "config.h"
 #include "fs/rock/RockRebuild.h"
 #include "fs/rock/RockSwapDir.h"
+#include "fs/rock/RockFile.h"
 
 CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
 
@@ -104,12 +106,38 @@ Rock::Rebuild::doOneEntry() {
     debugs(47,5, HERE << sd->index << " fileno " << fileno << " at " <<
         dbOffset << " <= " << dbSize);
 
+    ++counts.scancount;
+
     if (lseek(fd, dbOffset, SEEK_SET) < 0)
         failure("cannot seek to db entry", errno);
 
+    MemBuf buf;
+    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
+
+    if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
+        return;
+
+    // get our header
+    DbCellHeader header;
+    if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
+        debugs(47, 1, "cache_dir[" << sd->index << "]: " <<
+            "truncated swap entry meta data at " << dbOffset);
+        counts.invalid++;
+        return;
+    }
+    memcpy(&header, buf.content(), sizeof(header));
+
+    if (!header.sane()) {
+        debugs(47, 1, "cache_dir[" << sd->index << "]: " <<
+            "malformed rock db cell header at " << dbOffset);
+        counts.invalid++;
+        return;
+    }
+    buf.consume(sizeof(header)); // optimize to avoid memmove()
+
     cache_key key[SQUID_MD5_DIGEST_LENGTH];
     StoreEntry loadedE;
-    if (!storeRebuildLoadEntry(fd, loadedE, key, counts, 0)) {
+    if (!storeRebuildParseEntry(buf, loadedE, key, counts, header.payloadSize)) {
         // skip empty slots
         if (loadedE.swap_filen > 0 || loadedE.swap_file_sz > 0) {
             counts.invalid++;
@@ -125,7 +153,7 @@ Rock::Rebuild::doOneEntry() {
     counts.objcount++;
     // loadedE->dump(5);
 
-    sd->addEntry(fileno, loadedE);
+    sd->addEntry(fileno, header, loadedE);
 }
 
 void
index 0a7445f14a3ec2347c91249c02a026a02410616f..988bce78f23c5a1641215a15988d76897eb2e567 100644 (file)
@@ -296,13 +296,13 @@ Rock::SwapDir::rebuild() {
 /* Add a new object to the cache with empty memory copy and pointer to disk
  * use to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */
 bool
-Rock::SwapDir::addEntry(const int fileno, const StoreEntry &from)
+Rock::SwapDir::addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from)
 {
     debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
        ", fileno="<< std::setfill('0') << std::hex << std::uppercase <<
        std::setw(8) << fileno);
 
-    if (map->putAt(from, fileno)) {
+    if (map->putAt(header, from, fileno)) {
         // we do not add this entry to store_table so core will not updateSize
         updateSize(from.swap_file_sz, +1);
         return true;
@@ -312,24 +312,23 @@ Rock::SwapDir::addEntry(const int fileno, const StoreEntry &from)
 }
 
 
-int
-Rock::SwapDir::canStore(const StoreEntry &e) const
+bool
+Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
 {
-    debugs(47,8, HERE << e.swap_file_sz << " ? " << max_objsize);
-
-    if (EBIT_TEST(e.flags, ENTRY_SPECIAL))
-        return -1;
+    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
+        return false;
 
-    if (!theFile || !theFile->canRead() || !theFile->canWrite())
-        return -1;
+    if (!theFile || !theFile->canWrite())
+        return false;
 
     if (!map)
-        return -1;
+        return false;
 
     if (io->shedLoad())
-        return -1;
+        return false;
 
-    return io->load();
+    load = io->load();
+    return true;
 }
 
 StoreIOState::Pointer
@@ -340,6 +339,16 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
         return NULL;
     }
 
+    // compute payload size for our cell header, using StoreEntry info
+    // careful: e.objectLen() may still be negative here
+    const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
+    assert(expectedReplySize >= 0); // must know to prevent cell overflows
+    assert(e.mem_obj->swap_hdr_sz > 0);
+    DbCellHeader header;
+    header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
+    const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
+    assert(payloadEnd <= max_objsize);
+
     sfileno fileno;
     StoreEntryBasics *const basics =
         map->openForWriting(reinterpret_cast<const cache_key *>(e.key), fileno);
@@ -347,25 +356,24 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
         debugs(47, 5, HERE << "Rock::SwapDir::createStoreIO: map->add failed");
         return NULL;
     }
-    basics->set(e);
+    e.swap_file_sz = header.payloadSize; // and will be copied to the map
+    basics->set(header, e);
 
-    // XXX: We rely on our caller, storeSwapOutStart(), to set e->fileno.
+    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
     // If that does not happen, the entry will not decrement the read level!
 
     IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = fileno;
-    sio->offset_ = diskOffset(sio->swap_filen);
-    sio->entrySize = e.objectLen() + e.mem_obj->swap_hdr_sz;
+    sio->payloadEnd = payloadEnd;
+    sio->diskOffset = diskOffset(sio->swap_filen);
 
     debugs(47,5, HERE << "dir " << index << " created new fileno " <<
         std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
-        sio->swap_filen << std::dec << " at " << sio->offset_ << " size: " <<
-        sio->entrySize << " (" << e.objectLen() << '+' <<
-        e.mem_obj->swap_hdr_sz << ")");
+        sio->swap_filen << std::dec << " at " << sio->diskOffset);
 
-    assert(sio->offset_ + sio->entrySize <= diskOffsetLimit());
+    assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());
 
     sio->file(theFile);
 
@@ -376,15 +384,18 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
 int64_t
 Rock::SwapDir::diskOffset(int filen) const
 {
+    assert(filen >= 0);
     return HeaderSize + max_objsize*filen;
 }
 
 int64_t
 Rock::SwapDir::diskOffsetLimit() const
 {
+    assert(map);
     return diskOffset(map->entryLimit());
 }
 
+// tries to open an old or being-written-to entry with swap_filen for reading
 StoreIOState::Pointer
 Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
 {
@@ -398,23 +409,29 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
         return NULL;
     }
 
-    // The only way the entry has swap_filen is if get() locked it for reading
-    // so we do not need to map->openForReadingAt(swap_filen) again here.
+    // The are two ways an entry can get swap_filen: our get() locked it for
+    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
+    // locked entry is safe, but no support for reading a filling entry.
+    const StoreEntryBasics *basics = map->peekAtReader(e.swap_filen);
+    if (!basics)
+        return NULL; // we were writing afterall
 
     IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = e.swap_filen;
+    sio->payloadEnd = sizeof(DbCellHeader) + basics->header.payloadSize;
+    assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
+
     debugs(47,5, HERE << "dir " << index << " has old fileno: " <<
         std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
         sio->swap_filen);
 
-    assert(map->valid(sio->swap_filen));
-    sio->offset_ = diskOffset(sio->swap_filen);
-    sio->entrySize = e.swap_file_sz;
-    assert(sio->entrySize <= max_objsize);
+    assert(basics->swap_file_sz > 0);
+    assert(basics->swap_file_sz == e.swap_file_sz);
 
-    assert(sio->offset_ + sio->entrySize <= diskOffsetLimit());
+    sio->diskOffset = diskOffset(sio->swap_filen);
+    assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());
 
     sio->file(theFile);
     return sio;
@@ -460,7 +477,9 @@ Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< :
     assert(request);
     IoState::Pointer sio = request->sio;
 
-    // do not increment sio->offset_: callers always supply relative offset
+    if (errflag == DISK_OK && rlen > 0)
+        sio->offset_ += rlen;
+    assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum
 
     StoreIOState::STRCB *callback = sio->read.callback;
     assert(callback);
@@ -481,7 +500,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
     if (errflag == DISK_OK) {
         // close, assuming we only write once; the entry gets the read lock
         map->closeForWriting(sio.swap_filen);
-        // and sio.offset_ += rlen;
+        // do not increment sio.offset_ because we do it in sio->write()
     } else {
         // Do not abortWriting here. The entry should keep the write lock
         // instead of losing association with the store and confusing core.
@@ -490,7 +509,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
 
     // TODO: always compute cur_size based on map, do not store it
     cur_size = (HeaderSize + max_objsize * map->entryCount()) >> 10;
-    assert(sio.offset_ <= diskOffsetLimit()); // post-factum check
+    assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum
 
     sio.finishedWriting(errflag);
 }
@@ -498,7 +517,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
 bool
 Rock::SwapDir::full() const
 {
-    return map->full();
+    return map && map->full();
 }
 
 void
@@ -625,9 +644,17 @@ Rock::SwapDir::statfs(StoreEntry &e) const
     if (map) {
         const int limit = map->entryLimit();
         storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
-        if (limit > 0)
+        if (limit > 0) {
+            const int entryCount = map->entryCount();
             storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
-                map->entryCount(), (100.0 * map->entryCount() / limit));
+                entryCount, (100.0 * entryCount / limit));
+
+            if (limit < 100) { // XXX: otherwise too expensive to count
+                MapStats stats;
+                map->updateStats(stats);
+                stats.dump(e);
+            }
+        }
     }    
 
     storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
index 375fd530d1b38a0a66bc3a98038897b6b0d29c0c..00ff9d2c4dda80f911ae47f76620a464d2be5842 100644 (file)
@@ -32,7 +32,7 @@ protected:
     virtual bool needsDiskStrand() const;
     virtual void create();
     virtual void init();
-    virtual int canStore(StoreEntry const &) const;
+    virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const;
     virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
     virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
     virtual void maintain();
@@ -55,7 +55,7 @@ protected:
 
     void rebuild(); ///< starts loading and validating stored entry metadata
     ///< used to add entries successfully loaded during rebuild
-    bool addEntry(const int fileno, const StoreEntry &from);
+    bool addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from);
 
     bool full() const; ///< no more entries can be stored without purging
     void trackReferences(StoreEntry &e); ///< add to replacement policy scope