if (StoreEntry *oldE = locals->at(index)) {
debugs(20, 3, "not joining private " << *oldE);
assert(EBIT_TEST(oldE->flags, KEY_PRIVATE));
- map->closeForReading(index);
+ map->closeForReadingAndFreeIdle(index);
return nullptr;
}
}
}
+bool
+Transients::hasWriter(const StoreEntry &e)
+{
+ if (!e.hasTransients())
+ return false;
+ return map->peekAtWriter(e.mem_obj->xitTable.index);
+}
+
void
Transients::noteFreeMapSlice(const Ipc::StoreMapSliceId)
{
map->abortWriting(xitTable.index);
} else {
assert(isReader(entry));
- map->closeForReading(xitTable.index);
+ map->closeForReadingAndFreeIdle(xitTable.index);
}
locals->at(xitTable.index) = nullptr;
xitTable.index = -1;
bool isReader(const StoreEntry &) const;
/// whether the entry is in "writing to Transients" I/O state
bool isWriter(const StoreEntry &) const;
+ /// whether we or somebody else is in the "writing to Transients" I/O state
+ bool hasWriter(const StoreEntry &);
static int64_t EntryLimit();
unlockExclusive();
}
+bool
+Ipc::ReadWriteLock::unlockSharedAndSwitchToExclusive()
+{
+ assert(readers > 0);
+ if (!writeLevel++) { // we are the first writer + lock "new" readers out
+ assert(!appending);
+ unlockShared();
+ if (!readers) {
+ writing = true;
+ return true;
+ }
+ // somebody is still reading: fall through
+ } else {
+ // somebody is still writing: just stop reading
+ unlockShared();
+ }
+ --writeLevel;
+ return false;
+}
+
void
Ipc::ReadWriteLock::startAppending()
{
void unlockExclusive(); ///< undo successful exclusiveLock()
void unlockHeaders(); ///< undo successful lockHeaders()
void switchExclusiveToShared(); ///< stop writing, start reading
+ /// same as unlockShared() but also attempts to get a writer lock beforehand
+ /// \returns whether the writer lock was acquired
+ bool unlockSharedAndSwitchToExclusive();
void startAppending(); ///< writer keeps its lock but also allows reading
const Anchor &s = anchorAt(fileno);
if (s.reading())
return &s; // immediate access by lock holder so no locking
+ assert(s.writing()); // must be locked for reading or writing
+ return nullptr;
+}
+
+const Ipc::StoreMap::Anchor *
+Ipc::StoreMap::peekAtWriter(const sfileno fileno) const
+{
+ const Anchor &s = anchorAt(fileno);
if (s.writing())
- return NULL; // the caller is not a read lock holder
- assert(false); // must be locked for reading or writing
- return NULL;
+ return &s; // immediate access by lock holder so no locking
+ assert(s.reading()); // must be locked for reading or writing
+ return nullptr;
}
const Ipc::StoreMap::Anchor &
debugs(54, 5, "closed entry " << fileno << " for reading " << path);
}
+void
+Ipc::StoreMap::closeForReadingAndFreeIdle(const sfileno fileno)
+{
+ auto &s = anchorAt(fileno);
+ assert(s.reading());
+
+ if (!s.lock.unlockSharedAndSwitchToExclusive()) {
+ debugs(54, 5, "closed entry " << fileno << " for reading " << path);
+ return;
+ }
+
+ assert(s.writing());
+ assert(!s.reading());
+ freeChain(fileno, s, false);
+ debugs(54, 5, "closed idle entry " << fileno << " for reading " << path);
+}
+
bool
Ipc::StoreMap::openForUpdating(Update &update, const sfileno fileNoHint)
{
/// undoes partial update, unlocks, and cleans up
void abortUpdating(Update &update);
- /// only works on locked entries; returns nil unless the slice is readable
+ /// the caller must hold a lock on the entry
+ /// \returns nullptr unless the slice is readable
const Anchor *peekAtReader(const sfileno fileno) const;
- /// only works on locked entries; returns the corresponding Anchor
+ /// the caller must hold a lock on the entry
+ /// \returns nullptr unless the slice is writeable
+ const Anchor *peekAtWriter(const sfileno fileno) const;
+
+ /// the caller must hold a lock on the entry
+ /// \returns the corresponding Anchor
const Anchor &peekAtEntry(const sfileno fileno) const;
/// free the entry if possible or mark it as waiting to be freed if not
const Anchor *openForReadingAt(const sfileno fileno);
/// closes open entry after reading, decrements read level
void closeForReading(const sfileno fileno);
+ /// same as closeForReading() but also frees the entry if it is unlocked
+ void closeForReadingAndFreeIdle(const sfileno fileno);
/// writeable slice within an entry chain created by openForWriting()
Slice &writeableSlice(const AnchorId anchorId, const SliceId sliceId);
return entry;
} catch (const std::exception &ex) {
debugs(20, 2, "failed with " << *entry << ": " << ex.what());
- entry->release(true);
+ entry->release();
// fall through
}
}
const bool found = anchorToCache(entry, inSync);
if (found && !inSync)
throw TexcHere("cannot sync");
+ if (!found) {
+ // !found should imply hittingRequiresCollapsing() regardless of writer presence
+ if (!entry.hittingRequiresCollapsing()) {
+ debugs(20, DBG_IMPORTANT, "BUG: missing ENTRY_REQUIRES_COLLAPSING for " << entry);
+ throw TextException("transients entry missing ENTRY_REQUIRES_COLLAPSING", Here());
+ }
+
+ if (!transients->hasWriter(entry)) {
+ // prevent others from falling into the same trap
+ throw TextException("unattached transients entry missing writer", Here());
+ }
+ }
}
}