/*
 * DEBUG: section 47    Store Directory Routines
 */

#include "squid.h"
#include "cache_cf.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockSwapDir.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockRebuild.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

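/// The first HeaderSize bytes of the db file are reserved for a header;
/// slot 0 starts at this offset (see diskOffset()).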
const int64_t Rock::SwapDir::HeaderSize = 16*1024;

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
        slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
        waitingForPage(NULL)
{
}

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}

void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    const Ipc::StoreMapAnchor::Basics &basics = slot->basics;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->lock_count = 0;
    e->swap_dirn = index;
    e->swap_filen = filen;
    e->swap_file_sz = basics.swap_file_sz;
    e->lastref = basics.lastref;
    e->timestamp = basics.timestamp;
    e->expires = basics.expires;
    e->lastmod = basics.lastmod;
    e->refcount = basics.refcount;
    e->flags = basics.flags;
    e->store_status = STORE_OK;
    e->setMemStatus(NOT_IN_MEMORY);
    e->swap_status = SWAPOUT_DONE;
    e->ping_status = PING_NONE;
    EBIT_SET(e->flags, ENTRY_CACHABLE);
    EBIT_CLR(e->flags, RELEASE_REQUEST);
    EBIT_CLR(e->flags, KEY_PRIVATE);
    EBIT_SET(e->flags, ENTRY_VALIDATED);
    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}

void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have a SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for either reading or writing
    map->abortIo(e.swap_filen);
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}

uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
                               maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode, only the disker process reports stats so that the same
/// stats are not counted by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}

int64_t
Rock::SwapDir::entryLimitAllowed() const
{
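    // The map cannot shrink once created, so its current limit is a lower
    // bound; the configured disk size implies the desired limit, capped by
    // the absolute entryLimitHigh() maximum that sfileno imposes.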
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/slotSize;
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}

// TODO: encapsulate as a tool; identical to CossSwapDir::create()
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47, 3, HERE << "disker will create in " << path);
        return;
    }

    debugs(47, 3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs(47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs(47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
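    // Extending the file with ftruncate() is fast because it typically
    // creates a sparse file; only the header below is written explicitly.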
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}

// report Rock DB creation error and exit
void
Rock::SwapDir::createError(const char *const msg)
{
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}

void
Rock::SwapDir::init()
{
    debugs(47, 2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit deletes.
    lock();

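    // attach to the shared-memory segments created by Rock::SwapDirRr
    // (see the registered runner at the end of this file)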
    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47, 2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move the store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}

bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // the cache db file is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}

void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parses the maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
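    // append Rock-specific options (slot-size, swap-timeout, max-swap-rate)
    // to the options common to all SwapDirs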
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

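    // an extra check, in case the int cast above overflowed into a negative value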
    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
{
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle size units and detect parsing errors better
    // parse into a signed value first so that negative input is rejected
    // instead of wrapping around to a huge unsigned size
    const int64_t parsedSize = strtoll(value, NULL, 10);
    if (parsedSize <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << parsedSize);
        self_destruct();
    }
    const uint64_t newSize = static_cast<uint64_t>(parsedSize);

    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
        self_destruct();
    }

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);
    }

    return true;
}

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
{
    storeAppendPrintf(e, " slot-size=%" PRIu64, slotSize);
}

/// checks the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (!slotSize)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if the maximum db size is not reachable due to the sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tdb slot size: " << slotSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size: " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}

void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
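    // account for the on-disk DbCellHeader stored with the entry data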
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start an I/O transaction if fewer than 10% of the pages are free.
    // TODO: reserve a page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}

StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47, 4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->openForWriting() failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.swap_filen.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47, 5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

int64_t
Rock::SwapDir::diskOffset(int filen) const
{
    assert(filen >= 0);
    return HeaderSize + slotSize*filen;
}

int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
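    // PageId numbers are 1-based, while slot ids are 0-based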
    assert(pageId);
    return diskOffset(pageId.number - 1);
}

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->entryLimit());
}

int
Rock::SwapDir::entryMaxPayloadSize() const
{
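    // each slot stores a DbCellHeader followed by entry data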
    return slotSize - sizeof(DbCellHeader);
}

int
Rock::SwapDir::entriesNeeded(const int64_t objSize) const
{
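    // ceiling division: the number of payload-sized slots needed for objSize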
    return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
}

bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}

bool
Rock::SwapDir::validSlotId(const SlotId slotId) const
{
    return 0 <= slotId && slotId < entryLimitAllowed();
}

void
Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
{
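    // convert the 0-based slice id into a 1-based page number; zero pool
    // ids and page numbers denote an unset PageId (hence the +1 offsets)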
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47, 4, HERE << theFile);
        return NULL;
    }

    if (e.swap_filen < 0) {
        debugs(47, 4, HERE << e);
        return NULL;
    }

    // Do not start an I/O transaction if fewer than 10% of the pages are free.
    // TODO: reserve a page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // own locked entry is safe, but there is no support for reading an entry
    // that is still being filled.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing, after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47, 5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    StoreIOState::STRCB *callb = sio->read.callback;
    assert(callb);
    sio->read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callb(cbdata, r->buf, rlen, sio.getRaw());
}

void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        return;
    }

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()
        if (request->isLast) {
            // close the entry for writing; it retains the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.finishedWriting(errflag);
        }
    } else {
        writeError(sio.swap_filen);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }
}

void
Rock::SwapDir::writeError(const sfileno fileno)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(fileno); // will mark as unusable, just in case
    // All callers must also call the IoState callback, to propagate the error.
}

bool
Rock::SwapDir::full() const
{
    return freeSlots != NULL && !freeSlots->size();
}

// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but that should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// called to purge while full(); a no-op for Rock (see the body for reasoning)
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when the db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e, bool)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
            if (slotsFree <= static_cast<unsigned int>(limit)) {
                const int usedSlots = limit - static_cast<int>(slotsFree);
                storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                                  usedSlots, (100.0 * usedSlots / limit));
            }
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");
}

const char *
Rock::SwapDir::inodeMapPath() const
{
    static String inodesPath;
    inodesPath = path;
    inodesPath.append("_inodes");
    return inodesPath.termedBuf();
}

const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}

namespace Rock
{
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}

void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->entryLimitAllowed();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // XXX: remove pool id and counters from PageStack
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity,
                                             sizeof(DbCellHeader));
            freeSlotsOwners.push_back(freeSlotsOwner);

            // XXX: add method to initialize PageStack with no free pages
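            // drain the stack so that it starts with no free pages; free
            // slots are pushed back later (e.g., as the db rebuild frees them)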
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}