/*
 * DEBUG: section 47    Store Directory Routines
 */

#include "squid.h"
#include "cache_cf.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockSwapDir.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockRebuild.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

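/// reserved area at the beginning of the db file; entry slots are stored
/// after this header (see diskOffset())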
const int64_t Rock::SwapDir::HeaderSize = 16*1024;

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
        slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
        waitingForPage(NULL)
{
}

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}

void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}

bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
        reinterpret_cast<cache_key*>(collapsed.key), filen);
    if (!slot)
        return false;

    anchorEntry(collapsed, filen, *slot);
    inSync = updateCollapsedWith(collapsed, *slot);
    return true; // the entry was anchored, even if inSync is false
}

bool
Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    if (collapsed.swap_filen < 0) // no longer using a disk cache
        return true;
    assert(collapsed.swap_dirn == index);

    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
    return updateCollapsedWith(collapsed, s);
}

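/// copies the latest shared anchor metadata into the collapsed entry;
/// returns whether the in-core entry is now in sync with the disk entry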
bool
Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
{
    collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic
    return true;
}

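/// initializes a freshly loaded StoreEntry using the basics stored in the
/// shared map anchor of its on-disk counterpart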
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;

    e.swap_file_sz = basics.swap_file_sz;
    e.swap_dirn = index;
    e.swap_filen = filen;
    e.lastref = basics.lastref;
    e.timestamp = basics.timestamp;
    e.expires = basics.expires;
    e.lastmod = basics.lastmod;
    e.refcount = basics.refcount;
    e.flags = basics.flags;

    e.store_status = STORE_OK;
    e.setMemStatus(NOT_IN_MEMORY);
    e.swap_status = SWAPOUT_DONE;
    e.ping_status = PING_NONE;

    EBIT_CLR(e.flags, RELEASE_REQUEST);
    EBIT_CLR(e.flags, KEY_PRIVATE);
    EBIT_SET(e.flags, ENTRY_VALIDATED);
}

void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e
    if (e.swap_status == SWAPOUT_WRITING ||
            (e.mem_obj && e.mem_obj->swapout.sio != NULL))
        map->abortWriting(e.swap_filen);
    else
        map->closeForReading(e.swap_filen);
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}

uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}

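/// maximum number of entries this dir may have, bounded below by the current
/// map size (dynamic shrinking is unsupported) and above by entryLimitHigh();
/// e.g., a 100 MB dir with 16 KB slots wants (100*2^20 - 16384)/16384 = 6399 entries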
int64_t
Rock::SwapDir::entryLimitAllowed() const
{
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/slotSize;
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}

// TODO: encapsulate as a tool; identical to CossSwapDir::create()
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs(47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs(47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs(47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}

// report Rock DB creation error and exit
void
Rock::SwapDir::createError(const char *const msg) {
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}

void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}

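/// whether this swap dir needs a dedicated disker process: required with
/// multiple SMP workers and preferred whenever the IpcIo module is available;
/// always false outside daemon mode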
bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/db
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}

void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
{
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle size units and detect parsing errors better
    const uint64_t newSize = strtoll(value, NULL, 10);
    if (newSize <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
        self_destruct();
    }

    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
        self_destruct();
    }

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);
    }

    return true;
}

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
{
    storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
}

/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tdb slot size: " << slotSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size: " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}

void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start an I/O transaction if less than 10% of pages are free.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}

StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

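/// the byte offset of the given slot in the db file;
/// e.g., with the 16 KB header and the default 16 KB slots, filen 2
/// starts at 16384 + 2*16384 = 49152 bytes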
int64_t
Rock::SwapDir::diskOffset(int filen) const
{
    assert(filen >= 0);
    return HeaderSize + slotSize*filen;
}

int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->entryLimit());
}

int
Rock::SwapDir::entryMaxPayloadSize() const
{
    return slotSize - sizeof(DbCellHeader);
}

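/// the number of db slots needed to store an object of the given size;
/// a ceiling division, so e.g. an object one byte larger than
/// entryMaxPayloadSize() needs two slots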
int
Rock::SwapDir::entriesNeeded(const int64_t objSize) const
{
    return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
}

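/// finds a slot for writing a new slice: pops a free slot if one is
/// available, otherwise asks the map to purge a victim entry; returns
/// false (leaving pageId unset) if neither approach yields a slot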
bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}

bool
Rock::SwapDir::validSlotId(const SlotId slotId) const
{
    return 0 <= slotId && slotId < entryLimitAllowed();
}

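/// handles a slice freed by the map: gives it to a waiting useFreeSlot()
/// caller if there is one, otherwise pushes it onto the shared free-slot
/// stack; page numbers are 1-based while slice IDs are 0-based, hence the +1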
void
Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start an I/O transaction if less than 10% of pages are free.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but there is no support for reading a filling entry.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing, after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    StoreIOState::STRCB *callb = sio->read.callback;
    assert(callb);
    sio->read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callb(cbdata, r->buf, rlen, sio.getRaw());
}

void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        return;
    }

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()

        // finalize the shared slice info after writing slice contents to disk
        Ipc::StoreMap::Slice &slice =
            map->writeableSlice(sio.swap_filen, request->sidCurrent);
        slice.size = request->len - sizeof(DbCellHeader);
        slice.next = request->sidNext;

        if (request->sidNext < 0) {
            // close, the entry gets the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.finishedWriting(errflag);
        }
    } else {
        writeError(*sio.e);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }

    CollapsedForwarding::Broadcast(*sio.e);
}

void
Rock::SwapDir::writeError(StoreEntry &e)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(e.swap_filen); // will mark as unusable, just in case

    Store::Root().transientsAbandon(e);

    // All callers must also call IoState callback, to propagate the error.
}

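/// whether the db has run out of free slots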
bool
Rock::SwapDir::full() const
{
    return freeSlots != NULL && !freeSlots->size();
}

// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e, bool)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}

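/// marks the map entry for eventual freeing without dissociating e from this dir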
void
Rock::SwapDir::markForUnlink(StoreEntry &e)
{
    debugs(47, 5, e);
    map->freeEntry(e.swap_filen);
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
            if (slotsFree <= static_cast<const unsigned int>(limit)) {
                const int usedSlots = limit - static_cast<const int>(slotsFree);
                storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                                  usedSlots, (100.0 * usedSlots / limit));
            }
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");
}

const char *
Rock::SwapDir::inodeMapPath() const {
    static String inodesPath;
    inodesPath = path;
    inodesPath.append("_inodes");
    return inodesPath.termedBuf();
}

const char *
Rock::SwapDir::freeSlotsPath() const {
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}

namespace Rock
{
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}

void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->entryLimitAllowed();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // XXX: remove pool id and counters from PageStack
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity,
                                             sizeof(DbCellHeader));
            freeSlotsOwners.push_back(freeSlotsOwner);

            // XXX: add method to initialize PageStack with no free pages
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}