/*
 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47    Store Directory Routines */

#include "squid.h"
#include "cache_cf.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockHeaderUpdater.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>
#include <limits>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

const int64_t Rock::SwapDir::HeaderSize = 16*1024;

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
    waitingForPage(NULL)
{
}
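
// Note: a new SwapDir starts with slotSize equal to HeaderSize (16KB);
// the slot-size cache_dir option (see parseSizeOption) may override it.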

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->createMemObject();
    anchorEntry(*e, filen, *slot);
    trackReferences(*e);
    return e;
}

bool
Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
            reinterpret_cast<cache_key*>(entry.key), filen);
    if (!slot)
        return false;

    anchorEntry(entry, filen, *slot);
    inSync = updateAnchoredWith(entry, *slot);
    return true; // even if inSync is false
}

bool
Rock::SwapDir::updateAnchored(StoreEntry &entry)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    assert(entry.hasDisk(index));

    const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen);
    return updateAnchoredWith(entry, s);
}

bool
Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor)
{
    entry.swap_file_sz = anchor.basics.swap_file_sz;
    return true;
}

void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    anchor.exportInto(e);

    const bool complete = anchor.complete();
    e.store_status = complete ? STORE_OK : STORE_PENDING;
    // SWAPOUT_WRITING: even though another worker writes?
    e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);

    e.ping_status = PING_NONE;

    EBIT_SET(e.flags, ENTRY_VALIDATED);
}

void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.hasDisk(index));

    ignoreReferences(e);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        e.detachFromDisk();
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
        Store::Root().stopSharing(e); // broadcasts after the change
    } else {
        map->closeForReading(e.swap_filen);
        e.detachFromDisk();
    }
}

uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}
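
// For example (illustrative numbers): a 1 MB db with 16 KB slots and 4 free
// slots yields currentSize() = 1048576 - 4*16384 = 983040 bytes.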

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode, only the disker process reports stats so that the same
/// stats are not counted by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
{
    // nothing to do
}

void
Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry)
{
    debugs(47, 5, entry);
    disconnect(entry); // calls abortWriting() to free the disk entry
}

int64_t
Rock::SwapDir::slotLimitAbsolute() const
{
    // the max value is an invalid one; all values must be below the limit
    assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
           std::numeric_limits<SlotId>::max());
    return std::numeric_limits<SlotId>::max();
}

int64_t
Rock::SwapDir::slotLimitActual() const
{
    const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
    const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
    const int64_t sLimitHi = slotLimitAbsolute();
    return min(max(sLimitLo, sWanted), sLimitHi);
}
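
// For example (illustrative numbers): a 100 MB cache_dir with the default
// 16 KB slots wants (104857600 - 16384)/16384 = 6399 slots, well below
// slotLimitAbsolute().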

int64_t
Rock::SwapDir::entryLimitActual() const
{
    return min(slotLimitActual(), entryLimitAbsolute());
}

// TODO: encapsulate as a tool
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs(47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs(47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs(47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}

// report Rock DB creation error and exit
void
Rock::SwapDir::createError(const char *const msg)
{
    int xerrno = errno; // XXX: where does errno come from?
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerr(xerrno));
    fatal("Rock Store db creation error");
}

void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);
}

bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}

void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
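
// For example, "cache_dir rock /var/cache/squid 100" yields
// max_size = 100 << 20 = 104857600 bytes.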

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOption *copt = ::SwapDir::getOptionTree();
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(copt);
    if (vector) {
        // if copt is actually a ConfigOptionVector
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    } else {
        // we don't know how to handle copt, as it's not a ConfigOptionVector.
        // free it (and return nullptr)
        delete copt;
        copt = nullptr;
    }
    return copt;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value) {
        self_destruct();
        return false;
    }

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
        return false;
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value) {
        self_destruct();
        return false;
    }

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
        return false;
    }

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
        return false;
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
{
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;
    else
        return false;

    if (!value) {
        self_destruct();
        return false;
    }

    // TODO: handle size units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << parsedValue);
        self_destruct();
        return false;
    }

    const uint64_t newSize = static_cast<uint64_t>(parsedValue);

    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
        self_destruct();
        return false;
    }

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);
    }

    return true;
}
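
// An illustrative cache_dir line exercising the options parsed above
// (values are hypothetical):
//   cache_dir rock /var/cache/squid 1000 slot-size=32768 swap-timeout=300 max-swap-rate=200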

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
{
    storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
}

/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);

    // an entry consumes at least one slot; round up to reduce false warnings
    const int64_t blockSize = static_cast<int64_t>(slotSize);
    const int64_t maxObjSize = max(blockSize,
                                   ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);

    // Does the "sfileno*max-size" limit match configured db capacity?
    const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
    if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
               "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
               "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
    }

    // Does the "absolute slot count" limit match configured db capacity?
    const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
    if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(slotsMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
               "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
    }
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    if (diskSpaceNeeded >= 0)
        diskSpaceNeeded += sizeof(DbCellHeader);
    if (!::SwapDir::canStore(e, diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}

StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

StoreIOState::Pointer
Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, theFile);
        return nullptr;
    }

    Must(update.fresh);
    Must(update.fresh.fileNo >= 0);

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = update.fresh.fileNo;
    sio->writeableAnchor_ = update.fresh.anchor;

    debugs(47,5, "dir " << index << " updating filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);
    return sio;
}

int64_t
Rock::SwapDir::diskOffset(const SlotId sid) const
{
    assert(sid >= 0);
    return HeaderSize + slotSize*sid;
}
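
// For example, with the default 16 KB slots, slot 2 starts at
// 16384 + 16384*2 = 49152 bytes into the db file.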

int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}
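
// Page numbers are 1-based while slot IDs are 0-based, hence the -1 above;
// noteFreeMapSlice() applies the opposite +1 conversion.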

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->sliceLimit());
}

Rock::SlotId
Rock::SwapDir::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;

    if (freeSlots->pop(pageId)) {
        const auto slotId = pageId.number - 1;
        debugs(47, 5, "got a previously free slot: " << slotId);
        map->prepFreeSlice(slotId);
        return slotId;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        const auto slotId = pageId.number - 1;
        debugs(47, 5, "got a previously busy slot: " << slotId);
        map->prepFreeSlice(slotId);
        return slotId;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    throw TexcHere("ran out of free db slots");
}

bool
Rock::SwapDir::validSlotId(const SlotId slotId) const
{
    return 0 <= slotId && slotId < slotLimitActual();
}

void
Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(index);
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}
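
// Note the handshake between reserveSlotForWriting() and noteFreeMapSlice():
// the reserver points waitingForPage at its local pageId before calling
// map->purgeOne(); purging frees a slice, and noteFreeMapSlice() then hands
// that slot directly to the waiting reserver instead of pushing it onto the
// shared freeSlots stack.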

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    if (!e.hasDisk()) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but reading an entry we are still swapping out is
    // not supported.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
    // may still be zero and basics.swap_file_sz may grow.
    assert(slot->basics.swap_file_sz >= e.swap_file_sz);

    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error()) {
        int xerrno = errno; // XXX: where does errno come from?
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerr(xerrno));
    }

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes, " <<
           std::setw(7) << map->entryLimit() << " entries, and " <<
           std::setw(7) << map->sliceLimit() << " slots");

    if (!Rebuild::Start(*this))
        storeRebuildComplete(nullptr);
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

void
Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;
    sio->handleReadCompletion(*request, rlen, errflag);
}

void
Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
{
    // TODO: Move details into IoState::handleWriteCompletion() after figuring
    // out how to deal with map access. See readCompleted().

    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        noteFreeMapSlice(request->sidCurrent);
        return;
    }

    debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);

    if (errflag != DISK_OK)
        handleWriteCompletionProblem(errflag, *request);
    else if (!sio.expectedReply(request->id))
        handleWriteCompletionProblem(DISK_ERROR, *request);
    else
        handleWriteCompletionSuccess(*request);

    if (sio.touchingStoreEntry())
        CollapsedForwarding::Broadcast(*sio.e);
}

/// code shared by writeCompleted() success handling cases
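/// On-disk layout note: each entry's slots form a singly linked chain; the
/// anchor's start field points at the first slot and each slice's next field
/// points at the following one, which is what the code below maintains.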
void
Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request)
{
    auto &sio = *(request.sio);
    sio.splicingPoint = request.sidCurrent;
    // do not increment sio.offset_ because we do it in sio->write()

    assert(sio.writeableAnchor_);
    if (sio.writeableAnchor_->start < 0) { // wrote the first slot
        Must(request.sidPrevious < 0);
        sio.writeableAnchor_->start = request.sidCurrent;
    } else {
        Must(request.sidPrevious >= 0);
        map->writeableSlice(sio.swap_filen, request.sidPrevious).next = request.sidCurrent;
    }

    // finalize the shared slice info after writing slice contents to disk;
    // the chain gets possession of the slice we were writing
    Ipc::StoreMap::Slice &slice =
        map->writeableSlice(sio.swap_filen, request.sidCurrent);
    slice.size = request.len - sizeof(DbCellHeader);
    Must(slice.next < 0);

    if (request.eof) {
        assert(sio.e);
        if (sio.touchingStoreEntry()) {
            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                      sio.offset_;

            map->switchWritingToReading(sio.swap_filen);
            // sio.e keeps the (now read) lock on the anchor
        }
        sio.writeableAnchor_ = NULL;
        sio.finishedWriting(DISK_OK);
    }
}

/// code shared by writeCompleted() error handling cases
void
Rock::SwapDir::handleWriteCompletionProblem(const int errflag, const WriteRequest &request)
{
    auto &sio = *request.sio;

    noteFreeMapSlice(request.sidCurrent);

    writeError(sio);
    sio.finishedWriting(errflag);
    // and hope that Core will call disconnect() to close the map entry
}

void
Rock::SwapDir::writeError(StoreIOState &sio)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(sio.swap_filen); // will mark as unusable, just in case

    if (sio.touchingStoreEntry())
        Store::Root().stopSharing(*sio.e);
    // else noop: a fresh entry update error does not affect stale entry readers

    // All callers must also call IoState callback, to propagate the error.
}

void
Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
{
    if (!map)
        return;

    Ipc::StoreMapUpdate update(updatedE);
    if (!map->openForUpdating(update, updatedE->swap_filen))
        return;

    try {
        AsyncJob::Start(new HeaderUpdater(this, update));
    } catch (const std::exception &ex) {
        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
        map->abortUpdating(update);
    }
}

bool
Rock::SwapDir::full() const
{
    return freeSlots != NULL && !freeSlots->size();
}

// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but that should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::evictIfFound(const cache_key *key)
{
    if (map)
        map->freeEntryByKey(key); // may not be there
}

void
Rock::SwapDir::evictCached(StoreEntry &e)
{
    debugs(47, 5, e);
    if (e.hasDisk(index)) {
        if (map->freeEntry(e.swap_filen))
            CollapsedForwarding::Broadcast(e);
        if (!e.locked())
            disconnect(e);
    } else if (const auto key = e.publicKey()) {
        evictIfFound(key);
    }
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    const int entryLimit = entryLimitActual();
    const int slotLimit = slotLimitActual();
    storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
    if (map && entryLimit > 0) {
        const int entryCount = map->entryCount();
        storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                          entryCount, (100.0 * entryCount / entryLimit));
    }

    storeAppendPrintf(&e, "Maximum slots: %9d\n", slotLimit);
    if (map && slotLimit > 0) {
        const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
        if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
            const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
            storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                              usedSlots, (100.0 * usedSlots / slotLimit));
        }
        if (slotLimit < 100) { // XXX: otherwise too expensive to count
            Ipc::ReadWriteLockStats stats;
            map->updateStats(stats);
            stats.dump(e);
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}

SBuf
Rock::SwapDir::inodeMapPath() const
{
    return Ipc::Mem::Segment::Name(SBuf(path), "map");
}

const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}
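
// Both shared segment names are derived from the cache_dir path: the entry
// map lives in a "map" segment and the free slot stack in a "_spaces" one,
// so multiple rock cache_dirs do not collide.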

bool
Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const
{
    return map->hasReadableEntry(reinterpret_cast<const cache_key*>(e.key));
}

namespace Rock
{
RunnerRegistrationEntry(SwapDirRr);
}

void Rock::SwapDirRr::create()
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            rebuildStatsOwners.push_back(Rebuild::Stats::Init(*sd));

            const int64_t capacity = sd->slotLimitActual();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::PageStack::Config config;
            config.poolId = Ipc::Mem::PageStack::IdForSwapDirSpace(i);
            config.pageSize = 0; // this is an index of slots on _disk_
            config.capacity = capacity;
            config.createFull = false; // Rebuild finds and pushes free slots
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(), config);
            freeSlotsOwners.push_back(freeSlotsOwner);
        }
    }
}
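
// This runner creates the shared memory segments once (shm_new above); each
// SwapDir later attaches to the same segments in init() via shm_old() and
// the DirMap attaching constructor.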

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete rebuildStatsOwners[i];
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}