#include "fs/rock/RockSwapDir.h"
#include "fs_io.h"
#include "globals.h"
-#include "ipc/StoreMap.h"
#include "md5.h"
+#include "sbuf/Stream.h"
#include "SquidTime.h"
#include "Store.h"
-#include "store_rebuild.h"
#include "tools.h"
#include <cerrno>
namespace Rock
{
/// whether the loader has visited every db slot at least once
static bool
DoneLoading(const int64_t loadingPos, const int64_t dbSlotLimit)
{
    // loading is over when the scan position reaches the slot count
    return !(loadingPos < dbSlotLimit);
}
+
+static bool
+DoneValidating(const int64_t validationPos, const int64_t dbSlotLimit, const int64_t dbEntryLimit)
+{
+ // paranoid slot checking is only enabled with squid -S
+ const auto extraWork = opt_store_doublecheck ? dbSlotLimit : 0;
+ return validationPos >= (dbEntryLimit + extraWork);
+}
+
/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
class LoadingFlags
{
/// shared-memory-backed rebuild state for all entries/slots of one cache_dir;
/// kept in named segments so a restarted kid can resume indexing where it left off
class LoadingParts
{
public:
    using Sizes = Ipc::StoreMapItems<uint64_t>;
    using Versions = Ipc::StoreMapItems<uint32_t>;
    using Mores = Ipc::StoreMapItems<Ipc::StoreMapSliceId>;
    using Flags = Ipc::StoreMapItems<LoadingFlags>;

    /// \param resuming whether to attach to (rather than create) the segments
    LoadingParts(const SwapDir &dir, const bool resuming);
    ~LoadingParts();

    // lacking copying/moving code and often too huge to copy
    LoadingParts(LoadingParts&&) = delete;

    Sizes &sizes() const { return *sizesOwner->object(); }
    Versions &versions() const { return *versionsOwner->object(); }
    Mores &mores() const { return *moresOwner->object(); }
    Flags &flags() const { return *flagsOwner->object(); }

private:
    /* Anti-padding storage. With millions of entries, padding matters! */

    /* indexed by sfileno */
    Sizes::Owner *sizesOwner; ///< LoadingEntry::size for all entries
    Versions::Owner *versionsOwner; ///< LoadingEntry::version for all entries

    /* indexed by SlotId */
    Mores::Owner *moresOwner; ///< LoadingSlot::more for all slots

    /* entry flags are indexed by sfileno; slot flags -- by SlotId */
    Flags::Owner *flagsOwner; ///< all LoadingEntry and LoadingSlot flags
};
} /* namespace Rock */
/* LoadingEntry */
/// binds this entry view to the fileNo-indexed shared rebuild-state arrays
Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source):
    size(source.sizes().at(fileNo)),
    version(source.versions().at(fileNo)),
    flags(source.flags().at(fileNo))
{
}
/* LoadingSlot */
/// binds this slot view to the slotId-indexed shared rebuild-state arrays
Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source):
    more(source.mores().at(slotId)),
    flags(source.flags().at(slotId))
{
}
/* LoadingParts */
-Rock::LoadingParts::LoadingParts(const int dbEntryLimit, const int dbSlotLimit):
- sizes(dbEntryLimit, 0),
- versions(dbEntryLimit, 0),
- mores(dbSlotLimit, -1),
- flags(dbSlotLimit)
+template <class T>
+inline typename T::Owner *
+createOwner(const char *dirPath, const char *sfx, const int64_t limit, const bool resuming)
+{
+ auto id = Ipc::Mem::Segment::Name(SBuf(dirPath), sfx);
+ return resuming ? Ipc::Mem::Owner<T>::Old(id.c_str()) : shm_new(T)(id.c_str(), limit);
+}
+
/// creates fresh rebuild-state segments or, when resuming, reattaches to the
/// segments left behind by the previous same-kid process
Rock::LoadingParts::LoadingParts(const SwapDir &dir, const bool resuming):
    sizesOwner(createOwner<Sizes>(dir.path, "rebuild_sizes", dir.entryLimitActual(), resuming)),
    versionsOwner(createOwner<Versions>(dir.path, "rebuild_versions", dir.entryLimitActual(), resuming)),
    moresOwner(createOwner<Mores>(dir.path, "rebuild_mores", dir.slotLimitActual(), resuming)),
    flagsOwner(createOwner<Flags>(dir.path, "rebuild_flags", dir.slotLimitActual(), resuming))
{
    assert(sizes().capacity == versions().capacity); // every entry has both fields
    assert(sizes().capacity <= mores().capacity); // every entry needs slot(s)
    assert(mores().capacity == flags().capacity); // every slot needs a set of flags

    if (!resuming) {
        // other parts rely on shared memory segments being zero-initialized
        // TODO: refactor the next slot pointer to use 0 for nil values
        mores().fill(-1);
    }
}
+
/// destroys the owners, releasing the rebuild-state shared memory segments
Rock::LoadingParts::~LoadingParts()
{
    delete sizesOwner;
    delete versionsOwner;
    delete moresOwner;
    delete flagsOwner;
}
+
+/* Rock::Rebuild::Stats */
+
+SBuf
+Rock::Rebuild::Stats::Path(const char *dirPath)
+{
+ return Ipc::Mem::Segment::Name(SBuf(dirPath), "rebuild_stats");
+}
+
/// creates the (zero-initialized) shared statistics segment for the given cache_dir
Ipc::Mem::Owner<Rock::Rebuild::Stats>*
Rock::Rebuild::Stats::Init(const SwapDir &dir)
{
    return shm_new(Stats)(Path(dir.path).c_str());
}
+
/// whether both the loading and the validation phases have already finished
/// for the given cache_dir (judged from the shared counters alone)
bool
Rock::Rebuild::Stats::completed(const SwapDir &sd) const
{
    return DoneLoading(counts.scancount, sd.slotLimitActual()) &&
           DoneValidating(counts.validations, sd.slotLimitActual(), sd.entryLimitActual());
}
/* Rebuild */
-Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
+bool
+Rock::Rebuild::IsResponsible(const SwapDir &sd)
+{
+ // in SMP mode, only the disker is responsible for populating the map
+ return !UsingSmp() || IamDiskProcess();
+}
+
/// starts indexing the given cache_dir if this kid is responsible for it and
/// the (shared) statistics do not already mark the work as finished
/// \returns whether an indexing job was actually started
bool
Rock::Rebuild::Start(SwapDir &dir)
{
    if (!IsResponsible(dir)) {
        debugs(47, 2, "not responsible for indexing cache_dir #" <<
               dir.index << " from " << dir.filePath);
        return false;
    }

    // attach to the statistics segment created earlier by Stats::Init()
    const auto stats = shm_old(Rebuild::Stats)(Stats::Path(dir.path).c_str());
    if (stats->completed(dir)) {
        debugs(47, 2, "already indexed cache_dir #" <<
               dir.index << " from " << dir.filePath);
        return false;
    }

    Must(AsyncJob::Start(new Rebuild(&dir, stats)));
    return true;
}
+
+Rock::Rebuild::Rebuild(SwapDir *dir, const Ipc::Mem::Pointer<Stats> &s): AsyncJob("Rock::Rebuild"),
sd(dir),
parts(nullptr),
+ stats(s),
dbSize(0),
dbSlotSize(0),
dbSlotLimit(0),
dbEntryLimit(0),
fd(-1),
dbOffset(0),
- loadingPos(0),
- validationPos(0)
+ loadingPos(stats->counts.scancount),
+ validationPos(stats->counts.validations),
+ counts(stats->counts),
+ resuming(stats->counts.started())
{
assert(sd);
dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
dbEntryLimit = sd->entryLimitActual();
dbSlotLimit = sd->slotLimitActual();
assert(dbEntryLimit <= dbSlotLimit);
+ registerRunner();
}
Rock::Rebuild::~Rebuild()
{
    // close the db file if the loading phase opened it
    if (fd >= 0)
        file_close(fd);
    // normally, segments are used until the Squid instance quits,
    // but these indexing-only segments are no longer needed
    delete parts;
}
/// Registered Runner API: stop the indexing job when Squid begins shutting
/// down; the shared-memory counters preserve progress for a later resume
void
Rock::Rebuild::startShutdown()
{
    mustStop("startShutdown");
}
+
/// prepares and initiates entry loading sequence
void
Rock::Rebuild::start()
{
- // in SMP mode, only the disker is responsible for populating the map
- if (UsingSmp() && !IamDiskProcess()) {
- debugs(47, 2, "Non-disker skips rebuilding of cache_dir #" <<
- sd->index << " from " << sd->filePath);
- mustStop("non-disker");
- return;
- }
+ assert(IsResponsible(*sd));
- debugs(47, DBG_IMPORTANT, "Loading cache_dir #" << sd->index <<
- " from " << sd->filePath);
+ if (!resuming) {
+ debugs(47, DBG_IMPORTANT, "Loading cache_dir #" << sd->index <<
+ " from " << sd->filePath);
+ } else {
+ debugs(47, DBG_IMPORTANT, "Resuming indexing cache_dir #" << sd->index <<
+ " from " << sd->filePath << ':' << progressDescription());
+ }
fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
if (fd < 0)
assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
- dbOffset = SwapDir::HeaderSize;
+ dbOffset = SwapDir::HeaderSize + loadingPos * dbSlotSize;
- parts = new LoadingParts(dbEntryLimit, dbSlotLimit);
+ assert(!parts);
+ parts = new LoadingParts(*sd, resuming);
+
+ counts.updateStartTime(current_time);
checkpoint();
}
/// whether this job has loaded all db slots (see DoneLoading())
bool
Rock::Rebuild::doneLoading() const
{
    return DoneLoading(loadingPos, dbSlotLimit);
}
/// whether this job has validated all entries (and, with -S, slots);
/// see DoneValidating()
bool
Rock::Rebuild::doneValidating() const
{
    return DoneValidating(validationPos, dbSlotLimit, dbEntryLimit);
}
bool
const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
const timeval loopStart = current_time;
- int loaded = 0;
+ int64_t loaded = 0;
while (!doneLoading()) {
loadOneSlot();
dbOffset += dbSlotSize;
debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
dbOffset << " <= " << dbSize);
+ // increment before loadingPos to avoid getting stuck at a slot
+ // in a case of crash
++counts.scancount;
if (lseek(fd, dbOffset, SEEK_SET) < 0)
const int maxSpentMsec = 50; // keep small: validation does not do I/O
const timeval loopStart = current_time;
- int validated = 0;
+ int64_t validated = 0;
while (!doneValidating()) {
+ // increment before validationPos to avoid getting stuck at a slot
+ // in a case of crash
+ ++counts.validations;
if (validationPos < dbEntryLimit)
validateOneEntry(validationPos);
else
{
debugs(47,3, HERE << "cache_dir #" << sd->index << " rebuild level: " <<
StoreController::store_dirs_rebuilding);
- --StoreController::store_dirs_rebuilding;
storeRebuildComplete(&counts);
}
}
}
+SBuf
+Rock::Rebuild::progressDescription() const
+{
+ SBufStream str;
+
+ str << Debug::Extra << "slots loaded: " << Progress(loadingPos, dbSlotLimit);
+
+ const auto validatingEntries = validationPos < dbEntryLimit;
+ const auto entriesValidated = validatingEntries ? validationPos : dbEntryLimit;
+ str << Debug::Extra << "entries validated: " << Progress(entriesValidated, dbEntryLimit);
+ if (opt_store_doublecheck) {
+ const auto slotsValidated = validatingEntries ? 0 : (validationPos - dbEntryLimit);
+ str << Debug::Extra << "slots validated: " << Progress(slotsValidated, dbSlotLimit);
+ }
+
+ return str.buf();
+}
+
#define SQUID_FS_ROCK_REBUILD_H
#include "base/AsyncJob.h"
+#include "base/RunnersRegistry.h"
#include "cbdata.h"
#include "fs/rock/forward.h"
+#include "ipc/mem/Pointer.h"
+#include "ipc/StoreMap.h"
#include "MemBuf.h"
#include "store_rebuild.h"
/// \ingroup Rock
/// manages store rebuild process: loading meta information from db on disk
-class Rebuild: public AsyncJob
+class Rebuild: public AsyncJob, private IndependentRunner
{
CBDATA_CHILD(Rebuild);
public:
- Rebuild(SwapDir *dir);
- virtual ~Rebuild() override;
+ /// cache_dir indexing statistics shared across same-kid process restarts
+ class Stats
+ {
+ public:
+ static SBuf Path(const char *dirPath);
+ static Ipc::Mem::Owner<Stats> *Init(const SwapDir &);
+
+ static size_t SharedMemorySize() { return sizeof(Stats); }
+ size_t sharedMemorySize() const { return SharedMemorySize(); }
+
+ /// whether the rebuild is finished already
+ bool completed(const SwapDir &) const;
+
+ StoreRebuildData counts;
+ };
+
+ /// starts indexing the given cache_dir if that indexing is necessary
+ /// \returns whether the indexing was necessary (and, hence, started)
+ static bool Start(SwapDir &dir);
protected:
+ /// whether the current kid is responsible for rebuilding this db file
+ static bool IsResponsible(const SwapDir &);
+
+ Rebuild(SwapDir *dir, const Ipc::Mem::Pointer<Stats> &);
+ virtual ~Rebuild() override;
+
+ /* Registered Runner API */
+ virtual void startShutdown() override;
+
/* AsyncJob API */
virtual void start() override;
virtual bool doneAll() const override;
bool sameEntry(const sfileno fileno, const DbCellHeader &header) const;
+ SBuf progressDescription() const;
+
SwapDir *sd;
LoadingParts *parts; ///< parts of store entries being loaded from disk
+ Ipc::Mem::Pointer<Stats> stats; ///< indexing statistics in shared memory
+
int64_t dbSize;
int dbSlotSize; ///< the size of a db cell, including the cell header
- int dbSlotLimit; ///< total number of db cells
- int dbEntryLimit; ///< maximum number of entries that can be stored in db
+ int64_t dbSlotLimit; ///< total number of db cells
+ int64_t dbEntryLimit; ///< maximum number of entries that can be stored in db
int fd; // store db file descriptor
- int64_t dbOffset;
- sfileno loadingPos; ///< index of the db slot being loaded from disk now
- sfileno validationPos; ///< index of the loaded db slot being validated now
+ int64_t dbOffset; // TODO: calculate in a method, using loadingPos
+ int64_t loadingPos; ///< index of the db slot being loaded from disk now
+ int64_t validationPos; ///< index of the loaded db slot being validated now
MemBuf buf; ///< space to load current db slot (and entry metadata) into
- StoreRebuildData counts;
+ StoreRebuildData &counts; ///< a reference to the shared memory counters
+
+ /// whether we have started indexing this cache_dir before,
+ /// presumably in the previous process performing the same-kid role
+ const bool resuming;
static void Steps(void *data);
};
#include "fs/rock/RockHeaderUpdater.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
-#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
theFile = io->newFile(filePath);
theFile->configure(fileConfig);
theFile->open(O_RDWR, 0644, this);
-
- // Increment early. Otherwise, if one SwapDir finishes rebuild before
- // others start, storeRebuildComplete() will think the rebuild is over!
- // TODO: move store_dirs_rebuilding hack to store modules that need it.
- ++StoreController::store_dirs_rebuilding;
}
bool
}
}
-void
-Rock::SwapDir::rebuild()
-{
- //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
- AsyncJob::Start(new Rebuild(this));
-}
-
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
std::setw(7) << map->entryLimit() << " entries, and " <<
std::setw(7) << map->sliceLimit() << " slots");
- rebuild();
+ if (!Rebuild::Start(*this))
+ storeRebuildComplete(nullptr);
}
void
Must(mapOwners.empty() && freeSlotsOwners.empty());
for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
+ rebuildStatsOwners.push_back(Rebuild::Stats::Init(*sd));
+
const int64_t capacity = sd->slotLimitActual();
SwapDir::DirMap::Owner *const mapOwner =
Rock::SwapDirRr::~SwapDirRr()
{
for (size_t i = 0; i < mapOwners.size(); ++i) {
+ delete rebuildStatsOwners[i];
delete mapOwners[i];
delete freeSlotsOwners[i];
}
#include "DiskIO/IORequestor.h"
#include "fs/rock/forward.h"
#include "fs/rock/RockDbCell.h"
+#include "fs/rock/RockRebuild.h"
#include "ipc/mem/Page.h"
#include "ipc/mem/PageStack.h"
#include "ipc/StoreMap.h"
#include "store/Disk.h"
+#include "store_rebuild.h"
#include <vector>
class DiskIOStrategy;
bool parseSizeOption(char const *option, const char *value, int reconfiguring);
void dumpSizeOption(StoreEntry * e) const;
- void rebuild(); ///< starts loading and validating stored entry metadata
-
bool full() const; ///< no more entries can be stored without purging
void trackReferences(StoreEntry &e); ///< add to replacement policy scope
void ignoreReferences(StoreEntry &e); ///< delete from repl policy scope
virtual void create();
private:
+ std::vector<Ipc::Mem::Owner<Rebuild::Stats> *> rebuildStatsOwners;
std::vector<SwapDir::DirMap::Owner *> mapOwners;
std::vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> freeSlotsOwners;
};
if (!clean)
flags.need_to_validate = true;
+ counts.updateStartTime(current_time);
+
debugs(47, DBG_IMPORTANT, "Rebuilding storage in " << sd->path << " (" <<
(clean ? "clean log" : (LogParser ? "dirty log" : "no log")) << ")");
}
if (!rb->isDone() || reconfiguring)
eventAdd("storeRebuild", RebuildStep, rb, 0.01, 1);
else {
- -- StoreController::store_dirs_rebuilding;
storeRebuildComplete(&rb->counts);
delete rb;
}
/// schedules the event-driven swap.state scan; the store_dirs_rebuilding
/// level is now incremented centrally by Store initialization, not here
void
Fs::Ufs::UFSSwapDir::rebuild()
{
    eventAdd("storeRebuild", Fs::Ufs::RebuildState::RebuildStep, new Fs::Ufs::RebuildState(this), 0.0, 1);
}
size_t sharedMemorySize() const { return SharedMemorySize(capacity); }
static size_t SharedMemorySize(const int aCapacity) { return sizeof(StoreMapItems<Item>) + aCapacity*sizeof(Item); }
+ Item &at(const int index)
+ {
+ assert(index >= 0);
+ assert(index < capacity);
+ return items[index];
+ }
+
    /// bounds-checked read-only access; delegates to the non-const overload
    // NOTE(review): `C` is presumably the enclosing class template parameter
    // (the Item type) -- confirm against the full StoreMapItems declaration
    const Item &at(const int index) const
    {
        return const_cast<StoreMapItems<C>&>(*this).at(index);
    }
+
+ /// reset all items to the same value
+ void fill(const Item &value)
+ {
+ for (int index = 0; index < capacity; ++index)
+ items[index] = value;
+ }
+
const int capacity; ///< total number of items
Ipc::Mem::FlexibleArray<Item> items; ///< storage
};
static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3);
template <class P1, class P2, class P3, class P4>
static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4);
+ /// attaches to the existing shared memory segment, becoming its owner
+ static Owner *Old(const char *const id);
~Owner();
Class *object() { return theObject; }
private:
+ explicit Owner(const char *const id);
Owner(const char *const id, const off_t sharedSize);
// not implemented
Must(theSegment.mem());
}
/// attach-only constructor used by Old(): opens an existing segment (and
/// marks it for unlinking) without constructing or assigning theObject
template <class Class>
Owner<Class>::Owner(const char *const id):
    theSegment(id), theObject(nullptr)
{
    theSegment.open(true);
    Must(theSegment.mem());
}
+
template <class Class>
Owner<Class>::~Owner()
{
    // theObject may still be nil: the attach-only constructor used by Old()
    // initializes it to nullptr, and Old() assigns it only after validation;
    // calling ~Class() through a nil pointer would be undefined behavior
    if (theObject)
        theObject->~Class();
}
/// attaches to an existing shared memory segment, becoming its owner;
/// the segment contents are reused as-is (no placement-new construction)
template <class Class>
Owner<Class> *
Owner<Class>::Old(const char *const id)
{
    auto owner = new Owner(id);
    owner->theObject = reinterpret_cast<Class*>(owner->theSegment.mem());
    // the old segment must be large enough for the object it claims to hold
    Must(static_cast<off_t>(owner->theObject->sharedMemorySize()) <= owner->theSegment.size());
    return owner;
}
+
template <class Class>
Owner<Class> *
Owner<Class>::New(const char *const id)
/// non-owning attachment: opens the segment without scheduling its unlinking
template <class Class>
Object<Class>::Object(const char *const id): theSegment(id)
{
    theSegment.open(false);
    Must(theSegment.mem());
    theObject = reinterpret_cast<Class*>(theSegment.mem());
    Must(static_cast<off_t>(theObject->sharedMemorySize()) <= theSegment.size());
}
void
-Ipc::Mem::Segment::open()
+Ipc::Mem::Segment::open(const bool unlinkWhenDone)
{
assert(theFD < 0);
}
theSize = statSize("Ipc::Mem::Segment::open");
+ doUnlink = unlinkWhenDone;
debugs(54, 3, HERE << "opened " << theName << " segment: " << theSize);
/// Create a new shared memory segment. Unlinks the segment on destruction.
void create(const off_t aSize);
- void open(); ///< Open an existing shared memory segment.
+ /// opens an existing shared memory segment
+ /// \param unlinkWhenDone whether to delete the segment on destruction
+ void open(const bool unlinkWhenDone);
const String &name() { return theName; } ///< shared memory segment name
off_t size() { return theSize; } ///< shared memory segment size
#include "Store.h"
#include "store/Disk.h"
#include "store/Disks.h"
+#include "store_rebuild.h"
#include "swap_log_op.h"
#include "util.h" // for tvSubDsec() which should be in SquidTime.h
store_table = hash_create(storeKeyHashCmp,
store_hash_buckets, storeKeyHashHash);
+ // Increment _before_ any possible storeRebuildComplete() calls so that
+ // storeRebuildComplete() can reliably detect when all disks are done. The
+ // level is decremented in each corresponding storeRebuildComplete() call.
+ StoreController::store_dirs_rebuilding += Config.cacheSwap.n_configured;
+
for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
/* this starts a search of the store dirs, loading their
* index. under the new Store api this should be
*/
if (Dir(i).active())
store(i)->init();
+ else
+ storeRebuildComplete(nullptr);
}
if (strcasecmp(Config.store_dir_select_algorithm, "round-robin") == 0) {
static StoreRebuildData counts;
-static struct timeval rebuild_start;
static void storeCleanup(void *);
+// TODO: Either convert to Progress or replace with StoreRebuildData.
+// TODO: Handle unknown totals (UFS cache_dir that lost swap.state) correctly.
typedef struct {
/* total number of "swap.state" entries that will be read */
int total;
static store_rebuild_progress *RebuildProgress = NULL;
/// keep the earliest initiation time when several cache_dirs (or resumed
/// runs) contribute statistics; a zeroed startTime means "never started"
// NOTE(review): std::min on timeval relies on an operator< overload,
// presumably supplied by SquidTime.h -- confirm it is in scope here
void
StoreRebuildData::updateStartTime(const timeval &dirStartTime)
{
    startTime = started() ? std::min(startTime, dirStartTime) : dirStartTime;
}
+
static int
storeCleanupDoubleCheck(StoreEntry * e)
{
storeRebuildComplete(StoreRebuildData *dc)
{
- double dt;
- counts.objcount += dc->objcount;
- counts.expcount += dc->expcount;
- counts.scancount += dc->scancount;
- counts.clashcount += dc->clashcount;
- counts.dupcount += dc->dupcount;
- counts.cancelcount += dc->cancelcount;
- counts.invalid += dc->invalid;
- counts.badflags += dc->badflags;
- counts.bad_log_op += dc->bad_log_op;
- counts.zero_object_sz += dc->zero_object_sz;
+ if (dc) {
+ counts.objcount += dc->objcount;
+ counts.expcount += dc->expcount;
+ counts.scancount += dc->scancount;
+ counts.clashcount += dc->clashcount;
+ counts.dupcount += dc->dupcount;
+ counts.cancelcount += dc->cancelcount;
+ counts.invalid += dc->invalid;
+ counts.badflags += dc->badflags;
+ counts.bad_log_op += dc->bad_log_op;
+ counts.zero_object_sz += dc->zero_object_sz;
+ counts.validations += dc->validations;
+ counts.updateStartTime(dc->startTime);
+ }
+ // else the caller was not responsible for indexing its cache_dir
+
+ assert(StoreController::store_dirs_rebuilding > 1);
+ --StoreController::store_dirs_rebuilding;
+
/*
* When store_dirs_rebuilding == 1, it means we are done reading
* or scanning all cache_dirs. Now report the stats and start
if (StoreController::store_dirs_rebuilding > 1)
return;
- dt = tvSubDsec(rebuild_start, current_time);
+ const auto dt = tvSubDsec(counts.startTime, current_time);
debugs(20, DBG_IMPORTANT, "Finished rebuilding storage from disk.");
debugs(20, DBG_IMPORTANT, " " << std::setw(7) << counts.scancount << " Entries scanned");
storeRebuildStart(void)
{
counts = StoreRebuildData(); // reset counters
- rebuild_start = current_time;
/*
* Note: store_dirs_rebuilding is initialized to 1.
*
storeRebuildProgress(int sd_index, int total, int sofar)
{
static time_t last_report = 0;
+ // TODO: Switch to int64_t and fix handling of unknown totals.
double n = 0.0;
double d = 0.0;
d += (double) RebuildProgress[sd_index].total;
}
- debugs(20, DBG_IMPORTANT, "Store rebuilding is "<< std::setw(4)<< std::setprecision(2) << 100.0 * n / d << "% complete");
+ debugs(20, DBG_IMPORTANT, "Indexing cache entries: " << Progress(n, d));
last_report = squid_curtime;
}
+void
+Progress::print(std::ostream &os) const
+{
+ if (goal > 0) {
+ const auto percent = 100.0 * completed / goal;
+ os << std::setprecision(2) << percent << "% (" <<
+ completed << " out of " << goal << ")";
+ } else if (!completed && !goal) {
+ os << "nothing to do";
+ } else {
+ // unknown (i.e. negative) or buggy (i.e. zero when completed != 0) goal
+ os << completed;
+ }
+}
+
#include "fde.h"
#include "Generic.h"
#include "StoreMeta.h"
#include "store_key_md5.h"
+class MemBuf;
+
+/// cache_dir(s) indexing statistics
class StoreRebuildData
{
public:
+ /// maintain earliest initiation time across multiple indexing cache_dirs
+ void updateStartTime(const timeval &dirStartTime);
+
+ /// whether we have worked on indexing this(these) cache_dir(s) before
+ bool started() const { return startTime.tv_sec > 0; }
+
+ // when adding members, keep the class compatible with placement new onto a
+ // zeroed shared memory segment (see Rock::Rebuild::Stats usage)
+
int objcount = 0; /* # objects successfully reloaded */
int expcount = 0; /* # objects expired */
int scancount = 0; /* # entries scanned or read from state file */
int badflags = 0; /* # bad e->flags */
int bad_log_op = 0;
int zero_object_sz = 0;
+ int64_t validations = 0; ///< the number of validated cache entries, slots
+ timeval startTime = {}; ///< absolute time when the rebuild was initiated
+};
+
+/// advancement of work that consists of (usually known number) of similar steps
+class Progress
+{
+public:
+ Progress(const int64_t stepsCompleted, const int64_t stepsTotal):
+ completed(stepsCompleted), goal(stepsTotal) {}
+
+ /// brief progress report suitable for level-0/1 debugging
+ void print(std::ostream &os) const;
+
+ int64_t completed; ///< the number of finished work steps
+ int64_t goal; ///< the known total number of work steps (or negative)
};
+inline std::ostream &
+operator <<(std::ostream &os, const Progress &p)
+{
+ p.print(os);
+ return os;
+}
+
void storeRebuildStart(void);
void storeRebuildComplete(StoreRebuildData *);
void storeRebuildProgress(int sd_index, int total, int sofar);
#include "squid.h"
#include "MemBuf.h"
+#include "SquidTime.h"
#include "store/Controller.h"
#include "store_rebuild.h"
void storeRebuildProgress(int sd_index, int total, int sofar) STUB
bool storeRebuildParseEntry(MemBuf &, StoreEntry &, cache_key *, StoreRebuildData &, uint64_t) STUB_RETVAL(false)
// a real (non-STUB) implementation: linked code depends on correct
// earliest-start-time tracking even in stubbed builds
// NOTE(review): std::min on timeval relies on an operator< overload,
// presumably supplied by SquidTime.h -- confirm it is in scope here
void StoreRebuildData::updateStartTime(const timeval &dirStartTime)
{
    startTime = started() ? std::min(startTime, dirStartTime) : dirStartTime;
}
void storeRebuildComplete(StoreRebuildData *)
{
    --StoreController::store_dirs_rebuilding;
    // mimic the production final decrement so the level reaches zero
    if (StoreController::store_dirs_rebuilding == 1)
        --StoreController::store_dirs_rebuilding; // normally in storeCleanup()
}
bool
return true;
}
+void Progress::print(std::ostream &) const STUB
+