1 /*
2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 47 Store Directory Routines */
10
11 #include "squid.h"
12 #include "cache_cf.h"
13 #include "CollapsedForwarding.h"
14 #include "ConfigOption.h"
15 #include "DiskIO/DiskIOModule.h"
16 #include "DiskIO/DiskIOStrategy.h"
17 #include "DiskIO/ReadRequest.h"
18 #include "DiskIO/WriteRequest.h"
19 #include "fs/rock/RockHeaderUpdater.h"
20 #include "fs/rock/RockIoRequests.h"
21 #include "fs/rock/RockIoState.h"
22 #include "fs/rock/RockRebuild.h"
23 #include "fs/rock/RockSwapDir.h"
24 #include "globals.h"
25 #include "ipc/mem/Pages.h"
26 #include "MemObject.h"
27 #include "Parsing.h"
28 #include "SquidConfig.h"
29 #include "SquidMath.h"
30 #include "tools.h"
31
32 #include <cstdlib>
33 #include <iomanip>
34 #include <limits>
35
36 #if HAVE_SYS_STAT_H
37 #include <sys/stat.h>
38 #endif
39
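/// The first HeaderSize bytes of the db file are reserved for the database
/// header (create() zero-fills them); slot 0 starts immediately after it
/// (see diskOffset()).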
40 const int64_t Rock::SwapDir::HeaderSize = 16*1024;
41
42 Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
43 slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
44 waitingForPage(NULL)
45 {
46 }
47
48 Rock::SwapDir::~SwapDir()
49 {
50 delete io;
51 delete map;
52 safe_free(filePath);
53 }
54
55 // called when Squid core needs a StoreEntry with a given key
56 StoreEntry *
57 Rock::SwapDir::get(const cache_key *key)
58 {
59 if (!map || !theFile || !theFile->canRead())
60 return NULL;
61
62 sfileno filen;
63 const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
64 if (!slot)
65 return NULL;
66
67 // create a brand new store entry and initialize it with stored basics
68 StoreEntry *e = new StoreEntry();
69 e->createMemObject();
70 anchorEntry(*e, filen, *slot);
71 trackReferences(*e);
72 return e;
73 }
74
75 bool
76 Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync)
77 {
78 if (!map || !theFile || !theFile->canRead())
79 return false;
80
81 sfileno filen;
82 const Ipc::StoreMapAnchor *const slot = map->openForReading(
83 reinterpret_cast<cache_key*>(entry.key), filen);
84 if (!slot)
85 return false;
86
87 anchorEntry(entry, filen, *slot);
88 inSync = updateAnchoredWith(entry, *slot);
89 return true; // even if inSync is false
90 }
91
92 bool
93 Rock::SwapDir::updateAnchored(StoreEntry &entry)
94 {
95 if (!map || !theFile || !theFile->canRead())
96 return false;
97
98 assert(entry.hasDisk(index));
99
100 const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen);
101 return updateAnchoredWith(entry, s);
102 }
103
104 bool
105 Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor)
106 {
107 entry.swap_file_sz = anchor.basics.swap_file_sz;
108 return true;
109 }
110
111 void
112 Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
113 {
114 anchor.exportInto(e);
115
116 const bool complete = anchor.complete();
117 e.store_status = complete ? STORE_OK : STORE_PENDING;
118 // SWAPOUT_WRITING: even though another worker writes?
119 e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);
120
121 e.ping_status = PING_NONE;
122
123 EBIT_SET(e.flags, ENTRY_VALIDATED);
124 }
125
126 void Rock::SwapDir::disconnect(StoreEntry &e)
127 {
128 assert(e.hasDisk(index));
129
130 ignoreReferences(e);
131
132 // do not rely on e.swap_status here because there is an async delay
133 // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
134
135 // since e has swap_filen, its slot is locked for reading and/or writing
136 // but it is difficult to know whether THIS worker is reading or writing e,
137 // especially since we may switch from writing to reading. This code relies
138 // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
139 if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
140 dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
141 map->abortWriting(e.swap_filen);
142 e.detachFromDisk();
143 dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
144 Store::Root().stopSharing(e); // broadcasts after the change
145 } else {
146 map->closeForReading(e.swap_filen);
147 e.detachFromDisk();
148 }
149 }
150
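/// Reports used db space: everything that is not on the free-slot stack is
/// counted as in use. Illustrative arithmetic (hypothetical numbers): a
/// 4096 MB db with 100000 free 16 KB slots reports
/// 4294967296 - 100000*16384 = 2656567296 bytes.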
151 uint64_t
152 Rock::SwapDir::currentSize() const
153 {
154 const uint64_t spaceSize = !freeSlots ?
155 maxSize() : (slotSize * freeSlots->size());
156 // everything that is not free is in use
157 return maxSize() - spaceSize;
158 }
159
160 uint64_t
161 Rock::SwapDir::currentCount() const
162 {
163 return map ? map->entryCount() : 0;
164 }
165
166 /// In SMP mode, only the disker process reports stats so that the same
167 /// stats are not counted by multiple processes.
168 bool
169 Rock::SwapDir::doReportStat() const
170 {
171 return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
172 }
173
174 void
175 Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
176 {
177 // nothing to do
178 }
179
180 void
181 Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry)
182 {
183 debugs(47, 5, entry);
184 disconnect(entry); // calls abortWriting() to free the disk entry
185 }
186
187 int64_t
188 Rock::SwapDir::slotLimitAbsolute() const
189 {
190 // the max value is an invalid one; all values must be below the limit
191 assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
192 std::numeric_limits<SlotId>::max());
193 return std::numeric_limits<SlotId>::max();
194 }
195
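/// Worked example for the formula below (hypothetical configuration): a
/// 4096 MB cache_dir with the default 16 KB slot size wants
/// (4096*1024*1024 - 16384)/16384 = 262143 slots, well below the absolute
/// SlotId limit returned by slotLimitAbsolute().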
196 int64_t
197 Rock::SwapDir::slotLimitActual() const
198 {
199 const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
200 const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
201 const int64_t sLimitHi = slotLimitAbsolute();
202 return min(max(sLimitLo, sWanted), sLimitHi);
203 }
204
205 int64_t
206 Rock::SwapDir::entryLimitActual() const
207 {
208 return min(slotLimitActual(), entryLimitAbsolute());
209 }
210
211 // TODO: encapsulate as a tool
212 void
213 Rock::SwapDir::create()
214 {
215 assert(path);
216 assert(filePath);
217
218 if (UsingSmp() && !IamDiskProcess()) {
219 debugs (47,3, HERE << "disker will create in " << path);
220 return;
221 }
222
223 debugs (47,3, HERE << "creating in " << path);
224
225 struct stat dir_sb;
226 if (::stat(path, &dir_sb) == 0) {
227 struct stat file_sb;
228 if (::stat(filePath, &file_sb) == 0) {
229 debugs (47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
230 return;
231 }
232 // else the db file is not there or is not accessible, and we will try
233 // to create it later below, generating a detailed error on failures.
234 } else { // path does not exist or is inaccessible
235 // If path exists but is not accessible, mkdir() below will fail, and
236 // the admin should see the error and act accordingly, so there is
237 // no need to distinguish ENOENT from other possible stat() errors.
238 debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
239 const int res = mkdir(path, 0700);
240 if (res != 0)
241 createError("mkdir");
242 }
243
244 debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
245 const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
246 if (swap < 0)
247 createError("create");
248
249 #if SLOWLY_FILL_WITH_ZEROS
250 char block[1024];
251 Must(maxSize() % sizeof(block) == 0);
252 memset(block, '\0', sizeof(block));
253
254 for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
255 if (write(swap, block, sizeof(block)) != sizeof(block))
256 createError("write");
257 }
258 #else
259 if (ftruncate(swap, maxSize()) != 0)
260 createError("truncate");
261
262 char header[HeaderSize];
263 memset(header, '\0', sizeof(header));
264 if (write(swap, header, sizeof(header)) != sizeof(header))
265 createError("write");
266 #endif
267
268 close(swap);
269 }
270
271 // report Rock DB creation error and exit
272 void
273 Rock::SwapDir::createError(const char *const msg)
274 {
275 int xerrno = errno; // XXX: where does errno come from?
276 debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
277 filePath << "; " << msg << " error: " << xstrerr(xerrno));
278 fatal("Rock Store db creation error");
279 }
280
281 void
282 Rock::SwapDir::init()
283 {
284 debugs(47,2, HERE);
285
286 // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
287 // are refcounted. We up our count once to avoid implicit deletes.
288 lock();
289
290 freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());
291
292 Must(!map);
293 map = new DirMap(inodeMapPath());
294 map->cleaner = this;
295
296 const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
297 if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
298 debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
299 io = m->createStrategy();
300 io->init();
301 } else {
302 debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
303 ioModule);
304 fatal("Rock Store missing a required DiskIO module");
305 }
306
307 theFile = io->newFile(filePath);
308 theFile->configure(fileConfig);
309 theFile->open(O_RDWR, 0644, this);
310
311 // Increment early. Otherwise, if one SwapDir finishes rebuild before
312 // others start, storeRebuildComplete() will think the rebuild is over!
313 // TODO: move store_dirs_rebuilding hack to store modules that need it.
314 ++StoreController::store_dirs_rebuilding;
315 }
316
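/// whether this cache_dir requires a dedicated disker process: in daemon
/// mode, SMP configurations (workers > 1) cannot work without one, and even
/// single-worker configurations prefer one when the IpcIo module is present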
317 bool
318 Rock::SwapDir::needsDiskStrand() const
319 {
320 const bool wontEvenWorkWithoutDisker = Config.workers > 1;
321 const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
322 return InDaemonMode() && (wontEvenWorkWithoutDisker ||
323 wouldWorkBetterWithDisker);
324 }
325
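/// Parses one rock cache_dir configuration line. A hypothetical example of
/// what this method and the parse*Option() methods below accept:
///
///   cache_dir rock /var/cache/squid/rock 4096 slot-size=16384 max-swap-rate=200 swap-timeout=300
///
/// where 4096 is the db size in MB, slot-size is in bytes, and swap-timeout
/// is in milliseconds (fileConfig.ioTimeout is a time_msec_t).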
326 void
327 Rock::SwapDir::parse(int anIndex, char *aPath)
328 {
329 index = anIndex;
330
331 path = xstrdup(aPath);
332
333 // the cache store db file is located at path/rock
334 String fname(path);
335 fname.append("/rock");
336 filePath = xstrdup(fname.termedBuf());
337
338 parseSize(false);
339 parseOptions(0);
340
341 // Current openForWriting() code overwrites the old slot if needed
342 // and possible, so proactively removing old slots is probably useless.
343 assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);
344
345 validateOptions();
346 }
347
348 void
349 Rock::SwapDir::reconfigure()
350 {
351 parseSize(true);
352 parseOptions(1);
353 // TODO: can we reconfigure the replacement policy (repl)?
354 validateOptions();
355 }
356
357 /// parse maximum db disk size
358 void
359 Rock::SwapDir::parseSize(const bool reconfig)
360 {
361 const int i = GetInteger();
362 if (i < 0)
363 fatal("negative Rock cache_dir size value");
364 const uint64_t new_max_size =
365 static_cast<uint64_t>(i) << 20; // MBytes to Bytes
366 if (!reconfig)
367 max_size = new_max_size;
368 else if (new_max_size != max_size) {
369 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
370 "cannot be changed dynamically, value left unchanged (" <<
371 (max_size >> 20) << " MB)");
372 }
373 }
374
375 ConfigOption *
376 Rock::SwapDir::getOptionTree() const
377 {
378 ConfigOption *copt = ::SwapDir::getOptionTree();
379 ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(copt);
380 if (vector) {
381 // if copt is actually a ConfigOptionVector
382 vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
383 vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
384 vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
385 } else {
386 // we don't know how to handle copt, as it's not a ConfigOptionVector.
387 // free it (and return nullptr)
388 delete copt;
389 copt = nullptr;
390 }
391 return copt;
392 }
393
394 bool
395 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
396 {
397 return strcmp(option, "slot-size") != 0 &&
398 ::SwapDir::allowOptionReconfigure(option);
399 }
400
401 /// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
402 bool
403 Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
404 {
405 // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
406 // including time unit handling. Same for size and rate.
407
408 time_msec_t *storedTime;
409 if (strcmp(option, "swap-timeout") == 0)
410 storedTime = &fileConfig.ioTimeout;
411 else
412 return false;
413
414 if (!value) {
415 self_destruct();
416 return false;
417 }
418
419 // TODO: handle time units and detect parsing errors better
420 const int64_t parsedValue = strtoll(value, NULL, 10);
421 if (parsedValue < 0) {
422 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
423 self_destruct();
424 return false;
425 }
426
427 const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);
428
429 if (!reconfig)
430 *storedTime = newTime;
431 else if (*storedTime != newTime) {
432 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
433 << " cannot be changed dynamically, value left unchanged: " <<
434 *storedTime);
435 }
436
437 return true;
438 }
439
440 /// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
441 void
442 Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
443 {
444 if (fileConfig.ioTimeout)
445 storeAppendPrintf(e, " swap-timeout=%" PRId64,
446 static_cast<int64_t>(fileConfig.ioTimeout));
447 }
448
449 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
450 bool
451 Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
452 {
453 int *storedRate;
454 if (strcmp(option, "max-swap-rate") == 0)
455 storedRate = &fileConfig.ioRate;
456 else
457 return false;
458
459 if (!value) {
460 self_destruct();
461 return false;
462 }
463
464 // TODO: handle time units and detect parsing errors better
465 const int64_t parsedValue = strtoll(value, NULL, 10);
466 if (parsedValue < 0) {
467 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
468 self_destruct();
469 return false;
470 }
471
472 const int newRate = static_cast<int>(parsedValue);
473
474 if (newRate < 0) {
475 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
476 self_destruct();
477 return false;
478 }
479
480 if (!isaReconfig)
481 *storedRate = newRate;
482 else if (*storedRate != newRate) {
483 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
484 << " cannot be changed dynamically, value left unchanged: " <<
485 *storedRate);
486 }
487
488 return true;
489 }
490
491 /// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
492 void
493 Rock::SwapDir::dumpRateOption(StoreEntry * e) const
494 {
495 if (fileConfig.ioRate >= 0)
496 storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
497 }
498
499 /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
500 bool
501 Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
502 {
503 uint64_t *storedSize;
504 if (strcmp(option, "slot-size") == 0)
505 storedSize = &slotSize;
506 else
507 return false;
508
509 if (!value) {
510 self_destruct();
511 return false;
512 }
513
514 // TODO: handle size units and detect parsing errors better
515 const uint64_t newSize = strtoll(value, NULL, 10);
516 if (newSize <= 0) {
517 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
518 self_destruct();
519 return false;
520 }
521
522 if (newSize <= sizeof(DbCellHeader)) {
523 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
524 self_destruct();
525 return false;
526 }
527
528 if (!reconfig)
529 *storedSize = newSize;
530 else if (*storedSize != newSize) {
531 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
532 << " cannot be changed dynamically, value left unchanged: " <<
533 *storedSize);
534 }
535
536 return true;
537 }
538
539 /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
540 void
541 Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
542 {
543 storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
544 }
545
546 /// check the results of the configuration; only level-0 debugging works here
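/// Illustrative arithmetic for the entry-size rounding below (hypothetical
/// values): with slot-size=16384 and a 100000-byte maximum object size,
/// each entry is assumed to occupy ((100000+16383)/16384)*16384 = 114688
/// bytes (7 slots) when checking whether the configured capacity is usable.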
547 void
548 Rock::SwapDir::validateOptions()
549 {
550 if (slotSize <= 0)
551 fatal("Rock store requires a positive slot-size");
552
553 const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
554 const int64_t slotSizeRoundingWaste = slotSize;
555 const int64_t maxRoundingWaste =
556 max(maxSizeRoundingWaste, slotSizeRoundingWaste);
557
558 // an entry consumes at least one slot; round up to reduce false warnings
559 const int64_t blockSize = static_cast<int64_t>(slotSize);
560 const int64_t maxObjSize = max(blockSize,
561 ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);
562
563 // Does the "sfileno*max-size" limit match configured db capacity?
564 const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
565 if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
566 const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
567 debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
568 "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
569 "\n\tconfigured db slot size: " << slotSize << " bytes" <<
570 "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
571 "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
572 "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
573 "\n\tdisk space wasted: " << diskWasteSize << " bytes");
574 }
575
576 // Does the "absolute slot count" limit match configured db capacity?
577 const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
578 if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
579 const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(slotsMayOccupy);
580 debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
581 "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
582 "\n\tconfigured db slot size: " << slotSize << " bytes" <<
583 "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
584 "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
585 "\n\tdisk space wasted: " << diskWasteSize << " bytes");
586 }
587 }
588
589 void
590 Rock::SwapDir::rebuild()
591 {
592 //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
593 AsyncJob::Start(new Rebuild(this));
594 }
595
596 bool
597 Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
598 {
599 if (diskSpaceNeeded >= 0)
600 diskSpaceNeeded += sizeof(DbCellHeader);
601 if (!::SwapDir::canStore(e, diskSpaceNeeded, load))
602 return false;
603
604 if (!theFile || !theFile->canWrite())
605 return false;
606
607 if (!map)
608 return false;
609
610 // Do not start an I/O transaction if less than 10% of the shared pages are free.
611 // TODO: reserve page instead
612 if (needsDiskStrand() &&
613 Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
614 debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
615 return false;
616 }
617
618 if (io->shedLoad())
619 return false;
620
621 load = io->load();
622 return true;
623 }
624
625 StoreIOState::Pointer
626 Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
627 {
628 if (!theFile || theFile->error()) {
629 debugs(47,4, HERE << theFile);
630 return NULL;
631 }
632
633 sfileno filen;
634 Ipc::StoreMapAnchor *const slot =
635 map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
636 if (!slot) {
637 debugs(47, 5, HERE << "map->add failed");
638 return NULL;
639 }
640
641 assert(filen >= 0);
642 slot->set(e);
643
644 // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
645 // If that does not happen, the entry will not decrement the read level!
646
647 Rock::SwapDir::Pointer self(this);
648 IoState *sio = new IoState(self, &e, cbFile, cbIo, data);
649
650 sio->swap_dirn = index;
651 sio->swap_filen = filen;
652 sio->writeableAnchor_ = slot;
653
654 debugs(47,5, HERE << "dir " << index << " created new filen " <<
655 std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
656 sio->swap_filen << std::dec << " starting at " <<
657 diskOffset(sio->swap_filen));
658
659 sio->file(theFile);
660
661 trackReferences(e);
662 return sio;
663 }
664
665 StoreIOState::Pointer
666 Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
667 {
668 if (!theFile || theFile->error()) {
669 debugs(47,4, theFile);
670 return nullptr;
671 }
672
673 Must(update.fresh);
674 Must(update.fresh.fileNo >= 0);
675
676 Rock::SwapDir::Pointer self(this);
677 IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data);
678
679 sio->swap_dirn = index;
680 sio->swap_filen = update.fresh.fileNo;
681 sio->writeableAnchor_ = update.fresh.anchor;
682
683 debugs(47,5, "dir " << index << " updating filen " <<
684 std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
685 sio->swap_filen << std::dec << " starting at " <<
686 diskOffset(sio->swap_filen));
687
688 sio->file(theFile);
689 return sio;
690 }
691
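/// Maps a slot ID to its byte offset in the db file. For example, with the
/// default 16 KB header and 16 KB slot size, slot 3 starts at
/// 16384 + 16384*3 = 65536 bytes.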
692 int64_t
693 Rock::SwapDir::diskOffset(const SlotId sid) const
694 {
695 assert(sid >= 0);
696 return HeaderSize + slotSize*sid;
697 }
698
699 int64_t
700 Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
701 {
702 assert(pageId);
703 return diskOffset(pageId.number - 1);
704 }
705
706 int64_t
707 Rock::SwapDir::diskOffsetLimit() const
708 {
709 assert(map);
710 return diskOffset(map->sliceLimit());
711 }
712
713 Rock::SlotId
714 Rock::SwapDir::reserveSlotForWriting()
715 {
716 Ipc::Mem::PageId pageId;
717
718 if (freeSlots->pop(pageId)) {
719 const auto slotId = pageId.number - 1;
720 debugs(47, 5, "got a previously free slot: " << slotId);
721 map->prepFreeSlice(slotId);
722 return slotId;
723 }
724
725 // catch free slots delivered to noteFreeMapSlice()
726 assert(!waitingForPage);
727 waitingForPage = &pageId;
728 if (map->purgeOne()) {
729 assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
730 assert(pageId.set());
731 const auto slotId = pageId.number - 1;
732 debugs(47, 5, "got a previously busy slot: " << slotId);
733 map->prepFreeSlice(slotId);
734 return slotId;
735 }
736 assert(waitingForPage == &pageId);
737 waitingForPage = NULL;
738
739 // This may happen when the number of available db slots is close to the
740 // number of concurrent requests reading or writing those slots, which may
741 // happen when the db is "small" compared to the request traffic OR when we
742 // are rebuilding and have not loaded "many" entries or empty slots yet.
743 debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
744 throw TexcHere("ran out of free db slots");
745 }
746
747 bool
748 Rock::SwapDir::validSlotId(const SlotId slotId) const
749 {
750 return 0 <= slotId && slotId < slotLimitActual();
751 }
752
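/// Makes the given slice available for future writes, either by satisfying
/// a pending reserveSlotForWriting() request or by pushing it onto the
/// free-slot stack. Note that shared memory page numbers are 1-based while
/// slice/slot IDs are 0-based, hence the +1/-1 conversions here and in
/// reserveSlotForWriting().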
753 void
754 Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
755 {
756 Ipc::Mem::PageId pageId;
757 pageId.pool = index+1;
758 pageId.number = sliceId+1;
759 if (waitingForPage) {
760 *waitingForPage = pageId;
761 waitingForPage = NULL;
762 } else {
763 freeSlots->push(pageId);
764 }
765 }
766
767 // tries to open an old entry with swap_filen for reading
768 StoreIOState::Pointer
769 Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
770 {
771 if (!theFile || theFile->error()) {
772 debugs(47,4, HERE << theFile);
773 return NULL;
774 }
775
776 if (!e.hasDisk()) {
777 debugs(47,4, HERE << e);
778 return NULL;
779 }
780
781 // Do not start an I/O transaction if less than 10% of the shared pages are free.
782 // TODO: reserve page instead
783 if (needsDiskStrand() &&
784 Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
785 debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
786 return NULL;
787 }
788
789 // There are two ways an entry can get swap_filen: our get() locked it for
790 // reading, or our storeSwapOutStart() locked it for writing. Peeking at our
791 // locked entry is safe, but reading the entry we are swapping out is not supported.
792 const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
793 if (!slot)
794 return NULL; // we were writing after all
795
796 Rock::SwapDir::Pointer self(this);
797 IoState *sio = new IoState(self, &e, cbFile, cbIo, data);
798
799 sio->swap_dirn = index;
800 sio->swap_filen = e.swap_filen;
801 sio->readableAnchor_ = slot;
802 sio->file(theFile);
803
804 debugs(47,5, HERE << "dir " << index << " has old filen: " <<
805 std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
806 sio->swap_filen);
807
808 assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
809 // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
810 // may still be zero and basics.swap_file_sz may grow.
811 assert(slot->basics.swap_file_sz >= e.swap_file_sz);
812
813 return sio;
814 }
815
816 void
817 Rock::SwapDir::ioCompletedNotification()
818 {
819 if (!theFile)
820 fatalf("Rock cache_dir failed to initialize db file: %s", filePath);
821
822 if (theFile->error()) {
823 int xerrno = errno; // XXX: where does errno come from
824 fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
825 xstrerr(xerrno));
826 }
827
828 debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
829 std::setw(12) << maxSize() << " disk bytes, " <<
830 std::setw(7) << map->entryLimit() << " entries, and " <<
831 std::setw(7) << map->sliceLimit() << " slots");
832
833 rebuild();
834 }
835
836 void
837 Rock::SwapDir::closeCompleted()
838 {
839 theFile = NULL;
840 }
841
842 void
843 Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r)
844 {
845 ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
846 assert(request);
847 IoState::Pointer sio = request->sio;
848
849 if (errflag == DISK_OK && rlen > 0)
850 sio->offset_ += rlen;
851
852 sio->callReaderBack(r->buf, rlen);
853 }
854
855 void
856 Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
857 {
858 Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
859 assert(request);
860 assert(request->sio != NULL);
861 IoState &sio = *request->sio;
862
863 // quit if somebody called IoState::close() while we were waiting
864 if (!sio.stillWaiting()) {
865 debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
866 noteFreeMapSlice(request->sidNext);
867 return;
868 }
869
870 debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);
871
872 if (errflag != DISK_OK)
873 handleWriteCompletionProblem(errflag, *request);
874 else if (droppedEarlierRequest(*request))
875 handleWriteCompletionProblem(DISK_ERROR, *request);
876 else
877 handleWriteCompletionSuccess(*request);
878
879 if (sio.touchingStoreEntry())
880 CollapsedForwarding::Broadcast(*sio.e);
881 }
882
883 /// code shared by writeCompleted() success handling cases
884 void
885 Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request)
886 {
887 auto &sio = *(request.sio);
888 sio.splicingPoint = request.sidCurrent;
889 // do not increment sio.offset_ because we do it in sio->write()
890
891 // finalize the shared slice info after writing slice contents to disk
892 Ipc::StoreMap::Slice &slice =
893 map->writeableSlice(sio.swap_filen, request.sidCurrent);
894 slice.size = request.len - sizeof(DbCellHeader);
895 slice.next = request.sidNext;
896
897 if (request.eof) {
898 assert(sio.e);
899 assert(sio.writeableAnchor_);
900 if (sio.touchingStoreEntry()) {
901 sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
902 sio.offset_;
903
904 map->switchWritingToReading(sio.swap_filen);
905 // sio.e keeps the (now read) lock on the anchor
906 }
907 sio.writeableAnchor_ = NULL;
908 sio.finishedWriting(DISK_OK);
909 }
910 }
911
912 /// code shared by writeCompleted() error handling cases
913 void
914 Rock::SwapDir::handleWriteCompletionProblem(const int errflag, const WriteRequest &request)
915 {
916 auto &sio = *request.sio;
917
918 noteFreeMapSlice(request.sidNext);
919
920 writeError(sio);
921 sio.finishedWriting(errflag);
922 // and hope that Core will call disconnect() to close the map entry
923 }
924
925 void
926 Rock::SwapDir::writeError(StoreIOState &sio)
927 {
928 // Do not abortWriting here. The entry should keep the write lock
929 // instead of losing association with the store and confusing core.
930 map->freeEntry(sio.swap_filen); // will mark as unusable, just in case
931
932 if (sio.touchingStoreEntry())
933 Store::Root().stopSharing(*sio.e);
934 // else noop: a fresh entry update error does not affect stale entry readers
935
936 // All callers must also call IoState callback, to propagate the error.
937 }
938
939 /// whether the disk has dropped at least one of the previous write requests
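/// Illustration (hypothetical slice numbers): if the last slice this worker
/// successfully wrote (the splicingPoint) names slice 5 as its .next, but the
/// just-completed request actually wrote slice 7, then the write of slice 5
/// (and possibly 6) never reached the disk and the swapout must fail.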
940 bool
941 Rock::SwapDir::droppedEarlierRequest(const WriteRequest &request) const
942 {
943 const auto &sio = *request.sio;
944 assert(sio.writeableAnchor_);
945 const Ipc::StoreMapSliceId expectedSliceId = sio.splicingPoint < 0 ?
946 sio.writeableAnchor_->start :
947 map->writeableSlice(sio.swap_filen, sio.splicingPoint).next;
948 if (expectedSliceId != request.sidCurrent) {
949 debugs(79, 3, "yes; expected " << expectedSliceId << ", but got " << request.sidCurrent);
950 return true;
951 }
952
953 return false;
954 }
955
956 void
957 Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
958 {
959 if (!map)
960 return;
961
962 Ipc::StoreMapUpdate update(updatedE);
963 if (!map->openForUpdating(update, updatedE->swap_filen))
964 return;
965
966 try {
967 AsyncJob::Start(new HeaderUpdater(this, update));
968 } catch (const std::exception &ex) {
969 debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
970 map->abortUpdating(update);
971 }
972 }
973
974 bool
975 Rock::SwapDir::full() const
976 {
977 return freeSlots != NULL && !freeSlots->size();
978 }
979
980 // storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
981 // but that should not happen for us
982 void
983 Rock::SwapDir::diskFull()
984 {
985 debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
986 filePath);
987 }
988
989 /// purge while full(); it should be sufficient to purge just one
990 void
991 Rock::SwapDir::maintain()
992 {
993 // The Store calls this to free some db space, but there is nothing wrong
994 // with a full() db, except when db has to shrink after reconfigure, and
995 // we do not support shrinking yet (it would have to purge specific slots).
996 // TODO: Disable maintain() requests when they are pointless.
997 }
998
999 void
1000 Rock::SwapDir::reference(StoreEntry &e)
1001 {
1002 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
1003 if (repl && repl->Referenced)
1004 repl->Referenced(repl, &e, &e.repl);
1005 }
1006
1007 bool
1008 Rock::SwapDir::dereference(StoreEntry &e)
1009 {
1010 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
1011 if (repl && repl->Dereferenced)
1012 repl->Dereferenced(repl, &e, &e.repl);
1013
1014 // no need to keep e in the global store_table for us; we have our own map
1015 return false;
1016 }
1017
1018 bool
1019 Rock::SwapDir::unlinkdUseful() const
1020 {
1021 // no entry-specific files to unlink
1022 return false;
1023 }
1024
1025 void
1026 Rock::SwapDir::evictIfFound(const cache_key *key)
1027 {
1028 if (map)
1029 map->freeEntryByKey(key); // may not be there
1030 }
1031
1032 void
1033 Rock::SwapDir::evictCached(StoreEntry &e)
1034 {
1035 debugs(47, 5, e);
1036 if (e.hasDisk(index)) {
1037 if (map->freeEntry(e.swap_filen))
1038 CollapsedForwarding::Broadcast(e);
1039 if (!e.locked())
1040 disconnect(e);
1041 } else if (const auto key = e.publicKey()) {
1042 evictIfFound(key);
1043 }
1044 }
1045
1046 void
1047 Rock::SwapDir::trackReferences(StoreEntry &e)
1048 {
1049 debugs(47, 5, HERE << e);
1050 if (repl)
1051 repl->Add(repl, &e, &e.repl);
1052 }
1053
1054 void
1055 Rock::SwapDir::ignoreReferences(StoreEntry &e)
1056 {
1057 debugs(47, 5, HERE << e);
1058 if (repl)
1059 repl->Remove(repl, &e, &e.repl);
1060 }
1061
1062 void
1063 Rock::SwapDir::statfs(StoreEntry &e) const
1064 {
1065 storeAppendPrintf(&e, "\n");
1066 storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
1067 storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
1068 currentSize() / 1024.0,
1069 Math::doublePercent(currentSize(), maxSize()));
1070
1071 const int entryLimit = entryLimitActual();
1072 const int slotLimit = slotLimitActual();
1073 storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
1074 if (map && entryLimit > 0) {
1075 const int entryCount = map->entryCount();
1076 storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
1077 entryCount, (100.0 * entryCount / entryLimit));
1078 }
1079
1080 storeAppendPrintf(&e, "Maximum slots: %9d\n", slotLimit);
1081 if (map && slotLimit > 0) {
1082 const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
1083 if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
1084 const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
1085 storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
1086 usedSlots, (100.0 * usedSlots / slotLimit));
1087 }
1088 if (slotLimit < 100) { // XXX: otherwise too expensive to count
1089 Ipc::ReadWriteLockStats stats;
1090 map->updateStats(stats);
1091 stats.dump(e);
1092 }
1093 }
1094
1095 storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
1096 store_open_disk_fd, Config.max_open_disk_fds);
1097
1098 storeAppendPrintf(&e, "Flags:");
1099
1100 if (flags.selected)
1101 storeAppendPrintf(&e, " SELECTED");
1102
1103 if (flags.read_only)
1104 storeAppendPrintf(&e, " READ-ONLY");
1105
1106 storeAppendPrintf(&e, "\n");
1107
1108 }
1109
1110 SBuf
1111 Rock::SwapDir::inodeMapPath() const
1112 {
1113 return Ipc::Mem::Segment::Name(SBuf(path), "map");
1114 }
1115
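/// Returns the id used for this dir's free-slot PageStack shared memory
/// segment; for a hypothetical cache_dir path of /var/cache/squid/rock this
/// is "/var/cache/squid/rock_spaces".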
1116 const char *
1117 Rock::SwapDir::freeSlotsPath() const
1118 {
1119 static String spacesPath;
1120 spacesPath = path;
1121 spacesPath.append("_spaces");
1122 return spacesPath.termedBuf();
1123 }
1124
1125 bool
1126 Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const
1127 {
1128 return map->hasReadableEntry(reinterpret_cast<const cache_key*>(e.key));
1129 }
1130
1131 namespace Rock
1132 {
1133 RunnerRegistrationEntry(SwapDirRr);
1134 }
1135
1136 void Rock::SwapDirRr::create()
1137 {
1138 Must(mapOwners.empty() && freeSlotsOwners.empty());
1139 for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
1140 if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
1141 const int64_t capacity = sd->slotLimitActual();
1142
1143 SwapDir::DirMap::Owner *const mapOwner =
1144 SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
1145 mapOwners.push_back(mapOwner);
1146
1147 // TODO: somehow remove pool id and counters from PageStack?
1148 Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
1149 shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
1150 i+1, capacity, 0);
1151 freeSlotsOwners.push_back(freeSlotsOwner);
1152
1153 // TODO: add method to initialize PageStack with no free pages
1154 while (true) {
1155 Ipc::Mem::PageId pageId;
1156 if (!freeSlotsOwner->object()->pop(pageId))
1157 break;
1158 }
1159 }
1160 }
1161 }
1162
1163 Rock::SwapDirRr::~SwapDirRr()
1164 {
1165 for (size_t i = 0; i < mapOwners.size(); ++i) {
1166 delete mapOwners[i];
1167 delete freeSlotsOwners[i];
1168 }
1169 }
1170