and a simple "scan" replacement policy.
Needs polishing and possibly a swap.state equivalent for faster rebuild.
librock_la_SOURCES = \
rock/RockDbCell.cc \
rock/RockDbCell.h \
+ rock/RockForward.h \
rock/RockIoState.cc \
rock/RockIoState.h \
rock/RockIoRequests.cc \
#include "squid.h"
#include "fs/rock/RockDbCell.h"
-#include "ipc/StoreMap.h"
-#include "tools.h"
-Rock::DbCellHeader::DbCellHeader(): firstSlot(0), nextSlot(0), version(0),
- payloadSize(0) {
- memset(&key, 0, sizeof(key));
-}
-
-bool
-Rock::DbCellHeader::sane() const {
- return firstSlot > 0;
+Rock::DbCellHeader::DbCellHeader()
+{
+ memset(this, 0, sizeof(*this));
}
#ifndef SQUID_FS_ROCK_DB_CELL_H
#define SQUID_FS_ROCK_DB_CELL_H
-namespace Ipc
-{
-class StoreMapSlot;
-}
+#include "typedefs.h"
namespace Rock
{
/** \ingroup Rock
* Meta-information at the beginning of every db cell.
+ * Links multiple map slots belonging to the same entry into an entry chain.
* Stored on disk and used as sizeof() argument so it must remain POD.
*/
class DbCellHeader
public:
DbCellHeader();
- /// whether the freshly loaded header fields make sense
- bool sane() const;
+ /// true iff no entry occupies this slot
+ bool empty() const { return !firstSlot && !nextSlot && !payloadSize; }
+
+ /* members below are not meaningful if empty() */
+
+ /// whether this slot is not corrupted
+ bool sane() const { return firstSlot >= 0 && nextSlot >= -1 &&
+ version > 0 && payloadSize > 0; }
uint64_t key[2]; ///< StoreEntry key
- uint32_t firstSlot; ///< first slot pointer in the entry chain
- uint32_t nextSlot; ///< next slot pointer in the entry chain
- uint32_t version; ///< entry chain version
- uint32_t payloadSize; ///< cell contents size excluding this header
+ uint64_t entrySize; ///< total entry content size or zero if still unknown
+ uint32_t payloadSize; ///< slot contents size, always positive
+ uint32_t version; ///< detects conflicts among same-key entries
+ sfileno firstSlot; ///< slot ID of the first slot occupied by the entry
+ sfileno nextSlot; ///< slot ID of the next slot occupied by the entry
};
} // namespace Rock
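For illustration only: a minimal standalone sketch (plain types, not Squid
APIs) of how the firstSlot/nextSlot/payloadSize fields above chain db slots
into one entry. The -1 terminator mirrors the negative "last slot" marker
used elsewhere in this patch.

// Illustrative sketch only, not part of the patch.
#include <cstdint>
#include <iostream>
#include <vector>

struct CellHeaderModel {      // models the DbCellHeader fields used below
    int32_t firstSlot;        // slot ID of the first slot of the entry
    int32_t nextSlot;         // next slot ID in the chain, or -1 at the end
    uint32_t payloadSize;     // entry content bytes stored in this slot
};

// sums the payload of one entry chain, given any slot belonging to it
static uint64_t entryBytes(const std::vector<CellHeaderModel> &slots, int32_t anySlot)
{
    uint64_t total = 0;
    for (int32_t sid = slots[anySlot].firstSlot; sid >= 0; sid = slots[sid].nextSlot)
        total += slots[sid].payloadSize;
    return total;
}

int main()
{
    std::vector<CellHeaderModel> slots(8);   // a tiny 8-slot "db"
    slots[2] = {2, 5, 4000};                 // chain: 2 -> 5 -> 7
    slots[5] = {2, 7, 4000};
    slots[7] = {2, -1, 1234};
    std::cout << entryBytes(slots, 5) << " bytes\n";   // prints 9234
    return 0;
}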
--- /dev/null
+#ifndef SQUID_FS_ROCK_FORWARD_H
+#define SQUID_FS_ROCK_FORWARD_H
+
+namespace Ipc
+{
+
+class StoreMapAnchor;
+class StoreMapSlice;
+
+namespace Mem
+{
+class PageId;
+}
+
+}
+
+
+namespace Rock
+{
+
+class SwapDir;
+
+/// db cell number, starting with cell 0 (always occupied by the db header)
+typedef sfileno SlotId;
+
+class Rebuild;
+
+class IoState;
+
+class DbCellHeader;
+
+}
+
+
+#endif /* SQUID_FS_ROCK_FORWARD_H */
*/
#include "squid.h"
-#include "MemObject.h"
-#include "Parsing.h"
+#include "base/TextException.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
+#include "MemObject.h"
+#include "Mem.h"
+#include "Parsing.h"
-Rock::IoState::IoState(SwapDir &aDir,
+Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
StoreEntry *anEntry,
StoreIOState::STFNCB *cbFile,
StoreIOState::STIOCB *cbIo,
void *data):
- dbSlot(NULL),
+ readableAnchor_(NULL),
+ writeableAnchor_(NULL),
+ sidCurrent(-1),
dir(aDir),
- slotSize(dir.slotSize),
- objOffset(0)
+ slotSize(dir->slotSize),
+ objOffset(0),
+ theBuf(dir->slotSize)
{
e = anEntry;
- // swap_filen, swap_dirn and diskOffset are set by the caller
+ e->lock("rock I/O");
+ // anchor, swap_filen, and swap_dirn are set by the caller
file_callback = cbFile;
callback = cbIo;
callback_data = cbdataReference(data);
Rock::IoState::~IoState()
{
--store_open_disk_fd;
+
+ // The dir map entry may still be open for reading at this point because
+ // the map entry lock is associated with StoreEntry, not IoState.
+ // assert(!readableAnchor_);
+ assert(!writeableAnchor_);
+
if (callback_data)
cbdataReferenceDone(callback_data);
theFile = NULL;
+
+ e->unlock("rock I/O");
}
void
theFile = aFile;
}
+const Ipc::StoreMapAnchor &
+Rock::IoState::readAnchor() const
+{
+ assert(readableAnchor_);
+ return *readableAnchor_;
+}
+
+Ipc::StoreMapAnchor &
+Rock::IoState::writeAnchor()
+{
+ assert(writeableAnchor_);
+ return *writeableAnchor_;
+}
+
+/// convenience wrapper returning the map slice we are reading now
+const Ipc::StoreMapSlice &
+Rock::IoState::currentReadableSlice() const
+{
+ return dir->map->readableSlice(swap_filen, sidCurrent);
+}
+
void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
+ debugs(79, 7, swap_filen << " reads from " << coreOff);
+
assert(theFile != NULL);
assert(coreOff >= 0);
- Ipc::Mem::PageId pageId;
- pageId.pool = dir.index;
- if (coreOff < objOffset) { // rewind
- pageId.number = dbSlot->firstSlot;
- dbSlot = &dir.dbSlot(pageId);
+ // if we are dealing with the first read or
+ // if the offset went backwards, start searching from the beginning
+ if (sidCurrent < 0 || coreOff < objOffset) {
+ sidCurrent = readAnchor().start;
objOffset = 0;
}
- while (coreOff >= objOffset + dbSlot->payloadSize) {
- objOffset += dbSlot->payloadSize;
- pageId.number = dbSlot->nextSlot;
- assert(pageId); // XXX: should be an error?
- dbSlot = &dir.dbSlot(pageId);
+ while (coreOff >= objOffset + currentReadableSlice().size) {
+ objOffset += currentReadableSlice().size;
+ sidCurrent = currentReadableSlice().next;
+ assert(sidCurrent >= 0); // XXX: handle "read offset too big" error
}
- if (pageId)
- diskOffset = dir.diskOffset(pageId);
offset_ = coreOff;
len = min(len,
- static_cast<size_t>(objOffset + dbSlot->payloadSize - coreOff));
+ static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
assert(read.callback == NULL);
assert(read.callback_data == NULL);
read.callback = cb;
read.callback_data = cbdataReference(data);
+ const uint64_t diskOffset = dir->diskOffset(sidCurrent);
theFile->read(new ReadRequest(::ReadRequest(buf,
diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}
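For illustration only: a standalone model (plain types, not Squid APIs) of the
offset-to-slice search that read_() performs above, assuming the slice sizes
are already known.

// Illustrative sketch only, not part of the patch.
#include <cassert>
#include <cstddef>
#include <iostream>
#include <vector>

// finds the slice holding byte coreOff and reports where that slice starts
static size_t sliceForOffset(const std::vector<size_t> &sliceSizes,
                             size_t coreOff, size_t &objOffset)
{
    objOffset = 0;    // offset of the current slice within the object
    size_t sid = 0;   // start from the first slice (a "rewind" in read_ terms)
    while (coreOff >= objOffset + sliceSizes[sid]) {
        objOffset += sliceSizes[sid];
        ++sid;
        assert(sid < sliceSizes.size()); // "read offset too big" would be an error
    }
    return sid;
}

int main()
{
    const std::vector<size_t> sizes = {4000, 4000, 1234};
    size_t objOffset = 0;
    std::cout << sliceForOffset(sizes, 4100, objOffset)  // prints 1
              << " starting at " << objOffset << '\n';   // starting at 4000
    return 0;
}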
-// We only write data when full slot is accumulated or when close() is called.
-// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
-// of the slot when the write does not end at the page or sector boundary.
-void
+/// wraps tryWrite() to handle deep write failures centrally and safely
+bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
- assert(dbSlot);
-
- if (theBuf.isNull()) {
- theBuf.init(min(size + sizeof(DbCellHeader), slotSize), slotSize);
- theBuf.appended(sizeof(DbCellHeader)); // will fill header in doWrite
+ bool success = false;
+ try {
+ tryWrite(buf, size, coreOff);
+ success = true;
+ } catch (const std::exception &e) { // TODO: should we catch ... as well?
+ debugs(79, 2, "db write error: " << e.what());
+ dir->writeError(swap_filen);
+ finishedWriting(DISK_ERROR);
+ // 'this' might be gone beyond this point; fall through to free buf
}
- if (size <= static_cast<size_t>(theBuf.spaceSize()))
- theBuf.append(buf, size);
- else {
- Ipc::Mem::PageId pageId;
- if (!dir.popDbSlot(pageId)) {
- debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << dir.path <<
- "' run out of DB slots");
- dir.writeError(swap_filen);
- // XXX: do we need to destroy buf on error?
- if (dtor)
- (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
- // XXX: do we need to call callback on error?
- callBack(DISK_ERROR);
- return;
- }
- DbCellHeader &nextDbSlot = dir.dbSlot(pageId);
- memcpy(nextDbSlot.key, dbSlot->key, sizeof(nextDbSlot.key));
- nextDbSlot.firstSlot = dbSlot->firstSlot;
- nextDbSlot.nextSlot = 0;
- nextDbSlot.version = dbSlot->version;
- nextDbSlot.payloadSize = 0;
+ // careful: 'this' might be gone here
+
+ if (dtor)
+ (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+
+ return success;
+}
+
+/** We only write data when a full slot is accumulated or when close() is called.
+ We buffer, in part, to avoid forcing the OS to _read_ old unwritten portions of
+ the slot when the write does not end at the page or sector boundary. */
+void
+Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
+{
+ debugs(79, 7, swap_filen << " writes " << size << " more");
+
+ // either this is the first write or an append; we do not support write gaps
+ assert(!coreOff || coreOff == -1);
- dbSlot->nextSlot = pageId.number;
+ // allocate the first slice during the first write
+ if (!coreOff) {
+ assert(sidCurrent < 0);
+ sidCurrent = reserveSlotForWriting(); // throws on failures
+ assert(sidCurrent >= 0);
+ writeAnchor().start = sidCurrent;
+ }
- const size_t left = size - theBuf.spaceSize();
- offset_ += theBuf.spaceSize(); // so that Core thinks we wrote it
- theBuf.append(buf, theBuf.spaceSize());
+ // buffer incoming data in the slot buffer and write overflowing or final slots
+ // quit when no data is left or we stopped writing due to a reentrant error
+ while (size > 0 && theFile != NULL) {
+ assert(sidCurrent >= 0);
+ const size_t processed = writeToBuffer(buf, size);
+ buf += processed;
+ size -= processed;
+ const bool overflow = size > 0;
+
+ // We do not write a full buffer without overflow because
+ // we would not yet know what to set the nextSlot to.
+ if (overflow) {
+ const SlotId sidNext = reserveSlotForWriting(); // throws
+ assert(sidNext >= 0);
+ writeToDisk(sidNext);
+ }
+ }
+}
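For illustration only: a rough standalone model of the buffering rule that
tryWrite() implements above, counting how many db slots a payload occupies
when every slot loses a header-sized prefix and is flushed as soon as it
overflows; the 40-byte header and 16 KB slot are just example numbers.

// Illustrative sketch only, not part of the patch.
#include <cstddef>
#include <iostream>

static size_t slotsUsed(size_t size, size_t slotSize, size_t headerSize)
{
    const size_t room = slotSize - headerSize;   // payload bytes per slot
    size_t slots = 1;                            // the first slot is reserved up front
    while (size > room) {                        // overflow: reserve the next slot
        size -= room;
        ++slots;
    }
    return slots;
}

int main()
{
    std::cout << slotsUsed(100 * 1024, 16 * 1024, 40) << " slots\n"; // prints 7
    return 0;
}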
- doWrite();
+/// Buffers incoming data for the current slot.
+/// Returns the number of bytes buffered.
+size_t
+Rock::IoState::writeToBuffer(char const *buf, size_t size)
+{
+ // do not buffer a cell header for nothing
+ if (!size)
+ return 0;
- dbSlot = &nextDbSlot;
- diskOffset = dir.diskOffset(pageId);
- theBuf.init(min(left, slotSize), slotSize);
- write(buf + size - left, left, -1, NULL);
+ if (!theBuf.size) {
+ // will fill the header in writeToDisk when the next slot is known
+ theBuf.appended(sizeof(DbCellHeader));
}
- if (dtor)
- (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+ size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
+ theBuf.append(buf, forCurrentSlot);
+ offset_ += forCurrentSlot; // so that Core thinks we wrote it
+ return forCurrentSlot;
}
-// write what was buffered during write() calls
+/// write what was buffered during write() calls
+/// negative sidNext means this is the last write request for this entry
void
-Rock::IoState::doWrite(const bool isLast)
+Rock::IoState::writeToDisk(const SlotId sidNext)
{
assert(theFile != NULL);
- assert(!theBuf.isNull());
+ assert(theBuf.size >= sizeof(DbCellHeader));
+
+ if (sidNext < 0) { // we are writing the last slot
+ e->swap_file_sz = offset_;
+ writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
+ }
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
// to avoid triggering read-page;new_head+old_tail;write-page overheads
+ const uint64_t diskOffset = dir->diskOffset(sidCurrent);
debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
- theBuf.contentSize());
+ theBuf.size);
+
+ // finalize map slice
+ Ipc::StoreMap::Slice &slice =
+ dir->map->writeableSlice(swap_filen, sidCurrent);
+ slice.next = sidNext;
+ slice.size = theBuf.size - sizeof(DbCellHeader);
+
+ // finalize db cell header
+ DbCellHeader header;
+ memcpy(header.key, e->key, sizeof(header.key));
+ header.firstSlot = writeAnchor().start;
+ header.nextSlot = sidNext;
+ header.payloadSize = theBuf.size - sizeof(DbCellHeader);
+ header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
+ header.version = writeAnchor().basics.timestamp;
+
+ // copy finalized db cell header into buffer
+ memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
+
+ // and now allocate another buffer for the WriteRequest so that
+ // we can support concurrent WriteRequests (and to ease cleaning)
+ // TODO: should we limit the number of outstanding requests?
+ size_t wBufCap = 0;
+ void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
+ memcpy(wBuf, theBuf.mem, theBuf.size);
+
+ WriteRequest *const r = new WriteRequest(
+ ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
+ memFreeBufFunc(wBufCap)), this, sidNext < 0);
+ theBuf.clear();
- dbSlot->payloadSize = theBuf.contentSize() - sizeof(DbCellHeader);
- memcpy(theBuf.content(), dbSlot, sizeof(DbCellHeader));
+ sidCurrent = sidNext;
- assert(static_cast<size_t>(theBuf.contentSize()) <= slotSize);
// theFile->write may call writeCompleted immediately
- WriteRequest *const r = new WriteRequest(
- ::WriteRequest(theBuf.content(), diskOffset, theBuf.contentSize(),
- theBuf.freeFunc()), this, isLast);
theFile->write(r);
}
-//
+/// finds and returns a free db slot to fill or throws
+Rock::SlotId
+Rock::IoState::reserveSlotForWriting()
+{
+ Ipc::Mem::PageId pageId;
+ if (dir->useFreeSlot(pageId))
+ return pageId.number-1;
+
+ // This may happen when the number of available db slots is close to the
+ // number of concurrent requests reading or writing those slots, which may
+ // happen when the db is "small" compared to the request traffic OR when we
+ // are rebuilding and have not loaded "many" entries or empty slots yet.
+ throw TexcHere("ran out of free db slots");
+}
+
void
Rock::IoState::finishedWriting(const int errFlag)
{
// we incremented offset_ while accumulating data in write()
+ writeableAnchor_ = NULL;
callBack(errFlag);
}
void
Rock::IoState::close(int how)
{
- debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
- " how=" << how);
- if (how == wroteAll && !theBuf.isNull())
- doWrite(true);
- else
- callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
+ debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
+ " buf: " << theBuf.size << " callback: " << callback);
+
+ if (!theFile) {
+ debugs(79, 3, "I/O already canceled");
+ assert(!callback);
+ assert(!writeableAnchor_);
+ assert(!readableAnchor_);
+ return;
+ }
+
+ switch (how) {
+ case wroteAll:
+ assert(theBuf.size > 0); // we never flush last bytes on our own
+ writeToDisk(-1); // flush last, yet unwritten slot to disk
+ return; // writeCompleted() will callBack()
+
+ case writerGone:
+ assert(writeableAnchor_);
+ dir->writeError(swap_filen); // abort a partially stored entry
+ finishedWriting(DISK_ERROR);
+ return;
+
+ case readerDone:
+ callBack(0);
+ return;
+ }
}
/// close callback (STIOCB) dialer: breaks dependencies and
#ifndef SQUID_FS_ROCK_IO_STATE_H
#define SQUID_FS_ROCK_IO_STATE_H
-#include "MemBuf.h"
-#include "SwapDir.h"
+#include "fs/rock/RockSwapDir.h"
+#include "MemBlob.h"
class DiskFile;
public:
typedef RefCount<IoState> Pointer;
- IoState(SwapDir &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
+ IoState(Rock::SwapDir::Pointer &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
virtual ~IoState();
void file(const RefCount<DiskFile> &aFile);
// ::StoreIOState API
virtual void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data);
- virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func);
+ virtual bool write(char const *buf, size_t size, off_t offset, FREE * free_func);
virtual void close(int how);
- void finishedWriting(int errFlag);
+ /// whether we are still waiting for the I/O results (i.e., not closed)
+ bool stillWaiting() const { return theFile != NULL; }
- int64_t diskOffset; ///< the start of this cell inside the db file
- DbCellHeader *dbSlot; ///< current db slot, used for writing
+ /// called by SwapDir::writeCompleted() after the last write and on error
+ void finishedWriting(const int errFlag);
MEMPROXY_CLASS(IoState);
+ /* one and only one of these will be set and locked; access via *Anchor() */
+ const Ipc::StoreMapAnchor *readableAnchor_; ///< starting point for reading
+ Ipc::StoreMapAnchor *writeableAnchor_; ///< starting point for writing
+
+ SlotId sidCurrent; ///< ID of the db slot currently being read or written
+
private:
- void doWrite(const bool isLast = false);
+ const Ipc::StoreMapAnchor &readAnchor() const;
+ Ipc::StoreMapAnchor &writeAnchor();
+ const Ipc::StoreMapSlice &currentReadableSlice() const;
+
+ void tryWrite(char const *buf, size_t size, off_t offset);
+ size_t writeToBuffer(char const *buf, size_t size);
+ void writeToDisk(const SlotId nextSlot);
+ SlotId reserveSlotForWriting();
+
void callBack(int errflag);
- SwapDir &dir; ///< swap dir object
+ Rock::SwapDir::Pointer dir; ///< swap dir that initiated I/O
const size_t slotSize; ///< db cell size
int64_t objOffset; ///< object offset for current db slot
RefCount<DiskFile> theFile; // "file" responsible for this I/O
- MemBuf theBuf; // use for write content accumulation only
+ MemBlob theBuf; // use for write content accumulation only
};
MEMPROXY_CLASS_INLINE(IoState);
CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
+namespace Rock {
+
+/// maintains information about the store entry being loaded from disk
+/// used for identifying partially stored/loaded entries
+class LoadingEntry {
+public:
+ LoadingEntry(): size(0), version(0), state(leEmpty), anchored(0),
+ mapped(0), freed(0), more(-1) {}
+
+ /* store entry-level information indexed by sfileno */
+ uint64_t size; ///< payload seen so far
+ uint32_t version; ///< DbCellHeader::version to distinguish same-URL chains
+ uint32_t state:3; ///< current entry state (one of the State values)
+ uint32_t anchored:1; ///< whether we loaded the inode slot for this entry
+
+ /* db slot-level information indexed by slotId, starting with firstSlot */
+ uint32_t mapped:1; ///< whether this slot was added to a mapped entry
+ uint32_t freed:1; ///< whether this slot was marked as free
+ sfileno more:25; ///< another slot in some entry chain (unordered)
+ bool used() const { return freed || mapped || more != -1; }
+
+ /// possible entry states
+ typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State;
+};
+
+} /* namespace Rock */
+
+/**
+ Several layers of information are manipulated during the rebuild:
+
+ Store Entry: Response message plus all the metainformation associated with
+ it. Identified by store key. At any given time, from the Squid point
+ of view, there is only one entry with a given key, but several
+ different entries with the same key can be observed in any historical
+ archive (such as an access log or a store database).
+
+ Slot chain: A sequence of db slots representing a Store Entry state at
+ some point in time. Identified by key+version combination. Due to
+ transaction aborts, crashes, and idle periods, some chains may contain
+ incomplete or stale information. We assume that no two different chains
+ have the same key and version. If that assumption fails, we may serve a
+ hodgepodge entry during rebuild, until "extra" slots are loaded/noticed.
+
+ Db slot: A db record containing a piece of a single store entry and linked
+ to other slots with the same key and version fields, forming a chain.
+ Slots are identified by their absolute position in the database file,
+ which is naturally unique.
+
+
+ Except for the "mapped", "freed", and "more" fields, LoadingEntry info is
+ entry-level and is stored at fileno position. In other words, the array of
+ LoadingEntries should be interpreted as two arrays, one that maps slot ID
+ to the LoadingEntry::mapped/freed/more members, and the second one that maps
+ fileno to all other LoadingEntry members. StoreMap maps entry key to fileno.
+
+
+ When information from the newly loaded db slot contradicts the entry-level
+ information collected so far (e.g., the versions do not match or the total
+ chain size after the slot contribution exceeds the expected entry size), the
+ whole entry (and not just the chain or the slot!) is declared corrupted.
+
+ Why invalidate the whole entry? Rock Store is written for high-load
+ environments with large caches, where there are usually very few idle slots
+ in the database. The space occupied by a purged entry is usually immediately
+ reclaimed. A Squid crash or a transaction abort is rather unlikely to
+ leave a relatively large number of stale slots in the database. Thus, the
+ number of potentially corrupted entries is relatively small. On the other
+ hand, the damage from serving a single hodgepodge entry may be significant
+ to the user. In such an environment, invalidating the whole entry has
+ negligible performance impact but saves us from high-damage bugs.
+*/
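For illustration only: a standalone model of the size bookkeeping rule
described above, where an entry stays in a loading state until its
accumulated payload reaches the expected total and becomes corrupted if it
overflows that total (plain types, not Squid code).

// Illustrative sketch only, not part of the patch.
#include <cstdint>
#include <iostream>

enum class EntryState { Loading, Loaded, Corrupted };

// accounts for one more loaded slot of an entry with a known expected size
static EntryState noteSlot(uint64_t &loaded, uint64_t slotPayload, uint64_t expectedTotal)
{
    loaded += slotPayload;
    if (expectedTotal > 0 && loaded > expectedTotal)
        return EntryState::Corrupted;   // overflow: free the whole entry
    if (expectedTotal > 0 && loaded == expectedTotal)
        return EntryState::Loaded;      // all content seen: unlock the entry
    return EntryState::Loading;         // keep waiting for more slots
}

int main()
{
    uint64_t loaded = 0;
    const uint64_t total = 9234;
    std::cout << static_cast<int>(noteSlot(loaded, 4000, total)) << '\n'; // 0 (Loading)
    std::cout << static_cast<int>(noteSlot(loaded, 4000, total)) << '\n'; // 0 (Loading)
    std::cout << static_cast<int>(noteSlot(loaded, 1234, total)) << '\n'; // 1 (Loaded)
    return 0;
}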
+
+
Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
sd(dir),
+ entries(NULL),
dbSize(0),
dbEntrySize(0),
dbEntryLimit(0),
dbSlot(0),
fd(-1),
dbOffset(0),
- filen(0)
+ slotPos(0),
+ validationPos(0)
{
assert(sd);
memset(&counts, 0, sizeof(counts));
dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
dbEntrySize = sd->slotSize;
dbEntryLimit = sd->entryLimit();
- processed.reserve(dbEntryLimit);
- while (static_cast<int>(processed.size()) < dbEntryLimit)
- processed.push_back(false);
}
Rock::Rebuild::~Rebuild()
{
if (fd >= 0)
file_close(fd);
+ delete[] entries;
}
/// prepares and initiates entry loading sequence
if (fd < 0)
failure("cannot open db", errno);
- char buf[SwapDir::HeaderSize];
- if (read(fd, buf, sizeof(buf)) != SwapDir::HeaderSize)
+ char hdrBuf[SwapDir::HeaderSize];
+ if (read(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize)
failure("cannot read db header", errno);
+ // a slot prefix of SM_PAGE_SIZE should fit both the core entry header and ours
+ assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
+ buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
+
dbOffset = SwapDir::HeaderSize;
- filen = 0;
+ slotPos = 0;
+
+ entries = new LoadingEntry[dbEntryLimit];
checkpoint();
}
void
Rock::Rebuild::checkpoint()
{
- if (dbOffset < dbSize)
+ if (!done())
eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
- else
- if (!doneAll()) {
- eventAdd("Rock::Rebuild::Step2", Rock::Rebuild::Steps2, this, 0.01, 1,
- true);
- }
}
bool
Rock::Rebuild::doneAll() const
{
- return dbSlot >= dbSize && AsyncJob::doneAll();
+ return dbOffset >= dbSize && validationPos >= dbEntryLimit &&
+ AsyncJob::doneAll();
}
void
}
void
-Rock::Rebuild::Steps2(void *data)
+Rock::Rebuild::steps()
{
- // use async call to enable job call protection that time events lack
- CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps2);
+ if (dbOffset < dbSize)
+ loadingSteps();
+ else
+ validationSteps();
+
+ checkpoint();
}
void
-Rock::Rebuild::steps()
+Rock::Rebuild::loadingSteps()
{
- debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+ debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " <<
dbOffset << " <= " << dbSize);
// Balance our desire to maximize the number of entries processed at once
int loaded = 0;
while (loaded < dbEntryLimit && dbOffset < dbSize) {
- doOneEntry();
+ loadOneSlot();
dbOffset += dbEntrySize;
- ++filen;
+ ++slotPos;
++loaded;
if (counts.scancount % 1000 == 0)
break;
}
}
-
- checkpoint();
}
void
-Rock::Rebuild::steps2()
+Rock::Rebuild::loadOneSlot()
{
- debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
- dbSlot << " <= " << dbSize);
-
- // Balance our desire to maximize the number of slots processed at once
- // (and, hence, minimize overheads and total rebuild time) with a
- // requirement to also process Coordinator events, disk I/Os, etc.
- const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
- const timeval loopStart = current_time;
-
- int loaded = 0;
- while (dbSlot < dbSize) {
- doOneSlot();
- ++dbSlot;
- ++loaded;
-
- if (opt_foreground_rebuild)
- continue; // skip "few entries at a time" check below
-
- getCurrentTime();
- const double elapsedMsec = tvSubMsec(loopStart, current_time);
- if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
- debugs(47, 5, HERE << "pausing after " << loaded << " slots in " <<
- elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per slot");
- break;
- }
- }
-
- checkpoint();
-}
-
-void
-Rock::Rebuild::doOneEntry()
-{
- debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+ debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " <<
dbOffset << " <= " << dbSize);
++counts.scancount;
if (lseek(fd, dbOffset, SEEK_SET) < 0)
failure("cannot seek to db entry", errno);
- MemBuf buf;
- buf.init(sizeof(DbCellHeader), sizeof(DbCellHeader));
+ buf.reset();
if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
return;
- // get our header
- Ipc::Mem::PageId pageId;
- pageId.pool = sd->index;
- pageId.number = filen + 1;
- DbCellHeader &header = sd->dbSlot(pageId);
- assert(!header.sane());
+ const SlotId slotId = slotPos;
+ // get our header
+ DbCellHeader header;
if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
"Ignoring truncated cache entry meta data at " << dbOffset);
- invalidSlot(pageId);
+ freeSlotIfIdle(slotId, true);
return;
}
- memcpy(&header, buf.content(), sizeof(header));
+ memcpy(&header, buf.content(), sizeof(header));
+ if (header.empty()) {
+ freeSlotIfIdle(slotId, false);
+ return;
+ }
if (!header.sane()) {
debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
"Ignoring malformed cache entry meta data at " << dbOffset);
- invalidSlot(pageId);
+ freeSlotIfIdle(slotId, true);
return;
}
+ buf.consume(sizeof(header)); // optimize to avoid memmove()
+
+ useNewSlot(slotId, header);
+}
+
+/// parse StoreEntry basics and add them to the map, returning true on success
+bool
+Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
+{
+ cache_key key[SQUID_MD5_DIGEST_LENGTH];
+ StoreEntry loadedE;
+ if (!storeRebuildParseEntry(buf, loadedE, key, counts, 0))
+ return false;
+
+ const uint64_t knownSize = header.entrySize > 0 ?
+ header.entrySize : anchor.basics.swap_file_sz;
+ if (!loadedE.swap_file_sz && knownSize)
+ loadedE.swap_file_sz = knownSize;
+ // the entry size may still be unknown at this time
+
+ debugs(47, 8, "importing entry basics for " << fileno);
+ anchor.set(loadedE);
+
+ // we have not validated whether all db cells for this entry were loaded
+ EBIT_CLR(anchor.basics.flags, ENTRY_VALIDATED);
+
+ // loadedE->dump(5);
+
+ return true;
}
void
-Rock::Rebuild::doOneSlot()
+Rock::Rebuild::validationSteps()
{
- debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
- dbSlot << " <= " << dbSize);
+ debugs(47, 5, sd->index << " validating from " << validationPos);
- if (processed[dbSlot])
- return;
+ // see loadingSteps() for the rationale; TODO: avoid duplication
+ const int maxSpentMsec = 50; // keep small: validation does not do I/O
+ const timeval loopStart = current_time;
- Ipc::Mem::PageId pageId;
- pageId.pool = sd->index;
- pageId.number = dbSlot + 1;
- const DbCellHeader &dbSlot = sd->dbSlot(pageId);
- assert(dbSlot.sane());
+ int validated = 0;
+ while (validationPos < dbEntryLimit) {
+ validateOneEntry();
+ ++validationPos;
+ ++validated;
- pageId.number = dbSlot.firstSlot;
- //const DbCellHeader &firstChainSlot = sd->dbSlot(pageId);
+ if (validationPos % 1000 == 0)
+ debugs(20, 2, "validated: " << validationPos);
- /* Process all not yet loaded slots, verify entry chains, if chain
- is valid, load entry from first slot similar to small rock,
- call SwapDir::addEntry (needs to be restored). */
+ if (opt_foreground_rebuild)
+ continue; // skip "few entries at a time" check below
+
+ getCurrentTime();
+ const double elapsedMsec = tvSubMsec(loopStart, current_time);
+ if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
+ debugs(47, 5, "pausing after " << validated << " entries in " <<
+ elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry");
+ break;
+ }
+ }
+}
+
+void
+Rock::Rebuild::validateOneEntry()
+{
+ LoadingEntry &e = entries[validationPos];
+ switch (e.state) {
+
+ case LoadingEntry::leEmpty:
+ break; // no entry hashed to this position
+
+ case LoadingEntry::leLoading:
+ freeBadEntry(validationPos, "partially stored");
+ break;
+
+ case LoadingEntry::leLoaded:
+ break; // we have already unlocked this entry
+
+ case LoadingEntry::leCorrupted:
+ break; // we have already removed this entry
+ }
+}
+
+/// Marks remaining bad entry slots as free and unlocks the entry. The map
+/// cannot do this because leLoading entries may have holes in the slot chain.
+void
+Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
+{
+ debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
+ " entry " << fileno << " is ignored during rebuild");
+
+ Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
+
+ bool freedSome = false;
+ // free all loaded non-anchor slots
+ SlotId slotId = entries[anchor.start].more;
+ while (slotId >= 0) {
+ const SlotId next = entries[slotId].more;
+ freeSlot(slotId, false);
+ slotId = next;
+ freedSome = true;
+ }
+ // free anchor slot if it was loaded
+ if (entries[fileno].anchored) {
+ freeSlot(anchor.start, false);
+ freedSome = true;
+ }
+ assert(freedSome);
+
+ sd->map->forgetWritingEntry(fileno);
+ ++counts.invalid;
}
void
void
Rock::Rebuild::failure(const char *msg, int errNo)
{
- debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+ debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " <<
dbOffset << " <= " << dbSize);
if (errNo)
sd->index, sd->filePath, msg);
}
-void Rock::Rebuild::invalidSlot(Ipc::Mem::PageId &pageId)
+/// adds slot to the free slot index
+void
+Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
{
- ++counts.invalid;
- processed[pageId.number - 1] = true;
- sd->dbSlotIndex->push(pageId);
+ debugs(47,5, sd->index << " frees slot " << slotId);
+ LoadingEntry &le = entries[slotId];
+ assert(!le.freed);
+ le.freed = 1;
+
+ if (invalid) {
+ ++counts.invalid;
+ //sd->unlink(fileno); leave garbage on disk, it should not hurt
+ }
+
+ Ipc::Mem::PageId pageId;
+ pageId.pool = sd->index+1;
+ pageId.number = slotId+1;
+ sd->freeSlots->push(pageId);
+}
+
+/// adds slot to the free slot index but only if the slot is unused
+void
+Rock::Rebuild::freeSlotIfIdle(const SlotId slotId, const bool invalid)
+{
+ const LoadingEntry &le = entries[slotId];
+
+ // mapped slots must be freed via freeBadEntry() to keep the map in sync
+ assert(!le.mapped);
+
+ if (!le.used())
+ freeSlot(slotId, invalid);
+}
+
+/// adds slot to the entry chain in the map
+void
+Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
+{
+ LoadingEntry &le = entries[slotId];
+ assert(!le.mapped);
+ assert(!le.freed);
+ le.mapped = 1;
+
+ Ipc::StoreMapSlice slice;
+ slice.next = header.nextSlot;
+ slice.size = header.payloadSize;
+ sd->map->importSlice(slotId, slice);
+}
+
+/// adds slot to an existing entry chain; caller must check that the slot
+/// belongs to the chain it is being added to
+void
+Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
+{
+ LoadingEntry &le = entries[fileno];
+ Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
+
+ assert(le.version == header.version);
+
+ // mark anchor as loaded or add the secondary slot to the chain
+ LoadingEntry &inode = entries[header.firstSlot];
+ if (header.firstSlot == slotId) {
+ debugs(47,5, "adding inode");
+ assert(!inode.freed);
+ le.anchored = 1;
+ } else {
+ debugs(47,9, "linking " << slotId << " to " << inode.more);
+ // we do not need to preserve the order
+ LoadingEntry &slice = entries[slotId];
+ assert(!slice.freed);
+ assert(slice.more < 0);
+ slice.more = inode.more;
+ inode.more = slotId;
+ }
+
+ if (header.firstSlot == slotId && !importEntry(anchor, fileno, header)) {
+ le.state = LoadingEntry::leCorrupted;
+ freeBadEntry(fileno, "corrupted metainfo");
+ return;
+ }
+
+ // set total entry size and/or check it for consistency
+ uint64_t totalSize = header.entrySize;
+ assert(totalSize != static_cast<uint64_t>(-1));
+ if (!totalSize && anchor.basics.swap_file_sz) {
+ assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
+ // perhaps we loaded a later slot (with entrySize) earlier
+ totalSize = anchor.basics.swap_file_sz;
+ } else
+ if (totalSize && !anchor.basics.swap_file_sz) {
+ anchor.basics.swap_file_sz = totalSize;
+ assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
+ } else
+ if (totalSize != anchor.basics.swap_file_sz) {
+ le.state = LoadingEntry::leCorrupted;
+ freeBadEntry(fileno, "size mismatch");
+ return;
+ }
+
+ le.size += header.payloadSize;
+
+ if (totalSize > 0 && le.size > totalSize) { // overflow
+ le.state = LoadingEntry::leCorrupted;
+ freeBadEntry(fileno, "overflowing");
+ return;
+ }
+
+ mapSlot(slotId, header);
+ if (totalSize > 0 && le.size == totalSize) {
+ // entry fully loaded, unlock it
+ // we have validated that all db cells for this entry were loaded
+ EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
+ le.state = LoadingEntry::leLoaded;
+ sd->map->closeForWriting(fileno, false);
+ ++counts.objcount;
+ }
+}
+
+/// initialize housekeeping information for a newly accepted entry
+void
+Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno, const DbCellHeader &header)
+{
+ anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
+ assert(header.firstSlot >= 0);
+ anchor.start = header.firstSlot;
+
+ assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
+
+ LoadingEntry &le = entries[fileno];
+ le.state = LoadingEntry::leLoading;
+ le.version = header.version;
+ le.size = 0;
+}
+
+/// handle a slot from an entry that we have not seen before
+void
+Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
+{
+ // If some other from-disk entry is/was using this slot as its inode OR
+ // if some other from-disk entry is/was using our inode slot, then the
+ // entries are conflicting. We cannot identify other entries, so we just
+ // remove ours and hope that the others were/will be handled correctly.
+ const LoadingEntry &slice = entries[slotId];
+ const LoadingEntry &inode = entries[header.firstSlot];
+ if (slice.used() || inode.used()) {
+ debugs(47,8, "slice/inode used: " << slice.used() << inode.used());
+ LoadingEntry &le = entries[fileno];
+ le.state = LoadingEntry::leCorrupted;
+ freeSlotIfIdle(slotId, slotId == header.firstSlot);
+ // if not idle, the other entry will handle its slice
+ ++counts.clashcount;
+ return;
+ }
+
+ // A miss may have been stored at our fileno while we were loading other
+ // slots from disk. We ought to preserve that entry because it is fresher.
+ const bool overwriteExisting = false;
+ if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) {
+ primeNewEntry(*anchor, fileno, header);
+ addSlotToEntry(fileno, slotId, header); // may fail
+ assert(anchor->basics.swap_file_sz != static_cast<uint64_t>(-1));
+ } else {
+ // A new from-network entry is occupying our map slot; let it be, but
+ // save us from the trouble of going through the above motions again.
+ LoadingEntry &le = entries[fileno];
+ le.state = LoadingEntry::leIgnored;
+ freeSlotIfIdle(slotId, false);
+ }
+}
+
+/// does the header belong to the fileno entry being loaded?
+bool
+Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
+{
+ const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
+ const LoadingEntry &le = entries[fileno];
+ // any order will work, but do fast comparisons first:
+ return le.version == header.version &&
+ anchor.start == static_cast<Ipc::StoreMapSliceId>(header.firstSlot) &&
+ anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
+}
+
+/// is the new header consistent with information already loaded?
+bool
+Rock::Rebuild::canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const
+{
+ if (!sameEntry(fileno, header)) {
+ debugs(79, 7, "cannot add; wrong entry");
+ return false;
+ }
+
+ const LoadingEntry &le = entries[slotId];
+ // We cannot add a slot that was already declared free or mapped.
+ if (le.freed || le.mapped) {
+ debugs(79, 7, "cannot add; freed/mapped: " << le.freed << le.mapped);
+ return false;
+ }
+
+ if (slotId == header.firstSlot) {
+ // If we are the inode, the anchored flag cannot be set yet.
+ if (entries[fileno].anchored) {
+ debugs(79, 7, "cannot add; extra anchor");
+ return false;
+ }
+
+ // And there should have been some other slot for this entry to exist.
+ if (le.more < 0) {
+ debugs(79, 7, "cannot add; missing slots");
+ return false;
+ }
+
+ return true;
+ }
+
+ // We are the continuation slice so the more field is reserved for us.
+ if (le.more >= 0) {
+ debugs(79, 7, "cannot add; foreign slot");
+ return false;
+ }
+
+ return true;
+}
+
+/// handle freshly loaded (and validated) db slot header
+void
+Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
+{
+ LoadingEntry &slice = entries[slotId];
+ assert(!slice.freed); // we cannot free what was not loaded
+
+ const cache_key *const key =
+ reinterpret_cast<const cache_key*>(header.key);
+ const sfileno fileno = sd->map->anchorIndexByKey(key);
+ assert(0 <= fileno && fileno < dbEntryLimit);
+
+ LoadingEntry &le = entries[fileno];
+ debugs(47,9, "entry " << fileno << " state: " << le.state << ", inode: " <<
+ header.firstSlot << ", size: " << header.payloadSize);
+
+ switch (le.state) {
+
+ case LoadingEntry::leEmpty: {
+ startNewEntry(fileno, slotId, header);
+ break;
+ }
+
+ case LoadingEntry::leLoading: {
+ if (canAdd(fileno, slotId, header)) {
+ addSlotToEntry(fileno, slotId, header);
+ } else {
+ // either the loading chain or this slot is stale;
+ // be conservative and ignore both (and any future ones)
+ le.state = LoadingEntry::leCorrupted;
+ freeBadEntry(fileno, "duplicated");
+ freeSlotIfIdle(slotId, slotId == header.firstSlot);
+ ++counts.dupcount;
+ }
+ break;
+ }
+
+ case LoadingEntry::leLoaded: {
+ // either the previously loaded chain or this slot is stale;
+ // be conservative and ignore both (and any future ones)
+ le.state = LoadingEntry::leCorrupted;
+ sd->map->freeEntry(fileno); // may not be immediately successful
+ freeSlotIfIdle(slotId, slotId == header.firstSlot);
+ ++counts.dupcount;
+ break;
+ }
+
+ case LoadingEntry::leCorrupted: {
+ // previously seen slots messed things up so we must ignore this one
+ freeSlotIfIdle(slotId, false);
+ break;
+ }
+
+ case LoadingEntry::leIgnored: {
+ // already replaced by a fresher or colliding from-network entry
+ freeSlotIfIdle(slotId, false);
+ break;
+ }
+ }
}
#include "base/AsyncJob.h"
#include "cbdata.h"
+#include "fs/rock/RockForward.h"
+#include "MemBuf.h"
#include "store_rebuild.h"
-namespace Ipc
-{
-namespace Mem
-{
-class PageId;
-}
-}
-
namespace Rock
{
-class SwapDir;
+class LoadingEntry;
/// \ingroup Rock
/// manages store rebuild process: loading meta information from db on disk
private:
void checkpoint();
void steps();
- void steps2();
- void doOneEntry();
- void doOneSlot();
+ void loadingSteps();
+ void validationSteps();
+ void loadOneSlot();
+ void validateOneEntry();
+ bool importEntry(Ipc::StoreMapAnchor &anchor, const sfileno slotId, const DbCellHeader &header);
+ void freeBadEntry(const sfileno fileno, const char *eDescription);
+
void failure(const char *msg, int errNo = 0);
- void invalidSlot(Ipc::Mem::PageId &pageId);
+
+ void startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header);
+ void primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header);
+ void addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header);
+ void useNewSlot(const SlotId slotId, const DbCellHeader &header);
+
+ void mapSlot(const SlotId slotId, const DbCellHeader &header);
+ void freeSlotIfIdle(const SlotId slotId, const bool invalid);
+ void freeBusySlot(const SlotId slotId, const bool invalid);
+ void freeSlot(const SlotId slotId, const bool invalid);
+
+ bool canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const;
+ bool sameEntry(const sfileno fileno, const DbCellHeader &header) const;
+
SwapDir *sd;
+ LoadingEntry *entries; ///< store entries being loaded from disk
int64_t dbSize;
int dbEntrySize;
int fd; // store db file descriptor
int64_t dbOffset;
- int filen;
-
- // TODO: use std::bitmap?
- Vector<bool> processed; ///< true iff rebuilt is complete for a given slot
+ sfileno slotPos;
+ sfileno validationPos;
+ MemBuf buf;
StoreRebuildData counts;
static void Steps(void *data);
- static void Steps2(void *data);
CBDATA_CLASS2(Rebuild);
};
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
- slotSize(HeaderSize), filePath(NULL), io(NULL), map(NULL), dbSlots(NULL)
+ slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL), allSlots(NULL),
+ waitingForPage(NULL)
{
}
return NULL;
sfileno filen;
- const Ipc::StoreMapSlot *const slot = map->openForReading(key, filen);
+ const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
if (!slot)
return NULL;
- const Ipc::StoreMapSlot::Basics &basics = slot->basics;
+ const Ipc::StoreMapAnchor::Basics &basics = slot->basics;
// create a brand new store entry and initialize it with stored basics
StoreEntry *e = new StoreEntry();
uint64_t
Rock::SwapDir::currentSize() const
{
- const uint64_t spaceSize = !dbSlotIndex ?
- maxSize() : (slotSize * dbSlotIndex->size());
+ const uint64_t spaceSize = !freeSlots ?
+ maxSize() : (slotSize * freeSlots->size());
// everything that is not free is in use
return maxSize() - spaceSize;
}
Must(!map);
map = new DirMap(inodeMapPath());
+ map->cleaner = this;
const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
theFile->configure(fileConfig);
theFile->open(O_RDWR, 0644, this);
- dbSlotIndex = shm_old(Ipc::Mem::PageStack)(spaceIndexPath());
- dbSlots = new (reinterpret_cast<char *>(dbSlotIndex.getRaw()) +
- dbSlotIndex->stackSize()) DbCellHeader[entryLimitAllowed()];
+ freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());
// Increment early. Otherwise, if one SwapDir finishes rebuild before
// others start, storeRebuildComplete() will think the rebuild is over!
if (!map)
return false;
- // TODO: consider DB slots freed when older object would be replaced
- if (dbSlotIndex->size() <
- static_cast<unsigned int>(max(entriesNeeded(diskSpaceNeeded), 1)))
- return false;
-
// Do not start I/O transaction if there are less than 10% free pages left.
// TODO: reserve page instead
if (needsDiskStrand() &&
}
sfileno filen;
- Ipc::StoreMapSlot *const slot =
+ Ipc::StoreMapAnchor *const slot =
map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
if (!slot) {
debugs(47, 5, HERE << "map->add failed");
return NULL;
}
- Ipc::Mem::PageId pageId;
- if (!popDbSlot(pageId)) {
- debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << filePath <<
- "' run out of DB slots");
- map->free(filen);
- }
-
+ assert(filen >= 0);
slot->set(e);
// XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
// If that does not happen, the entry will not decrement the read level!
- IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
+ Rock::SwapDir::Pointer self(this);
+ IoState *sio = new IoState(self, &e, cbFile, cbIo, data);
sio->swap_dirn = index;
sio->swap_filen = filen;
- sio->diskOffset = diskOffset(pageId);
-
- DbCellHeader &firstDbSlot = dbSlot(pageId);
- memcpy(firstDbSlot.key, e.key, sizeof(firstDbSlot.key));
- firstDbSlot.firstSlot = pageId.number;
- firstDbSlot.nextSlot = 0;
- ++firstDbSlot.version;
- firstDbSlot.payloadSize = 0;
- sio->dbSlot = &firstDbSlot;
+ sio->writeableAnchor_ = slot;
debugs(47,5, HERE << "dir " << index << " created new filen " <<
std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
- sio->swap_filen << std::dec << " at " <<
+ sio->swap_filen << std::dec << " starting at " <<
diskOffset(sio->swap_filen));
sio->file(theFile);
}
bool
-Rock::SwapDir::popDbSlot(Ipc::Mem::PageId &pageId)
+Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
- return dbSlotIndex->pop(pageId);
-}
+ if (freeSlots->pop(pageId)) {
+ debugs(47, 5, "got a previously free slot: " << pageId);
+ return true;
+ }
-Rock::DbCellHeader &
-Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId)
-{
- const DbCellHeader &s = const_cast<const SwapDir *>(this)->dbSlot(pageId);
- return const_cast<DbCellHeader &>(s);
+ // catch free slots delivered to noteFreeMapSlice()
+ assert(!waitingForPage);
+ waitingForPage = &pageId;
+ if (map->purgeOne()) {
+ assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
+ assert(pageId.set());
+ debugs(47, 5, "got a previously busy slot: " << pageId);
+ return true;
+ }
+ assert(waitingForPage == &pageId);
+ waitingForPage = NULL;
+
+ debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
+ return false;
}
-const Rock::DbCellHeader &
-Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId) const
+bool
+Rock::SwapDir::validSlotId(const SlotId slotId) const
{
- assert(dbSlotIndex->pageIdIsValid(pageId));
- return dbSlots[pageId.number - 1];
+ return 0 <= slotId && slotId < entryLimitAllowed();
}
void
-Rock::SwapDir::cleanReadable(const sfileno fileno)
-{
- Ipc::Mem::PageId pageId = map->extras(fileno).pageId;
- Ipc::Mem::PageId nextPageId = pageId;
- while (pageId) {
- const DbCellHeader &curDbSlot = dbSlot(pageId);
- nextPageId.number = curDbSlot.nextSlot;
- const DbCellHeader &nextDbSlot = dbSlot(nextPageId);
- const bool sameChain = memcmp(curDbSlot.key, nextDbSlot.key,
- sizeof(curDbSlot.key)) == 0 &&
- curDbSlot.version == nextDbSlot.version;
- dbSlotIndex->push(pageId);
- if (sameChain)
- pageId = nextPageId;
+Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
+{
+ Ipc::Mem::PageId pageId;
+ pageId.pool = index+1;
+ pageId.number = sliceId+1;
+ if (waitingForPage) {
+ *waitingForPage = pageId;
+ waitingForPage = NULL;
+ } else {
+ freeSlots->push(pageId);
}
}
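For illustration only: a standalone model of the free-slot handoff implemented
by useFreeSlot() and noteFreeMapSlice() above; when the shared stack is empty,
the requester parks a pointer to its own variable so that the next freed slot
is delivered directly to it instead of being pushed back onto the stack
(plain types, not Squid code).

// Illustrative sketch only, not part of the patch.
#include <iostream>
#include <stack>

struct SlotSource {
    std::stack<int> freeSlots;          // models the shared PageStack
    int *waitingForSlot = nullptr;      // models SwapDir::waitingForPage

    bool useFreeSlot(int &slot) {
        if (!freeSlots.empty()) { slot = freeSlots.top(); freeSlots.pop(); return true; }
        waitingForSlot = &slot;         // park the request
        purgeOne();                     // may call noteFreeSlot() synchronously
        if (!waitingForSlot)
            return true;                // noteFreeSlot() delivered a slot directly
        waitingForSlot = nullptr;       // nothing was purged; give up
        return false;
    }

    void noteFreeSlot(const int slot) {
        if (waitingForSlot) { *waitingForSlot = slot; waitingForSlot = nullptr; }
        else freeSlots.push(slot);      // nobody is waiting; remember it for later
    }

    void purgeOne() { noteFreeSlot(42); } // pretend purging an entry freed slot 42
};

int main()
{
    SlotSource dir;
    int slot = -1;
    std::cout << dir.useFreeSlot(slot) << ' ' << slot << '\n'; // prints "1 42"
    return 0;
}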
-// tries to open an old or being-written-to entry with swap_filen for reading
+// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
// There are two ways an entry can get swap_filen: our get() locked it for
// reading or our storeSwapOutStart() locked it for writing. Peeking at our
// locked entry is safe, but no support for reading a filling entry.
- const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
+ const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
if (!slot)
return NULL; // we were writing after all
- IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
+ Rock::SwapDir::Pointer self(this);
+ IoState *sio = new IoState(self, &e, cbFile, cbIo, data);
sio->swap_dirn = index;
sio->swap_filen = e.swap_filen;
- sio->dbSlot = &dbSlot(map->extras(e.swap_filen).pageId);
-
- const Ipc::Mem::PageId &pageId = map->extras(e.swap_filen).pageId;
- sio->diskOffset = diskOffset(pageId);
- DbCellHeader &firstDbSlot = dbSlot(map->extras(e.swap_filen).pageId);
- assert(memcmp(firstDbSlot.key, e.key, sizeof(firstDbSlot.key)));
- assert(firstDbSlot.firstSlot == pageId.number);
+ sio->readableAnchor_ = slot;
+ sio->file(theFile);
debugs(47,5, HERE << "dir " << index << " has old filen: " <<
std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
sio->swap_filen);
+ assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
assert(slot->basics.swap_file_sz > 0);
assert(slot->basics.swap_file_sz == e.swap_file_sz);
- sio->file(theFile);
return sio;
}
assert(request->sio != NULL);
IoState &sio = *request->sio;
+ // quit if somebody called IoState::close() while we were waiting
+ if (!sio.stillWaiting()) {
+ debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
+ return;
+ }
+
if (errflag == DISK_OK) {
- // close, assuming we only write once; the entry gets the read lock
- map->closeForWriting(sio.swap_filen, true);
// do not increment sio.offset_ because we do it in sio->write()
- if (request->isLast)
+ if (request->isLast) {
+ // close, the entry gets the read lock
+ map->closeForWriting(sio.swap_filen, true);
sio.finishedWriting(errflag);
- } else
+ }
+ } else {
writeError(sio.swap_filen);
+ sio.finishedWriting(errflag);
+ // and hope that Core will call disconnect() to close the map entry
+ }
}
void
{
// Do not abortWriting here. The entry should keep the write lock
// instead of losing association with the store and confusing core.
- map->free(fileno); // will mark as unusable, just in case
- // XXX: should we call IoState callback?
+ map->freeEntry(fileno); // will mark as unusable, just in case
+ // All callers must also call IoState callback, to propagate the error.
}
bool
Rock::SwapDir::full() const
{
- return map && map->full();
+ return freeSlots != NULL && !freeSlots->size();
}
// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
void
Rock::SwapDir::maintain()
{
- debugs(47,3, HERE << "cache_dir[" << index << "] guards: " <<
- !repl << !map << !full() << StoreController::store_dirs_rebuilding);
-
- if (!repl)
- return; // no means (cannot find a victim)
-
- if (!map)
- return; // no victims (yet)
-
- if (!full())
- return; // no need (to find a victim)
-
- // XXX: UFSSwapDir::maintain says we must quit during rebuild
- if (StoreController::store_dirs_rebuilding)
- return;
-
- debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() <<
- ' ' << currentSize() << " < " << diskOffsetLimit());
-
- // Hopefully, we find a removable entry much sooner (TODO: use time?)
- const int maxProbed = 10000;
- RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed);
-
- // It really should not take that long, but this will stop "infinite" loops
- const int maxFreed = 1000;
- int freed = 0;
- // TODO: should we purge more than needed to minimize overheads?
- for (; freed < maxFreed && full(); ++freed) {
- if (StoreEntry *e = walker->Next(walker))
- e->release(); // will call our unlink() method
- else
- break; // no more objects
- }
-
- debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed <<
- " scanned " << walker->scanned << '/' << walker->locked);
-
- walker->Done(walker);
-
- if (full()) {
- debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir[" << index << "] " <<
- "is still full after freeing " << freed << " entries. A bug?");
- }
+ // The Store calls this to free some db space, but there is nothing wrong
+ // with a full() db, except when the db has to shrink after reconfigure, and
+ // we do not support shrinking yet (it would have to purge specific slots).
+ // TODO: Disable maintain() requests when they are pointless.
}
void
{
debugs(47, 5, HERE << e);
ignoreReferences(e);
- map->free(e.swap_filen);
+ map->freeEntry(e.swap_filen);
disconnect(e);
}
storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
entryCount, (100.0 * entryCount / limit));
+ const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
+ if (slotsFree <= static_cast<const unsigned int>(limit)) {
+ const int usedSlots = limit - static_cast<const int>(slotsFree);
+ storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
+ usedSlots, (100.0 * usedSlots / limit));
+ }
if (limit < 100) { // XXX: otherwise too expensive to count
Ipc::ReadWriteLockStats stats;
map->updateStats(stats);
}
const char *
-Rock::SwapDir::spaceIndexPath() const {
+Rock::SwapDir::freeSlotsPath() const {
static String spacesPath;
spacesPath = path;
spacesPath.append("_spaces");
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
- Must(mapOwners.empty() && dbSlotsOwners.empty());
+ Must(mapOwners.empty() && freeSlotsOwners.empty());
for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
const int64_t capacity = sd->entryLimitAllowed();
mapOwners.push_back(mapOwner);
// XXX: remove pool id and counters from PageStack
- Ipc::Mem::Owner<Ipc::Mem::PageStack> *const dbSlotsOwner =
- shm_new(Ipc::Mem::PageStack)(sd->spaceIndexPath(),
- i, capacity,
+ Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
+ shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
+ i+1, capacity,
sizeof(DbCellHeader));
- dbSlotsOwners.push_back(dbSlotsOwner);
+ freeSlotsOwners.push_back(freeSlotsOwner);
// XXX: add method to initialize PageStack with no free pages
while (true) {
Ipc::Mem::PageId pageId;
- if (!dbSlotsOwner->object()->pop(pageId))
+ if (!freeSlotsOwner->object()->pop(pageId))
break;
}
}
{
for (size_t i = 0; i < mapOwners.size(); ++i) {
delete mapOwners[i];
- delete dbSlotsOwners[i];
+ delete freeSlotsOwners[i];
}
}
#include "DiskIO/DiskFile.h"
#include "DiskIO/IORequestor.h"
#include "fs/rock/RockDbCell.h"
+#include "fs/rock/RockForward.h"
#include "ipc/StoreMap.h"
#include "ipc/mem/Page.h"
#include "ipc/mem/PageStack.h"
namespace Rock
{
-class Rebuild;
-
/// \ingroup Rock
class SwapDir: public ::SwapDir, public IORequestor, public Ipc::StoreMapCleaner
{
public:
+ typedef RefCount<SwapDir> Pointer;
+ typedef Ipc::StoreMap DirMap;
+
SwapDir();
virtual ~SwapDir();
// temporary path to the shared memory map of first slots of cached entries
const char *inodeMapPath() const;
// temporary path to the shared memory stack of free slots
- const char *spaceIndexPath() const;
+ const char *freeSlotsPath() const;
int64_t entryLimitHigh() const { return SwapFilenMax; } ///< Core limit
int64_t entryLimitAllowed() const;
- bool popDbSlot(Ipc::Mem::PageId &pageId);
- DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId);
- const DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId) const;
+ /// removes a slot from a list of free slots or returns false
+ bool useFreeSlot(Ipc::Mem::PageId &pageId);
+ /// whether the given slot ID may point to a slot in this db
+ bool validSlotId(const SlotId slotId) const;
+ /// purges one or more entries to make full() false and free some slots
+ void purgeSome();
int64_t diskOffset(Ipc::Mem::PageId &pageId) const;
+ int64_t diskOffset(int filen) const;
void writeError(const sfileno fileno);
- virtual void cleanReadable(const sfileno fileno);
-
- // TODO: merge with MemStoreMapExtras?
- struct MapExtras {
- Ipc::Mem::PageId pageId;
- };
- typedef Ipc::StoreMapWithExtras<MapExtras> DirMap;
+ /* StoreMapCleaner API */
+ virtual void noteFreeMapSlice(const sfileno fileno);
uint64_t slotSize; ///< all db slots are of this size
void trackReferences(StoreEntry &e); ///< add to replacement policy scope
void ignoreReferences(StoreEntry &e); ///< delete from repl policy scope
- int64_t diskOffset(int filen) const;
int64_t diskOffsetLimit() const;
int entryLimit() const { return map->entryLimit(); }
int entryMaxPayloadSize() const;
int entriesNeeded(const int64_t objSize) const;
friend class Rebuild;
+ friend class IoState;
const char *filePath; ///< location of cache storage file inside path/
+ DirMap *map; ///< entry key/sfileno to anchor/inode mapping
private:
void createError(const char *const msg);
DiskIOStrategy *io;
RefCount<DiskFile> theFile; ///< cache storage for this cache_dir
- DirMap *map;
- DbCellHeader *dbSlots;
- Ipc::Mem::Pointer<Ipc::Mem::PageStack> dbSlotIndex;
+ DbCellHeader *allSlots; ///< SlotId to DbCellHeader mapping
+ Ipc::Mem::Pointer<Ipc::Mem::PageStack> freeSlots; ///< free slots
+ Ipc::Mem::PageId *waitingForPage; ///< one-page cache for a "hot" free slot
/* configurable options */
DiskFile::Config fileConfig; ///< file-level configuration options
private:
Vector<SwapDir::DirMap::Owner *> mapOwners;
- Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> dbSlotsOwners;
+ Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> freeSlotsOwners;
};
} // namespace Rock
StartListening.h \
StoreMap.cc \
StoreMap.h \
+ StoreMapSlice.cc \
+ StoreMapSlice.h \
StrandCoord.cc \
StrandCoord.h \
StrandCoords.h \
shared->limit);
}
-Ipc::StoreMap::Slot *
+int
+Ipc::StoreMap::compareVersions(const sfileno fileno, time_t newVersion) const
+{
+ assert(valid(fileno));
+ Anchor &inode = shared->slots[fileno].anchor;
+
+ // note: we do not lock, so comparison may be inaccurate
+
+ if (inode.state == Anchor::Empty)
+ return +2;
+
+ if (const time_t diff = newVersion - inode.basics.timestamp)
+ return diff < 0 ? -1 : +1;
+
+ return 0;
+}
+
+void
+Ipc::StoreMap::forgetWritingEntry(sfileno fileno)
+{
+ assert(valid(fileno));
+ Anchor &inode = shared->slots[fileno].anchor;
+
+ assert(inode.state == Anchor::Writeable);
+
+ // we do not iterate slices because we were told to forget about
+ // them; the caller is responsible for freeing them (most likely
+ // our slice list is incomplete or has holes)
+
+ inode.waitingToBeFreed = false;
+ inode.state = Anchor::Empty;
+
+ inode.lock.unlockExclusive();
+ --shared->count;
+
+ debugs(54, 8, "closed entry " << fileno << " for writing " << path);
+}
+
+Ipc::StoreMap::Anchor *
Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno)
{
- debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+ debugs(54, 5, "opening entry with key " << storeKeyText(key)
<< " for writing in map [" << path << ']');
- const int idx = slotIndexByKey(key);
+ const int idx = anchorIndexByKey(key);
- Slot &s = shared->slots[idx];
+ if (Anchor *anchor = openForWritingAt(idx)) {
+ fileno = idx;
+ return anchor;
+ }
+
+ return NULL;
+}
+
+Ipc::StoreMap::Anchor *
+Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting)
+{
+ Anchor &s = shared->slots[fileno].anchor;
ReadWriteLock &lock = s.lock;
if (lock.lockExclusive()) {
- assert(s.state != Slot::Writeable); // until we start breaking locks
+ assert(s.state != Anchor::Writeable); // until we start breaking locks
+
+ // bail if we cannot empty this position
+ if (!s.waitingToBeFreed && s.state == Anchor::Readable && !overwriteExisting) {
+ lock.unlockExclusive();
+ debugs(54, 5, "cannot open existing entry at " << fileno <<
+ " for writing in map [" << path << ']');
+ return NULL;
+ }
// free if the entry was used, keeping the entry locked
- if (s.waitingToBeFreed || s.state == Slot::Readable)
- freeLocked(s, true);
+ if (s.waitingToBeFreed || s.state == Anchor::Readable)
+ freeChain(fileno, s, true);
- assert(s.state == Slot::Empty);
+ assert(s.state == Anchor::Empty);
++shared->count;
- s.state = Slot::Writeable;
- fileno = idx;
+ s.state = Anchor::Writeable;
+
//s.setKey(key); // XXX: the caller should do that
- debugs(54, 5, HERE << " opened slot at " << idx <<
- " for writing in map [" << path << ']');
+ debugs(54, 5, "opened entry " << fileno << " for writing " << path);
return &s; // and keep the entry locked
- }
+ }
- debugs(54, 5, HERE << " failed to open slot at " << idx <<
+ debugs(54, 5, "cannot open busy entry at " << fileno <<
" for writing in map [" << path << ']');
return NULL;
}
void
Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading)
{
- debugs(54, 5, HERE << " closing slot at " << fileno << " for writing and "
- "openning for reading in map [" << path << ']');
assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- assert(s.state == Slot::Writeable);
- s.state = Slot::Readable;
- if (lockForReading)
+ Anchor &s = shared->slots[fileno].anchor;
+ assert(s.state == Anchor::Writeable);
+ s.state = Anchor::Readable;
+ if (lockForReading) {
s.lock.switchExclusiveToShared();
- else
+ debugs(54, 5, "switched entry at " << fileno <<
+ " from writing to reading in map [" << path << ']');
+ } else {
s.lock.unlockExclusive();
+ debugs(54, 5, "closed entry " << fileno << " for writing " << path);
+ }
+}
+
+Ipc::StoreMap::Slice &
+Ipc::StoreMap::writeableSlice(const AnchorId anchorId, const SliceId sliceId)
+{
+ assert(valid(anchorId));
+ assert(shared->slots[anchorId].anchor.state == Anchor::Writeable);
+ assert(valid(sliceId));
+ return shared->slots[sliceId].slice;
+}
+
+const Ipc::StoreMap::Slice &
+Ipc::StoreMap::readableSlice(const AnchorId anchorId, const SliceId sliceId) const
+{
+ assert(valid(anchorId));
+ assert(shared->slots[anchorId].anchor.state == Anchor::Readable);
+ assert(valid(sliceId));
+ return shared->slots[sliceId].slice;
+}
+
+Ipc::StoreMap::Anchor &
+Ipc::StoreMap::writeableEntry(const AnchorId anchorId)
+{
+ assert(valid(anchorId));
+ assert(shared->slots[anchorId].anchor.state == Anchor::Writeable);
+ return shared->slots[anchorId].anchor;
}
/// terminate writing the entry, freeing its slot for others to use
void
Ipc::StoreMap::abortWriting(const sfileno fileno)
{
- debugs(54, 5, HERE << " abort writing slot at " << fileno <<
- " in map [" << path << ']');
+ debugs(54, 5, "abort entry at " << fileno <<
+ " for writing in map [" << path << ']');
assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- assert(s.state == Slot::Writeable);
- freeLocked(s, false);
+ Anchor &s = shared->slots[fileno].anchor;
+ assert(s.state == Anchor::Writeable);
+ freeChain(fileno, s, false);
+ debugs(54, 5, "closed entry " << fileno << " for writing " << path);
}
void
Ipc::StoreMap::abortIo(const sfileno fileno)
{
- debugs(54, 5, HERE << " abort I/O for slot at " << fileno <<
- " in map [" << path << ']');
+ debugs(54, 5, "abort entry at " << fileno <<
+ " for I/O in map [" << path << ']');
assert(valid(fileno));
- Slot &s = shared->slots[fileno];
+ Anchor &s = shared->slots[fileno].anchor;
// The caller is a lock holder. Thus, if we are Writeable, then the
// caller must be the writer; otherwise the caller must be the reader.
- if (s.state == Slot::Writeable)
+ if (s.state == Anchor::Writeable)
abortWriting(fileno);
else
closeForReading(fileno);
}
-const Ipc::StoreMap::Slot *
+const Ipc::StoreMap::Anchor *
Ipc::StoreMap::peekAtReader(const sfileno fileno) const
{
assert(valid(fileno));
- const Slot &s = shared->slots[fileno];
+ const Anchor &s = shared->slots[fileno].anchor;
switch (s.state) {
- case Slot::Readable:
+ case Anchor::Readable:
return &s; // immediate access by lock holder so no locking
- case Slot::Writeable:
+ case Anchor::Writeable:
return NULL; // cannot read the slot when it is being written
- case Slot::Empty:
+ case Anchor::Empty:
assert(false); // must be locked for reading or writing
}
assert(false); // not reachable
}
void
-Ipc::StoreMap::free(const sfileno fileno)
+Ipc::StoreMap::freeEntry(const sfileno fileno)
{
- debugs(54, 5, HERE << " marking slot at " << fileno << " to be freed in"
+ debugs(54, 5, HERE << " marking entry at " << fileno << " to be freed in"
" map [" << path << ']');
assert(valid(fileno));
- Slot &s = shared->slots[fileno];
+ Anchor &s = shared->slots[fileno].anchor;
if (s.lock.lockExclusive())
- freeLocked(s, false);
+ freeChain(fileno, s, false);
else
s.waitingToBeFreed = true; // mark to free it later
}
-const Ipc::StoreMap::Slot *
+/// unconditionally frees an already locked chain of slots, unlocking if needed
+void
+Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLocked)
+{
+ debugs(54, 7, "freeing " << inode.state << " entry " << fileno <<
+ " in map [" << path << ']');
+ if (inode.state == Anchor::Readable && cleaner) {
+ sfileno sliceId = inode.start;
+ debugs(54, 7, "first slice " << sliceId);
+ while (sliceId >= 0) {
+ const sfileno nextId = shared->slots[sliceId].slice.next;
+ cleaner->noteFreeMapSlice(sliceId); // might change slice state
+ sliceId = nextId;
+ }
+ }
+
+ inode.waitingToBeFreed = false;
+ inode.state = Anchor::Empty;
+
+ if (!keepLocked)
+ inode.lock.unlockExclusive();
+ --shared->count;
+ debugs(54, 5, "freed entry " << fileno <<
+ " in map [" << path << ']');
+}
+
+const Ipc::StoreMap::Anchor *
Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno)
{
- debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+ debugs(54, 5, "opening entry with key " << storeKeyText(key)
<< " for reading in map [" << path << ']');
- const int idx = slotIndexByKey(key);
- if (const Slot *slot = openForReadingAt(idx)) {
+ const int idx = anchorIndexByKey(key);
+ if (const Anchor *slot = openForReadingAt(idx)) {
if (slot->sameKey(key)) {
fileno = idx;
- debugs(54, 5, HERE << " opened slot at " << fileno << " for key "
- << storeKeyText(key) << " for reading in map [" << path <<
- ']');
return slot; // locked for reading
}
slot->lock.unlockShared();
+ debugs(54, 7, "closed entry " << idx << " for reading " << path);
}
- debugs(54, 5, HERE << " failed to open slot for key " << storeKeyText(key)
- << " for reading in map [" << path << ']');
return NULL;
}
-const Ipc::StoreMap::Slot *
+const Ipc::StoreMap::Anchor *
Ipc::StoreMap::openForReadingAt(const sfileno fileno)
{
- debugs(54, 5, HERE << " trying to open slot at " << fileno << " for "
- "reading in map [" << path << ']');
+ debugs(54, 5, "opening entry at " << fileno <<
+ " for reading in map [" << path << ']');
assert(valid(fileno));
- Slot &s = shared->slots[fileno];
+ Anchor &s = shared->slots[fileno].anchor;
if (!s.lock.lockShared()) {
- debugs(54, 5, HERE << " failed to lock slot at " << fileno << " for "
- "reading in map [" << path << ']');
+ debugs(54, 5, "cannot open busy entry at " << fileno <<
+ "for reading in map [" << path << ']');
return NULL;
}
- if (s.state == Slot::Empty) {
+ if (s.state == Anchor::Empty) {
s.lock.unlockShared();
- debugs(54, 7, HERE << " empty slot at " << fileno << " for "
- "reading in map [" << path << ']');
+ debugs(54, 7, "cannot open empty entry at " << fileno <<
+ " for reading in map [" << path << ']');
return NULL;
}
if (s.waitingToBeFreed) {
s.lock.unlockShared();
- debugs(54, 7, HERE << " dirty slot at " << fileno << " for "
- "reading in map [" << path << ']');
+ debugs(54, 7, "cannot open marked entry at " << fileno <<
+ " for reading in map [" << path << ']');
return NULL;
}
// cannot be Writing here if we got shared lock and checked Empty above
- assert(s.state == Slot::Readable);
- debugs(54, 5, HERE << " opened slot at " << fileno << " for reading in"
- " map [" << path << ']');
+ assert(s.state == Anchor::Readable);
+ debugs(54, 5, "opened entry " << fileno << " for reading " << path);
return &s;
}
void
Ipc::StoreMap::closeForReading(const sfileno fileno)
{
- debugs(54, 5, HERE << " closing slot at " << fileno << " for reading in "
- "map [" << path << ']');
assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- assert(s.state == Slot::Readable);
+ Anchor &s = shared->slots[fileno].anchor;
+ assert(s.state == Anchor::Readable);
s.lock.unlockShared();
+ debugs(54, 5, "closed entry " << fileno << " for reading " << path);
+}
+
+bool
+Ipc::StoreMap::purgeOne()
+{
+ // Hopefully, we find a removable entry much sooner (TODO: use time?).
+ // The min() ensures the loop body (and its modulo by shared->limit) never
+ // runs when entryLimit() is zero, so we cannot divide by zero.
+ const int searchLimit = min(10000, entryLimit());
+ int tries = 0;
+ for (; tries < searchLimit; ++tries) {
+ const sfileno fileno = shared->victim++ % shared->limit;
+ assert(valid(fileno));
+ Anchor &s = shared->slots[fileno].anchor;
+ if (s.lock.lockExclusive()) {
+ if (s.state == Anchor::Readable) { // skip empties
+ // this entry may be marked for deletion, and that is OK
+ freeChain(fileno, s, false);
+ debugs(54, 5, "purged entry at " << fileno);
+ return true;
+ }
+ s.lock.unlockExclusive();
+ }
+ }
+ debugs(54, 5, "found no entries to purge; tried: " << tries);
+ return false;
+}
+
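
A minimal sketch of how a caller such as SwapDir::purgeSome() might use purgeOne(); needMoreSlots() and map are illustrative, and the real eviction policy is not shown in this excerpt:

    // illustrative only: evict round-robin victims until the caller has enough
    // room again or the map runs out of purgeable (readable, unlocked) entries
    while (needMoreSlots() && map->purgeOne()) {
        // each successful purgeOne() frees one entry chain; the registered
        // StoreMapCleaner receives a noteFreeMapSlice() call per freed slice
    }
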
+void
+Ipc::StoreMap::importSlice(const SliceId sliceId, const Slice &slice)
+{
+ // Slices are imported into positions that should not be available via
+ // "get free slice" API. This is not something we can double check
+ // reliably because the anchor for the imported slice may not have been
+ // imported yet.
+ assert(valid(sliceId));
+ shared->slots[sliceId].slice = slice;
}
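
To illustrate the intended use, a sketch of translating an on-disk Rock slot header into a map slice during rebuild; dbSlotHeader, slotId, and map are hypothetical local names:

    // illustrative only: copy the chain link and payload size from a loaded
    // DbCellHeader into the shared map before the entry anchor is imported
    Ipc::StoreMap::Slice slice;
    slice.size = dbSlotHeader.payloadSize; // slot contents size
    slice.next = dbSlotHeader.nextSlot;    // next slot in the entry chain
    map->importSlice(slotId, slice);
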
int
Ipc::StoreMap::updateStats(ReadWriteLockStats &stats) const
{
for (int i = 0; i < shared->limit; ++i)
- shared->slots[i].lock.updateStats(stats);
+ shared->slots[i].anchor.lock.updateStats(stats);
}
bool
return 0 <= pos && pos < entryLimit();
}
-int
-Ipc::StoreMap::slotIndexByKey(const cache_key *const key) const
+sfileno
+Ipc::StoreMap::anchorIndexByKey(const cache_key *const key) const
{
const uint64_t *const k = reinterpret_cast<const uint64_t *>(key);
// TODO: use a better hash function
return (k[0] + k[1]) % shared->limit;
}
-Ipc::StoreMap::Slot &
-Ipc::StoreMap::slotByKey(const cache_key *const key)
+Ipc::StoreMap::Anchor &
+Ipc::StoreMap::anchorByKey(const cache_key *const key)
{
- return shared->slots[slotIndexByKey(key)];
+ return shared->slots[anchorIndexByKey(key)].anchor;
}
-/// unconditionally frees the already exclusively locked slot and releases lock
-void
-Ipc::StoreMap::freeLocked(Slot &s, bool keepLocked)
-{
- if (s.state == Slot::Readable && cleaner)
- cleaner->cleanReadable(&s - shared->slots.raw());
-
- s.waitingToBeFreed = false;
- s.state = Slot::Empty;
- if (!keepLocked)
- s.lock.unlockExclusive();
- --shared->count;
- debugs(54, 5, HERE << " freed slot at " << (&s - shared->slots.raw()) <<
- " in map [" << path << ']');
-}
-/* Ipc::StoreMapSlot */
+/* Ipc::StoreMapAnchor */
-Ipc::StoreMapSlot::StoreMapSlot(): state(Empty)
+Ipc::StoreMapAnchor::StoreMapAnchor(): start(0), state(Empty)
{
memset(&key, 0, sizeof(key));
memset(&basics, 0, sizeof(basics));
+ // keep in sync with rewind()
}
void
-Ipc::StoreMapSlot::setKey(const cache_key *const aKey)
+Ipc::StoreMapAnchor::setKey(const cache_key *const aKey)
{
memcpy(key, aKey, sizeof(key));
}
bool
-Ipc::StoreMapSlot::sameKey(const cache_key *const aKey) const
+Ipc::StoreMapAnchor::sameKey(const cache_key *const aKey) const
{
const uint64_t *const k = reinterpret_cast<const uint64_t *>(aKey);
return k[0] == key[0] && k[1] == key[1];
}
void
-Ipc::StoreMapSlot::set(const StoreEntry &from)
+Ipc::StoreMapAnchor::set(const StoreEntry &from)
{
+ assert(state == Writeable);
memcpy(key, from.key, sizeof(key));
- // XXX: header = aHeader;
basics.timestamp = from.timestamp;
basics.lastref = from.lastref;
basics.expires = from.expires;
basics.flags = from.flags;
}
+void
+Ipc::StoreMapAnchor::rewind()
+{
+ assert(state == Writeable);
+ start = 0;
+ memset(&key, 0, sizeof(key));
+ memset(&basics, 0, sizeof(basics));
+ // but keep the lock
+}
+
/* Ipc::StoreMap::Shared */
Ipc::StoreMap::Shared::Shared(const int aLimit, const size_t anExtrasSize):
- limit(aLimit), extrasSize(anExtrasSize), count(0), slots(aLimit)
+ limit(aLimit), extrasSize(anExtrasSize), count(0), victim(0),
+ slots(aLimit)
{
}
size_t
Ipc::StoreMap::Shared::SharedMemorySize(const int limit, const size_t extrasSize)
{
- return sizeof(Shared) + limit * (sizeof(Slot) + extrasSize);
+ return sizeof(Shared) + limit * (sizeof(StoreMapSlot) + extrasSize);
}
#include "ipc/ReadWriteLock.h"
#include "ipc/mem/FlexibleArray.h"
#include "ipc/mem/Pointer.h"
-#include "typedefs.h"
+#include "ipc/StoreMapSlice.h"
namespace Ipc
{
-/// a StoreMap element, holding basic shareable StoreEntry info
-class StoreMapSlot
+/// Maintains shareable information about a StoreEntry as a whole.
+/// An anchor points to one or more StoreEntry slices. This is the
+/// only lockable part of shared StoreEntry information, providing
+/// protection for all StoreEntry slices.
+class StoreMapAnchor
{
public:
- StoreMapSlot();
+ StoreMapAnchor();
- /// store StoreEntry key and basics
+ /// store StoreEntry key and basics for an inode slot
void set(const StoreEntry &anEntry);
void setKey(const cache_key *const aKey);
bool sameKey(const cache_key *const aKey) const;
+ /// undo the effects of set(), setKey(), etc., but keep locks and state
+ void rewind();
+
public:
mutable ReadWriteLock lock; ///< protects slot data below
Atomic::WordT<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock
uint16_t flags;
} basics;
+ StoreMapSliceId start; ///< where the chain of StoreEntry slices begins
+
/// possible persistent states
typedef enum {
Empty, ///< ready for writing, with nothing of value
State state; ///< current state
};
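
As a reading aid, here is a sketch of the anchor state transitions that the StoreMap code in this patch performs (derived from the functions shown above, not an authoritative diagram):

    // Empty     --openForWriting()/openForWritingAt()-->   Writeable (exclusive lock)
    // Writeable --closeForWriting()-->                     Readable  (lock released or
    //                                                                 switched to shared)
    // Writeable --abortWriting()/forgetWritingEntry()-->   Empty
    // Readable  --freeEntry()/purgeOne() via freeChain()--> Empty
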
+/// XXX: a hack to allocate one shared array for both anchors and slices
+class StoreMapSlot {
+public:
+ StoreMapAnchor anchor;
+ StoreMapSlice slice;
+};
+
class StoreMapCleaner;
-/// map of StoreMapSlots indexed by their keys, with read/write slot locking
+/// map of StoreMapSlots indexed by their keys, with read/write entry (anchor) locking
/// kids extend to store custom data
class StoreMap
{
public:
- typedef StoreMapSlot Slot;
+ typedef StoreMapAnchor Anchor;
+ typedef sfileno AnchorId;
+ typedef StoreMapSlice Slice;
+ typedef StoreMapSliceId SliceId;
/// data shared across maps in different processes
class Shared
size_t sharedMemorySize() const;
static size_t SharedMemorySize(const int limit, const size_t anExtrasSize);
- const int limit; ///< maximum number of map slots
- const size_t extrasSize; ///< size of slot extra data
- Atomic::Word count; ///< current number of map slots
- Ipc::Mem::FlexibleArray<Slot> slots; ///< slots storage
+ const int limit; ///< maximum number of store entries
+ const size_t extrasSize; ///< size of slice extra data
+ Atomic::Word count; ///< current number of entries
+ Atomic::WordT<sfileno> victim; ///< starting point for purge search
+ Ipc::Mem::FlexibleArray<StoreMapSlot> slots; ///< storage
};
public:
StoreMap(const char *const aPath);
- /// finds, reservers space for writing a new entry or returns nil
- Slot *openForWriting(const cache_key *const key, sfileno &fileno);
- /// successfully finish writing the entry
+ /// computes map entry position for a given entry key
+ sfileno anchorIndexByKey(const cache_key *const key) const;
+
+ /// Like strcmp(mapped, new), but for store entry versions/timestamps.
+ /// Returns +2 if the mapped entry does not exist; -1/0/+1 otherwise.
+ /// Comparison may be inaccurate unless the caller is a lock holder.
+ int compareVersions(const sfileno oldFileno, time_t newVersion) const;
+
+ /// finds, locks, and returns an anchor for an empty key position,
+ /// erasing the old entry (if any)
+ Anchor *openForWriting(const cache_key *const key, sfileno &fileno);
+ /// locks and returns an anchor for the empty fileno position; if
+ /// overwriteExisting is false and the position is not empty, returns nil
+ Anchor *openForWritingAt(sfileno fileno, bool overwriteExisting = true);
+ /// successfully finish creating or updating the entry at fileno pos
void closeForWriting(const sfileno fileno, bool lockForReading = false);
+ /// unlock and "forget" openForWriting entry, making it Empty again
+ /// this call does not free entry slices so the caller has to do that
+ void forgetWritingEntry(const sfileno fileno);
+
+ /// only works on locked entries; returns nil unless the slice is readable
+ const Anchor *peekAtReader(const sfileno fileno) const;
+
+ /// frees the entry if possible; otherwise, marks it as waiting to be freed
+ void freeEntry(const sfileno fileno);
+
+ /// opens entry (identified by key) for reading, increments read level
+ const Anchor *openForReading(const cache_key *const key, sfileno &fileno);
+ /// opens entry (identified by sfileno) for reading, increments read level
+ const Anchor *openForReadingAt(const sfileno fileno);
+ /// closes open entry after reading, decrements read level
+ void closeForReading(const sfileno fileno);
- /// only works on locked entries; returns nil unless the slot is readable
- const Slot *peekAtReader(const sfileno fileno) const;
+ /// writeable slice within an entry chain created by openForWriting()
+ Slice &writeableSlice(const AnchorId anchorId, const SliceId sliceId);
+ /// readable slice within an entry chain opened by openForReading()
+ const Slice &readableSlice(const AnchorId anchorId, const SliceId sliceId) const;
+ /// writeable anchor for the entry created by openForWriting()
+ Anchor &writeableEntry(const AnchorId anchorId);
- /// mark the slot as waiting to be freed and, if possible, free it
- void free(const sfileno fileno);
+ /// called by lock holder to terminate either entry writing or reading
+ void abortIo(const sfileno fileno);
- /// open slot for reading, increments read level
- const Slot *openForReading(const cache_key *const key, sfileno &fileno);
- /// open slot for reading, increments read level
- const Slot *openForReadingAt(const sfileno fileno);
- /// close slot after reading, decrements read level
- void closeForReading(const sfileno fileno);
+ /// finds an unlocked entry and frees it or returns false
+ bool purgeOne();
- /// called by lock holder to terminate either slot writing or reading
- void abortIo(const sfileno fileno);
+ /// copies slice to its designated position
+ void importSlice(const SliceId sliceId, const Slice &slice);
- bool full() const; ///< there are no empty slots left
- bool valid(const int n) const; ///< whether n is a valid slot coordinate
- int entryCount() const; ///< number of used slots
- int entryLimit() const; ///< maximum number of slots that can be used
+ bool full() const; ///< there are no empty slices left; XXX: remove as unused?
+ bool valid(const int n) const; ///< whether n is a valid slice coordinate
+ int entryCount() const; ///< number of writeable and readable entries
+ int entryLimit() const; ///< maximum entryCount() possible
/// adds approximate current stats to the supplied ones
void updateStats(ReadWriteLockStats &stats) const;
Mem::Pointer<Shared> shared;
private:
- int slotIndexByKey(const cache_key *const key) const;
- Slot &slotByKey(const cache_key *const key);
+ Anchor &anchorByKey(const cache_key *const key);
- Slot *openForReading(Slot &s);
+ Anchor *openForReading(Anchor &s);
void abortWriting(const sfileno fileno);
- void freeIfNeeded(Slot &s);
- void freeLocked(Slot &s, bool keepLocked);
+
+ void freeChain(const sfileno fileno, Anchor &inode, const bool keepLock);
};
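
To see how these calls fit together, a hedged sketch of a writer's sequence; map, key, and entry are illustrative, and only the StoreMap/Anchor methods come from this patch:

    sfileno fileno = -1;
    if (Ipc::StoreMap::Anchor *anchor = map->openForWriting(key, fileno)) {
        anchor->setKey(key); // per the XXX note, the caller sets the key
        anchor->set(entry);  // copy timestamp, lastref, expires, flags
        // ... fill writeableSlice(fileno, sliceId) slices as data arrives ...
        map->closeForWriting(fileno, true); // publish; keep a shared read lock
    }
    // on a later failure, abortIo(fileno) frees the chain, while
    // forgetWritingEntry(fileno) leaves slice cleanup to the caller
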
-/// StoreMap with extra slot data
+/// StoreMap with extra slice data
/// Note: ExtrasT must be POD, it is initialized with zeroes, no
/// constructors or destructors are called
template <class ExtrasT>
ExtrasT *sharedExtras; ///< pointer to extras in shared memory
};
-/// API for adjusting external state when dirty map slot is being freed
+/// API for adjusting external state when dirty map slice is being freed
class StoreMapCleaner
{
public:
virtual ~StoreMapCleaner() {}
- /// adjust slot-linked state before a locked Readable slot is erased
- virtual void cleanReadable(const sfileno fileno) = 0;
+ /// adjust slice-linked state before a locked Readable slice is erased
+ virtual void noteFreeMapSlice(const sfileno sliceId) = 0;
};
// StoreMapWithExtras implementation
--- /dev/null
+/*
+ * DEBUG: section 54 Interprocess Communication
+ */
+
+#include "squid.h"
+#include "ipc/StoreMapSlice.h"
+#include "tools.h"
+
+Ipc::StoreMapSlice::StoreMapSlice()
+{
+ memset(this, 0, sizeof(*this));
+}
--- /dev/null
+#ifndef SQUID_IPC_STORE_MAP_SLICE_H
+#define SQUID_IPC_STORE_MAP_SLICE_H
+
+#include "typedefs.h"
+
+namespace Ipc
+{
+
+typedef int32_t StoreMapSliceId;
+
+/// a piece of Store entry, linked to other pieces, forming a chain
+class StoreMapSlice
+{
+public:
+ StoreMapSlice(); // defined in StoreMapSlice.cc, which zero-fills all members
+
+ StoreMapSliceId next; ///< ID of the next slice occupied by the entry
+// uint32_t location; ///< slice contents location
+ uint32_t size; ///< slice contents size
+};
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_STORE_MAP_SLICE_H */
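
Finally, a minimal sketch of walking an entry's slice chain, mirroring the loop in freeChain() earlier in the patch; the negative-ID chain terminator is an assumption borrowed from that loop and from Rock's nextSlot sanity check, and forEachSliceSketch() is an illustrative name:

    // illustrative only: visit every slice of an already-opened readable entry
    static void forEachSliceSketch(const Ipc::StoreMap &map, const sfileno fileno,
                                   const Ipc::StoreMap::Anchor &anchor)
    {
        sfileno sliceId = anchor.start;
        while (sliceId >= 0) { // assumes a negative ID ends the chain
            const Ipc::StoreMap::Slice &slice = map.readableSlice(fileno, sliceId);
            // ... consume slice.size bytes of the entry here ...
            sliceId = slice.next;
        }
    }
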