/*
 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/AsyncJobCalls.h"
#include "debug/Messages.h"
#include "fs/rock/RockDbCell.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "fs_io.h"
#include "globals.h"
#include "md5.h"
#include "sbuf/Stream.h"
#include "Store.h"
#include "tools.h"

#include <array>
#include <cerrno>
#include <cstring>

CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);

/**
 \defgroup RockFsRebuild Rock Store Rebuild
 \ingroup Filesystems
 *
 \section RockFsRebuildOverview Overview
 * Several layers of information are manipulated during the rebuild:
 \par
 * Store Entry: Response message plus all the metainformation associated with
 * it. Identified by store key. At any given time, from Squid's point
 * of view, there is only one entry with a given key, but several
 * different entries with the same key can be observed in any historical
 * archive (such as an access log or a store database).
 \par
 * Slot chain: A sequence of db slots representing a Store Entry state at
 * some point in time. Identified by key+version combination. Due to
 * transaction aborts, crashes, and idle periods, some chains may contain
 * incomplete or stale information. We assume that no two different chains
 * have the same key and version. If that assumption fails, we may serve a
 * hodgepodge entry during rebuild, until "extra" slots are loaded/noticed.
 \par
 * iNode: The very first db slot in an entry slot chain. This slot contains
 * at least the beginning of Store Entry metadata, but most 32KB inodes contain
 * the entire metadata, HTTP headers, and HTTP body.
 \par
 * Db slot: A db record containing a piece of a single store entry and linked
 * to other slots with the same key and version fields, forming a chain.
 * Slots are identified by their absolute position in the database file,
 * which is naturally unique.
 \par
 * When information from the newly loaded db slot contradicts the entry-level
 * information collected so far (e.g., the versions do not match or the total
 * chain size after the slot contribution exceeds the expected total), the
 * whole entry (and not just the chain or the slot!) is declared corrupted.
 \par
 * Why invalidate the whole entry? Rock Store is written for high-load
 * environments with large caches, where there are usually very few idle slots
 * in the database. The space occupied by a purged entry is usually immediately
 * reclaimed. A Squid crash or a transaction abort is rather unlikely to
 * leave a relatively large number of stale slots in the database. Thus, the
 * number of potentially corrupted entries is relatively small. On the other
 * hand, the damage from serving a single hodgepodge entry may be significant
 * to the user. In such an environment, invalidating the whole entry has
 * negligible performance impact but saves us from high-damage bugs.
 */

namespace Rock
{

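/// whether the loading stage has scanned all db slots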
static bool
DoneLoading(const int64_t loadingPos, const int64_t dbSlotLimit)
{
    return loadingPos >= dbSlotLimit;
}

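/// whether the validation stage has covered all entries (and, with squid -S, all slots)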
static bool
DoneValidating(const int64_t validationPos, const int64_t dbSlotLimit, const int64_t dbEntryLimit)
{
    // paranoid slot checking is only enabled with squid -S
    const auto extraWork = opt_store_doublecheck ? dbSlotLimit : 0;
    return validationPos >= (dbEntryLimit + extraWork);
}

/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
class LoadingFlags
{
public:
    LoadingFlags(): state(0), anchored(0), mapped(0), finalized(0), freed(0) {}

    /* for LoadingEntry */
    uint8_t state:3;  ///< current entry state (one of the LoadingEntry::State values)
    uint8_t anchored:1;  ///< whether we loaded the inode slot for this entry

    /* for LoadingSlot */
    uint8_t mapped:1;  ///< whether the slot was added to a mapped entry
    uint8_t finalized:1;  ///< whether finalizeOrThrow() has scanned the slot
    uint8_t freed:1;  ///< whether the slot was given to the map as free space
};

/// smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingEntry
{
public:
    LoadingEntry(const sfileno fileNo, LoadingParts &source);

    uint64_t &size; ///< payload seen so far
    uint32_t &version; ///< DbCellHeader::version to distinguish same-URL chains

    /// possible store entry states during index rebuild
    typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State;

    /* LoadingFlags::state */
    State state() const { return static_cast<State>(flags.state); }
    void state(State aState) const { flags.state = aState; }

    /* LoadingFlags::anchored */
    bool anchored() const { return flags.anchored; }
    void anchored(const bool beAnchored) { flags.anchored = beAnchored; }

private:
    LoadingFlags &flags; ///< entry flags (see the above accessors) are ours
};

/// smart db slot-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingSlot
{
public:
    LoadingSlot(const SlotId slotId, LoadingParts &source);

    /// another slot in some chain belonging to the same entry (unordered!)
    Ipc::StoreMapSliceId &more;

    /* LoadingFlags::mapped */
    bool mapped() const { return flags.mapped; }
    void mapped(const bool beMapped) { flags.mapped = beMapped; }

    /* LoadingFlags::finalized */
    bool finalized() const { return flags.finalized; }
    void finalized(const bool beFinalized) { flags.finalized = beFinalized; }

    /* LoadingFlags::freed */
    bool freed() const { return flags.freed; }
    void freed(const bool beFreed) { flags.freed = beFreed; }

    bool used() const { return freed() || mapped() || more != -1; }

private:
    LoadingFlags &flags; ///< slot flags (see the above accessors) are ours
};

/// information about store entries being loaded from disk (and their slots)
/// used for identifying partially stored/loaded entries
class LoadingParts
{
public:
    using Sizes = Ipc::StoreMapItems<uint64_t>;
    using Versions = Ipc::StoreMapItems<uint32_t>;
    using Mores = Ipc::StoreMapItems<Ipc::StoreMapSliceId>;
    using Flags = Ipc::StoreMapItems<LoadingFlags>;

    LoadingParts(const SwapDir &dir, const bool resuming);
    ~LoadingParts();

    // lacking copying/moving code and often too huge to copy
    LoadingParts(LoadingParts&&) = delete;

    Sizes &sizes() const { return *sizesOwner->object(); }
    Versions &versions() const { return *versionsOwner->object(); }
    Mores &mores() const { return *moresOwner->object(); }
    Flags &flags() const { return *flagsOwner->object(); }

private:
    /* Anti-padding storage. With millions of entries, padding matters! */

    /* indexed by sfileno */
    Sizes::Owner *sizesOwner; ///< LoadingEntry::size for all entries
    Versions::Owner *versionsOwner; ///< LoadingEntry::version for all entries

    /* indexed by SlotId */
    Mores::Owner *moresOwner; ///< LoadingSlot::more for all slots

    /* entry flags are indexed by sfileno; slot flags -- by SlotId */
    Flags::Owner *flagsOwner; ///< all LoadingEntry and LoadingSlot flags
};

} /* namespace Rock */

/* LoadingEntry */

Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source):
    size(source.sizes().at(fileNo)),
    version(source.versions().at(fileNo)),
    flags(source.flags().at(fileNo))
{
}

/* LoadingSlot */

Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source):
    more(source.mores().at(slotId)),
    flags(source.flags().at(slotId))
{
}

/* LoadingParts */

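/// creates a shared memory segment for one rebuild index part or, when
/// resuming an interrupted rebuild, attaches to the previously created segment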
template <class T>
inline typename T::Owner *
createOwner(const char *dirPath, const char *sfx, const int64_t limit, const bool resuming)
{
    auto id = Ipc::Mem::Segment::Name(SBuf(dirPath), sfx);
    return resuming ? Ipc::Mem::Owner<T>::Old(id.c_str()) : shm_new(T)(id.c_str(), limit);
}

Rock::LoadingParts::LoadingParts(const SwapDir &dir, const bool resuming):
    sizesOwner(createOwner<Sizes>(dir.path, "rebuild_sizes", dir.entryLimitActual(), resuming)),
    versionsOwner(createOwner<Versions>(dir.path, "rebuild_versions", dir.entryLimitActual(), resuming)),
    moresOwner(createOwner<Mores>(dir.path, "rebuild_mores", dir.slotLimitActual(), resuming)),
    flagsOwner(createOwner<Flags>(dir.path, "rebuild_flags", dir.slotLimitActual(), resuming))
{
    assert(sizes().capacity == versions().capacity); // every entry has both fields
    assert(sizes().capacity <= mores().capacity); // every entry needs slot(s)
    assert(mores().capacity == flags().capacity); // every slot needs a set of flags

    if (!resuming) {
        // other parts rely on shared memory segments being zero-initialized
        // TODO: refactor the next slot pointer to use 0 for nil values
        mores().fill(-1);
    }
}

Rock::LoadingParts::~LoadingParts()
{
    delete sizesOwner;
    delete versionsOwner;
    delete moresOwner;
    delete flagsOwner;
}

/* Rock::Rebuild::Stats */

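/// the name of the shared memory segment that stores rebuild statistics
/// for the cache_dir at the given path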
SBuf
Rock::Rebuild::Stats::Path(const char *dirPath)
{
    return Ipc::Mem::Segment::Name(SBuf(dirPath), "rebuild_stats");
}

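/// creates a new shared memory segment for storing rebuild statistics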
Ipc::Mem::Owner<Rock::Rebuild::Stats>*
Rock::Rebuild::Stats::Init(const SwapDir &dir)
{
    return shm_new(Stats)(Path(dir.path).c_str());
}

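/// whether both rebuild stages (loading and validation) have finished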
bool
Rock::Rebuild::Stats::completed(const SwapDir &dir) const
{
    return DoneLoading(counts.scancount, dir.slotLimitActual()) &&
           DoneValidating(counts.validations, dir.slotLimitActual(), dir.entryLimitActual());
}

/* Rebuild */

bool
Rock::Rebuild::IsResponsible(const SwapDir &)
{
    // in SMP mode, only the disker is responsible for populating the map
    return !UsingSmp() || IamDiskProcess();
}

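/// starts indexing the given cache_dir unless this process is not responsible
/// for it or indexing has already completed; returns whether a job was started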
bool
Rock::Rebuild::Start(SwapDir &dir)
{
    if (!IsResponsible(dir)) {
        debugs(47, 2, "not responsible for indexing cache_dir #" <<
               dir.index << " from " << dir.filePath);
        return false;
    }

    const auto stats = shm_old(Rebuild::Stats)(Stats::Path(dir.path).c_str());
    if (stats->completed(dir)) {
        debugs(47, 2, "already indexed cache_dir #" <<
               dir.index << " from " << dir.filePath);
        return false;
    }

    AsyncJob::Start(new Rebuild(&dir, stats));
    return true;
}

Rock::Rebuild::Rebuild(SwapDir *dir, const Ipc::Mem::Pointer<Stats> &s): AsyncJob("Rock::Rebuild"),
    sd(dir),
    parts(nullptr),
    stats(s),
    dbSize(0),
    dbSlotSize(0),
    dbSlotLimit(0),
    dbEntryLimit(0),
    fd(-1),
    dbOffset(0),
    loadingPos(stats->counts.scancount),
    validationPos(stats->counts.validations),
    counts(stats->counts),
    resuming(stats->counts.started())
{
    assert(sd);
    dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
    dbSlotSize = sd->slotSize;
    dbEntryLimit = sd->entryLimitActual();
    dbSlotLimit = sd->slotLimitActual();
    assert(dbEntryLimit <= dbSlotLimit);
    registerRunner();
}

Rock::Rebuild::~Rebuild()
{
    if (fd >= 0)
        file_close(fd);
    // normally, segments are used until the Squid instance quits,
    // but these indexing-only segments are no longer needed
    delete parts;
}

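/// RegisteredRunner API: stop the rebuild job when Squid starts shutting down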
void
Rock::Rebuild::startShutdown()
{
    mustStop("startShutdown");
}

/// prepares and initiates entry loading sequence
void
Rock::Rebuild::start()
{
    assert(IsResponsible(*sd));

    if (!resuming) {
        debugs(47, Important(18), "Loading cache_dir #" << sd->index <<
               " from " << sd->filePath);
    } else {
        debugs(47, Important(63), "Resuming indexing cache_dir #" << sd->index <<
               " from " << sd->filePath << ':' << progressDescription());
    }

    fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
    if (fd < 0)
        failure("cannot open db", errno);

    char hdrBuf[SwapDir::HeaderSize];
    if (read(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize)
        failure("cannot read db header", errno);

    // slot prefix of SM_PAGE_SIZE should fit both core entry header and ours
    assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);

    dbOffset = SwapDir::HeaderSize + loadingPos * dbSlotSize;

    assert(!parts);
    parts = new LoadingParts(*sd, resuming);

    counts.updateStartTime(current_time);

    checkpoint();
}

/// continues after a pause if not done
void
Rock::Rebuild::checkpoint()
{
    if (!done())
        eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
}

bool
Rock::Rebuild::doneLoading() const
{
    return DoneLoading(loadingPos, dbSlotLimit);
}

bool
Rock::Rebuild::doneValidating() const
{
    return DoneValidating(validationPos, dbSlotLimit, dbEntryLimit);
}

bool
Rock::Rebuild::doneAll() const
{
    return doneLoading() && doneValidating() && AsyncJob::doneAll();
}

void
Rock::Rebuild::Steps(void *data)
{
    // use async call to enable job call protection that time events lack
    CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
}

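/// a single rebuild step: performs a portion of loading or validation work
/// and reschedules itself until all work is done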
void
Rock::Rebuild::steps()
{
    if (!doneLoading())
        loadingSteps();
    else
        validationSteps();

    checkpoint();
}

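/// loads consecutive db slots, pausing when the time budget is exhausted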
void
Rock::Rebuild::loadingSteps()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    // Balance our desire to maximize the number of entries processed at once
    // (and, hence, minimize overheads and total rebuild time) with a
    // requirement to also process Coordinator events, disk I/Os, etc.
    const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
    const timeval loopStart = current_time;

    int64_t loaded = 0;
    while (!doneLoading()) {
        loadOneSlot();
        dbOffset += dbSlotSize;
        ++loadingPos;
        ++loaded;

        if (counts.scancount % 1000 == 0)
            storeRebuildProgress(sd->index, dbSlotLimit, counts.scancount);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, "pausing after " << loaded << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per entry");
            break;
        }
    }
}

Rock::LoadingEntry
Rock::Rebuild::loadingEntry(const sfileno fileNo)
{
    Must(0 <= fileNo && fileNo < dbEntryLimit);
    return LoadingEntry(fileNo, *parts);
}

Rock::LoadingSlot
Rock::Rebuild::loadingSlot(const SlotId slotId)
{
    Must(0 <= slotId && slotId < dbSlotLimit);
    Must(slotId <= loadingPos); // cannot look ahead
    return LoadingSlot(slotId, *parts);
}

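/// reads and processes the single db slot at the current loading position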
void
Rock::Rebuild::loadOneSlot()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    // increment before loadingPos to avoid getting stuck at a slot
    // in case of a crash
    ++counts.scancount;

    if (lseek(fd, dbOffset, SEEK_SET) < 0)
        failure("cannot seek to db entry", errno);

    buf.reset();

    if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
        return;

    const SlotId slotId = loadingPos;

    // get our header
    DbCellHeader header;
    if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring truncated " << buf.contentSize() << "-byte " <<
               "cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    memcpy(&header, buf.content(), sizeof(header));
    if (header.empty()) {
        freeUnusedSlot(slotId, false);
        return;
    }
    if (!header.sane(dbSlotSize, dbSlotLimit)) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring malformed cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    buf.consume(sizeof(header)); // optimize to avoid memmove()

    useNewSlot(slotId, header);
}

/// whether the given slot buffer is likely to have nothing but zeros, as is
/// common to slots in pre-initialized (with zeros) db files
static bool
ZeroedSlot(const MemBuf &buf)
{
    // We could memcmp the entire buffer, but it is probably safe enough to test
    // a few bytes because even if we do not detect a corrupted entry, it is not
    // a big deal: Store::UnpackPrefix() rejects all-0s metadata prefix.
    static const std::array<char, 10> zeros = {};

    if (static_cast<size_t>(buf.contentSize()) < zeros.size())
        return false; // cannot be sure enough

    return memcmp(buf.content(), zeros.data(), zeros.size()) == 0;
}

/// parse StoreEntry basics and add them to the map, returning true on success
bool
Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    cache_key key[SQUID_MD5_DIGEST_LENGTH];
    StoreEntry loadedE;
    const uint64_t knownSize = header.entrySize > 0 ?
                               header.entrySize : anchor.basics.swap_file_sz.load();

    if (ZeroedSlot(buf))
        return false;

    if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize))
        return false;

    // the entry size may be unknown, but if it is known, it is authoritative

    debugs(47, 8, "importing basics for entry " << fileno <<
           " inode.entrySize: " << header.entrySize <<
           " swap_file_sz: " << loadedE.swap_file_sz);
    anchor.set(loadedE);

    // we have not validated whether all db cells for this entry were loaded
    EBIT_CLR(anchor.basics.flags, ENTRY_VALIDATED);

    // loadedE->dump(5);

    return true;
}

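/// validates loaded entries (and, with squid -S, slots), pausing when the
/// time budget is exhausted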
void
Rock::Rebuild::validationSteps()
{
    debugs(47, 5, sd->index << " validating from " << validationPos);

    // see loadingSteps() for the rationale; TODO: avoid duplication
    const int maxSpentMsec = 50; // keep small: validation does not do I/O
    const timeval loopStart = current_time;

    int64_t validated = 0;
    while (!doneValidating()) {
        // increment before validationPos to avoid getting stuck at a slot
        // in case of a crash
        ++counts.validations;
        if (validationPos < dbEntryLimit)
            validateOneEntry(validationPos);
        else
            validateOneSlot(validationPos - dbEntryLimit);
        ++validationPos;
        ++validated;

        if (validationPos % 1000 == 0)
            debugs(20, 2, "validated: " << validationPos);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, "pausing after " << validated << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry");
            break;
        }
    }
}

/// Either make the entry accessible to all or throw.
/// This method assumes it is called only when no more entry slots are expected.
void
Rock::Rebuild::finalizeOrThrow(const sfileno fileNo, LoadingEntry &le)
{
    // walk all map-linked slots, starting from inode, and mark each
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileNo);
    Must(le.size > 0); // paranoid
    uint64_t mappedSize = 0;
    SlotId slotId = anchor.start;
    while (slotId >= 0 && mappedSize < le.size) {
        LoadingSlot slot = loadingSlot(slotId); // throws if we have not loaded that slot
        Must(!slot.finalized()); // no loops or stealing from other entries
        Must(slot.mapped()); // all our slots should be in the sd->map
        Must(!slot.freed()); // all our slots should still be present
        slot.finalized(true);

        Ipc::StoreMapSlice &mapSlice = sd->map->writeableSlice(fileNo, slotId);
        Must(mapSlice.size > 0); // paranoid
        mappedSize += mapSlice.size;
        slotId = mapSlice.next;
    }
    /* no hodgepodge entries: one entry - one full chain and no leftovers */
    Must(slotId < 0);
    Must(mappedSize == le.size);

    if (!anchor.basics.swap_file_sz)
        anchor.basics.swap_file_sz = le.size;
    EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
    le.state(LoadingEntry::leLoaded);
    sd->map->closeForWriting(fileNo);
    ++counts.objcount;
}

/// Either make the entry accessible to all or free it.
/// This method must only be called when no more entry slots are expected.
void
Rock::Rebuild::finalizeOrFree(const sfileno fileNo, LoadingEntry &le)
{
    try {
        finalizeOrThrow(fileNo, le);
    } catch (const std::exception &ex) {
        freeBadEntry(fileNo, ex.what());
    }
}

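/// finalizes or frees the given entry if it is still in the loading state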
void
Rock::Rebuild::validateOneEntry(const sfileno fileNo)
{
    LoadingEntry entry = loadingEntry(fileNo);
    switch (entry.state()) {

    case LoadingEntry::leLoading:
        finalizeOrFree(fileNo, entry);
        break;

    case LoadingEntry::leEmpty: // no entry hashed to this position
    case LoadingEntry::leLoaded: // we have already unlocked this entry
    case LoadingEntry::leCorrupted: // we have already removed this entry
    case LoadingEntry::leIgnored: // we have already discarded this entry
        break;
    }
}

void
Rock::Rebuild::validateOneSlot(const SlotId slotId)
{
    const LoadingSlot slot = loadingSlot(slotId);
    // there should not be any unprocessed slots left
    Must(slot.freed() || (slot.mapped() && slot.finalized()));
}

/// Marks remaining bad entry slots as free and unlocks the entry. The map
/// cannot do this because Loading entries may have holes in the slots chain.
void
Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
{
    debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
           " entry " << fileno << " is ignored during rebuild");

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leCorrupted);

    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
    assert(anchor.start < 0 || le.size > 0);
    for (SlotId slotId = anchor.start; slotId >= 0;) {
        const SlotId next = loadingSlot(slotId).more;
        freeSlot(slotId, true);
        slotId = next;
    }

    sd->map->forgetWritingEntry(fileno);
}

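/// AsyncJob cleanup: reports our final rebuild counters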
void
Rock::Rebuild::swanSong()
{
    debugs(47,3, "cache_dir #" << sd->index << " rebuild level: " <<
           StoreController::store_dirs_rebuilding);
    storeRebuildComplete(&counts);
}

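/// reports a fatal rebuild error and terminates Squid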
void
Rock::Rebuild::failure(const char *msg, int errNo)
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    if (errNo)
        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir rebuild failure: " << xstrerr(errNo));
    debugs(47, DBG_CRITICAL, "Do you need to run 'squid -z' to initialize storage?");

    assert(sd);
    fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.",
           sd->index, sd->filePath, msg);
}

/// adds slot to the free slot index
void
Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
{
    debugs(47,5, sd->index << " frees slot " << slotId);
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.freed());
    slot.freed(true);

    if (invalid) {
        ++counts.invalid;
        //sd->unlink(fileno); leave garbage on disk, it should not hurt
    }

    Ipc::Mem::PageId pageId;
    pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(sd->index);
    pageId.number = slotId+1;
    sd->freeSlots->push(pageId);
}

/// freeSlot() for never-been-mapped slots
void
Rock::Rebuild::freeUnusedSlot(const SlotId slotId, const bool invalid)
{
    LoadingSlot slot = loadingSlot(slotId);
    // mapped slots must be freed via freeBadEntry() to keep the map in sync
    assert(!slot.mapped());
    freeSlot(slotId, invalid);
}

/// adds slot to the entry chain in the map
void
Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
{
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.mapped());
    assert(!slot.freed());
    slot.mapped(true);

    Ipc::StoreMapSlice slice;
    slice.next = header.nextSlot;
    slice.size = header.payloadSize;
    sd->map->importSlice(slotId, slice);
}

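/// makes the 'to' slot the new head of the chain currently starting at 'from'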
template <class SlotIdType> // accommodates atomic and simple SlotIds.
void
Rock::Rebuild::chainSlots(SlotIdType &from, const SlotId to)
{
    LoadingSlot slot = loadingSlot(to);
    assert(slot.more < 0);
    slot.more = from; // may still be unset
    from = to;
}

/// adds slot to an existing entry chain; caller must check that the slot
/// belongs to the chain it is being added to
void
Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    LoadingEntry le = loadingEntry(fileno);
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);

    debugs(47,9, "adding " << slotId << " to entry " << fileno);
    // we do not need to preserve the order
    if (le.anchored()) {
        LoadingSlot inode = loadingSlot(anchor.start);
        chainSlots(inode.more, slotId);
    } else {
        chainSlots(anchor.start, slotId);
    }

    le.size += header.payloadSize; // must precede freeBadEntry() calls

    if (header.firstSlot == slotId) {
        debugs(47,5, "added inode");

        if (le.anchored()) { // we have already added another inode slot
            freeBadEntry(fileno, "inode conflict");
            ++counts.clashcount;
            return;
        }

        le.anchored(true);

        if (!importEntry(anchor, fileno, header)) {
            freeBadEntry(fileno, "corrupted metainfo");
            return;
        }

        // set total entry size and/or check it for consistency
        if (const uint64_t totalSize = header.entrySize) {
            assert(totalSize != static_cast<uint64_t>(-1));
            if (!anchor.basics.swap_file_sz) {
                anchor.basics.swap_file_sz = totalSize;
                assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
            } else if (totalSize != anchor.basics.swap_file_sz) {
                freeBadEntry(fileno, "size mismatch");
                return;
            }
        }
    }

    const uint64_t totalSize = anchor.basics.swap_file_sz; // may be 0/unknown

    if (totalSize > 0 && le.size > totalSize) { // overflow
        debugs(47, 8, "overflow: " << le.size << " > " << totalSize);
        freeBadEntry(fileno, "overflowing");
        return;
    }

    mapSlot(slotId, header);
    if (totalSize > 0 && le.size == totalSize)
        finalizeOrFree(fileno, le); // entry is probably fully loaded now
}

/// initialize housekeeping information for a newly accepted entry
void
Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
    assert(header.firstSlot >= 0);
    anchor.start = -1; // addSlotToEntry() will set it

    assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leLoading);
    le.version = header.version;
    le.size = 0;
}

/// handle a slot from an entry that we have not seen before
void
Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    // A miss may have been stored at our fileno while we were loading other
    // slots from disk. We ought to preserve that entry because it is fresher.
    const bool overwriteExisting = false;
    if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) {
        primeNewEntry(*anchor, fileno, header);
        addSlotToEntry(fileno, slotId, header); // may fail
        assert(anchor->basics.swap_file_sz != static_cast<uint64_t>(-1));
    } else {
        // A new from-network entry is occupying our map slot; let it be, but
        // save us from the trouble of going through the above motions again.
        LoadingEntry le = loadingEntry(fileno);
        le.state(LoadingEntry::leIgnored);
        freeUnusedSlot(slotId, false);
    }
}

/// does the header belong to the fileno entry being loaded?
bool
Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
{
    // Header updates always result in multi-start chains and often
    // result in multi-version chains so we can only compare the keys.
    const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
    return anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
}

/// handle freshly loaded (and validated) db slot header
void
Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
{
    const cache_key *const key =
        reinterpret_cast<const cache_key*>(header.key);
    const sfileno fileno = sd->map->fileNoByKey(key);
    assert(0 <= fileno && fileno < dbEntryLimit);

    LoadingEntry le = loadingEntry(fileno);
    debugs(47,9, "entry " << fileno << " state: " << le.state() << ", inode: " <<
           header.firstSlot << ", size: " << header.payloadSize);

    switch (le.state()) {

    case LoadingEntry::leEmpty: {
        startNewEntry(fileno, slotId, header);
        break;
    }

    case LoadingEntry::leLoading: {
        if (sameEntry(fileno, header)) {
            addSlotToEntry(fileno, slotId, header); // may fail
        } else {
            // either the loading chain or this slot is stale;
            // be conservative and ignore both (and any future ones)
            freeBadEntry(fileno, "duplicated");
            freeUnusedSlot(slotId, true);
            ++counts.dupcount;
        }
        break;
    }

    case LoadingEntry::leLoaded: {
        // either the previously loaded chain or this slot is stale;
        // be conservative and ignore both (and any future ones)
        le.state(LoadingEntry::leCorrupted);
        sd->map->freeEntry(fileno); // may not be immediately successful
        freeUnusedSlot(slotId, true);
        ++counts.dupcount;
        break;
    }

    case LoadingEntry::leCorrupted: {
        // previously seen slots messed things up so we must ignore this one
        freeUnusedSlot(slotId, true);
        break;
    }

    case LoadingEntry::leIgnored: {
        // already replaced by a fresher or colliding from-network entry
        freeUnusedSlot(slotId, false);
        break;
    }
    }
}

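/// summarizes the already completed rebuild work (for progress reporting
/// when resuming an interrupted rebuild)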
SBuf
Rock::Rebuild::progressDescription() const
{
    SBufStream str;

    str << Debug::Extra << "slots loaded: " << Progress(loadingPos, dbSlotLimit);

    const auto validatingEntries = validationPos < dbEntryLimit;
    const auto entriesValidated = validatingEntries ? validationPos : dbEntryLimit;
    str << Debug::Extra << "entries validated: " << Progress(entriesValidated, dbEntryLimit);
    if (opt_store_doublecheck) {
        const auto slotsValidated = validatingEntries ? 0 : (validationPos - dbEntryLimit);
        str << Debug::Extra << "slots validated: " << Progress(slotsValidated, dbSlotLimit);
    }

    return str.buf();
}