/*
 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47    Store Directory Routines */
13 #include "CollapsedForwarding.h"
14 #include "ConfigOption.h"
15 #include "DiskIO/DiskIOModule.h"
16 #include "DiskIO/DiskIOStrategy.h"
17 #include "DiskIO/ReadRequest.h"
18 #include "DiskIO/WriteRequest.h"
19 #include "fs/rock/RockHeaderUpdater.h"
20 #include "fs/rock/RockIoRequests.h"
21 #include "fs/rock/RockIoState.h"
22 #include "fs/rock/RockRebuild.h"
23 #include "fs/rock/RockSwapDir.h"
25 #include "ipc/mem/Pages.h"
26 #include "MemObject.h"
28 #include "SquidConfig.h"
29 #include "SquidMath.h"
// NOTE(review): this extraction is line-mangled — statements are split across
// lines and the original line numbers are fused into the text. Comments below
// annotate the visible fragments only; code tokens are left untouched.

// Fixed-size db header that precedes the first slot (16*1024 bytes).
40 const int64_t Rock::SwapDir::HeaderSize
= 16*1024;

// Constructor: a "rock" dir type; slotSize starts at HeaderSize and the
// filePath/map/io pointers start as NULL.
42 Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
43 slotSize(HeaderSize
), filePath(NULL
), map(NULL
), io(NULL
),

// Destructor — body not visible in this extraction.
48 Rock::SwapDir::~SwapDir()

55 // called when Squid core needs a StoreEntry with a given key
// get(): bails when the map or db file is unusable; otherwise opens the
// anchor for reading, creates a fresh StoreEntry and anchors it to the slot.
57 Rock::SwapDir::get(const cache_key
*key
)
59 if (!map
|| !theFile
|| !theFile
->canRead())
63 const Ipc::StoreMapAnchor
*const slot
= map
->openForReading(key
, filen
);
67 // create a brand new store entry and initialize it with stored basics
68 StoreEntry
*e
= new StoreEntry();
70 anchorEntry(*e
, filen
, *slot
);

// anchorToCache(): attaches an existing entry to this dir's map slot and
// reports (via inSync) whether the entry matches the stored anchor.
76 Rock::SwapDir::anchorToCache(StoreEntry
&entry
, bool &inSync
)
78 if (!map
|| !theFile
|| !theFile
->canRead())
82 const Ipc::StoreMapAnchor
*const slot
= map
->openForReading(
83 reinterpret_cast<cache_key
*>(entry
.key
), filen
);
87 anchorEntry(entry
, filen
, *slot
);
88 inSync
= updateAnchoredWith(entry
, *slot
);
89 return true; // even if inSync is false

// updateAnchored(): refreshes an already-anchored entry from its readable
// map anchor; requires that the entry is attached to this dir (index).
93 Rock::SwapDir::updateAnchored(StoreEntry
&entry
)
95 if (!map
|| !theFile
|| !theFile
->canRead())
98 assert(entry
.hasDisk(index
));
100 const Ipc::StoreMapAnchor
&s
= map
->readableEntry(entry
.swap_filen
);
101 return updateAnchoredWith(entry
, s
);

// updateAnchoredWith(): copies the stored swap_file_sz into the entry.
105 Rock::SwapDir::updateAnchoredWith(StoreEntry
&entry
, const Ipc::StoreMapAnchor
&anchor
)
107 entry
.swap_file_sz
= anchor
.basics
.swap_file_sz
;

// anchorEntry(): initializes entry e from a map anchor: exports stored
// basics, sets store/swap/ping status based on anchor completeness, and
// marks the entry validated.
112 Rock::SwapDir::anchorEntry(StoreEntry
&e
, const sfileno filen
, const Ipc::StoreMapAnchor
&anchor
)
114 anchor
.exportInto(e
);
116 const bool complete
= anchor
.complete();
117 e
.store_status
= complete
? STORE_OK
: STORE_PENDING
;
118 // SWAPOUT_WRITING: even though another worker writes?
119 e
.attachToDisk(index
, filen
, complete
? SWAPOUT_DONE
: SWAPOUT_WRITING
);
121 e
.ping_status
= PING_NONE
;
123 EBIT_SET(e
.flags
, ENTRY_VALIDATED
);

// disconnect(): detaches entry e from this dir. If THIS worker holds the
// write lock (IoState::writeableAnchor_ is set), abort the write and stop
// sharing; otherwise just release the read lock.
126 void Rock::SwapDir::disconnect(StoreEntry
&e
)
128 assert(e
.hasDisk(index
));
132 // do not rely on e.swap_status here because there is an async delay
133 // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
135 // since e has swap_filen, its slot is locked for reading and/or writing
136 // but it is difficult to know whether THIS worker is reading or writing e,
137 // especially since we may switch from writing to reading. This code relies
138 // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
139 if (e
.mem_obj
&& e
.mem_obj
->swapout
.sio
!= NULL
&&
140 dynamic_cast<IoState
&>(*e
.mem_obj
->swapout
.sio
).writeableAnchor_
) {
141 map
->abortWriting(e
.swap_filen
);
143 dynamic_cast<IoState
&>(*e
.mem_obj
->swapout
.sio
).writeableAnchor_
= NULL
;
144 Store::Root().stopSharing(e
); // broadcasts after the change
146 map
->closeForReading(e
.swap_filen
);
152 Rock::SwapDir::currentSize() const
154 const uint64_t spaceSize
= !freeSlots
?
155 maxSize() : (slotSize
* freeSlots
->size());
156 // everything that is not free is in use
157 return maxSize() - spaceSize
;
161 Rock::SwapDir::currentCount() const
163 return map
? map
->entryCount() : 0;
166 /// In SMP mode only the disker process reports stats to avoid
167 /// counting the same stats by multiple processes.
169 Rock::SwapDir::doReportStat() const
171 return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
// Swapout-success hook. NOTE(review): the body is not visible in this
// extraction — confirm against the upstream file what (if anything) it does.
175 Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry
&)
181 Rock::SwapDir::finalizeSwapoutFailure(StoreEntry
&entry
)
183 debugs(47, 5, entry
);
184 disconnect(entry
); // calls abortWriting() to free the disk entry
188 Rock::SwapDir::slotLimitAbsolute() const
190 // the max value is an invalid one; all values must be below the limit
191 assert(std::numeric_limits
<Ipc::StoreMapSliceId
>::max() ==
192 std::numeric_limits
<SlotId
>::max());
193 return std::numeric_limits
<SlotId
>::max();
197 Rock::SwapDir::slotLimitActual() const
199 const int64_t sWanted
= (maxSize() - HeaderSize
)/slotSize
;
200 const int64_t sLimitLo
= map
? map
->sliceLimit() : 0; // dynamic shrinking unsupported
201 const int64_t sLimitHi
= slotLimitAbsolute();
202 return min(max(sLimitLo
, sWanted
), sLimitHi
);
206 Rock::SwapDir::entryLimitActual() const
208 return min(slotLimitActual(), entryLimitAbsolute());
211 // TODO: encapsulate as a tool
// create(): builds the on-disk db. In SMP mode only the disker process
// creates the db; the directory is made if missing, and the db file is
// created, (optionally) zero-filled, truncated to maxSize(), and given a
// zeroed header. Fragments below are line-mangled; tokens untouched.
213 Rock::SwapDir::create()
218 if (UsingSmp() && !IamDiskProcess()) {
219 debugs (47,3, HERE
<< "disker will create in " << path
);
223 debugs (47,3, HERE
<< "creating in " << path
);
// Check whether the directory and the db file already exist.
226 if (::stat(path
, &dir_sb
) == 0) {
228 if (::stat(filePath
, &file_sb
) == 0) {
229 debugs (47, DBG_IMPORTANT
, "Skipping existing Rock db: " << filePath
);
232 // else the db file is not there or is not accessible, and we will try
233 // to create it later below, generating a detailed error on failures.
234 } else { // path does not exist or is inaccessible
235 // If path exists but is not accessible, mkdir() below will fail, and
236 // the admin should see the error and act accordingly, so there is
237 // no need to distinguish ENOENT from other possible stat() errors.
238 debugs (47, DBG_IMPORTANT
, "Creating Rock db directory: " << path
);
239 const int res
= mkdir(path
, 0700);
241 createError("mkdir");
244 debugs (47, DBG_IMPORTANT
, "Creating Rock db: " << filePath
);
// O_TRUNC discards any stale content; 0600 keeps the db private.
245 const int swap
= open(filePath
, O_WRONLY
|O_CREAT
|O_TRUNC
|O_BINARY
, 0600);
247 createError("create");
// Optional slow path: write zeros block by block instead of truncating.
249 #if SLOWLY_FILL_WITH_ZEROS
251 Must(maxSize() % sizeof(block
) == 0);
252 memset(block
, '\0', sizeof(block
));
254 for (off_t offset
= 0; offset
< maxSize(); offset
+= sizeof(block
)) {
255 if (write(swap
, block
, sizeof(block
)) != sizeof(block
))
256 createError("write");
// Fast path: extend the file to full size with ftruncate().
259 if (ftruncate(swap
, maxSize()) != 0)
260 createError("truncate");
// Write a zeroed HeaderSize-byte db header at the start of the file.
262 char header
[HeaderSize
];
263 memset(header
, '\0', sizeof(header
));
264 if (write(swap
, header
, sizeof(header
)) != sizeof(header
))
265 createError("write");
271 // report Rock DB creation error and exit
273 Rock::SwapDir::createError(const char *const msg
)
275 int xerrno
= errno
; // XXX: where does errno come from?
276 debugs(47, DBG_CRITICAL
, "ERROR: Failed to initialize Rock Store db in " <<
277 filePath
<< "; " << msg
<< " error: " << xstrerr(xerrno
));
278 fatal("Rock Store db creation error");
// init(): attaches to the shared free-slot stack and entry map, picks a
// DiskIO module (IpcIo for SMP strands, Blocking otherwise), opens the db
// file, and bumps the global rebuild counter. Fragments are line-mangled.
282 Rock::SwapDir::init()
286 // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
287 // are refcounted. We up our count once to avoid implicit delete's.
// Attach to the shared-memory structures created by SwapDirRr::create().
290 freeSlots
= shm_old(Ipc::Mem::PageStack
)(freeSlotsPath());
293 map
= new DirMap(inodeMapPath());
// Select the DiskIO strategy; a missing module is fatal.
296 const char *ioModule
= needsDiskStrand() ? "IpcIo" : "Blocking";
297 if (DiskIOModule
*m
= DiskIOModule::Find(ioModule
)) {
298 debugs(47,2, HERE
<< "Using DiskIO module: " << ioModule
);
299 io
= m
->createStrategy();
302 debugs(47, DBG_CRITICAL
, "FATAL: Rock store is missing DiskIO module: " <<
304 fatal("Rock Store missing a required DiskIO module");
// Open the db file read-write; completion arrives asynchronously via
// ioCompletedNotification().
307 theFile
= io
->newFile(filePath
);
308 theFile
->configure(fileConfig
);
309 theFile
->open(O_RDWR
, 0644, this);
311 // Increment early. Otherwise, if one SwapDir finishes rebuild before
312 // others start, storeRebuildComplete() will think the rebuild is over!
313 // TODO: move store_dirs_rebuilding hack to store modules that need it.
314 ++StoreController::store_dirs_rebuilding
;
318 Rock::SwapDir::needsDiskStrand() const
320 const bool wontEvenWorkWithoutDisker
= Config
.workers
> 1;
321 const bool wouldWorkBetterWithDisker
= DiskIOModule::Find("IpcIo");
322 return InDaemonMode() && (wontEvenWorkWithoutDisker
||
323 wouldWorkBetterWithDisker
);
// parse(): records the configured directory path and derives the db file
// path ("<path>/rock"). Fragments are line-mangled; tokens untouched.
327 Rock::SwapDir::parse(int anIndex
, char *aPath
)
331 path
= xstrdup(aPath
);
333 // cache store is located at path/db
335 fname
.append("/rock");
336 filePath
= xstrdup(fname
.termedBuf());
341 // Current openForWriting() code overwrites the old slot if needed
342 // and possible, so proactively removing old slots is probably useless.
343 assert(!repl
); // repl = createRemovalPolicy(Config.replPolicy);

// reconfigure(): re-parses size options on squid -k reconfigure.
349 Rock::SwapDir::reconfigure()
353 // TODO: can we reconfigure the replacement policy (repl)?

357 /// parse maximum db disk size
// parseSize(): reads the cache_dir size (in MB), rejects negatives, and
// refuses dynamic size changes during reconfigure.
359 Rock::SwapDir::parseSize(const bool reconfig
)
361 const int i
= GetInteger();
363 fatal("negative Rock cache_dir size value");
364 const uint64_t new_max_size
=
365 static_cast<uint64_t>(i
) << 20; // MBytes to Bytes
367 max_size
= new_max_size
;
368 else if (new_max_size
!= max_size
) {
369 debugs(3, DBG_IMPORTANT
, "WARNING: cache_dir '" << path
<< "' size "
370 "cannot be changed dynamically, value left unchanged (" <<
371 (max_size
>> 20) << " MB)");

// getOptionTree(): extends the base SwapDir option vector with rock's
// slot-size, swap-timeout, and max-swap-rate option adapters.
376 Rock::SwapDir::getOptionTree() const
378 ConfigOption
*copt
= ::SwapDir::getOptionTree();
379 ConfigOptionVector
*vector
= dynamic_cast<ConfigOptionVector
*>(copt
);
381 // if copt is actually a ConfigOptionVector
382 vector
->options
.push_back(new ConfigOptionAdapter
<SwapDir
>(*const_cast<SwapDir
*>(this), &SwapDir::parseSizeOption
, &SwapDir::dumpSizeOption
));
383 vector
->options
.push_back(new ConfigOptionAdapter
<SwapDir
>(*const_cast<SwapDir
*>(this), &SwapDir::parseTimeOption
, &SwapDir::dumpTimeOption
));
384 vector
->options
.push_back(new ConfigOptionAdapter
<SwapDir
>(*const_cast<SwapDir
*>(this), &SwapDir::parseRateOption
, &SwapDir::dumpRateOption
));
386 // we don't know how to handle copt, as it's not a ConfigOptionVector.
387 // free it (and return nullptr)
395 Rock::SwapDir::allowOptionReconfigure(const char *const option
) const
397 return strcmp(option
, "slot-size") != 0 &&
398 ::SwapDir::allowOptionReconfigure(option
);
401 /// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
// parseTimeOption(): handles swap-timeout; rejects negative values and
// refuses dynamic changes on reconfigure. Fragments are line-mangled.
403 Rock::SwapDir::parseTimeOption(char const *option
, const char *value
, int reconfig
)
405 // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
406 // including time unit handling. Same for size and rate.
408 time_msec_t
*storedTime
;
409 if (strcmp(option
, "swap-timeout") == 0)
410 storedTime
= &fileConfig
.ioTimeout
;
419 // TODO: handle time units and detect parsing errors better
420 const int64_t parsedValue
= strtoll(value
, NULL
, 10);
421 if (parsedValue
< 0) {
422 debugs(3, DBG_CRITICAL
, "FATAL: cache_dir " << path
<< ' ' << option
<< " must not be negative but is: " << parsedValue
);
427 const time_msec_t newTime
= static_cast<time_msec_t
>(parsedValue
);
430 *storedTime
= newTime
;
431 else if (*storedTime
!= newTime
) {
432 debugs(3, DBG_IMPORTANT
, "WARNING: cache_dir " << path
<< ' ' << option
433 << " cannot be changed dynamically, value left unchanged: " <<

440 /// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
// dumpTimeOption(): appends " swap-timeout=<ms>" when a timeout is set.
442 Rock::SwapDir::dumpTimeOption(StoreEntry
* e
) const
444 if (fileConfig
.ioTimeout
)
445 storeAppendPrintf(e
, " swap-timeout=%" PRId64
,
446 static_cast<int64_t>(fileConfig
.ioTimeout
));

449 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
// parseRateOption(): handles max-swap-rate; rejects negatives and refuses
// dynamic changes on reconfigure.
451 Rock::SwapDir::parseRateOption(char const *option
, const char *value
, int isaReconfig
)
454 if (strcmp(option
, "max-swap-rate") == 0)
455 storedRate
= &fileConfig
.ioRate
;
464 // TODO: handle time units and detect parsing errors better
465 const int64_t parsedValue
= strtoll(value
, NULL
, 10);
466 if (parsedValue
< 0) {
467 debugs(3, DBG_CRITICAL
, "FATAL: cache_dir " << path
<< ' ' << option
<< " must not be negative but is: " << parsedValue
);
472 const int newRate
= static_cast<int>(parsedValue
);
475 debugs(3, DBG_CRITICAL
, "FATAL: cache_dir " << path
<< ' ' << option
<< " must not be negative but is: " << newRate
);
481 *storedRate
= newRate
;
482 else if (*storedRate
!= newRate
) {
483 debugs(3, DBG_IMPORTANT
, "WARNING: cache_dir " << path
<< ' ' << option
484 << " cannot be changed dynamically, value left unchanged: " <<

491 /// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
// dumpRateOption(): appends " max-swap-rate=<n>" when configured (>= 0).
493 Rock::SwapDir::dumpRateOption(StoreEntry
* e
) const
495 if (fileConfig
.ioRate
>= 0)
496 storeAppendPrintf(e
, " max-swap-rate=%d", fileConfig
.ioRate
);

499 /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
// parseSizeOption(): handles slot-size; must be positive and larger than
// the per-slot DbCellHeader; refuses dynamic changes on reconfigure.
501 Rock::SwapDir::parseSizeOption(char const *option
, const char *value
, int reconfig
)
503 uint64_t *storedSize
;
504 if (strcmp(option
, "slot-size") == 0)
505 storedSize
= &slotSize
;
514 // TODO: handle size units and detect parsing errors better
515 const uint64_t newSize
= strtoll(value
, NULL
, 10);
517 debugs(3, DBG_CRITICAL
, "FATAL: cache_dir " << path
<< ' ' << option
<< " must be positive; got: " << newSize
);
522 if (newSize
<= sizeof(DbCellHeader
)) {
523 debugs(3, DBG_CRITICAL
, "FATAL: cache_dir " << path
<< ' ' << option
<< " must exceed " << sizeof(DbCellHeader
) << "; got: " << newSize
);
529 *storedSize
= newSize
;
530 else if (*storedSize
!= newSize
) {
531 debugs(3, DBG_IMPORTANT
, "WARNING: cache_dir " << path
<< ' ' << option
532 << " cannot be changed dynamically, value left unchanged: " <<

539 /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
// dumpSizeOption(): appends " slot-size=<bytes>".
541 Rock::SwapDir::dumpSizeOption(StoreEntry
* e
) const
543 storeAppendPrintf(e
, " slot-size=%" PRId64
, slotSize
);
546 /// check the results of the configuration; only level-0 debugging works here
// validateOptions(): sanity-checks slot-size vs. configured db capacity
// and warns when entry or slot limits would leave configured space unused.
548 Rock::SwapDir::validateOptions()
551 fatal("Rock store requires a positive slot-size");
// Rounding tolerances: size is configured in whole MBs and each entry
// consumes whole slots, so small mismatches are expected.
553 const int64_t maxSizeRoundingWaste
= 1024 * 1024; // size is configured in MB
554 const int64_t slotSizeRoundingWaste
= slotSize
;
555 const int64_t maxRoundingWaste
=
556 max(maxSizeRoundingWaste
, slotSizeRoundingWaste
);
558 // an entry consumes at least one slot; round up to reduce false warnings
559 const int64_t blockSize
= static_cast<int64_t>(slotSize
);
560 const int64_t maxObjSize
= max(blockSize
,
561 ((maxObjectSize()+blockSize
-1)/blockSize
)*blockSize
);
563 // Does the "sfileno*max-size" limit match configured db capacity?
564 const double entriesMayOccupy
= entryLimitAbsolute()*static_cast<double>(maxObjSize
);
565 if (entriesMayOccupy
+ maxRoundingWaste
< maxSize()) {
566 const int64_t diskWasteSize
= maxSize() - static_cast<int64_t>(entriesMayOccupy
);
567 debugs(47, DBG_CRITICAL
, "WARNING: Rock cache_dir " << path
<< " wastes disk space due to entry limits:" <<
568 "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
569 "\n\tconfigured db slot size: " << slotSize
<< " bytes" <<
570 "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
571 "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
572 "\n\tdisk space all entries may use: " << entriesMayOccupy
<< " bytes" <<
573 "\n\tdisk space wasted: " << diskWasteSize
<< " bytes");
576 // Does the "absolute slot count" limit match configured db capacity?
577 const double slotsMayOccupy
= slotLimitAbsolute()*static_cast<double>(slotSize
);
578 if (slotsMayOccupy
+ maxRoundingWaste
< maxSize()) {
// NOTE(review): diskWasteSize here is computed from entriesMayOccupy, not
// slotsMayOccupy, although this is the slot-limit branch. This looks like a
// copy-paste slip from the entry-limit branch above — confirm upstream.
579 const int64_t diskWasteSize
= maxSize() - static_cast<int64_t>(entriesMayOccupy
);
580 debugs(47, DBG_CRITICAL
, "WARNING: Rock cache_dir " << path
<< " wastes disk space due to slot limits:" <<
581 "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
582 "\n\tconfigured db slot size: " << slotSize
<< " bytes" <<
583 "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
584 "\n\tdisk space all slots may use: " << slotsMayOccupy
<< " bytes" <<
585 "\n\tdisk space wasted: " << diskWasteSize
<< " bytes");
// rebuild(): kicks off the asynchronous cache_dir index rebuild job.
590 Rock::SwapDir::rebuild()
592 //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
593 AsyncJob::Start(new Rebuild(this));

// canStore(): adds per-slot header overhead to the space estimate, then
// defers to the base class; also requires a writable db file and enough
// free shared I/O pages in SMP mode.
597 Rock::SwapDir::canStore(const StoreEntry
&e
, int64_t diskSpaceNeeded
, int &load
) const
599 if (diskSpaceNeeded
>= 0)
600 diskSpaceNeeded
+= sizeof(DbCellHeader
);
601 if (!::SwapDir::canStore(e
, diskSpaceNeeded
, load
))
604 if (!theFile
|| !theFile
->canWrite())
610 // Do not start I/O transaction if there are less than 10% free pages left.
611 // TODO: reserve page instead
612 if (needsDiskStrand() &&
613 Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage
) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage
)) {
614 debugs(47, 5, HERE
<< "too few shared pages for IPC I/O left");

// createStoreIO(): opens a map anchor for writing and builds an IoState
// that will write entry e to this dir.
625 StoreIOState::Pointer
626 Rock::SwapDir::createStoreIO(StoreEntry
&e
, StoreIOState::STFNCB
*cbFile
, StoreIOState::STIOCB
*cbIo
, void *data
)
628 if (!theFile
|| theFile
->error()) {
629 debugs(47,4, HERE
<< theFile
);
634 Ipc::StoreMapAnchor
*const slot
=
635 map
->openForWriting(reinterpret_cast<const cache_key
*>(e
.key
), filen
);
637 debugs(47, 5, HERE
<< "map->add failed");
644 // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
645 // If that does not happen, the entry will not decrement the read level!
647 Rock::SwapDir::Pointer
self(this);
648 IoState
*sio
= new IoState(self
, &e
, cbFile
, cbIo
, data
);
650 sio
->swap_dirn
= index
;
651 sio
->swap_filen
= filen
;
652 sio
->writeableAnchor_
= slot
;
654 debugs(47,5, HERE
<< "dir " << index
<< " created new filen " <<
655 std::setfill('0') << std::hex
<< std::uppercase
<< std::setw(8) <<
656 sio
->swap_filen
<< std::dec
<< " starting at " <<
657 diskOffset(sio
->swap_filen
));

// createUpdateIO(): like createStoreIO() but writes a fresh copy of an
// existing entry (header update), using the update's fresh anchor/fileNo.
665 StoreIOState::Pointer
666 Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate
&update
, StoreIOState::STFNCB
*cbFile
, StoreIOState::STIOCB
*cbIo
, void *data
)
668 if (!theFile
|| theFile
->error()) {
669 debugs(47,4, theFile
);
674 Must(update
.fresh
.fileNo
>= 0);
676 Rock::SwapDir::Pointer
self(this);
677 IoState
*sio
= new IoState(self
, update
.entry
, cbFile
, cbIo
, data
);
679 sio
->swap_dirn
= index
;
680 sio
->swap_filen
= update
.fresh
.fileNo
;
681 sio
->writeableAnchor_
= update
.fresh
.anchor
;
683 debugs(47,5, "dir " << index
<< " updating filen " <<
684 std::setfill('0') << std::hex
<< std::uppercase
<< std::setw(8) <<
685 sio
->swap_filen
<< std::dec
<< " starting at " <<
686 diskOffset(sio
->swap_filen
));

// diskOffset(SlotId): byte offset of a slot: db header plus whole slots.
693 Rock::SwapDir::diskOffset(const SlotId sid
) const
696 return HeaderSize
+ slotSize
*sid
;

// diskOffset(PageId): converts a 1-based page number to a 0-based slot id.
700 Rock::SwapDir::diskOffset(Ipc::Mem::PageId
&pageId
) const
703 return diskOffset(pageId
.number
- 1);

// diskOffsetLimit(): offset just past the last slot the map can address.
707 Rock::SwapDir::diskOffsetLimit() const
710 return diskOffset(map
->sliceLimit());

// reserveSlotForWriting(): pops a free slot from the shared stack or, if
// none, purges a victim entry via map->purgeOne() (the freed slot is
// delivered through noteFreeMapSlice() into waitingForPage). Throws when
// no slot can be obtained.
714 Rock::SwapDir::reserveSlotForWriting()
716 Ipc::Mem::PageId pageId
;
718 if (freeSlots
->pop(pageId
)) {
719 const auto slotId
= pageId
.number
- 1;
720 debugs(47, 5, "got a previously free slot: " << slotId
);
721 map
->prepFreeSlice(slotId
);
725 // catch free slots delivered to noteFreeMapSlice()
726 assert(!waitingForPage
);
727 waitingForPage
= &pageId
;
728 if (map
->purgeOne()) {
729 assert(!waitingForPage
); // noteFreeMapSlice() should have cleared it
730 assert(pageId
.set());
731 const auto slotId
= pageId
.number
- 1;
732 debugs(47, 5, "got a previously busy slot: " << slotId
);
733 map
->prepFreeSlice(slotId
);
736 assert(waitingForPage
== &pageId
);
737 waitingForPage
= NULL
;
739 // This may happen when the number of available db slots is close to the
740 // number of concurrent requests reading or writing those slots, which may
741 // happen when the db is "small" compared to the request traffic OR when we
742 // are rebuilding and have not loaded "many" entries or empty slots yet.
743 debugs(47, 3, "cannot get a slot; entries: " << map
->entryCount());
744 throw TexcHere("ran out of free db slots");
748 Rock::SwapDir::validSlotId(const SlotId slotId
) const
750 return 0 <= slotId
&& slotId
< slotLimitActual();
// noteFreeMapSlice(): converts the freed map slice to a PageId (pool is
// index+1, page numbers are 1-based) and either hands it to the waiting
// reserveSlotForWriting() call or pushes it back onto the free-slot stack.
754 Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId
)
756 Ipc::Mem::PageId pageId
;
757 pageId
.pool
= index
+1;
758 pageId
.number
= sliceId
+1;
759 if (waitingForPage
) {
760 *waitingForPage
= pageId
;
761 waitingForPage
= NULL
;
763 freeSlots
->push(pageId
);

767 // tries to open an old entry with swap_filen for reading
// openStoreIO(): peeks at the entry's read-locked anchor and builds an
// IoState for reading it; refuses when the db file is unusable or shared
// I/O pages are nearly exhausted in SMP mode.
768 StoreIOState::Pointer
769 Rock::SwapDir::openStoreIO(StoreEntry
&e
, StoreIOState::STFNCB
*cbFile
, StoreIOState::STIOCB
*cbIo
, void *data
)
771 if (!theFile
|| theFile
->error()) {
772 debugs(47,4, HERE
<< theFile
);
777 debugs(47,4, HERE
<< e
);
781 // Do not start I/O transaction if there are less than 10% free pages left.
782 // TODO: reserve page instead
783 if (needsDiskStrand() &&
784 Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage
) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage
)) {
785 debugs(47, 5, HERE
<< "too few shared pages for IPC I/O left");
789 // The are two ways an entry can get swap_filen: our get() locked it for
790 // reading or our storeSwapOutStart() locked it for writing. Peeking at our
791 // locked entry is safe, but no support for reading the entry we swap out.
792 const Ipc::StoreMapAnchor
*slot
= map
->peekAtReader(e
.swap_filen
);
794 return NULL
; // we were writing afterall
796 Rock::SwapDir::Pointer
self(this);
797 IoState
*sio
= new IoState(self
, &e
, cbFile
, cbIo
, data
);
799 sio
->swap_dirn
= index
;
800 sio
->swap_filen
= e
.swap_filen
;
801 sio
->readableAnchor_
= slot
;
804 debugs(47,5, HERE
<< "dir " << index
<< " has old filen: " <<
805 std::setfill('0') << std::hex
<< std::uppercase
<< std::setw(8) <<
808 assert(slot
->sameKey(static_cast<const cache_key
*>(e
.key
)));
809 // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
810 // may still be zero and basics.swap_file_sz may grow.
811 assert(slot
->basics
.swap_file_sz
>= e
.swap_file_sz
);

// ioCompletedNotification(): called when the async db file open finishes;
// a missing or erroring file is fatal, otherwise log the dir limits.
817 Rock::SwapDir::ioCompletedNotification()
820 fatalf("Rock cache_dir failed to initialize db file: %s", filePath
);
822 if (theFile
->error()) {
823 int xerrno
= errno
; // XXX: where does errno come from
824 fatalf("Rock cache_dir at %s failed to open db file: %s", filePath
,
828 debugs(47, 2, "Rock cache_dir[" << index
<< "] limits: " <<
829 std::setw(12) << maxSize() << " disk bytes, " <<
830 std::setw(7) << map
->entryLimit() << " entries, and " <<
831 std::setw(7) << map
->sliceLimit() << " slots");

// closeCompleted(): db file close notification (body not visible here).
837 Rock::SwapDir::closeCompleted()

// readCompleted(): advances the IoState offset on a successful read and
// hands the buffer back to the reader callback.
843 Rock::SwapDir::readCompleted(const char *, int rlen
, int errflag
, RefCount
< ::ReadRequest
> r
)
845 ReadRequest
*request
= dynamic_cast<Rock::ReadRequest
*>(r
.getRaw());
847 IoState::Pointer sio
= request
->sio
;
849 if (errflag
== DISK_OK
&& rlen
> 0)
850 sio
->offset_
+= rlen
;
852 sio
->callReaderBack(r
->buf
, rlen
);

// writeCompleted(): dispatches to the success/problem handlers; detects
// dropped earlier writes and treats them as disk errors; broadcasts to
// collapsed-forwarding readers when the entry is still attached.
856 Rock::SwapDir::writeCompleted(int errflag
, size_t, RefCount
< ::WriteRequest
> r
)
858 Rock::WriteRequest
*request
= dynamic_cast<Rock::WriteRequest
*>(r
.getRaw());
860 assert(request
->sio
!= NULL
);
861 IoState
&sio
= *request
->sio
;
863 // quit if somebody called IoState::close() while we were waiting
864 if (!sio
.stillWaiting()) {
865 debugs(79, 3, "ignoring closed entry " << sio
.swap_filen
);
866 noteFreeMapSlice(request
->sidNext
);
870 debugs(79, 7, "errflag=" << errflag
<< " rlen=" << request
->len
<< " eof=" << request
->eof
);
872 if (errflag
!= DISK_OK
)
873 handleWriteCompletionProblem(errflag
, *request
);
874 else if (droppedEarlierRequest(*request
))
875 handleWriteCompletionProblem(DISK_ERROR
, *request
);
877 handleWriteCompletionSuccess(*request
);
879 if (sio
.touchingStoreEntry())
880 CollapsedForwarding::Broadcast(*sio
.e
);

883 /// code shared by writeCompleted() success handling cases
// handleWriteCompletionSuccess(): records the splicing point, finalizes
// the written slice (size excludes the DbCellHeader; next links the chain),
// and on the last slice switches the anchor from writing to reading.
885 Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest
&request
)
887 auto &sio
= *(request
.sio
);
888 sio
.splicingPoint
= request
.sidCurrent
;
889 // do not increment sio.offset_ because we do it in sio->write()
891 // finalize the shared slice info after writing slice contents to disk
892 Ipc::StoreMap::Slice
&slice
=
893 map
->writeableSlice(sio
.swap_filen
, request
.sidCurrent
);
894 slice
.size
= request
.len
- sizeof(DbCellHeader
);
895 slice
.next
= request
.sidNext
;
899 assert(sio
.writeableAnchor_
);
900 if (sio
.touchingStoreEntry()) {
901 sio
.e
->swap_file_sz
= sio
.writeableAnchor_
->basics
.swap_file_sz
=
904 map
->switchWritingToReading(sio
.swap_filen
);
905 // sio.e keeps the (now read) lock on the anchor
907 sio
.writeableAnchor_
= NULL
;
908 sio
.finishedWriting(DISK_OK
);

912 /// code shared by writeCompleted() error handling cases
// handleWriteCompletionProblem(): releases the would-be next slice and
// finishes the write with the error code.
914 Rock::SwapDir::handleWriteCompletionProblem(const int errflag
, const WriteRequest
&request
)
916 auto &sio
= *request
.sio
;
918 noteFreeMapSlice(request
.sidNext
);
921 sio
.finishedWriting(errflag
);
922 // and hope that Core will call disconnect() to close the map entry

// writeError(): marks the map entry unusable and stops sharing the store
// entry; keeps the write lock so core can disconnect cleanly.
926 Rock::SwapDir::writeError(StoreIOState
&sio
)
928 // Do not abortWriting here. The entry should keep the write lock
929 // instead of losing association with the store and confusing core.
930 map
->freeEntry(sio
.swap_filen
); // will mark as unusable, just in case
932 if (sio
.touchingStoreEntry())
933 Store::Root().stopSharing(*sio
.e
);
934 // else noop: a fresh entry update error does not affect stale entry readers
936 // All callers must also call IoState callback, to propagate the error.

939 /// whether the disk has dropped at least one of the previous write requests
// droppedEarlierRequest(): the slice we just wrote must be the one chained
// after the last spliced slice (or the anchor start for the first write);
// any mismatch means an earlier request was silently dropped.
941 Rock::SwapDir::droppedEarlierRequest(const WriteRequest
&request
) const
943 const auto &sio
= *request
.sio
;
944 assert(sio
.writeableAnchor_
);
945 const Ipc::StoreMapSliceId expectedSliceId
= sio
.splicingPoint
< 0 ?
946 sio
.writeableAnchor_
->start
:
947 map
->writeableSlice(sio
.swap_filen
, sio
.splicingPoint
).next
;
948 if (expectedSliceId
!= request
.sidCurrent
) {
949 debugs(79, 3, "yes; expected " << expectedSliceId
<< ", but got " << request
.sidCurrent
);

// updateHeaders(): starts an asynchronous HeaderUpdater job for the entry;
// aborts the map update if the job cannot be started.
957 Rock::SwapDir::updateHeaders(StoreEntry
*updatedE
)
962 Ipc::StoreMapUpdate
update(updatedE
);
963 if (!map
->openForUpdating(update
, updatedE
->swap_filen
))
967 AsyncJob::Start(new HeaderUpdater(this, update
));
968 } catch (const std::exception
&ex
) {
969 debugs(20, 2, "error starting to update entry " << *updatedE
<< ": " << ex
.what());
970 map
->abortUpdating(update
);
975 Rock::SwapDir::full() const
977 return freeSlots
!= NULL
&& !freeSlots
->size();
980 // storeSwapOutFileClosed calls this nethod on DISK_NO_SPACE_LEFT,
981 // but it should not happen for us
// diskFull(): running out of space is a bug for rock dirs; just log it.
983 Rock::SwapDir::diskFull()
985 debugs(20, DBG_IMPORTANT
, "BUG: No space left with rock cache_dir: " <<

989 /// purge while full(); it should be sufficient to purge just one
// maintain(): intentionally a no-op; see the comment block below.
991 Rock::SwapDir::maintain()
993 // The Store calls this to free some db space, but there is nothing wrong
994 // with a full() db, except when db has to shrink after reconfigure, and
995 // we do not support shrinking yet (it would have to purge specific slots).
996 // TODO: Disable maintain() requests when they are pointless.

// reference(): forwards the reference event to the removal policy, if any.
1000 Rock::SwapDir::reference(StoreEntry
&e
)
1002 debugs(47, 5, HERE
<< &e
<< ' ' << e
.swap_dirn
<< ' ' << e
.swap_filen
);
1003 if (repl
&& repl
->Referenced
)
1004 repl
->Referenced(repl
, &e
, &e
.repl
);

// dereference(): forwards the dereference event to the removal policy.
1008 Rock::SwapDir::dereference(StoreEntry
&e
)
1010 debugs(47, 5, HERE
<< &e
<< ' ' << e
.swap_dirn
<< ' ' << e
.swap_filen
);
1011 if (repl
&& repl
->Dereferenced
)
1012 repl
->Dereferenced(repl
, &e
, &e
.repl
);
1014 // no need to keep e in the global store_table for us; we have our own map

// unlinkdUseful(): rock keeps everything in one db file, so unlinkd is
// not needed.
1019 Rock::SwapDir::unlinkdUseful() const
1021 // no entry-specific files to unlink

// evictIfFound(): frees the map entry with the given key, if present.
1026 Rock::SwapDir::evictIfFound(const cache_key
*key
)
1029 map
->freeEntryByKey(key
); // may not be there

// evictCached(): frees an attached entry by filen (broadcasting the change)
// or, for unattached entries, falls back to the public key.
1033 Rock::SwapDir::evictCached(StoreEntry
&e
)
1036 if (e
.hasDisk(index
)) {
1037 if (map
->freeEntry(e
.swap_filen
))
1038 CollapsedForwarding::Broadcast(e
);
1041 } else if (const auto key
= e
.publicKey()) {

// trackReferences(): registers the entry with the removal policy.
1047 Rock::SwapDir::trackReferences(StoreEntry
&e
)
1049 debugs(47, 5, HERE
<< e
);
1051 repl
->Add(repl
, &e
, &e
.repl
);

// ignoreReferences(): removes the entry from the removal policy.
1055 Rock::SwapDir::ignoreReferences(StoreEntry
&e
)
1057 debugs(47, 5, HERE
<< e
);
1059 repl
->Remove(repl
, &e
, &e
.repl
);

// statfs(): appends human-readable capacity/usage/limit statistics for
// this cache_dir to the given entry (cachemgr report).
1063 Rock::SwapDir::statfs(StoreEntry
&e
) const
1065 storeAppendPrintf(&e
, "\n");
1066 storeAppendPrintf(&e
, "Maximum Size: %" PRIu64
" KB\n", maxSize() >> 10);
1067 storeAppendPrintf(&e
, "Current Size: %.2f KB %.2f%%\n",
1068 currentSize() / 1024.0,
1069 Math::doublePercent(currentSize(), maxSize()));
1071 const int entryLimit
= entryLimitActual();
1072 const int slotLimit
= slotLimitActual();
1073 storeAppendPrintf(&e
, "Maximum entries: %9d\n", entryLimit
);
1074 if (map
&& entryLimit
> 0) {
1075 const int entryCount
= map
->entryCount();
1076 storeAppendPrintf(&e
, "Current entries: %9d %.2f%%\n",
1077 entryCount
, (100.0 * entryCount
/ entryLimit
));
1080 storeAppendPrintf(&e
, "Maximum slots: %9d\n", slotLimit
);
1081 if (map
&& slotLimit
> 0) {
1082 const unsigned int slotsFree
= !freeSlots
? 0 : freeSlots
->size();
1083 if (slotsFree
<= static_cast<const unsigned int>(slotLimit
)) {
1084 const int usedSlots
= slotLimit
- static_cast<const int>(slotsFree
);
1085 storeAppendPrintf(&e
, "Used slots: %9d %.2f%%\n",
1086 usedSlots
, (100.0 * usedSlots
/ slotLimit
));
1088 if (slotLimit
< 100) { // XXX: otherwise too expensive to count
1089 Ipc::ReadWriteLockStats stats
;
1090 map
->updateStats(stats
);
1095 storeAppendPrintf(&e
, "Pending operations: %d out of %d\n",
1096 store_open_disk_fd
, Config
.max_open_disk_fds
);
1098 storeAppendPrintf(&e
, "Flags:");
1101 storeAppendPrintf(&e
, " SELECTED");
1103 if (flags
.read_only
)
1104 storeAppendPrintf(&e
, " READ-ONLY");
1106 storeAppendPrintf(&e
, "\n");
1111 Rock::SwapDir::inodeMapPath() const
1113 return Ipc::Mem::Segment::Name(SBuf(path
), "map");
// freeSlotsPath(): name of the shared free-slot stack segment; built from
// a static buffer with the "_spaces" suffix appended. Fragments are
// line-mangled; the buffer initialization lines are not visible here.
1117 Rock::SwapDir::freeSlotsPath() const
1119 static String spacesPath
;
1121 spacesPath
.append("_spaces");
1122 return spacesPath
.termedBuf();
1126 Rock::SwapDir::hasReadableEntry(const StoreEntry
&e
) const
1128 return map
->hasReadableEntry(reinterpret_cast<const cache_key
*>(e
.key
));
// Registers the SwapDirRr runner so shared segments are created at startup.
1133 RunnerRegistrationEntry(SwapDirRr
);

// SwapDirRr::create(): for every configured rock cache_dir, creates the
// shared entry map and the shared free-slot PageStack, then drains the
// stack so it starts with no free pages (the rebuild repopulates it).
1136 void Rock::SwapDirRr::create()
1138 Must(mapOwners
.empty() && freeSlotsOwners
.empty());
1139 for (int i
= 0; i
< Config
.cacheSwap
.n_configured
; ++i
) {
1140 if (const Rock::SwapDir
*const sd
= dynamic_cast<Rock::SwapDir
*>(INDEXSD(i
))) {
1141 const int64_t capacity
= sd
->slotLimitActual();
1143 SwapDir::DirMap::Owner
*const mapOwner
=
1144 SwapDir::DirMap::Init(sd
->inodeMapPath(), capacity
);
1145 mapOwners
.push_back(mapOwner
);
1147 // TODO: somehow remove pool id and counters from PageStack?
1148 Ipc::Mem::Owner
<Ipc::Mem::PageStack
> *const freeSlotsOwner
=
1149 shm_new(Ipc::Mem::PageStack
)(sd
->freeSlotsPath(),
1151 freeSlotsOwners
.push_back(freeSlotsOwner
);
1153 // TODO: add method to initialize PageStack with no free pages
1155 Ipc::Mem::PageId pageId
;
1156 if (!freeSlotsOwner
->object()->pop(pageId
))

// ~SwapDirRr(): deletes the owned map and free-slot segment owners.
// NOTE(review): the destructor body continues past the end of this
// extraction.
1163 Rock::SwapDirRr::~SwapDirRr()
1165 for (size_t i
= 0; i
< mapOwners
.size(); ++i
) {
1166 delete mapOwners
[i
];
1167 delete freeSlotsOwners
[i
];