/*
 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47    Store Directory Routines */

#include "squid.h"
#include "cache_cf.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockHeaderUpdater.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>
#include <limits>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

const int64_t Rock::SwapDir::HeaderSize = 16*1024;
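// The db file starts with a HeaderSize metadata block followed by
// equally-sized slots. A hedged illustration of the resulting layout,
// assuming the default 16 KiB header and an example slot size (the
// numbers are for orientation only, not configuration):
//
//   byte 0          16384            16384+slotSize   ...
//   [ header ...... | slot 0 ....... | slot 1 ....... | ... ]
//
// so slot `sid` begins at HeaderSize + slotSize*sid, which is exactly
// what diskOffset(SlotId) computes further down.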

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
    waitingForPage(NULL)
{
}

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}

bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
            reinterpret_cast<cache_key*>(collapsed.key), filen);
    if (!slot)
        return false;

    anchorEntry(collapsed, filen, *slot);
    inSync = updateCollapsedWith(collapsed, *slot);
    return true; // even if inSync is false
}

bool
Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    if (collapsed.swap_filen < 0) // no longer using a disk cache
        return true;
    assert(collapsed.swap_dirn == index);

    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
    return updateCollapsedWith(collapsed, s);
}

bool
Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
{
    collapsed.swap_file_sz = anchor.basics.swap_file_sz;
    return true;
}

void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;

    e.swap_file_sz = basics.swap_file_sz;
    e.lastref = basics.lastref;
    e.timestamp = basics.timestamp;
    e.expires = basics.expires;
    e.lastModified(basics.lastmod);
    e.refcount = basics.refcount;
    e.flags = basics.flags;

    if (anchor.complete()) {
        e.store_status = STORE_OK;
        e.swap_status = SWAPOUT_DONE;
    } else {
        e.store_status = STORE_PENDING;
        e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
    }

    e.ping_status = PING_NONE;

    EBIT_CLR(e.flags, RELEASE_REQUEST);
    EBIT_CLR(e.flags, KEY_PRIVATE);
    EBIT_SET(e.flags, ENTRY_VALIDATED);

    e.swap_dirn = index;
    e.swap_filen = filen;
}

void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
        Store::Root().transientsAbandon(e); // broadcasts after the change
    } else {
        map->closeForReading(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
    }
}

uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}
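// Illustrative arithmetic (example numbers, not defaults): with
// maxSize() == 100 MB, slotSize == 16 KiB, and 1000 free slots,
// spaceSize == 16384 * 1000 == 16,384,000 bytes, so
// currentSize() == 104,857,600 - 16,384,000 == 88,473,600 bytes in use.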

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode, only the disker process reports stats so that multiple
/// processes do not count the same stats twice.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}

int64_t
Rock::SwapDir::slotLimitAbsolute() const
{
    // the max value is an invalid one; all values must be below the limit
    assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
           std::numeric_limits<SlotId>::max());
    return std::numeric_limits<SlotId>::max();
}

int64_t
Rock::SwapDir::slotLimitActual() const
{
    const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
    const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
    const int64_t sLimitHi = slotLimitAbsolute();
    return min(max(sLimitLo, sWanted), sLimitHi);
}
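// A hedged worked example of the clamping above (illustrative numbers):
// with maxSize() == 1 GB and slotSize == 16 KiB,
// sWanted == (2^30 - 16384) / 16384 == 65535 slots. If the existing map
// already allows more slices, sLimitLo wins (no dynamic shrinking);
// either way the result never exceeds slotLimitAbsolute(), i.e. the
// SlotId maximum.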

int64_t
Rock::SwapDir::entryLimitActual() const
{
    return min(slotLimitActual(), entryLimitAbsolute());
}

// TODO: encapsulate as a tool
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs(47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs(47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs(47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}
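// A minimal standalone sketch of the creation pattern above (ftruncate to
// reserve the full capacity, then overwrite the start with a zeroed
// header), kept out of the build on purpose. createDbFile and its
// parameters are hypothetical names; the 16*1024 header size mirrors
// HeaderSize, and error handling is reduced to a bool for brevity.
#if 0
#include <fcntl.h>
#include <unistd.h>
#include <cstring>

static bool createDbFile(const char *dbPath, off_t dbSize)
{
    const int fd = ::open(dbPath, O_WRONLY|O_CREAT|O_TRUNC, 0600);
    if (fd < 0)
        return false;

    // reserve the configured capacity without writing every byte
    bool ok = ::ftruncate(fd, dbSize) == 0;

    // then write an all-zeroes header block, like create() does
    char header[16*1024];
    memset(header, 0, sizeof(header));
    ok = ok && ::write(fd, header, sizeof(header)) ==
         static_cast<ssize_t>(sizeof(header));

    ::close(fd);
    return ok;
}
#endif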

// report Rock DB creation error and exit
void
Rock::SwapDir::createError(const char *const msg)
{
    int xerrno = errno; // XXX: where does errno come from?
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerr(xerrno));
    fatal("Rock Store db creation error");
}

void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}

bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}

void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
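// For reference: `i << 20` multiplies by 2^20, so a configured size of
// 100 (MB) yields new_max_size == 100 * 1,048,576 == 104,857,600 bytes,
// and the reverse shift `max_size >> 20` converts back to MB for the
// warning message above.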

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOption *copt = ::SwapDir::getOptionTree();
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(copt);
    if (vector) {
        // if copt is actually a ConfigOptionVector
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    } else {
        // we don't know how to handle copt, as it's not a ConfigOptionVector.
        // free it (and return nullptr)
        delete copt;
        copt = nullptr;
    }
    return copt;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
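// The same parse-then-guard pattern repeats in parseRateOption() and
// parseSizeOption() below: accept the value on initial configuration,
// but refuse (with a warning) to change it on reconfigure. A
// hypothetical helper capturing that shared shape is sketched here;
// applyOption does not exist in Squid and is illustrative only.
#if 0
template <typename T>
static void applyOption(T &stored, const T parsed, const bool reconfiguring,
                        const char *option)
{
    if (!reconfiguring)
        stored = parsed; // initial configuration: just accept the value
    else if (stored != parsed) {
        // this option cannot be changed dynamically
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir option " << option <<
               " cannot be changed dynamically, value left unchanged: " <<
               stored);
    }
}
#endif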

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
{
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle size units and detect parsing errors better
    const uint64_t newSize = strtoll(value, NULL, 10);
    if (newSize <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
        self_destruct();
    }

    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
        self_destruct();
    }

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);
    }

    return true;
}

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
{
    storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
}

/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);

    // an entry consumes at least one slot; round up to reduce false warnings
    const int64_t blockSize = static_cast<int64_t>(slotSize);
    const int64_t maxObjSize = max(blockSize,
                                   ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);

    // Does the "sfileno*max-size" limit match configured db capacity?
    const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
    if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
               "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
               "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
    }

    // Does the "absolute slot count" limit match configured db capacity?
    const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
    if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(slotsMayOccupy);
        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
               "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
               "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
    }
}
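// Illustrative rounding example (hypothetical numbers): with
// slotSize == 16 KiB and maxObjectSize() == 100 KiB, an entry needs
// ceil(102400/16384) == 7 slots, so maxObjSize rounds up to
// 7 * 16384 == 114688 bytes before the capacity comparisons above.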

void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    if (diskSpaceNeeded >= 0)
        diskSpaceNeeded += sizeof(DbCellHeader);
    if (!::SwapDir::canStore(e, diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start an I/O transaction if less than 10% of pages are free.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
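// Illustrative guard arithmetic (example numbers): with an ioPage limit
// of 1000 shared pages, PageLevel() >= 900 means at most 100 pages (10%)
// remain free, so new I/O transactions are refused until in-flight
// operations release pages.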

StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

StoreIOState::Pointer
Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, theFile);
        return nullptr;
    }

    Must(update.fresh);
    Must(update.fresh.fileNo >= 0);

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = update.fresh.fileNo;
    sio->writeableAnchor_ = update.fresh.anchor;

    debugs(47,5, "dir " << index << " updating filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);
    return sio;
}

int64_t
Rock::SwapDir::diskOffset(const SlotId sid) const
{
    assert(sid >= 0);
    return HeaderSize + slotSize*sid;
}

int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->sliceLimit());
}

bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}

bool
Rock::SwapDir::validSlotId(const SlotId slotId) const
{
    return 0 <= slotId && slotId < slotLimitActual();
}

void
Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}
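// Note the off-by-one mapping between the two id spaces: PageId.number
// is 1-based while slice/slot ids are 0-based, so noteFreeMapSlice()
// stores sliceId+1 and diskOffset(Ipc::Mem::PageId&) reads
// pageId.number-1 back. For example, slice 0 travels as page number 1
// and still lands at disk offset HeaderSize + slotSize*0.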

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start an I/O transaction if less than 10% of pages are free.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading, or our storeSwapOutStart() locked it for writing. Peeking at
    // our locked entry is safe, but reading an entry that we are still
    // swapping out is not supported.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
    // may still be zero, and basics.swap_file_sz may grow.
    assert(slot->basics.swap_file_sz >= e.swap_file_sz);

    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error()) {
        int xerrno = errno; // XXX: where does errno come from?
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerr(xerrno));
    }

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes, " <<
           std::setw(7) << map->entryLimit() << " entries, and " <<
           std::setw(7) << map->sliceLimit() << " slots");

    rebuild();
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

void
Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    sio->callReaderBack(r->buf, rlen);
}

void
Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        noteFreeMapSlice(request->sidNext);
        return;
    }

    debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);

    // TODO: Fail if disk dropped one of the previous write requests.

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()

        // finalize the shared slice info after writing slice contents to disk
        Ipc::StoreMap::Slice &slice =
            map->writeableSlice(sio.swap_filen, request->sidCurrent);
        slice.size = request->len - sizeof(DbCellHeader);
        slice.next = request->sidNext;

        if (request->eof) {
            assert(sio.e);
            assert(sio.writeableAnchor_);
            if (sio.touchingStoreEntry()) {
                sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                          sio.offset_;

                // close, the entry gets the read lock
                map->closeForWriting(sio.swap_filen, true);
            }
            sio.writeableAnchor_ = NULL;
            sio.splicingPoint = request->sidCurrent;
            sio.finishedWriting(errflag);
        }
    } else {
        noteFreeMapSlice(request->sidNext);

        writeError(sio);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }

    if (sio.touchingStoreEntry())
        CollapsedForwarding::Broadcast(*sio.e);
}
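// Illustrative slice accounting (hypothetical numbers): if a slot write
// carries request->len == 16384 bytes, the usable payload recorded in
// slice.size is 16384 - sizeof(DbCellHeader), because each on-disk slot
// begins with a DbCellHeader. slice.next records request->sidNext, the
// id of the slot holding the entry's next slice, chaining the slices of
// one entry together.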

void
Rock::SwapDir::writeError(StoreIOState &sio)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(sio.swap_filen); // will mark as unusable, just in case

    if (sio.touchingStoreEntry())
        Store::Root().transientsAbandon(*sio.e);
    // else noop: a fresh entry update error does not affect stale entry readers

    // All callers must also call IoState callback, to propagate the error.
}

void
Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
{
    if (!map)
        return;

    Ipc::StoreMapUpdate update(updatedE);
    if (!map->openForUpdating(update, updatedE->swap_filen))
        return;

    try {
        AsyncJob::Start(new HeaderUpdater(this, update));
    } catch (const std::exception &ex) {
        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
        map->abortUpdating(update);
    }
}

bool
Rock::SwapDir::full() const
{
    return freeSlots != NULL && !freeSlots->size();
}

// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but that should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when the db has to shrink after reconfigure,
    // and we do not support shrinking yet (it would have to purge specific
    // slots). TODO: Disable maintain() requests when they are pointless.
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}

void
Rock::SwapDir::markForUnlink(StoreEntry &e)
{
    debugs(47, 5, e);
    map->freeEntry(e.swap_filen);
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    const int entryLimit = entryLimitActual();
    const int slotLimit = slotLimitActual();
    storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
    if (map && entryLimit > 0) {
        const int entryCount = map->entryCount();
        storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                          entryCount, (100.0 * entryCount / entryLimit));
    }

    storeAppendPrintf(&e, "Maximum slots: %9d\n", slotLimit);
    if (map && slotLimit > 0) {
        const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
        if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
            const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
            storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                              usedSlots, (100.0 * usedSlots / slotLimit));
        }
        if (slotLimit < 100) { // XXX: otherwise too expensive to count
            Ipc::ReadWriteLockStats stats;
            map->updateStats(stats);
            stats.dump(e);
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");
}

SBuf
Rock::SwapDir::inodeMapPath() const
{
    return Ipc::Mem::Segment::Name(SBuf(path), "map");
}

const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}

namespace Rock
{
RunnerRegistrationEntry(SwapDirRr);
}

void Rock::SwapDirRr::create()
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->slotLimitActual();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity, 0);
            freeSlotsOwners.push_back(freeSlotsOwner);

            // TODO: add method to initialize PageStack with no free pages
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}
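// The pop-until-empty loop in create() drains the freshly created
// PageStack, which otherwise starts with every page marked free. The
// apparent intent (an editor's inference, not a documented invariant) is
// to start with zero free slots so that the subsequent db scan (see
// Rock::Rebuild) can push back only the slots that are genuinely
// unoccupied on disk.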

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}
