From 8253d451bad3c74742b0791fb2b874ca7c564e1b Mon Sep 17 00:00:00 2001 From: Alex Rousskov Date: Thu, 12 Apr 2018 22:12:39 +0000 Subject: [PATCH] Fixed Transient reader locking broken by 4310f8b (#161) The closeForWriting() call comment is correct -- we must keep the lock, but the optional argument was accidentally lost (when undoing the failed attempt to remove all long-term transient locks in the feature branch). Also polished stale comments (which led to the above bug discovery!). Also polished addEntry() aspects that I have missed in 4310f8b. --- src/MemStore.cc | 2 +- src/Transients.cc | 23 +++++++++++++---------- src/fs/rock/RockRebuild.cc | 2 +- src/fs/rock/RockSwapDir.cc | 5 ++--- src/ipc/MemMap.cc | 22 +++++++++++++++------- src/ipc/MemMap.h | 5 ++++- src/ipc/StoreMap.cc | 26 +++++++++++++++----------- src/ipc/StoreMap.h | 4 +++- 8 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/MemStore.cc b/src/MemStore.cc index f058730c31..15c7159423 100644 --- a/src/MemStore.cc +++ b/src/MemStore.cc @@ -882,7 +882,7 @@ MemStore::completeWriting(StoreEntry &e) e.mem_obj->memCache.index = -1; e.mem_obj->memCache.io = MemObject::ioDone; - map->closeForWriting(index, false); + map->closeForWriting(index); CollapsedForwarding::Broadcast(e); // before we close our transient entry! Store::Root().transientsCompleteWriting(e); diff --git a/src/Transients.cc b/src/Transients.cc index f1baa7a75e..0fe09ed9bb 100644 --- a/src/Transients.cc +++ b/src/Transients.cc @@ -189,14 +189,11 @@ Transients::findCollapsed(const sfileno index) void Transients::monitorIo(StoreEntry *e, const cache_key *key, const Store::IoStatus direction) { - assert(direction == Store::ioReading || direction == Store::ioWriting); - if (!e->hasTransients()) { addEntry(e, key, direction); - e->mem_obj->xitTable.io = direction; + assert(e->hasTransients()); } - assert(e->hasTransients()); const auto index = e->mem_obj->xitTable.index; if (const auto old = locals->at(index)) { assert(old == e); @@ -207,7 +204,7 @@ Transients::monitorIo(StoreEntry *e, const cache_key *key, const Store::IoStatus } } -/// creates a new Transients entry or throws +/// creates a new Transients entry void Transients::addEntry(StoreEntry *e, const cache_key *key, const Store::IoStatus direction) { @@ -221,14 +218,20 @@ Transients::addEntry(StoreEntry *e, const cache_key *key, const Store::IoStatus Ipc::StoreMapAnchor *slot = map->openForWriting(key, index); Must(slot); // no writer collisions - slot->set(*e, key); + // set ASAP in hope to unlock the slot if something throws e->mem_obj->xitTable.index = index; + e->mem_obj->xitTable.io = Store::ioWriting; + + slot->set(*e, key); if (direction == Store::ioWriting) { - // keep write lock; the caller will decide what to do with it - map->startAppending(e->mem_obj->xitTable.index); + // allow reading and receive remote DELETE events, but do not switch to + // the reading lock because transientReaders() callers want true readers + map->startAppending(index); } else { + assert(direction == Store::ioReading); // keep the entry locked (for reading) to receive remote DELETE events - map->closeForWriting(e->mem_obj->xitTable.index); + map->switchWritingToReading(index); + e->mem_obj->xitTable.io = Store::ioReading; } } @@ -255,7 +258,7 @@ Transients::completeWriting(const StoreEntry &e) { assert(e.hasTransients()); assert(isWriter(e)); - map->closeForWriting(e.mem_obj->xitTable.index, true); + map->switchWritingToReading(e.mem_obj->xitTable.index); e.mem_obj->xitTable.io = Store::ioReading; } diff --git a/src/fs/rock/RockRebuild.cc b/src/fs/rock/RockRebuild.cc index a2258b98a5..25fe4e713e 100644 --- a/src/fs/rock/RockRebuild.cc +++ b/src/fs/rock/RockRebuild.cc @@ -491,7 +491,7 @@ Rock::Rebuild::finalizeOrThrow(const sfileno fileNo, LoadingEntry &le) anchor.basics.swap_file_sz = le.size; EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED); le.state(LoadingEntry::leLoaded); - sd->map->closeForWriting(fileNo, false); + sd->map->closeForWriting(fileNo); ++counts.objcount; } diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index 8f36cf29e5..7eec16381c 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -876,9 +876,8 @@ Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r) if (sio.touchingStoreEntry()) { sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz = sio.offset_; - - // close, the entry gets the read lock - map->closeForWriting(sio.swap_filen, true); + map->switchWritingToReading(sio.swap_filen); + // sio.e keeps the (now read) lock on the anchor } sio.writeableAnchor_ = NULL; sio.splicingPoint = request->sidCurrent; diff --git a/src/ipc/MemMap.cc b/src/ipc/MemMap.cc index 2ccd787511..a3b4adc7cd 100644 --- a/src/ipc/MemMap.cc +++ b/src/ipc/MemMap.cc @@ -88,17 +88,25 @@ Ipc::MemMap::openForWritingAt(const sfileno fileno, bool overwriteExisting) } void -Ipc::MemMap::closeForWriting(const sfileno fileno, bool lockForReading) +Ipc::MemMap::closeForWriting(const sfileno fileno) { - debugs(54, 5, "closing slot at " << fileno << " for writing and " - "openning for reading in map [" << path << ']'); + debugs(54, 5, "stop writing slot at " << fileno << + " in map [" << path << ']'); assert(valid(fileno)); Slot &s = shared->slots[fileno]; assert(s.writing()); - if (lockForReading) - s.lock.switchExclusiveToShared(); - else - s.lock.unlockExclusive(); + s.lock.unlockExclusive(); +} + +void +Ipc::MemMap::switchWritingToReading(const sfileno fileno) +{ + debugs(54, 5, "switching writing slot at " << fileno << + " to reading in map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + assert(s.writing()); + s.lock.switchExclusiveToShared(); } /// terminate writing the entry, freeing its slot for others to use diff --git a/src/ipc/MemMap.h b/src/ipc/MemMap.h index e5e409e2d3..f04e3f49e2 100644 --- a/src/ipc/MemMap.h +++ b/src/ipc/MemMap.h @@ -90,7 +90,10 @@ public: Slot *openForWritingAt(sfileno fileno, bool overwriteExisting = true); /// successfully finish writing the entry - void closeForWriting(const sfileno fileno, bool lockForReading = false); + void closeForWriting(const sfileno fileno); + + /// stop writing the locked entry and start reading it + void switchWritingToReading(const sfileno fileno); /// only works on locked entries; returns nil unless the slot is readable const Slot *peekAtReader(const sfileno fileno) const; diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc index e474e45878..6640010510 100644 --- a/src/ipc/StoreMap.cc +++ b/src/ipc/StoreMap.cc @@ -155,20 +155,24 @@ Ipc::StoreMap::startAppending(const sfileno fileno) } void -Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading) +Ipc::StoreMap::closeForWriting(const sfileno fileno) { Anchor &s = anchorAt(fileno); assert(s.writing()); - if (lockForReading) { - s.lock.switchExclusiveToShared(); - debugs(54, 5, "switched entry " << fileno << - " from writing to reading " << path); - assert(s.complete()); - } else { - s.lock.unlockExclusive(); - debugs(54, 5, "closed entry " << fileno << " for writing " << path); - // cannot assert completeness here because we have no lock - } + // TODO: assert(!s.empty()); // i.e., unlocked s becomes s.complete() + s.lock.unlockExclusive(); + debugs(54, 5, "closed entry " << fileno << " for writing " << path); + // cannot assert completeness here because we have no lock +} + +void +Ipc::StoreMap::switchWritingToReading(const sfileno fileno) +{ + debugs(54, 5, "switching entry " << fileno << " from writing to reading " << path); + Anchor &s = anchorAt(fileno); + assert(s.writing()); + s.lock.switchExclusiveToShared(); + assert(s.complete()); } Ipc::StoreMap::Slice & diff --git a/src/ipc/StoreMap.h b/src/ipc/StoreMap.h index bda1f28fea..ce3e9b2cfa 100644 --- a/src/ipc/StoreMap.h +++ b/src/ipc/StoreMap.h @@ -233,7 +233,9 @@ public: /// restrict opened for writing entry to appending operations; allow reads void startAppending(const sfileno fileno); /// successfully finish creating or updating the entry at fileno pos - void closeForWriting(const sfileno fileno, bool lockForReading = false); + void closeForWriting(const sfileno fileno); + /// stop writing (or updating) the locked entry and start reading it + void switchWritingToReading(const sfileno fileno); /// unlock and "forget" openForWriting entry, making it Empty again /// this call does not free entry slices so the caller has to do that void forgetWritingEntry(const sfileno fileno); -- 2.39.5