/*
- * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
#include "squid.h"
#include "base/AsyncJobCalls.h"
+#include "debug/Messages.h"
#include "fs/rock/RockDbCell.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "fs_io.h"
#include "globals.h"
-#include "ipc/StoreMap.h"
#include "md5.h"
-#include "SquidTime.h"
+#include "sbuf/Stream.h"
#include "Store.h"
-#include "store_rebuild.h"
#include "tools.h"
+#include <array>
#include <cerrno>
+#include <cstring>
CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
\defgroup RockFsRebuild Rock Store Rebuild
\ingroup Filesystems
*
- \section Overview Overview
+ \section RockFsRebuildOverview Overview
* Several layers of information are manipualted during the rebuild:
\par
* Store Entry: Response message plus all the metainformation associated with
namespace Rock
{
+static bool
+DoneLoading(const int64_t loadingPos, const int64_t dbSlotLimit)
+{
+ return loadingPos >= dbSlotLimit;
+}
+
+static bool
+DoneValidating(const int64_t validationPos, const int64_t dbSlotLimit, const int64_t dbEntryLimit)
+{
+ // paranoid slot checking is only enabled with squid -S
+ const auto extraWork = opt_store_doublecheck ? dbSlotLimit : 0;
+ return validationPos >= (dbEntryLimit + extraWork);
+}
+
/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
class LoadingFlags
{
class LoadingParts
{
public:
- LoadingParts(int dbSlotLimit, int dbEntryLimit);
- LoadingParts(LoadingParts&&) = delete; // paranoid (often too huge to copy)
+ using Sizes = Ipc::StoreMapItems<uint64_t>;
+ using Versions = Ipc::StoreMapItems<uint32_t>;
+ using Mores = Ipc::StoreMapItems<Ipc::StoreMapSliceId>;
+ using Flags = Ipc::StoreMapItems<LoadingFlags>;
-private:
- friend class LoadingEntry;
- friend class LoadingSlot;
+ LoadingParts(const SwapDir &dir, const bool resuming);
+ ~LoadingParts();
+
+ // lacking copying/moving code and often too huge to copy
+ LoadingParts(LoadingParts&&) = delete;
+ Sizes &sizes() const { return *sizesOwner->object(); }
+ Versions &versions() const { return *versionsOwner->object(); }
+ Mores &mores() const { return *moresOwner->object(); }
+ Flags &flags() const { return *flagsOwner->object(); }
+
+private:
/* Anti-padding storage. With millions of entries, padding matters! */
/* indexed by sfileno */
- std::vector<uint64_t> sizes; ///< LoadingEntry::size for all entries
- std::vector<uint32_t> versions; ///< LoadingEntry::version for all entries
+ Sizes::Owner *sizesOwner; ///< LoadingEntry::size for all entries
+ Versions::Owner *versionsOwner; ///< LoadingEntry::version for all entries
/* indexed by SlotId */
- std::vector<Ipc::StoreMapSliceId> mores; ///< LoadingSlot::more for all slots
+ Mores::Owner *moresOwner; ///< LoadingSlot::more for all slots
/* entry flags are indexed by sfileno; slot flags -- by SlotId */
- std::vector<LoadingFlags> flags; ///< all LoadingEntry and LoadingSlot flags
+ Flags::Owner *flagsOwner; ///< all LoadingEntry and LoadingSlot flags
};
} /* namespace Rock */
/* LoadingEntry */
Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source):
- size(source.sizes.at(fileNo)),
- version(source.versions.at(fileNo)),
- flags(source.flags.at(fileNo))
+ size(source.sizes().at(fileNo)),
+ version(source.versions().at(fileNo)),
+ flags(source.flags().at(fileNo))
{
}
/* LoadingSlot */
Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source):
- more(source.mores.at(slotId)),
- flags(source.flags.at(slotId))
+ more(source.mores().at(slotId)),
+ flags(source.flags().at(slotId))
{
}
/* LoadingParts */
-Rock::LoadingParts::LoadingParts(const int dbEntryLimit, const int dbSlotLimit):
- sizes(dbEntryLimit, 0),
- versions(dbEntryLimit, 0),
- mores(dbSlotLimit, -1),
- flags(dbSlotLimit)
+template <class T>
+inline typename T::Owner *
+createOwner(const char *dirPath, const char *sfx, const int64_t limit, const bool resuming)
{
- assert(sizes.size() == versions.size()); // every entry has both fields
- assert(sizes.size() <= mores.size()); // every entry needs slot(s)
- assert(mores.size() == flags.size()); // every slot needs a set of flags
+ auto id = Ipc::Mem::Segment::Name(SBuf(dirPath), sfx);
+ return resuming ? Ipc::Mem::Owner<T>::Old(id.c_str()) : shm_new(T)(id.c_str(), limit);
+}
+
+Rock::LoadingParts::LoadingParts(const SwapDir &dir, const bool resuming):
+ sizesOwner(createOwner<Sizes>(dir.path, "rebuild_sizes", dir.entryLimitActual(), resuming)),
+ versionsOwner(createOwner<Versions>(dir.path, "rebuild_versions", dir.entryLimitActual(), resuming)),
+ moresOwner(createOwner<Mores>(dir.path, "rebuild_mores", dir.slotLimitActual(), resuming)),
+ flagsOwner(createOwner<Flags>(dir.path, "rebuild_flags", dir.slotLimitActual(), resuming))
+{
+ assert(sizes().capacity == versions().capacity); // every entry has both fields
+ assert(sizes().capacity <= mores().capacity); // every entry needs slot(s)
+ assert(mores().capacity == flags().capacity); // every slot needs a set of flags
+
+ if (!resuming) {
+ // other parts rely on shared memory segments being zero-initialized
+ // TODO: refactor the next slot pointer to use 0 for nil values
+ mores().fill(-1);
+ }
+}
+
+Rock::LoadingParts::~LoadingParts()
+{
+ delete sizesOwner;
+ delete versionsOwner;
+ delete moresOwner;
+ delete flagsOwner;
+}
+
+/* Rock::Rebuild::Stats */
+
+SBuf
+Rock::Rebuild::Stats::Path(const char *dirPath)
+{
+ return Ipc::Mem::Segment::Name(SBuf(dirPath), "rebuild_stats");
+}
+
+Ipc::Mem::Owner<Rock::Rebuild::Stats>*
+Rock::Rebuild::Stats::Init(const SwapDir &dir)
+{
+ return shm_new(Stats)(Path(dir.path).c_str());
+}
+
+bool
+Rock::Rebuild::Stats::completed(const SwapDir &dir) const
+{
+ return DoneLoading(counts.scancount, dir.slotLimitActual()) &&
+ DoneValidating(counts.validations, dir.slotLimitActual(), dir.entryLimitActual());
}
/* Rebuild */
-Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
+bool
+Rock::Rebuild::IsResponsible(const SwapDir &)
+{
+ // in SMP mode, only the disker is responsible for populating the map
+ return !UsingSmp() || IamDiskProcess();
+}
+
+bool
+Rock::Rebuild::Start(SwapDir &dir)
+{
+ if (!IsResponsible(dir)) {
+ debugs(47, 2, "not responsible for indexing cache_dir #" <<
+ dir.index << " from " << dir.filePath);
+ return false;
+ }
+
+ const auto stats = shm_old(Rebuild::Stats)(Stats::Path(dir.path).c_str());
+ if (stats->completed(dir)) {
+ debugs(47, 2, "already indexed cache_dir #" <<
+ dir.index << " from " << dir.filePath);
+ return false;
+ }
+
+ AsyncJob::Start(new Rebuild(&dir, stats));
+ return true;
+}
+
+Rock::Rebuild::Rebuild(SwapDir *dir, const Ipc::Mem::Pointer<Stats> &s): AsyncJob("Rock::Rebuild"),
sd(dir),
parts(nullptr),
+ stats(s),
dbSize(0),
dbSlotSize(0),
dbSlotLimit(0),
dbEntryLimit(0),
fd(-1),
dbOffset(0),
- loadingPos(0),
- validationPos(0)
+ loadingPos(stats->counts.scancount),
+ validationPos(stats->counts.validations),
+ counts(stats->counts),
+ resuming(stats->counts.started())
{
assert(sd);
- memset(&counts, 0, sizeof(counts));
dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
dbSlotSize = sd->slotSize;
dbEntryLimit = sd->entryLimitActual();
dbSlotLimit = sd->slotLimitActual();
assert(dbEntryLimit <= dbSlotLimit);
+ registerRunner();
}
Rock::Rebuild::~Rebuild()
{
if (fd >= 0)
file_close(fd);
+ // normally, segments are used until the Squid instance quits,
+ // but these indexing-only segments are no longer needed
delete parts;
}
+void
+Rock::Rebuild::startShutdown()
+{
+ mustStop("startShutdown");
+}
+
/// prepares and initiates entry loading sequence
void
Rock::Rebuild::start()
{
- // in SMP mode, only the disker is responsible for populating the map
- if (UsingSmp() && !IamDiskProcess()) {
- debugs(47, 2, "Non-disker skips rebuilding of cache_dir #" <<
- sd->index << " from " << sd->filePath);
- mustStop("non-disker");
- return;
- }
+ assert(IsResponsible(*sd));
- debugs(47, DBG_IMPORTANT, "Loading cache_dir #" << sd->index <<
- " from " << sd->filePath);
+ if (!resuming) {
+ debugs(47, Important(18), "Loading cache_dir #" << sd->index <<
+ " from " << sd->filePath);
+ } else {
+ debugs(47, Important(63), "Resuming indexing cache_dir #" << sd->index <<
+ " from " << sd->filePath << ':' << progressDescription());
+ }
fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
if (fd < 0)
assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
- dbOffset = SwapDir::HeaderSize;
+ dbOffset = SwapDir::HeaderSize + loadingPos * dbSlotSize;
- parts = new LoadingParts(dbEntryLimit, dbSlotLimit);
+ assert(!parts);
+ parts = new LoadingParts(*sd, resuming);
+
+ counts.updateStartTime(current_time);
checkpoint();
}
bool
Rock::Rebuild::doneLoading() const
{
- return loadingPos >= dbSlotLimit;
+ return DoneLoading(loadingPos, dbSlotLimit);
}
bool
Rock::Rebuild::doneValidating() const
{
- // paranoid slot checking is only enabled with squid -S
- return validationPos >= dbEntryLimit +
- (opt_store_doublecheck ? dbSlotLimit : 0);
+ return DoneValidating(validationPos, dbSlotLimit, dbEntryLimit);
}
bool
const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
const timeval loopStart = current_time;
- int loaded = 0;
+ int64_t loaded = 0;
while (!doneLoading()) {
loadOneSlot();
dbOffset += dbSlotSize;
getCurrentTime();
const double elapsedMsec = tvSubMsec(loopStart, current_time);
if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
- debugs(47, 5, HERE << "pausing after " << loaded << " entries in " <<
+ debugs(47, 5, "pausing after " << loaded << " entries in " <<
elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per entry");
break;
}
debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
dbOffset << " <= " << dbSize);
+ // increment before loadingPos to avoid getting stuck at a slot
+ // in a case of crash
++counts.scancount;
if (lseek(fd, dbOffset, SEEK_SET) < 0)
useNewSlot(slotId, header);
}
+/// whether the given slot buffer is likely to have nothing but zeros, as is
+/// common to slots in pre-initialized (with zeros) db files
+static bool
+ZeroedSlot(const MemBuf &buf)
+{
+ // We could memcmp the entire buffer, but it is probably safe enough to test
+ // a few bytes because even if we do not detect a corrupted entry, it is not
+ // a big deal: Store::UnpackPrefix() rejects all-0s metadata prefix.
+ static const std::array<char, 10> zeros = {};
+
+ if (static_cast<size_t>(buf.contentSize()) < zeros.size())
+ return false; // cannot be sure enough
+
+ return memcmp(buf.content(), zeros.data(), zeros.size()) == 0;
+}
+
/// parse StoreEntry basics and add them to the map, returning true on success
bool
Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
StoreEntry loadedE;
const uint64_t knownSize = header.entrySize > 0 ?
header.entrySize : anchor.basics.swap_file_sz.load();
+
+ if (ZeroedSlot(buf))
+ return false;
+
if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize))
return false;
const int maxSpentMsec = 50; // keep small: validation does not do I/O
const timeval loopStart = current_time;
- int validated = 0;
+ int64_t validated = 0;
while (!doneValidating()) {
+ // increment before validationPos to avoid getting stuck at a slot
+ // in a case of crash
+ ++counts.validations;
if (validationPos < dbEntryLimit)
validateOneEntry(validationPos);
else
void
Rock::Rebuild::swanSong()
{
- debugs(47,3, HERE << "cache_dir #" << sd->index << " rebuild level: " <<
+ debugs(47,3, "cache_dir #" << sd->index << " rebuild level: " <<
StoreController::store_dirs_rebuilding);
- --StoreController::store_dirs_rebuilding;
storeRebuildComplete(&counts);
}
}
Ipc::Mem::PageId pageId;
- pageId.pool = sd->index+1;
+ pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(sd->index);
pageId.number = slotId+1;
sd->freeSlots->push(pageId);
}
}
}
+SBuf
+Rock::Rebuild::progressDescription() const
+{
+ SBufStream str;
+
+ str << Debug::Extra << "slots loaded: " << Progress(loadingPos, dbSlotLimit);
+
+ const auto validatingEntries = validationPos < dbEntryLimit;
+ const auto entriesValidated = validatingEntries ? validationPos : dbEntryLimit;
+ str << Debug::Extra << "entries validated: " << Progress(entriesValidated, dbEntryLimit);
+ if (opt_store_doublecheck) {
+ const auto slotsValidated = validatingEntries ? 0 : (validationPos - dbEntryLimit);
+ str << Debug::Extra << "slots validated: " << Progress(slotsValidated, dbSlotLimit);
+ }
+
+ return str.buf();
+}
+