]> git.ipfire.org Git - thirdparty/squid.git/blob - src/fs/rock/RockSwapDir.cc
Synced collapsed forwarding in rock store with working shared mem cache code.
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
1 /*
2 * DEBUG: section 47 Store Directory Routines
3 */
4
5 #include "squid.h"
6 #include "cache_cf.h"
7 #include "CollapsedForwarding.h"
8 #include "ConfigOption.h"
9 #include "DiskIO/DiskIOModule.h"
10 #include "DiskIO/DiskIOStrategy.h"
11 #include "DiskIO/ReadRequest.h"
12 #include "DiskIO/WriteRequest.h"
13 #include "fs/rock/RockSwapDir.h"
14 #include "fs/rock/RockIoState.h"
15 #include "fs/rock/RockIoRequests.h"
16 #include "fs/rock/RockRebuild.h"
17 #include "globals.h"
18 #include "ipc/mem/Pages.h"
19 #include "MemObject.h"
20 #include "Parsing.h"
21 #include "SquidConfig.h"
22 #include "SquidMath.h"
23 #include "tools.h"
24
#include <cstdlib>
#include <iomanip>
#include <limits>
27
28 #if HAVE_SYS_STAT_H
29 #include <sys/stat.h>
30 #endif
31
/// reserved space at the start of the db file; slots begin after it
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
33
// constructs an unconfigured "rock" cache_dir; real setup happens in parse()
Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
        slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
        waitingForPage(NULL)
{
}
39
Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath); // xstrdup()ed in parse()
}
46
// Store API entry point for iterating cached entries; not implemented for rock
StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}
53
// asynchronous lookup API; rock adds nothing and defers to the base class
void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}
59
// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    // cannot serve lookups before init() created the map and opened the db file
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    // read-locks the map slot on success; see the closing comment below
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}
82
/// attaches a collapsed-forwarding entry to its on-disk counterpart, if any;
/// returns true when a disk anchor was found (even if not fully in sync)
bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
            reinterpret_cast<cache_key*>(collapsed.key), filen);
    if (!slot)
        return false;

    anchorEntry(collapsed, filen, *slot);
    inSync = updateCollapsedWith(collapsed, *slot);
    return true; // even if inSync is false
}
99
/// refreshes a previously anchored collapsed entry from the shared map
bool
Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    if (collapsed.swap_filen < 0) // no longer using a disk cache
        return true;
    assert(collapsed.swap_dirn == index);

    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
    return updateCollapsedWith(collapsed, s);
}
113
/// copies the shared anchor state that may change while the entry is
/// being written; currently only the swap file size
bool
Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
{
    collapsed.swap_file_sz = anchor.basics.swap_file_sz;
    return true;
}
120
/// initializes the in-memory StoreEntry from the shared map anchor,
/// binding it to this cache_dir and the given slot number
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;

    e.swap_file_sz = basics.swap_file_sz;
    e.lastref = basics.lastref;
    e.timestamp = basics.timestamp;
    e.expires = basics.expires;
    e.lastmod = basics.lastmod;
    e.refcount = basics.refcount;
    e.flags = basics.flags;

    // reflect whether the disk entry is fully stored or still being filled
    if (anchor.complete()) {
        e.store_status = STORE_OK;
        e.swap_status = SWAPOUT_DONE;
    } else {
        e.store_status = STORE_PENDING;
        e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
    }

    e.ping_status = PING_NONE;

    // entries restored from the shared map are public and validated
    EBIT_CLR(e.flags, RELEASE_REQUEST);
    EBIT_CLR(e.flags, KEY_PRIVATE);
    EBIT_SET(e.flags, ENTRY_VALIDATED);

    e.swap_dirn = index;
    e.swap_filen = filen;
}
151
152
153 void Rock::SwapDir::disconnect(StoreEntry &e)
154 {
155 assert(e.swap_dirn == index);
156 assert(e.swap_filen >= 0);
157 // cannot have SWAPOUT_NONE entry with swap_filen >= 0
158 assert(e.swap_status != SWAPOUT_NONE);
159
160 // do not rely on e.swap_status here because there is an async delay
161 // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
162
163 // since e has swap_filen, its slot is locked for reading and/or writing
164 // but it is difficult to know whether THIS worker is reading or writing e
165 if (e.swap_status == SWAPOUT_WRITING ||
166 (e.mem_obj && e.mem_obj->swapout.sio != NULL)) {
167 map->abortWriting(e.swap_filen);
168 e.swap_dirn = -1;
169 e.swap_filen = -1;
170 e.swap_status = SWAPOUT_NONE;
171 Store::Root().transientsAbandon(e); // broadcasts after the change
172 } else {
173 map->closeForReading(e.swap_filen);
174 e.swap_dirn = -1;
175 e.swap_filen = -1;
176 e.swap_status = SWAPOUT_NONE;
177 }
178 }
179
/// bytes currently in use, derived from the shared free-slot count;
/// before freeSlots exists we conservatively report everything as free
uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}
188
189 uint64_t
190 Rock::SwapDir::currentCount() const
191 {
192 return map ? map->entryCount() : 0;
193 }
194
/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}
202
/// Store API notification that an entry finished swapping out; a no-op here
void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}
208
/// maximum number of entries this cache_dir may hold, clamped between the
/// current map capacity (no dynamic shrinking) and the sfileno-imposed ceiling
int64_t
Rock::SwapDir::entryLimitAllowed() const
{
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/slotSize;
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}
216
// TODO: encapsulate as a tool; identical to CossSwapDir::create()
/// creates the db directory and file on disk (disker/non-SMP only);
/// skips creation if a db file already exists
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    // in SMP mode, only the disker process touches the disk
    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs (47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    // write zeros over the whole db, block by block
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    // reserve the full db size, then materialize only the header portion
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}
276
// report Rock DB creation error and exit
/// \param msg short name of the failed operation, included in the error text;
///            errno is expected to still hold the failure code (via xstrerror)
void
Rock::SwapDir::createError(const char *const msg) {
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}
284
/// attaches to shared memory segments, loads the DiskIO module, and opens
/// the db file; rebuild is started later from ioCompletedNotification()
void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    // attach to the free-slot collection created by SwapDirRr::create()
    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    // SMP configurations need the IPC-based I/O strategy; others can block
    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}
320
/// whether this cache_dir should be served by a dedicated disker process;
/// required with multiple workers and preferred whenever IpcIo is available
bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}
329
/// parses the cache_dir configuration line for this rock store
void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/db
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf()); // freed in ~SwapDir()

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
351
/// re-parses configuration on reconfigure; size/slot changes are rejected
/// by the individual option parsers, which warn and keep old values
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}
360
/// parse maximum db disk size
/// \param reconfig true when re-reading config; size changes are then refused
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
378
/// extends the base option tree with rock-specific size/time/rate options
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}
389
390 bool
391 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
392 {
393 return strcmp(option, "slot-size") != 0 &&
394 ::SwapDir::allowOptionReconfigure(option);
395 }
396
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
/// \returns false if the option is not a time option handled here
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    // time options cannot change on reconfigure; warn and keep the old value
    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
432
/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    // zero means "not configured"; report configured timeouts only
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}
441
442 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
443 bool
444 Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
445 {
446 int *storedRate;
447 if (strcmp(option, "max-swap-rate") == 0)
448 storedRate = &fileConfig.ioRate;
449 else
450 return false;
451
452 if (!value)
453 self_destruct();
454
455 // TODO: handle time units and detect parsing errors better
456 const int64_t parsedValue = strtoll(value, NULL, 10);
457 if (parsedValue < 0) {
458 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
459 self_destruct();
460 }
461
462 const int newRate = static_cast<int>(parsedValue);
463
464 if (newRate < 0) {
465 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
466 self_destruct();
467 }
468
469 if (!isaReconfig)
470 *storedRate = newRate;
471 else if (*storedRate != newRate) {
472 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
473 << " cannot be changed dynamically, value left unchanged: " <<
474 *storedRate);
475 }
476
477 return true;
478 }
479
480 /// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
481 void
482 Rock::SwapDir::dumpRateOption(StoreEntry * e) const
483 {
484 if (fileConfig.ioRate >= 0)
485 storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
486 }
487
488 /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
489 bool
490 Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
491 {
492 uint64_t *storedSize;
493 if (strcmp(option, "slot-size") == 0)
494 storedSize = &slotSize;
495 else
496 return false;
497
498 if (!value)
499 self_destruct();
500
501 // TODO: handle size units and detect parsing errors better
502 const uint64_t newSize = strtoll(value, NULL, 10);
503 if (newSize <= 0) {
504 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
505 self_destruct();
506 }
507
508 if (newSize <= sizeof(DbCellHeader)) {
509 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
510 self_destruct();
511 }
512
513 if (!reconfig)
514 *storedSize = newSize;
515 else if (*storedSize != newSize) {
516 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
517 << " cannot be changed dynamically, value left unchanged: " <<
518 *storedSize);
519 }
520
521 return true;
522 }
523
524 /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
525 void
526 Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
527 {
528 storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
529 }
530
/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    // NOTE(review): slotSize is unsigned, so this condition can only catch
    // a zero value; negative inputs are rejected earlier in parseSizeOption()
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tdb slot size: " << slotSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size:  " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}
558
/// starts the asynchronous index rebuild job for this cache_dir
void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}
565
/// whether this cache_dir can accept a new entry of the given size;
/// on success, reports the current I/O load via the out-parameter
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    // account for the per-slot header when checking against size limits
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
592
/// opens a new map slot for writing and builds the IoState that will
/// stream the entry to disk; returns nil on failure
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    // write-locks the slot; released via closeForWriting()/abortWriting()
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}
632
633 int64_t
634 Rock::SwapDir::diskOffset(int filen) const
635 {
636 assert(filen >= 0);
637 return HeaderSize + slotSize*filen;
638 }
639
640 int64_t
641 Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
642 {
643 assert(pageId);
644 return diskOffset(pageId.number - 1);
645 }
646
/// first byte offset past the last usable slot in the db file
int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->entryLimit());
}
653
654 int
655 Rock::SwapDir::entryMaxPayloadSize() const
656 {
657 return slotSize - sizeof(DbCellHeader);
658 }
659
660 int
661 Rock::SwapDir::entriesNeeded(const int64_t objSize) const
662 {
663 return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
664 }
665
/// obtains a db slot for writing, either from the shared free-slot
/// collection or by evicting an existing entry; returns false if neither works
bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL; // stop routing freed slices to pageId

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}
689
690 bool
691 Rock::SwapDir::validSlotId(const SlotId slotId) const
692 {
693 return 0 <= slotId && slotId < entryLimitAllowed();
694 }
695
/// StoreMap cleaner callback: a map slice was freed; either hand it to the
/// caller waiting in useFreeSlot() or return it to the shared collection
void
Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;   // pool ids are 1-based, unlike dir indexes
    pageId.number = sliceId+1; // page numbers are 1-based, unlike slice ids
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}
709
// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // the entry must already be bound to a slot in this dir
    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // The are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but no support for reading a filling entry.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing afterall

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    // sanity checks: the locked slot must match the entry we are opening
    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    return sio;
}
757
/// IORequestor callback: the db file finished opening (or failed to);
/// on success, kicks off the index rebuild
void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}
774
/// IORequestor callback: the db file has been closed; drop our reference
void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}
780
/// IORequestor callback: a disk read finished; forwards the result to the
/// reader registered in IoState::read
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    // clear the callback before invoking it so that the callback may
    // schedule another read without us overwriting its registration
    StoreIOState::STRCB *callb = sio->read.callback;
    assert(callb);
    sio->read.callback = NULL;
    void *cbdata;
    // NOTE(review): rlen is passed through even when errflag != DISK_OK;
    // presumably the callback detects errors some other way — confirm
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callb(cbdata, r->buf, rlen, sio.getRaw());
}
798
/// IORequestor callback: a disk write finished; finalizes the written slice
/// in the shared map and, on the last slice or on error, finishes the swapout
void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio !=  NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        return;
    }

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()

        // finalize the shared slice info after writing slice contents to disk
        Ipc::StoreMap::Slice &slice =
            map->writeableSlice(sio.swap_filen, request->sidCurrent);
        slice.size = request->len - sizeof(DbCellHeader);
        slice.next = request->sidNext;

        // a negative next-slice id marks the final slice of the entry
        if (request->sidNext < 0) {
            // close, the entry gets the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.finishedWriting(errflag);
        }
    } else {
        writeError(*sio.e);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }

    // let collapsed-forwarding readers know more data (or an error) is available
    CollapsedForwarding::Broadcast(*sio.e);
}
835
/// handles a failed swapout write for the given entry
void
Rock::SwapDir::writeError(StoreEntry &e)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(e.swap_filen); // will mark as unusable, just in case

    Store::Root().transientsAbandon(e);

    // All callers must also call IoState callback, to propagate the error.
}
847
848 bool
849 Rock::SwapDir::full() const
850 {
851 return freeSlots != NULL && !freeSlots->size();
852 }
853
// storeSwapOutFileClosed calls this nethod on DISK_NO_SPACE_LEFT,
// but it should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}
862
/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}
872
/// notifies the removal policy (if any) that the entry was referenced
void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}
880
/// notifies the removal policy (if any) that the entry was dereferenced;
/// returns whether the entry should stay in the global store_table
bool
Rock::SwapDir::dereference(StoreEntry &e, bool)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}
891
/// whether this store needs the unlinkd helper; rock keeps everything in
/// one db file, so there are no per-entry files to unlink
bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}
898
/// removes the entry from this store: frees its map slot and severs the
/// entry's association with the store
void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}
907
/// marks the entry's map slot for removal without disconnecting the entry
void
Rock::SwapDir::markForUnlink(StoreEntry &e)
{
    debugs(47, 5, e);
    map->freeEntry(e.swap_filen);
}
914
/// registers the entry with the removal policy (if one is configured)
void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}
922
/// removes the entry from the removal policy (if one is configured)
void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}
930
/// appends cache_dir statistics (sizes, entry/slot usage, flags) to e
void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            // derive used slots from the shared free-slot count, guarding
            // against a transiently inconsistent freeSlots reading
            const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
            if (slotsFree <= static_cast<const unsigned int>(limit)) {
                const int usedSlots = limit - static_cast<const int>(slotsFree);
                storeAppendPrintf(&e, "Used slots:      %9d %.2f%%\n",
                                  usedSlots, (100.0 * usedSlots / limit));
            }
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}
976
/// name of the shared memory segment holding this dir's entry map
/// (returns a pointer into a function-local static buffer)
const char *
Rock::SwapDir::inodeMapPath() const {
    static String inodesPath;
    inodesPath = path;
    inodesPath.append("_inodes");
    return inodesPath.termedBuf();
}
984
/// name of the shared memory segment holding this dir's free-slot collection
/// (returns a pointer into a function-local static buffer)
const char *
Rock::SwapDir::freeSlotsPath() const {
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}
992
namespace Rock
{
// registers SwapDirRr to run after configuration parsing, so it can size
// the shared segments from the parsed cache_dir settings
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}
997
/// creates (and becomes the owner of) the shared map and free-slot segments
/// for every configured rock cache_dir
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->entryLimitAllowed();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity,
                                             sizeof(DbCellHeader));
            freeSlotsOwners.push_back(freeSlotsOwner);

            // TODO: add method to initialize PageStack with no free pages
            // drain the stack so the rebuild decides which slots are free
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}
1025
Rock::SwapDirRr::~SwapDirRr()
{
    // create() pushes into both vectors in lockstep, so they are equally long
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}
1032 }