/*
- * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
// create a brand new store entry and initialize it with stored basics
StoreEntry *e = new StoreEntry();
+ e->createMemObject();
anchorEntry(*e, filen, *slot);
-
- e->hashInsert(key);
trackReferences(*e);
-
return e;
- // the disk entry remains open for reading, protected from modifications
}
bool
-Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync)
{
if (!map || !theFile || !theFile->canRead())
return false;
sfileno filen;
const Ipc::StoreMapAnchor *const slot = map->openForReading(
- reinterpret_cast<cache_key*>(collapsed.key), filen);
+ reinterpret_cast<cache_key*>(entry.key), filen);
if (!slot)
return false;
- anchorEntry(collapsed, filen, *slot);
- inSync = updateCollapsedWith(collapsed, *slot);
+ anchorEntry(entry, filen, *slot);
+ inSync = updateAnchoredWith(entry, *slot);
return true; // even if inSync is false
}
bool
-Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
+Rock::SwapDir::updateAnchored(StoreEntry &entry)
{
if (!map || !theFile || !theFile->canRead())
return false;
- if (collapsed.swap_filen < 0) // no longer using a disk cache
- return true;
- assert(collapsed.swap_dirn == index);
+ assert(entry.hasDisk(index));
- const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
- return updateCollapsedWith(collapsed, s);
+ const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen);
+ return updateAnchoredWith(entry, s);
}
bool
-Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
+Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor)
{
- collapsed.swap_file_sz = anchor.basics.swap_file_sz;
+ entry.swap_file_sz = anchor.basics.swap_file_sz;
return true;
}
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
- const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;
+ anchor.exportInto(e);
- e.swap_file_sz = basics.swap_file_sz;
- e.lastref = basics.lastref;
- e.timestamp = basics.timestamp;
- e.expires = basics.expires;
- e.lastmod = basics.lastmod;
- e.refcount = basics.refcount;
- e.flags = basics.flags;
-
- if (anchor.complete()) {
- e.store_status = STORE_OK;
- e.swap_status = SWAPOUT_DONE;
- } else {
- e.store_status = STORE_PENDING;
- e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
- }
+ const bool complete = anchor.complete();
+ e.store_status = complete ? STORE_OK : STORE_PENDING;
+ // SWAPOUT_WRITING: even though another worker writes?
+ e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);
e.ping_status = PING_NONE;
- EBIT_CLR(e.flags, RELEASE_REQUEST);
- EBIT_CLR(e.flags, KEY_PRIVATE);
EBIT_SET(e.flags, ENTRY_VALIDATED);
-
- e.swap_dirn = index;
- e.swap_filen = filen;
}
void Rock::SwapDir::disconnect(StoreEntry &e)
{
- assert(e.swap_dirn == index);
- assert(e.swap_filen >= 0);
- // cannot have SWAPOUT_NONE entry with swap_filen >= 0
- assert(e.swap_status != SWAPOUT_NONE);
+ assert(e.hasDisk(index));
+
+ ignoreReferences(e);
// do not rely on e.swap_status here because there is an async delay
// before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
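+ // a still-set writeableAnchor_ identifies an entry this worker was writing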
if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
map->abortWriting(e.swap_filen);
- e.swap_dirn = -1;
- e.swap_filen = -1;
- e.swap_status = SWAPOUT_NONE;
+ e.detachFromDisk();
dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
- Store::Root().transientsAbandon(e); // broadcasts after the change
+ Store::Root().stopSharing(e); // broadcasts after the change
} else {
map->closeForReading(e.swap_filen);
- e.swap_dirn = -1;
- e.swap_filen = -1;
- e.swap_status = SWAPOUT_NONE;
+ e.detachFromDisk();
}
}
}
void
-Rock::SwapDir::swappedOut(const StoreEntry &)
+Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
{
- // stats are not stored but computed when needed
+ // nothing to do
+}
+
+void
+Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry)
+{
+ debugs(47, 5, entry);
+ disconnect(entry); // calls abortWriting() to free the disk entry
}
void
Rock::SwapDir::createError(const char *const msg)
{
+ int xerrno = errno; // XXX: where does errno come from?
debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
- filePath << "; " << msg << " error: " << xstrerror());
+ filePath << "; " << msg << " error: " << xstrerr(xerrno));
fatal("Rock Store db creation error");
}
else
return false;
- if (!value)
+ if (!value) {
self_destruct();
+ return false;
+ }
// TODO: handle time units and detect parsing errors better
const int64_t parsedValue = strtoll(value, NULL, 10);
if (parsedValue < 0) {
debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
self_destruct();
+ return false;
}
const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);
else
return false;
- if (!value)
+ if (!value) {
self_destruct();
+ return false;
+ }
// TODO: handle time units and detect parsing errors better
const int64_t parsedValue = strtoll(value, NULL, 10);
if (parsedValue < 0) {
debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
self_destruct();
+ return false;
}
const int newRate = static_cast<int>(parsedValue);
if (newRate < 0) {
debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
self_destruct();
+ return false;
}
if (!isaReconfig)
else
return false;
- if (!value)
+ if (!value) {
self_destruct();
+ return false;
+ }
// TODO: handle size units and detect parsing errors better
const int64_t newSize = strtoll(value, NULL, 10); // keep signed so that negative input fails the check below
if (newSize <= 0) {
debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
self_destruct();
+ return false;
}
if (newSize <= sizeof(DbCellHeader)) {
debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
self_destruct();
+ return false;
}
if (!reconfig)
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
- if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
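+ // a negative diskSpaceNeeded means the expected entry size is unknown;
+ // otherwise, account for the DbCellHeader stored with each db slot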
+ if (diskSpaceNeeded >= 0)
+ diskSpaceNeeded += sizeof(DbCellHeader);
+ if (!::SwapDir::canStore(e, diskSpaceNeeded, load))
return false;
if (!theFile || !theFile->canWrite())
return diskOffset(map->sliceLimit());
}
-bool
-Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
+Rock::SlotId
+Rock::SwapDir::reserveSlotForWriting()
{
+ Ipc::Mem::PageId pageId;
+
if (freeSlots->pop(pageId)) {
- debugs(47, 5, "got a previously free slot: " << pageId);
- return true;
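+ // free page numbers are 1-based while slot IDs are 0-based (see noteFreeMapSlice())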
+ const auto slotId = pageId.number - 1;
+ debugs(47, 5, "got a previously free slot: " << slotId);
+ map->prepFreeSlice(slotId);
+ return slotId;
}
// catch free slots delivered to noteFreeMapSlice()
if (map->purgeOne()) {
assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
assert(pageId.set());
- debugs(47, 5, "got a previously busy slot: " << pageId);
- return true;
+ const auto slotId = pageId.number - 1;
+ debugs(47, 5, "got a previously busy slot: " << slotId);
+ map->prepFreeSlice(slotId);
+ return slotId;
}
assert(waitingForPage == &pageId);
waitingForPage = NULL;
+ // This may happen when the number of available db slots is close to the
+ // number of concurrent requests reading or writing those slots, e.g. when
+ // the db is "small" compared to the request traffic or when we are still
+ // rebuilding and have not yet loaded "many" entries or empty slots.
debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
- return false;
+ throw TexcHere("ran out of free db slots");
}
bool
Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
{
Ipc::Mem::PageId pageId;
- pageId.pool = index+1;
+ pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(index);
pageId.number = sliceId+1;
if (waitingForPage) {
*waitingForPage = pageId;
return NULL;
}
- if (e.swap_filen < 0) {
+ if (!e.hasDisk()) {
debugs(47, 4, HERE << e);
return NULL;
}
if (!theFile)
fatalf("Rock cache_dir failed to initialize db file: %s", filePath);
- if (theFile->error())
+ if (theFile->error()) {
+ int xerrno = errno; // XXX: where does errno come from?
fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
- xstrerror());
+ xstrerr(xerrno));
+ }
debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
std::setw(12) << maxSize() << " disk bytes, " <<
ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
assert(request);
IoState::Pointer sio = request->sio;
-
- if (errflag == DISK_OK && rlen > 0)
- sio->offset_ += rlen;
-
- sio->callReaderBack(r->buf, rlen);
+ sio->handleReadCompletion(*request, rlen, errflag);
}
void
Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
{
+ // TODO: Move details into IoState::handleWriteCompletion() after figuring
+ // out how to deal with map access. See readCompleted().
+
Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
assert(request);
assert(request->sio != NULL);
// quit if somebody called IoState::close() while we were waiting
if (!sio.stillWaiting()) {
debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
- noteFreeMapSlice(request->sidNext);
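+ // free the slice being written; it is not yet a part of the entry slice chain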
+ noteFreeMapSlice(request->sidCurrent);
return;
}
debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);
- // TODO: Fail if disk dropped one of the previous write requests.
-
- if (errflag == DISK_OK) {
- // do not increment sio.offset_ because we do it in sio->write()
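+ // failures include both I/O errors and replies to write requests that the
+ // disk queue dropped or reordered; expectedReply() detects the latter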
+ if (errflag != DISK_OK)
+ handleWriteCompletionProblem(errflag, *request);
+ else if (!sio.expectedReply(request->id))
+ handleWriteCompletionProblem(DISK_ERROR, *request);
+ else
+ handleWriteCompletionSuccess(*request);
- // finalize the shared slice info after writing slice contents to disk
- Ipc::StoreMap::Slice &slice =
- map->writeableSlice(sio.swap_filen, request->sidCurrent);
- slice.size = request->len - sizeof(DbCellHeader);
- slice.next = request->sidNext;
+ if (sio.touchingStoreEntry())
+ CollapsedForwarding::Broadcast(*sio.e);
+}
- if (request->eof) {
- assert(sio.e);
- assert(sio.writeableAnchor_);
- if (sio.touchingStoreEntry()) {
- sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
- sio.offset_;
+/// code shared by writeCompleted() success handling cases
+void
+Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request)
+{
+ auto &sio = *(request.sio);
+ sio.splicingPoint = request.sidCurrent;
+ // do not increment sio.offset_ because we do it in sio->write()
- // close, the entry gets the read lock
- map->closeForWriting(sio.swap_filen, true);
- }
- sio.writeableAnchor_ = NULL;
- sio.splicingPoint = request->sidCurrent;
- sio.finishedWriting(errflag);
- }
+ assert(sio.writeableAnchor_);
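+ // either start the entry slice chain with this slice or link this slice
+ // to the previously written one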
+ if (sio.writeableAnchor_->start < 0) { // wrote the first slot
+ Must(request.sidPrevious < 0);
+ sio.writeableAnchor_->start = request.sidCurrent;
} else {
- noteFreeMapSlice(request->sidNext);
+ Must(request.sidPrevious >= 0);
+ map->writeableSlice(sio.swap_filen, request.sidPrevious).next = request.sidCurrent;
+ }
- writeError(sio);
- sio.finishedWriting(errflag);
- // and hope that Core will call disconnect() to close the map entry
+ // finalize the shared slice info after writing slice contents to disk;
+ // the chain gets possession of the slice we were writing
+ Ipc::StoreMap::Slice &slice =
+ map->writeableSlice(sio.swap_filen, request.sidCurrent);
+ slice.size = request.len - sizeof(DbCellHeader);
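+ // reserveSlotForWriting() prepped this slice, so it must not link anywhere yet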
+ Must(slice.next < 0);
+
+ if (request.eof) {
+ assert(sio.e);
+ if (sio.touchingStoreEntry()) {
+ sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
+ sio.offset_;
+
+ map->switchWritingToReading(sio.swap_filen);
+ // sio.e keeps the (now read) lock on the anchor
+ }
+ sio.writeableAnchor_ = NULL;
+ sio.finishedWriting(DISK_OK);
}
+}
- if (sio.touchingStoreEntry())
- CollapsedForwarding::Broadcast(*sio.e);
+/// code shared by writeCompleted() error handling cases
+void
+Rock::SwapDir::handleWriteCompletionProblem(const int errflag, const WriteRequest &request)
+{
+ auto &sio = *request.sio;
+
+ noteFreeMapSlice(request.sidCurrent);
+
+ writeError(sio);
+ sio.finishedWriting(errflag);
+ // and hope that Core will call disconnect() to close the map entry
}
void
map->freeEntry(sio.swap_filen); // will mark as unusable, just in case
if (sio.touchingStoreEntry())
- Store::Root().transientsAbandon(*sio.e);
+ Store::Root().stopSharing(*sio.e);
// else noop: a fresh entry update error does not affect stale entry readers
// All callers must also call IoState callback, to propagate the error.
}
void
-Rock::SwapDir::unlink(StoreEntry &e)
+Rock::SwapDir::evictIfFound(const cache_key *key)
{
- debugs(47, 5, HERE << e);
- ignoreReferences(e);
- map->freeEntry(e.swap_filen);
- disconnect(e);
+ if (map)
+ map->freeEntryByKey(key); // may not be there
}
void
-Rock::SwapDir::markForUnlink(StoreEntry &e)
+Rock::SwapDir::evictCached(StoreEntry &e)
{
debugs(47, 5, e);
- map->freeEntry(e.swap_filen);
+ if (e.hasDisk(index)) {
+ if (map->freeEntry(e.swap_filen))
+ CollapsedForwarding::Broadcast(e);
+ if (!e.locked())
+ disconnect(e);
+ } else if (const auto key = e.publicKey()) {
+ evictIfFound(key);
+ }
}
void
return spacesPath.termedBuf();
}
+bool
+Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const
+{
+ return map->hasReadableEntry(reinterpret_cast<const cache_key*>(e.key));
+}
+
namespace Rock
{
RunnerRegistrationEntry(SwapDirRr);
// TODO: somehow remove pool id and counters from PageStack?
Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
- i+1, capacity, 0);
+ Ipc::Mem::PageStack::IdForSwapDirSpace(i),
+ capacity,
+ 0);
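+ // this pool ID must match the one noteFreeMapSlice() computes for this dir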
freeSlotsOwners.push_back(freeSlotsOwner);
// TODO: add method to initialize PageStack with no free pages