/// open shared memory segment
static void Init();
- /// XXX: remove
- static void NewData(const StoreIOState &sio);
-
- /// notify other workers that new data is available
+ /// notify other workers about changes in entry state (e.g., new data)
static void Broadcast(const cache_key *key);
/// kick worker with empty IPC queue
url = xstrdup(aUrl);
}
-MemObject::MemObject(char const *aUrl, char const *aLog_url): mem_index(-1)
+MemObject::MemObject(char const *aUrl, char const *aLog_url):
+ smpCollapsed(false)
{
debugs(20, 3, HERE << "new MemObject " << this);
_reply = new HttpReply;
assert(chksum == url_checksum(url));
#endif
- assert(mem_index < 0);
- if (!shutting_down)
+ if (!shutting_down) { // Store::Root() is FATALly missing during shutdown
+ // TODO: Consider moving these to destroyMemoryObject while providing
+ // StoreEntry::memObjForDisconnect() or similar to get access to the
+ // hidden memory object
+ if (xitTable.index >= 0)
+ Store::Root().transientsDisconnect(*this);
+ if (memCache.index >= 0)
+ Store::Root().memoryDisconnect(*this);
+
+ assert(xitTable.index < 0);
+ assert(memCache.index < 0);
assert(swapout.sio == NULL);
+ }
data_hdr.freeContent();
SwapOut swapout;
+ /// State of an entry with regards to the [shared] in-transit table.
+ class XitTable {
+ public:
+ XitTable(): index(-1) {}
+
+ int32_t index; ///< entry position inside the in-transit table
+ };
+ XitTable xitTable; ///< current [shared] in-transit table state for the entry
+
+ /// State of an entry with regards to the [shared] memory caching.
+ class MemCache {
+ public:
+ MemCache(): index(-1), offset(0), io(ioUndecided) {}
+
+ int32_t index; ///< entry position inside the memory cache
+ int64_t offset; ///< bytes written/read to/from the memory cache so far
+
+ /// I/O direction and status
+ typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
+ Io io; ///< current I/O state
+ };
+ MemCache memCache; ///< current [shared] memory caching state for the entry
+
+ bool smpCollapsed; ///< whether this entry gets data from another worker
+
+
/* Read only - this reply must be preserved by store clients */
/* The original reply. possibly with updated metadata. */
HttpRequest *request;
} abort;
char *log_url;
RemovalPolicyNode repl;
- int32_t mem_index; ///< entry position inside the [shared] memory cache
int id;
int64_t object_sz;
size_t swap_hdr_sz;
// we copied everything we could to local memory; no more need to lock
map->closeForReading(index);
- e->mem_obj->mem_index = -1;
+ e->mem_obj->memCache.index = -1;
+ e->mem_obj->memCache.io = MemObject::MemCache::ioDone;
e->hideMemObject();
}
bool
-MemStore::anchorCollapsed(StoreEntry &collapsed)
+MemStore::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
if (!map)
return false;
return false;
anchorEntry(collapsed, index, *slot);
- return updateCollapsedWith(collapsed, index, *slot);
+ inSync = updateCollapsedWith(collapsed, index, *slot);
+ return true; // even if inSync is false
}
bool
MemStore::updateCollapsed(StoreEntry &collapsed)
{
- if (!map)
- return false;
+ assert(collapsed.mem_status == IN_MEMORY);
+ MemObject *mem_obj = collapsed.findMemObject();
+ assert(mem_obj);
- if (collapsed.mem_status != IN_MEMORY) // no longer using a memory cache
- return false;
-
- const sfileno index = collapsed.mem_obj->mem_index;
+ const sfileno index = mem_obj->memCache.index;
// already disconnected from the cache, no need to update
if (index < 0)
return true;
+ if (!map)
+ return false;
+
const Ipc::StoreMapAnchor &anchor = map->readableEntry(index);
return updateCollapsedWith(collapsed, index, anchor);
}
e.flags = basics.flags;
assert(e.mem_obj);
- e.store_status = STORE_OK;
+ if (anchor.complete()) {
+ e.store_status = STORE_OK;
+ e.mem_obj->object_sz = e.swap_file_sz;
+ } else {
+ e.store_status = STORE_PENDING;
+ assert(e.mem_obj->object_sz < 0);
+ }
e.setMemStatus(IN_MEMORY);
- e.mem_obj->mem_index = index;
assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor
e.ping_status = PING_NONE;
EBIT_CLR(e.flags, RELEASE_REQUEST);
EBIT_CLR(e.flags, KEY_PRIVATE);
EBIT_SET(e.flags, ENTRY_VALIDATED);
+
+ MemObject::MemCache &mc = e.mem_obj->memCache;
+ mc.index = index;
+ mc.io = MemObject::MemCache::ioReading;
}
/// copies the entire entry from shared to local memory
MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor)
{
debugs(20, 7, "mem-loading entry " << index << " from " << anchor.start);
+ assert(e.mem_obj);
// emulate the usual Store code but w/o inapplicable checks and callbacks:
// slice state may change during copying; take snapshots now
wasEof = anchor.complete() && slice.next < 0;
const Ipc::StoreMapSlice::Size wasSize = slice.size;
+
+ debugs(20, 9, "entry " << index << " slice " << sid << " eof " <<
+ wasEof << " wasSize " << wasSize << " <= " <<
+ anchor.basics.swap_file_sz << " sliceOffset " << sliceOffset <<
+ " mem.endOffset " << e.mem_obj->endOffset());
if (e.mem_obj->endOffset() < sliceOffset + wasSize) {
// size of the slice data that we already copied
const int result = rep->httpMsgParseStep(mb.buf, buf.length, eof);
if (result > 0) {
assert(rep->pstate == psParsed);
+ EBIT_CLR(e.flags, ENTRY_FWD_HDR_WAIT);
} else if (result < 0) {
debugs(20, DBG_IMPORTANT, "Corrupted mem-cached headers: " << e);
return false;
return true;
}
+/// whether we should cache the entry
bool
-MemStore::keepInLocalMemory(const StoreEntry &e) const
+MemStore::shouldCache(const StoreEntry &e) const
{
+ if (e.mem_status == IN_MEMORY) {
+ debugs(20, 5, "already loaded from mem-cache: " << e);
+ return false;
+ }
+
+ if (e.mem_obj && e.mem_obj->memCache.offset > 0) {
+ debugs(20, 5, "already written to mem-cache: " << e);
+ return false;
+ }
+
if (!e.memoryCachable()) {
debugs(20, 7, HERE << "Not memory cachable: " << e);
return false; // will not cache due to entry state or properties
}
assert(e.mem_obj);
- const int64_t loadedSize = e.mem_obj->endOffset();
const int64_t expectedSize = e.mem_obj->expectedReplySize(); // may be < 0
+
+ // objects of unknown size are not allowed into memory cache, for now
+ if (expectedSize < 0) {
+ debugs(20, 5, HERE << "Unknown expected size: " << e);
+ return false;
+ }
+
+ const int64_t loadedSize = e.mem_obj->endOffset();
const int64_t ramSize = max(loadedSize, expectedSize);
if (ramSize > maxObjectSize()) {
return false; // will not cache due to cachable entry size limits
}
+ if (!map) {
+ debugs(20, 5, HERE << "No map to mem-cache " << e);
+ return false;
+ }
+
return true;
}
-void
-MemStore::considerKeeping(StoreEntry &e)
+/// locks map anchor and preps to store the entry in shared memory
+bool
+MemStore::startCaching(StoreEntry &e)
{
- if (!keepInLocalMemory(e))
- return;
-
- // since we copy everything at once, we can only keep complete entries
- if (e.store_status != STORE_OK) {
- debugs(20, 7, HERE << "Incomplete: " << e);
- return;
+ sfileno index = 0;
+ Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e.key), index);
+ if (!slot) {
+ debugs(20, 5, HERE << "No room in mem-cache map to index " << e);
+ return false;
}
- if (e.mem_status == IN_MEMORY) {
- debugs(20, 5, "already mem-cached: " << e);
- return;
+ assert(e.mem_obj);
+ e.mem_obj->memCache.index = index;
+ e.mem_obj->memCache.io = MemObject::MemCache::ioWriting;
+ slot->set(e);
+ map->startAppending(index);
+ return true;
+}
+
+/// copies all local data to shared memory
+void
+MemStore::copyToShm(StoreEntry &e)
+{
+ // prevents remote readers from getting ENTRY_FWD_HDR_WAIT entries and
+ // not knowing when the wait is over
+ if (EBIT_TEST(e.flags, ENTRY_FWD_HDR_WAIT)) {
+ debugs(20, 5, "postponing copying " << e << " for ENTRY_FWD_HDR_WAIT");
+ return;
}
+ assert(map);
assert(e.mem_obj);
- const int64_t loadedSize = e.mem_obj->endOffset();
- const int64_t expectedSize = e.mem_obj->expectedReplySize();
+ const int32_t index = e.mem_obj->memCache.index;
+ assert(index >= 0);
+ Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
- // objects of unknown size are not allowed into memory cache, for now
- if (expectedSize < 0) {
- debugs(20, 5, HERE << "Unknown expected size: " << e);
- return;
+ const int64_t eSize = e.mem_obj->endOffset();
+ if (e.mem_obj->memCache.offset >= eSize) {
+ debugs(20, 5, "postponing copying " << e << " for lack of news: " <<
+ e.mem_obj->memCache.offset << " >= " << eSize);
+ return; // nothing to do (yet)
}
- // since we copy everything at once, we can only keep fully loaded entries
- if (loadedSize != expectedSize) {
- debugs(20, 7, HERE << "partially loaded: " << loadedSize << " != " <<
- expectedSize);
- return;
+ if (anchor.start < 0) { // must allocate the very first slot for e
+ Ipc::Mem::PageId page;
+ anchor.start = reserveSapForWriting(page); // throws
+ map->extras(anchor.start).page = page;
}
- keep(e); // may still fail
-}
+ lastWritingSlice = anchor.start;
+ const size_t sliceCapacity = Ipc::Mem::PageSize();
-/// locks map anchor and calls copyToShm to store the entry in shared memory
-void
-MemStore::keep(StoreEntry &e)
-{
- if (!map) {
- debugs(20, 5, HERE << "No map to mem-cache " << e);
- return;
- }
+ // fill, skip slices that are already full
+ // Optimize: remember lastWritingSlice in e.mem_obj
+ while (e.mem_obj->memCache.offset < eSize) {
+ Ipc::StoreMap::Slice &slice =
+ map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
- sfileno index = 0;
- Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e.key), index);
- if (!slot) {
- debugs(20, 5, HERE << "No room in mem-cache map to index " << e);
- return;
- }
+ if (slice.size >= sliceCapacity) {
+ if (slice.next >= 0) {
+ lastWritingSlice = slice.next;
+ continue;
+ }
- try {
- if (copyToShm(e, index, *slot)) {
- slot->set(e);
- map->closeForWriting(index, false);
- CollapsedForwarding::Broadcast(static_cast<const cache_key*>(e.key));
- return;
- }
- // fall through to the error handling code
- }
- catch (const std::exception &x) { // TODO: should we catch ... as well?
- debugs(20, 2, "mem-caching error writing entry " << index <<
- ' ' << e << ": " << x.what());
- // fall through to the error handling code
+ Ipc::Mem::PageId page;
+ slice.next = lastWritingSlice = reserveSapForWriting(page);
+ map->extras(lastWritingSlice).page = page;
+ debugs(20, 7, "entry " << index << " new slice: " << lastWritingSlice);
+ }
+
+ copyToShmSlice(e, anchor);
}
- map->abortIo(index);
- CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+ anchor.basics.swap_file_sz = e.mem_obj->memCache.offset;
+ debugs(20, 7, "mem-cached available " << eSize << " bytes of " << e);
}
-/// copies all local data to shared memory
-bool
-MemStore::copyToShm(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor)
+/// copies at most one slice worth of local memory to shared memory
+void
+MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor)
{
- const int64_t eSize = e.mem_obj->endOffset();
- int64_t offset = 0;
- lastWritingSlice = -1;
- while (offset < eSize) {
- if (!copyToShmSlice(e, index, anchor, offset))
- return false;
- }
-
- // check that we kept everything or purge incomplete/sparse cached entry
- if (eSize != offset) {
- debugs(20, 2, "Failed to mem-cache " << e << ": " <<
- eSize << " != " << offset);
- return false;
- }
+ Ipc::StoreMap::Slice &slice =
+ map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
- debugs(20, 7, "mem-cached all " << eSize << " bytes of " << e);
- e.swap_file_sz = eSize;
-
- return true;
-}
-
-/// copies one slice worth of local memory to shared memory
-bool
-MemStore::copyToShmSlice(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor, int64_t &offset)
-{
- Ipc::Mem::PageId page;
- Ipc::StoreMapSliceId sid = reserveSapForWriting(page); // throws
- assert(sid >= 0 && page);
- map->extras(sid).page = page; // remember the page location for cleanup
- debugs(20, 7, "entry " << index << " slice " << sid << " has " << page);
-
- // link this slice with other entry slices to form a store entry chain
- if (!offset) {
- assert(lastWritingSlice < 0);
- anchor.start = sid;
- debugs(20, 7, "entry " << index << " starts at slice " << sid);
- } else {
- assert(lastWritingSlice >= 0);
- map->writeableSlice(index, lastWritingSlice).next = sid;
- debugs(20, 7, "entry " << index << " slice " << lastWritingSlice <<
- " followed by slice " << sid);
- }
- lastWritingSlice = sid;
+ Ipc::Mem::PageId page = map->extras(lastWritingSlice).page;
+ assert(lastWritingSlice >= 0 && page);
+ debugs(20, 7, "entry " << e << " slice " << lastWritingSlice << " has " <<
+ page);
const int64_t bufSize = Ipc::Mem::PageSize();
- StoreIOBuffer sharedSpace(bufSize, offset,
- static_cast<char*>(PagePointer(page)));
+ const int64_t sliceOffset = e.mem_obj->memCache.offset % bufSize;
+ StoreIOBuffer sharedSpace(bufSize - sliceOffset, e.mem_obj->memCache.offset,
+ static_cast<char*>(PagePointer(page)) + sliceOffset);
// check that we kept everything or purge incomplete/sparse cached entry
const ssize_t copied = e.mem_obj->data_hdr.copy(sharedSpace);
if (copied <= 0) {
- debugs(20, 2, "Failed to mem-cache " << e << " using " <<
- bufSize << " bytes from " << offset << " in " << page);
- return false;
+ debugs(20, 2, "Failed to mem-cache " << (bufSize - sliceOffset) <<
+ " bytes of " << e << " from " << e.mem_obj->memCache.offset <<
+ " in " << page);
+ throw TexcHere("data_hdr.copy failure");
}
debugs(20, 7, "mem-cached " << copied << " bytes of " << e <<
- " from " << offset << " to " << page);
+ " from " << e.mem_obj->memCache.offset << " in " << page);
- Ipc::StoreMapSlice &slice = map->writeableSlice(index, sid);
- slice.next = -1;
- slice.size = copied;
-
- offset += copied;
- return true;
+ slice.size += copied;
+ e.mem_obj->memCache.offset += copied;
}
/// finds a slot and a free page to fill or throws
}
void
-MemStore::unlink(StoreEntry &e)
+MemStore::write(StoreEntry &e)
{
assert(e.mem_obj);
- if (e.mem_obj->mem_index >= 0) {
- map->freeEntry(e.mem_obj->mem_index);
- disconnect(e);
+
+ debugs(20, 7, "entry " << e);
+
+ switch (e.mem_obj->memCache.io) {
+ case MemObject::MemCache::ioUndecided:
+ if (!shouldCache(e) || !startCaching(e)) {
+ e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+ Store::Root().transientsAbandon(e);
+ CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+ return;
+ }
+ break;
+
+ case MemObject::MemCache::ioDone:
+ case MemObject::MemCache::ioReading:
+ return; // we should not write in all of the above cases
+
+ case MemObject::MemCache::ioWriting:
+ break; // already decided to write and still writing
+ }
+
+ try {
+ copyToShm(e);
+ if (e.store_status == STORE_OK) // done receiving new content
+ completeWriting(e);
+ CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+ return;
+ }
+ catch (const std::exception &x) { // TODO: should we catch ... as well?
+ debugs(20, 2, "mem-caching error writing entry " << e << ": " << x.what());
+ // fall through to the error handling code
+ }
+
+ Store::Root().transientsAbandon(e);
+ disconnect(*e.mem_obj);
+ CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+}
+
+void
+MemStore::completeWriting(StoreEntry &e)
+{
+ assert(e.mem_obj);
+ const int32_t index = e.mem_obj->memCache.index;
+ assert(index >= 0);
+ assert(map);
+
+ debugs(20, 5, "mem-cached all " << e.mem_obj->memCache.offset << " bytes of " << e);
+
+ e.mem_obj->memCache.index = -1;
+ e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+ map->closeForWriting(index, false);
+}
+
+void
+MemStore::unlink(StoreEntry &e)
+{
+    assert(e.mem_status == IN_MEMORY);
+    MemObject *mem_obj = e.findMemObject();
+    assert(mem_obj);
+    if (mem_obj->memCache.index >= 0) {
+        map->freeEntry(mem_obj->memCache.index);
+        disconnect(*mem_obj);
} else {
+        // the entry was loaded and then disconnected from the memory cache
        map->freeEntryByKey(reinterpret_cast<cache_key*>(e.key));
    }
-    e.destroyMemObject();
+
+    e.destroyMemObject(); // XXX: discards possibly useful MemObject state (e.g., its client list); the pre-existing code destroyed it here too -- confirm this is acceptable
}
void
-MemStore::disconnect(StoreEntry &e)
+MemStore::disconnect(MemObject &mem_obj)
{
- assert(e.mem_obj);
- if (e.mem_obj->mem_index >= 0) {
- map->abortIo(e.mem_obj->mem_index);
- e.mem_obj->mem_index = -1;
+ if (mem_obj.memCache.index >= 0) {
+ if (mem_obj.memCache.io == MemObject::MemCache::ioWriting) {
+ map->abortWriting(mem_obj.memCache.index);
+ } else {
+ assert(mem_obj.memCache.io == MemObject::MemCache::ioReading);
+ map->closeForReading(mem_obj.memCache.index);
+ }
+ mem_obj.memCache.index = -1;
+ mem_obj.memCache.io = MemObject::MemCache::ioDone;
}
}
MemStore();
virtual ~MemStore();
- /// cache the entry or forget about it until the next considerKeeping call
- void considerKeeping(StoreEntry &e);
-
/// whether e should be kept in local RAM for possible future caching
bool keepInLocalMemory(const StoreEntry &e) const;
+ /// copy non-shared entry data of the being-cached entry to our cache
+ void write(StoreEntry &e);
+
+ /// all data has been received; there will be no more write() calls
+ void completeWriting(StoreEntry &e);
+
/// remove from the cache
void unlink(StoreEntry &e);
/// called when the entry is about to forget its association with mem cache
- void disconnect(StoreEntry &e);
+ void disconnect(MemObject &mem_obj);
/* Store API */
virtual int callback();
virtual void reference(StoreEntry &);
virtual bool dereference(StoreEntry &, bool);
virtual void maintain();
- virtual bool anchorCollapsed(StoreEntry &collapsed);
+ virtual bool anchorCollapsed(StoreEntry &collapsed, bool &inSync);
virtual bool updateCollapsed(StoreEntry &collapsed);
static int64_t EntryLimit();
protected:
- void keep(StoreEntry &e);
+ bool shouldCache(const StoreEntry &e) const;
+ bool startCaching(StoreEntry &e);
- bool copyToShm(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor);
- bool copyToShmSlice(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor, int64_t &offset);
+ void copyToShm(StoreEntry &e);
+ void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor);
bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof);
bool memoryCachable() const; ///< may be cached in memory
void createMemObject(const char *, const char *);
void hideMemObject(); ///< no mem_obj for callers until createMemObject
+
+ /// Memory cache needs this to get access to memCache.index of entries for
+ /// which the caller did not call StoreEntry::createMemObject().
+ MemObject *findMemObject() { return mem_obj ? mem_obj : hidden_mem_obj; }
+
void dump(int debug_lvl) const;
void hashDelete();
void hashInsert(const cache_key *);
// XXX: This method belongs to Store::Root/StoreController, but it is here
// because test cases use non-StoreController derivatives as Root
/// called to get rid of no longer needed entry data in RAM, if any
- virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable) {}
+ virtual void memoryOut(StoreEntry &e, const bool preserveSwappable) {}
// XXX: This method belongs to Store::Root/StoreController, but it is here
// to avoid casting Root() to StoreController until Root() API is fixed.
/// Update local intransit entry after changes made by appending worker.
virtual void syncCollapsed(const cache_key *key) {}
+ // XXX: This method belongs to Store::Root/StoreController, but it is here
+ // to avoid casting Root() to StoreController until Root() API is fixed.
+ /// calls Root().transients->abandon() if transients are tracked
+ virtual void transientsAbandon(StoreEntry &e) {}
+
+ // XXX: This method belongs to Store::Root/StoreController, but it is here
+ // to avoid casting Root() to StoreController until Root() API is fixed.
+ /// disassociates the entry from the intransit table
+ virtual void transientsDisconnect(MemObject &mem_obj) {}
+
// XXX: This method belongs to Store::Root/StoreController, but it is here
// to avoid casting Root() to StoreController until Root() API is fixed.
/// removes the entry from the memory cache
virtual void memoryUnlink(StoreEntry &e) {}
- /// if the entry is found, tie it to this cache and call updateCollapsed()
- virtual bool anchorCollapsed(StoreEntry &collapsed) { return false; }
+ // XXX: This method belongs to Store::Root/StoreController, but it is here
+ // to avoid casting Root() to StoreController until Root() API is fixed.
+ /// disassociates the entry from the memory cache, preserving cached data
+ virtual void memoryDisconnect(MemObject &mem_obj) {}
+
+ /// If the entry is not found, return false. Otherwise, return true after
+ /// tying the entry to this cache and setting inSync to updateCollapsed().
+ virtual bool anchorCollapsed(StoreEntry &collapsed, bool &inSync) { return false; }
/// update a local collapsed entry with fresh info from this cache (if any)
virtual bool updateCollapsed(StoreEntry &collapsed) { return false; }
/* Store parent API */
virtual void handleIdleEntry(StoreEntry &e);
- virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable);
+ virtual void transientsAbandon(StoreEntry &e);
+ virtual void transientsDisconnect(MemObject &mem_obj);
+ virtual void memoryOut(StoreEntry &e, const bool preserveSwappable);
virtual void memoryUnlink(StoreEntry &e);
+ virtual void memoryDisconnect(MemObject &mem_obj);
virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
virtual void syncCollapsed(const cache_key *key);
private:
void createOneStore(Store &aStore);
bool keepForLocalMemoryCache(const StoreEntry &e) const;
- bool anchorCollapsedOnDisk(StoreEntry &collapsed);
+ bool anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync);
StorePointer swapDir; ///< summary view of all disk caches
MemStore *memStore; ///< memory cache
Transients::Transients(): map(NULL)
{
-debugs(0,0, "Transients::ctor");
}
Transients::~Transients()
if (!map->openForReading(key, index))
return NULL;
+ if (StoreEntry *e = copyFromShm(index))
+ return e; // keep read lock to receive updates from others
+
+ // loading failure
+ map->closeForReading(index);
+ return NULL;
+}
+
+StoreEntry *
+Transients::copyFromShm(const sfileno index)
+{
const TransientsMap::Extras &extras = map->extras(index);
// create a brand new store entry and initialize it with stored info
assert(e->mem_obj);
e->mem_obj->method = extras.reqMethod;
-
- // we copied everything we could to local memory; no more need to lock
- map->closeForReading(index);
+ e->mem_obj->xitTable.index = index;
// XXX: overwriting storeCreateEntry() which calls setPrivateKey() if
// neighbors_do_private_keys (which is true in most cases and by default).
e->setPublicKey();
assert(e->key);
+ // How do we know its SMP- and not just locally-collapsed? A worker gets
+ // locally-collapsed entries from the local store_table, not Transients.
+ // TODO: Can we remove smpCollapsed by not syncing non-transient entries?
+ e->mem_obj->smpCollapsed = true;
+
return e;
}
const HttpRequestMethod &reqMethod)
{
assert(e);
+ assert(e->mem_obj);
+ assert(e->mem_obj->xitTable.index < 0);
if (!map) {
debugs(20, 5, "No map to add " << *e);
sfileno index = 0;
Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e->key), index);
if (!slot) {
- debugs(20, 5, "No room in map to index " << *e);
+ debugs(20, 5, "collision registering " << *e);
return;
}
try {
if (copyToShm(*e, index, reqFlags, reqMethod)) {
slot->set(*e);
- map->closeForWriting(index, false);
+ e->mem_obj->xitTable.index = index;
+ map->startAppending(index);
+ // keep write lock -- we will be supplying others with updates
return;
}
// fall through to the error handling code
// fall through to the error handling code
}
- map->abortIo(index);
+ map->abortWriting(index);
}
extras.url[urlLen] = '\0';
extras.reqFlags = reqFlags;
-
Must(reqMethod != Http::METHOD_OTHER);
extras.reqMethod = reqMethod.id();
// TODO: we should probably find the entry being deleted and abort it
}
+void
+Transients::abandon(const StoreEntry &e)
+{
+ assert(e.mem_obj && map);
+ map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry
+ // We do not unlock the entry now because the problem is most likely with
+ // the server resource rather than a specific cache writer, so we want to
+ // prevent other readers from collapsing requests for that resource.
+}
+
+bool
+Transients::abandoned(const StoreEntry &e) const
+{
+ assert(e.mem_obj);
+ return abandonedAt(e.mem_obj->xitTable.index);
+}
+
+/// whether an in-transit entry at the index is now abandoned by its writer
+bool
+Transients::abandonedAt(const sfileno index) const
+{
+ assert(map);
+ return map->readableEntry(index).waitingToBeFreed;
+}
+
+void
+Transients::disconnect(MemObject &mem_obj)
+{
+    assert(mem_obj.xitTable.index >= 0 && map);
+    map->freeEntry(mem_obj.xitTable.index); // just marks the locked entry
+    mem_obj.xitTable.index = -1;
+    // We keep the table lock so that other readers do not collapse requests
+    // onto this now writer-less entry.
+    // NOTE(review): the original rationale here was copied verbatim from
+    // abandon(); unlike abandon(), a disconnect does not imply a broken
+    // server resource -- confirm that retaining the lock is intentional.
+}
+
/// calculates maximum number of entries we need to store and map
int64_t
Transients::EntryLimit()
void TransientsRr::create(const RunnerRegistry &)
{
-debugs(0,0, "TransientsRr::create1: " << Config.onoff.collapsed_forwarding);
if (!Config.onoff.collapsed_forwarding)
return;
const int64_t entryLimit = Transients::EntryLimit();
-debugs(0,0, "TransientsRr::create2: " << entryLimit);
if (entryLimit <= 0)
return; // no SMP configured or a misconfiguration
/// add an in-transit entry suitable for collapsing future requests
void put(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
- /// cache the entry or forget about it until the next considerKeeping call
- /// XXX: remove void considerKeeping(StoreEntry &e);
+ /// the calling entry writer no longer expects to cache this entry
+ void abandon(const StoreEntry &e);
- /// whether e should be kept in local RAM for possible future caching
- /// XXX: remove bool keepInLocalMemory(const StoreEntry &e) const;
+ /// whether an in-transit entry is now abandoned by its writer
+ bool abandoned(const StoreEntry &e) const;
+
+ /// the calling entry writer no longer expects to cache this entry
+ void disconnect(MemObject &mem_obj);
/* Store API */
virtual int callback();
static int64_t EntryLimit();
protected:
+ StoreEntry *copyFromShm(const sfileno index);
bool copyToShm(const StoreEntry &e, const sfileno index, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+ bool abandonedAt(const sfileno index) const;
+
// Ipc::StoreMapCleaner API
virtual void noteFreeMapSlice(const sfileno sliceId);
success = true;
} catch (const std::exception &ex) { // TODO: should we catch ... as well?
debugs(79, 2, "db write error: " << ex.what());
- dir->writeError(swap_filen);
+ dir->writeError(*e);
finishedWriting(DISK_ERROR);
// 'this' might be gone beyond this point; fall through to free buf
}
case writerGone:
assert(writeableAnchor_);
- dir->writeError(swap_filen); // abort a partially stored entry
+ dir->writeError(*e); // abort a partially stored entry
finishedWriting(DISK_ERROR);
return;
}
bool
-Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed)
+Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;
    return false;
    anchorEntry(collapsed, filen, *slot);
-    return updateCollapsedWith(collapsed, *slot);
+    inSync = updateCollapsedWith(collapsed, *slot);
+    // the entry is now anchored to this cache_dir; report that even when the
+    // local copy is not yet in sync (matches MemStore::anchorCollapsed and
+    // what StoreController::anchorCollapsedOnDisk expects from a true return)
+    return true; // even if inSync is false
}
bool
// do not rely on e.swap_status here because there is an async delay
// before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
- // since e has swap_filen, its slot is locked for either reading or writing
- map->abortIo(e.swap_filen);
+ // since e has swap_filen, its slot is locked for reading and/or writing
+ // but it is difficult to know whether THIS worker is reading or writing e
+ if (e.swap_status == SWAPOUT_WRITING ||
+ (e.mem_obj && e.mem_obj->swapout.sio != NULL))
+ map->abortWriting(e.swap_filen);
+ else
+ map->closeForReading(e.swap_filen);
e.swap_dirn = -1;
e.swap_filen = -1;
e.swap_status = SWAPOUT_NONE;
sio.finishedWriting(errflag);
}
} else {
- writeError(sio.swap_filen);
+ writeError(*sio.e);
sio.finishedWriting(errflag);
// and hope that Core will call disconnect() to close the map entry
}
}
void
-Rock::SwapDir::writeError(const sfileno fileno)
+Rock::SwapDir::writeError(StoreEntry &e)
{
// Do not abortWriting here. The entry should keep the write lock
// instead of losing association with the store and confusing core.
- map->freeEntry(fileno); // will mark as unusable, just in case
+ map->freeEntry(e.swap_filen); // will mark as unusable, just in case
+
+ Store::Root().transientsAbandon(e);
+
// All callers must also call IoState callback, to propagate the error.
}
int64_t diskOffset(Ipc::Mem::PageId &pageId) const;
int64_t diskOffset(int filen) const;
- void writeError(const sfileno fileno);
+ void writeError(StoreEntry &e);
/* StoreMapCleaner API */
virtual void noteFreeMapSlice(const sfileno fileno);
protected:
/* Store API */
- virtual bool anchorCollapsed(StoreEntry &collapsed);
+ virtual bool anchorCollapsed(StoreEntry &collapsed, bool &inSync);
virtual bool updateCollapsed(StoreEntry &collapsed);
/* protected ::SwapDir API */
void
StoreEntry::hashInsert(const cache_key * someKey)
{
- debugs(20, 3, "StoreEntry::hashInsert: Inserting Entry " << this << " key '" << storeKeyText(someKey) << "'");
+ debugs(20, 3, "StoreEntry::hashInsert: Inserting Entry " << *this << " key '" << storeKeyText(someKey) << "'");
key = storeKeyDup(someKey);
hash_join(store_table, this);
}
// are we using a shared memory cache?
if (Config.memShared && IamWorkerProcess()) {
- // enumerate calling cases if shared memory is enabled
- assert(new_status != IN_MEMORY || EBIT_TEST(flags, ENTRY_SPECIAL));
// This method was designed to update replacement policy, not to
// actually purge something from the memory cache (TODO: rename?).
// Shared memory cache does not have a policy that needs updates.
rep->packHeadersInto(&p);
mem_obj->markEndOfReplyHeaders();
+ EBIT_CLR(flags, ENTRY_FWD_HDR_WAIT);
rep->body.packInto(&p);
os << "e:";
if (e.swap_filen > -1 || e.swap_dirn > -1)
- os << e.swap_filen << '@' << e.swap_dirn << '=';
+ os << e.swap_filen << '@' << e.swap_dirn;
+
+ os << '=';
// print only non-default status values, using unique letters
if (e.mem_status != NOT_IN_MEMORY ||
if (e.store_status != STORE_PENDING) os << 's';
if (e.swap_status != SWAPOUT_NONE) os << 'w' << e.swap_status;
if (e.ping_status != PING_NONE) os << 'p' << e.ping_status;
- os << '.';
}
// print only set flags, using unique letters
if (EBIT_TEST(e.flags, ENTRY_VALIDATED)) os << 'V';
if (EBIT_TEST(e.flags, ENTRY_BAD_LENGTH)) os << 'L';
if (EBIT_TEST(e.flags, ENTRY_ABORTED)) os << 'A';
- os << '/';
}
- return os << &e << '*' << e.lock_count;
+ if (e.mem_obj && e.mem_obj->smpCollapsed)
+ os << 'O';
+
+ return os << '/' << &e << '*' << e.lock_count;
}
/* NullStoreEntry */
PROF_stop(storeClient_kickReads);
copying = false;
+ anEntry->lock("store_client::copy"); // see deletion note below
+
storeClientCopy2(entry, this);
+ // Bug 3480: This store_client object may be deleted now if, for example,
+ // the client rejects the hit response copied above. Use on-stack pointers!
+
#if USE_ADAPTATION
- if (entry)
- entry->kickProducer();
+ anEntry->kickProducer();
#endif
+ anEntry->unlock("store_client::copy");
}
/*
PROF_stop(InvokeHandlers);
}
+// XXX: Does not account for remote readers of local writers, causing
+// premature StoreEntry aborts.
int
storePendingNClients(const StoreEntry * e)
{
}
/// updates the collapsed entry with the corresponding on-disk entry, if any
+/// In other words, the SwapDir::anchorCollapsed() API applied to all disks.
bool
-StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed)
+StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync)
{
// TODO: move this loop to StoreHashIndex, just like the one in get().
if (const int cacheDirs = Config.cacheSwap.n_configured) {
if (!sd->active())
continue;
- if (sd->anchorCollapsed(collapsed)) {
+ if (sd->anchorCollapsed(collapsed, inSync)) {
debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed);
return true;
}
}
void
-StoreController::maybeTrimMemory(StoreEntry &e, const bool preserveSwappable)
+StoreController::memoryOut(StoreEntry &e, const bool preserveSwappable)
{
bool keepInLocalMemory = false;
if (memStore)
- keepInLocalMemory = memStore->keepInLocalMemory(e);
+ memStore->write(e); // leave keepInLocalMemory false
else
keepInLocalMemory = keepForLocalMemoryCache(e);
e.destroyMemObject();
}
+/// Disconnects the given memory object from the shared memory cache, if a
+/// shared memory cache (memStore) is configured; otherwise a no-op because a
+/// non-shared memory cache needs no disconnect bookkeeping.
+void
+StoreController::memoryDisconnect(MemObject &mem_obj)
+{
+    if (memStore)
+        memStore->disconnect(mem_obj);
+    // else nothing to do for non-shared memory cache
+}
+
+/// Tells the shared in-transit (Transients) table to abandon the entry,
+/// presumably so that other workers stop waiting for its data — confirm
+/// against Transients::abandon(). No-op when there is no transients table or
+/// the entry was never added to it (xitTable.index < 0).
+void
+StoreController::transientsAbandon(StoreEntry &e)
+{
+    if (transients) {
+        assert(e.mem_obj);
+        if (e.mem_obj->xitTable.index >= 0)
+            transients->abandon(e);
+    }
+}
+
+/// Disconnects the given memory object from the shared in-transit
+/// (Transients) table, if one is configured; otherwise a no-op.
+void
+StoreController::transientsDisconnect(MemObject &mem_obj)
+{
+    if (transients)
+        transients->disconnect(mem_obj);
+}
+
+
void
StoreController::handleIdleEntry(StoreEntry &e)
{
// They are not managed [well] by any specific Store handled below.
keepInLocalMemory = true;
} else if (memStore) {
- memStore->considerKeeping(e);
// leave keepInLocalMemory false; memStore maintains its own cache
} else {
keepInLocalMemory = keepForLocalMemoryCache(e) && // in good shape and
e->makePublic(); // this is needed for both local and SMP collapsing
if (transients)
transients->put(e, reqFlags, reqMethod);
- debugs(20, 3, "may " << (transients ? "SMP" : "") << " collapse " << *e);
+ debugs(20, 3, "may " << (transients && e->mem_obj->xitTable.index >= 0 ?
+ "SMP-" : "locally-") << "collapse " << *e);
}
void
if (!collapsed) // the entry is no longer locally active, ignore the update
return;
+ if (collapsed->mem_obj && !collapsed->mem_obj->smpCollapsed) {
+ // this happens, e.g., when we tried collapsing but rejected the hit
+ debugs(20, 7, "not SMP-syncing not SMP-collapsed " << *collapsed);
+ return;
+ }
+ // XXX: collapsed->mem_obj may be still hidden here
+
debugs(20, 7, "syncing " << *collapsed);
+ bool found = false;
bool inSync = false;
- if (memStore && collapsed->mem_status == IN_MEMORY)
+ if (memStore && collapsed->mem_status == IN_MEMORY) {
+ found = true;
inSync = memStore->updateCollapsed(*collapsed);
- else if (collapsed->swap_filen >= 0)
+ } else if (collapsed->swap_filen >= 0) {
+ found = true;
inSync = collapsed->store()->updateCollapsed(*collapsed);
- else if (memStore && memStore->anchorCollapsed(*collapsed))
- inSync = true; // found in the memory cache
- else if (anchorCollapsedOnDisk(*collapsed))
- inSync = true; // found in a disk cache
+ } else {
+ if (memStore)
+ found = memStore->anchorCollapsed(*collapsed, inSync);
+ else if (Config.cacheSwap.n_configured)
+ found = anchorCollapsedOnDisk(*collapsed, inSync);
+ }
if (inSync) {
debugs(20, 5, "synced " << *collapsed);
collapsed->invokeHandlers();
- } else { // the backing entry is no longer cached; abort this hit
- debugs(20, 3, "aborting cacheless " << *collapsed);
+ } else if (found) { // unrecoverable problem syncing this entry
+ debugs(20, 3, "aborting " << *collapsed);
collapsed->abort();
+ } else { // the entry is still not in one of the caches
+ debugs(20, 7, "waiting " << *collapsed);
}
}
const bool weAreOrMayBeSwappingOut = swappingOut() || mayStartSwapOut();
- Store::Root().maybeTrimMemory(*this, weAreOrMayBeSwappingOut);
+ Store::Root().memoryOut(*this, weAreOrMayBeSwappingOut);
if (mem_obj->swapout.decision != MemObject::SwapOut::swPossible)
return; // nothing else to do
DelayId MemObject::mostBytesAllowed() const STUB_RETVAL(DelayId())
#endif
void MemObject::unlinkRequest() STUB
-void MemObject::write(StoreIOBuffer writeBuffer, STMCB *callback, void *callbackData) STUB
+void MemObject::write(const StoreIOBuffer &writeBuffer) STUB
void MemObject::replaceHttpReply(HttpReply *newrep) STUB
int64_t MemObject::lowestMemReaderOffset() const STUB_RETVAL(0)
void MemObject::kickReads() STUB