/*
 * DEBUG: section 47 Store Directory Routines
 */

#include "squid.h"
#include "cache_cf.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

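/// The first HeaderSize bytes of the db file hold no entry data: create()
/// zero-fills them, and diskOffset() places all slots past them.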
const int64_t Rock::SwapDir::HeaderSize = 16*1024;

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
        slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
        waitingForPage(NULL)
{
}

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}

void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}

bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
            reinterpret_cast<cache_key*>(collapsed.key), filen);
    if (!slot)
        return false;

    anchorEntry(collapsed, filen, *slot);
    inSync = updateCollapsedWith(collapsed, *slot);
    return true; // we did anchor the entry, even if inSync is false
}

bool
Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    if (collapsed.swap_filen < 0) // no longer using a disk cache
        return true;
    assert(collapsed.swap_dirn == index);

    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
    return updateCollapsedWith(collapsed, s);
}

bool
Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
{
    collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic
    return true;
}

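/// Initializes the given StoreEntry from the shared anchor of its on-disk
/// counterpart: copies the stored basics and marks the entry as a complete,
/// validated disk hit that is not in memory.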
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;

    e.swap_file_sz = basics.swap_file_sz;
    e.swap_dirn = index;
    e.swap_filen = filen;
    e.lastref = basics.lastref;
    e.timestamp = basics.timestamp;
    e.expires = basics.expires;
    e.lastmod = basics.lastmod;
    e.refcount = basics.refcount;
    e.flags = basics.flags;

    e.store_status = STORE_OK;
    e.setMemStatus(NOT_IN_MEMORY);
    e.swap_status = SWAPOUT_DONE;
    e.ping_status = PING_NONE;

    EBIT_SET(e.flags, ENTRY_CACHABLE);
    EBIT_CLR(e.flags, RELEASE_REQUEST);
    EBIT_CLR(e.flags, KEY_PRIVATE);
    EBIT_SET(e.flags, ENTRY_VALIDATED);
}

void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e
    if (e.swap_status == SWAPOUT_WRITING ||
            (e.mem_obj && e.mem_obj->swapout.sio != NULL))
        map->abortWriting(e.swap_filen);
    else
        map->closeForReading(e.swap_filen);
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}

uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}

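/// the maximum number of map entries this db can hold: the slots that fit
/// into the configured size, past the db header. For example, a 100 MB
/// cache_dir with the default 16 KB slot-size allows
/// (100*1024*1024 - 16384)/16384 = 6399 entries, capped by entryLimitHigh()
/// and never below the limit of an already allocated map.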
int64_t
Rock::SwapDir::entryLimitAllowed() const
{
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/slotSize;
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}

// TODO: encapsulate as a tool; identical to CossSwapDir::create()
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs(47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs(47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs(47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}

// report Rock DB creation error and exit
void
Rock::SwapDir::createError(const char *const msg)
{
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}

void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}

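/// whether a dedicated disker process should serve this cache_dir: required
/// when more than one SMP worker is configured and preferred whenever the
/// IpcIo DiskIO module is available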
bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // the cache db file is located at <path>/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}

void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle rate units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) { // the int cast above may overflow for huge values
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
{
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle size units and detect parsing errors better
    // parse into a signed integer first so that negative input is rejected
    // instead of silently wrapping around to a huge unsigned value
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << parsedValue);
        self_destruct();
    }

    const uint64_t newSize = static_cast<uint64_t>(parsedValue);

    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
        self_destruct();
    }

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);
    }

    return true;
}

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
{
    storeAppendPrintf(e, " slot-size=%" PRIu64, slotSize);
}

/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tdb slot size: " << slotSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size: " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}

void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start an I/O transaction if less than 10% of shared pages are free.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}

StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

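/// the on-disk location of a db slot: fixed-size slots follow the db header
/// back-to-back, so, e.g., with the default 16 KB slot-size, filen 3 starts
/// at byte offset 16384 + 16384*3 = 65536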
int64_t
Rock::SwapDir::diskOffset(int filen) const
{
    assert(filen >= 0);
    return HeaderSize + slotSize*filen;
}

int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->entryLimit());
}

int
Rock::SwapDir::entryMaxPayloadSize() const
{
    return slotSize - sizeof(DbCellHeader);
}

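/// the number of db slots needed to store an object of the given size:
/// each slot stores at most entryMaxPayloadSize() bytes of content after
/// its DbCellHeader, so this is a ceiling division by that payload capacity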
int
Rock::SwapDir::entriesNeeded(const int64_t objSize) const
{
    return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
}

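/// Finds a db slot for writing: pops a known-free slot if one is available;
/// otherwise asks the map to purge an entry and reuses the first slot freed
/// by that purge (delivered via noteFreeMapSlice()). Returns false when
/// neither works, i.e. all slots are busy.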
bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}

bool
Rock::SwapDir::validSlotId(const SlotId slotId) const
{
    return 0 <= slotId && slotId < entryLimitAllowed();
}

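/// Map cleaner callback: a db slice has just become free. PageId pools and
/// page numbers are 1-based (zero means "unset"), hence the +1 offsets that
/// diskOffset(PageId&) later undoes. The slot is handed to the waiting
/// useFreeSlot() caller, if any, or pushed onto the shared free-slot stack.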
void
Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start an I/O transaction if less than 10% of shared pages are free.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // own locked entry is safe, but reading an entry that is still being
    // written is not supported.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing, after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

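/// DiskFile read-completion callback: advances the I/O offset on success and
/// hands the buffer to the STRCB read callback registered in our IoState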
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    StoreIOState::STRCB *callb = sio->read.callback;
    assert(callb);
    sio->read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callb(cbdata, r->buf, rlen, sio.getRaw());
}

void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        return;
    }

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()

        // finalize the shared slice info after writing slice contents to disk
        Ipc::StoreMap::Slice &slice =
            map->writeableSlice(sio.swap_filen, request->sidCurrent);
        slice.size = request->len - sizeof(DbCellHeader);
        slice.next = request->sidNext;

        if (request->sidNext < 0) {
            // close, the entry gets the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.finishedWriting(errflag);
        }
    } else {
        writeError(*sio.e);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }

    CollapsedForwarding::Broadcast(*sio.e);
}

void
Rock::SwapDir::writeError(StoreEntry &e)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(e.swap_filen); // will mark as unusable, just in case

    Store::Root().transientsAbandon(e);

    // All callers must also call IoState callback, to propagate the error.
}

bool
Rock::SwapDir::full() const
{
    return freeSlots != NULL && !freeSlots->size();
}

// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e, bool)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}

void
Rock::SwapDir::markForUnlink(StoreEntry &e)
{
    debugs(47, 5, e);
    map->freeEntry(e.swap_filen);
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
            if (slotsFree <= static_cast<unsigned int>(limit)) {
                const int usedSlots = limit - static_cast<int>(slotsFree);
                storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                                  usedSlots, (100.0 * usedSlots / limit));
            }
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");
}

const char *
Rock::SwapDir::inodeMapPath() const
{
    static String inodesPath;
    inodesPath = path;
    inodesPath.append("_inodes");
    return inodesPath.termedBuf();
}

const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}

namespace Rock
{
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}

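/// Creates the shared inode maps and free-slot stacks for all configured
/// rock cache_dirs; runs after squid.conf parsing (rrAfterConfig) so that
/// dir capacities are known. The stacks are drained to start with no free
/// pages (see the XXX below); presumably the rebuild later registers the
/// slots that are actually free.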
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->entryLimitAllowed();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // XXX: remove pool id and counters from PageStack
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity,
                                             sizeof(DbCellHeader));
            freeSlotsOwners.push_back(freeSlotsOwner);

            // XXX: add method to initialize PageStack with no free pages
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}