entryLimit());
}
-StoreEntryBasics *
+Rock::StoreEntryBasics *
Rock::DirMap::openForWriting(const cache_key *const key, sfileno &fileno)
{
debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
closeForReading(fileno);
}
+const Rock::StoreEntryBasics *
+Rock::DirMap::peekAtReader(const sfileno fileno) const
+{
+ assert(valid(fileno));
+ const Slot &s = shared->slots[fileno];
+ switch (s.state) {
+ case Slot::Readable:
+ return &s.seBasics; // immediate access by lock holder so no locking
+ case Slot::Writeable:
+ return NULL; // cannot read the slot when it is being written
+ case Slot::Empty:
+ assert(false); // must be locked for reading or writing
+ }
+ assert(false); // not reachable
+ return NULL;
+}
+
bool
-Rock::DirMap::putAt(const StoreEntry &e, const sfileno fileno)
+Rock::DirMap::putAt(const DbCellHeader &header, const StoreEntry &e,
+ const sfileno fileno)
{
const cache_key *key = static_cast<const cache_key*>(e.key);
debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
if (s.state == Slot::Empty) // we may also overwrite a Readable slot
++shared->count;
s.setKey(static_cast<const cache_key*>(e.key));
- s.seBasics.set(e);
+ s.seBasics.set(header, e);
s.state = Slot::Readable;
s.releaseExclusiveLock();
debugs(79, 5, HERE << " put slot at " << fileno << " for key " <<
freeIfNeeded(s);
}
-const StoreEntryBasics *
+const Rock::StoreEntryBasics *
Rock::DirMap::openForReading(const cache_key *const key, sfileno &fileno)
{
debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
return 0;
}
-const StoreEntryBasics *
+const Rock::StoreEntryBasics *
Rock::DirMap::openForReadingAt(const sfileno fileno)
{
debugs(79, 5, HERE << " trying to open slot at " << fileno << " for "
return entryCount() >= entryLimit();
}
+void
+Rock::DirMap::updateStats(MapStats &stats) const
+{
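+ // sum per-slot stats; slots are examined without locking, so totals are approximate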
+ stats.capacity += shared->limit;
+ for (int i = 0; i < shared->limit; ++i)
+ shared->slots[i].updateStats(stats);
+}
+
bool
Rock::DirMap::valid(const int pos) const
{
releaseExclusiveLock();
}
+void
+Rock::Slot::updateStats(MapStats &stats) const
+{
+ switch (state) {
+ case Readable:
+ ++stats.readable;
+ stats.readers += readers;
+ break;
+ case Writeable:
+ ++stats.writeable;
+ stats.writers += writers;
+ break;
+ case Empty:
+ ++stats.empty;
+ break;
+ }
+
+ if (waitingToBeFreed)
+ ++stats.marked;
+}
+
+
Rock::DirMap::Shared::Shared(const int aLimit): limit(aLimit), count(0)
{
}
void
-StoreEntryBasics::set(const StoreEntry &from)
+Rock::StoreEntryBasics::set(const DbCellHeader &aHeader, const StoreEntry &from)
{
+ assert(from.swap_file_sz > 0);
memset(this, 0, sizeof(*this));
+ header = aHeader;
timestamp = from.timestamp;
lastref = from.lastref;
expires = from.expires;
refcount = from.refcount;
flags = from.flags;
}
+
+
+/* MapStats */
+
+Rock::MapStats::MapStats()
+{
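+ // all members are plain ints, so zero-filling the object initializes them all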
+ memset(this, 0, sizeof(*this));
+}
+
+void
+Rock::MapStats::dump(StoreEntry &e) const
+{
+ storeAppendPrintf(&e, "Available slots: %9d\n", capacity);
+
+ if (!capacity)
+ return;
+
+ storeAppendPrintf(&e, "Readable slots: %9d %6.2f%%\n",
+ readable, (100.0 * readable / capacity));
+ storeAppendPrintf(&e, "Filling slots: %9d %6.2f%%\n",
+ writeable, (100.0 * writeable / capacity));
+ storeAppendPrintf(&e, "Empty slots: %9d %6.2f%%\n",
+ empty, (100.0 * empty / capacity));
+
+ if (readers || writers) {
+ const int locks = readers + writers;
+ storeAppendPrintf(&e, "Readers: %9d %6.2f%%\n",
+ readers, (100.0 * readers / locks));
+ storeAppendPrintf(&e, "Writers: %9d %6.2f%%\n",
+ writers, (100.0 * writers / locks));
+ }
+
+ if (readable + writeable) {
+ storeAppendPrintf(&e, "Marked slots: %9d %6.2f%%\n",
+ marked, (100.0 * marked / (readable + writeable)));
+ }
+}
#ifndef SQUID_FS_ROCK_DIR_MAP_H
#define SQUID_FS_ROCK_DIR_MAP_H
+#include "fs/rock/RockFile.h"
#include "ipc/AtomicWord.h"
#include "ipc/SharedMemory.h"
+namespace Rock {
+
class StoreEntryBasics {
public:
- void set(const StoreEntry &from);
+ void set(const DbCellHeader &aHeader, const StoreEntry &anEntry);
+
+ DbCellHeader header; ///< rock-specific entry metadata
/* START OF ON-DISK STORE_META_STD TLV field */
time_t timestamp;
/* END OF ON-DISK STORE_META_STD */
};
-namespace Rock {
+/// aggregates basic map performance measurements; all numbers are approximate
+class MapStats {
+public:
+ MapStats();
+
+ void dump(StoreEntry &e) const;
+
+ int capacity; ///< the total number of slots in the map
+ int readable; ///< number of slots in Readable state
+ int writeable; ///< number of slots in Writeable state
+ int empty; ///< number of slots in Empty state
+ int readers; ///< sum of slot.readers
+ int writers; ///< sum of slot.writers
+ int marked; ///< number of slots marked for freeing
+};
/// DirMap entry
class Slot {
void releaseExclusiveLock(); ///< undo successful exclusiveLock()
void switchExclusiveToSharedLock(); ///< trade exclusive for shared access
+ /// adds approximate current stats to the supplied ones
+ void updateStats(MapStats &stats) const;
+
public:
// we want two uint64_t, but older GCCs lack __sync_fetch_and_add_8
AtomicWordT<uint32_t> key_[4]; ///< MD5 entry key
void closeForWriting(const sfileno fileno);
/// stores entry info at the requested slot or returns false
- bool putAt(const StoreEntry &e, const sfileno fileno);
+ bool putAt(const DbCellHeader &header, const StoreEntry &e, const sfileno fileno);
+
+ /// only works on locked entries; returns nil unless the slot is readable
+ const StoreEntryBasics *peekAtReader(const sfileno fileno) const;
/// mark the slot as waiting to be freed and, if possible, free it
void free(const sfileno fileno);
int entryCount() const; ///< number of used slots
int entryLimit() const; ///< maximum number of slots that can be used
+ /// adds approximate current stats to the supplied ones
+ void updateStats(MapStats &stats) const;
+
static int AbsoluteEntryLimit(); ///< maximum entryLimit() possible
private:
+#ifndef SQUID_FS_ROCK_DB_CELL_H
+#define SQUID_FS_ROCK_DB_CELL_H
+
+// XXX: rename to fs/rock/RockDbCell.{cc,h}
+
+namespace Rock {
+
+/// \ingroup Rock
+/// meta-information at the beginning of every db cell
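+/// on disk, the header is followed by payloadSize bytes of entry data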
+class DbCellHeader
+{
+public:
+ DbCellHeader(): payloadSize(0), reserved(0) {}
+
+ /// whether the freshly loaded header fields make sense
+ bool sane() const { return payloadSize >= 0 && reserved == 0; }
+
+ int64_t payloadSize; ///< cell contents size excluding this header
+ int64_t reserved; ///< reserved for future use (next cell pointer?)
+};
+
+} // namespace Rock
+
+#endif /* SQUID_FS_ROCK_DB_CELL_H */
* DEBUG: section 79 Disk IO Routines
*/
+#include "config.h"
+#include "MemObject.h"
#include "Parsing.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
StoreIOState::STIOCB *cbIo,
void *data):
slotSize(0),
- entrySize(0)
+ diskOffset(-1),
+ payloadEnd(-1)
{
e = anEntry;
- swap_filen = e->swap_filen;
- swap_dirn = dir->index;
+ // swap_filen, swap_dirn, diskOffset, and payloadEnd are set by the caller
slotSize = dir->max_objsize;
file_callback = cbFile;
callback = cbIo;
}
void
-Rock::IoState::read_(char *buf, size_t len, off_t off, STRCB *cb, void *data)
+Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
assert(theFile != NULL);
assert(theFile->canRead());
+ assert(coreOff >= 0);
+ offset_ = coreOff;
+
+ // we skip our cell header; it is only read when building the map
+ const int64_t cellOffset = sizeof(DbCellHeader) +
+ static_cast<int64_t>(coreOff);
+ assert(cellOffset <= payloadEnd);
// Core specifies buffer length, but we must not exceed stored entry size
- assert(off >= 0);
- assert(entrySize >= 0);
- const int64_t offset = static_cast<int64_t>(off);
- assert(offset <= entrySize);
- if (offset + (int64_t)len > entrySize)
- len = entrySize - offset;
+ if (cellOffset + (int64_t)len > payloadEnd)
+ len = payloadEnd - cellOffset;
assert(read.callback == NULL);
assert(read.callback_data == NULL);
read.callback = cb;
read.callback_data = cbdataReference(data);
- theFile->read(new ReadRequest(::ReadRequest(buf, offset_ + offset, len), this));
+ theFile->read(new ReadRequest(
+ ::ReadRequest(buf, diskOffset + cellOffset, len), this));
}
// We only buffer data here; we actually write when close() is called.
// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
// of the slot when the write does not end at the page or sector boundary.
void
-Rock::IoState::write(char const *buf, size_t size, off_t offset, FREE *dtor)
+Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
// TODO: move to create?
- if (!offset) {
+ if (!coreOff) {
assert(theBuf.isNull());
- assert(entrySize >= 0);
- theBuf.init(min(entrySize, slotSize), slotSize);
+ assert(payloadEnd <= slotSize);
+ theBuf.init(min(payloadEnd, slotSize), slotSize);
+ // start with our header; TODO: consider making it a trailer
+ DbCellHeader header;
+ assert(static_cast<int64_t>(sizeof(header)) <= payloadEnd);
+ header.payloadSize = payloadEnd - sizeof(header);
+ theBuf.append(reinterpret_cast<const char*>(&header), sizeof(header));
} else {
// Core uses -1 offset as "append". Sigh.
- assert(offset == -1);
+ assert(coreOff == -1);
assert(!theBuf.isNull());
}
theBuf.append(buf, size);
+ offset_ += size; // so that Core thinks we wrote it
if (dtor)
(dtor)(const_cast<char*>(buf)); // cast due to a broken API?
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
// to avoid triggering read-page;new_head+old_tail;write-page overheads
- debugs(79, 5, HERE << swap_filen << " at " << offset_ << '+' <<
+ debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
theBuf.contentSize());
assert(theBuf.contentSize() <= slotSize);
// theFile->write may call writeCompleted immediately
- theFile->write(new WriteRequest(::WriteRequest(theBuf.content(), offset_,
- theBuf.contentSize(), theBuf.freeFunc()), this));
+ theFile->write(new WriteRequest(::WriteRequest(theBuf.content(),
+ diskOffset, theBuf.contentSize(), theBuf.freeFunc()), this));
}
//
void
Rock::IoState::finishedWriting(const int errFlag)
{
+ // we incremented offset_ while accumulating data in write()
callBack(errFlag);
}
void
-Rock::IoState::close()
+Rock::IoState::close(int how)
{
- debugs(79, 3, HERE << swap_filen << " at " << offset_);
- if (!theBuf.isNull())
+ debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
+ " how=" << how);
+ if (how == wroteAll && !theBuf.isNull())
startWriting();
else
- callBack(0);
+ callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
}
/// close callback (STIOCB) dialer: breaks dependencies and
// ::StoreIOState API
virtual void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data);
virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func);
- virtual void close();
+ virtual void close(int how);
/// called by SwapDir when writing is done
void finishedWriting(int errFlag);
int64_t slotSize; ///< db cell size
- int64_t entrySize; ///< planned or actual stored size for the entry
+ int64_t diskOffset; ///< the start of this cell inside the db file
+
+ /// when reading: number of bytes previously written to the db cell;
+ /// when writing: maximum payload offset in a db cell
+ int64_t payloadEnd;
MEMPROXY_CLASS(IoState);
* DEBUG: section 79 Disk IO Routines
*/
+#include "config.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
+#include "fs/rock/RockFile.h"
CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
debugs(47,5, HERE << sd->index << " fileno " << fileno << " at " <<
dbOffset << " <= " << dbSize);
+ ++counts.scancount;
+
if (lseek(fd, dbOffset, SEEK_SET) < 0)
failure("cannot seek to db entry", errno);
+ MemBuf buf;
+ buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
+
+ if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
+ return;
+
+ // get our header
+ DbCellHeader header;
+ if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
+ debugs(47, 1, "cache_dir[" << sd->index << "]: " <<
+ "truncated swap entry meta data at " << dbOffset);
+ counts.invalid++;
+ return;
+ }
+ memcpy(&header, buf.content(), sizeof(header));
+
+ if (!header.sane()) {
+ debugs(47, 1, "cache_dir[" << sd->index << "]: " <<
+ "malformed rock db cell header at " << dbOffset);
+ counts.invalid++;
+ return;
+ }
+ buf.consume(sizeof(header)); // optimize to avoid memmove()
+
cache_key key[SQUID_MD5_DIGEST_LENGTH];
StoreEntry loadedE;
- if (!storeRebuildLoadEntry(fd, loadedE, key, counts, 0)) {
+ if (!storeRebuildParseEntry(buf, loadedE, key, counts, header.payloadSize)) {
// skip empty slots
if (loadedE.swap_filen > 0 || loadedE.swap_file_sz > 0) {
counts.invalid++;
counts.objcount++;
// loadedE->dump(5);
- sd->addEntry(fileno, loadedE);
+ sd->addEntry(fileno, header, loadedE);
}
void
/* Add a new object to the cache with empty memory copy and pointer to disk
 * used to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */
bool
-Rock::SwapDir::addEntry(const int fileno, const StoreEntry &from)
+Rock::SwapDir::addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from)
{
debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
", fileno="<< std::setfill('0') << std::hex << std::uppercase <<
std::setw(8) << fileno);
- if (map->putAt(from, fileno)) {
+ if (map->putAt(header, from, fileno)) {
// we do not add this entry to store_table so core will not updateSize
updateSize(from.swap_file_sz, +1);
return true;
}
-int
-Rock::SwapDir::canStore(const StoreEntry &e) const
+bool
+Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
- debugs(47,8, HERE << e.swap_file_sz << " ? " << max_objsize);
-
- if (EBIT_TEST(e.flags, ENTRY_SPECIAL))
- return -1;
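+ // reserve space for our cell header in addition to the entry data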
+ if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
+ return false;
- if (!theFile || !theFile->canRead() || !theFile->canWrite())
- return -1;
+ if (!theFile || !theFile->canWrite())
+ return false;
if (!map)
- return -1;
+ return false;
if (io->shedLoad())
- return -1;
+ return false;
- return io->load();
+ load = io->load();
+ return true;
}
StoreIOState::Pointer
return NULL;
}
+ // compute payload size for our cell header, using StoreEntry info
+ // careful: e.objectLen() may still be negative here
+ const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
+ assert(expectedReplySize >= 0); // must know to prevent cell overflows
+ assert(e.mem_obj->swap_hdr_sz > 0);
+ DbCellHeader header;
+ header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
+ const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
+ assert(payloadEnd <= max_objsize);
+
sfileno fileno;
StoreEntryBasics *const basics =
map->openForWriting(reinterpret_cast<const cache_key *>(e.key), fileno);
debugs(47, 5, HERE << "Rock::SwapDir::createStoreIO: map->add failed");
return NULL;
}
- basics->set(e);
+ e.swap_file_sz = header.payloadSize; // and will be copied to the map
+ basics->set(header, e);
- // XXX: We rely on our caller, storeSwapOutStart(), to set e->fileno.
+ // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
// If that does not happen, the entry will not decrement the read level!
IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
sio->swap_dirn = index;
sio->swap_filen = fileno;
- sio->offset_ = diskOffset(sio->swap_filen);
- sio->entrySize = e.objectLen() + e.mem_obj->swap_hdr_sz;
+ sio->payloadEnd = payloadEnd;
+ sio->diskOffset = diskOffset(sio->swap_filen);
debugs(47,5, HERE << "dir " << index << " created new fileno " <<
std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
- sio->swap_filen << std::dec << " at " << sio->offset_ << " size: " <<
- sio->entrySize << " (" << e.objectLen() << '+' <<
- e.mem_obj->swap_hdr_sz << ")");
+ sio->swap_filen << std::dec << " at " << sio->diskOffset);
- assert(sio->offset_ + sio->entrySize <= diskOffsetLimit());
+ assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());
sio->file(theFile);
int64_t
Rock::SwapDir::diskOffset(int filen) const
{
+ assert(filen >= 0);
return HeaderSize + max_objsize*filen;
}
int64_t
Rock::SwapDir::diskOffsetLimit() const
{
+ assert(map);
return diskOffset(map->entryLimit());
}
+// tries to open an old or being-written-to entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
return NULL;
}
- // The only way the entry has swap_filen is if get() locked it for reading
- // so we do not need to map->openForReadingAt(swap_filen) again here.
+ // There are two ways an entry can get swap_filen: our get() locked it for
+ // reading or our storeSwapOutStart() locked it for writing. Peeking at our
+ // locked entry is safe, but we do not support reading an entry being filled.
+ const StoreEntryBasics *basics = map->peekAtReader(e.swap_filen);
+ if (!basics)
+ return NULL; // we were writing after all
IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
sio->swap_dirn = index;
sio->swap_filen = e.swap_filen;
+ sio->payloadEnd = sizeof(DbCellHeader) + basics->header.payloadSize;
+ assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
+
debugs(47,5, HERE << "dir " << index << " has old fileno: " <<
std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
sio->swap_filen);
- assert(map->valid(sio->swap_filen));
- sio->offset_ = diskOffset(sio->swap_filen);
- sio->entrySize = e.swap_file_sz;
- assert(sio->entrySize <= max_objsize);
+ assert(basics->swap_file_sz > 0);
+ assert(basics->swap_file_sz == e.swap_file_sz);
- assert(sio->offset_ + sio->entrySize <= diskOffsetLimit());
+ sio->diskOffset = diskOffset(sio->swap_filen);
+ assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());
sio->file(theFile);
return sio;
assert(request);
IoState::Pointer sio = request->sio;
- // do not increment sio->offset_: callers always supply relative offset
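+ // advance offset_ to reflect how much of the entry has been read so far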
+ if (errflag == DISK_OK && rlen > 0)
+ sio->offset_ += rlen;
+ assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum
StoreIOState::STRCB *callback = sio->read.callback;
assert(callback);
if (errflag == DISK_OK) {
// close, assuming we only write once; the entry gets the read lock
map->closeForWriting(sio.swap_filen);
- // and sio.offset_ += rlen;
+ // do not increment sio.offset_ because we do it in sio->write()
} else {
// Do not abortWriting here. The entry should keep the write lock
// instead of losing association with the store and confusing core.
// TODO: always compute cur_size based on map, do not store it
cur_size = (HeaderSize + max_objsize * map->entryCount()) >> 10;
- assert(sio.offset_ <= diskOffsetLimit()); // post-factum check
+ assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum
sio.finishedWriting(errflag);
}
bool
Rock::SwapDir::full() const
{
- return map->full();
+ return map && map->full();
}
void
if (map) {
const int limit = map->entryLimit();
storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
- if (limit > 0)
+ if (limit > 0) {
+ const int entryCount = map->entryCount();
storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
- map->entryCount(), (100.0 * map->entryCount() / limit));
+ entryCount, (100.0 * entryCount / limit));
+
+ if (limit < 100) { // XXX: otherwise too expensive to count
+ MapStats stats;
+ map->updateStats(stats);
+ stats.dump(e);
+ }
+ }
}
storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
virtual bool needsDiskStrand() const;
virtual void create();
virtual void init();
- virtual int canStore(StoreEntry const &) const;
+ virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const;
virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
virtual void maintain();
void rebuild(); ///< starts loading and validating stored entry metadata
///< used to add entries successfully loaded during rebuild
- bool addEntry(const int fileno, const StoreEntry &from);
+ bool addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from);
bool full() const; ///< no more entries can be stored without purging
void trackReferences(StoreEntry &e); ///< add to replacement policy scope