Removed ENTRY_CACHABLE. AFAICT, it was just negating RELEASE_REQUEST AFAICT.
Broadcast transients index instead of key because key may become private.
Squid uses private keys to resolve store_table collisions (among other
things). Thus, a public entry may become private at any time, at any worker.
Using keys results in collapsed entries getting stuck waiting for an update.
The transients index remains constant and can be used for reliable
synchronization.
Using transient index, however, requires storing a pointer to the transient
entry corresponding to that index. Otherwise, there is no API to find the
entry object when a notification comes: Store::Root().get() needs a key.
Mark an entry for release when setting its key from public to private. The old
code was only logging SWAP_LOG_DEL, but we now need to prevent requests in
other workers from collapsing on top of a now-private cache entry. In many
cases, such an entry is in trouble (but not all cases because private keys are
also used for store_table collision resolution).
Fixed syncing of abandoned entries.
Prevent new requests from collapsing on writer-less transient entries.
class CollapsedForwardingMsg
{
public:
- CollapsedForwardingMsg(): sender(-1) { key[0] = key[1] = 0; }
+ CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
public:
- int sender; /// kid ID of sending process
- uint64_t key[2]; ///< StoreEntry key
+ int sender; ///< kid ID of sending process
+
+ /// transients index, so that workers can find [private] entries to sync
+ sfileno xitIndex;
};
// CollapsedForwarding
CollapsedForwardingMsg msg;
msg.sender = KidIdentifier;
- memcpy(msg.key, e.key, sizeof(msg.key));
+ msg.xitIndex = e.mem_obj->xitTable.index;
- debugs(17, 5, storeKeyText(static_cast<cache_key*>(e.key)) << " to " <<
- Config.workers << "-1 workers");
+ debugs(17, 5, e << " to " << Config.workers << "-1 workers");
// TODO: send only to workers who are waiting for data
for (int workerId = 1; workerId <= Config.workers; ++workerId) {
" != " << msg.sender);
}
- const cache_key *key = reinterpret_cast<const cache_key*>(msg.key);
- debugs(17, 7, "hadling " << storeKeyText(key));
- Store::Root().syncCollapsed(key);
- debugs(17, 7, "handled " << storeKeyText(key));
+ debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
+ Store::Root().syncCollapsed(msg.xitIndex);
+ debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
// XXX: stop and schedule an async call to continue
assert(++poppedCount < SQUID_MAXFD);
assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor
e.ping_status = PING_NONE;
- EBIT_SET(e.flags, ENTRY_CACHABLE);
EBIT_CLR(e.flags, RELEASE_REQUEST);
EBIT_CLR(e.flags, KEY_PRIVATE);
EBIT_SET(e.flags, ENTRY_VALIDATED);
// XXX: This method belongs to Store::Root/StoreController, but it is here
// to avoid casting Root() to StoreController until Root() API is fixed.
/// Update local intransit entry after changes made by appending worker.
- virtual void syncCollapsed(const cache_key *key) {}
+ virtual void syncCollapsed(const sfileno xitIndex) {}
// XXX: This method belongs to Store::Root/StoreController, but it is here
// to avoid casting Root() to StoreController until Root() API is fixed.
virtual void memoryUnlink(StoreEntry &e);
virtual void memoryDisconnect(MemObject &mem_obj);
virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
- virtual void syncCollapsed(const cache_key *key);
+ virtual void syncCollapsed(const sfileno xitIndex);
virtual void init();
static const char *MapLabel = "transients_map";
-Transients::Transients(): map(NULL)
+Transients::Transients(): map(NULL), locals(NULL)
{
}
Transients::~Transients()
{
delete map;
+ delete locals;
}
void
Must(!map);
map = new TransientsMap(MapLabel);
map->cleaner = this;
+
+ locals = new Locals(entryLimit, NULL);
}
void
if (!anchor)
return NULL;
- // Without a writer, either the response has been cached already or we will
- // get stuck waiting for it to be cached (because nobody will cache it).
- if (!anchor->writing()) {
- debugs(20, 5, "ignoring writer-less entry " << index);
- } else if (StoreEntry *e = copyFromShm(index)) {
- return e; // keep read lock to receive updates from others
+ // If we already have a local entry, the store_table should have found it.
+ // Since it did not, the local entry key must have changed from public to
+ // private. We still need to keep the private entry around for syncing as
+ // its clients depend on it, but we should not allow new clients to join.
+ if (StoreEntry *oldE = locals->at(index)) {
+ debugs(20, 3, "not joining private " << *oldE);
+ assert(EBIT_TEST(oldE->flags, KEY_PRIVATE));
+ } else if (StoreEntry *newE = copyFromShm(index)) {
+ return newE; // keep read lock to receive updates from others
}
- // missing writer or loading failure
+ // private entry or loading failure
map->closeForReading(index);
return NULL;
}
// TODO: Can we remove smpCollapsed by not syncing non-transient entries?
e->mem_obj->smpCollapsed = true;
+ assert(!locals->at(index));
+ // We do not lock e because we do not want to prevent its destruction;
+ // e is tied to us via mem_obj so we will know when it is destructed.
+ locals->at(index) = e;
return e;
}
fatal("Transients::get(key,callback,data) should not be called");
}
+StoreEntry *
+Transients::findCollapsed(const sfileno index)
+{
+ if (!map)
+ return NULL;
+
+ if (StoreEntry *oldE = locals->at(index)) {
+ debugs(20, 5, "found " << *oldE << " at " << index << " in " << MapLabel);
+ assert(oldE->mem_obj && oldE->mem_obj->xitTable.index == index);
+ return oldE;
+ }
+
+ debugs(20, 3, "no entry at " << index << " in " << MapLabel);
+ return NULL;
+}
+
void
Transients::startWriting(StoreEntry *e, const RequestFlags &reqFlags,
const HttpRequestMethod &reqMethod)
{
if (e.mem_obj && e.mem_obj->xitTable.index >= 0) {
assert(e.mem_obj->xitTable.io == MemObject::ioWriting);
+ // there will be no more updates from us after this, so we must prevent
+ // future readers from joining
+ map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry
map->closeForWriting(e.mem_obj->xitTable.index);
e.mem_obj->xitTable.index = -1;
e.mem_obj->xitTable.io = MemObject::ioDone;
assert(mem_obj.xitTable.io == MemObject::ioReading);
map->closeForReading(mem_obj.xitTable.index);
}
+ locals->at(mem_obj.xitTable.index) = NULL;
mem_obj.xitTable.index = -1;
mem_obj.xitTable.io = MemObject::ioDone;
}
#include "ipc/mem/PageStack.h"
#include "ipc/StoreMap.h"
#include "Store.h"
+#include <vector>
// StoreEntry restoration info not already stored by Ipc::StoreMap
struct TransientsMapExtras {
Transients();
virtual ~Transients();
+ /// return a local, previously collapsed entry
+ StoreEntry *findCollapsed(const sfileno xitIndex);
+
/// add an in-transit entry suitable for collapsing future requests
void startWriting(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
virtual void noteFreeMapSlice(const sfileno sliceId);
private:
- TransientsMap *map; ///< index of mem-cached entries
+ /// shared packed info indexed by Store keys, for creating new StoreEntries
+ TransientsMap *map;
+
+ typedef std::vector<StoreEntry*> Locals;
+ /// local collapsed entries indexed by transient ID, for syncing old StoreEntries
+ Locals *locals;
};
// TODO: Why use Store as a base? We are not really a cache.
DELAY_SENDING,
RELEASE_REQUEST,
REFRESH_REQUEST,
- ENTRY_CACHABLE,
+ ENTRY_CACHABLE_RESERVED_FOR_FUTURE_USE,
ENTRY_DISPATCHED,
KEY_PRIVATE,
ENTRY_FWD_HDR_WAIT,
e->lastmod = lastmod;
e->refcount = refcount;
e->flags = flags;
- EBIT_SET(e->flags, ENTRY_CACHABLE);
EBIT_CLR(e->flags, RELEASE_REQUEST);
EBIT_CLR(e->flags, KEY_PRIVATE);
e->ping_status = PING_NONE;
e.swap_status = SWAPOUT_DONE;
e.ping_status = PING_NONE;
- EBIT_SET(e.flags, ENTRY_CACHABLE);
EBIT_CLR(e.flags, RELEASE_REQUEST);
EBIT_CLR(e.flags, KEY_PRIVATE);
EBIT_SET(e.flags, ENTRY_VALIDATED);
e->lastmod = lastmod;
e->refcount = refcount;
e->flags = newFlags;
- EBIT_SET(e->flags, ENTRY_CACHABLE);
EBIT_CLR(e->flags, RELEASE_REQUEST);
EBIT_CLR(e->flags, KEY_PRIVATE);
e->ping_status = PING_NONE;
* Authenticated requests can't be cached.
*/
e->release();
- } else if (EBIT_TEST(e->flags, ENTRY_CACHABLE) && !getCurrentOffset()) {
+ } else if (!EBIT_TEST(e->flags, RELEASE_REQUEST) && !getCurrentOffset()) {
e->setPublicKey();
} else {
e->release();
CommIoCbPtrFun(gopherSendComplete, gopherState));
Comm::Write(gopherState->serverConn, buf, strlen(buf), call, NULL);
- if (EBIT_TEST(gopherState->entry->flags, ENTRY_CACHABLE))
- gopherState->entry->setPublicKey(); /* Make it public */
+ gopherState->entry->makePublic();
}
/// \ingroup ServerProtocolGopherInternal
#define REFRESH_OVERRIDE(flag) 0
#endif
+ if (EBIT_TEST(entry->flags, RELEASE_REQUEST)) {
+ debugs(22, 3, "NO because " << *entry << " has been released.");
+ return 0;
+ }
+
// Check for Surrogate/1.0 protocol conditions
// NP: reverse-proxy traffic our parent server has instructed us never to cache
if (surrogateNoStore) {
if (EBIT_TEST(flags, REFRESH_REQUEST))
strcat(buf, "REFRESH_REQUEST,");
- if (EBIT_TEST(flags, ENTRY_CACHABLE))
- strcat(buf, "CACHABLE,");
-
if (EBIT_TEST(flags, ENTRY_DISPATCHED))
strcat(buf, "DISPATCHED,");
{
/* This object can be cached for a long time */
- if (EBIT_TEST(flags, ENTRY_CACHABLE))
+ if (!EBIT_TEST(flags, RELEASE_REQUEST))
setPublicKey();
}
/* This object should never be cached at all */
expireNow();
releaseRequest(); /* delete object when not used */
- /* releaseRequest clears ENTRY_CACHABLE flag */
}
void
{
/* This object may be negatively cached */
negativeCache();
-
- if (EBIT_TEST(flags, ENTRY_CACHABLE))
- setPublicKey();
+ makePublic();
}
size_t
if (EBIT_TEST(flags, RELEASE_REQUEST))
return;
- setReleaseFlag();
-
- /*
- * Clear cachable flag here because we might get called before
- * anyone else even looks at the cachability flag. Also, this
- * prevents httpMakePublic from really setting a public key.
- */
- EBIT_CLR(flags, ENTRY_CACHABLE);
+ setReleaseFlag(); // makes validToSend() false, preventing future hits
setPrivateKey();
}
return; /* is already private */
if (key) {
+ setReleaseFlag(); // will markForUnlink(); all caches/workers will know
+
+ // TODO: move into SwapDir::markForUnlink() already called by Root()
if (swap_filen > -1)
storeDirSwapLog(this, SWAP_LOG_DEL);
* store clients won't be able to access object data which has
* been freed from memory.
*
- * If RELEASE_REQUEST is set, then ENTRY_CACHABLE should not
- * be set, and StoreEntry::setPublicKey() should not be called.
+ * If RELEASE_REQUEST is set, setPublicKey() should not be called.
*/
#if MORE_DEBUG_OUTPUT
e->mem_obj->setUris(url, log_url, method);
if (flags.cachable) {
- EBIT_SET(e->flags, ENTRY_CACHABLE);
EBIT_CLR(e->flags, RELEASE_REQUEST);
} else {
- /* StoreEntry::releaseRequest() clears ENTRY_CACHABLE */
e->releaseRequest();
}
if (store_status == STORE_OK && EBIT_TEST(flags, ENTRY_BAD_LENGTH)) {
debugs(20, 2, "StoreEntry::checkCachable: NO: wrong content-length");
++store_check_cachable_hist.no.wrong_content_length;
- } else if (!EBIT_TEST(flags, ENTRY_CACHABLE)) {
+ } else if (EBIT_TEST(flags, RELEASE_REQUEST)) {
debugs(20, 2, "StoreEntry::checkCachable: NO: not cachable");
- ++store_check_cachable_hist.no.not_entry_cachable;
+ ++store_check_cachable_hist.no.not_entry_cachable; // TODO: rename?
} else if (EBIT_TEST(flags, ENTRY_NEGCACHED)) {
debugs(20, 3, "StoreEntry::checkCachable: NO: negative cached");
++store_check_cachable_hist.no.negative_cached;
}
releaseRequest();
- /* StoreEntry::releaseRequest() cleared ENTRY_CACHABLE */
return 0;
}
if (EBIT_TEST(e.flags, DELAY_SENDING)) os << 'P';
if (EBIT_TEST(e.flags, RELEASE_REQUEST)) os << 'X';
if (EBIT_TEST(e.flags, REFRESH_REQUEST)) os << 'F';
- if (EBIT_TEST(e.flags, ENTRY_CACHABLE)) os << 'C';
if (EBIT_TEST(e.flags, ENTRY_DISPATCHED)) os << 'D';
if (EBIT_TEST(e.flags, KEY_PRIVATE)) os << 'I';
if (EBIT_TEST(e.flags, ENTRY_FWD_HDR_WAIT)) os << 'W';
/* check various entry flags (mimics StoreEntry::checkCachable XXX) */
- if (!EBIT_TEST(e->flags, ENTRY_CACHABLE)) {
- debugs(71, 6, "storeDigestAddable: NO: not cachable");
- return 0;
- }
-
if (EBIT_TEST(e->flags, KEY_PRIVATE)) {
debugs(71, 6, "storeDigestAddable: NO: private key");
return 0;
}
void
-StoreController::syncCollapsed(const cache_key *key)
+StoreController::syncCollapsed(const sfileno xitIndex)
{
- StoreEntry *collapsed = swapDir->get(key);
- if (!collapsed) { // the entry is no longer locally active, ignore update
- debugs(20, 7, "not SMP-syncing not-local " << storeKeyText(key));
- return;
- }
-
- if (!collapsed->mem_obj) {
- // without mem_obj, the entry cannot be transient so we ignore it
- debugs(20, 7, "not SMP-syncing not-shared " << *collapsed);
- return;
- }
-
- if (collapsed->mem_obj->xitTable.index < 0) {
- debugs(20, 7, "not SMP-syncing not-transient " << *collapsed);
- return;
- }
-
- // this must be done before the abandoned() check because we may be looking
- // at an entry we are writing while abandoned() requires a reading lock.
- if (!collapsed->mem_obj->smpCollapsed) {
- // this happens, e.g., when we tried collapsing but rejected the hit
- // or a stale notification was received (and we are now the writer)
- debugs(20, 7, "not SMP-syncing not-SMP-collapsed " << *collapsed);
- return;
- }
-
assert(transients);
- if (transients->abandoned(*collapsed)) {
- debugs(20, 3, "aborting abandoned " << *collapsed);
- collapsed->abort();
+
+ StoreEntry *collapsed = transients->findCollapsed(xitIndex);
+ if (!collapsed) { // the entry is no longer locally active, ignore update
+ debugs(20, 7, "not SMP-syncing not-transient " << xitIndex);
return;
}
+ assert(collapsed->mem_obj);
+ assert(collapsed->mem_obj->smpCollapsed);
debugs(20, 7, "syncing " << *collapsed);
+ bool abandoned = transients->abandoned(*collapsed);
bool found = false;
bool inSync = false;
if (memStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) {
found = anchorCollapsed(*collapsed, inSync);
}
+ if (abandoned && collapsed->store_status == STORE_PENDING) {
+ debugs(20, 3, "aborting abandoned but STORE_PENDING " << *collapsed);
+ collapsed->abort();
+ return;
+ }
+
if (inSync) {
debugs(20, 5, "synced " << *collapsed);
collapsed->invokeHandlers();
e->expires = squid_curtime;
e->lastmod = squid_curtime;
e->refcount = 1;
- EBIT_SET(e->flags, ENTRY_CACHABLE);
EBIT_CLR(e->flags, RELEASE_REQUEST);
EBIT_CLR(e->flags, KEY_PRIVATE);
e->ping_status = PING_NONE;
e->expires = squid_curtime;
e->lastmod = squid_curtime;
e->refcount = 1;
- EBIT_SET(e->flags, ENTRY_CACHABLE);
EBIT_CLR(e->flags, RELEASE_REQUEST);
EBIT_CLR(e->flags, KEY_PRIVATE);
e->ping_status = PING_NONE;
entry->timestampsSet();
entry->flush();
- if (!EBIT_TEST(entry->flags, RELEASE_REQUEST))
- entry->setPublicKey();
+ entry->makePublic();
fwd->complete();
debugs(75, 3, "whoisReadReply: Done: " << entry->url());