]> git.ipfire.org Git - thirdparty/squid.git/blob - src/fs/rock/RockSwapDir.cc
Rock and shared memory caches fixes/improvements.
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
1 /*
2 * DEBUG: section 47 Store Directory Routines
3 */
4
5 #include "squid.h"
6 #include "cache_cf.h"
7 #include "CollapsedForwarding.h"
8 #include "ConfigOption.h"
9 #include "DiskIO/DiskIOModule.h"
10 #include "DiskIO/DiskIOStrategy.h"
11 #include "DiskIO/ReadRequest.h"
12 #include "DiskIO/WriteRequest.h"
13 #include "fs/rock/RockIoRequests.h"
14 #include "fs/rock/RockIoState.h"
15 #include "fs/rock/RockRebuild.h"
16 #include "fs/rock/RockSwapDir.h"
17 #include "globals.h"
18 #include "ipc/mem/Pages.h"
19 #include "MemObject.h"
20 #include "Parsing.h"
21 #include "SquidConfig.h"
22 #include "SquidMath.h"
23 #include "tools.h"
24
25 #include <cstdlib>
26 #include <iomanip>
27 #include <limits>
28
29 #if HAVE_SYS_STAT_H
30 #include <sys/stat.h>
31 #endif
32
// bytes reserved at the start of the db file for the db header (slots follow)
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
34
// All pointers start nil; slotSize defaults to HeaderSize until the
// slot-size cache_dir option overrides it (see parseSizeOption()).
Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
    waitingForPage(NULL)
{
}
40
Rock::SwapDir::~SwapDir()
{
    // release the I/O strategy, the shared map view, and the
    // xstrdup()ed db file path (see parse())
    delete io;
    delete map;
    safe_free(filePath);
}
47
// StoreSearch support is not implemented for rock; callers must not use it
StoreSearch *
Rock::SwapDir::search(String const url, HttpRequest *)
{
    assert(false);
    return NULL; // XXX: implement
}
54
// asynchronous lookup entry point; delegates to the generic SwapDir code
void
Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
{
    ::SwapDir::get(key, cb, data);
}
60
// called when Squid core needs a StoreEntry with a given key
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
    // cannot serve hits without a shared map and a readable db file
    if (!map || !theFile || !theFile->canRead())
        return NULL;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
    if (!slot)
        return NULL;

    // create a brand new store entry and initialize it with stored basics
    StoreEntry *e = new StoreEntry();
    anchorEntry(*e, filen, *slot);

    e->hashInsert(key);
    trackReferences(*e);

    return e;
    // the disk entry remains open for reading, protected from modifications
}
83
/// Binds a collapsed entry to this cache_dir's on-disk copy, if any.
/// Returns false when this store has no such entry; on success, inSync
/// reports whether the in-memory entry was updated from the anchor.
bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    sfileno filen;
    const Ipc::StoreMapAnchor *const slot = map->openForReading(
        reinterpret_cast<cache_key*>(collapsed.key), filen);
    if (!slot)
        return false;

    anchorEntry(collapsed, filen, *slot);
    inSync = updateCollapsedWith(collapsed, *slot);
    return true; // even if inSync is false
}
100
/// Refreshes a previously anchored collapsed entry with the latest
/// shared-map information; returns false if this store cannot help.
bool
Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
{
    if (!map || !theFile || !theFile->canRead())
        return false;

    if (collapsed.swap_filen < 0) // no longer using a disk cache
        return true;
    assert(collapsed.swap_dirn == index);

    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
    return updateCollapsedWith(collapsed, s);
}
114
/// copies the latest on-disk entry size from the shared anchor into the
/// collapsed in-memory entry; always reports success
bool
Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
{
    // the writer may still be growing the entry; propagate the current size
    collapsed.swap_file_sz = anchor.basics.swap_file_sz;
    return true;
}
121
/// Initializes e from the shared anchor's stored basics and marks e as a
/// validated disk resident of this cache_dir at slot filen.
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;

    e.swap_file_sz = basics.swap_file_sz;
    e.lastref = basics.lastref;
    e.timestamp = basics.timestamp;
    e.expires = basics.expires;
    e.lastmod = basics.lastmod;
    e.refcount = basics.refcount;
    e.flags = basics.flags;

    // a complete anchor means the whole object is already on disk
    if (anchor.complete()) {
        e.store_status = STORE_OK;
        e.swap_status = SWAPOUT_DONE;
    } else {
        e.store_status = STORE_PENDING;
        e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
    }

    e.ping_status = PING_NONE;

    // a disk hit must not carry release/private markers from the writer
    EBIT_CLR(e.flags, RELEASE_REQUEST);
    EBIT_CLR(e.flags, KEY_PRIVATE);
    EBIT_SET(e.flags, ENTRY_VALIDATED);

    e.swap_dirn = index;
    e.swap_filen = filen;
}
152
/// Breaks the association between e and this cache_dir, releasing the
/// corresponding map slot lock: a write lock aborts the (incomplete)
/// entry while a read lock is simply closed.
void Rock::SwapDir::disconnect(StoreEntry &e)
{
    assert(e.swap_dirn == index);
    assert(e.swap_filen >= 0);
    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
    assert(e.swap_status != SWAPOUT_NONE);

    // do not rely on e.swap_status here because there is an async delay
    // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.

    // since e has swap_filen, its slot is locked for reading and/or writing
    // but it is difficult to know whether THIS worker is reading or writing e,
    // especially since we may switch from writing to reading. This code relies
    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
        map->abortWriting(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
        Store::Root().transientsAbandon(e); // broadcasts after the change
    } else {
        map->closeForReading(e.swap_filen);
        e.swap_dirn = -1;
        e.swap_filen = -1;
        e.swap_status = SWAPOUT_NONE;
    }
}
182
183 uint64_t
184 Rock::SwapDir::currentSize() const
185 {
186 const uint64_t spaceSize = !freeSlots ?
187 maxSize() : (slotSize * freeSlots->size());
188 // everything that is not free is in use
189 return maxSize() - spaceSize;
190 }
191
192 uint64_t
193 Rock::SwapDir::currentCount() const
194 {
195 return map ? map->entryCount() : 0;
196 }
197
/// In SMP mode only the disker process reports stats to avoid
/// counting the same stats by multiple processes.
bool
Rock::SwapDir::doReportStat() const
{
    return ::SwapDir::doReportStat() && (!UsingSmp() || IamDiskProcess());
}
205
/// no-op: rock derives usage counters on demand (see currentSize())
void
Rock::SwapDir::swappedOut(const StoreEntry &)
{
    // stats are not stored but computed when needed
}
211
/// the absolute, static upper bound on the number of addressable db slots
int64_t
Rock::SwapDir::slotLimitAbsolute() const
{
    // the max value is an invalid one; all values must be below the limit
    assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
           std::numeric_limits<SlotId>::max());
    return std::numeric_limits<SlotId>::max();
}
220
/// the slot limit for this cache_dir, derived from its configured capacity
/// and slot size, clamped to [existing map size, absolute limit]
int64_t
Rock::SwapDir::slotLimitActual() const
{
    const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
    const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
    const int64_t sLimitHi = slotLimitAbsolute();
    return min(max(sLimitLo, sWanted), sLimitHi);
}
229
230 int64_t
231 Rock::SwapDir::entryLimitActual() const
232 {
233 return min(slotLimitActual(), entryLimitAbsolute());
234 }
235
// TODO: encapsulate as a tool
/// Creates the db directory (if needed) and the db file, pre-sizing the
/// file to maxSize(); in SMP mode only the disker process does the work.
void
Rock::SwapDir::create()
{
    assert(path);
    assert(filePath);

    if (UsingSmp() && !IamDiskProcess()) {
        debugs (47,3, HERE << "disker will create in " << path);
        return;
    }

    debugs (47,3, HERE << "creating in " << path);

    struct stat dir_sb;
    if (::stat(path, &dir_sb) == 0) {
        struct stat file_sb;
        if (::stat(filePath, &file_sb) == 0) {
            debugs (47, DBG_IMPORTANT, "Skipping existing Rock db: " << filePath);
            return;
        }
        // else the db file is not there or is not accessible, and we will try
        // to create it later below, generating a detailed error on failures.
    } else { // path does not exist or is inaccessible
        // If path exists but is not accessible, mkdir() below will fail, and
        // the admin should see the error and act accordingly, so there is
        // no need to distinguish ENOENT from other possible stat() errors.
        debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
        const int res = mkdir(path, 0700);
        if (res != 0)
            createError("mkdir");
    }

    debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
    if (swap < 0)
        createError("create");
    // NOTE: createError() never returns (it calls fatal()), so the swap
    // descriptor cannot leak through the error branches below

#if SLOWLY_FILL_WITH_ZEROS
    char block[1024];
    Must(maxSize() % sizeof(block) == 0);
    memset(block, '\0', sizeof(block));

    for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
        if (write(swap, block, sizeof(block)) != sizeof(block))
            createError("write");
    }
#else
    // pre-size the whole file, then write a zeroed header block at its start
    if (ftruncate(swap, maxSize()) != 0)
        createError("truncate");

    char header[HeaderSize];
    memset(header, '\0', sizeof(header));
    if (write(swap, header, sizeof(header)) != sizeof(header))
        createError("write");
#endif

    close(swap);
}
295
// report Rock DB creation error and exit
/// msg names the failed operation (e.g., "mkdir", "write"); never returns
void
Rock::SwapDir::createError(const char *const msg)
{
    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
           filePath << "; " << msg << " error: " << xstrerror());
    fatal("Rock Store db creation error");
}
304
/// Attaches to the shared free-slot stack and entry map, picks and starts a
/// DiskIO strategy, and opens the db file; the rebuild starts when the file
/// open completes (see ioCompletedNotification()).
void
Rock::SwapDir::init()
{
    debugs(47,2, HERE);

    // XXX: SwapDirs aren't refcounted. We make IORequestor calls, which
    // are refcounted. We up our count once to avoid implicit delete's.
    lock();

    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());

    Must(!map);
    map = new DirMap(inodeMapPath());
    map->cleaner = this;

    // SMP mode requires the IpcIo module; otherwise plain blocking I/O works
    const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
    if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
        debugs(47,2, HERE << "Using DiskIO module: " << ioModule);
        io = m->createStrategy();
        io->init();
    } else {
        debugs(47, DBG_CRITICAL, "FATAL: Rock store is missing DiskIO module: " <<
               ioModule);
        fatal("Rock Store missing a required DiskIO module");
    }

    theFile = io->newFile(filePath);
    theFile->configure(fileConfig);
    theFile->open(O_RDWR, 0644, this);

    // Increment early. Otherwise, if one SwapDir finishes rebuild before
    // others start, storeRebuildComplete() will think the rebuild is over!
    // TODO: move store_dirs_rebuilding hack to store modules that need it.
    ++StoreController::store_dirs_rebuilding;
}
340
341 bool
342 Rock::SwapDir::needsDiskStrand() const
343 {
344 const bool wontEvenWorkWithoutDisker = Config.workers > 1;
345 const bool wouldWorkBetterWithDisker = DiskIOModule::Find("IpcIo");
346 return InDaemonMode() && (wontEvenWorkWithoutDisker ||
347 wouldWorkBetterWithDisker);
348 }
349
/// parses the cache_dir line for this rock store: directory path followed
/// by size and generic options; the db file lives at path/rock
void
Rock::SwapDir::parse(int anIndex, char *aPath)
{
    index = anIndex;

    path = xstrdup(aPath);

    // cache store is located at path/db
    String fname(path);
    fname.append("/rock");
    filePath = xstrdup(fname.termedBuf());

    parseSize(false);
    parseOptions(0);

    // Current openForWriting() code overwrites the old slot if needed
    // and possible, so proactively removing old slots is probably useless.
    assert(!repl); // repl = createRemovalPolicy(Config.replPolicy);

    validateOptions();
}
371
/// re-parses size and options on reconfigure; immutable settings are
/// reported (and kept unchanged) by the individual option parsers
void
Rock::SwapDir::reconfigure()
{
    parseSize(true);
    parseOptions(1);
    // TODO: can we reconfigure the replacement policy (repl)?
    validateOptions();
}
380
/// parse maximum db disk size
/// \param reconfig true when called from reconfigure(); the size cannot be
///        changed dynamically, so a differing value only triggers a warning
void
Rock::SwapDir::parseSize(const bool reconfig)
{
    const int i = GetInteger();
    if (i < 0)
        fatal("negative Rock cache_dir size value");
    const uint64_t new_max_size =
        static_cast<uint64_t>(i) << 20; // MBytes to Bytes
    if (!reconfig)
        max_size = new_max_size;
    else if (new_max_size != max_size) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir '" << path << "' size "
               "cannot be changed dynamically, value left unchanged (" <<
               (max_size >> 20) << " MB)");
    }
}
398
/// extends the generic SwapDir option set with rock-specific size, time,
/// and rate options (slot-size, swap-timeout, max-swap-rate)
ConfigOption *
Rock::SwapDir::getOptionTree() const
{
    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
    assert(vector);
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
    return vector;
}
409
410 bool
411 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
412 {
413 return strcmp(option, "slot-size") != 0 &&
414 ::SwapDir::allowOptionReconfigure(option);
415 }
416
/// parses time-specific options; mimics ::SwapDir::optionObjectSizeParse()
/// returns false when the option is not one of ours (so other parsers run)
bool
Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
{
    // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
    // including time unit handling. Same for size and rate.

    time_msec_t *storedTime;
    if (strcmp(option, "swap-timeout") == 0)
        storedTime = &fileConfig.ioTimeout;
    else
        return false;

    if (!value)
        self_destruct();

    // TODO: handle time units and detect parsing errors better
    const int64_t parsedValue = strtoll(value, NULL, 10);
    if (parsedValue < 0) {
        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
        self_destruct();
    }

    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);

    // timeouts cannot change dynamically; warn if a reconfigure tries
    if (!reconfig)
        *storedTime = newTime;
    else if (*storedTime != newTime) {
        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
               << " cannot be changed dynamically, value left unchanged: " <<
               *storedTime);
    }

    return true;
}
452
/// reports time-specific options; mimics ::SwapDir::optionObjectSizeDump()
/// only non-default (non-zero) timeouts are dumped
void
Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
{
    if (fileConfig.ioTimeout)
        storeAppendPrintf(e, " swap-timeout=%" PRId64,
                          static_cast<int64_t>(fileConfig.ioTimeout));
}
461
462 /// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
463 bool
464 Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
465 {
466 int *storedRate;
467 if (strcmp(option, "max-swap-rate") == 0)
468 storedRate = &fileConfig.ioRate;
469 else
470 return false;
471
472 if (!value)
473 self_destruct();
474
475 // TODO: handle time units and detect parsing errors better
476 const int64_t parsedValue = strtoll(value, NULL, 10);
477 if (parsedValue < 0) {
478 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
479 self_destruct();
480 }
481
482 const int newRate = static_cast<int>(parsedValue);
483
484 if (newRate < 0) {
485 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
486 self_destruct();
487 }
488
489 if (!isaReconfig)
490 *storedRate = newRate;
491 else if (*storedRate != newRate) {
492 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
493 << " cannot be changed dynamically, value left unchanged: " <<
494 *storedRate);
495 }
496
497 return true;
498 }
499
/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
/// a negative ioRate means "unlimited" and is not dumped
void
Rock::SwapDir::dumpRateOption(StoreEntry * e) const
{
    if (fileConfig.ioRate >= 0)
        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
}
507
508 /// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
509 bool
510 Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
511 {
512 uint64_t *storedSize;
513 if (strcmp(option, "slot-size") == 0)
514 storedSize = &slotSize;
515 else
516 return false;
517
518 if (!value)
519 self_destruct();
520
521 // TODO: handle size units and detect parsing errors better
522 const uint64_t newSize = strtoll(value, NULL, 10);
523 if (newSize <= 0) {
524 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
525 self_destruct();
526 }
527
528 if (newSize <= sizeof(DbCellHeader)) {
529 debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
530 self_destruct();
531 }
532
533 if (!reconfig)
534 *storedSize = newSize;
535 else if (*storedSize != newSize) {
536 debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
537 << " cannot be changed dynamically, value left unchanged: " <<
538 *storedSize);
539 }
540
541 return true;
542 }
543
544 /// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
545 void
546 Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
547 {
548 storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
549 }
550
551 /// check the results of the configuration; only level-0 debugging works here
552 void
553 Rock::SwapDir::validateOptions()
554 {
555 if (slotSize <= 0)
556 fatal("Rock store requires a positive slot-size");
557
558 const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
559 const int64_t slotSizeRoundingWaste = slotSize;
560 const int64_t maxRoundingWaste =
561 max(maxSizeRoundingWaste, slotSizeRoundingWaste);
562
563 // an entry consumes at least one slot; round up to reduce false warnings
564 const int64_t blockSize = static_cast<int64_t>(slotSize);
565 const int64_t maxObjSize = max(blockSize,
566 ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);
567
568 // Does the "sfileno*max-size" limit match configured db capacity?
569 const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
570 if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
571 const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
572 debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
573 "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
574 "\n\tconfigured db slot size: " << slotSize << " bytes" <<
575 "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
576 "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
577 "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
578 "\n\tdisk space wasted: " << diskWasteSize << " bytes");
579 }
580
581 // Does the "absolute slot count" limit match configured db capacity?
582 const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
583 if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
584 const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
585 debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
586 "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
587 "\n\tconfigured db slot size: " << slotSize << " bytes" <<
588 "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
589 "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
590 "\n\tdisk space wasted: " << diskWasteSize << " bytes");
591 }
592 }
593
/// kicks off the asynchronous index rebuild for this cache_dir
void
Rock::SwapDir::rebuild()
{
    //++StoreController::store_dirs_rebuilding; // see Rock::SwapDir::init()
    AsyncJob::Start(new Rebuild(this));
}
600
/// whether this cache_dir can accept a new entry of the given size;
/// on success, load reports the current I/O load of the dir
bool
Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
{
    // account for the per-slot DbCellHeader overhead in the generic check
    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
        return false;

    if (!theFile || !theFile->canWrite())
        return false;

    if (!map)
        return false;

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return false;
    }

    if (io->shedLoad())
        return false;

    load = io->load();
    return true;
}
627
/// starts swapping out a new entry: locks a fresh map slot for writing and
/// returns an IoState bound to it (or nil on failure)
StoreIOState::Pointer
Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    sfileno filen;
    Ipc::StoreMapAnchor *const slot =
        map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
    if (!slot) {
        debugs(47, 5, HERE << "map->add failed");
        return NULL;
    }

    assert(filen >= 0);
    slot->set(e); // store the entry basics in the shared anchor

    // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
    // If that does not happen, the entry will not decrement the read level!

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = filen;
    sio->writeableAnchor_ = slot; // marks the sio as the writer (see disconnect())

    debugs(47,5, HERE << "dir " << index << " created new filen " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen << std::dec << " starting at " <<
           diskOffset(sio->swap_filen));

    sio->file(theFile);

    trackReferences(e);
    return sio;
}
667
668 int64_t
669 Rock::SwapDir::diskOffset(const SlotId sid) const
670 {
671 assert(sid >= 0);
672 return HeaderSize + slotSize*sid;
673 }
674
/// byte offset of the slot corresponding to the given shared memory page;
/// page numbers are 1-based while slot IDs are 0-based, hence the -1
int64_t
Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
{
    assert(pageId);
    return diskOffset(pageId.number - 1);
}
681
/// the first byte offset beyond the last addressable slot
int64_t
Rock::SwapDir::diskOffsetLimit() const
{
    assert(map);
    return diskOffset(map->sliceLimit());
}
688
/// obtains a free db slot for writing, either from the free-slot stack or
/// by purging an existing entry; returns false when neither works
bool
Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
{
    if (freeSlots->pop(pageId)) {
        debugs(47, 5, "got a previously free slot: " << pageId);
        return true;
    }

    // catch free slots delivered to noteFreeMapSlice()
    assert(!waitingForPage);
    waitingForPage = &pageId;
    if (map->purgeOne()) {
        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
        assert(pageId.set());
        debugs(47, 5, "got a previously busy slot: " << pageId);
        return true;
    }
    assert(waitingForPage == &pageId);
    waitingForPage = NULL; // stop redirecting freed slices to pageId

    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
    return false;
}
712
713 bool
714 Rock::SwapDir::validSlotId(const SlotId slotId) const
715 {
716 return 0 <= slotId && slotId < slotLimitActual();
717 }
718
/// called by the map when a slice becomes free; either hands the slot to a
/// waiting useFreeSlot() caller or returns it to the free-slot stack
void
Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
{
    // translate the 0-based slice ID into a 1-based page in our pool
    Ipc::Mem::PageId pageId;
    pageId.pool = index+1;
    pageId.number = sliceId+1;
    if (waitingForPage) {
        *waitingForPage = pageId;
        waitingForPage = NULL;
    } else {
        freeSlots->push(pageId);
    }
}
732
// tries to open an old entry with swap_filen for reading
StoreIOState::Pointer
Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
{
    if (!theFile || theFile->error()) {
        debugs(47,4, HERE << theFile);
        return NULL;
    }

    // entries without a file number are not on disk here
    if (e.swap_filen < 0) {
        debugs(47,4, HERE << e);
        return NULL;
    }

    // Do not start I/O transaction if there are less than 10% free pages left.
    // TODO: reserve page instead
    if (needsDiskStrand() &&
            Ipc::Mem::PageLevel(Ipc::Mem::PageId::ioPage) >= 0.9 * Ipc::Mem::PageLimit(Ipc::Mem::PageId::ioPage)) {
        debugs(47, 5, HERE << "too few shared pages for IPC I/O left");
        return NULL;
    }

    // The are two ways an entry can get swap_filen: our get() locked it for
    // reading or our storeSwapOutStart() locked it for writing. Peeking at our
    // locked entry is safe, but no support for reading the entry we swap out.
    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
    if (!slot)
        return NULL; // we were writing afterall

    Rock::SwapDir::Pointer self(this);
    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);

    sio->swap_dirn = index;
    sio->swap_filen = e.swap_filen;
    sio->readableAnchor_ = slot;
    sio->file(theFile);

    debugs(47,5, HERE << "dir " << index << " has old filen: " <<
           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);

    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
    // may still be zero and basics.swap_file_sz may grow.
    assert(slot->basics.swap_file_sz >= e.swap_file_sz);

    return sio;
}
781
/// IORequestor callback: the db file open attempt finished; on success,
/// logs the dir limits and starts the index rebuild
void
Rock::SwapDir::ioCompletedNotification()
{
    if (!theFile)
        fatalf("Rock cache_dir failed to initialize db file: %s", filePath);

    if (theFile->error())
        fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
               xstrerror());

    debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
           std::setw(12) << maxSize() << " disk bytes, " <<
           std::setw(7) << map->entryLimit() << " entries, and " <<
           std::setw(7) << map->sliceLimit() << " slots");

    rebuild();
}
799
/// IORequestor callback: the db file has been closed; drop our reference
void
Rock::SwapDir::closeCompleted()
{
    theFile = NULL;
}
805
/// IORequestor callback: a slot read finished; advances the sio offset on
/// success and forwards the buffer to the reader
void
Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
{
    ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
    assert(request);
    IoState::Pointer sio = request->sio;

    if (errflag == DISK_OK && rlen > 0)
        sio->offset_ += rlen;

    sio->callReaderBack(r->buf, rlen);
}
818
/// IORequestor callback: a slot write finished; on success, publishes the
/// written slice in the shared map (and closes the entry on eof); on error,
/// frees the reserved next slot and marks the entry broken
void
Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
{
    Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
    assert(request);
    assert(request->sio !=  NULL);
    IoState &sio = *request->sio;

    // quit if somebody called IoState::close() while we were waiting
    if (!sio.stillWaiting()) {
        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
        noteFreeMapSlice(request->sidNext); // the reserved slot is now unused
        return;
    }

    // TODO: Fail if disk dropped one of the previous write requests.

    if (errflag == DISK_OK) {
        // do not increment sio.offset_ because we do it in sio->write()

        // finalize the shared slice info after writing slice contents to disk
        Ipc::StoreMap::Slice &slice =
            map->writeableSlice(sio.swap_filen, request->sidCurrent);
        slice.size = request->len - sizeof(DbCellHeader);
        slice.next = request->sidNext;

        if (request->eof) {
            assert(sio.e);
            assert(sio.writeableAnchor_);
            // record the final entry size in both the entry and the anchor
            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                      sio.offset_;

            // close, the entry gets the read lock
            map->closeForWriting(sio.swap_filen, true);
            sio.writeableAnchor_ = NULL;
            sio.finishedWriting(errflag);
        }
    } else {
        noteFreeMapSlice(request->sidNext); // the reserved slot is now unused

        writeError(*sio.e);
        sio.finishedWriting(errflag);
        // and hope that Core will call disconnect() to close the map entry
    }

    // let other workers know about the entry state change
    CollapsedForwarding::Broadcast(*sio.e);
}
866
/// handles a failed swapout write: marks the partially written entry as
/// unusable and abandons its transients copy
void
Rock::SwapDir::writeError(StoreEntry &e)
{
    // Do not abortWriting here. The entry should keep the write lock
    // instead of losing association with the store and confusing core.
    map->freeEntry(e.swap_filen); // will mark as unusable, just in case

    Store::Root().transientsAbandon(e);

    // All callers must also call IoState callback, to propagate the error.
}
878
879 bool
880 Rock::SwapDir::full() const
881 {
882 return freeSlots != NULL && !freeSlots->size();
883 }
884
// storeSwapOutFileClosed calls this nethod on DISK_NO_SPACE_LEFT,
// but it should not happen for us
void
Rock::SwapDir::diskFull()
{
    debugs(20, DBG_IMPORTANT, "BUG: No space left with rock cache_dir: " <<
           filePath);
}
893
/// purge while full(); it should be sufficient to purge just one
void
Rock::SwapDir::maintain()
{
    // The Store calls this to free some db space, but there is nothing wrong
    // with a full() db, except when db has to shrink after reconfigure, and
    // we do not support shrinking yet (it would have to purge specific slots).
    // TODO: Disable maintain() requests when they are pointless.
}
903
/// notifies the removal policy (if any) that e was referenced
void
Rock::SwapDir::reference(StoreEntry &e)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Referenced)
        repl->Referenced(repl, &e, &e.repl);
}
911
/// notifies the removal policy (if any) that e was dereferenced; always
/// returns false because rock keeps its own map instead of store_table
bool
Rock::SwapDir::dereference(StoreEntry &e, bool)
{
    debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
    if (repl && repl->Dereferenced)
        repl->Dereferenced(repl, &e, &e.repl);

    // no need to keep e in the global store_table for us; we have our own map
    return false;
}
922
/// rock does not need the unlinkd helper: removal only updates the map
bool
Rock::SwapDir::unlinkdUseful() const
{
    // no entry-specific files to unlink
    return false;
}
929
/// removes e from this cache_dir: frees its map slot chain and breaks the
/// entry's association with the store
void
Rock::SwapDir::unlink(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    ignoreReferences(e);
    map->freeEntry(e.swap_filen);
    disconnect(e);
}
938
/// marks the entry's map slot chain for removal without disconnecting e
void
Rock::SwapDir::markForUnlink(StoreEntry &e)
{
    debugs(47, 5, e);
    map->freeEntry(e.swap_filen);
}
945
/// adds e to the removal policy (if one is configured)
void
Rock::SwapDir::trackReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Add(repl, &e, &e.repl);
}
953
/// removes e from the removal policy (if one is configured)
void
Rock::SwapDir::ignoreReferences(StoreEntry &e)
{
    debugs(47, 5, HERE << e);
    if (repl)
        repl->Remove(repl, &e, &e.repl);
}
961
/// appends this cache_dir's capacity, usage, and flag statistics to e
/// (cache manager report)
void
Rock::SwapDir::statfs(StoreEntry &e) const
{
    storeAppendPrintf(&e, "\n");
    storeAppendPrintf(&e, "Maximum Size: %" PRIu64 " KB\n", maxSize() >> 10);
    storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n",
                      currentSize() / 1024.0,
                      Math::doublePercent(currentSize(), maxSize()));

    const int entryLimit = entryLimitActual();
    const int slotLimit = slotLimitActual();
    storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
    if (map && entryLimit > 0) {
        const int entryCount = map->entryCount();
        storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                          entryCount, (100.0 * entryCount / entryLimit));
    }

    storeAppendPrintf(&e, "Maximum slots:   %9d\n", slotLimit);
    if (map && slotLimit > 0) {
        // skip the used-slot report if the free count looks inconsistent
        const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
        if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
            const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
            storeAppendPrintf(&e, "Used slots:      %9d %.2f%%\n",
                              usedSlots, (100.0 * usedSlots / slotLimit));
        }
        if (slotLimit < 100) { // XXX: otherwise too expensive to count
            Ipc::ReadWriteLockStats stats;
            map->updateStats(stats);
            stats.dump(e);
        }
    }

    storeAppendPrintf(&e, "Pending operations: %d out of %d\n",
                      store_open_disk_fd, Config.max_open_disk_fds);

    storeAppendPrintf(&e, "Flags:");

    if (flags.selected)
        storeAppendPrintf(&e, " SELECTED");

    if (flags.read_only)
        storeAppendPrintf(&e, " READ-ONLY");

    storeAppendPrintf(&e, "\n");

}
1009
/// the name of the shared memory segment holding this dir's entry map
SBuf
Rock::SwapDir::inodeMapPath() const
{
    return Ipc::Mem::Segment::Name(SBuf(path), "map");
}
1015
/// the name of the shared memory segment holding this dir's free-slot stack;
/// NOTE: returns a pointer into a function-local static buffer that is
/// overwritten by the next call
const char *
Rock::SwapDir::freeSlotsPath() const
{
    static String spacesPath;
    spacesPath = path;
    spacesPath.append("_spaces");
    return spacesPath.termedBuf();
}
1024
namespace Rock
{
// registers SwapDirRr so that Squid runs its create() during startup
RunnerRegistrationEntry(SwapDirRr);
}
1029
/// Creates the shared-memory segments (entry map and free-slot stack) for
/// every configured rock cache_dir.
void Rock::SwapDirRr::create()
{
    Must(mapOwners.empty() && freeSlotsOwners.empty());
    for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
        if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
            const int64_t capacity = sd->slotLimitActual();

            SwapDir::DirMap::Owner *const mapOwner =
                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
            mapOwners.push_back(mapOwner);

            // TODO: somehow remove pool id and counters from PageStack?
            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
                                             i+1, capacity, 0);
            freeSlotsOwners.push_back(freeSlotsOwner);

            // TODO: add method to initialize PageStack with no free pages
            // drain all initial pages: pop until the stack is empty
            while (true) {
                Ipc::Mem::PageId pageId;
                if (!freeSlotsOwner->object()->pop(pageId))
                    break;
            }
        }
    }
}
1056
Rock::SwapDirRr::~SwapDirRr()
{
    // mapOwners and freeSlotsOwners grow in lock step (see create()),
    // so a single index safely walks both
    for (size_t i = 0; i < mapOwners.size(); ++i) {
        delete mapOwners[i];
        delete freeSlotsOwners[i];
    }
}