/*
- * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
// used except for a positivity test. A unique value is handy for debugging.
static const uint32_t SpacePoolId = 510716;
+/// Packs to shared memory, allocating new slots/pages as needed.
+/// Requires an Ipc::StoreMapAnchor locked for writing.
+class ShmWriter: public Packable
+{
+public:
+ ShmWriter(MemStore &aStore, StoreEntry *anEntry, const sfileno aFileNo, Ipc::StoreMapSliceId aFirstSlice = -1);
+
+ /* Packable API */
+ virtual void append(const char *aBuf, int aSize) override;
+ virtual void vappendf(const char *fmt, va_list ap) override;
+
+public:
+ StoreEntry *entry; ///< the entry being updated
+
+ /// the slot keeping the first byte of the appended content (at least)
+ /// either set via constructor parameter or allocated by the first append
+ Ipc::StoreMapSliceId firstSlice;
+
+ /// the slot keeping the last byte of the appended content (at least)
+ Ipc::StoreMapSliceId lastSlice;
+
+ uint64_t totalWritten; ///< cumulative number of bytes appended so far
+
+protected:
+ void copyToShm();
+ void copyToShmSlice(Ipc::StoreMap::Slice &slice);
+
+private:
+ MemStore &store; ///< the shared memory cache we write into
+ const sfileno fileNo; ///< entry position in the store map (passed to slice lookups)
+
+ /* set by (and only valid during) append calls */
+ const char *buf; ///< content being appended now
+ int bufSize; ///< buf size
+ int bufWritten; ///< buf bytes appended so far
+};
+
+/* ShmWriter */
+
+/// Captures the update context; a negative aFirstSlice means the first
+/// append() will allocate the entry's first slot itself (via copyToShm()).
+ShmWriter::ShmWriter(MemStore &aStore, StoreEntry *anEntry, const sfileno aFileNo, Ipc::StoreMapSliceId aFirstSlice):
+ entry(anEntry),
+ firstSlice(aFirstSlice),
+ lastSlice(firstSlice),
+ totalWritten(0),
+ store(aStore),
+ fileNo(aFileNo),
+ buf(nullptr),
+ bufSize(0),
+ bufWritten(0)
+{
+ Must(entry); // a writer without an entry to update makes no sense
+}
+
+/// Packable API: appends aBufSize bytes of aBuf to shared memory,
+/// allocating new slices as needed; empty appends are no-ops.
+void
+ShmWriter::append(const char *aBuf, int aBufSize)
+{
+ Must(!buf); // appends must not nest; buf is only valid during one call
+ buf = aBuf;
+ bufSize = aBufSize;
+ if (bufSize) {
+ Must(buf);
+ bufWritten = 0;
+ copyToShm();
+ }
+ // the buffer is only borrowed for the duration of this call; forget it
+ buf = nullptr;
+ bufSize = 0;
+ bufWritten = 0;
+}
+
+/// Packable API: printf()-style appending; formats into a local SBuf first
+/// and then delegates to append().
+void
+ShmWriter::vappendf(const char *fmt, va_list ap)
+{
+ SBuf vaBuf;
+#if defined(VA_COPY)
+ // work on a copy where available so that ap itself is left untouched
+ va_list apCopy;
+ VA_COPY(apCopy, ap);
+ vaBuf.vappendf(fmt, apCopy);
+ va_end(apCopy);
+#else
+ vaBuf.vappendf(fmt, ap);
+#endif
+ append(vaBuf.rawContent(), vaBuf.length());
+}
+
+/// copies the entire buffer to shared memory
+void
+ShmWriter::copyToShm()
+{
+ Must(bufSize > 0); // do not use up shared memory pages for nothing
+ // if we were given a starting slice, we must also know where we stopped
+ Must(firstSlice < 0 || lastSlice >= 0);
+
+ // fill, skip slices that are already full
+ // nextAppendableSlice() allocates new slots/pages as needed (and may throw)
+ while (bufWritten < bufSize) {
+ Ipc::StoreMap::Slice &slice = store.nextAppendableSlice(fileNo, lastSlice);
+ if (firstSlice < 0)
+ firstSlice = lastSlice;
+ copyToShmSlice(slice);
+ }
+
+ debugs(20, 7, "stored " << bufWritten << '/' << totalWritten << " header bytes of " << *entry);
+}
+
+/// copies at most one slice worth of buffer to shared memory
+void
+ShmWriter::copyToShmSlice(Ipc::StoreMap::Slice &slice)
+{
+ Ipc::Mem::PageId page = store.pageForSlice(lastSlice);
+ debugs(20, 7, "entry " << *entry << " slice " << lastSlice << " has " <<
+ page);
+
+ Must(bufWritten <= bufSize);
+ const int64_t writingDebt = bufSize - bufWritten;
+ const int64_t pageSize = Ipc::Mem::PageSize();
+ // NOTE(review): this offset math relies on every previously written slice
+ // being filled to exactly pageSize bytes (the invariant Must()ed at the
+ // end); confirm it also holds when resuming via the aFirstSlice parameter
+ const int64_t sliceOffset = totalWritten % pageSize;
+ const int64_t copySize = std::min(writingDebt, pageSize - sliceOffset);
+ memcpy(static_cast<char*>(PagePointer(page)) + sliceOffset, buf + bufWritten,
+ copySize);
+
+ debugs(20, 7, "copied " << slice.size << '+' << copySize << " bytes of " <<
+ entry << " from " << sliceOffset << " in " << page);
+
+ slice.size += copySize;
+ bufWritten += copySize;
+ totalWritten += copySize;
+ // fresh anchor.basics.swap_file_sz is already set [to the stale value]
+
+ // either we wrote everything or we filled the entire slice
+ Must(bufWritten == bufSize || sliceOffset + copySize == pageSize);
+}
+
+/* MemStore */
+
MemStore::MemStore(): map(NULL), lastWritingSlice(-1)
{
}
}
bool
-MemStore::dereference(StoreEntry &, bool)
+MemStore::dereference(StoreEntry &)
{
// no need to keep e in the global store_table for us; we have our own map
return false;
}
-int
-MemStore::callback()
-{
- return 0;
-}
-
-StoreSearch *
-MemStore::search(String const, HttpRequest *)
-{
- fatal("not implemented");
- return NULL;
-}
-
StoreEntry *
MemStore::get(const cache_key *key)
{
// XXX: We do not know the URLs yet, only the key, but we need to parse and
// store the response for the Root().get() callers to be happy because they
// expect IN_MEMORY entries to already have the response headers and body.
- e->makeMemObject();
+ e->createMemObject();
anchorEntry(*e, index, *slot);
}
+/// Updates the stored headers of the given entry. Quietly does nothing when
+/// there is no shared map or when the map refuses to open the entry for
+/// updating; rolls the update back if updateHeadersOrThrow() throws.
void
-MemStore::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData)
+MemStore::updateHeaders(StoreEntry *updatedE)
+{
+ if (!map)
+ return;
+
+ Ipc::StoreMapUpdate update(updatedE);
+ assert(updatedE);
+ assert(updatedE->mem_obj);
+ if (!map->openForUpdating(update, updatedE->mem_obj->memCache.index))
+ return;
+
+ try {
+ updateHeadersOrThrow(update);
+ } catch (const std::exception &ex) {
+ debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
+ // roll the failed update back so the map entry is not left locked
+ map->abortUpdating(update);
+ }
+}
+
+/// updateHeaders() implementation: writes fresh headers into the fresh chain
+/// and re-appends the payload that shared a slice with the stale headers;
+/// throws (e.g., via Must() or slot allocation) on any problem
+void
+MemStore::updateHeadersOrThrow(Ipc::StoreMapUpdate &update)
{
- // XXX: not needed but Store parent forces us to implement this
- fatal("MemStore::get(key,callback,data) should not be called");
+ // our +/- hdr_sz math below does not work if the chains differ [in size]
+ Must(update.stale.anchor->basics.swap_file_sz == update.fresh.anchor->basics.swap_file_sz);
+
+ const HttpReply *rawReply = update.entry->getReply();
+ Must(rawReply);
+ const HttpReply &reply = *rawReply;
+ const uint64_t staleHdrSz = reply.hdr_sz;
+ debugs(20, 7, "stale hdr_sz: " << staleHdrSz);
+
+ /* we will need to copy same-slice payload after the stored headers later */
+ Must(staleHdrSz > 0);
+ update.stale.splicingPoint = map->sliceContaining(update.stale.fileNo, staleHdrSz);
+ Must(update.stale.splicingPoint >= 0);
+ Must(update.stale.anchor->basics.swap_file_sz >= staleHdrSz);
+
+ // NOTE(review): this check comes after the anchor was already dereferenced
+ // at the top of this function; consider moving it before the first use
+ Must(update.stale.anchor);
+ ShmWriter writer(*this, update.entry, update.fresh.fileNo);
+ reply.packHeadersInto(&writer);
+ const uint64_t freshHdrSz = writer.totalWritten;
+ debugs(20, 7, "fresh hdr_sz: " << freshHdrSz << " diff: " << (freshHdrSz - staleHdrSz));
+
+ /* copy same-slice payload remaining after the stored headers */
+ const Ipc::StoreMapSlice &slice = map->readableSlice(update.stale.fileNo, update.stale.splicingPoint);
+ const Ipc::StoreMapSlice::Size sliceCapacity = Ipc::Mem::PageSize();
+ const Ipc::StoreMapSlice::Size headersInLastSlice = staleHdrSz % sliceCapacity;
+ Must(headersInLastSlice > 0); // or sliceContaining() would have stopped earlier
+ Must(slice.size >= headersInLastSlice);
+ const Ipc::StoreMapSlice::Size payloadInLastSlice = slice.size - headersInLastSlice;
+ const MemStoreMapExtras::Item &extra = extras->items[update.stale.splicingPoint];
+ char *page = static_cast<char*>(PagePointer(extra.page));
+ debugs(20, 5, "appending same-slice payload: " << payloadInLastSlice);
+ writer.append(page + headersInLastSlice, payloadInLastSlice);
+ update.fresh.splicingPoint = writer.lastSlice;
+
+ // adjust the total size: subtract stale header bytes, add fresh ones
+ update.fresh.anchor->basics.swap_file_sz -= staleHdrSz;
+ update.fresh.anchor->basics.swap_file_sz += freshHdrSz;
+
+ map->closeForUpdating(update);
}
bool
e.lastref = basics.lastref;
e.timestamp = basics.timestamp;
e.expires = basics.expires;
- e.lastmod = basics.lastmod;
+ e.lastModified(basics.lastmod);
e.refcount = basics.refcount;
e.flags = basics.flags;
e.ping_status = PING_NONE;
EBIT_CLR(e.flags, RELEASE_REQUEST);
- EBIT_CLR(e.flags, KEY_PRIVATE);
+ e.clearPrivate();
EBIT_SET(e.flags, ENTRY_VALIDATED);
MemObject::MemCache &mc = e.mem_obj->memCache;
// from store_client::readBody()
// parse headers if needed; they might span multiple slices!
HttpReply *rep = (HttpReply *)e.getReply();
- if (rep->pstate < psParsed) {
+ if (rep->pstate < Http::Message::psParsed) {
// XXX: have to copy because httpMsgParseStep() requires 0-termination
MemBuf mb;
mb.init(buf.length+1, buf.length+1);
mb.terminate();
const int result = rep->httpMsgParseStep(mb.buf, buf.length, eof);
if (result > 0) {
- assert(rep->pstate == psParsed);
+ assert(rep->pstate == Http::Message::psParsed);
EBIT_CLR(e.flags, ENTRY_FWD_HDR_WAIT);
} else if (result < 0) {
debugs(20, DBG_IMPORTANT, "Corrupted mem-cached headers: " << e);
assert(e.mem_obj);
- if (e.mem_obj->vary_headers) {
+ if (!e.mem_obj->vary_headers.isEmpty()) {
// XXX: We must store/load SerialisedMetaData to cache Vary in RAM
debugs(20, 5, "Vary not yet supported: " << e.mem_obj->vary_headers);
return false;
}
const int64_t expectedSize = e.mem_obj->expectedReplySize(); // may be < 0
-
- // objects of unknown size are not allowed into memory cache, for now
- if (expectedSize < 0) {
- debugs(20, 5, "Unknown expected size: " << e);
- return false;
- }
-
const int64_t loadedSize = e.mem_obj->endOffset();
const int64_t ramSize = max(loadedSize, expectedSize);
-
if (ramSize > maxObjectSize()) {
debugs(20, 5, HERE << "Too big max(" <<
loadedSize << ", " << expectedSize << "): " << e);
e.mem_obj->memCache.index = index;
e.mem_obj->memCache.io = MemObject::ioWriting;
slot->set(e);
- map->startAppending(index);
+ // Do not allow others to feed off an unknown-size entry because we will
+ // stop swapping it out if it grows too large.
+ if (e.mem_obj->expectedReplySize() >= 0)
+ map->startAppending(index);
e.memOutDecision(true);
return true;
}
assert(map);
assert(e.mem_obj);
- const int32_t index = e.mem_obj->memCache.index;
- assert(index >= 0);
- Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
-
const int64_t eSize = e.mem_obj->endOffset();
if (e.mem_obj->memCache.offset >= eSize) {
debugs(20, 5, "postponing copying " << e << " for lack of news: " <<
return; // nothing to do (yet)
}
- if (anchor.start < 0) { // must allocate the very first slot for e
- Ipc::Mem::PageId page;
- anchor.start = reserveSapForWriting(page); // throws
- extras->items[anchor.start].page = page;
- }
+ // throw if an accepted unknown-size entry grew too big or max-size changed
+ Must(eSize <= maxObjectSize());
+ const int32_t index = e.mem_obj->memCache.index;
+ assert(index >= 0);
+ Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
lastWritingSlice = anchor.start;
- const size_t sliceCapacity = Ipc::Mem::PageSize();
// fill, skip slices that are already full
// Optimize: remember lastWritingSlice in e.mem_obj
while (e.mem_obj->memCache.offset < eSize) {
- Ipc::StoreMap::Slice &slice =
- map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
-
- if (slice.size >= sliceCapacity) {
- if (slice.next >= 0) {
- lastWritingSlice = slice.next;
- continue;
- }
-
- Ipc::Mem::PageId page;
- slice.next = lastWritingSlice = reserveSapForWriting(page);
- extras->items[lastWritingSlice].page = page;
- debugs(20, 7, "entry " << index << " new slice: " << lastWritingSlice);
- }
-
- copyToShmSlice(e, anchor);
+ Ipc::StoreMap::Slice &slice = nextAppendableSlice(
+ e.mem_obj->memCache.index, lastWritingSlice);
+ if (anchor.start < 0)
+ anchor.start = lastWritingSlice;
+ copyToShmSlice(e, anchor, slice);
}
debugs(20, 7, "mem-cached available " << eSize << " bytes of " << e);
/// copies at most one slice worth of local memory to shared memory
void
-MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor)
+MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice)
{
- Ipc::StoreMap::Slice &slice =
- map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
-
- Ipc::Mem::PageId page = extras->items[lastWritingSlice].page;
- assert(lastWritingSlice >= 0 && page);
+ Ipc::Mem::PageId page = pageForSlice(lastWritingSlice);
debugs(20, 7, "entry " << e << " slice " << lastWritingSlice << " has " <<
page);
anchor.basics.swap_file_sz = e.mem_obj->memCache.offset;
}
+/// starts checking with the entry chain slice at a given offset and
+/// returns a not-full (but not necessarily empty) slice, updating sliceOffset
+Ipc::StoreMap::Slice &
+MemStore::nextAppendableSlice(const sfileno fileNo, sfileno &sliceOffset)
+{
+ // allocate the very first slot for the entry if needed
+ if (sliceOffset < 0) {
+ Ipc::StoreMapAnchor &anchor = map->writeableEntry(fileNo);
+ Must(anchor.start < 0); // a fresh entry must not have a chain yet
+ Ipc::Mem::PageId page;
+ sliceOffset = reserveSapForWriting(page); // throws
+ extras->items[sliceOffset].page = page;
+ anchor.start = sliceOffset;
+ }
+
+ const size_t sliceCapacity = Ipc::Mem::PageSize();
+ do {
+ Ipc::StoreMap::Slice &slice = map->writeableSlice(fileNo, sliceOffset);
+
+ // this slice is full: follow the chain, or grow it at the end
+ if (slice.size >= sliceCapacity) {
+ if (slice.next >= 0) {
+ sliceOffset = slice.next;
+ continue;
+ }
+
+ // chain a brand new slot (and its data page) after the last slice
+ Ipc::Mem::PageId page;
+ slice.next = sliceOffset = reserveSapForWriting(page);
+ extras->items[sliceOffset].page = page;
+ debugs(20, 7, "entry " << fileNo << " new slice: " << sliceOffset);
+ continue; // to get and return the slice at the new sliceOffset
+ }
+
+ return slice;
+ } while (true);
+ /* not reached */
+}
+
+/// safely returns a previously allocated memory page for the given entry slice
+Ipc::Mem::PageId
+MemStore::pageForSlice(Ipc::StoreMapSliceId sliceId)
+{
+ // Must() failures propagate as exceptions that callers (e.g.,
+ // updateHeaders()) can catch, unlike a fatal assert()
+ Must(extras);
+ Must(sliceId >= 0);
+ Ipc::Mem::PageId page = extras->items[sliceId].page;
+ Must(page);
+ return page;
+}
+
/// finds a slot and a free page to fill or throws
sfileno
MemStore::reserveSapForWriting(Ipc::Mem::PageId &page)
if (e.mem_obj && e.mem_obj->memCache.index >= 0) {
map->freeEntry(e.mem_obj->memCache.index);
disconnect(e);
- } else {
+ } else if (map) {
// the entry may have been loaded and then disconnected from the cache
map->freeEntryByKey(reinterpret_cast<cache_key*>(e.key));
}
{
// decide whether to use a shared memory cache if the user did not specify
if (!Config.memShared.configured()) {
- Config.memShared.configure(Ipc::Atomic::Enabled() &&
- Ipc::Mem::Segment::Enabled() && UsingSmp() &&
+ Config.memShared.configure(Ipc::Mem::Segment::Enabled() && UsingSmp() &&
Config.memMaxSize > 0);
- } else if (Config.memShared && !Ipc::Atomic::Enabled()) {
- // bail if the user wants shared memory cache but we cannot support it
- fatal("memory_cache_shared is on, but no support for atomic operations detected");
} else if (Config.memShared && !Ipc::Mem::Segment::Enabled()) {
fatal("memory_cache_shared is on, but no support for shared memory detected");
} else if (Config.memShared && !UsingSmp()) {