]> git.ipfire.org Git - thirdparty/squid.git/blob - src/fs/rock/RockSwapDir.cc
Working Large Rock implementation, including concurrent store "rebuild"
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
1 /*
2 * DEBUG: section 47 Store Directory Routines
3 */
4
5 #include "squid.h"
6 #include "cache_cf.h"
7 #include "ConfigOption.h"
8 #include "DiskIO/DiskIOModule.h"
9 #include "DiskIO/DiskIOStrategy.h"
10 #include "DiskIO/ReadRequest.h"
11 #include "DiskIO/WriteRequest.h"
12 #include "fs/rock/RockSwapDir.h"
13 #include "fs/rock/RockIoState.h"
14 #include "fs/rock/RockIoRequests.h"
15 #include "fs/rock/RockRebuild.h"
16 #include "globals.h"
17 #include "ipc/mem/Pages.h"
18 #include "MemObject.h"
19 #include "Parsing.h"
20 #include "SquidConfig.h"
21 #include "SquidMath.h"
22 #include "tools.h"
23
#include <cstdlib>
#include <iomanip>
#include <limits>
26
27 #if HAVE_SYS_STAT_H
28 #include <sys/stat.h>
29 #endif
30
/// size of the reserved header at the start of the db file;
/// slot 0 begins immediately after it (see diskOffset())
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
32
/// constructs an unconfigured rock cache_dir; parse() and init()
/// complete configuration and attach shared resources later
Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
        slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL), allSlots(NULL),
        waitingForPage(NULL)
{
}
38
/// releases the I/O strategy, our view of the shared map, and the db file path
Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath); // xstrdup()ed in parse()
}
45
/// not supported: rock store does not implement StoreSearch-based iteration;
/// calling this is a bug (hence the assert)
StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}
52
/// asynchronous lookup; simply forwards to the default ::SwapDir behavior
void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}
58
// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    // no lookups until the shared map and the db file are usable
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    // on success, the anchor is locked for reading until disconnect()
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    const Ipc::StoreMapAnchor::Basics &basics = slot->basics;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    e->lock_count = 0;
    e->swap_dirn = index;
    e->swap_filen = filen;
    e->swap_file_sz = basics.swap_file_sz;
    e->lastref = basics.lastref;
    e->timestamp = basics.timestamp;
    e->expires = basics.expires;
    e->lastmod = basics.lastmod;
    e->refcount = basics.refcount;
    e->flags = basics.flags;
    // the entry is fully on disk and not (yet) in memory
    e->store_status = STORE_OK;
    e->setMemStatus(NOT_IN_MEMORY);
    e->swap_status = SWAPOUT_DONE;
    e->ping_status = PING_NONE;
    // anything we stored is cachable, public, and validated
    EBIT_SET(e->flags, ENTRY_CACHABLE);
    EBIT_CLR(e->flags, RELEASE_REQUEST);
    EBIT_CLR(e->flags, KEY_PRIVATE);
    EBIT_SET(e->flags, ENTRY_VALIDATED);
    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}
99
/// breaks the entry-to-slot association and releases our map lock;
/// the stored data itself is left intact
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for either reading or writing
    map->abortIo(e.swap_filen);
    e.swap_dirn = -1;
    e.swap_filen = -1;
    e.swap_status = SWAPOUT_NONE;
}
116
117 uint64_t
118 Rock::SwapDir::currentSize() const
119 {
120 const uint64_t spaceSize = !freeSlots ?
121 maxSize() : (slotSize * freeSlots->size());
122 // everything that is not free is in use
123 return maxSize() - spaceSize;
124 }
125
126 uint64_t
127 Rock::SwapDir::currentCount() const
128 {
129 return map ? map->entryCount() : 0;
130 }
131
132 /// In SMP mode only the disker process reports stats to avoid
133 /// counting the same stats by multiple processes.
134 bool
135 Rock::SwapDir::doReportStat() const
136 {
137 return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
138 }
139
/// no-op: size/count stats are recomputed on demand (see currentSize()),
/// so there is nothing to record when an entry finishes swapping out
void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}
145
146 int64_t
147 Rock::SwapDir::entryLimitAllowed() const
148 {
149 const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
150 const int64_t eWanted = (maxSize() - HeaderSize)/slotSize;
151 return min(max(eLimitLo, eWanted), entryLimitHigh());
152 }
153
// TODO: encapsulate as a tool; identical to CossSwapDir::create()
/// creates the cache_dir directory and the db file (startup, -z, etc.)
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    // in SMP mode, db creation is the dedicated disker's job
    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    // create the cache_dir directory if it does not exist yet
    struct stat swap_sb;
    if (::stat(path, &swap_sb) < 0) {
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    // (re)create the db file, truncating any previous content
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    // explicitly write zeros over the whole db, one block at a time
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    // set the db size without writing all the data blocks ...
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    // ... but write the zeroed header explicitly so it is physically present
    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}
201
// report Rock DB creation error and exit
/// \param msg the name of the failed operation (e.g., "mkdir" or "write")
void
Rock::SwapDir::createError(const char *const msg) {
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}
209
/// attaches to shared segments, opens the db file, and arranges for the
/// index rebuild to start once the file open completes
void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    // attach to the shared inode map created earlier by SwapDirRr
    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    // pick asynchronous IPC I/O when a disker is used, blocking I/O otherwise
    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    // open the db file; ioCompletedNotification() fires when done
    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // attach to the shared free-slot index created earlier by SwapDirRr
    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}
245
246 bool
247 Rock::SwapDir::needsDiskStrand() const
248 {
249 const bool wontEvenWorkWithoutDisker = Config.workers > 1;
250 const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
251 return InDaemonMode() && (wontEvenWorkWithoutDisker ||
252 wouldWorkBetterWithDisker);
253 }
254
/// configures this cache_dir from its squid.conf cache_dir line
/// \param anIndex position of this dir in the global cache_dir array
/// \param aPath the configured cache_dir directory
void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/rock
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
276
/// re-applies squid.conf settings after reconfigure; options that cannot
/// change dynamically (size, slot-size, etc.) only produce warnings
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}
285
/// parse maximum db disk size
/// \param reconfiguring true when re-reading the config at runtime
void
Rock::SwapDir::parseSize(const bool reconfiguring)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfiguring)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        // db geometry is fixed at startup; warn instead of resizing
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
303
/// appends rock-specific option parsers (slot-size, swap-timeout,
/// max-swap-rate) to the option tree inherited from ::SwapDir
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}
314
315 bool
316 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
317 {
318 return strcmp(option, "slot-size") != 0 &&
319 ::SwapDir::allowOptionReconfigure(option);
320 }
321
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
/// returns false when the option is not time-specific (so other parsers
/// may try it); self-destructs on invalid values
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfiguring)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfiguring)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        // I/O timeout is fixed at startup; warn instead of changing it
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
357
358 /// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
359 void
360 Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
361 {
362 if (fileConfig.ioTimeout)
363 storeAppendPrintf(e, " swap-timeout=%" PRId64,
364 static_cast<int64_t>(fileConfig.ioTimeout));
365 }
366
367 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
368 bool
369 Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
370 {
371 int *storedRate;
372 if (strcmp(option, "max-swap-rate") == 0)
373 storedRate = &fileConfig.ioRate;
374 else
375 return false;
376
377 if (!value)
378 self_destruct();
379
380 // TODO: handle time units and detect parsing errors better
381 const int64_t parsedValue = strtoll(value, NULL, 10);
382 if (parsedValue < 0) {
383 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
384 self_destruct();
385 }
386
387 const int newRate = static_cast<int>(parsedValue);
388
389 if (newRate < 0) {
390 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
391 self_destruct();
392 }
393
394 if (!isaReconfig)
395 *storedRate = newRate;
396 else if (*storedRate != newRate) {
397 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
398 << " cannot be changed dynamically, value left unchanged: " <<
399 *storedRate);
400 }
401
402 return true;
403 }
404
405 /// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
406 void
407 Rock::SwapDir::dumpRateOption(StoreEntry * e) const
408 {
409 if (fileConfig.ioRate >= 0)
410 storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
411 }
412
413 /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
414 bool
415 Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfiguring)
416 {
417 uint64_t *storedSize;
418 if (strcmp(option, "slot-size") == 0)
419 storedSize = &slotSize;
420 else
421 return false;
422
423 if (!value)
424 self_destruct();
425
426 // TODO: handle size units and detect parsing errors better
427 const uint64_t newSize = strtoll(value, NULL, 10);
428 if (newSize <= 0) {
429 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
430 self_destruct();
431 }
432
433 if (newSize <= sizeof(DbCellHeader)) {
434 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
435 self_destruct();
436 }
437
438 if (!reconfiguring)
439 *storedSize = newSize;
440 else if (*storedSize != newSize) {
441 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
442 << " cannot be changed dynamically, value left unchanged: " <<
443 *storedSize);
444 }
445
446 return true;
447 }
448
449 /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
450 void
451 Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
452 {
453 storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
454 }
455
/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    // NOTE(review): slotSize is unsigned, so this only catches a zero size;
    // negative input is rejected earlier, in parseSizeOption()
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
    // bytes actually addressable given the allowed number of entries
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tdb slot size: " << slotSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size: " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}
483
/// starts the asynchronous job that scans the db and rebuilds the entry map
void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}
490
/// whether this dir can cache the given entry of the given size now;
/// on success also reports the current I/O load via the load parameter
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    // account for the per-slot DbCellHeader overhead
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
517
/// opens a brand new db slot chain for writing the given entry;
/// returns nil if the db is unusable or the map has no room
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    // locks the anchor for writing on success
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}
557
558 int64_t
559 Rock::SwapDir::diskOffset(int filen) const
560 {
561 assert(filen >= 0);
562 return HeaderSize + slotSize*filen;
563 }
564
565 int64_t
566 Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
567 {
568 assert(pageId);
569 return diskOffset(pageId.number - 1);
570 }
571
572 int64_t
573 Rock::SwapDir::diskOffsetLimit() const
574 {
575 assert(map);
576 return diskOffset(map->entryLimit());
577 }
578
579 int
580 Rock::SwapDir::entryMaxPayloadSize() const
581 {
582 return slotSize - sizeof(DbCellHeader);
583 }
584
585 int
586 Rock::SwapDir::entriesNeeded(const int64_t objSize) const
587 {
588 return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
589 }
590
/// finds a db slot for a new object, purging an old entry if necessary;
/// returns false (without blocking) when no slot can be obtained right now
bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    // fast path: take a slot from the shared free-slot stack
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    // purging an entry frees its slice(s) via noteFreeMapSlice(), which
    // hands the first freed slice directly to us through waitingForPage
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}
614
615 bool
616 Rock::SwapDir::validSlotId(const SlotId slotId) const
617 {
618 return 0 <= slotId && slotId < entryLimitAllowed();
619 }
620
/// StoreMap cleaner callback: the given slice is no longer used by any entry;
/// either hand it to the useFreeSlot() caller waiting for it or bank it
void
Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1; // pool IDs are 1-based; 0 means "unset"
    pageId.number = sliceId+1; // page numbers are 1-based; 0 means "unset"
    if (waitingForPage) {
        // deliver directly to the caller blocked in useFreeSlot()
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}
634
// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // the entry must already be associated with a db slot
    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but no support for reading a filling entry.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    // sanity checks: the slot must match the entry we are opening
    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    assert(slot->basics.swap_file_sz > 0);
    assert(slot->basics.swap_file_sz == e.swap_file_sz);

    return sio;
}
682
/// DiskIO callback: the db file open attempt (from init()) has completed;
/// a failed open is fatal, a successful one triggers the index rebuild
void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}
699
/// DiskIO callback: the db file has been closed;
/// drop our reference so the file object can go away
void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}
705
/// DiskIO callback: a read from the db file has completed;
/// advances the I/O offset and notifies the reader via its callback
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    // clear the stored callback before calling it, so it cannot fire twice
    StoreIOState::STRCB *callback = sio->read.callback;
    assert(callback);
    sio->read.callback = NULL;
    void *cbdata;
    // skip the callback if its cbdata owner is already gone
    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
        callback(cbdata, r->buf, rlen, sio.getRaw());
}
723
/// DiskIO callback: a write to the db file has completed;
/// on the last write, converts the map write lock into a read lock
void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio !=  NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        return;
    }

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()
        if (request->isLast) {
            // close, the entry gets the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.finishedWriting(errflag);
        }
    } else {
        writeError(sio.swap_filen);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }
}
751
/// handles a failed write of the given slot; marks the entry unusable
void
Rock::SwapDir::writeError(const sfileno fileno)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(fileno); // will mark as unusable, just in case
    // All callers must also call IoState callback, to propagate the error.
}
760
761 bool
762 Rock::SwapDir::full() const
763 {
764 return freeSlots != NULL && !freeSlots->size();
765 }
766
// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but it should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}
775
/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}
785
786 void
787 Rock::SwapDir::reference(StoreEntry &e)
788 {
789 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
790 if (repl && repl->Referenced)
791 repl->Referenced(repl, &e, &e.repl);
792 }
793
794 bool
795 Rock::SwapDir::dereference(StoreEntry &e, bool)
796 {
797 debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
798 if (repl && repl->Dereferenced)
799 repl->Dereferenced(repl, &e, &e.repl);
800
801 // no need to keep e in the global store_table for us; we have our own map
802 return false;
803 }
804
805 bool
806 Rock::SwapDir::unlinkdUseful() const
807 {
808 // no entry-specific files to unlink
809 return false;
810 }
811
/// removes the entry from this dir: drops policy tracking, frees its
/// map entry, and severs the entry-to-slot association
void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}
820
821 void
822 Rock::SwapDir::trackReferences(StoreEntry &e)
823 {
824 debugs(47, 5, HERE << e);
825 if (repl)
826 repl->Add(repl, &e, &e.repl);
827 }
828
829 void
830 Rock::SwapDir::ignoreReferences(StoreEntry &e)
831 {
832 debugs(47, 5, HERE << e);
833 if (repl)
834 repl->Remove(repl, &e, &e.repl);
835 }
836
/// appends this cache_dir's statistics to the given cache manager report
void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            // report slot usage when the free-slot count is meaningful
            const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
            if (slotsFree <= static_cast<const unsigned int>(limit)) {
                const int usedSlots = limit - static_cast<const int>(slotsFree);
                storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                                  usedSlots, (100.0 * usedSlots / limit));
            }
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}
882
883 const char *
884 Rock::SwapDir::inodeMapPath() const {
885 static String inodesPath;
886 inodesPath = path;
887 inodesPath.append("_inodes");
888 return inodesPath.termedBuf();
889 }
890
891 const char *
892 Rock::SwapDir::freeSlotsPath() const {
893 static String spacesPath;
894 spacesPath = path;
895 spacesPath.append("_spaces");
896 return spacesPath.termedBuf();
897 }
898
namespace Rock
{
// registers SwapDirRr so that shared segments are created after configuration
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}
903
/// creates the shared segments (inode map and free-slot stack) for every
/// configured rock cache_dir, before any process attaches to them
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->entryLimitAllowed();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // XXX: remove pool id and counters from PageStack
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity,
                                             sizeof(DbCellHeader));
            freeSlotsOwners.push_back(freeSlotsOwner);

            // XXX: add method to initialize PageStack with no free pages
            // drain the stack: slots are added back as the rebuild finds them
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}
931
932 Rock::SwapDirRr::~SwapDirRr()
933 {
934 for (size_t i = 0; i < mapOwners.size(); ++i) {
935 delete mapOwners[i];
936 delete freeSlotsOwners[i];
937 }
938 }