src/fs/rock/RockSwapDir.cc
/*
 * DEBUG: section 47 Store Directory Routines
 */

#include "squid.h"
#include "cache_cf.h"
#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "MemObject.h"
#include "Parsing.h"
#include "SquidConfig.h"
#include "SquidMath.h"
#include "tools.h"

#include <cstdlib>
#include <iomanip>

#if HAVE_SYS_STAT_H
#include <sys/stat.h>
#endif

const int64_t Rock::SwapDir::HeaderSize = 16*1024;

Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
    waitingForPage(NULL)
{
}

Rock::SwapDir::~SwapDir()
{
    delete io;
    delete map;
    safe_free(filePath);
}

StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}

void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}

// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}

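/// binds a collapsed StoreEntry to this cache's on-disk copy, if any:
/// opens the map anchor for reading and syncs the entry with it;
/// returns true if the entry was found here (even if inSync is false)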
bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
        reinterpret_cast<cache_key*>(collapsed.key), filen);
    if (!slot)
        return false;

    anchorEntry(collapsed, filen, *slot);
    inSync = updateCollapsedWith(collapsed, *slot);
    return true; // even if inSync is false
}

bool
Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    if (collapsed.swap_filen < 0) // no longer using a disk cache
        return true;
    assert(collapsed.swap_dirn == index);

    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
    return updateCollapsedWith(collapsed, s);
}

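/// refreshes collapsed entry fields that may change while another worker
/// writes the entry; currently just swap_file_sz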
bool
Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
{
    collapsed.swap_file_sz = anchor.basics.swap_file_sz;
    return true;
}

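/// initializes the given StoreEntry from the shared map anchor basics and
/// associates the entry with this cache_dir (swap_dirn/swap_filen)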
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;

    e.swap_file_sz = basics.swap_file_sz;
    e.lastref = basics.lastref;
    e.timestamp = basics.timestamp;
    e.expires = basics.expires;
    e.lastmod = basics.lastmod;
    e.refcount = basics.refcount;
    e.flags = basics.flags;

    if (anchor.complete()) {
        e.store_status = STORE_OK;
        e.swap_status = SWAPOUT_DONE;
    } else {
        e.store_status = STORE_PENDING;
        e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
    }

    e.ping_status = PING_NONE;

    EBIT_CLR(e.flags, RELEASE_REQUEST);
    EBIT_CLR(e.flags, KEY_PRIVATE);
    EBIT_SET(e.flags, ENTRY_VALIDATED);

    e.swap_dirn = index;
    e.swap_filen = filen;
}

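/// detaches the entry from this cache_dir, releasing its map lock:
/// an entry we were writing is aborted; an entry we were reading is closed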
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
        Store::Root().transientsAbandon(e); // broadcasts after the change
    } else {
        map->closeForReading(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
    }
}

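/// current disk space in use, derived from the free slot count;
/// everything that is not tracked as free counts as used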
uint64_t
Rock::SwapDir::currentSize() const
{
    const uint64_t spaceSize = !freeSlots ?
        maxSize() : (slotSize * freeSlots->size());
    // everything that is not free is in use
    return maxSize() - spaceSize;
}

uint64_t
Rock::SwapDir::currentCount() const
{
    return map ? map->entryCount() : 0;
}

/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}

void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}

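/// maximum number of entries this cache_dir may have, given its size,
/// slot size, and the absolute entryLimitHigh() (sfileno-imposed) cap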
int64_t
Rock::SwapDir::entryLimitAllowed() const
{
    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
    const int64_t eWanted = (maxSize() - HeaderSize)/slotSize;
    return min(max(eLimitLo, eWanted), entryLimitHigh());
}

// TODO: encapsulate as a tool
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs(47, 3, HERE << "disker will create in " << path);
        return;
    }

    debugs(47, 3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs(47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs(47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs(47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}

// report Rock DB creation error and exit
void
Rock::SwapDir::createError(const char *const msg)
{
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}

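/// attaches to the shared free-slot stack and entry map, picks a DiskIO
/// module (IpcIo when a disker strand is needed, Blocking otherwise),
/// and opens the db file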
void
Rock::SwapDir::init()
{
    debugs(47, 2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit deletes.
    lock();

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47, 2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}

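/// whether this cache_dir should be handled by a dedicated disker process:
/// required with multiple workers and preferred whenever IpcIo is available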
bool
Rock::SwapDir::needsDiskStrand() const
{
    const bool wontEvenWorkWithoutDisker = Config.workers > 1;
    const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
    return InDaemonMode() && (wontEvenWorkWithoutDisker ||
                              wouldWorkBetterWithDisker);
}

void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/db
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}

void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}

/// parse maximum db disk size
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}

ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}

bool
Rock::SwapDir::allowOptionReconfigure(const char *const option) const
{
    return strcmp(option, "slot-size") != 0 &&
           ::SwapDir::allowOptionReconfigure(option);
}

/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}

/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}

/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
{
    int *storedRate;
    if (strcmp(option, "max-swap-rate") == 0)
        storedRate = &fileConfig.ioRate;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const int newRate = static_cast<int>(parsedValue);

    if (newRate < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
        self_destruct();
    }

    if (!isaReconfig)
        *storedRate = newRate;
    else if (*storedRate != newRate) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedRate);
    }

    return true;
}

/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}

/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
bool
Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
{
    uint64_t *storedSize;
    if (strcmp(option, "slot-size") == 0)
        storedSize = &slotSize;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle size units and detect parsing errors better
    const uint64_t newSize = strtoll(value, NULL, 10);
    if (newSize <= 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
        self_destruct();
    }

    if (newSize <= sizeof(DbCellHeader)) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
        self_destruct();
    }

    if (!reconfig)
        *storedSize = newSize;
    else if (*storedSize != newSize) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedSize);
    }

    return true;
}

/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
void
Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
{
    storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
}

/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
{
    if (slotSize <= 0)
        fatal("Rock store requires a positive slot-size");

    const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
    const int64_t slotSizeRoundingWaste = slotSize;
    const int64_t maxRoundingWaste =
        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
    const int64_t diskWasteSize = maxSize() - usableDiskSize;
    Must(diskWasteSize >= 0);

    // warn if maximum db size is not reachable due to sfileno limit
    if (entryLimitAllowed() == entryLimitHigh() &&
            diskWasteSize >= maxRoundingWaste) {
        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
        debugs(47, DBG_CRITICAL, "\tdb slot size: " << slotSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
        debugs(47, DBG_CRITICAL, "\tusable db size: " << usableDiskSize << " Bytes");
        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
    }
}

void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}

bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start an I/O transaction if less than 10% of shared pages are free.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}

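/// starts swapping an entry out: locks a fresh map anchor for writing and
/// returns an IoState bound to it, or nil if the store cannot accept writes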
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47, 4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e);

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot;

    debugs(47, 5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}

int64_t
Rock::SwapDir::diskOffset(int filen) const
{
    assert(filen >= 0);
    return HeaderSize + slotSize*filen;
}

int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}

int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->entryLimit());
}

int
Rock::SwapDir::entryMaxPayloadSize() const
{
    return slotSize - sizeof(DbCellHeader);
}

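/// number of db slots needed to store an object of the given size,
/// rounding up to whole slots (ceil division by the per-slot payload size)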
int
Rock::SwapDir::entriesNeeded(const int64_t objSize) const
{
    return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
}

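/// finds a free db slot for writing, purging an old entry if necessary;
/// returns false (leaving pageId unset) when no slot can be freed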
bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL;

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}

bool
Rock::SwapDir::validSlotId(const SlotId slotId) const
{
    return 0 <= slotId && slotId < entryLimitAllowed();
}

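/// called by the map when a db slice is freed: gives the slice to the
/// allocation waiting in useFreeSlot(), if any, or returns it to the
/// shared free-slot stack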
void
Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
{
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}

// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47, 4, HERE << theFile);
        return NULL;
    }

    if (e.swap_filen < 0) {
        debugs(47, 4, HERE << e);
        return NULL;
    }

    // Do not start an I/O transaction if less than 10% of shared pages are free.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // There are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but there is no support for reading an entry that
    // we are still swapping out.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing after all

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47, 5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
    // may still be zero and basics.swap_file_sz may grow.
    assert(slot->basics.swap_file_sz >= e.swap_file_sz);

    return sio;
}

void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes and " <<
           std::setw(7) << map->entryLimit() << " entries");

    rebuild();
}

void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}

void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    sio->callReaderBack(r->buf, rlen);
}

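/// handles a completed disk write: records the shared slice metadata and,
/// on the final (eof) write, closes the entry for writing; on error, frees
/// the entry and reports the failure via the IoState callback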
void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio != NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        noteFreeMapSlice(request->sidNext);
        return;
    }

    // TODO: Fail if disk dropped one of the previous write requests.

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()

        // finalize the shared slice info after writing slice contents to disk
        Ipc::StoreMap::Slice &slice =
            map->writeableSlice(sio.swap_filen, request->sidCurrent);
        slice.size = request->len - sizeof(DbCellHeader);
        slice.next = request->sidNext;

        if (request->eof) {
            assert(sio.e);
            assert(sio.writeableAnchor_);
            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                      sio.offset_;

            // close, the entry gets the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.writeableAnchor_ = NULL;
            sio.finishedWriting(errflag);
        }
    } else {
        noteFreeMapSlice(request->sidNext);

        writeError(*sio.e);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }

    CollapsedForwarding::Broadcast(*sio.e);
}

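/// reacts to a failed swapout: marks the map entry as unusable and abandons
/// the corresponding transients entry, while keeping the write lock intact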
void
Rock::SwapDir::writeError(StoreEntry &e)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(e.swap_filen); // will mark as unusable, just in case

    Store::Root().transientsAbandon(e);

    // All callers must also call IoState callback, to propagate the error.
}

bool
Rock::SwapDir::full() const
{
    return freeSlots != NULL && !freeSlots->size();
}

// storeSwapOutFileClosed calls this method on DISK_NO_SPACE_LEFT,
// but that should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}

/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}

void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}

bool
Rock::SwapDir::dereference(StoreEntry &e, bool)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}

bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}

void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}

void
Rock::SwapDir::markForUnlink(StoreEntry &e)
{
    debugs(47, 5, e);
    map->freeEntry(e.swap_filen);
}

void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}

void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}

void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    if (map) {
        const int limit = map->entryLimit();
        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
        if (limit > 0) {
            const int entryCount = map->entryCount();
            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                              entryCount, (100.0 * entryCount / limit));

            const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
            if (slotsFree <= static_cast<const unsigned int>(limit)) {
                const int usedSlots = limit - static_cast<const int>(slotsFree);
                storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n",
                                  usedSlots, (100.0 * usedSlots / limit));
            }
            if (limit < 100) { // XXX: otherwise too expensive to count
                Ipc::ReadWriteLockStats stats;
                map->updateStats(stats);
                stats.dump(e);
            }
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");
}

const char *
Rock::SwapDir::inodeMapPath() const
{
    static String inodesPath;
    inodesPath = path;
    inodesPath.append("_inodes");
    return inodesPath.termedBuf();
}

const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}

namespace Rock
{
RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
}

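/// creates and initializes the shared memory segments (the entry map and
/// the free-slot stack, drained of its initial pages) for each configured
/// Rock cache_dir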
void Rock::SwapDirRr::create(const RunnerRegistry &)
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->entryLimitAllowed();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity,
                                             sizeof(DbCellHeader));
            freeSlotsOwners.push_back(freeSlotsOwner);

            // TODO: add method to initialize PageStack with no free pages
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}

Rock::SwapDirRr::~SwapDirRr()
{
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}