StoreEntry *e = new StoreEntry();
e->createMemObject();
- e->mem_obj->xitTable.index = index;
- e->mem_obj->xitTable.io = Store::ioReading;
- anchor->exportInto(*e);
-
- if (EBIT_TEST(anchor->basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
- assert(hadWriter);
- e->setCollapsingRequirement(true);
- }
+ anchorEntry(*e, index, *anchor);
// keep read lock to receive updates from others
return e;
Must(map); // configured to track transients
+ if (direction == Store::ioWriting)
+ return addWriterEntry(*e, key);
+
+ assert(direction == Store::ioReading);
+ addReaderEntry(*e, key);
+}
+
+/// addEntry() helper used for cache entry creators/writers
+void
+Transients::addWriterEntry(StoreEntry &e, const cache_key *key)
+{
sfileno index = 0;
- Ipc::StoreMapAnchor *slot = map->openForWriting(key, index);
- Must(slot); // no writer collisions
+ const auto anchor = map->openForWriting(key, index);
+ if (!anchor)
+ throw TextException("writer collision", Here());
// set ASAP in hope to unlock the slot if something throws
- e->mem_obj->xitTable.index = index;
- e->mem_obj->xitTable.io = Store::ioWriting;
-
- slot->set(*e, key);
- if (direction == Store::ioWriting) {
- // allow reading and receive remote DELETE events, but do not switch to
- // the reading lock because transientReaders() callers want true readers
- map->startAppending(index);
- } else {
- assert(direction == Store::ioReading);
- // keep the entry locked (for reading) to receive remote DELETE events
- map->switchWritingToReading(index);
- e->mem_obj->xitTable.io = Store::ioReading;
- }
+ // and to provide index to such methods as hasWriter()
+ auto &xitTable = e.mem_obj->xitTable;
+ xitTable.index = index;
+ xitTable.io = Store::ioWriting;
+
+ anchor->set(e, key);
+ // allow reading and receive remote DELETE events, but do not switch to
+ // the reading lock because transientReaders() callers want true readers
+ map->startAppending(index);
+}
+
+/// addEntry() helper used for cache readers
+/// readers do not modify the cache, but they must create a Transients entry
+void
+Transients::addReaderEntry(StoreEntry &e, const cache_key *key)
+{
+ sfileno index = 0;
+ const auto anchor = map->openOrCreateForReading(key, index, e);
+ if (!anchor)
+ throw TextException("reader collision", Here());
+
+ anchorEntry(e, index, *anchor);
+ // keep the entry locked (for reading) to receive remote DELETE events
+}
+
+/// fills (recently created) StoreEntry with information currently in Transients
+void
+Transients::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor)
+{
+ // set ASAP in hope to unlock the slot if something throws
+ // and to provide index to such methods as hasWriter()
+ auto &xitTable = e.mem_obj->xitTable;
+ xitTable.index = index;
+ xitTable.io = Store::ioReading;
+
+ const auto hadWriter = hasWriter(e); // before computing collapsingRequired
+ anchor.exportInto(e);
+ const bool collapsingRequired = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
+ assert(!collapsingRequired || hadWriter);
+ e.setCollapsingRequirement(collapsingRequired);
}
bool
debugs(54, 8, "closed entry " << fileno << " for writing " << path);
}
+const Ipc::StoreMap::Anchor *
+Ipc::StoreMap::openOrCreateForReading(const cache_key *const key, sfileno &fileno, const StoreEntry &entry)
+{
+ debugs(54, 5, "opening/creating entry with key " << storeKeyText(key)
+ << " for reading " << path);
+
+ // start with reading so that we do not overwrite an existing unlocked entry
+ auto idx = fileNoByKey(key);
+ if (const auto anchor = openForReadingAt(idx, key)) {
+ fileno = idx;
+ return anchor;
+ }
+
+ // the competing openOrCreateForReading() workers race to create a new entry
+ idx = fileNoByKey(key);
+ if (auto anchor = openForWritingAt(idx)) {
+ anchor->set(entry, key);
+ anchor->lock.switchExclusiveToShared();
+ // race ended
+ assert(anchor->complete());
+ fileno = idx;
+ debugs(54, 5, "switched entry " << fileno << " from writing to reading " << path);
+ return anchor;
+ }
+
+ // we lost the above race; see if the winner-created entry is now readable
+ // TODO: Do some useful housekeeping work here to give the winner more time.
+ idx = fileNoByKey(key);
+ if (const auto anchor = openForReadingAt(idx, key)) {
+ fileno = idx;
+ return anchor;
+ }
+
+ // slow entry creator or some other problem
+ return nullptr;
+}
+
Ipc::StoreMap::Anchor *
Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno)
{