#include "ipc/Port.h"
#include "ipc/TypedMsgHdr.h"
#include "CollapsedForwarding.h"
-#include "SquidConfig.h"
#include "globals.h"
+#include "SquidConfig.h"
+#include "Store.h"
+#include "store_key_md5.h"
#include "tools.h"
/// shared memory segment path to use for CollapsedForwarding queue
class CollapsedForwardingMsg
{
public:
- CollapsedForwardingMsg(): processId(-1) {}
+ CollapsedForwardingMsg(): sender(-1) { key[0] = key[1] = 0; }
public:
- int processId; /// ID of sending process
- // XXX: add entry info
+ int sender; /// kid ID of sending process
+ uint64_t key[2]; ///< StoreEntry key
};
// CollapsedForwarding
CollapsedForwarding::Init()
{
Must(!queue.get());
- queue.reset(new Queue(ShmLabel, KidIdentifier));
+ if (UsingSmp() && IamWorkerProcess())
+ queue.reset(new Queue(ShmLabel, KidIdentifier));
}
void
-CollapsedForwarding::NewData(const StoreIOState &sio)
+CollapsedForwarding::Broadcast(const cache_key *key)
{
+ if (!queue.get())
+ return;
+
CollapsedForwardingMsg msg;
- msg.processId = KidIdentifier;
- // XXX: copy data from sio
+ msg.sender = KidIdentifier;
+ memcpy(msg.key, key, sizeof(msg.key));
+
+ debugs(17, 5, storeKeyText(key) << " to " << Config.workers << "-1 workers");
// TODO: send only to workers who are waiting for data
- // XXX: does not work for non-daemon mode?
for (int workerId = 1; workerId <= Config.workers; ++workerId) {
try {
- if (queue->push(workerId, msg))
+ if (workerId != KidIdentifier && queue->push(workerId, msg))
Notify(workerId);
} catch (const Queue::Full &) {
debugs(17, DBG_IMPORTANT, "Worker collapsed forwarding push queue "
CollapsedForwarding::Notify(const int workerId)
{
// TODO: Count and report the total number of notifications, pops, pushes.
- debugs(17, 7, HERE << "kid" << workerId);
+ debugs(17, 7, "to kid" << workerId);
Ipc::TypedMsgHdr msg;
// TODO: add proper message type?
msg.setType(Ipc::mtCollapsedForwardingNotification);
void
CollapsedForwarding::HandleNewData(const char *const when)
{
- debugs(17, 4, HERE << "popping all " << when);
+ debugs(17, 4, "popping all " << when);
CollapsedForwardingMsg msg;
int workerId;
int poppedCount = 0;
while (queue->pop(workerId, msg)) {
- debugs(17, 3, HERE << "collapsed forwarding data message from " <<
- workerId);
- if (workerId != msg.processId) {
- debugs(17, DBG_IMPORTANT, HERE << "mismatching IDs: " << workerId <<
- " != " << msg.processId);
+ debugs(17, 3, "message from kid" << workerId);
+ if (workerId != msg.sender) {
+ debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
+ " != " << msg.sender);
}
+ Store::Root().syncCollapsed(reinterpret_cast<const cache_key*>(msg.key));
+
// XXX: stop and schedule an async call to continue
assert(++poppedCount < SQUID_MAXFD);
}
{
const int from = msg.getInt();
debugs(17, 7, HERE << "from " << from);
+ assert(queue.get());
queue->clearReaderSignal(from);
HandleNewData("after notification");
}
void CollapsedForwardingRr::open(const RunnerRegistry &)
{
- if (IamWorkerProcess())
- CollapsedForwarding::Init();
+ CollapsedForwarding::Init();
}
CollapsedForwardingRr::~CollapsedForwardingRr()
#include "ipc/Queue.h"
#include "ipc/forward.h"
+#include "typedefs.h"
#include <memory>
/// open shared memory segment
static void Init();
- /// notify other workers that new data is available
+ /// XXX: remove
static void NewData(const StoreIOState &sio);
+ /// notify other workers that new data is available
+ static void Broadcast(const cache_key *key);
+
/// kick worker with empty IPC queue
static void Notify(const int workerId);
url = xstrdup(aUrl);
}
-MemObject::MemObject(char const *aUrl, char const *aLog_url)
+MemObject::MemObject(char const *aUrl, char const *aLog_url): mem_index(-1)
{
debugs(20, 3, HERE << "new MemObject " << this);
_reply = new HttpReply;
assert(chksum == url_checksum(url));
#endif
+ assert(mem_index < 0);
if (!shutting_down)
assert(swapout.sio == NULL);
} abort;
char *log_url;
RemovalPolicyNode repl;
+ int32_t mem_index; ///< entry position inside the [shared] memory cache
int id;
int64_t object_sz;
size_t swap_hdr_sz;
#include "squid.h"
#include "base/RunnersRegistry.h"
+#include "CollapsedForwarding.h"
#include "HttpReply.h"
#include "ipc/mem/Page.h"
#include "ipc/mem/Pages.h"
if (!slot)
return NULL;
- const Ipc::StoreMapAnchor::Basics &basics = slot->basics;
-
// create a brand new store entry and initialize it with stored info
StoreEntry *e = new StoreEntry();
e->lock_count = 0;
- e->swap_file_sz = basics.swap_file_sz;
- e->lastref = basics.lastref;
- e->timestamp = basics.timestamp;
- e->expires = basics.expires;
- e->lastmod = basics.lastmod;
- e->refcount = basics.refcount;
- e->flags = basics.flags;
-
- e->store_status = STORE_OK;
- e->mem_status = IN_MEMORY; // setMemStatus(IN_MEMORY) requires mem_obj
- //e->swap_status = set in StoreEntry constructor to SWAPOUT_NONE;
- e->ping_status = PING_NONE;
+ // XXX: We do not know the URLs yet, only the key, but we need to parse and
+ // store the response for the Root().get() callers to be happy because they
+ // expect IN_MEMORY entries to already have the response headers and body.
+ // At least one caller calls createMemObject() if there is not one, so
+ // we hide the true object until that happens (to avoid leaking TBD URLs).
+ e->createMemObject("TBD", "TBD");
- EBIT_SET(e->flags, ENTRY_CACHABLE);
- EBIT_CLR(e->flags, RELEASE_REQUEST);
- EBIT_CLR(e->flags, KEY_PRIVATE);
- EBIT_SET(e->flags, ENTRY_VALIDATED);
+ anchorEntry(*e, index, *slot);
const bool copied = copyFromShm(*e, index, *slot);
// we copied everything we could to local memory; no more need to lock
map->closeForReading(index);
+ e->mem_obj->mem_index = -1;
+
+ e->hideMemObject();
if (copied) {
e->hashInsert(key);
fatal("MemStore::get(key,callback,data) should not be called");
}
+bool
+MemStore::anchorCollapsed(StoreEntry &collapsed)
+{
+ if (!map)
+ return false;
+
+ sfileno index;
+ const Ipc::StoreMapAnchor *const slot = map->openForReading(
+ reinterpret_cast<cache_key*>(collapsed.key), index);
+ if (!slot)
+ return false;
+
+ anchorEntry(collapsed, index, *slot);
+ return updateCollapsedWith(collapsed, index, *slot);
+}
+
+bool
+MemStore::updateCollapsed(StoreEntry &collapsed)
+{
+ if (!map)
+ return false;
+
+ if (collapsed.mem_status != IN_MEMORY) // no longer using a memory cache
+ return false;
+
+ const sfileno index = collapsed.mem_obj->mem_index;
+
+ // already disconnected from the cache, no need to update
+ if (index < 0)
+ return true;
+
+ const Ipc::StoreMapAnchor &anchor = map->readableEntry(index);
+ return updateCollapsedWith(collapsed, index, anchor);
+}
+
+bool
+MemStore::updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor)
+{
+ collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic
+
+ const bool copied = copyFromShm(collapsed, index, anchor);
+
+ return copied; // XXX: when do we unlock the map slot?
+}
+
+/// anchors StoreEntry to an already locked map entry
+void
+MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor)
+{
+ const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;
+
+ e.swap_file_sz = basics.swap_file_sz;
+ e.lastref = basics.lastref;
+ e.timestamp = basics.timestamp;
+ e.expires = basics.expires;
+ e.lastmod = basics.lastmod;
+ e.refcount = basics.refcount;
+ e.flags = basics.flags;
+
+ assert(e.mem_obj);
+ e.store_status = STORE_OK;
+ e.setMemStatus(IN_MEMORY);
+ e.mem_obj->mem_index = index;
+ assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor
+ e.ping_status = PING_NONE;
+
+ EBIT_SET(e.flags, ENTRY_CACHABLE);
+ EBIT_CLR(e.flags, RELEASE_REQUEST);
+ EBIT_CLR(e.flags, KEY_PRIVATE);
+ EBIT_SET(e.flags, ENTRY_VALIDATED);
+}
+
/// copies the entire entry from shared to local memory
bool
MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor)
{
debugs(20, 7, "mem-loading entry " << index << " from " << anchor.start);
- // XXX: We do not know the URLs yet, only the key, but we need to parse and
- // store the response for the Root().get() callers to be happy because they
- // expect IN_MEMORY entries to already have the response headers and body.
- // At least one caller calls createMemObject() if there is not one, so
- // we hide the true object until that happens (to avoid leaking TBD URLs).
- e.createMemObject("TBD", "TBD");
-
// emulate the usual Store code but w/o inapplicable checks and callbacks:
- Ipc::StoreMapSliceId sid = anchor.start;
- int64_t offset = 0;
+ Ipc::StoreMapSliceId sid = anchor.start; // optimize: remember the last sid
+ bool wasEof = anchor.complete() && sid < 0;
+ int64_t sliceOffset = 0;
while (sid >= 0) {
const Ipc::StoreMapSlice &slice = map->readableSlice(index, sid);
- const MemStoreMap::Extras &extras = map->extras(sid);
- StoreIOBuffer sliceBuf(slice.size, offset,
- static_cast<char*>(PagePointer(extras.page)));
- if (!copyFromShmSlice(e, sliceBuf, slice.next < 0))
- return false;
- debugs(20, 9, "entry " << index << " slice " << sid << " filled " <<
- extras.page);
- offset += slice.size;
- sid = slice.next;
+ // slice state may change during copying; take snapshots now
+ wasEof = anchor.complete() && slice.next < 0;
+ const Ipc::StoreMapSlice::Size wasSize = slice.size;
+
+ if (e.mem_obj->endOffset() < sliceOffset + wasSize) {
+ // size of the slice data that we already copied
+ const size_t prefixSize = e.mem_obj->endOffset() - sliceOffset;
+ assert(prefixSize <= wasSize);
+
+ const MemStoreMap::Extras &extras = map->extras(sid);
+ char *page = static_cast<char*>(PagePointer(extras.page));
+ const StoreIOBuffer sliceBuf(wasSize - prefixSize,
+ e.mem_obj->endOffset(),
+ page + prefixSize);
+ if (!copyFromShmSlice(e, sliceBuf, wasEof))
+ return false;
+ debugs(20, 9, "entry " << index << " copied slice " << sid <<
+ " from " << extras.page << " +" << prefixSize);
+ }
+ // else skip a [possibly incomplete] slice that we copied earlier
+
+ // careful: the slice may have grown _and_ gotten the next slice ID!
+ if (slice.next >= 0) {
+ assert(!wasEof);
+ // here we know that slice.size may not change any more
+ if (wasSize >= slice.size) { // did not grow since we started copying
+ sliceOffset += wasSize;
+ sid = slice.next;
+ }
+ } else if (wasSize >= slice.size) { // did not grow
+ break;
+ }
+ }
+
+ if (!wasEof) {
+ debugs(20, 7, "mem-loaded " << e.mem_obj->endOffset() << '/' <<
+ anchor.basics.swap_file_sz << " bytes of " << e);
+ return true;
}
e.mem_obj->object_sz = e.mem_obj->endOffset(); // from StoreEntry::complete()
assert(static_cast<uint64_t>(e.mem_obj->object_sz) == anchor.basics.swap_file_sz);
// would be nice to call validLength() here, but it needs e.key
-
- e.hideMemObject();
-
+ // XXX: unlock acnhor here!
return true;
}
/// imports one shared memory slice into local memory
bool
-MemStore::copyFromShmSlice(StoreEntry &e, StoreIOBuffer &buf, bool eof)
+MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
{
debugs(20, 7, "buf: " << buf.offset << " + " << buf.length);
if (copyToShm(e, index, *slot)) {
slot->set(e);
map->closeForWriting(index, false);
+ CollapsedForwarding::Broadcast(static_cast<const cache_key*>(e.key));
return;
}
// fall through to the error handling code
}
map->abortIo(index);
+ CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
}
/// copies all local data to shared memory
}
}
+void
+MemStore::unlink(StoreEntry &e)
+{
+ assert(e.mem_obj);
+ if (e.mem_obj->mem_index >= 0) {
+ map->freeEntry(e.mem_obj->mem_index);
+ disconnect(e);
+ } else {
+ map->freeEntryByKey(reinterpret_cast<cache_key*>(e.key));
+ }
+ e.destroyMemObject();
+}
+
+void
+MemStore::disconnect(StoreEntry &e)
+{
+ assert(e.mem_obj);
+ if (e.mem_obj->mem_index >= 0) {
+ map->abortIo(e.mem_obj->mem_index);
+ e.mem_obj->mem_index = -1;
+ }
+}
+
/// calculates maximum number of entries we need to store and map
int64_t
MemStore::EntryLimit()
/// whether e should be kept in local RAM for possible future caching
bool keepInLocalMemory(const StoreEntry &e) const;
+ /// remove from the cache
+ void unlink(StoreEntry &e);
+
+ /// called when the entry is about to forget its association with mem cache
+ void disconnect(StoreEntry &e);
+
/* Store API */
virtual int callback();
virtual StoreEntry * get(const cache_key *);
virtual void reference(StoreEntry &);
virtual bool dereference(StoreEntry &, bool);
virtual void maintain();
+ virtual bool anchorCollapsed(StoreEntry &collapsed);
+ virtual bool updateCollapsed(StoreEntry &collapsed);
static int64_t EntryLimit();
bool copyToShm(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor);
bool copyToShmSlice(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor, int64_t &offset);
bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
- bool copyFromShmSlice(StoreEntry &e, StoreIOBuffer &buf, bool eof);
+ bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof);
+
+ void anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
+ bool updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor);
sfileno reserveSapForWriting(Ipc::Mem::PageId &page);
/// makes the entry available for collapsing future requests
virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) {}
+ // XXX: This method belongs to Store::Root/StoreController, but it is here
+ // to avoid casting Root() to StoreController until Root() API is fixed.
+ /// Update local intransit entry after changes made by appending worker.
+ virtual void syncCollapsed(const cache_key *key) {}
+
+ // XXX: This method belongs to Store::Root/StoreController, but it is here
+ // to avoid casting Root() to StoreController until Root() API is fixed.
+ /// removes the entry from the memory cache
+ virtual void memoryUnlink(StoreEntry &e) {}
+
+ /// if the entry is found, tie it to this cache and call updateCollapsed()
+ virtual bool anchorCollapsed(StoreEntry &collapsed) { return false; }
+
+ /// update a local collapsed entry with fresh info from this cache (if any)
+ virtual bool updateCollapsed(StoreEntry &collapsed) { return false; }
+
private:
static RefCount<Store> CurrentRoot;
};
/* Store parent API */
virtual void handleIdleEntry(StoreEntry &e);
virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable);
+ virtual void memoryUnlink(StoreEntry &e);
virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+ virtual void syncCollapsed(const cache_key *key);
virtual void init();
private:
void createOneStore(Store &aStore);
bool keepForLocalMemoryCache(const StoreEntry &e) const;
+ bool anchorCollapsedOnDisk(StoreEntry &collapsed);
StorePointer swapDir; ///< summary view of all disk caches
MemStore *memStore; ///< memory cache
// neighbors_do_private_keys (which is true in most cases and by default).
// This is nothing but waste of CPU cycles. Need a better API to avoid it.
e->setPublicKey();
+ assert(e->key);
- assert(e->next); // e->hashInsert(key) is done in setPublicKey()
return e;
}
}
Rock::WriteRequest::WriteRequest(const ::WriteRequest &base,
- const IoState::Pointer &anSio,
- const bool last):
+ const IoState::Pointer &anSio):
::WriteRequest(base),
sio(anSio),
- isLast(last)
+ sidCurrent(-1),
+ sidNext(-1)
{
}
class WriteRequest: public ::WriteRequest
{
public:
- WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio, const bool last);
+ WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio);
IoState::Pointer sio;
- const bool isLast;
+
+ /// slot being written using this write request
+ SlotId sidCurrent;
+
+ /// allocated next slot (negative if we are writing the last slot)
+ SlotId sidNext;
private:
CBDATA_CLASS2(WriteRequest);
// write partial buffer for all collapsed hit readers to see
// XXX: can we check that this is needed w/o stalling readers
// that appear right after our check?
- writeBufToDisk(false);
+ writeBufToDisk(-1);
}
}
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
// to avoid triggering read-page;new_head+old_tail;write-page overheads
- // finalize map slice
- Ipc::StoreMap::Slice &slice =
- dir->map->writeableSlice(swap_filen, sidCurrent);
- slice.next = sidNext;
- slice.size = theBuf.size - sizeof(DbCellHeader);
-
// finalize db cell header
DbCellHeader header;
memcpy(header.key, e->key, sizeof(header.key));
// copy finalized db cell header into buffer
memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
- writeBufToDisk(sidNext < 0);
+ writeBufToDisk(sidNext);
theBuf.clear();
sidCurrent = sidNext;
/// Write header-less (XXX) or complete buffer to disk.
void
-Rock::IoState::writeBufToDisk(const bool last)
+Rock::IoState::writeBufToDisk(const SlotId sidNext)
{
// and now allocate another buffer for the WriteRequest so that
// we can support concurrent WriteRequests (and to ease cleaning)
WriteRequest *const r = new WriteRequest(
::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
- memFreeBufFunc(wBufCap)), this, last);
+ memFreeBufFunc(wBufCap)), this);
+ r->sidCurrent = sidCurrent;
+ r->sidNext = sidNext;
// theFile->write may call writeCompleted immediatelly
theFile->write(r);
void tryWrite(char const *buf, size_t size, off_t offset);
size_t writeToBuffer(char const *buf, size_t size);
void writeToDisk(const SlotId nextSlot);
- void writeBufToDisk(const bool last);
+ void writeBufToDisk(const SlotId nextSlot);
SlotId reserveSlotForWriting();
void callBack(int errflag);
if (!slot)
return NULL;
- const Ipc::StoreMapAnchor::Basics &basics = slot->basics;
-
// create a brand new store entry and initialize it with stored basics
StoreEntry *e = new StoreEntry();
e->lock_count = 0;
- e->swap_dirn = index;
- e->swap_filen = filen;
- e->swap_file_sz = basics.swap_file_sz;
- e->lastref = basics.lastref;
- e->timestamp = basics.timestamp;
- e->expires = basics.expires;
- e->lastmod = basics.lastmod;
- e->refcount = basics.refcount;
- e->flags = basics.flags;
- e->store_status = STORE_OK;
- e->setMemStatus(NOT_IN_MEMORY);
- e->swap_status = SWAPOUT_DONE;
- e->ping_status = PING_NONE;
- EBIT_SET(e->flags, ENTRY_CACHABLE);
- EBIT_CLR(e->flags, RELEASE_REQUEST);
- EBIT_CLR(e->flags, KEY_PRIVATE);
- EBIT_SET(e->flags, ENTRY_VALIDATED);
+ anchorEntry(*e, filen, *slot);
+
e->hashInsert(key);
trackReferences(*e);
// the disk entry remains open for reading, protected from modifications
}
+bool
+Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed)
+{
+ if (!map || !theFile || !theFile->canRead())
+ return false;
+
+ sfileno filen;
+ const Ipc::StoreMapAnchor *const slot = map->openForReading(
+ reinterpret_cast<cache_key*>(collapsed.key), filen);
+ if (!slot)
+ return false;
+
+ anchorEntry(collapsed, filen, *slot);
+ return updateCollapsedWith(collapsed, *slot);
+}
+
+bool
+Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
+{
+ if (!map || !theFile || !theFile->canRead())
+ return false;
+
+ if (collapsed.swap_filen < 0) // no longer using a disk cache
+ return true;
+ assert(collapsed.swap_dirn == index);
+
+ const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
+ return updateCollapsedWith(collapsed, s);
+}
+
+bool
+Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
+{
+ collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic
+ return true;
+}
+
+void
+Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
+{
+ const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;
+
+ e.swap_file_sz = basics.swap_file_sz;
+ e.swap_dirn = index;
+ e.swap_filen = filen;
+ e.lastref = basics.lastref;
+ e.timestamp = basics.timestamp;
+ e.expires = basics.expires;
+ e.lastmod = basics.lastmod;
+ e.refcount = basics.refcount;
+ e.flags = basics.flags;
+
+ e.store_status = STORE_OK;
+ e.setMemStatus(NOT_IN_MEMORY);
+ e.swap_status = SWAPOUT_DONE;
+ e.ping_status = PING_NONE;
+
+ EBIT_SET(e.flags, ENTRY_CACHABLE);
+ EBIT_CLR(e.flags, RELEASE_REQUEST);
+ EBIT_CLR(e.flags, KEY_PRIVATE);
+ EBIT_SET(e.flags, ENTRY_VALIDATED);
+}
+
+
void Rock::SwapDir::disconnect(StoreEntry &e)
{
assert(e.swap_dirn == index);
return;
}
- // XXX: can we check that this is needed w/o stalling readers
- // that appear right after our check?
- if (Config.onoff.collapsed_forwarding)
- CollapsedForwarding::NewData(sio);
-
if (errflag == DISK_OK) {
// do not increment sio.offset_ because we do it in sio->write()
- if (request->isLast) {
+
+ // finalize the shared slice info after writing slice contents to disk
+ Ipc::StoreMap::Slice &slice =
+ map->writeableSlice(sio.swap_filen, request->sidCurrent);
+ slice.size = request->len - sizeof(DbCellHeader);
+ slice.next = request->sidNext;
+
+ if (request->sidNext < 0) {
// close, the entry gets the read lock
map->closeForWriting(sio.swap_filen, true);
sio.finishedWriting(errflag);
sio.finishedWriting(errflag);
// and hope that Core will call disconnect() to close the map entry
}
+
+ CollapsedForwarding::Broadcast(static_cast<const cache_key*>(sio.e->key));
}
void
uint64_t slotSize; ///< all db slots are of this size
protected:
+ /* Store API */
+ virtual bool anchorCollapsed(StoreEntry &collapsed);
+ virtual bool updateCollapsed(StoreEntry &collapsed);
+
/* protected ::SwapDir API */
virtual bool needsDiskStrand() const;
virtual void init();
int entryMaxPayloadSize() const;
int entriesNeeded(const int64_t objSize) const;
+ void anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor);
+ bool updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor);
+
friend class Rebuild;
friend class IoState;
const char *filePath; ///< location of cache storage file inside path/
Ipc::ReadWriteLock::lockShared()
{
++readers; // this locks "new" writers out
- if (!writers) // there are no old writers
+ if (!writers || appending) // there are no old writers or sharing is OK
return true;
--readers;
return false;
void
Ipc::ReadWriteLock::unlockExclusive()
{
+ appending = 0;
assert(writers-- > 0);
}
unlockExclusive();
}
+void
+Ipc::ReadWriteLock::startAppending()
+{
+ assert(writers > 0);
+ appending++;
+}
+
void
Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const
{
} else if (writers) {
++stats.writeable;
stats.writers += writers;
+ stats.appenders += appending;
} else {
++stats.idle;
}
const int locked = readers + writers;
storeAppendPrintf(&e, "Readers: %9d %6.2f%%\n",
readers, (100.0 * readers / locked));
- storeAppendPrintf(&e, "Writers: %9d %6.2f%%\n",
- writers, (100.0 * writers / locked));
+ const double appPerc = writers ? (100.0 * appenders / writers) : 0.0;
+ storeAppendPrintf(&e, "Writers: %9d %6.2f%% including Appenders: %9d %6.2f%%\n",
+ writers, (100.0 * writers / locked),
+ appenders, appPerc);
}
}
class ReadWriteLockStats;
/// an atomic readers-writer or shared-exclusive lock suitable for maps/tables
+/// Also supports reading-while-appending mode when readers and writer are
+/// allowed to access the same locked object because the writer promisses
+/// to only append new data and all size-related object properties are atomic.
class ReadWriteLock
{
public:
void unlockExclusive(); ///< undo successful exclusiveLock()
void switchExclusiveToShared(); ///< stop writing, start reading
+ void startAppending(); ///< writer keeps its lock but also allows reading
+
/// adds approximate current stats to the supplied ones
void updateStats(ReadWriteLockStats &stats) const;
public:
mutable Atomic::Word readers; ///< number of users trying to read
Atomic::Word writers; ///< number of writers trying to modify protected data
+ Atomic::Word appending; ///< the writer has promissed to only append
};
/// approximate stats of a set of ReadWriteLocks
int idle; ///< number of unlocked locks
int readers; ///< sum of lock.readers
int writers; ///< sum of lock.writers
+ int appenders; ///< number of appending writers
};
} // namespace Ipc
// note: we do not lock, so comparison may be inacurate
- if (inode.state == Anchor::Empty)
+ if (inode.empty())
return +2;
if (const time_t diff = newVersion - inode.basics.timestamp)
assert(valid(fileno));
Anchor &inode = shared->slots[fileno].anchor;
- assert(inode.state == Anchor::Writeable);
+ assert(inode.writing());
// we do not iterate slices because we were told to forget about
// them; the caller is responsible for freeing them (most likely
// our slice list is incomplete or has holes)
inode.waitingToBeFreed = false;
- inode.state = Anchor::Empty;
+ inode.rewind();
inode.lock.unlockExclusive();
--shared->count;
ReadWriteLock &lock = s.lock;
if (lock.lockExclusive()) {
- assert(s.state != Anchor::Writeable); // until we start breaking locks
+ assert(s.writing() && !s.reading());
// bail if we cannot empty this position
- if (!s.waitingToBeFreed && s.state == Anchor::Readable && !overwriteExisting) {
+ if (!s.waitingToBeFreed && !s.empty() && !overwriteExisting) {
lock.unlockExclusive();
debugs(54, 5, "cannot open existing entry " << fileno <<
" for writing " << path);
}
// free if the entry was used, keeping the entry locked
- if (s.waitingToBeFreed || s.state == Anchor::Readable)
+ if (s.waitingToBeFreed || !s.empty())
freeChain(fileno, s, true);
- assert(s.state == Anchor::Empty);
+ assert(s.empty());
++shared->count;
- s.state = Anchor::Writeable;
//s.setKey(key); // XXX: the caller should do that
debugs(54, 5, "opened entry " << fileno << " for writing " << path);
return NULL;
}
+void
+Ipc::StoreMap::startAppending(const sfileno fileno)
+{
+ assert(valid(fileno));
+ Anchor &s = shared->slots[fileno].anchor;
+ assert(s.writing());
+ s.lock.startAppending();
+ debugs(54, 5, "restricted entry " << fileno << " to appending " << path);
+}
+
void
Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading)
{
assert(valid(fileno));
Anchor &s = shared->slots[fileno].anchor;
- assert(s.state == Anchor::Writeable);
- s.state = Anchor::Readable;
+ assert(s.writing());
if (lockForReading) {
s.lock.switchExclusiveToShared();
debugs(54, 5, "switched entry " << fileno <<
" from writing to reading " << path);
+ assert(s.complete());
} else {
s.lock.unlockExclusive();
debugs(54, 5, "closed entry " << fileno << " for writing " << path);
+ // cannot assert completeness here because we have no lock
}
}
Ipc::StoreMap::writeableSlice(const AnchorId anchorId, const SliceId sliceId)
{
assert(valid(anchorId));
- assert(shared->slots[anchorId].anchor.state == Anchor::Writeable);
+ assert(shared->slots[anchorId].anchor.writing());
assert(valid(sliceId));
return shared->slots[sliceId].slice;
}
Ipc::StoreMap::readableSlice(const AnchorId anchorId, const SliceId sliceId) const
{
assert(valid(anchorId));
- assert(shared->slots[anchorId].anchor.state == Anchor::Readable);
+ assert(shared->slots[anchorId].anchor.reading());
assert(valid(sliceId));
return shared->slots[sliceId].slice;
}
Ipc::StoreMap::writeableEntry(const AnchorId anchorId)
{
assert(valid(anchorId));
- assert(shared->slots[anchorId].anchor.state == Anchor::Writeable);
+ assert(shared->slots[anchorId].anchor.writing());
+ return shared->slots[anchorId].anchor;
+}
+
+const Ipc::StoreMap::Anchor &
+Ipc::StoreMap::readableEntry(const AnchorId anchorId) const
+{
+ assert(valid(anchorId));
+ assert(shared->slots[anchorId].anchor.reading());
return shared->slots[anchorId].anchor;
}
debugs(54, 5, "aborting entry " << fileno << " for writing " << path);
assert(valid(fileno));
Anchor &s = shared->slots[fileno].anchor;
- assert(s.state == Anchor::Writeable);
- freeChain(fileno, s, false);
- debugs(54, 5, "closed entry " << fileno << " for writing " << path);
+ assert(s.writing());
+ s.lock.appending = false; // locks out any new readers
+ if (!s.lock.readers) {
+ freeChain(fileno, s, false);
+ debugs(54, 5, "closed clean entry " << fileno << " for writing " << path);
+ } else {
+ s.waitingToBeFreed = true;
+ // XXX: s.state &= !Anchor::Writeable;
+ s.lock.unlockExclusive();
+ debugs(54, 5, "closed dirty entry " << fileno << " for writing " << path);
+ }
}
void
// The caller is a lock holder. Thus, if we are Writeable, then the
// caller must be the writer; otherwise the caller must be the reader.
- if (s.state == Anchor::Writeable)
+ if (s.writing())
abortWriting(fileno);
else
closeForReading(fileno);
{
assert(valid(fileno));
const Anchor &s = shared->slots[fileno].anchor;
- switch (s.state) {
- case Anchor::Readable:
+ if (s.reading())
return &s; // immediate access by lock holder so no locking
- case Anchor::Writeable:
- return NULL; // cannot read the slot when it is being written
- case Anchor::Empty:
- assert(false); // must be locked for reading or writing
- }
- assert(false); // not reachable
+ if (s.writing())
+ return NULL; // the caller is not a read lock holder
+ assert(false); // must be locked for reading or writing
return NULL;
}
s.waitingToBeFreed = true; // mark to free it later
}
+void
+Ipc::StoreMap::freeEntryByKey(const cache_key *const key)
+{
+ debugs(54, 5, "marking entry with key " << storeKeyText(key)
+ << " to be freed in " << path);
+
+ const int idx = anchorIndexByKey(key);
+ Anchor &s = shared->slots[idx].anchor;
+ if (s.lock.lockExclusive()) {
+ if (s.sameKey(key))
+ freeChain(idx, s, true);
+ s.lock.unlockExclusive();
+ } else if (s.lock.lockShared()) {
+ if (s.sameKey(key))
+ s.waitingToBeFreed = true; // mark to free it later
+ s.lock.unlockShared();
+ } else {
+ // we cannot be sure that the entry we found is ours because we do not
+ // have a lock on it, but we still check to minimize false deletions
+ if (s.sameKey(key))
+ s.waitingToBeFreed = true; // mark to free it later
+ }
+}
+
/// unconditionally frees an already locked chain of slots, unlocking if needed
void
Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLocked)
{
- debugs(54, 7, "freeing " << inode.state << " entry " << fileno <<
+ debugs(54, 7, "freeing entry " << fileno <<
" in " << path);
- if (inode.state != Anchor::Empty) {
+ if (!inode.empty()) {
sfileno sliceId = inode.start;
debugs(54, 8, "first slice " << sliceId);
while (sliceId >= 0) {
}
inode.waitingToBeFreed = false;
- inode.state = Anchor::Empty;
+ inode.rewind();
if (!keepLocked)
inode.lock.unlockExclusive();
return NULL;
}
- if (s.state == Anchor::Empty) {
+ if (s.empty()) {
s.lock.unlockShared();
debugs(54, 7, "cannot open empty entry " << fileno <<
" for reading " << path);
return NULL;
}
- // cannot be Writing here if we got shared lock and checked Empty above
- assert(s.state == Anchor::Readable);
debugs(54, 5, "opened entry " << fileno << " for reading " << path);
return &s;
}
{
assert(valid(fileno));
Anchor &s = shared->slots[fileno].anchor;
- assert(s.state == Anchor::Readable);
+ assert(s.reading());
s.lock.unlockShared();
debugs(54, 5, "closed entry " << fileno << " for reading " << path);
}
assert(valid(fileno));
Anchor &s = shared->slots[fileno].anchor;
if (s.lock.lockExclusive()) {
- if (s.state == Anchor::Readable) { // skip empties
+ if (!s.empty()) {
// this entry may be marked for deletion, and that is OK
freeChain(fileno, s, false);
debugs(54, 5, "purged entry " << fileno << " from " << path);
/* Ipc::StoreMapAnchor */
-Ipc::StoreMapAnchor::StoreMapAnchor(): start(0), state(Empty)
+Ipc::StoreMapAnchor::StoreMapAnchor(): start(0)
{
memset(&key, 0, sizeof(key));
memset(&basics, 0, sizeof(basics));
void
Ipc::StoreMapAnchor::set(const StoreEntry &from)
{
- assert(state == Writeable);
+ assert(writing() && !reading());
memcpy(key, from.key, sizeof(key));
basics.timestamp = from.timestamp;
basics.lastref = from.lastref;
void
Ipc::StoreMapAnchor::rewind()
{
- assert(state == Writeable);
+ assert(writing());
start = 0;
memset(&key, 0, sizeof(key));
memset(&basics, 0, sizeof(basics));
typedef int32_t StoreMapSliceId;
/// a piece of Store entry, linked to other pieces, forming a chain
+/// slices may be appended by writers while readers read the entry
class StoreMapSlice
{
public:
- StoreMapSlice(): next(-1), size(0) {}
+ typedef uint32_t Size;
- StoreMapSliceId next; ///< ID of the next slice occupied by the entry
- uint32_t size; ///< slice contents size
+ StoreMapSlice(): size(0), next(-1) {}
+
+ Atomic::WordT<Size> size; ///< slice contents size
+ Atomic::WordT<StoreMapSliceId> next; ///< ID of the next entry slice
};
/// undo the effects of set(), setKey(), etc., but keep locks and state
void rewind();
+ /* entry state may change immediately after calling these methods unless
+ * the caller holds an appropriate lock */
+ bool empty() const { return !key[0] && !key[1]; }
+ bool reading() const { return lock.readers; }
+ bool writing() const { return lock.writers; }
+ bool complete() const { return !empty() && !writing(); }
+
public:
mutable ReadWriteLock lock; ///< protects slot data below
Atomic::WordT<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock
+ // fields marked with [app] can be modified when appending-while-reading
+
uint64_t key[2]; ///< StoreEntry key
// STORE_META_STD TLV field from StoreEntry
time_t lastref;
time_t expires;
time_t lastmod;
- uint64_t swap_file_sz;
+ uint64_t swap_file_sz; // [app]; XXX: make atomic
uint16_t refcount;
uint16_t flags;
} basics;
- StoreMapSliceId start; ///< where the chain of StoreEntry slices begins
+ /// where the chain of StoreEntry slices begins [app]; XXX: make atomic
+ StoreMapSliceId start;
+#if 0
/// possible persistent states
typedef enum {
Empty, ///< ready for writing, with nothing of value
Readable, ///< ready for reading
} State;
State state; ///< current state
+#endif
};
/// A hack to allocate one shared array for both anchors and slices.
/// locks and returns an anchor for the empty fileno position; if
/// overwriteExisting is false and the position is not empty, returns nil
Anchor *openForWritingAt(sfileno fileno, bool overwriteExisting = true);
+ /// restrict opened for writing entry to appending operations; allow reads
+ void startAppending(const sfileno fileno);
/// successfully finish creating or updating the entry at fileno pos
void closeForWriting(const sfileno fileno, bool lockForReading = false);
/// unlock and "forget" openForWriting entry, making it Empty again
/// only works on locked entries; returns nil unless the slice is readable
const Anchor *peekAtReader(const sfileno fileno) const;
- /// if possible, free the entry and return true
- /// otherwise mark it as waiting to be freed and return false
+ /// free the entry if possible or mark it as waiting to be freed if not
void freeEntry(const sfileno fileno);
+ /// free the entry if possible or mark it as waiting to be freed if not
+ /// does nothing if we cannot check that the key matches the cached entry
+ void freeEntryByKey(const cache_key *const key);
/// opens entry (identified by key) for reading, increments read level
const Anchor *openForReading(const cache_key *const key, sfileno &fileno);
const Slice &readableSlice(const AnchorId anchorId, const SliceId sliceId) const;
/// writeable anchor for the entry created by openForWriting()
Anchor &writeableEntry(const AnchorId anchorId);
+ /// readable anchor for the entry created by openForReading()
+ const Anchor &readableEntry(const AnchorId anchorId) const;
/// called by lock holder to terminate either slice writing or reading
void abortIo(const sfileno fileno);
debugs(20, 3, "StoreEntry::purgeMem: Freeing memory-copy of " << getMD5Text());
- destroyMemObject();
+ Store::Root().memoryUnlink(*this);
if (swap_status != SWAPOUT_DONE)
release();
return;
}
+ Store::Root().memoryUnlink(*this);
+
if (StoreController::store_dirs_rebuilding && swap_filen > -1) {
setPrivateKey();
- if (mem_obj)
- destroyMemObject();
-
if (swap_filen > -1) {
/*
* Fake a call to StoreEntry->lock() When rebuilding is done,
unlink();
}
- setMemStatus(NOT_IN_MEMORY);
destroyStoreEntry(static_cast<hash_link *>(this));
PROF_stop(storeRelease);
}
fatal("not implemented");
}
+/// updates the collapsed entry with the corresponding on-disk entry, if any
+bool
+StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed)
+{
+ // TODO: move this loop to StoreHashIndex, just like the one in get().
+ if (const int cacheDirs = Config.cacheSwap.n_configured) {
+ // ask each cache_dir until the entry is found; use static starting
+ // point to avoid asking the same subset of disks more often
+ // TODO: coordinate with put() to be able to guess the right disk often
+ static int idx = 0;
+ for (int n = 0; n < cacheDirs; ++n) {
+ idx = (idx + 1) % cacheDirs;
+ SwapDir *sd = dynamic_cast<SwapDir*>(INDEXSD(idx));
+ if (!sd->active())
+ continue;
+
+ if (sd->anchorCollapsed(collapsed)) {
+ debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed);
+ return true;
+ }
+ }
+ }
+
+ debugs(20, 4, "none of " << Config.cacheSwap.n_configured <<
+ " cache_dirs have " << collapsed);
+ return false;
+}
+
// move this into [non-shared] memory cache class when we have one
/// whether e should be kept in local RAM for possible future caching
bool
e.trimMemory(preserveSwappable);
}
+void
+StoreController::memoryUnlink(StoreEntry &e)
+{
+ if (e.mem_status != IN_MEMORY)
+ return;
+
+ if (memStore)
+ memStore->unlink(e);
+ else // TODO: move into [non-shared] memory cache class when we have one
+ e.destroyMemObject();
+}
+
void
StoreController::handleIdleEntry(StoreEntry &e)
{
debugs(20, 3, "may " << (transients ? "SMP" : "") << " collapse " << *e);
}
+void
+StoreController::syncCollapsed(const cache_key *key)
+{
+ StoreEntry *collapsed = swapDir->get(key);
+ if (!collapsed) // the entry is no longer locally active, ignore the update
+ return;
+
+ debugs(20, 7, "syncing " << *collapsed);
+
+ bool inSync = false;
+ if (memStore && collapsed->mem_status == IN_MEMORY)
+ inSync = memStore->updateCollapsed(*collapsed);
+ else if (collapsed->swap_filen >= 0)
+ inSync = collapsed->store()->updateCollapsed(*collapsed);
+ else if (memStore && memStore->anchorCollapsed(*collapsed))
+ inSync = true; // found in the memory cache
+ else if (anchorCollapsedOnDisk(*collapsed))
+ inSync = true; // found in a disk cache
+
+ if (inSync) {
+ debugs(20, 5, "synced " << *collapsed);
+ collapsed->invokeHandlers();
+ } else { // the backing entry is no longer cached; abort this hit
+ debugs(20, 3, "aborting cacheless " << *collapsed);
+ collapsed->abort();
+ }
+}
StoreHashIndex::StoreHashIndex()
{