Work in progress: needs polishing, a better replacement policy, and rebuild fixes.
ufs/RebuildState.cc
librock_la_SOURCES = \
+ rock/RockDbCell.cc \
rock/RockDbCell.h \
rock/RockIoState.cc \
rock/RockIoState.h \
--- /dev/null
+/*
+ * DEBUG: section 79 Disk IO Routines
+ */
+
+#include "squid.h"
+#include "fs/rock/RockDbCell.h"
+#include "ipc/StoreMap.h"
+#include "tools.h"
+
+Rock::DbCellHeader::DbCellHeader(): firstSlot(0), nextSlot(0), version(0),
+ payloadSize(0) {
+    memset(&key, 0, sizeof(key));
+}
+
+bool
+Rock::DbCellHeader::sane() const {
+ return firstSlot > 0;
+}
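Note: a zero-filled header is deliberately not sane() (its firstSlot is zero), which lets the rebuild code below assert that a shared-memory slot header has not been loaded yet.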
#ifndef SQUID_FS_ROCK_DB_CELL_H
#define SQUID_FS_ROCK_DB_CELL_H
+namespace Ipc
+{
+class StoreMapSlot;
+}
+
namespace Rock
{
class DbCellHeader
{
public:
- DbCellHeader(): payloadSize(0), reserved(0) {}
+ DbCellHeader();
/// whether the freshly loaded header fields make sense
- bool sane() const { return payloadSize >= 0 && reserved == 0; }
+ bool sane() const;
- int64_t payloadSize; ///< cell contents size excluding this header
- int64_t reserved; ///< reserved for future use (next cell pointer?)
+ uint64_t key[2]; ///< StoreEntry key
+ uint32_t firstSlot; ///< first slot pointer in the entry chain
+ uint32_t nextSlot; ///< next slot pointer in the entry chain
+ uint32_t version; ///< entry chain version
+ uint32_t payloadSize; ///< cell contents size excluding this header
};
} // namespace Rock
#endif /* SQUID_FS_ROCK_DB_CELL_H */
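For orientation, a minimal standalone sketch (hypothetical values, not Squid code) of the entry-chain representation these header fields imply: every slot of an entry carries the entry key, the chain head in firstSlot, a link to the next slot in nextSlot (zero ends the chain), and a version that detects slot reuse:

    #include <cstdint>
    #include <cstdio>

    struct Hdr { // mirrors Rock::DbCellHeader above
        uint64_t key[2];
        uint32_t firstSlot, nextSlot, version, payloadSize;
    };

    int main()
    {
        // a two-slot entry chain: slot 1 -> slot 7 (slot numbers are 1-based)
        const Hdr slot1 = {{0xFEED, 0xBEEF}, 1, 7, 3, 4064};
        const Hdr slot7 = {{0xFEED, 0xBEEF}, 1, 0, 3, 1200};
        std::printf("entry payload: %u bytes in 2 slots\n",
                    slot1.payloadSize + slot7.payloadSize);
        return 0;
    }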
Rock::WriteRequest::WriteRequest(const ::WriteRequest &base,
- const IoState::Pointer &anSio):
+ const IoState::Pointer &anSio,
+ const bool last):
::WriteRequest(base),
- sio(anSio)
+ sio(anSio),
+ isLast(last)
{
}
class WriteRequest: public ::WriteRequest
{
public:
- WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio);
+ WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio, const bool last);
IoState::Pointer sio;
+ const bool isLast;
private:
CBDATA_CLASS2(WriteRequest);
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
-Rock::IoState::IoState(SwapDir *dir,
+Rock::IoState::IoState(SwapDir &aDir,
StoreEntry *anEntry,
StoreIOState::STFNCB *cbFile,
StoreIOState::STIOCB *cbIo,
void *data):
- slotSize(0),
- diskOffset(-1),
- payloadEnd(-1)
+ dbSlot(NULL),
+ dir(aDir),
+ slotSize(dir.max_objsize),
+ objOffset(0)
{
e = anEntry;
- // swap_filen, swap_dirn, diskOffset, and payloadEnd are set by the caller
- slotSize = dir->max_objsize;
+ // swap_filen, swap_dirn and diskOffset are set by the caller
file_callback = cbFile;
callback = cbIo;
callback_data = cbdataReference(data);
{
assert(theFile != NULL);
assert(coreOff >= 0);
- offset_ = coreOff;
- // we skip our cell header; it is only read when building the map
- const int64_t cellOffset = sizeof(DbCellHeader) +
- static_cast<int64_t>(coreOff);
- assert(cellOffset <= payloadEnd);
+ Ipc::Mem::PageId pageId;
+ pageId.pool = dir.index;
+ if (coreOff < objOffset) { // rewind
+ pageId.number = dbSlot->firstSlot;
+ dbSlot = &dir.dbSlot(pageId);
+ objOffset = 0;
+ }
+
+ while (coreOff >= objOffset + dbSlot->payloadSize) {
+ objOffset += dbSlot->payloadSize;
+ pageId.number = dbSlot->nextSlot;
+ assert(pageId); // XXX: should be an error?
+ dbSlot = &dir.dbSlot(pageId);
+ }
+ if (pageId)
+ diskOffset = dir.diskOffset(pageId);
- // Core specifies buffer length, but we must not exceed stored entry size
- if (cellOffset + (int64_t)len > payloadEnd)
- len = payloadEnd - cellOffset;
+ offset_ = coreOff;
+ len = min(len,
+ static_cast<size_t>(objOffset + dbSlot->payloadSize - coreOff));
assert(read.callback == NULL);
assert(read.callback_data == NULL);
read.callback = cb;
read.callback_data = cbdataReference(data);
- theFile->read(new ReadRequest(
- ::ReadRequest(buf, diskOffset + cellOffset, len), this));
+ theFile->read(new ReadRequest(::ReadRequest(buf,
+ diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}
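The chain walk above maps a Core offset (coreOff) into the slot that holds it. A condensed sketch of that mapping, using a hypothetical flat array instead of PageIds:

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    struct SlotInfo { uint32_t next; uint32_t payloadSize; }; // hypothetical

    // returns the index of the slot holding byte coreOff and sets intra to
    // the offset within that slot's payload; next == 0 ends the chain
    static int findSlot(const std::vector<SlotInfo> &slots, int first,
                        int64_t coreOff, int64_t &intra)
    {
        int64_t objOffset = 0;
        int cur = first;
        while (cur >= 0 && coreOff >= objOffset + slots[cur].payloadSize) {
            objOffset += slots[cur].payloadSize;
            cur = slots[cur].next ? static_cast<int>(slots[cur].next) : -1;
        }
        intra = coreOff - objOffset;
        return cur; // -1 if coreOff lies past the end of the chain
    }

    int main()
    {
        const std::vector<SlotInfo> slots = {{1, 4064}, {2, 4064}, {0, 1200}};
        int64_t intra = 0;
        const int slot = findSlot(slots, 0, 5000, intra);
        std::printf("offset 5000 -> slot %d at intra-slot offset %lld\n",
                    slot, static_cast<long long>(intra));
        return 0;
    }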
-// We only buffer data here; we actually write when close() is called.
+// We only write data when a full slot is accumulated or when close() is called.
// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
// of the slot when the write does not end at the page or sector boundary.
void
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
- // TODO: move to create?
- if (!coreOff) {
- assert(theBuf.isNull());
- assert(payloadEnd <= slotSize);
- theBuf.init(min(payloadEnd, slotSize), slotSize);
- // start with our header; TODO: consider making it a trailer
- DbCellHeader header;
- assert(static_cast<int64_t>(sizeof(header)) <= payloadEnd);
- header.payloadSize = payloadEnd - sizeof(header);
- theBuf.append(reinterpret_cast<const char*>(&header), sizeof(header));
- } else {
- // Core uses -1 offset as "append". Sigh.
- assert(coreOff == -1);
- assert(!theBuf.isNull());
+ assert(dbSlot);
+
+ if (theBuf.isNull()) {
+ theBuf.init(min(size + sizeof(DbCellHeader), slotSize), slotSize);
+ theBuf.appended(sizeof(DbCellHeader)); // will fill header in doWrite
}
- theBuf.append(buf, size);
- offset_ += size; // so that Core thinks we wrote it
+    if (size <= static_cast<size_t>(theBuf.potentialSpaceSize()))
+ theBuf.append(buf, size);
+ else {
+ Ipc::Mem::PageId pageId;
+ if (!dir.popDbSlot(pageId)) {
+            debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << dir.path <<
+                   "' ran out of DB slots");
+ dir.writeError(swap_filen);
+ // XXX: do we need to destroy buf on error?
+ if (dtor)
+ (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+ // XXX: do we need to call callback on error?
+ callBack(DISK_ERROR);
+ return;
+ }
+ DbCellHeader &nextDbSlot = dir.dbSlot(pageId);
+ memcpy(nextDbSlot.key, dbSlot->key, sizeof(nextDbSlot.key));
+ nextDbSlot.firstSlot = dbSlot->firstSlot;
+ nextDbSlot.nextSlot = 0;
+ nextDbSlot.version = dbSlot->version;
+ nextDbSlot.payloadSize = 0;
+
+ dbSlot->nextSlot = pageId.number;
+
+        const size_t left = size - theBuf.potentialSpaceSize();
+        offset_ += theBuf.potentialSpaceSize(); // so that Core thinks we wrote it
+        theBuf.append(buf, theBuf.potentialSpaceSize());
+
+ doWrite();
+
+ dbSlot = &nextDbSlot;
+ diskOffset = dir.diskOffset(pageId);
+        theBuf.init(min(left + sizeof(DbCellHeader), slotSize), slotSize);
+        theBuf.appended(sizeof(DbCellHeader)); // reserve room for the new slot's header
+ write(buf + size - left, left, -1, NULL);
+ }
if (dtor)
(dtor)(const_cast<char*>(buf)); // cast due to a broken API?
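A self-contained sketch (hypothetical types, not Squid code) of the spill policy write() implements above: fill the current slot, flush it, and start a new slot for the remainder:

    #include <algorithm>
    #include <cstdio>
    #include <vector>

    struct Slot { std::vector<char> buf; size_t capacity; };

    // append data, spilling into freshly allocated slots as each one fills up
    static void appendSpilling(std::vector<Slot> &slots, const char *data,
                               size_t size, size_t slotCapacity)
    {
        while (size > 0) {
            if (slots.empty() || slots.back().buf.size() == slots.back().capacity)
                slots.push_back(Slot{{}, slotCapacity}); // "pop" a fresh slot
            Slot &cur = slots.back();
            const size_t chunk = std::min(cur.capacity - cur.buf.size(), size);
            cur.buf.insert(cur.buf.end(), data, data + chunk);
            data += chunk;
            size -= chunk;
        }
    }

    int main()
    {
        std::vector<Slot> slots;
        const std::vector<char> payload(10000, 'x');
        appendSpilling(slots, payload.data(), payload.size(), 4096);
        std::printf("%zu slots used\n", slots.size()); // 3 slots for 10000 bytes
        return 0;
    }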
// write what was buffered during write() calls
void
-Rock::IoState::startWriting()
+Rock::IoState::doWrite(const bool isLast)
{
assert(theFile != NULL);
assert(!theBuf.isNull());
debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
theBuf.contentSize());
- assert(theBuf.contentSize() <= slotSize);
+ dbSlot->payloadSize = theBuf.contentSize() - sizeof(DbCellHeader);
+ memcpy(theBuf.content(), dbSlot, sizeof(DbCellHeader));
+
+ assert(static_cast<size_t>(theBuf.contentSize()) <= slotSize);
    // theFile->write may call writeCompleted immediately
- theFile->write(new WriteRequest(::WriteRequest(theBuf.content(),
- diskOffset, theBuf.contentSize(), theBuf.freeFunc()), this));
+ WriteRequest *const r = new WriteRequest(
+ ::WriteRequest(theBuf.content(), diskOffset, theBuf.contentSize(),
+ theBuf.freeFunc()), this, isLast);
+ theFile->write(r);
}
//
debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
" how=" << how);
if (how == wroteAll && !theBuf.isNull())
- startWriting();
+ doWrite(true);
else
callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
}
namespace Rock
{
+class DbCellHeader;
class SwapDir;
/// \ingroup Rock
public:
typedef RefCount<IoState> Pointer;
- IoState(SwapDir *dir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
+ IoState(SwapDir &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
virtual ~IoState();
void file(const RefCount<DiskFile> &aFile);
virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func);
virtual void close(int how);
- /// called by SwapDir when writing is done
void finishedWriting(int errFlag);
- int64_t slotSize; ///< db cell size
int64_t diskOffset; ///< the start of this cell inside the db file
-
- /// when reading: number of bytes previously written to the db cell;
- /// when writing: maximum payload offset in a db cell
- int64_t payloadEnd;
+ DbCellHeader *dbSlot; ///< current db slot, used for writing
MEMPROXY_CLASS(IoState);
private:
- void startWriting();
+ void doWrite(const bool isLast = false);
void callBack(int errflag);
+ SwapDir &dir; ///< swap dir object
+ const size_t slotSize; ///< db cell size
+ int64_t objOffset; ///< object offset for current db slot
+
RefCount<DiskFile> theFile; // "file" responsible for this I/O
MemBuf theBuf; // use for write content accumulation only
};
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "fs/rock/RockDbCell.h"
+#include "ipc/StoreMap.h"
#include "globals.h"
#include "md5.h"
#include "tools.h"
dbSize(0),
dbEntrySize(0),
dbEntryLimit(0),
+ dbSlot(0),
fd(-1),
dbOffset(0),
filen(0)
dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
dbEntrySize = sd->max_objsize;
dbEntryLimit = sd->entryLimit();
+    loaded.reserve(dbEntryLimit);
+    for (int i = 0; i < dbEntryLimit; ++i)
+        loaded.push_back(false);
}
Rock::Rebuild::~Rebuild()
void
Rock::Rebuild::checkpoint()
{
- if (!done())
+ if (dbOffset < dbSize)
eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
+    else if (!doneAll()) {
+        eventAdd("Rock::Rebuild::Steps2", Rock::Rebuild::Steps2, this, 0.01, 1,
+                 true);
+    }
}
bool
Rock::Rebuild::doneAll() const
{
- return dbOffset >= dbSize && AsyncJob::doneAll();
+    return dbSlot >= dbEntryLimit && AsyncJob::doneAll();
}
void
CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
}
+void
+Rock::Rebuild::Steps2(void *data)
+{
+ // use async call to enable job call protection that time events lack
+ CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps2);
+}
+
void
Rock::Rebuild::steps()
{
checkpoint();
}
+void
+Rock::Rebuild::steps2()
+{
+    debugs(47,5, HERE << sd->index << " slot " << dbSlot << " <= " <<
+           dbEntryLimit);
+
+ // Balance our desire to maximize the number of slots processed at once
+ // (and, hence, minimize overheads and total rebuild time) with a
+ // requirement to also process Coordinator events, disk I/Os, etc.
+ const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
+ const timeval loopStart = current_time;
+
+    int loadedCount = 0;
+    while (dbSlot < dbEntryLimit) {
+        doOneSlot();
+        ++dbSlot;
+        ++loadedCount;
+
+ if (opt_foreground_rebuild)
+ continue; // skip "few entries at a time" check below
+
+ getCurrentTime();
+ const double elapsedMsec = tvSubMsec(loopStart, current_time);
+ if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
+            debugs(47, 5, HERE << "pausing after " << loadedCount <<
+                   " slots in " << elapsedMsec << "ms; " <<
+                   (elapsedMsec/loadedCount) << "ms per slot");
+ break;
+ }
+ }
+
+ checkpoint();
+}
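steps2() above uses the same time-budgeted loop pattern as steps(): do as much work as fits into a small wall-clock budget, then yield to other events. A generic sketch of the pattern (std::chrono stands in for current_time and tvSubMsec):

    #include <chrono>
    #include <cstdio>

    static int slotsLeft = 100000;

    static bool doOneUnitOfWork() // stand-in for doOneSlot()
    {
        return --slotsLeft > 0;
    }

    static void step()
    {
        using Clock = std::chrono::steady_clock;
        const auto start = Clock::now();
        const auto budget = std::chrono::milliseconds(50); // like maxSpentMsec
        bool more = true;
        while (more && Clock::now() - start < budget)
            more = doOneUnitOfWork();
        if (more)
            std::puts("budget spent: reschedule (eventAdd) and continue later");
    }

    int main() { step(); return 0; }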
+
void
Rock::Rebuild::doOneEntry()
{
failure("cannot seek to db entry", errno);
MemBuf buf;
- buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
+ buf.init(sizeof(DbCellHeader), sizeof(DbCellHeader));
if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
return;
// get our header
- DbCellHeader header;
+ Ipc::Mem::PageId pageId;
+ pageId.pool = sd->index;
+ pageId.number = filen + 1;
+ DbCellHeader &header = sd->dbSlot(pageId);
+ assert(!header.sane());
+
if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
"Ignoring truncated cache entry meta data at " << dbOffset);
- ++counts.invalid;
+ invalidSlot(pageId);
return;
}
memcpy(&header, buf.content(), sizeof(header));
if (!header.sane()) {
debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
"Ignoring malformed cache entry meta data at " << dbOffset);
- ++counts.invalid;
- return;
- }
- buf.consume(sizeof(header)); // optimize to avoid memmove()
-
- cache_key key[SQUID_MD5_DIGEST_LENGTH];
- StoreEntry loadedE;
- if (!storeRebuildParseEntry(buf, loadedE, key, counts, header.payloadSize)) {
- // skip empty slots
- if (loadedE.swap_filen > 0 || loadedE.swap_file_sz > 0) {
- ++counts.invalid;
- //sd->unlink(filen); leave garbage on disk, it should not hurt
- }
+ invalidSlot(pageId);
return;
}
+}
- assert(loadedE.swap_filen < dbEntryLimit);
- if (!storeRebuildKeepEntry(loadedE, key, counts))
+void
+Rock::Rebuild::doOneSlot()
+{
+    debugs(47,5, HERE << sd->index << " slot " << dbSlot << " <= " <<
+           dbEntryLimit);
+
+ if (loaded[dbSlot])
return;
- ++counts.objcount;
- // loadedE->dump(5);
+ Ipc::Mem::PageId pageId;
+ pageId.pool = sd->index;
+ pageId.number = dbSlot + 1;
+    const DbCellHeader &header = sd->dbSlot(pageId);
+    assert(header.sane());
+
+    pageId.number = header.firstSlot;
+ //const DbCellHeader &firstChainSlot = sd->dbSlot(pageId);
- sd->addEntry(filen, header, loadedE);
+    /* TODO: Process each not-yet-loaded slot and verify its entry chain.
+       If the chain is valid, load the entry from its first slot (as small
+       rock does) and call SwapDir::addEntry(), which needs to be restored. */
}
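A sketch of the chain validation the TODO above calls for (hypothetical flat-array helpers; the real code would go through sd->dbSlot() and Ipc::Mem::PageId): a chain is valid when every slot reachable from firstSlot carries the first slot's key and version:

    #include <cstdint>
    #include <cstring>

    struct Hdr { // mirrors Rock::DbCellHeader
        uint64_t key[2];
        uint32_t firstSlot, nextSlot, version, payloadSize;
    };

    // slot numbers are 1-based; slots[n-1] is slot n; nextSlot == 0 ends a chain
    static bool chainIsValid(const Hdr *slots, uint32_t first, uint32_t slotCount)
    {
        if (first == 0 || first > slotCount)
            return false;
        const Hdr &head = slots[first - 1];
        for (uint32_t n = head.nextSlot, steps = 0; n != 0; ++steps) {
            if (n > slotCount || steps > slotCount)
                return false; // dangling link or a cycle
            const Hdr &s = slots[n - 1];
            if (std::memcmp(s.key, head.key, sizeof(head.key)) != 0 ||
                    s.version != head.version)
                return false; // the slot was reused by another entry
            n = s.nextSlot;
        }
        return true;
    }

    int main()
    {
        const Hdr slots[2] = {
            {{1, 2}, 1, 2, 9, 4064}, // slot 1 continues at slot 2
            {{1, 2}, 1, 0, 9, 100},  // slot 2 ends the chain
        };
        return chainIsValid(slots, 1, 2) ? 0 : 1;
    }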
void
fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.",
sd->index, sd->filePath, msg);
}
+
+void
+Rock::Rebuild::invalidSlot(Ipc::Mem::PageId &pageId)
+{
+ ++counts.invalid;
+ loaded[pageId.number - 1] = true;
+ sd->dbSlotIndex->push(pageId);
+}
#include "cbdata.h"
#include "store_rebuild.h"
+namespace Ipc
+{
+namespace Mem
+{
+class PageId;
+}
+}
+
namespace Rock
{
private:
void checkpoint();
void steps();
+ void steps2();
void doOneEntry();
+ void doOneSlot();
void failure(const char *msg, int errNo = 0);
+ void invalidSlot(Ipc::Mem::PageId &pageId);
SwapDir *sd;
int64_t dbSize;
int dbEntrySize;
int dbEntryLimit;
+ int dbSlot;
int fd; // store db file descriptor
int64_t dbOffset;
int filen;
+    Vector<bool> loaded; ///< true iff rebuild is complete for a given slot
+
StoreRebuildData counts;
static void Steps(void *data);
+ static void Steps2(void *data);
CBDATA_CLASS2(Rebuild);
};
theFile->configure(fileConfig);
theFile->open(O_RDWR, 0644, this);
+ dbSlotIndex = shm_old(Ipc::Mem::PageStack)(path);
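+    // the same shared segment holds the PageStack index followed by one
+    // DbCellHeader per db slot; dbSlots points at the first header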
+ dbSlots = new (reinterpret_cast<char *>(dbSlotIndex.getRaw()) +
+ dbSlotIndex->stackSize()) DbCellHeader[entryLimitAllowed()];
+
// Increment early. Otherwise, if one SwapDir finishes rebuild before
// others start, storeRebuildComplete() will think the rebuild is over!
// TODO: move store_dirs_rebuilding hack to store modules that need it.
AsyncJob::Start(new Rebuild(this));
}
-/* Add a new object to the cache with empty memory copy and pointer to disk
- * use to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */
-bool
-Rock::SwapDir::addEntry(const int filen, const DbCellHeader &header, const StoreEntry &from)
-{
- debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
- ", filen="<< std::setfill('0') << std::hex << std::uppercase <<
- std::setw(8) << filen);
-
- sfileno newLocation = 0;
- if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
- if (filen == newLocation) {
- slot->set(from);
- map->extras(filen) = header;
- } // else some other, newer entry got into our cell
- map->closeForWriting(newLocation, false);
- return filen == newLocation;
- }
-
- return false;
-}
-
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
if (!map)
return false;
+ // TODO: consider DB slots freed when older object would be replaced
+ if (dbSlotIndex->size() <
+ static_cast<unsigned int>(max(entriesNeeded(diskSpaceNeeded), 1)))
+ return false;
+
// Do not start I/O transaction if there are less than 10% free pages left.
// TODO: reserve page instead
if (needsDiskStrand() &&
return NULL;
}
- // compute payload size for our cell header, using StoreEntry info
- // careful: e.objectLen() may still be negative here
- const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
- assert(expectedReplySize >= 0); // must know to prevent cell overflows
- assert(e.mem_obj->swap_hdr_sz > 0);
- DbCellHeader header;
- header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
- const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
- assert(payloadEnd <= max_objsize);
-
sfileno filen;
Ipc::StoreMapSlot *const slot =
map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
debugs(47, 5, HERE << "map->add failed");
return NULL;
}
- e.swap_file_sz = header.payloadSize; // and will be copied to the map
+
+ Ipc::Mem::PageId pageId;
+ if (!popDbSlot(pageId)) {
+        debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << filePath <<
+               "' ran out of DB slots");
+        map->free(filen);
+        return NULL;
+    }
+
slot->set(e);
- map->extras(filen) = header;
// XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
// If that does not happen, the entry will not decrement the read level!
- IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
+ IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
sio->swap_dirn = index;
sio->swap_filen = filen;
- sio->payloadEnd = payloadEnd;
- sio->diskOffset = diskOffset(sio->swap_filen);
+ sio->diskOffset = diskOffset(pageId);
+
+ DbCellHeader &firstDbSlot = dbSlot(pageId);
+ memcpy(firstDbSlot.key, e.key, sizeof(firstDbSlot.key));
+ firstDbSlot.firstSlot = pageId.number;
+ firstDbSlot.nextSlot = 0;
+ ++firstDbSlot.version;
+ firstDbSlot.payloadSize = 0;
+ sio->dbSlot = &firstDbSlot;
debugs(47,5, HERE << "dir " << index << " created new filen " <<
std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
- sio->swap_filen << std::dec << " at " << sio->diskOffset);
-
- assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());
+           sio->swap_filen << std::dec << " at " << sio->diskOffset);
sio->file(theFile);
int64_t
Rock::SwapDir::diskOffset(int filen) const
{
- assert(filen >= 0);
- return HeaderSize + max_objsize*filen;
+ assert(filen >= 0);
+ return HeaderSize + max_objsize*filen;
+}
+
+int64_t
+Rock::SwapDir::diskOffset(const Ipc::Mem::PageId &pageId) const
+{
+ assert(pageId);
+ return diskOffset(pageId.number - 1);
}
int64_t
return diskOffset(map->entryLimit());
}
+int
+Rock::SwapDir::entryMaxPayloadSize() const
+{
+ return max_objsize - sizeof(DbCellHeader);
+}
+
+int
+Rock::SwapDir::entriesNeeded(const int64_t objSize) const
+{
+ return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
+}
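For example, assuming a hypothetical 4096-byte slot (max_objsize) and a 32-byte DbCellHeader, entryMaxPayloadSize() yields 4064 bytes, so a 10000-byte object needs (10000 + 4063) / 4064 = 3 slots.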
+
+bool
+Rock::SwapDir::popDbSlot(Ipc::Mem::PageId &pageId)
+{
+ return dbSlotIndex->pop(pageId);
+}
+
+Rock::DbCellHeader &
+Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId)
+{
+ const DbCellHeader &s = const_cast<const SwapDir *>(this)->dbSlot(pageId);
+ return const_cast<DbCellHeader &>(s);
+}
+
+const Rock::DbCellHeader &
+Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId) const
+{
+ assert(dbSlotIndex->pageIdIsValid(pageId));
+ return dbSlots[pageId.number - 1];
+}
+
+void
+Rock::SwapDir::cleanReadable(const sfileno fileno)
+{
+    Ipc::Mem::PageId pageId = map->extras(fileno).pageId;
+    Ipc::Mem::PageId nextPageId = pageId;
+    while (pageId) {
+        const DbCellHeader &curDbSlot = dbSlot(pageId);
+        nextPageId.number = curDbSlot.nextSlot;
+        // the chain continues only if the next slot still belongs to us
+        bool sameChain = false;
+        if (nextPageId) {
+            const DbCellHeader &nextDbSlot = dbSlot(nextPageId);
+            sameChain = memcmp(curDbSlot.key, nextDbSlot.key,
+                               sizeof(curDbSlot.key)) == 0 &&
+                        curDbSlot.version == nextDbSlot.version;
+        }
+        dbSlotIndex->push(pageId); // return the slot to the free index
+        if (!sameChain)
+            break; // end of this entry's chain
+        pageId = nextPageId;
+    }
+}
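cleanReadable() implements the Ipc::StoreMapCleaner interface that SwapDir now inherits (see the class declaration below): when the map frees a readable entry, its whole slot chain is returned to dbSlotIndex for reuse.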
+
// tries to open an old or being-written-to entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
if (!slot)
return NULL; // we were writing afterall
- IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
+ IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
sio->swap_dirn = index;
sio->swap_filen = e.swap_filen;
- sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize;
- assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
+    const Ipc::Mem::PageId &pageId = map->extras(e.swap_filen).pageId;
+    DbCellHeader &firstDbSlot = dbSlot(pageId);
+    sio->dbSlot = &firstDbSlot;
+    sio->diskOffset = diskOffset(pageId);
+    assert(memcmp(firstDbSlot.key, e.key, sizeof(firstDbSlot.key)) == 0);
+    assert(firstDbSlot.firstSlot == pageId.number);
debugs(47,5, HERE << "dir " << index << " has old filen: " <<
std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
assert(slot->basics.swap_file_sz > 0);
assert(slot->basics.swap_file_sz == e.swap_file_sz);
- sio->diskOffset = diskOffset(sio->swap_filen);
- assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());
-
sio->file(theFile);
return sio;
}
if (errflag == DISK_OK && rlen > 0)
sio->offset_ += rlen;
- assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum
StoreIOState::STRCB *callback = sio->read.callback;
assert(callback);
// close, assuming we only write once; the entry gets the read lock
map->closeForWriting(sio.swap_filen, true);
// do not increment sio.offset_ because we do it in sio->write()
- } else {
- // Do not abortWriting here. The entry should keep the write lock
- // instead of losing association with the store and confusing core.
- map->free(sio.swap_filen); // will mark as unusable, just in case
- }
-
- assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum
+ if (request->isLast)
+ sio.finishedWriting(errflag);
+ } else
+ writeError(sio.swap_filen);
+}
- sio.finishedWriting(errflag);
+void
+Rock::SwapDir::writeError(const sfileno fileno)
+{
+ // Do not abortWriting here. The entry should keep the write lock
+ // instead of losing association with the store and confusing core.
+ map->free(fileno); // will mark as unusable, just in case
+ // XXX: should we call IoState callback?
}
bool
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
- Must(owners.empty());
+ Must(mapOwners.empty() && dbSlotsOwners.empty());
for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
- Rock::SwapDir::DirMap::Owner *const owner =
- Rock::SwapDir::DirMap::Init(sd->path, sd->entryLimitAllowed());
- owners.push_back(owner);
+ const int64_t capacity = sd->entryLimitAllowed();
+ SwapDir::DirMap::Owner *const mapOwner =
+ SwapDir::DirMap::Init(sd->path, capacity);
+ mapOwners.push_back(mapOwner);
+
+ // XXX: remove pool id and counters from PageStack
+ Ipc::Mem::Owner<Ipc::Mem::PageStack> *const dbSlotsOwner =
+ shm_new(Ipc::Mem::PageStack)(sd->path, i, capacity,
+ sizeof(DbCellHeader));
+ dbSlotsOwners.push_back(dbSlotsOwner);
+
+ // XXX: add method to initialize PageStack with no free pages
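+            // drain the stack: the rebuild pushes back the slots it finds free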
+ while (true) {
+ Ipc::Mem::PageId pageId;
+ if (!dbSlotsOwner->object()->pop(pageId))
+ break;
+ }
}
}
}
Rock::SwapDirRr::~SwapDirRr()
{
- for (size_t i = 0; i < owners.size(); ++i)
- delete owners[i];
+ for (size_t i = 0; i < mapOwners.size(); ++i) {
+ delete mapOwners[i];
+ delete dbSlotsOwners[i];
+ }
}
#include "DiskIO/IORequestor.h"
#include "fs/rock/RockDbCell.h"
#include "ipc/StoreMap.h"
+#include "ipc/mem/Page.h"
+#include "ipc/mem/PageStack.h"
class DiskIOStrategy;
class ReadRequest;
class Rebuild;
/// \ingroup Rock
-class SwapDir: public ::SwapDir, public IORequestor
+class SwapDir: public ::SwapDir, public IORequestor, public Ipc::StoreMapCleaner
{
public:
SwapDir();
virtual void create();
virtual void parse(int index, char *path);
+ // XXX: stop misusing max_objsize as slot size
+ virtual int64_t maxObjectSize() const { return max_objsize * entryLimitAllowed(); }
+
int64_t entryLimitHigh() const { return SwapFilenMax; } ///< Core limit
int64_t entryLimitAllowed() const;
- typedef Ipc::StoreMapWithExtras<DbCellHeader> DirMap;
+ bool popDbSlot(Ipc::Mem::PageId &pageId);
+ DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId);
+ const DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId) const;
+
+    int64_t diskOffset(const Ipc::Mem::PageId &pageId) const;
+ void writeError(const sfileno fileno);
+
+ virtual void cleanReadable(const sfileno fileno);
+
+ // TODO: merge with MemStoreMapExtras?
+ struct MapExtras {
+ Ipc::Mem::PageId pageId;
+ };
+ typedef Ipc::StoreMapWithExtras<MapExtras> DirMap;
protected:
/* protected ::SwapDir API */
void dumpRateOption(StoreEntry * e) const;
void rebuild(); ///< starts loading and validating stored entry metadata
- ///< used to add entries successfully loaded during rebuild
- bool addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from);
bool full() const; ///< no more entries can be stored without purging
void trackReferences(StoreEntry &e); ///< add to replacement policy scope
int64_t diskOffset(int filen) const;
int64_t diskOffsetLimit() const;
int entryLimit() const { return map->entryLimit(); }
+ int entryMaxPayloadSize() const;
+ int entriesNeeded(const int64_t objSize) const;
friend class Rebuild;
const char *filePath; ///< location of cache storage file inside path/
DiskIOStrategy *io;
RefCount<DiskFile> theFile; ///< cache storage for this cache_dir
DirMap *map;
+ DbCellHeader *dbSlots;
+ Ipc::Mem::Pointer<Ipc::Mem::PageStack> dbSlotIndex;
/* configurable options */
DiskFile::Config fileConfig; ///< file-level configuration options
virtual void create(const RunnerRegistry &);
private:
- Vector<SwapDir::DirMap::Owner *> owners;
+ Vector<SwapDir::DirMap::Owner *> mapOwners;
+ Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> dbSlotsOwners;
};
} // namespace Rock
~Owner();
+ Class *object() { return theObject; }
+
private:
Owner(const char *const id, const off_t sharedSize);