]> git.ipfire.org Git - thirdparty/squid.git/blob - src/fs/rock/RockSwapDir.cc
Added BaseMultiQueue class, a common base of the old FewToFewBiQueue class and
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
1 /*
2 * DEBUG: section 47 Store Directory Routines
3 */
4
5 #include "squid.h"
6 #include "cache_cf.h"
7 #include "CollapsedForwarding.h"
8 #include "ConfigOption.h"
9 #include "DiskIO/DiskIOModule.h"
10 #include "DiskIO/DiskIOStrategy.h"
11 #include "DiskIO/ReadRequest.h"
12 #include "DiskIO/WriteRequest.h"
13 #include "fs/rock/RockSwapDir.h"
14 #include "fs/rock/RockIoState.h"
15 #include "fs/rock/RockIoRequests.h"
16 #include "fs/rock/RockRebuild.h"
17 #include "globals.h"
18 #include "ipc/mem/Pages.h"
19 #include "MemObject.h"
20 #include "Parsing.h"
21 #include "SquidConfig.h"
22 #include "SquidMath.h"
23 #include "tools.h"
24
25 #include <cstdlib>
26 #include <iomanip>
27
28 #if HAVE_SYS_STAT_H
29 #include <sys/stat.h>
30 #endif
31
/// size of the area reserved at the start of the db file for metadata;
/// slot 0 starts immediately after this header (see diskOffset())
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
33
/// Constructs an unconfigured rock cache_dir. slotSize defaults to
/// HeaderSize; filePath/map/io are filled in later by parse() and init().
Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
        slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
        waitingForPage(NULL)
{
}
39
/// Releases the disk I/O strategy, the shared map view, and the heap-allocated
/// db file path (xstrdup()ed in parse(); hence safe_free, not delete).
Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}
46
/// Store search API entry point; not implemented for rock yet.
/// Asserts in debug builds and returns nil otherwise.
StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}
53
/// Asynchronous lookup variant; rock adds nothing and simply delegates
/// to the generic ::SwapDir implementation.
void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}
59
// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    // cannot serve hits until the shared map and db file are usable
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    // openForReading() locks the slot for reading on success
    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    const Ipc::StoreMapAnchor::Basics &basics = slot->basics;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->lock_count = 0;
    e->swap_dirn = index;
    e->swap_filen = filen;
    e->swap_file_sz = basics.swap_file_sz;
    e->lastref = basics.lastref;
    e->timestamp = basics.timestamp;
    e->expires = basics.expires;
    e->lastmod = basics.lastmod;
    e->refcount = basics.refcount;
    e->flags = basics.flags;
    // the entry is fully on disk: not swapping, not in memory
    e->store_status = STORE_OK;
    e->setMemStatus(NOT_IN_MEMORY);
    e->swap_status = SWAPOUT_DONE;
    e->ping_status = PING_NONE;
    // normalize flags restored from the shared map
    EBIT_SET(e->flags, ENTRY_CACHABLE);
    EBIT_CLR(e->flags, RELEASE_REQUEST);
    EBIT_CLR(e->flags, KEY_PRIVATE);
    EBIT_SET(e->flags, ENTRY_VALIDATED);
    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}
100
/// Severs the association between a StoreEntry and its on-disk slot,
/// releasing whatever read or write lock the entry held in the map.
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for either reading or writing
    map->abortIo(e.swap_filen);
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}
117
118 uint64_t
119 Rock::SwapDir::currentSize() const
120 {
121 const uint64_t spaceSize = !freeSlots ?
122 maxSize() : (slotSize * freeSlots->size());
123 // everything that is not free is in use
124 return maxSize() - spaceSize;
125 }
126
127 uint64_t
128 Rock::SwapDir::currentCount() const
129 {
130 return map ? map->entryCount() : 0;
131 }
132
133 /// In SMP mode only the disker process reports stats to avoid
134 /// counting the same stats by multiple processes.
135 bool
136 Rock::SwapDir::doReportStat() const
137 {
138 return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
139 }
140
/// Called by the core after an entry finishes swapping out; rock keeps
/// no per-swap-out counters, so there is nothing to update here.
void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}
146
/// Maximum number of entries this cache_dir may hold, derived from the
/// configured disk size and slot size, clamped by the sfileno-imposed high
/// limit and never below the already-built map's capacity.
int64_t
Rock::SwapDir::entryLimitAllowed() const
{
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/slotSize;
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}
154
// TODO: encapsulate as a tool; identical to CossSwapDir::create()
/// Creates the cache_dir directory and the db file on disk, sizing the file
/// to maxSize(). In SMP mode, only the disker process does the work.
/// Fatal (via createError()) on any filesystem failure.
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    // in SMP mode, db creation is the disker's job
    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs (47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    // write the whole db out in small zeroed blocks
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    // extend the file to full size cheaply, then zero just the header area
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}
214
// report Rock DB creation error and exit
/// \param msg the filesystem operation that failed (e.g., "mkdir", "write");
/// the current errno (via xstrerror()) is included in the report
void
Rock::SwapDir::createError(const char *const msg) {
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}
222
/// Attaches to shared-memory segments (free-slot stack and entry map),
/// selects and initializes a DiskIO strategy, opens the db file, and marks
/// this dir as rebuilding. Order matters: the map must exist before I/O
/// callbacks (ioCompletedNotification) can start the rebuild.
void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    // attach to the shared free-slot collection created by SwapDirRr
    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this; // we get noteFreeMapSlice() callbacks

    // IpcIo routes I/O through a disker process; Blocking is the
    // single-process fallback
    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this); // we get ioCompletedNotification()

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}
258
259 bool
260 Rock::SwapDir::needsDiskStrand() const
261 {
262 const bool wontEvenWorkWithoutDisker = Config.workers > 1;
263 const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
264 return InDaemonMode() && (wontEvenWorkWithoutDisker ||
265 wouldWorkBetterWithDisker);
266 }
267
/// Parses the cache_dir configuration line for this rock dir: records the
/// directory path, derives the db file path, and parses size plus options.
void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // the cache db file is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
289
/// Re-parses size and options on `squid -k reconfigure`; size and several
/// options cannot actually change dynamically and only produce warnings.
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}
298
299 /// parse maximum db disk size
300 void
301 Rock::SwapDir::parseSize(const bool reconfig)
302 {
303 const int i = GetInteger();
304 if (i < 0)
305 fatal("negative Rock cache_dir size value");
306 const uint64_t new_max_size =
307 static_cast<uint64_t>(i) << 20; // MBytes to Bytes
308 if (!reconfig)
309 max_size = new_max_size;
310 else if (new_max_size != max_size) {
311 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
312 "cannot be changed dynamically, value left unchanged (" <<
313 (max_size >> 20) << " MB)");
314 }
315 }
316
/// Extends the base ::SwapDir option tree with rock-specific options:
/// slot-size, swap-timeout, and max-swap-rate.
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    // the base class is known to return a ConfigOptionVector
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    // const_cast: ConfigOptionAdapter needs a non-const SwapDir to call parsers on
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}
327
328 bool
329 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
330 {
331 return strcmp(option, "slot-size") != 0 &&
332 ::SwapDir::allowOptionReconfigure(option);
333 }
334
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
/// \returns false when the option is not time-specific (so other parsers may
/// try it); self-destructs on a missing or negative value
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    // timeouts cannot change dynamically; only warn on reconfigure
    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
370
371 /// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
372 void
373 Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
374 {
375 if (fileConfig.ioTimeout)
376 storeAppendPrintf(e, " swap-timeout=%" PRId64,
377 static_cast<int64_t>(fileConfig.ioTimeout));
378 }
379
380 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
381 bool
382 Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
383 {
384 int *storedRate;
385 if (strcmp(option, "max-swap-rate") == 0)
386 storedRate = &fileConfig.ioRate;
387 else
388 return false;
389
390 if (!value)
391 self_destruct();
392
393 // TODO: handle time units and detect parsing errors better
394 const int64_t parsedValue = strtoll(value, NULL, 10);
395 if (parsedValue < 0) {
396 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
397 self_destruct();
398 }
399
400 const int newRate = static_cast<int>(parsedValue);
401
402 if (newRate < 0) {
403 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
404 self_destruct();
405 }
406
407 if (!isaReconfig)
408 *storedRate = newRate;
409 else if (*storedRate != newRate) {
410 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
411 << " cannot be changed dynamically, value left unchanged: " <<
412 *storedRate);
413 }
414
415 return true;
416 }
417
418 /// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
419 void
420 Rock::SwapDir::dumpRateOption(StoreEntry * e) const
421 {
422 if (fileConfig.ioRate >= 0)
423 storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
424 }
425
426 /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
427 bool
428 Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
429 {
430 uint64_t *storedSize;
431 if (strcmp(option, "slot-size") == 0)
432 storedSize = &slotSize;
433 else
434 return false;
435
436 if (!value)
437 self_destruct();
438
439 // TODO: handle size units and detect parsing errors better
440 const uint64_t newSize = strtoll(value, NULL, 10);
441 if (newSize <= 0) {
442 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
443 self_destruct();
444 }
445
446 if (newSize <= sizeof(DbCellHeader)) {
447 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
448 self_destruct();
449 }
450
451 if (!reconfig)
452 *storedSize = newSize;
453 else if (*storedSize != newSize) {
454 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
455 << " cannot be changed dynamically, value left unchanged: " <<
456 *storedSize);
457 }
458
459 return true;
460 }
461
462 /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
463 void
464 Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
465 {
466 storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
467 }
468
/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    // slotSize is unsigned, so this only catches zero; negative inputs are
    // rejected (or wrapped) earlier, in parseSizeOption()
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    // how much rounding each configured quantity may waste
    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tdb slot size: " << slotSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size: " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}
496
/// Kicks off the asynchronous db index rebuild job; the rebuilding counter
/// was already incremented in init() (see the comment there).
void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}
503
/// Decides whether this dir can accept a new entry of the given size,
/// accounting for the per-slot DbCellHeader overhead. On success, reports
/// the current I/O load via \p load.
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    // each slot stores a DbCellHeader in addition to the payload
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
530
/// Starts swapping entry \p e out to disk: locks a fresh map slot for
/// writing and builds an IoState to drive the write. Returns nil if the db
/// file is unusable or no slot could be locked.
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // lock a map anchor for writing; it stays write-locked until
    // writeCompleted() closes it (or the entry is aborted)
    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}
570
571 int64_t
572 Rock::SwapDir::diskOffset(int filen) const
573 {
574 assert(filen >= 0);
575 return HeaderSize + slotSize*filen;
576 }
577
/// byte offset of the slot identified by a free-slot page;
/// page numbers are 1-based while slot ids are 0-based, hence the -1
int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}
584
/// first byte offset past the last usable slot in the db file
int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->entryLimit());
}
591
/// maximum number of payload bytes a single slot can hold
/// (the slot also stores a DbCellHeader)
int
Rock::SwapDir::entryMaxPayloadSize() const
{
    return slotSize - sizeof(DbCellHeader);
}
597
598 int
599 Rock::SwapDir::entriesNeeded(const int64_t objSize) const
600 {
601 return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
602 }
603
/// Obtains a free db slot for writing, either from the shared free-slot
/// collection or by purging an existing entry. The purged slot is delivered
/// asynchronously via noteFreeMapSlice(), using the waitingForPage handshake.
/// \returns true and fills \p pageId on success
bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    // purge failed; stop waiting so future freed slices go to freeSlots
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}
627
628 bool
629 Rock::SwapDir::validSlotId(const SlotId slotId) const
630 {
631 return 0 <= slotId && slotId < entryLimitAllowed();
632 }
633
/// DirMap cleaner callback: a map slice was freed. Either hand it straight
/// to the useFreeSlot() caller waiting for one, or return it to the shared
/// free-slot collection.
void
Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
{
    // page pool/number are 1-based while dir index/slice id are 0-based
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}
647
// tries to open an old entry with swap_filen for reading
/// Builds an IoState to read an already-stored entry. Returns nil if the db
/// file is unusable, the entry has no slot, shared I/O pages are scarce, or
/// the slot is still being written.
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but no support for reading a filling entry.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    // sanity: the locked slot must match the entry we are opening
    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    return sio;
}
695
/// DiskFile callback: the db file finished opening. Fatal if the open
/// failed; otherwise logs the dir limits and starts the index rebuild.
void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}
712
/// DiskFile callback: the db file finished closing; drop our reference
void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}
718
/// DiskFile callback: a slot read finished. Advances the IoState offset on
/// success and forwards the data to the stored read callback.
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    // consume the one-shot callback before invoking it
    StoreIOState::STRCB *callb = sio->read.callback;
    assert(callb);
    sio->read.callback = NULL;
    // NOTE(review): read.callback_data is not reset here; presumably
    // cbdataReferenceValidDone() releases the reference -- confirm
    void *cbdata;
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callb(cbdata, r->buf, rlen, sio.getRaw());
}
736
/// DiskFile callback: a slot write finished. On the last successful write,
/// closes the map slot for writing (converting it to a read lock); on error,
/// marks the entry unusable via writeError().
void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio !=  NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        return;
    }

    // XXX: check that there are workers waiting for data, i.e. readers > 0
    CollapsedForwarding::NewData(sio);

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()
        if (request->isLast) {
            // close, the entry gets the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.finishedWriting(errflag);
        }
    } else {
        writeError(sio.swap_filen);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }
}
767
/// Handles a failed slot write: marks the map entry unusable.
/// \param fileno the slot of the entry whose write failed
void
Rock::SwapDir::writeError(const sfileno fileno)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(fileno); // will mark as unusable, just in case
    // All callers must also call IoState callback, to propagate the error.
}
776
777 bool
778 Rock::SwapDir::full() const
779 {
780 return freeSlots != NULL && !freeSlots->size();
781 }
782
// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}
791
/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}
801
802 void
803 Rock::SwapDir::reference(StoreEntry &e)
804 {
805 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
806 if (repl && repl->Referenced)
807 repl->Referenced(repl, &e, &e.repl);
808 }
809
810 bool
811 Rock::SwapDir::dereference(StoreEntry &e, bool)
812 {
813 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
814 if (repl && repl->Dereferenced)
815 repl->Dereferenced(repl, &e, &e.repl);
816
817 // no need to keep e in the global store_table for us; we have our own map
818 return false;
819 }
820
/// whether this dir needs the unlinkd helper; rock stores everything in one
/// db file, so there are no entry-specific files to unlink
bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}
827
/// Removes the entry from this dir: drops removal-policy tracking, frees its
/// map slot, and severs the entry/slot association.
void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}
836
837 void
838 Rock::SwapDir::trackReferences(StoreEntry &e)
839 {
840 debugs(47, 5, HERE << e);
841 if (repl)
842 repl->Add(repl, &e, &e.repl);
843 }
844
845 void
846 Rock::SwapDir::ignoreReferences(StoreEntry &e)
847 {
848 debugs(47, 5, HERE << e);
849 if (repl)
850 repl->Remove(repl, &e, &e.repl);
851 }
852
/// Appends this dir's statistics (sizes, entry and slot usage, lock stats,
/// flags) to the given cache manager report entry.
void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            // slot usage; guard against a transiently inconsistent freeSlots count
            const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
            if (slotsFree <= static_cast<const unsigned int>(limit)) {
                const int usedSlots = limit - static_cast<const int>(slotsFree);
                storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                                  usedSlots, (100.0 * usedSlots / limit));
            }
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}
898
/// name of the shared-memory segment holding the entry map for this dir;
/// returns a pointer into a function-local static buffer (not reentrant)
const char *
Rock::SwapDir::inodeMapPath() const {
    static String inodesPath;
    inodesPath = path;
    inodesPath.append("_inodes");
    return inodesPath.termedBuf();
}
906
/// name of the shared-memory segment holding the free-slot stack for this
/// dir; returns a pointer into a function-local static buffer (not reentrant)
const char *
Rock::SwapDir::freeSlotsPath() const {
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}
914
namespace Rock
{
// run SwapDirRr::create() after configuration to build shared segments
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}
919
/// Registered runner: after configuration, creates the shared-memory entry
/// map and free-slot stack for every configured rock cache_dir. The owners
/// are kept so ~SwapDirRr() can destroy the segments.
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->entryLimitAllowed();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // XXX: remove pool id and counters from PageStack
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity,
                                             sizeof(DbCellHeader));
            freeSlotsOwners.push_back(freeSlotsOwner);

            // XXX: add method to initialize PageStack with no free pages
            // drain the stack so the rebuild decides which slots are free
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}
947
948 Rock::SwapDirRr::~SwapDirRr()
949 {
950 for (size_t i = 0; i < mapOwners.size(); ++i) {
951 delete mapOwners[i];
952 delete freeSlotsOwners[i];
953 }
954 }