/*
 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47    Store Directory Routines */

#include "squid.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockHeaderUpdater.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "SquidConfig.h"
#include "SquidMath.h"

const int64_t Rock::SwapDir::HeaderSize = 16*1024;
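
// Implied on-disk layout (a sketch derived from HeaderSize and the
// diskOffset() math further below, not an authoritative format spec):
// the first 16 KiB of the db file are reserved for the header, and
// slot N starts at byte offset HeaderSize + slotSize*N.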

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
    waitingForPage(NULL)

Rock::SwapDir::~SwapDir()

// called when Squid core needs a StoreEntry with a given key
Rock::SwapDir::get(const cache_key *key)
    if (!map || !theFile || !theFile->canRead())

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync)
    if (!map || !theFile || !theFile->canRead())

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
        reinterpret_cast<cache_key*>(entry.key), filen);

    anchorEntry(entry, filen, *slot);
    inSync = updateAnchoredWith(entry, *slot);
    return true; // even if inSync is false

Rock::SwapDir::updateAnchored(StoreEntry &entry)
    if (!map || !theFile || !theFile->canRead())

    assert(entry.hasDisk(index));

    const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen);
    return updateAnchoredWith(entry, s);

Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor)
    entry.swap_file_sz = anchor.basics.swap_file_sz;

Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
    anchor.exportInto(e);

    const bool complete = anchor.complete();
    e.store_status = complete ? STORE_OK : STORE_PENDING;
    // SWAPOUT_WRITING: even though another worker writes?
    e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);

    e.ping_status = PING_NONE;

    EBIT_SET(e.flags, ENTRY_VALIDATED);

void Rock::SwapDir::disconnect(StoreEntry &e)
    assert(e.hasDisk(index));

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
        Store::Root().stopSharing(e); // broadcasts after the change
    } else {
        map->closeForReading(e.swap_filen);

Rock::SwapDir::currentSize() const
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;

Rock::SwapDir::currentCount() const
    return map ? map->entryCount() : 0;

/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
Rock::SwapDir::doReportStat() const
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());

Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)

Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry)
    debugs(47, 5, entry);
    disconnect(entry); // calls abortWriting() to free the disk entry

Rock::SwapDir::slotLimitAbsolute() const
    // the max value is an invalid one; all values must be below the limit
    assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
           std::numeric_limits<SlotId>::max());
    return std::numeric_limits<SlotId>::max();

Rock::SwapDir::slotLimitActual() const
    const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
    const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
    const int64_t sLimitHi = slotLimitAbsolute();
    return min(max(sLimitLo, sWanted), sLimitHi);
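
// Worked example (illustrative numbers, not taken from the source): a
// 100 MB cache_dir with the default 16 KiB slots wants
// (100*2^20 - 16384)/16384 = 6399 slots, which is then clamped between
// the map's current slice limit and slotLimitAbsolute().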

Rock::SwapDir::entryLimitActual() const
    return min(slotLimitActual(), entryLimitAbsolute());

// TODO: encapsulate as a tool
Rock::SwapDir::create()
    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47, 3, HERE << "disker will create in " << path);

    debugs(47, 3, HERE << "creating in " << path);

    if (::stat(path, &dir_sb) == 0) {
        if (::stat(filePath, &file_sb) == 0) {
            debugs(47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);

        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");

    debugs(47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

// report Rock DB creation error and exit
Rock::SwapDir::createError(const char *const msg)
    int xerrno = errno; // XXX: where does errno come from?
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerr(xerrno));
    fatal("Rock Store db creation error");

Rock::SwapDir::init()
    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    map = new DirMap(inodeMapPath());

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47, 2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

Rock::SwapDir::needsDiskStrand() const
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
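
// In other words (a summary of the logic above, not new behavior): with
// SMP workers configured, rock cannot work without a dedicated disker
// process; even a single-worker daemon prefers the IpcIo module when it
// is available, falling back to Blocking I/O otherwise (see init()).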

Rock::SwapDir::parse(int anIndex, char *aPath)
    path = xstrdup(aPath);

    // cache store is located at path/db
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);
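
// For illustration only (a hypothetical squid.conf line; the values are
// made up, but the option names match the parsers below):
//
//   cache_dir rock /var/cache/squid 1024 slot-size=16384 \
//       swap-timeout=300 max-swap-rate=200
//
// Here 1024 is the db size in MB (see parseSize() below), and the three
// options are handled by parseSizeOption(), parseTimeOption(), and
// parseRateOption() respectively.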

Rock::SwapDir::reconfigure()
    // TODO: can we reconfigure the replacement policy (repl)?

/// parse maximum db disk size
Rock::SwapDir::parseSize(const bool reconfig)
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes

    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");

Rock::SwapDir::getOptionTree() const
    ConfigOption *copt = ::SwapDir::getOptionTree();
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(copt);

    // if copt is actually a ConfigOptionVector
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));

    // we don't know how to handle copt, as it's not a ConfigOptionVector.
    // free it (and return nullptr)

Rock::SwapDir::allowOptionReconfigure(const char *const option) const
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;

    // TODO: handle size units and detect parsing errors better
    const uint64_t newSize = strtoll(value, NULL, 10);
    if (newSize <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);

    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
    storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);

/// check the results of the configuration; only level-0 debugging works here
Rock::SwapDir::validateOptions()
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);

    // an entry consumes at least one slot; round up to reduce false warnings
    const int64_t blockSize = static_cast<int64_t>(slotSize);
    const int64_t maxObjSize = max(blockSize,
                                   ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);

    // Does the "sfileno*max-size" limit match configured db capacity?
    const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
    if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
               "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
               "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");

    // Does the "absolute slot count" limit match configured db capacity?
    const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
    if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(slotsMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
               "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");

Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
    if (diskSpaceNeeded >= 0)
        diskSpaceNeeded += sizeof(DbCellHeader);
    if (!::SwapDir::canStore(e, diskSpaceNeeded, load))

    if (!theFile || !theFile->canWrite())

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");

StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
    if (!theFile || theFile->error()) {
        debugs(47, 4, HERE << theFile);

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47, 5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

StoreIOState::Pointer
Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
    if (!theFile || theFile->error()) {
        debugs(47, 4, theFile);

    Must(update.fresh.fileNo >= 0);

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = update.fresh.fileNo;
    sio->writeableAnchor_ = update.fresh.anchor;

    debugs(47, 5, "dir " << index << " updating filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

Rock::SwapDir::diskOffset(const SlotId sid) const
    return HeaderSize + slotSize*sid;

Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
    return diskOffset(pageId.number - 1);
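
// Note on the mapping (derived from this file, not a separate spec):
// free-slot bookkeeping reuses Ipc::Mem::PageId, whose page numbers
// start at 1 (0 means "unset"), while slot/slice ids start at 0. Hence
// the -1 above and the matching sliceId+1 in noteFreeMapSlice() below.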

Rock::SwapDir::diskOffsetLimit() const
    return diskOffset(map->sliceLimit());

Rock::SwapDir::reserveSlotForWriting()
    Ipc::Mem::PageId pageId;

    if (freeSlots->pop(pageId)) {
        const auto slotId = pageId.number - 1;
        debugs(47, 5, "got a previously free slot: " << slotId);
        map->prepFreeSlice(slotId);
        return slotId;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        const auto slotId = pageId.number - 1;
        debugs(47, 5, "got a previously busy slot: " << slotId);
        map->prepFreeSlice(slotId);
        return slotId;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    throw TexcHere("ran out of free db slots");

Rock::SwapDir::validSlotId(const SlotId slotId) const
    return 0 <= slotId && slotId < slotLimitActual();

Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
    Ipc::Mem::PageId pageId;
    pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(index);
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
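
// The waitingForPage handoff above closes the loop with
// reserveSlotForWriting(): when map->purgeOne() frees a slice
// synchronously, the freed slot is delivered directly to the waiting
// caller instead of going through the shared free-slot stack.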

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
    if (!theFile || theFile->error()) {
        debugs(47, 4, HERE << theFile);

    debugs(47, 4, HERE << e);

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but no support for reading the entry we swap out.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;

    debugs(47, 5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key *>(e.key)));
    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
    // may still be zero and basics.swap_file_sz may grow.
    assert(slot->basics.swap_file_sz >= e.swap_file_sz);

Rock::SwapDir::ioCompletedNotification()
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error()) {
        int xerrno = errno; // XXX: where does errno come from
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerr(xerrno));
    }

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes, " <<
           std::setw(7) << map->entryLimit() << " entries, and " <<
           std::setw(7) << map->sliceLimit() << " slots");

    if (!Rebuild::Start(*this))
        storeRebuildComplete(nullptr);

Rock::SwapDir::closeCompleted()

Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r)
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());

    IoState::Pointer sio = request->sio;
    sio->handleReadCompletion(*request, rlen, errflag);

Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
    // TODO: Move details into IoState::handleWriteCompletion() after figuring
    // out how to deal with map access. See readCompleted().

    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());

    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        noteFreeMapSlice(request->sidCurrent);

    debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);

    if (errflag != DISK_OK)
        handleWriteCompletionProblem(errflag, *request);
    else if (!sio.expectedReply(request->id))
        handleWriteCompletionProblem(DISK_ERROR, *request);
    else
        handleWriteCompletionSuccess(*request);

    if (sio.touchingStoreEntry())
        CollapsedForwarding::Broadcast(*sio.e);

/// code shared by writeCompleted() success handling cases
Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request)
    auto &sio = *(request.sio);
    sio.splicingPoint = request.sidCurrent;
    // do not increment sio.offset_ because we do it in sio->write()

    assert(sio.writeableAnchor_);
    if (sio.writeableAnchor_->start < 0) { // wrote the first slot
        Must(request.sidPrevious < 0);
        sio.writeableAnchor_->start = request.sidCurrent;
    } else {
        Must(request.sidPrevious >= 0);
        map->writeableSlice(sio.swap_filen, request.sidPrevious).next = request.sidCurrent;
    }

    // finalize the shared slice info after writing slice contents to disk;
    // the chain gets possession of the slice we were writing
    Ipc::StoreMap::Slice &slice =
        map->writeableSlice(sio.swap_filen, request.sidCurrent);
    slice.size = request.len - sizeof(DbCellHeader);
    Must(slice.next < 0);
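
    // The slice chain built above, sketched (an illustration of this
    // function's logic, not a format spec): anchor.start names the first
    // slot; each earlier slice's .next now points at the next one; the
    // just-written slice holds request.len - sizeof(DbCellHeader) payload
    // bytes, and its negative .next marks the current end of the chain.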

    if (sio.touchingStoreEntry()) {
        sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                  sio.offset_;

        map->switchWritingToReading(sio.swap_filen);
        // sio.e keeps the (now read) lock on the anchor
    }
    sio.writeableAnchor_ = NULL;
    sio.finishedWriting(DISK_OK);

/// code shared by writeCompleted() error handling cases
Rock::SwapDir::handleWriteCompletionProblem(const int errflag, const WriteRequest &request)
    auto &sio = *request.sio;

    noteFreeMapSlice(request.sidCurrent);

    sio.finishedWriting(errflag);
    // and hope that Core will call disconnect() to close the map entry

Rock::SwapDir::writeError(StoreIOState &sio)
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(sio.swap_filen); // will mark as unusable, just in case

    if (sio.touchingStoreEntry())
        Store::Root().stopSharing(*sio.e);
    // else noop: a fresh entry update error does not affect stale entry readers

    // All callers must also call IoState callback, to propagate the error.

Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
    Ipc::StoreMapUpdate update(updatedE);
    if (!map->openForUpdating(update, updatedE->swap_filen))

    try {
        AsyncJob::Start(new HeaderUpdater(this, update));
    } catch (const std::exception &ex) {
        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
        map->abortUpdating(update);

Rock::SwapDir::full() const
    return freeSlots != NULL && !freeSlots->size();

// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us
Rock::SwapDir::diskFull()
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);

/// purge while full(); it should be sufficient to purge just one
Rock::SwapDir::maintain()
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.

Rock::SwapDir::reference(StoreEntry &e)
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);

Rock::SwapDir::dereference(StoreEntry &e)
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map

Rock::SwapDir::unlinkdUseful() const
    // no entry-specific files to unlink

Rock::SwapDir::evictIfFound(const cache_key *key)
    if (map)
        map->freeEntryByKey(key); // may not be there

Rock::SwapDir::evictCached(StoreEntry &e)
    if (e.hasDisk(index)) {
        if (map->freeEntry(e.swap_filen))
            CollapsedForwarding::Broadcast(e);

    } else if (const auto key = e.publicKey()) {

Rock::SwapDir::trackReferences(StoreEntry &e)
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);

Rock::SwapDir::ignoreReferences(StoreEntry &e)
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);

Rock::SwapDir::statfs(StoreEntry &e) const
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    const int entryLimit = entryLimitActual();
    const int slotLimit = slotLimitActual();
    storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
    if (map && entryLimit > 0) {
        const int entryCount = map->entryCount();
        storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                          entryCount, (100.0 * entryCount / entryLimit));

    storeAppendPrintf(&e, "Maximum slots: %9d\n", slotLimit);
    if (map && slotLimit > 0) {
        const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
        if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
            const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
            storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                              usedSlots, (100.0 * usedSlots / slotLimit));

        if (slotLimit < 100) { // XXX: otherwise too expensive to count
            Ipc::ReadWriteLockStats stats;
            map->updateStats(stats);

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

Rock::SwapDir::inodeMapPath() const
    return Ipc::Mem::Segment::Name(SBuf(path), "map");

Rock::SwapDir::freeSlotsPath() const
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();

Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const
    return map->hasReadableEntry(reinterpret_cast<const cache_key*>(e.key));

RunnerRegistrationEntry(SwapDirRr);

void Rock::SwapDirRr::create()
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            rebuildStatsOwners.push_back(Rebuild::Stats::Init(*sd));

            const int64_t capacity = sd->slotLimitActual();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::PageStack::Config config;
            config.poolId = Ipc::Mem::PageStack::IdForSwapDirSpace(i);
            config.pageSize = 0; // this is an index of slots on _disk_
            config.capacity = capacity;
            config.createFull = false; // Rebuild finds and pushes free slots
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(), config);
            freeSlotsOwners.push_back(freeSlotsOwner);
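
// In SMP mode, this registered runner creates the per-cache_dir shared
// segments (rebuild stats, inode map, and the free-slot PageStack)
// before worker and disker processes start using them; this is a
// reading of the code above, not a description of additional behavior.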

Rock::SwapDirRr::~SwapDirRr()
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete rebuildStatsOwners[i];
        delete mapOwners[i];
        delete freeSlotsOwners[i];